Files
Signal_Label_Reborn/func/Module_detect_Rpeak.py
Yorusora aad3dfba49 1、完善命名规范
2、完成<冗余数据切割和标签映射>的全部代码
2025-05-09 17:21:55 +08:00

442 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from gc import collect
from pathlib import Path
import matplotlib.pyplot as plt
from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication
from matplotlib import gridspec
from matplotlib.backends.backend_qt import NavigationToolbar2QT
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg
from overrides import overrides
from pandas import read_csv, DataFrame
from yaml import dump, load, FullLoader
from func.utils.PublicFunc import PublicFunc
from func.utils.Constants import Constants, ConfigParams
from func.utils.detect_Rpeak import preprocess, Rpeak_Detection, get_method
from ui.MainWindow.MainWindow_detect_Rpeak import Ui_MainWindow_detect_Rpeak
from ui.setting.detect_Rpeak_input_setting import Ui_MainWindow_detect_Rpeak_input_setting
# Module-wide runtime configuration. It starts out empty and is filled in
# place by SettingWindow.__read_config__ before anything reads from it.
Config = dict()

# Initial enabled/disabled flag of every push button, in display order.
_BUTTON_DEFAULTS = (
    ("pushButton_input_setting", True),
    ("pushButton_input", True),
    ("pushButton_view", False),
    ("pushButton_save", False),
)

# "Default" is the pristine template and "Current" the working copy that the
# slots mutate; each entry gets its own dict so the two never alias.
ButtonState = {state: dict(_BUTTON_DEFAULTS) for state in ("Default", "Current")}
class SettingWindow(QMainWindow):
    """Settings window for the input of the R-peak detection tool.

    Lets the user edit the input sampling frequency and the input/save file
    paths. Values are mirrored into the module-level ``Config`` dict; only
    the content loaded from the YAML config file (``self.config``) is written
    back to disk.
    """

    def __init__(self, root_path, sampID):
        """Build the UI, load the configuration and wire up the widgets.

        :param root_path: root directory under which the sample data lives.
        :param sampID: identifier of the sample; used as a directory name.
        """
        super().__init__()
        self.ui = Ui_MainWindow_detect_Rpeak_input_setting()
        self.ui.setupUi(self)

        self.root_path = root_path
        self.sampID = sampID
        self.config = None

        # Populate Config and the widgets before any signal is connected.
        self.__read_config__()

        self.ui.spinBox_input_freq.valueChanged.connect(self.__update_ui__)
        self.ui.pushButton_confirm.clicked.connect(self.__write_config__)
        # Cancel first restores the stored configuration, then closes.
        self.ui.pushButton_cancel.clicked.connect(self.__rollback_config__)
        self.ui.pushButton_cancel.clicked.connect(self.close)

    def _sample_dir(self):
        """Return ``<root_path>/<PUBLIC_PATH_PSG_TEXT>/<sampID>`` as a Path."""
        return Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID))

    def _input_path(self, freq):
        """Return the ECG input file path (as ``str``) for sampling rate *freq*."""
        filename = ConfigParams.DETECT_RPEAK_INPUT_ECG_FILENAME + str(freq) + ConfigParams.ENDSWITH_TXT
        return str(self._sample_dir() / Path(filename))

    def __read_config__(self):
        """Load the YAML config file into ``Config`` and refresh the widgets.

        The file is created from ``DETECT_RPEAK_CONFIG_NEW_CONTENT`` when it
        does not exist yet. Every top-level key found in the file overwrites
        the matching key of ``Config``; the "Path" entry is then rebuilt from
        ``root_path``, ``sampID`` and the configured frequency.
        """
        config_file = ConfigParams.DETECT_RPEAK_CONFIG_FILE_PATH

        if not Path(config_file).exists():
            with open(config_file, "w") as f:
                dump(ConfigParams.DETECT_RPEAK_CONFIG_NEW_CONTENT, f)

        with open(config_file, "r") as f:
            file_config = load(f.read(), Loader=FullLoader)

        Config.update(file_config)
        # Kept separately so only file-backed keys are dumped on confirm.
        self.config = file_config

        input_path = self._input_path(Config["InputConfig"]["Freq"])
        save_name = ConfigParams.DETECT_RPEAK_SAVE_FILENAME + ConfigParams.ENDSWITH_TXT
        save_path = str(self._sample_dir() / Path(save_name))
        Config["Path"] = {"Input": input_path, "Save": save_path}

        # Echo the loaded values back into the widgets
        self.ui.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])
        self.ui.plainTextEdit_file_path_input.setPlainText(Config["Path"]["Input"])
        self.ui.plainTextEdit_file_path_save.setPlainText(Config["Path"]["Save"])

    def __write_config__(self):
        """Copy the widget values into ``Config``, persist them and close."""
        # Write the UI values into the in-memory configuration
        Config["InputConfig"]["Freq"] = self.ui.spinBox_input_freq.value()
        Config["Path"]["Input"] = self.ui.plainTextEdit_file_path_input.toPlainText()
        Config["Path"]["Save"] = self.ui.plainTextEdit_file_path_save.toPlainText()

        # Save the configuration to the file
        self.config["InputConfig"]["Freq"] = self.ui.spinBox_input_freq.value()
        with open(ConfigParams.DETECT_RPEAK_CONFIG_FILE_PATH, "w") as f:
            dump(self.config, f)

        self.close()

    def __rollback_config__(self):
        """Discard unsaved edits by reloading the configuration from disk."""
        self.__read_config__()

    def __update_ui__(self):
        """Rebuild the displayed input path from the current frequency value."""
        current_freq = self.ui.spinBox_input_freq.value()
        self.ui.plainTextEdit_file_path_input.setPlainText(self._input_path(current_freq))
