Files
Signal_Label_Reborn/func/Module_detect_Rpeak.py

487 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from gc import collect
from pathlib import Path
from traceback import format_exc
import matplotlib.pyplot as plt
from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication
from matplotlib import gridspec
from matplotlib.backends.backend_qt import NavigationToolbar2QT
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg
from overrides import overrides
from pandas import read_csv, DataFrame
from yaml import dump, load, FullLoader
from func.utils.ConfigParams import Filename, Params
from func.utils.PublicFunc import PublicFunc
from func.utils.Constants import Constants
from func.utils.Result import Result
from func.utils.detect_Rpeak import preprocess, Rpeak_Detection, get_method
from ui.MainWindow.MainWindow_detect_Rpeak import Ui_MainWindow_detect_Rpeak
from ui.setting.detect_Rpeak_input_setting import Ui_MainWindow_detect_Rpeak_input_setting
Config = {
}
ButtonState = {
"Default": {
"pushButton_input_setting": True,
"pushButton_input": True,
"pushButton_view": False,
"pushButton_save": False
},
"Current": {
"pushButton_input_setting": True,
"pushButton_input": True,
"pushButton_view": False,
"pushButton_save": False
}
}
class SettingWindow(QMainWindow):
def __init__(self, root_path, sampID):
super(SettingWindow, self).__init__()
self.ui = Ui_MainWindow_detect_Rpeak_input_setting()
self.ui.setupUi(self)
self.root_path = root_path
self.sampID = sampID
self.msgBox = QMessageBox()
self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)
self.config = None
self.__read_config__()
self.__examine_freq__()
self.ui.spinBox_input_freq.valueChanged.connect(self.__update_ui__)
self.ui.pushButton_confirm.clicked.connect(self.__write_config__)
self.ui.pushButton_cancel.clicked.connect(self.__rollback_config__)
self.ui.pushButton_cancel.clicked.connect(self.close)
def __read_config__(self):
if not Path(Params.DETECT_RPEAK_CONFIG_FILE_PATH).exists():
with open(Params.DETECT_RPEAK_CONFIG_FILE_PATH, "w") as f:
dump(Params.DETECT_RPEAK_CONFIG_NEW_CONTENT, f)
with open(Params.DETECT_RPEAK_CONFIG_FILE_PATH, "r") as f:
file_config = load(f.read(), Loader=FullLoader)
Config.update(file_config)
self.config = file_config
Config.update({
"Path": {
"Input": str((Path(self.root_path) / Filename.PATH_PSG_TEXT /
Path(str(self.sampID)))),
"Save": str((Path(self.root_path) / Filename.PATH_PSG_TEXT /
Path(str(self.sampID))))
}
})
# 数据回显
self.ui.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])
self.ui.plainTextEdit_file_path_input.setPlainText(Config["Path"]["Input"])
self.ui.plainTextEdit_file_path_save.setPlainText(Config["Path"]["Save"])
def __write_config__(self):
# 从界面写入配置
Config["InputConfig"]["Freq"] = self.ui.spinBox_input_freq.value()
Config["Path"]["Input"] = self.ui.plainTextEdit_file_path_input.toPlainText()
Config["Path"]["Save"] = self.ui.plainTextEdit_file_path_save.toPlainText()
# 保存配置到文件
self.config["InputConfig"]["Freq"] = self.ui.spinBox_input_freq.value()
with open(Params.DETECT_RPEAK_CONFIG_FILE_PATH, "w") as f:
dump(self.config, f)
self.close()
def __rollback_config__(self):
self.__read_config__()
def __update_ui__(self):
self.ui.plainTextEdit_file_path_input.setPlainText(
str((Path(self.root_path) /
Filename.PATH_PSG_TEXT /
Path(str(self.sampID)) /
Path(Filename.ECG_FILTER +
str(self.ui.spinBox_input_freq.value()) +
Params.ENDSWITH_TXT))))
def __examine_freq__(self):
if Path(Config["Path"]["Input"]).is_file():
Config["Path"]["Input"] = str(Path(Config["Path"]["Input"]).parent)
result = PublicFunc.examine_file(Config["Path"]["Input"], Filename.ECG_FILTER, Params.ENDSWITH_TXT)
if result.status:
Config["InputConfig"]["Freq"] = result.data["freq"]
else:
PublicFunc.msgbox_output(self, Filename.ECG_FILTER + Constants.FAILURE_REASON["Get_Freq_Not_Correct"],
Constants.MSGBOX_TYPE_ERROR)
# 数据回显
self.ui.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])
class MainWindow_detect_Rpeak(QMainWindow):
def __init__(self):
super(MainWindow_detect_Rpeak, self).__init__()
self.ui = Ui_MainWindow_detect_Rpeak()
self.ui.setupUi(self)
self.root_path = None
self.sampID = None
self.data = None
self.setting = None
# 初始化进度条
self.progressbar = None
PublicFunc.add_progressbar(self)
#初始化画框
self.fig = None
self.canvas = None
self.figToolbar = None
self.gs = None
self.ax0 = None
self.ax1 = None
self.msgBox = QMessageBox()
self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)
@overrides
def show(self, root_path, sampID):
super().show()
self.root_path = root_path
self.sampID = sampID
self.setting = SettingWindow(root_path, sampID)
# 初始化画框
self.fig = plt.figure(figsize=(12, 9), dpi=100)
self.canvas = FigureCanvasQTAgg(self.fig)
self.figToolbar = NavigationToolbar2QT(self.canvas)
for action in self.figToolbar.actions():
if action.text() == "Subplots" or action.text() == "Customize":
self.figToolbar.removeAction(action)
self.ui.verticalLayout_canvas.addWidget(self.canvas)
self.ui.verticalLayout_canvas.addWidget(self.figToolbar)
self.gs = gridspec.GridSpec(2, 1, height_ratios=[1, 1])
self.fig.subplots_adjust(top=0.98, bottom=0.05, right=0.98, left=0.1, hspace=0, wspace=0)
self.ax0 = self.fig.add_subplot(self.gs[0])
self.ax0.grid(True)
self.ax0.xaxis.set_major_formatter(Params.FORMATTER)
self.ax0.tick_params(axis='x', colors=Constants.PLOT_COLOR_WHITE)
self.ax1 = self.fig.add_subplot(self.gs[1], sharex=self.ax0)
self.ax1.grid(True)
self.ax1.xaxis.set_major_formatter(Params.FORMATTER)
PublicFunc.__resetAllButton__(self, ButtonState)
self.ui.doubleSpinBox_bandPassLow.setValue(Config["Filter"]["BandPassLow"])
self.ui.doubleSpinBox_bandPassHigh.setValue(Config["Filter"]["BandPassHigh"])
self.ui.spinBox_peaksValue.setValue(Config["PeaksValue"])
self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__)
self.ui.pushButton_input_setting.clicked.connect(self.setting.show)
self.ui.pushButton_view.clicked.connect(self.__slot_btn_view__)
self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__)
self.ui.doubleSpinBox_bandPassLow.editingFinished.connect(self.__update_config__)
self.ui.doubleSpinBox_bandPassHigh.editingFinished.connect(self.__update_config__)
self.ui.spinBox_peaksValue.editingFinished.connect(self.__update_config__)
self.ui.comboBox_method.currentTextChanged.connect(self.__update_config__)
@overrides
def closeEvent(self, event):
reply = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.Yes:
PublicFunc.__disableAllButton__(self, ButtonState)
PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
QApplication.processEvents()
# 清空画框
if self.ax0 is not None:
self.ax0.clear()
if self.ax1 is not None:
self.ax1.clear()
# 释放资源
del self.data
self.fig.clf()
plt.close(self.fig)
self.deleteLater()
collect()
self.canvas = None
event.accept()
else:
event.ignore()
def __reset__(self):
ButtonState["Current"].update(ButtonState["Default"].copy())
def __plot__(self):
# 清空画框
self.reset_axes()
sender = self.sender()
if sender == self.ui.pushButton_view:
self.ax0.plot(self.data.peak[2:], self.data.RRIV,
'r.',
label=Constants.DETECT_RPEAK_PLOT_LABEL_RRIV)
self.ax1.plot(self.data.processed_data,
color=Constants.PLOT_COLOR_BLUE,
label=Constants.DETECT_RPEAK_PLOT_LABEL_ECG)
self.ax1.plot(self.data.peak, self.data.processed_data[self.data.peak],
'r*',
label=Constants.DETECT_RPEAK_PLOT_LABEL_R_PEAKS)
self.ax1.plot(self.data.interval,
color=Constants.PLOT_COLOR_GREEN,
label=Constants.DETECT_RPEAK_PLOT_LABEL_INTERVAL)
self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT)
self.ax1.legend(loc=Constants.PLOT_UPPER_RIGHT)
self.canvas.draw()
return Result().success(info=Constants.DRAW_FINISHED)
else:
self.canvas.draw()
return Result().failure(info=Constants.DRAW_FAILURE)
def __update_config__(self):
Config["Filter"]["BandPassLow"] = self.ui.doubleSpinBox_bandPassLow.value()
Config["Filter"]["BandPassHigh"] = self.ui.doubleSpinBox_bandPassHigh.value()
Config["PeaksValue"] = self.ui.spinBox_peaksValue.value()
Config["DetectMethod"] = self.ui.comboBox_method.currentText()
def __slot_btn_input__(self):
PublicFunc.__disableAllButton__(self, ButtonState)
# 清空画框
self.reset_axes()
self.canvas.draw()
# 清空方法列表
self.ui.comboBox_method.clear()
self.data = Data()
# 寻找方法
PublicFunc.progressbar_update(self, 1, 2, Constants.DETECT_RPEAK_LOADING_METHOD, 0)
# TODO获取检测方法的解耦
method_list = get_method()
if len(method_list) == 0 or method_list is None:
result = Result().failure(info=Constants.DETECT_RPEAK_LOAD_FAILURE + Constants.FAILURE_REASON["Method_Not_Exist"])
else:
result = Result().success(info=Constants.DETECT_RPEAK_LOAD_FINISHED)
if not result.status:
PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
PublicFunc.finish_operation(self, ButtonState)
return
else:
PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_INFO)
self.update_ui_comboBox_method(method_list)
# 导入数据
PublicFunc.progressbar_update(self, 2, 2, Constants.INPUTTING_DATA, 10)
result = self.data.open_file()
if not result.status:
PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
PublicFunc.finish_operation(self, ButtonState)
return
else:
PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_INFO)
ButtonState["Current"]["pushButton_input_setting"] = False
ButtonState["Current"]["pushButton_input"] = False
ButtonState["Current"]["pushButton_view"] = True
ButtonState["Current"]["pushButton_save"] = False
PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_view__(self):
PublicFunc.__disableAllButton__(self, ButtonState)
# 数据预处理
PublicFunc.progressbar_update(self, 1, 3, Constants.PREPROCESSING_DATA, 0)
result = self.data.preprocess()
if not result.status:
PublicFunc.text_output(self.ui, "(1/3)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
PublicFunc.finish_operation(self, ButtonState)
return
else:
PublicFunc.text_output(self.ui, "(1/3)" + result.info, Constants.TIPS_TYPE_INFO)
# 预测峰值
PublicFunc.progressbar_update(self, 2, 3, Constants.DETECT_RPEAK_PREDICTING_PEAK, 10)
result = self.data.predict_Rpeak()
if not result.status:
PublicFunc.text_output(self.ui, "(2/3)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
PublicFunc.finish_operation(self, ButtonState)
return
else:
PublicFunc.text_output(self.ui, "(2/3)" + result.info, Constants.TIPS_TYPE_INFO)
PublicFunc.text_output(self.ui, Constants.DETECT_RPEAK_DATA_LENGTH_POINTS + str(len(self.data.raw_data)),
Constants.TIPS_TYPE_INFO)
PublicFunc.text_output(self.ui, Constants.DETECT_RPEAK_DURATION_MIN +
str((len(self.data.raw_data) / Config["InputConfig"]["Freq"] / 60)),
Constants.TIPS_TYPE_INFO)
PublicFunc.text_output(self.ui, Constants.DETECT_RPEAK_PEAK_AMOUNT + str(len(self.data.peak)),
Constants.TIPS_TYPE_INFO)
# 绘图
PublicFunc.progressbar_update(self, 3, 3, Constants.DRAWING_DATA, 70)
result = self.__plot__()
if not result.status:
PublicFunc.text_output(self.ui, "(3/3)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
PublicFunc.finish_operation(self, ButtonState)
return
else:
PublicFunc.text_output(self.ui, "(3/3)" + result.info, Constants.TIPS_TYPE_INFO)
ButtonState["Current"]["pushButton_save"] = True
PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_save__(self):
reply = QMessageBox.question(self, Constants.QUESTION_TITLE,
Constants.QUESTION_CONTENT + Config["Path"]["Save"],
QMessageBox.Yes | QMessageBox.No,
QMessageBox.Yes)
if reply == QMessageBox.Yes:
PublicFunc.__disableAllButton__(self, ButtonState)
# 保存
PublicFunc.progressbar_update(self, 1, 1, Constants.SAVING_DATA, 0)
total_rows = len(DataFrame(self.data.peak.reshape(-1)))
chunk_size = Params.DETECT_RPEAK_SAVE_CHUNK_SIZE
try:
with open(Config["Path"]["Save"], 'w') as f:
for start in range(0, total_rows, chunk_size):
end = min(start + chunk_size, total_rows)
chunk = DataFrame(self.data.peak.reshape(-1)).iloc[start:end]
result = self.data.save(chunk)
progress = int((end / total_rows) * 100)
self.progressbar.setValue(progress)
QApplication.processEvents()
except FileNotFoundError as e:
result = Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_File_Not_Found"])
if not result.status:
PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
PublicFunc.finish_operation(self, ButtonState)
return
else:
PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO)
PublicFunc.msgbox_output(self, result.info, Constants.TIPS_TYPE_INFO)
PublicFunc.finish_operation(self, ButtonState)
def reset_axes(self):
self.ax0.clear()
self.ax1.clear()
self.ax0.grid(True)
self.ax0.xaxis.set_major_formatter(Params.FORMATTER)
self.ax0.tick_params(axis='x', colors=Constants.PLOT_COLOR_WHITE)
self.ax1.grid(True)
self.ax1.xaxis.set_major_formatter(Params.FORMATTER)
def update_ui_comboBox_method(self, method_list):
self.ui.comboBox_method.clear()
self.ui.comboBox_method.addItems(method_list)
class Data:
def __init__(self):
self.raw_data = None
self.processed_data = None
self.peak = None
self.interval = None
self.RRIV = None
def open_file(self):
if Path(Config["Path"]["Input"]).is_file():
Config["Path"]["Input"] = str(Path(Config["Path"]["Input"]).parent)
result = PublicFunc.examine_file(Config["Path"]["Input"], Filename.ECG_FILTER, Params.ENDSWITH_TXT)
if result.status:
Config["Path"]["Input"] = result.data["path"]
Config["InputConfig"]["Freq"] = result.data["freq"]
Config["Path"]["Save"] = str(
Path(Config["Path"]["Save"]) / Path(Filename.RPEAK_FINAL + str(Config["InputConfig"]["Freq"]) + Params.ENDSWITH_TXT))
else:
return result
try:
self.raw_data = read_csv(Config["Path"]["Input"],
encoding=Params.UTF8_ENCODING,
header=None).to_numpy().reshape(-1)
except Exception as e:
return Result().failure(info=Constants.INPUT_FAILURE +
Constants.FAILURE_REASON["Open_Data_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.INPUT_FINISHED)
def preprocess(self):
if self.raw_data is None:
return Result().failure(info=Constants.PREPROCESS_FAILURE +
Constants.FAILURE_REASON["Data_Not_Exist"])
try:
if ((Config["Filter"]["BandPassLow"] >= Config["Filter"]["BandPassHigh"]) or
(Config["Filter"]["BandPassLow"] <= 0) or (Config["Filter"]["BandPassHigh"] <= 0)):
return Result().failure(
info=Constants.PREPROCESS_FAILURE + Constants.FAILURE_REASON["Filter_Args_Not_Correct"])
self.processed_data = preprocess(self.raw_data,
Config["InputConfig"]["Freq"],
Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"])
except Exception as e:
return Result().failure(info=Constants.PREPROCESS_FAILURE +
Constants.FAILURE_REASON["Preprocess_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.PREPROCESS_FINISHED)
def predict_Rpeak(self):
if self.processed_data is None:
return Result().failure(info=Constants.DETECT_RPEAK_PREDICT_FAILURE +
Constants.FAILURE_REASON["Data_Not_Exist"])
try:
self.peak, self.interval, self.RRIV = Rpeak_Detection(self.processed_data,
Config["InputConfig"]["Freq"],
Config["PeaksValue"],
Config["DetectMethod"])
except Exception as e:
return Result().failure(info=Constants.DETECT_RPEAK_PREDICT_FAILURE +
Constants.FAILURE_REASON["Predict_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.DETECT_RPEAK_PREDICT_FINISHED)
def save(self, chunk):
if self.peak is None:
return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"])
try:
chunk.to_csv(Config["Path"]["Save"], mode='a', index=False, header=False)
except PermissionError as e:
return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"])
except Exception as e:
return Result().failure(info=Constants.SAVE_FAILURE +
Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.SAVE_FINISHED)