Files
Signal_Label_Reborn/func/Module_detect_Rpeak.py
Yorusora 2a13ceac39 1、完成了<ECG的R峰算法定位>的重构
2、创建好了<人工纠正>和<体动标注>的界面绘制
2025-04-28 16:18:59 +08:00

550 lines
21 KiB
Python

from gc import collect
from pathlib import Path
import matplotlib.pyplot as plt
from PySide6.QtWidgets import QMessageBox, QMainWindow, QWidget, QPushButton, QProgressBar, QApplication
from matplotlib import gridspec
from matplotlib.backends.backend_qt import NavigationToolbar2QT
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg
from overrides import overrides
from pandas import read_csv, DataFrame
from yaml import dump, load, FullLoader
from func.utils.PublicFunc import PublicFunc
from func.utils.Constants import Constants, ConfigParams
from func.utils.detect_Rpeak import preprocess, Rpeak_Detection, get_method
from ui.MainWindow.MainWindow_detect_Rpeak import Ui_MainWindow_detect_Rpeak
from ui.setting.detect_Rpeak_input_setting import Ui_MainWindow_detect_Rpeak_input_setting
Config = {
}
ButtonState = {
"Default": {
"pushButton_input_setting": True,
"pushButton_input": True,
"pushButton_view": False,
"pushButton_save": False
},
"Current": {
"pushButton_input_setting": True,
"pushButton_input": True,
"pushButton_view": False,
"pushButton_save": False
}
}
class SettingWindow(QMainWindow):
def __init__(self, root_path, sampID):
super(SettingWindow, self).__init__()
self.ui = Ui_MainWindow_detect_Rpeak_input_setting()
self.ui.setupUi(self)
self.root_path = root_path
self.sampID = sampID
self.config = None
self.__read_config__()
self.ui.spinBox_input_freq.valueChanged.connect(self.__update_ui__)
self.ui.pushButton_confirm.clicked.connect(self.__write_config__)
self.ui.pushButton_cancel.clicked.connect(self.__rollback_config__)
self.ui.pushButton_cancel.clicked.connect(self.close)
def __read_config__(self):
if not Path(ConfigParams.DETECT_RPEAK_CONFIG_FILE_PATH).exists():
with open(ConfigParams.DETECT_RPEAK_CONFIG_FILE_PATH, "w") as f:
dump(ConfigParams.DETECT_RPEAK_CONFIG_NEW_CONTENT, f)
with open(ConfigParams.DETECT_RPEAK_CONFIG_FILE_PATH, "r") as f:
file_config = load(f.read(), Loader=FullLoader)
Config.update(file_config)
self.config = file_config
Config.update({
"Path": {
"Input": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.DETECT_RPEAK_INPUT_ECG_FILENAME +
str(Config["InputConfig"]["Freq"]) +
ConfigParams.ENDSWITH_TXT))),
"Save": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.DETECT_RPEAK_SAVE_FILENAME +
ConfigParams.ENDSWITH_TXT)))
}
})
# 数据回显
self.ui.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])
self.ui.plainTextEdit_file_path_input.setPlainText(Config["Path"]["Input"])
self.ui.plainTextEdit_file_path_save.setPlainText(Config["Path"]["Save"])
def __write_config__(self):
# 从界面写入配置
Config["InputConfig"]["Freq"] = self.ui.spinBox_input_freq.value()
Config["Path"]["Input"] = self.ui.plainTextEdit_file_path_input.toPlainText()
Config["Path"]["Save"] = self.ui.plainTextEdit_file_path_save.toPlainText()
# 保存配置到文件
self.config["InputConfig"]["Freq"] = self.ui.spinBox_input_freq.value()
with open(ConfigParams.DETECT_RPEAK_CONFIG_FILE_PATH, "w") as f:
dump(self.config, f)
self.close()
def __rollback_config__(self):
self.__read_config__()
def __update_ui__(self):
self.ui.plainTextEdit_file_path_input.setPlainText(
str((Path(self.root_path) /
ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) /
Path(ConfigParams.DETECT_RPEAK_INPUT_ECG_FILENAME +
str(self.ui.spinBox_input_freq.value()) +
ConfigParams.ENDSWITH_TXT))))
class MainWindow_detect_Rpeak(QMainWindow):
def __init__(self):
super(MainWindow_detect_Rpeak, self).__init__()
self.ui = Ui_MainWindow_detect_Rpeak()
self.ui.setupUi(self)
self.root_path = None
self.sampID = None
self.data = None
self.setting = None
# 初始化进度条
self.progressbar = None
self.add_progressbar()
#初始化画框
self.fig = None
self.canvas = None
self.figToolbar = None
self.gs = None
self.ax0 = None
self.ax1 = None
self.line_data = None
self.point_peak = None
self.line_interval = None
self.point_RRIV = None
self.msgBox = QMessageBox()
self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)
@overrides
def show(self, root_path, sampID):
super().show()
self.root_path = root_path
self.sampID = sampID
self.setting = SettingWindow(root_path, sampID)
# 初始化画框
self.fig = plt.figure(figsize=(12, 9), dpi=100)
self.canvas = FigureCanvasQTAgg(self.fig)
self.figToolbar = NavigationToolbar2QT(self.canvas)
for action in self.figToolbar.actions():
if action.text() == "Subplots" or action.text() == "Customize":
self.figToolbar.removeAction(action)
self.ui.verticalLayout_canvas.addWidget(self.canvas)
self.ui.verticalLayout_canvas.addWidget(self.figToolbar)
self.gs = gridspec.GridSpec(2, 1, height_ratios=[1, 1])
self.fig.subplots_adjust(top=0.98, bottom=0.05, right=0.98, left=0.1, hspace=0, wspace=0)
self.ax0 = self.fig.add_subplot(self.gs[0])
self.ax0.grid(True)
self.ax0.xaxis.set_major_formatter(ConfigParams.FORMATTER)
self.ax0.tick_params(axis='x', colors=Constants.PLOT_COLOR_WHITE)
self.ax1 = self.fig.add_subplot(self.gs[1], sharex=self.ax0)
self.ax1.grid(True)
self.ax1.xaxis.set_major_formatter(ConfigParams.FORMATTER)
self.__resetAllButton__()
self.ui.doubleSpinBox_bandPassLow.setValue(Config["Filter"]["BandPassLow"])
self.ui.doubleSpinBox_bandPassHigh.setValue(Config["Filter"]["BandPassHigh"])
self.ui.spinBox_peaksValue.setValue(Config["PeaksValue"])
self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__)
self.ui.pushButton_input_setting.clicked.connect(self.setting.show)
self.ui.pushButton_view.clicked.connect(self.__slot_btn_view__)
self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__)
self.ui.doubleSpinBox_bandPassLow.editingFinished.connect(self.__update_config__)
self.ui.doubleSpinBox_bandPassHigh.editingFinished.connect(self.__update_config__)
self.ui.spinBox_peaksValue.editingFinished.connect(self.__update_config__)
self.ui.comboBox_method.currentTextChanged.connect(self.__update_config__)
@overrides
def closeEvent(self, event):
self.__disableAllButton__()
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
QApplication.processEvents()
# 清空画框
if self.line_data and self.point_peak:
del self.line_data
del self.point_peak
del self.line_interval
self.canvas.draw()
# 释放资源
del self.data
self.fig.clf()
plt.close(self.fig)
self.deleteLater()
collect()
self.canvas = None
event.accept()
@staticmethod
def __reset__():
ButtonState["Current"].update(ButtonState["Default"].copy())
ButtonState["Current"]["pushButton_view"] = True
def __plot__(self):
# 清空画框
if self.line_data and self.point_peak and self.line_interval and self.point_RRIV:
try:
self.line_data.remove()
self.point_peak.remove()
self.line_interval.remove()
self.point_RRIV.remove()
except ValueError:
pass
sender = self.sender()
if sender == self.ui.pushButton_view:
self.point_RRIV, = self.ax0.plot(self.data.peak[2:], self.data.RRIV,
'r.',
label=Constants.DETECT_RPEAK_PLOT_LABEL_RRIV)
self.line_data, = self.ax1.plot(self.data.processed_data,
color=Constants.PLOT_COLOR_BLUE,
label=Constants.DETECT_RPEAK_PLOT_LABEL_ECG)
self.point_peak, = self.ax1.plot(self.data.peak, self.data.processed_data[self.data.peak],
'r*',
label=Constants.DETECT_RPEAK_PLOT_LABEL_R_PEAKS)
self.line_interval, = self.ax1.plot(self.data.interval,
color=Constants.PLOT_COLOR_GREEN,
label=Constants.DETECT_RPEAK_PLOT_LABEL_INTERVAL)
self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT)
self.ax1.legend(loc=Constants.PLOT_UPPER_RIGHT)
status = True
info = Constants.DRAWING_FINISHED
else:
status = False
info = Constants.DRAWING_FAILURE
self.canvas.draw()
return status, info
def __disableAllButton__(self):
# 禁用所有按钮
all_widgets = self.centralWidget().findChildren(QWidget)
# 迭代所有部件,查找按钮并禁用它们
for widget in all_widgets:
if isinstance(widget, QPushButton):
if widget.objectName() in ButtonState["Current"].keys():
widget.setEnabled(False)
def __enableAllButton__(self):
# 启用按钮
all_widgets = self.centralWidget().findChildren(QWidget)
# 迭代所有部件,查找按钮并启用它们
for widget in all_widgets:
if isinstance(widget, QPushButton):
if widget.objectName() in ButtonState["Current"].keys():
widget.setEnabled(ButtonState["Current"][widget.objectName()])
def __resetAllButton__(self):
# 启用按钮
all_widgets = self.centralWidget().findChildren(QWidget)
# 迭代所有部件,查找按钮并启用它们
for widget in all_widgets:
if isinstance(widget, QPushButton):
if widget.objectName() in ButtonState["Default"].keys():
widget.setEnabled(ButtonState["Default"][widget.objectName()])
def __update_config__(self):
Config["Filter"]["BandPassLow"] = self.ui.doubleSpinBox_bandPassLow.value()
Config["Filter"]["BandPassHigh"] = self.ui.doubleSpinBox_bandPassHigh.value()
Config["PeaksValue"] = self.ui.spinBox_peaksValue.value()
Config["DetectMethod"] = self.ui.comboBox_method.currentText()
def finish_operation(self):
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.OPERATION_FINISHED))
self.progressbar.setValue(100)
QApplication.processEvents()
self.__enableAllButton__()
def add_progressbar(self):
self.progressbar = QProgressBar()
self.progressbar.setRange(0, 100)
self.progressbar.setValue(0)
self.progressbar.setStyleSheet(Constants.PROGRESSBAR_STYLE)
self.ui.statusbar.addPermanentWidget(self.progressbar)
def statusbar_show_msg(self, msg):
self.ui.statusbar.showMessage(msg)
def statusbar_clear_msg(self):
self.ui.statusbar.clearMessage()
def __slot_btn_input__(self):
self.__disableAllButton__()
# 清空画框
if self.line_data and self.point_peak and self.line_interval:
try:
self.line_data.remove()
self.point_peak.remove()
self.line_interval.remove()
except ValueError:
pass
self.canvas.draw()
# 清空方法列表
self.ui.comboBox_method.clear()
self.statusbar_show_msg(PublicFunc.format_status_msg("(1/2)" + Constants.DETECT_RPEAK_LOADING_METHOD))
self.progressbar.setValue(0)
QApplication.processEvents()
# 寻找方法
method_list = get_method()
if len(method_list) == 0 or method_list is None:
status = False
info = Constants.DETECT_RPEAK_LOAD_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Method_Not_Exist"]
else:
status = True
info = Constants.DETECT_RPEAK_LOAD_FINISHED
if not status:
PublicFunc.text_output(self.ui, "(1/2)" + info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, "(1/2)" + info, Constants.TIPS_TYPE_INFO)
self.update_ui_comboBox_method(method_list)
self.statusbar_show_msg(PublicFunc.format_status_msg("(2/2)" + Constants.INPUTTING_DATA))
self.progressbar.setValue(10)
QApplication.processEvents()
# 导入数据
self.data = Data()
status, info = self.data.open_file()
if not status:
PublicFunc.text_output(self.ui, "(2/2)" + info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, "(2/2)" + info, Constants.TIPS_TYPE_INFO)
MainWindow_detect_Rpeak.__reset__()
self.finish_operation()
def __slot_btn_view__(self):
self.__disableAllButton__()
self.statusbar_show_msg(PublicFunc.format_status_msg("(1/3)" + Constants.DETECT_RPEAK_PROCESSING_DATA))
self.progressbar.setValue(0)
QApplication.processEvents()
# 数据预处理
status, info = self.data.preprocess()
if not status:
PublicFunc.text_output(self.ui, "(1/3)" + info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, "(1/3)" + info, Constants.TIPS_TYPE_INFO)
self.statusbar_show_msg(PublicFunc.format_status_msg("(2/3)" + Constants.DETECT_RPEAK_PREDICTING_PEAK))
self.progressbar.setValue(10)
QApplication.processEvents()
# 预测峰值
status, info = self.data.predict_Rpeak()
if not status:
PublicFunc.text_output(self.ui, "(2/3)" + info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, "(2/3)" + info, Constants.TIPS_TYPE_INFO)
PublicFunc.text_output(self.ui, Constants.DETECT_RPEAK_DATA_LENGTH_POINTS + str(len(self.data.raw_data)),
Constants.TIPS_TYPE_INFO)
PublicFunc.text_output(self.ui, Constants.DETECT_RPEAK_DURATION_MIN +
str((len(self.data.raw_data) / Config["InputConfig"]["Freq"] / 60)),
Constants.TIPS_TYPE_INFO)
PublicFunc.text_output(self.ui, Constants.DETECT_RPEAK_PEAK_AMOUNT + str(len(self.data.peak)),
Constants.TIPS_TYPE_INFO)
self.statusbar_show_msg(PublicFunc.format_status_msg("(3/3)" + Constants.DRAWING_DATA))
self.progressbar.setValue(70)
QApplication.processEvents()
# 绘图
status, info = self.__plot__()
if not status:
PublicFunc.text_output(self.ui, "(3/3)" + info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, "(3/3)" + info, Constants.TIPS_TYPE_INFO)
ButtonState["Current"]["pushButton_save"] = True
self.finish_operation()
def __slot_btn_save__(self):
reply = QMessageBox.question(self, Constants.QUESTION_TITLE,
Constants.QUESTION_CONTENT + Config["Path"]["Save"],
QMessageBox.Yes | QMessageBox.No,
QMessageBox.Yes)
if reply == QMessageBox.Yes:
self.__disableAllButton__()
self.statusbar_show_msg(PublicFunc.format_status_msg("(1/1)" + Constants.SAVING_DATA))
self.progressbar.setValue(0)
QApplication.processEvents()
# 保存
# status, info = self.data.save()
total_rows = len(DataFrame(self.data.peak.reshape(-1)))
chunk_size = ConfigParams.DETECT_RPEAK_SAVE_CHUNK_SIZE
with open(Config["Path"]["Save"], 'w') as f:
for start in range(0, total_rows, chunk_size):
end = min(start + chunk_size, total_rows)
chunk = DataFrame(self.data.peak.reshape(-1)).iloc[start:end]
status, info = self.data.save(chunk)
progress = int((end / total_rows) * 100)
self.progressbar.setValue(progress)
QApplication.processEvents()
if not status:
PublicFunc.text_output(self.ui, "(1/1)" + info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, "(1/1)" + info, Constants.TIPS_TYPE_INFO)
PublicFunc.msgbox_output(self, info, Constants.TIPS_TYPE_INFO)
self.finish_operation()
def update_ui_comboBox_method(self, method_list):
self.ui.comboBox_method.clear()
self.ui.comboBox_method.addItems(method_list)
class Data:
def __init__(self):
self.file_path_input = Config["Path"]["Input"]
self.file_path_save = Config["Path"]["Save"]
self.raw_data = None
self.processed_data = None
self.peak = None
self.interval = None
self.RRIV = None
def open_file(self):
if not Path(Config["Path"]["Input"]).exists():
return False, Constants.INPUT_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Data_Path_Not_Exist"]
try:
self.raw_data = read_csv(self.file_path_input,
encoding=ConfigParams.UTF8_ENCODING,
header=None).to_numpy().reshape(-1)
except Exception:
return False, Constants.INPUT_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Read_Data_Exception"]
return True, Constants.INPUT_FINISHED
def preprocess(self):
if self.raw_data is None:
return False, Constants.DETECT_RPEAK_PROCESS_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Raw_Data_Not_Exist"]
try:
self.processed_data = preprocess(self.raw_data,
Config["InputConfig"]["Freq"],
Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"])
except Exception:
return False, Constants.DETECT_RPEAK_PROCESS_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Filter_Exception"]
return True, Constants.DETECT_RPEAK_PROCESS_FINISHED
def predict_Rpeak(self):
if self.processed_data is None:
return False, Constants.DETECT_RPEAK_PREDICT_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Processed_Data_Not_Exist"]
try:
self.peak, self.interval, self.RRIV = Rpeak_Detection(self.processed_data,
Config["InputConfig"]["Freq"],
Config["PeaksValue"],
Config["DetectMethod"])
except Exception:
return False, Constants.DETECT_RPEAK_PREDICT_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Predict_Exception"]
return True, Constants.DETECT_RPEAK_PREDICT_FINISHED
def save(self, chunk):
if self.peak is None:
return False, Constants.SAVING_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Peak_Not_Exist"]
try:
# DataFrame(self.processed_data.reshape(-1)).to_csv(self.file_path_save,
# index=False,
# header=False,
# float_format='%.4f')
chunk.to_csv(self.file_path_save, mode='a', index=False, header=False)
except Exception:
return False, Constants.SAVING_FAILURE + Constants.DETECT_RPEAK_FAILURE_REASON["Save_Exception"]
return True, Constants.SAVING_FINISHED