class MainWindow_detect_Rpeak(QMainWindow):
    """Main window of the R-peak detection tool.

    The push buttons drive three operations: *input* (list the detection
    methods and load the ECG file), *view* (preprocess, detect peaks and
    plot) and *save* (write the detected peaks to the configured file).
    Button availability is tracked in the module-level ``ButtonState`` dict
    and settings in the module-level ``Config`` dict.
    """

    def __init__(self):
        """Build the static UI; plotting objects are created later in show()."""
        super(MainWindow_detect_Rpeak, self).__init__()
        self.ui = Ui_MainWindow_detect_Rpeak()
        self.ui.setupUi(self)

        self.root_path = None
        self.sampID = None
        self.data = None
        self.setting = None

        # Initialise the progress bar
        self.progressbar = None
        PublicFunc.add_progressbar(self)

        # Initialise the plotting handles (assigned in show())
        self.fig = None
        self.canvas = None
        self.figToolbar = None
        self.gs = None
        self.ax0 = None
        self.ax1 = None

        self.msgBox = QMessageBox()
        self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)

    @overrides
    def show(self, root_path, sampID):
        """Show the window for one sample and finish its initialisation.

        :param root_path: root directory handed on to the settings window.
        :param sampID: sample identifier handed on to the settings window.
        """
        super().show()
        self.root_path = root_path
        self.sampID = sampID
        # Constructing the settings window loads the config file into Config,
        # which the spin boxes below read from.
        self.setting = SettingWindow(root_path, sampID)

        # Initialise the figure, canvas and toolbar
        self.fig = plt.figure(figsize=(12, 9), dpi=100)
        self.canvas = FigureCanvasQTAgg(self.fig)
        self.figToolbar = NavigationToolbar2QT(self.canvas)
        for action in self.figToolbar.actions():
            if action.text() in ("Subplots", "Customize"):
                self.figToolbar.removeAction(action)
        self.ui.verticalLayout_canvas.addWidget(self.canvas)
        self.ui.verticalLayout_canvas.addWidget(self.figToolbar)

        # Two stacked axes sharing the x axis, with no gap between them
        self.gs = gridspec.GridSpec(2, 1, height_ratios=[1, 1])
        self.fig.subplots_adjust(top=0.98, bottom=0.05, right=0.98, left=0.1, hspace=0, wspace=0)
        self.ax0 = self.fig.add_subplot(self.gs[0])
        self.ax0.grid(True)
        self.ax0.xaxis.set_major_formatter(ConfigParams.FORMATTER)
        self.ax0.tick_params(axis='x', colors=Constants.PLOT_COLOR_WHITE)
        self.ax1 = self.fig.add_subplot(self.gs[1], sharex=self.ax0)
        self.ax1.grid(True)
        self.ax1.xaxis.set_major_formatter(ConfigParams.FORMATTER)

        PublicFunc.__resetAllButton__(self, ButtonState)

        self.ui.doubleSpinBox_bandPassLow.setValue(Config["Filter"]["BandPassLow"])
        self.ui.doubleSpinBox_bandPassHigh.setValue(Config["Filter"]["BandPassHigh"])
        self.ui.spinBox_peaksValue.setValue(Config["PeaksValue"])

        self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__)
        self.ui.pushButton_input_setting.clicked.connect(self.setting.show)
        self.ui.pushButton_view.clicked.connect(self.__slot_btn_view__)
        self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__)
        self.ui.doubleSpinBox_bandPassLow.editingFinished.connect(self.__update_config__)
        self.ui.doubleSpinBox_bandPassHigh.editingFinished.connect(self.__update_config__)
        self.ui.spinBox_peaksValue.editingFinished.connect(self.__update_config__)
        self.ui.comboBox_method.currentTextChanged.connect(self.__update_config__)

    @overrides
    def closeEvent(self, event):
        """Release the figure and loaded data, then accept the close event."""
        PublicFunc.__disableAllButton__(self, ButtonState)
        PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
        QApplication.processEvents()

        # The plotting handles are still None if show() was never called.
        if self.ax0 is not None:
            self.ax0.clear()
        if self.ax1 is not None:
            self.ax1.clear()
        if self.fig is not None:
            self.fig.clf()
            plt.close(self.fig)

        # Release resources; rebinding (instead of `del`) keeps the attribute
        # present so a repeated close event cannot raise AttributeError.
        self.data = None
        self.deleteLater()
        collect()
        self.canvas = None
        event.accept()

    @staticmethod
    def __reset__():
        """Reset the current button states to the defaults, plus "view" enabled."""
        ButtonState["Current"].update(ButtonState["Default"].copy())
        ButtonState["Current"]["pushButton_view"] = True

    def _report_step(self, step, status, info):
        """Log the outcome of one workflow step; abort the operation on failure.

        :param step: progress prefix such as ``"(1/3)"`` put before the message.
        :param status: ``True`` when the step succeeded.
        :param info: message describing the outcome.
        :return: *status*. On failure the error has already been shown in a
            message box and ``PublicFunc.finish_operation`` has been called,
            so the caller only needs to return.
        """
        if status:
            PublicFunc.text_output(self.ui, step + info, Constants.TIPS_TYPE_INFO)
            return True
        PublicFunc.text_output(self.ui, step + info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return False

    def __plot__(self):
        """Redraw both axes from ``self.data``.

        Plotting only happens when the Qt signal sender is the "view" button.

        :return: ``(status, info)`` - success flag and a status message.
        """
        # Clear the axes
        self.reset_axes()

        sender = self.sender()
        if sender == self.ui.pushButton_view:
            # Upper axes: RRIV values plotted against the peaks from index 2 on
            self.ax0.plot(self.data.peak[2:], self.data.RRIV,
                          'r.',
                          label=Constants.DETECT_RPEAK_PLOT_LABEL_RRIV)
            # Lower axes: filtered signal, detected peaks and the interval curve
            self.ax1.plot(self.data.processed_data,
                          color=Constants.PLOT_COLOR_BLUE,
                          label=Constants.DETECT_RPEAK_PLOT_LABEL_ECG)
            self.ax1.plot(self.data.peak, self.data.processed_data[self.data.peak],
                          'r*',
                          label=Constants.DETECT_RPEAK_PLOT_LABEL_R_PEAKS)
            self.ax1.plot(self.data.interval,
                          color=Constants.PLOT_COLOR_GREEN,
                          label=Constants.DETECT_RPEAK_PLOT_LABEL_INTERVAL)
            self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT)
            self.ax1.legend(loc=Constants.PLOT_UPPER_RIGHT)
            status = True
            info = Constants.DRAWING_FINISHED
        else:
            status = False
            info = Constants.DRAWING_FAILURE

        self.canvas.draw()
        return status, info

    def __update_config__(self):
        """Copy the filter, threshold and method widgets into ``Config``."""
        Config["Filter"]["BandPassLow"] = self.ui.doubleSpinBox_bandPassLow.value()
        Config["Filter"]["BandPassHigh"] = self.ui.doubleSpinBox_bandPassHigh.value()
        Config["PeaksValue"] = self.ui.spinBox_peaksValue.value()
        Config["DetectMethod"] = self.ui.comboBox_method.currentText()

    def __slot_btn_input__(self):
        """Slot of the "input" button: list detection methods and load the data."""
        PublicFunc.__disableAllButton__(self, ButtonState)

        # Clear the axes
        self.reset_axes()
        self.canvas.draw()

        # Clear the method list
        self.ui.comboBox_method.clear()

        self.data = Data()

        # Step 1/2: look up the available detection methods
        PublicFunc.progressbar_update(self, 1, 2, Constants.DETECT_RPEAK_LOADING_METHOD, 0)
        # TODO: decouple the retrieval of the detection methods
        method_list = get_method()
        # Test for None first: len(None) would raise TypeError.
        if method_list is None or len(method_list) == 0:
            status = False
            info = Constants.DETECT_RPEAK_LOAD_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Method_Not_Exist"]
        else:
            status = True
            info = Constants.DETECT_RPEAK_LOAD_FINISHED
        if not self._report_step("(1/2)", status, info):
            return
        self.update_ui_comboBox_method(method_list)

        # Step 2/2: import the data
        PublicFunc.progressbar_update(self, 2, 2, Constants.INPUTTING_DATA, 10)
        status, info = self.data.open_file()
        if not self._report_step("(2/2)", status, info):
            return

        MainWindow_detect_Rpeak.__reset__()
        PublicFunc.finish_operation(self, ButtonState)

    def __slot_btn_view__(self):
        """Slot of the "view" button: preprocess, detect the R peaks and plot."""
        PublicFunc.__disableAllButton__(self, ButtonState)

        # Step 1/3: preprocess the data
        PublicFunc.progressbar_update(self, 1, 3, Constants.DETECT_RPEAK_PROCESSING_DATA, 0)
        status, info = self.data.preprocess()
        if not self._report_step("(1/3)", status, info):
            return

        # Step 2/3: predict the peaks
        PublicFunc.progressbar_update(self, 2, 3, Constants.DETECT_RPEAK_PREDICTING_PEAK, 10)
        status, info = self.data.predict_Rpeak()
        if not self._report_step("(2/3)", status, info):
            return
        # Summary: number of samples, duration in minutes, number of peaks
        PublicFunc.text_output(self.ui, Constants.DETECT_RPEAK_DATA_LENGTH_POINTS + str(len(self.data.raw_data)),
                               Constants.TIPS_TYPE_INFO)
        PublicFunc.text_output(self.ui, Constants.DETECT_RPEAK_DURATION_MIN +
                               str((len(self.data.raw_data) / Config["InputConfig"]["Freq"] / 60)),
                               Constants.TIPS_TYPE_INFO)
        PublicFunc.text_output(self.ui, Constants.DETECT_RPEAK_PEAK_AMOUNT + str(len(self.data.peak)),
                               Constants.TIPS_TYPE_INFO)

        # Step 3/3: plot
        PublicFunc.progressbar_update(self, 3, 3, Constants.DRAWING_DATA, 70)
        status, info = self.__plot__()
        if not self._report_step("(3/3)", status, info):
            return

        ButtonState["Current"]["pushButton_save"] = True
        PublicFunc.finish_operation(self, ButtonState)

    def __slot_btn_save__(self):
        """Slot of the "save" button: write the detected peaks in chunks.

        After the user confirms, the target file is truncated and the peaks
        are appended chunk by chunk through ``Data.save`` while the progress
        bar is updated. Writing stops at the first chunk that fails.
        """
        reply = QMessageBox.question(self, Constants.QUESTION_TITLE,
                                     Constants.QUESTION_CONTENT + Config["Path"]["Save"],
                                     QMessageBox.Yes | QMessageBox.No,
                                     QMessageBox.Yes)
        if reply != QMessageBox.Yes:
            return

        PublicFunc.__disableAllButton__(self, ButtonState)

        # Save
        PublicFunc.progressbar_update(self, 1, 1, Constants.SAVING_DATA, 0)

        # Build the frame once instead of once per chunk.
        peaks = DataFrame(self.data.peak.reshape(-1))
        total_rows = len(peaks)
        chunk_size = ConfigParams.DETECT_RPEAK_SAVE_CHUNK_SIZE

        # Pre-set the outcome so it is defined even when there is nothing to
        # write (the loop below does not run for zero rows).
        status, info = True, Constants.SAVING_FINISHED
        try:
            # Truncate the target; Data.save() appends to it afterwards.
            with open(Config["Path"]["Save"], 'w'):
                pass
        except OSError:
            # Report the failure instead of letting the exception escape the
            # slot, which would leave every button disabled.
            status = False
            info = Constants.SAVING_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Save_Exception"]

        if status:
            for start in range(0, total_rows, chunk_size):
                end = min(start + chunk_size, total_rows)
                status, info = self.data.save(peaks.iloc[start:end])
                if not status:
                    # Stop here so a later chunk cannot mask this failure.
                    break
                self.progressbar.setValue(int((end / total_rows) * 100))
                QApplication.processEvents()

        if not self._report_step("(1/1)", status, info):
            return
        # NOTE(review): the error paths pass Constants.MSGBOX_TYPE_ERROR to
        # msgbox_output; confirm TIPS_TYPE_INFO is the intended type here.
        PublicFunc.msgbox_output(self, info, Constants.TIPS_TYPE_INFO)

        PublicFunc.finish_operation(self, ButtonState)

    def reset_axes(self):
        """Clear both axes and re-apply grid, tick formatter and tick colour."""
        self.ax0.clear()
        self.ax1.clear()
        self.ax0.grid(True)
        self.ax0.xaxis.set_major_formatter(ConfigParams.FORMATTER)
        self.ax0.tick_params(axis='x', colors=Constants.PLOT_COLOR_WHITE)
        self.ax1.grid(True)
        self.ax1.xaxis.set_major_formatter(ConfigParams.FORMATTER)

    def update_ui_comboBox_method(self, method_list):
        """Replace the entries of the method combo box with *method_list*."""
        self.ui.comboBox_method.clear()
        self.ui.comboBox_method.addItems(method_list)
class Data:
    """Container for the loaded signal and the results derived from it.

    Every operation returns a ``(status, info)`` tuple: ``status`` is ``True``
    on success and ``info`` is the message to show to the user. File paths and
    processing parameters are read from the module-level ``Config`` dict.
    """

    def __init__(self):
        # Signal as read from the input file (flattened to one dimension)
        self.raw_data = None
        # Output of the module-level preprocess() function
        self.processed_data = None
        # The three values returned by Rpeak_Detection()
        self.peak = self.interval = self.RRIV = None

    def open_file(self):
        """Read the input file named by ``Config["Path"]["Input"]``.

        :return: ``(False, message)`` when the path does not exist or reading
            fails, otherwise ``(True, message)`` with ``raw_data`` populated.
        """
        source = Config["Path"]["Input"]
        if Path(source).exists():
            try:
                table = read_csv(source,
                                 encoding=ConfigParams.UTF8_ENCODING,
                                 header=None)
                self.raw_data = table.to_numpy().reshape(-1)
            except Exception:
                return False, Constants.INPUT_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Read_Data_Exception"]
            return True, Constants.INPUT_FINISHED

        return False, Constants.INPUT_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Data_Path_Not_Exist"]

    def preprocess(self):
        """Run the module-level ``preprocess`` function on ``raw_data``.

        :return: ``(status, info)``; fails when no raw data is loaded or the
            processing raises.
        """
        if self.raw_data is None:
            return False, Constants.DETECT_RPEAK_PROCESS_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Raw_Data_Not_Exist"]

        try:
            # Inside a method this name resolves to the imported function,
            # not to this method.
            filtered = preprocess(self.raw_data,
                                  Config["InputConfig"]["Freq"],
                                  Config["Filter"]["BandPassLow"],
                                  Config["Filter"]["BandPassHigh"])
        except Exception:
            return False, Constants.DETECT_RPEAK_PROCESS_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Filter_Exception"]
        else:
            self.processed_data = filtered

        return True, Constants.DETECT_RPEAK_PROCESS_FINISHED

    def predict_Rpeak(self):
        """Run ``Rpeak_Detection`` on ``processed_data``.

        On success ``peak``, ``interval`` and ``RRIV`` are set.

        :return: ``(status, info)``; fails when no processed data exists or
            the detection raises.
        """
        if self.processed_data is None:
            return False, Constants.DETECT_RPEAK_PREDICT_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Processed_Data_Not_Exist"]

        try:
            peak, interval, rriv = Rpeak_Detection(self.processed_data,
                                                   Config["InputConfig"]["Freq"],
                                                   Config["PeaksValue"],
                                                   Config["DetectMethod"])
        except Exception:
            return False, Constants.DETECT_RPEAK_PREDICT_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Predict_Exception"]
        else:
            self.peak, self.interval, self.RRIV = peak, interval, rriv

        return True, Constants.DETECT_RPEAK_PREDICT_FINISHED

    def save(self, chunk):
        """Append *chunk* to the file named by ``Config["Path"]["Save"]``.

        :param chunk: object whose ``to_csv`` method is called; it is written
            without index and without header.
        :return: ``(status, info)``; fails when no peaks exist or writing raises.
        """
        if self.peak is None:
            return False, Constants.SAVING_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Peak_Not_Exist"]

        try:
            target = Config["Path"]["Save"]
            chunk.to_csv(target, mode='a', index=False, header=False)
        except Exception:
            return False, Constants.SAVING_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Save_Exception"]

        return True, Constants.SAVING_FINISHED