from gc import collect from pathlib import Path from traceback import format_exc import matplotlib.pyplot as plt from PySide6.QtCore import QCoreApplication from PySide6.QtGui import QAction, QFont from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication, QButtonGroup from matplotlib import gridspec, patches from matplotlib.backends.backend_qt import NavigationToolbar2QT from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg from numpy import (diff, where, correlate, corrcoef, searchsorted, sum as np_sum, max as np_max, min as np_min, arange, array, append, delete, abs as np_abs, argmin as np_argmin, argmax as np_argmax, asarray) from overrides import overrides from pandas import read_csv, DataFrame from resampy import resample from scipy.signal import find_peaks from yaml import dump, load, FullLoader from func.utils.ConfigParams import Filename, Params from func.utils.PublicFunc import PublicFunc from func.utils.Constants import Constants from func.utils.Result import Result from ui.MainWindow.MainWindow_precisely_align import Ui_MainWindow_precisely_align from ui.setting.precisely_align_input_setting import Ui_MainWindow_precisely_align_input_setting Config = { } ButtonState = { "Default": { "pushButton_input_setting": True, "pushButton_input": True, "pushButton_calculate_correlation": False, "pushButton_correlation_align": False, "pushButton_view_align": False, "pushButton_save": False }, "Current": { "pushButton_input_setting": True, "pushButton_input": True, "pushButton_calculate_correlation": False, "pushButton_correlation_align": False, "pushButton_view_align": False, "pushButton_save": False }, "Statue_1": { "pushButton_input_setting": False, "pushButton_input": True, "pushButton_calculate_correlation": True, "pushButton_correlation_align": False, "pushButton_view_align": False, "pushButton_save": False }, "Statue_2": { "pushButton_input_setting": False, "pushButton_input": True, "pushButton_calculate_correlation": True, "pushButton_correlation_align": True, "pushButton_view_align": False, "pushButton_save": False }, "Statue_3": { "pushButton_input_setting": False, "pushButton_input": True, "pushButton_calculate_correlation": False, "pushButton_correlation_align": False, "pushButton_view_align": True, "pushButton_save": False }, "Statue_4": { "pushButton_input_setting": False, "pushButton_input": True, "pushButton_calculate_correlation": False, "pushButton_correlation_align": False, "pushButton_view_align": False, "pushButton_save": True }, } class SettingWindow(QMainWindow): def __init__(self, root_path, sampID): super(SettingWindow, self).__init__() self.ui = Ui_MainWindow_precisely_align_input_setting() self.ui.setupUi(self) self.root_path = root_path self.sampID = sampID self.msgBox = QMessageBox() self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE) self.config = None self.__read_config__() self.__examine_freq__() self.ui.spinBox_input_freq_orgBcg.valueChanged.connect(self.__update_ui__) self.ui.spinBox_input_freq_BCG.valueChanged.connect(self.__update_ui__) self.ui.spinBox_input_freq_ECG.valueChanged.connect(self.__update_ui__) self.ui.pushButton_confirm.clicked.connect(self.__write_config__) self.ui.pushButton_cancel.clicked.connect(self.__rollback_config__) self.ui.pushButton_cancel.clicked.connect(self.close) def __read_config__(self): if not Path(Params.PRECISELY_ALIGN_CONFIG_FILE_PATH).exists(): with open(Params.PRECISELY_ALIGN_CONFIG_FILE_PATH, "w") as f: dump(Params.PRECISELY_ALIGN_CONFIG_NEW_CONTENT, f) with open(Params.PRECISELY_ALIGN_CONFIG_FILE_PATH, "r") as f: file_config = load(f.read(), Loader=FullLoader) Config.update(file_config) self.config = file_config Config.update({ "Path": { "Input_OrgBCG": str((Path(self.root_path) / Filename.PATH_ORGBCG_TEXT / Path(str(self.sampID)))), "Input_BCG": str((Path(self.root_path) / Filename.PATH_ORGBCG_TEXT / Path(str(self.sampID)))), "Input_Jpeak": str((Path(self.root_path) / Filename.PATH_ORGBCG_TEXT / Path(str(self.sampID)))), "Input_ECG": str((Path(self.root_path) / Filename.PATH_PSG_TEXT / Path(str(self.sampID)))), "Input_Rpeak": str((Path(self.root_path) / Filename.PATH_PSG_TEXT / Path(str(self.sampID)))), "Input_Approximately_Align": str((Path(self.root_path) / Filename.PATH_LABEL / Path(str(self.sampID)))), "Save_AlignInfo": str((Path(self.root_path) / Filename.PATH_LABEL / Path(str(self.sampID)))), "Save_OrgBCG": str((Path(self.root_path) / Filename.PATH_ORGBCG_ALIGNED / Path(str(self.sampID)))), "Save_BCG": str((Path(self.root_path) / Filename.PATH_ORGBCG_ALIGNED / Path(str(self.sampID)))), "Save_ECG": str((Path(self.root_path) / Filename.PATH_PSG_ALIGNED / Path(str(self.sampID)))), "Save_Jpeak": str((Path(self.root_path) / Filename.PATH_ORGBCG_ALIGNED / Path(str(self.sampID)))), "Save_Rpeak": str((Path(self.root_path) / Filename.PATH_PSG_ALIGNED / Path(str(self.sampID)))) }, "Coordinate": { "BCG_front_1": 0, "BCG_front_2": 0, "BCG_back_1": 0, "BCG_back_2": 0, "ECG_front_1": 0, "ECG_front_2": 0, "ECG_back_1": 0, "ECG_back_2": 0 }, "IV_Coordinate": { "BCG_front_1": 0, "BCG_front_2": 0, "BCG_back_1": 0, "BCG_back_2": 0, "ECG_front_1": 0, "ECG_front_2": 0, "ECG_back_1": 0, "ECG_back_2": 0 }, "front": { "shift": 0, "offset_interval": 0, "anchor_J": 0, "anchor_R": 0, "offset_interval_duration": 0 }, "back": { "shift": 0, "offset_interval": 0, "anchor_J": 0, "anchor_R": 0, "offset_interval_duration": 0 }, "offset_anchor": 0, "orgfs": 0, "offset_correct": 0, "frontcut_index_BCG": 0, "backcut_index_BCG": 0, "frontcut_index_ECG": 0, "backcut_index_ECG": 0 }) # 数据回显 self.ui.spinBox_input_freq_orgBcg.setValue(Config["InputConfig"]["orgBcgFreq"]) self.ui.spinBox_input_freq_BCG.setValue(Config["InputConfig"]["BCGFreq"]) self.ui.spinBox_input_freq_ECG.setValue(Config["InputConfig"]["ECGFreq"]) self.ui.plainTextEdit_file_path_input_orgBcg.setPlainText(Config["Path"]["Input_OrgBCG"]) self.ui.plainTextEdit_file_path_input_BCG.setPlainText(Config["Path"]["Input_BCG"]) self.ui.plainTextEdit_file_path_input_Jpeak.setPlainText(Config["Path"]["Input_Jpeak"]) self.ui.plainTextEdit_file_path_input_ECG.setPlainText(Config["Path"]["Input_ECG"]) self.ui.plainTextEdit_file_path_input_Rpeak.setPlainText(Config["Path"]["Input_Rpeak"]) self.ui.plainTextEdit_file_path_input_approximately_align.setPlainText( Config["Path"]["Input_Approximately_Align"]) self.ui.plainTextEdit_file_path_save_AlignInfo.setPlainText(Config["Path"]["Save_AlignInfo"]) self.ui.plainTextEdit_file_path_save_orgBcg.setPlainText(Config["Path"]["Save_OrgBCG"]) self.ui.plainTextEdit_file_path_save_BCG.setPlainText(Config["Path"]["Save_BCG"]) self.ui.plainTextEdit_file_path_save_ECG.setPlainText(Config["Path"]["Save_ECG"]) self.ui.plainTextEdit_file_path_save_Jpeak.setPlainText(Config["Path"]["Save_Jpeak"]) self.ui.plainTextEdit_file_path_save_Rpeak.setPlainText(Config["Path"]["Save_Rpeak"]) def __write_config__(self): # 从界面写入配置 Config["InputConfig"]["orgBcgFreq"] = self.ui.spinBox_input_freq_orgBcg.value() Config["InputConfig"]["BCGFreq"] = self.ui.spinBox_input_freq_BCG.value() Config["InputConfig"]["ECGFreq"] = self.ui.spinBox_input_freq_ECG.value() Config["Path"]["Input_OrgBCG"] = self.ui.plainTextEdit_file_path_input_orgBcg.toPlainText() Config["Path"]["Input_BCG"] = self.ui.plainTextEdit_file_path_input_BCG.toPlainText() Config["Path"]["Input_Jpeak"] = self.ui.plainTextEdit_file_path_input_Jpeak.toPlainText() Config["Path"]["Input_ECG"] = self.ui.plainTextEdit_file_path_input_ECG.toPlainText() Config["Path"]["Input_Rpeak"] = self.ui.plainTextEdit_file_path_input_Rpeak.toPlainText() Config["Path"][ "Input_Approximately_Align"] = self.ui.plainTextEdit_file_path_input_approximately_align.toPlainText() Config["Path"]["Save_AlignInfo"] = self.ui.plainTextEdit_file_path_save_AlignInfo.toPlainText() Config["Path"]["Save_OrgBCG"] = self.ui.plainTextEdit_file_path_save_orgBcg.toPlainText() Config["Path"]["Save_BCG"] = self.ui.plainTextEdit_file_path_save_BCG.toPlainText() Config["Path"]["Save_ECG"] = self.ui.plainTextEdit_file_path_save_ECG.toPlainText() Config["Path"]["Save_Jpeak"] = self.ui.plainTextEdit_file_path_save_Jpeak.toPlainText() Config["Path"]["Save_Rpeak"] = self.ui.plainTextEdit_file_path_save_Rpeak.toPlainText() # 保存配置到文件 self.config["InputConfig"]["orgBcgFreq"] = self.ui.spinBox_input_freq_orgBcg.value() self.config["InputConfig"]["BCGFreq"] = self.ui.spinBox_input_freq_BCG.value() self.config["InputConfig"]["ECGFreq"] = self.ui.spinBox_input_freq_ECG.value() with open(Params.PRECISELY_ALIGN_CONFIG_FILE_PATH, "w") as f: dump(self.config, f) self.close() def __rollback_config__(self): self.__read_config__() def __update_ui__(self): self.ui.plainTextEdit_file_path_input_orgBcg.setPlainText( str((Path(self.root_path) / Filename.PATH_ORGBCG_TEXT / Path(str(self.sampID)) / Path(Filename.ORGBCG_RAW + str(self.ui.spinBox_input_freq_orgBcg.value()) + Params.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_input_BCG.setPlainText( str((Path(self.root_path) / Filename.PATH_ORGBCG_TEXT / Path(str(self.sampID)) / Path(Filename.BCG_FILTER + str(self.ui.spinBox_input_freq_BCG.value()) + Params.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_input_ECG.setPlainText( str((Path(self.root_path) / Filename.PATH_PSG_TEXT / Path(str(self.sampID)) / Path(Filename.ECG_FILTER + str(self.ui.spinBox_input_freq_ECG.value()) + Params.ENDSWITH_TXT)))) def __examine_freq__(self): if Path(Config["Path"]["Input_OrgBCG"]).is_file(): Config["Path"]["Input_OrgBCG"] = str(Path(Config["Path"]["Input_OrgBCG"]).parent) if Path(Config["Path"]["Input_BCG"]).is_file(): Config["Path"]["Input_BCG"] = str(Path(Config["Path"]["Input_BCG"]).parent) if Path(Config["Path"]["Input_Jpeak"]).is_file(): Config["Path"]["Input_Jpeak"] = str(Path(Config["Path"]["Input_Jpeak"]).parent) if Path(Config["Path"]["Input_ECG"]).is_file(): Config["Path"]["Input_ECG"] = str(Path(Config["Path"]["Input_ECG"]).parent) if Path(Config["Path"]["Input_Rpeak"]).is_file(): Config["Path"]["Input_Rpeak"] = str(Path(Config["Path"]["Input_Rpeak"]).parent) result = PublicFunc.examine_file(Config["Path"]["Input_OrgBCG"], Filename.ORGBCG_RAW, Params.ENDSWITH_TXT) if result.status: Config["InputConfig"]["orgBcgFreq"] = result.data["freq"] else: PublicFunc.msgbox_output(self, Filename.ORGBCG_RAW + Constants.FAILURE_REASON["Get_Freq_Not_Correct"], Constants.MSGBOX_TYPE_ERROR) result = PublicFunc.examine_file(Config["Path"]["Input_BCG"], Filename.BCG_FILTER, Params.ENDSWITH_TXT) if result.status: Config["InputConfig"]["BCGFreq"] = result.data["freq"] else: PublicFunc.msgbox_output(self, Filename.BCG_FILTER + Constants.FAILURE_REASON["Get_Freq_Not_Correct"], Constants.MSGBOX_TYPE_ERROR) result = PublicFunc.examine_file(Config["Path"]["Input_ECG"], Filename.ECG_FILTER, Params.ENDSWITH_TXT) if result.status: Config["InputConfig"]["ECGFreq"] = result.data["freq"] else: PublicFunc.msgbox_output(self, Filename.ECG_FILTER + Constants.FAILURE_REASON["Get_Freq_Not_Correct"], Constants.MSGBOX_TYPE_ERROR) # 数据回显 self.ui.spinBox_input_freq_orgBcg.setValue(Config["InputConfig"]["orgBcgFreq"]) self.ui.spinBox_input_freq_BCG.setValue(Config["InputConfig"]["BCGFreq"]) self.ui.spinBox_input_freq_ECG.setValue(Config["InputConfig"]["ECGFreq"]) class MainWindow_precisely_align(QMainWindow): def __init__(self): super(MainWindow_precisely_align, self).__init__() self.ui = Ui_MainWindow_precisely_align() self.ui.setupUi(self) self.root_path = None self.sampID = None self.data = None self.setting = None self.buttonGroup = None # 初始化进度条 self.progressbar = None PublicFunc.add_progressbar(self) # 初始化画框 self.fig = None self.canvas = None self.figToolbar = None self.gs = None self.ax0 = None self.ax1 = None self.ax2 = None self.ax3 = None self.ax4 = None self.is_left_button_pressed = None self.rect_down = None self.rect_up = None self.point0 = None self.point1 = None self.selected_point0 = None self.selected_point1 = None self.selected_point2 = None self.selected_point3 = None self.stem_black0 = None self.stem_black1 = None self.ax0_xlime = None self.ax0_ylime = None self.ax2_xlime = None self.ax2_ylime = None self.ax4_xlime = None self.ax4_ylime = None self.ui.textBrowser_info.setStyleSheet("QTextBrowser { background-color: rgb(255, 255, 200); }") PublicFunc.__styleAllButton__(self, ButtonState) self.msgBox = QMessageBox() self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE) @overrides def show(self, root_path, sampID): super().show() self.root_path = root_path self.sampID = sampID self.setting = SettingWindow(root_path, sampID) # 初始化画框 self.fig = plt.figure(figsize=(12, 9), dpi=100) self.canvas = FigureCanvasQTAgg(self.fig) self.figToolbar = CustomNavigationToolbar(self.canvas, self) self.figToolbar.action_Get_Range.setEnabled(False) for action in self.figToolbar._actions.values(): action.setEnabled(False) for action in self.figToolbar.actions(): if action.text() == "Subplots" or action.text() == "Customize": self.figToolbar.removeAction(action) self.figToolbar._actions['home'].triggered.connect(self.toggle_home) self.figToolbar.action_Get_Range.triggered.connect(self.toggle_getRange) self.ui.verticalLayout_canvas.addWidget(self.canvas) self.ui.verticalLayout_canvas.addWidget(self.figToolbar) PublicFunc.__resetAllButton__(self, ButtonState) self.buttonGroup = QButtonGroup(self) self.__update_info__() self.buttonGroup.addButton(self.ui.radioButton_BCG_front) self.buttonGroup.addButton(self.ui.radioButton_ECG_front) self.buttonGroup.addButton(self.ui.radioButton_BCG_back) self.buttonGroup.addButton(self.ui.radioButton_ECG_back) self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__) self.ui.pushButton_input_setting.clicked.connect(self.setting.show) self.ui.pushButton_calculate_correlation.clicked.connect(self.__slot_btn_calculate_correlation__) self.ui.pushButton_correlation_align.clicked.connect(self.__slot_btn_correlation_align__) self.ui.pushButton_view_align.clicked.connect(self.__slot_btn_view_align__) self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__) self.canvas.mpl_connect('pick_event', self.on_pick) self.ui.spinBox_BCG_front_JJIV_1.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_BCG_front_JJIV_2.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_BCG_back_JJIV_1.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_BCG_back_JJIV_2.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_ECG_front_RRIV_1.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_ECG_front_RRIV_2.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_ECG_back_RRIV_1.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_ECG_back_RRIV_2.editingFinished.connect(self.__update_coordinate__) @overrides def closeEvent(self, event): reply = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply == QMessageBox.Yes: PublicFunc.__disableAllButton__(self, ButtonState) PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN)) QApplication.processEvents() # 清空画框 del self.point0 del self.point1 del self.selected_point0 del self.selected_point1 del self.selected_point2 del self.selected_point3 del self.stem_black0 del self.stem_black1 del self.figToolbar.ax0_BCG_rectangle_front del self.figToolbar.ax0_BCG_rectangle_back del self.figToolbar.ax1_ECG_rectangle_front del self.figToolbar.ax1_ECG_rectangle_back if self.ax0 is not None: self.ax0.clear() if self.ax1 is not None: self.ax1.clear() if self.ax2 is not None: self.ax2.clear() if self.ax3 is not None: self.ax3.clear() if self.ax4 is not None: self.ax4.clear() # 释放资源 self.setting.close() del self.data self.fig.clf() plt.close(self.fig) self.deleteLater() collect() self.canvas = None event.accept() else: event.ignore() def __reset__(self): ButtonState["Current"].update(ButtonState["Default"].copy()) def __plot__(self, plot_element=None): # 清空画框 if self.figToolbar.ax0_BCG_rectangle_front is not None: self.figToolbar.ax0_BCG_rectangle_front.remove() if self.figToolbar.ax0_BCG_rectangle_back is not None: self.figToolbar.ax0_BCG_rectangle_back.remove() if self.figToolbar.ax1_ECG_rectangle_front is not None: self.figToolbar.ax1_ECG_rectangle_front.remove() if self.figToolbar.ax1_ECG_rectangle_back is not None: self.figToolbar.ax1_ECG_rectangle_back.remove() self.figToolbar.ax0_BCG_rectangle_front = None self.figToolbar.ax0_BCG_rectangle_back = None self.figToolbar.ax1_ECG_rectangle_front = None self.figToolbar.ax1_ECG_rectangle_back = None self.reset_axes() sender = self.sender() if sender == self.ui.pushButton_input: self.gs = gridspec.GridSpec(2, 1, height_ratios=[1, 1]) self.fig.subplots_adjust(top=0.95, bottom=0.05, right=0.98, left=0.05, hspace=0.15, wspace=0) self.ax0 = self.fig.add_subplot(self.gs[0]) self.ax0.grid(True) self.ax0.xaxis.set_major_formatter(Params.FORMATTER) self.ax1 = self.fig.add_subplot(self.gs[1], sharex=self.ax0, sharey=self.ax0) self.ax1.grid(True) self.ax1.xaxis.set_major_formatter(Params.FORMATTER) Jpeak = self.data.Jpeak[:-2] Rpeak = self.data.Rpeak[:-2] self.ax0.set_title("JJIV") self.ax0.stem(Jpeak, plot_element["JJIVs"], markerfmt="C0.", linefmt=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JJIV) self.ax1.set_title("RRIV") self.ax1.stem(Rpeak, plot_element["RRIVs"], markerfmt="C0.", linefmt=Constants.PLOT_COLOR_ORANGE, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_RRIV) if self.data.BCG_early is True: self.ax0.axvline(x=self.data.approximately_align_pos, color=Constants.PLOT_COLOR_BLACK, linestyle="--", label="Start Line") self.ax1.axvline(x=0, color=Constants.PLOT_COLOR_BLACK, linestyle="--", label="Start Line") elif self.data.BCG_early is False: self.ax0.axvline(x=0, color=Constants.PLOT_COLOR_BLACK, linestyle="--", label="Start Line") self.ax1.axvline(x=self.data.approximately_align_pos, color=Constants.PLOT_COLOR_BLACK, linestyle="--", label="Start Line") else: self.ax0.axvline(x=self.data.approximately_align_pos, color=Constants.PLOT_COLOR_BLACK, linestyle="--", label="Start Line") self.ax1.axvline(x=self.data.approximately_align_pos, color=Constants.PLOT_COLOR_BLACK, linestyle="--", label="Start Line") self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT) self.ax1.legend(loc=Constants.PLOT_UPPER_RIGHT) self.canvas.draw() return Result().success(info=Constants.DRAW_FINISHED) elif sender == self.ui.pushButton_calculate_correlation and plot_element is not None and plot_element[ "mode"] == "init": self.gs = gridspec.GridSpec(2, 2, height_ratios=[1, 1], width_ratios=[1, 1]) self.fig.subplots_adjust(top=0.88, bottom=0.05, right=0.98, left=0.05, hspace=0.15, wspace=0.15) self.ax0 = self.fig.add_subplot(self.gs[0]) self.ax0.grid(True) self.ax0.xaxis.set_major_formatter(Params.FORMATTER) self.ax1 = self.fig.add_subplot(self.gs[2]) self.ax1.grid(True) self.ax1.xaxis.set_major_formatter(Params.FORMATTER) self.ax2 = self.fig.add_subplot(self.gs[1]) self.ax2.grid(True) self.ax2.xaxis.set_major_formatter(Params.FORMATTER) self.ax3 = self.fig.add_subplot(self.gs[3]) self.ax3.grid(True) self.ax3.xaxis.set_major_formatter(Params.FORMATTER) self.ax0.set_title( "front\ncorre_IIV: {}, corre_II: {}\nsame_sign_rate:{}, total_time_ratio: {}\nshift: {}, alignment offset: {} seconds\noffset_interval: {}, anchor_J: {}, anchor_R: {}".format( plot_element["front"]["correlation_IIV"], plot_element["front"]["correlation_II"], plot_element["front"]["same_sign_rate"], plot_element["front"]["total_time_ratio"], plot_element["front"]["shift"], plot_element["front"]["offset_interval_duration"], plot_element["front"]["offset_interval"], plot_element["front"]["anchor_J"], plot_element["front"]["anchor_R"])) self.ax0.stem(plot_element["front"]["corre"], markerfmt="C0.", label=Constants.PRECISELY_ALIGN_PLOT_LABEL_CORRE_RRIV_JJIV) self.selected_point0, = self.ax0.plot(plot_element["front"]["shift"], plot_element["front"]["corre"][plot_element["front"]["shift"]] + 1, 'v', color=Constants.PLOT_COLOR_RED, picker=True, pickradius=5, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_SELECTED_POINT) self.ax0.plot(plot_element["front"]["corre"], 'o', color=Constants.PLOT_COLOR_BLUE, markersize=3, picker=True, pickradius=5) self.ax1.stem(plot_element["front"]["RRIVs"], markerfmt="b.", linefmt=Constants.PLOT_COLOR_ORANGE, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_RRIV) self.stem_black0 = self.ax1.stem(arange(plot_element["front"]["shift"], plot_element["front"]["shift"] + len( plot_element["front"]["JJIVs"])), plot_element["front"]["JJIVs"], markerfmt="ko", linefmt=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JJIV) self.ax2.set_title( "back\ncorre_IIV: {}, corre_II: {}\nsame_sign_rate:{}, total_time_ratio: {}\nshift: {}, alignment offset: {} seconds\noffset_interval: {}, anchor_J: {}, anchor_R: {}".format( plot_element["back"]["correlation_IIV"], plot_element["back"]["correlation_II"], plot_element["back"]["same_sign_rate"], plot_element["back"]["total_time_ratio"], plot_element["back"]["shift"], plot_element["back"]["offset_interval_duration"], plot_element["back"]["offset_interval"], plot_element["back"]["anchor_J"], plot_element["back"]["anchor_R"])) self.ax2.stem(plot_element["back"]["corre"], markerfmt="C0.", label=Constants.PRECISELY_ALIGN_PLOT_LABEL_CORRE_RRIV_JJIV) self.selected_point1, = self.ax2.plot(plot_element["back"]["shift"], plot_element["back"]["corre"][plot_element["back"]["shift"]] + 1, 'v', color=Constants.PLOT_COLOR_RED, picker=True, pickradius=5, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_SELECTED_POINT) self.ax2.plot(plot_element["back"]["corre"], 'o', color=Constants.PLOT_COLOR_BLUE, markersize=3, picker=True, pickradius=5) self.ax3.stem(plot_element["back"]["RRIVs"], markerfmt="b.", linefmt=Constants.PLOT_COLOR_ORANGE, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_RRIV) self.stem_black1 = self.ax3.stem(arange(plot_element["back"]["shift"], plot_element["back"]["shift"] + len(plot_element["back"]["JJIVs"])), plot_element["back"]["JJIVs"], markerfmt="ko", linefmt=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JJIV) self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT) self.ax1.legend(loc=Constants.PLOT_UPPER_RIGHT) self.ax2.legend(loc=Constants.PLOT_UPPER_RIGHT) self.ax3.legend(loc=Constants.PLOT_UPPER_RIGHT) self.canvas.draw() return Result().success(info=Constants.DRAW_FINISHED) elif sender == self.ui.pushButton_correlation_align or ( plot_element is not None and plot_element["mode"] == "select"): self.gs = gridspec.GridSpec(1, 1, height_ratios=[1]) self.fig.subplots_adjust(top=0.95, bottom=0.05, right=0.98, left=0.05, hspace=0, wspace=0) self.ax4 = self.fig.add_subplot(self.gs[0]) self.ax4.grid(True) self.ax4.xaxis.set_major_formatter(Params.FORMATTER) self.ax4.set_title("offset correct") self.ax4.plot(plot_element["cut_ECG"], color=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_ECG) self.ax4.plot(plot_element["res_BCG"], color=Constants.PLOT_COLOR_ORANGE, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_BCG) self.ax4.plot([plot_element["anchor00"], plot_element["anchor00"]], [plot_element["b"], plot_element["a"]], 'k--') self.ax4.plot([plot_element["anchor10"], plot_element["anchor10"]], [plot_element["b"], plot_element["a"]], 'k--') self.point0, = self.ax4.plot(plot_element["peak_ECG"], plot_element["cut_ECG"][plot_element["peak_ECG"]], 'o', markersize=3, color=Constants.PLOT_COLOR_BLUE, picker=True, pickradius=5, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_RPEAK) self.point1, = self.ax4.plot(plot_element["peak_BCG"], plot_element["res_BCG"][plot_element["peak_BCG"]], 'o', markersize=3, color=Constants.PLOT_COLOR_RED, picker=True, pickradius=5, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JPEAK) self.ax4.legend(loc=Constants.PLOT_UPPER_RIGHT) self.canvas.draw() return Result().success(info=Constants.DRAW_FINISHED) elif sender == self.ui.pushButton_view_align: self.gs = gridspec.GridSpec(1, 1, height_ratios=[1]) self.fig.subplots_adjust(top=0.95, bottom=0.05, right=0.98, left=0.05, hspace=0, wspace=0) self.ax4 = self.fig.add_subplot(self.gs[0]) self.ax4.grid(True) self.ax4.xaxis.set_major_formatter(Params.FORMATTER) self.ax4.set_title("result preview") self.ax4.plot(self.data.cut_ECG, color=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_ECG) self.ax4.plot(self.data.res_BCG, color=Constants.PLOT_COLOR_ORANGE, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_BCG) self.ax4.plot(self.data.cut_Rpeak, self.data.cut_ECG[self.data.cut_Rpeak], 'v', color=Constants.PLOT_COLOR_BLUE, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_RPEAK) self.ax4.plot(self.data.cut_Jpeak, self.data.res_BCG[self.data.cut_Jpeak], 'v', color=Constants.PLOT_COLOR_RED, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JPEAK) self.ax4.legend(loc=Constants.PLOT_UPPER_RIGHT) self.canvas.draw() return Result().success(info=Constants.DRAW_FINISHED) else: self.canvas.draw() return Result().failure(info=Constants.DRAW_FAILURE) def __update_info__(self): self.ui.spinBox_BCG_front_JJIV_1.setValue(Config["IV_Coordinate"]["BCG_front_1"]) self.ui.spinBox_BCG_front_JJIV_2.setValue(Config["IV_Coordinate"]["BCG_front_2"]) self.ui.spinBox_BCG_back_JJIV_1.setValue(Config["IV_Coordinate"]["BCG_back_1"]) self.ui.spinBox_BCG_back_JJIV_2.setValue(Config["IV_Coordinate"]["BCG_back_2"]) self.ui.spinBox_ECG_front_RRIV_1.setValue(Config["IV_Coordinate"]["ECG_front_1"]) self.ui.spinBox_ECG_front_RRIV_2.setValue(Config["IV_Coordinate"]["ECG_front_2"]) self.ui.spinBox_ECG_back_RRIV_1.setValue(Config["IV_Coordinate"]["ECG_back_1"]) self.ui.spinBox_ECG_back_RRIV_2.setValue(Config["IV_Coordinate"]["ECG_back_2"]) self.ui.spinBox_BCG_front_Signal_1.setValue(Config["Coordinate"]["BCG_front_1"]) self.ui.spinBox_BCG_front_Signal_2.setValue(Config["Coordinate"]["BCG_front_2"]) self.ui.spinBox_BCG_back_Signal_1.setValue(Config["Coordinate"]["BCG_back_1"]) self.ui.spinBox_BCG_back_Signal_2.setValue(Config["Coordinate"]["BCG_back_2"]) self.ui.spinBox_ECG_front_Signal_1.setValue(Config["Coordinate"]["ECG_front_1"]) self.ui.spinBox_ECG_front_Signal_2.setValue(Config["Coordinate"]["ECG_front_2"]) self.ui.spinBox_ECG_back_Signal_1.setValue(Config["Coordinate"]["ECG_back_1"]) self.ui.spinBox_ECG_back_Signal_2.setValue(Config["Coordinate"]["ECG_back_2"]) def __slot_btn_input__(self): PublicFunc.__disableAllButton__(self, ButtonState) # 清空画框 self.reset_axes() self.canvas.draw() self.data = Data() # 导入数据 PublicFunc.progressbar_update(self, 1, 3, Constants.INPUTTING_DATA, 0) result = self.data.open_file() if not result.status: PublicFunc.text_output(self.ui, "(1/3)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/3)" + result.info, Constants.TIPS_TYPE_INFO) # 重采样 PublicFunc.progressbar_update(self, 2, 4, Constants.RESAMPLING_DATA, 0) result = self.data.resample() if not result.status: PublicFunc.text_output(self.ui, "(2/4)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/4)" + result.info, Constants.TIPS_TYPE_INFO) # 处理数据 PublicFunc.progressbar_update(self, 3, 4, Constants.PRECISELY_ALIGN_PROCESSING_DATA, 50) result = self.data.data_process_for_calculate_correlation() if not result.status: PublicFunc.text_output(self.ui, "(3/4)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(3/4)" + result.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 4, 4, Constants.DRAWING_DATA, 70) result = self.__plot__(result.data) if not result.status: PublicFunc.text_output(self.ui, "(4/4)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(4/4)" + result.info, Constants.TIPS_TYPE_INFO) self.setting.close() self.figToolbar.action_Get_Range.setEnabled(True) self.rect_down = min(self.ax0.get_ylim()[0], self.ax1.get_ylim()[0]) - 10000 self.rect_up = max(self.ax0.get_ylim()[1], self.ax1.get_ylim()[1]) + 10000 for action in self.figToolbar._actions.values(): action.setEnabled(True) ButtonState["Current"].update(ButtonState["Statue_1"].copy()) PublicFunc.finish_operation(self, ButtonState) self.ui.pushButton_input.clicked.disconnect() self.ui.pushButton_input.clicked.connect(self.__slot_btn_repick__) self.ui.pushButton_input.setText("重新选取") def __slot_btn_repick__(self): PublicFunc.__disableAllButton__(self, ButtonState) # 清空画框 self.reset_axes() self.canvas.draw() self.data.correlation_align_point_match_ECG = array([]).astype(int) self.data.correlation_align_point_match_BCG = array([]).astype(int) # 处理数据 PublicFunc.progressbar_update(self, 1, 2, Constants.PRECISELY_ALIGN_PROCESSING_DATA, 50) result = self.data.data_process_for_calculate_correlation() if not result.status: PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 2, 2, Constants.DRAWING_DATA, 70) result = self.__plot__(result.data) if not result.status: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_INFO) self.figToolbar.action_Get_Range.setEnabled(True) self.rect_down = min(self.ax0.get_ylim()[0], self.ax1.get_ylim()[0]) - 10000 self.rect_up = max(self.ax0.get_ylim()[1], self.ax1.get_ylim()[1]) + 10000 for action in self.figToolbar._actions.values(): action.setEnabled(True) ButtonState["Current"].update(ButtonState["Statue_1"].copy()) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_calculate_correlation__(self, test1=None, shift_front=None, shift_back=None): # TODO:这里有个未知的BUG,虽然不影响功能,但会影响代码整洁性,第一个形参赋值为None时,之后使用变量时将会变成False,不知道为什么 PublicFunc.__disableAllButton__(self, ButtonState) sender = self.sender() if sender == self.ui.pushButton_calculate_correlation: mode = "init" else: mode = "select" self.ax0_xlime = self.ax0.get_xlim() self.ax0_ylime = self.ax0.get_ylim() self.ax2_xlime = self.ax2.get_xlim() self.ax2_ylime = self.ax2.get_ylim() # 计算前段相关性 PublicFunc.progressbar_update(self, 1, 3, Constants.PRECISELY_ALIGN_CALCULATING_CORRELATION_FRONT, 0) result1 = self.data.calculate_correlation_front(mode, shift_front) if not result1.status: PublicFunc.text_output(self.ui, "(1/3)" + result1.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result1.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/3)" + result1.info, Constants.TIPS_TYPE_INFO) # 计算后段相关性 PublicFunc.progressbar_update(self, 2, 3, Constants.PRECISELY_ALIGN_CALCULATING_CORRELATION_BACK, 30) result2 = self.data.calculate_correlation_back(mode, shift_back) if not result2.status: PublicFunc.text_output(self.ui, "(2/3)" + result2.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result2.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/3)" + result2.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 3, 3, Constants.DRAWING_DATA, 60) result = {} result.update(result1.data) result.update(result2.data) result.update({"mode": mode}) if mode == "init": result = self.__plot__(result) elif mode == "select": result = self.redraw_calculate_coordination(result) else: raise ValueError("模式不存在") if not result.status: PublicFunc.text_output(self.ui, "(3/3)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(3/3)" + result.info, Constants.TIPS_TYPE_INFO) self.figToolbar.action_Get_Range.setEnabled(False) self.figToolbar.deactivate_figToorbar_getRange_mode() ButtonState["Current"].update(ButtonState["Statue_2"].copy()) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_correlation_align__(self): PublicFunc.__disableAllButton__(self, ButtonState) sender = self.sender() if sender == self.ui.pushButton_correlation_align: mode = "init" else: mode = "select" self.ax4_xlime = self.ax4.get_xlim() self.ax4_ylime = self.ax4.get_ylim() # 处理相关对齐 PublicFunc.progressbar_update(self, 1, 2, Constants.PRECISELY_ALIGN_ALIGNING_CORRELATION, 0) result = self.data.correlation_align(mode) if not result.status: PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 2, 2, Constants.DRAWING_DATA, 50) result.data.update({"mode": mode}) if mode == "init": result = self.__plot__(result.data) elif mode == "select": result = self.redraw_correlation_align(result.data) else: raise ValueError("模式不存在") if not result.status: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_INFO) ButtonState["Current"].update(ButtonState["Statue_3"].copy()) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_view_align__(self): PublicFunc.__disableAllButton__(self, ButtonState) # 数据后处理 PublicFunc.progressbar_update(self, 1, 2, Constants.PRECISELY_ALIGN_POSTPROCESSING_VIEW, 0) result = self.data.data_postprocess() if not result.status: PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 2, 2, Constants.DRAWING_DATA, 50) result = self.__plot__() if not result.status: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_INFO) ButtonState["Current"].update(ButtonState["Statue_4"].copy()) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_save__(self): reply = QMessageBox.question(self, Constants.QUESTION_TITLE, Constants.QUESTION_CONTENT, QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes) if reply == QMessageBox.Yes: PublicFunc.__disableAllButton__(self, ButtonState) # 保存对齐信息 PublicFunc.progressbar_update(self, 1, 6, Constants.PRECISELY_ALIGN_SAVING_ALIGNINFO, 0) result = self.data.save_alignInfo() if not result.status: PublicFunc.text_output(self.ui, "(1/6)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/6)" + result.info, Constants.TIPS_TYPE_INFO) # 保存切割后orgBcg PublicFunc.progressbar_update(self, 2, 6, Constants.PRECISELY_ALIGN_SAVING_RES_ORGBCG, 0) total_rows = len(DataFrame(self.data.res_orgBcg.reshape(-1))) chunk_size = Params.PRECISELY_ALIGN_SAVE_CHUNK_SIZE try: with open(Config["Path"]["Save_OrgBCG"], 'w') as f: for start in range(0, total_rows, chunk_size): end = min(start + chunk_size, total_rows) chunk = DataFrame(self.data.res_orgBcg.reshape(-1)).iloc[start:end] result = self.data.save_res_orgBcg(chunk) progress = int((end / total_rows) * 100) self.progressbar.setValue(progress) QApplication.processEvents() except FileNotFoundError as e: result = Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_File_Not_Found"]) if not result.status: PublicFunc.text_output(self.ui, "(2/6)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/6)" + result.info, Constants.TIPS_TYPE_INFO) # 保存切割后BCG PublicFunc.progressbar_update(self, 3, 6, Constants.PRECISELY_ALIGN_SAVING_RES_BCG, 0) total_rows = len(DataFrame(self.data.res_BCG.reshape(-1))) chunk_size = Params.PRECISELY_ALIGN_SAVE_CHUNK_SIZE try: with open(Config["Path"]["Save_BCG"], 'w') as f: for start in range(0, total_rows, chunk_size): end = min(start + chunk_size, total_rows) chunk = DataFrame(self.data.res_BCG.reshape(-1)).iloc[start:end] result = self.data.save_res_BCG(chunk) progress = int((end / total_rows) * 100) self.progressbar.setValue(progress) QApplication.processEvents() except FileNotFoundError as e: result = Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_File_Not_Found"]) if not result.status: PublicFunc.text_output(self.ui, "(3/6)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(3/6)" + result.info, Constants.TIPS_TYPE_INFO) # 保存切割后ECG PublicFunc.progressbar_update(self, 4, 6, Constants.PRECISELY_ALIGN_SAVING_CUT_ECG, 0) total_rows = len(DataFrame(self.data.cut_ECG.reshape(-1))) chunk_size = Params.PRECISELY_ALIGN_SAVE_CHUNK_SIZE try: with open(Config["Path"]["Save_ECG"], 'w') as f: for start in range(0, total_rows, chunk_size): end = min(start + chunk_size, total_rows) chunk = DataFrame(self.data.cut_ECG.reshape(-1)).iloc[start:end] result = self.data.save_cut_ECG(chunk) progress = int((end / total_rows) * 100) self.progressbar.setValue(progress) QApplication.processEvents() except FileNotFoundError as e: result = Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_File_Not_Found"]) if not result.status: PublicFunc.text_output(self.ui, "(4/6)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(4/6)" + result.info, Constants.TIPS_TYPE_INFO) # 保存切割后J峰 PublicFunc.progressbar_update(self, 5, 6, Constants.PRECISELY_ALIGN_SAVING_CUT_JPEAK, 0) total_rows = len(DataFrame(self.data.cut_Jpeak.reshape(-1))) chunk_size = Params.PRECISELY_ALIGN_SAVE_PEAK_CHUNK_SIZE try: with open(Config["Path"]["Save_Jpeak"], 'w') as f: for start in range(0, total_rows, chunk_size): end = min(start + chunk_size, total_rows) chunk = DataFrame(self.data.cut_Jpeak.reshape(-1)).iloc[start:end] result = self.data.save_Jpeak(chunk) progress = int((end / total_rows) * 100) self.progressbar.setValue(progress) QApplication.processEvents() except FileNotFoundError as e: result = Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_File_Not_Found"]) if not result.status: PublicFunc.text_output(self.ui, "(5/6)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(5/6)" + result.info, Constants.TIPS_TYPE_INFO) # 保存切割后R峰 PublicFunc.progressbar_update(self, 6, 6, Constants.PRECISELY_ALIGN_SAVING_CUT_RPEAK, 0) total_rows = len(DataFrame(self.data.cut_Rpeak.reshape(-1))) chunk_size = Params.PRECISELY_ALIGN_SAVE_PEAK_CHUNK_SIZE try: with open(Config["Path"]["Save_Rpeak"], 'w') as f: for start in range(0, total_rows, chunk_size): end = min(start + chunk_size, total_rows) chunk = DataFrame(self.data.cut_Rpeak.reshape(-1)).iloc[start:end] result = self.data.save_Rpeak(chunk) progress = int((end / total_rows) * 100) self.progressbar.setValue(progress) QApplication.processEvents() except FileNotFoundError as e: result = Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_File_Not_Found"]) if not result.status: PublicFunc.text_output(self.ui, "(6/6)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(6/6)" + result.info, Constants.TIPS_TYPE_INFO) PublicFunc.msgbox_output(self, Constants.SAVE_FINISHED, Constants.TIPS_TYPE_INFO) PublicFunc.finish_operation(self, ButtonState) def __update_coordinate__(self): try: if self.data is not None: if self.data.Jpeak is None or self.data.Rpeak is None: PublicFunc.msgbox_output(self, Constants.FAILURE_REASON["Data_Not_Exist"], Constants.MSGBOX_TYPE_ERROR) return sender = self.sender() if sender == self.ui.spinBox_BCG_front_JJIV_1: if self.ui.spinBox_BCG_front_JJIV_1.value() >= len(self.data.Jpeak[:-2]): self.ui.spinBox_BCG_front_JJIV_1.setValue(len(self.data.Jpeak[:-2]) - 1) Config["IV_Coordinate"]["BCG_front_1"] = self.ui.spinBox_BCG_front_JJIV_1.value() Config["Coordinate"]["BCG_front_1"] = self.data.Jpeak[:-2][self.ui.spinBox_BCG_front_JJIV_1.value()] self.ui.spinBox_BCG_front_Signal_1.setValue(Config["Coordinate"]["BCG_front_1"]) elif sender == self.ui.spinBox_BCG_front_JJIV_2: if self.ui.spinBox_BCG_front_JJIV_2.value() >= len(self.data.Jpeak[:-2]): self.ui.spinBox_BCG_front_JJIV_2.setValue(len(self.data.Jpeak[:-2]) - 1) Config["IV_Coordinate"]["BCG_front_2"] = self.ui.spinBox_BCG_front_JJIV_2.value() Config["Coordinate"]["BCG_front_2"] = self.data.Jpeak[:-2][self.ui.spinBox_BCG_front_JJIV_2.value()] self.ui.spinBox_BCG_front_Signal_2.setValue(Config["Coordinate"]["BCG_front_2"]) estimate_ECG_front_1 = self.data.get_corresponding_interval(Config["Coordinate"]["BCG_front_1"]).data[ "new_point"] estimate_ECG_front_2 = self.data.get_corresponding_interval(Config["Coordinate"]["BCG_front_2"]).data[ "new_point"] PublicFunc.text_output(self.ui, Constants.CORRESPONDING_INTERVAL_PROMOTE_TEMPLATE.format( Config["Coordinate"]["BCG_front_1"], Config["Coordinate"]["BCG_front_2"], estimate_ECG_front_1, estimate_ECG_front_2 ), Constants.TIPS_TYPE_INFO) if self.ui.checkBox_ECG_autoset.isChecked(): extend_second = int(self.ui.spinBox_ECG_expend_second.value()) reponse = self.data.get_rriv_from_ecg_point( estimate_ECG_front_1, extend_second, "right").data Config["IV_Coordinate"]["ECG_front_1"] = reponse["estimate_RRIV"] Config["Coordinate"]["ECG_front_1"] = reponse["extend_point"] reponse = self.data.get_rriv_from_ecg_point( estimate_ECG_front_2, extend_second, "left").data Config["IV_Coordinate"]["ECG_front_2"] = reponse["estimate_RRIV"] Config["Coordinate"]["ECG_front_2"] = reponse["extend_point"] elif sender == self.ui.spinBox_BCG_back_JJIV_1: if self.ui.spinBox_BCG_back_JJIV_1.value() >= len(self.data.Jpeak[:-2]): self.ui.spinBox_BCG_back_JJIV_1.setValue(len(self.data.Jpeak[:-2]) - 1) Config["IV_Coordinate"]["BCG_back_1"] = self.ui.spinBox_BCG_back_JJIV_1.value() Config["Coordinate"]["BCG_back_1"] = self.data.Jpeak[:-2][self.ui.spinBox_BCG_back_JJIV_1.value()] self.ui.spinBox_BCG_back_Signal_1.setValue(Config["Coordinate"]["BCG_back_1"]) elif sender == self.ui.spinBox_BCG_back_JJIV_2: if self.ui.spinBox_BCG_back_JJIV_2.value() >= len(self.data.Jpeak[:-2]): self.ui.spinBox_BCG_back_JJIV_2.setValue(len(self.data.Jpeak[:-2]) - 1) Config["IV_Coordinate"]["BCG_back_2"] = self.ui.spinBox_BCG_back_JJIV_2.value() Config["Coordinate"]["BCG_back_2"] = self.data.Jpeak[:-2][self.ui.spinBox_BCG_back_JJIV_2.value()] self.ui.spinBox_BCG_back_Signal_2.setValue(Config["Coordinate"]["BCG_back_2"]) estimate_ECG_back_1 = self.data.get_corresponding_interval( Config["Coordinate"]["BCG_back_1"]).data["new_point"] estimate_ECG_back_2 = self.data.get_corresponding_interval( Config["Coordinate"]["BCG_back_2"]).data["new_point"] PublicFunc.text_output(self.ui, Constants.CORRESPONDING_INTERVAL_PROMOTE_TEMPLATE.format( Config["Coordinate"]["BCG_back_1"], Config["Coordinate"]["BCG_back_2"], estimate_ECG_back_1, estimate_ECG_back_2 ), Constants.TIPS_TYPE_INFO) if self.ui.checkBox_ECG_autoset.isChecked(): extend_second = int(self.ui.spinBox_ECG_expend_second.value()) reponse = self.data.get_rriv_from_ecg_point( estimate_ECG_back_1, extend_second, "right").data Config["IV_Coordinate"]["ECG_back_1"] = reponse["estimate_RRIV"] Config["Coordinate"]["ECG_back_1"] = reponse["extend_point"] reponse = self.data.get_rriv_from_ecg_point( estimate_ECG_back_2, extend_second, "left").data Config["IV_Coordinate"]["ECG_back_2"] = reponse["estimate_RRIV"] Config["Coordinate"]["ECG_back_2"] = reponse["extend_point"] elif sender == self.ui.spinBox_ECG_front_RRIV_1: if self.ui.spinBox_ECG_front_RRIV_1.value() >= len(self.data.Rpeak[:-2]): self.ui.spinBox_ECG_front_RRIV_1.setValue(len(self.data.Rpeak[:-2]) - 1) Config["IV_Coordinate"]["ECG_front_1"] = self.ui.spinBox_ECG_front_RRIV_1.value() Config["Coordinate"]["ECG_front_1"] = self.data.Rpeak[:-2][self.ui.spinBox_ECG_front_RRIV_1.value()] self.ui.spinBox_ECG_front_Signal_1.setValue(Config["Coordinate"]["ECG_front_1"]) elif sender == self.ui.spinBox_ECG_front_RRIV_2: if self.ui.spinBox_ECG_front_RRIV_2.value() >= len(self.data.Rpeak[:-2]): self.ui.spinBox_ECG_front_RRIV_2.setValue(len(self.data.Rpeak[:-2]) - 1) Config["IV_Coordinate"]["ECG_front_2"] = self.ui.spinBox_ECG_front_RRIV_2.value() Config["Coordinate"]["ECG_front_2"] = self.data.Rpeak[:-2][self.ui.spinBox_ECG_front_RRIV_2.value()] self.ui.spinBox_ECG_front_Signal_2.setValue(Config["Coordinate"]["ECG_front_2"]) elif sender == self.ui.spinBox_ECG_back_RRIV_1: if self.ui.spinBox_ECG_back_RRIV_1.value() >= len(self.data.Rpeak[:-2]): self.ui.spinBox_ECG_back_RRIV_1.setValue(len(self.data.Rpeak[:-2]) - 1) Config["IV_Coordinate"]["ECG_back_1"] = self.ui.spinBox_ECG_back_RRIV_1.value() Config["Coordinate"]["ECG_back_1"] = self.data.Rpeak[:-2][self.ui.spinBox_ECG_back_RRIV_1.value()] self.ui.spinBox_ECG_back_Signal_1.setValue(Config["Coordinate"]["ECG_back_1"]) elif sender == self.ui.spinBox_ECG_back_RRIV_2: if self.ui.spinBox_ECG_back_RRIV_2.value() >= len(self.data.Rpeak[:-2]): self.ui.spinBox_ECG_back_RRIV_2.setValue(len(self.data.Rpeak[:-2]) - 1) Config["IV_Coordinate"]["ECG_back_2"] = self.ui.spinBox_ECG_back_RRIV_2.value() Config["Coordinate"]["ECG_back_2"] = self.data.Rpeak[:-2][self.ui.spinBox_ECG_back_RRIV_2.value()] self.ui.spinBox_ECG_back_Signal_2.setValue(Config["Coordinate"]["ECG_back_2"]) except AttributeError as e: print(e) pass def reset_axes(self): for ax in self.fig.axes: self.fig.delaxes(ax) if self.ax0 is not None: self.ax0.clear() self.ax0.grid(True) self.ax0.xaxis.set_major_formatter(Params.FORMATTER) if self.ax1 is not None: self.ax1.clear() self.ax1.grid(True) self.ax1.xaxis.set_major_formatter(Params.FORMATTER) if self.ax2 is not None: self.ax2.clear() self.ax2.grid(True) self.ax2.xaxis.set_major_formatter(Params.FORMATTER) if self.ax3 is not None: self.ax3.clear() self.ax3.grid(True) self.ax3.xaxis.set_major_formatter(Params.FORMATTER) if self.ax4 is not None: self.ax4.clear() self.ax4.grid(True) self.ax4.xaxis.set_major_formatter(Params.FORMATTER) def redraw_calculate_coordination(self, plot_element=None): if plot_element is not None and plot_element["mode"] == "select": if self.selected_point0 is not None: self.selected_point0.remove() self.selected_point0 = None if self.selected_point1 is not None: self.selected_point1.remove() self.selected_point1 = None if self.stem_black0 is not None: self.stem_black0.remove() self.stem_black0 = None if self.stem_black1 is not None: self.stem_black1.remove() self.stem_black1 = None self.ax0.set_title( "front\ncorre_IIV: {}, corre_II: {}\nsame_sign_rate:{}, total_time_ratio: {}\nshift: {}, alignment offset: {} seconds\noffset_interval: {}, anchor_J: {}, anchor_R: {}".format( plot_element["front"]["correlation_IIV"], plot_element["front"]["correlation_II"], plot_element["front"]["same_sign_rate"], plot_element["front"]["total_time_ratio"], plot_element["front"]["shift"], plot_element["front"]["offset_interval_duration"], plot_element["front"]["offset_interval"], plot_element["front"]["anchor_J"], plot_element["front"]["anchor_R"])) self.ax2.set_title( "back\ncorre_IIV: {}, corre_II: {}\nsame_sign_rate:{}, total_time_ratio: {}\nshift: {}, alignment offset: {} seconds\noffset_interval: {}, anchor_J: {}, anchor_R: {}".format( plot_element["back"]["correlation_IIV"], plot_element["back"]["correlation_II"], plot_element["back"]["same_sign_rate"], plot_element["back"]["total_time_ratio"], plot_element["back"]["shift"], plot_element["back"]["offset_interval_duration"], plot_element["back"]["offset_interval"], plot_element["back"]["anchor_J"], plot_element["back"]["anchor_R"])) self.selected_point0, = self.ax0.plot(plot_element["front"]["shift"], plot_element["front"]["corre"][plot_element["front"]["shift"]] + 1, 'v', color=Constants.PLOT_COLOR_RED, picker=True, pickradius=5, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_SELECTED_POINT) self.stem_black0 = self.ax1.stem(arange(plot_element["front"]["shift"], plot_element["front"]["shift"] + len( plot_element["front"]["JJIVs"])), plot_element["front"]["JJIVs"], markerfmt="ko", linefmt=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JJIV) self.selected_point1, = self.ax2.plot(plot_element["back"]["shift"], plot_element["back"]["corre"][plot_element["back"]["shift"]] + 1, 'v', color=Constants.PLOT_COLOR_RED, picker=True, pickradius=5, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_SELECTED_POINT) self.stem_black1 = self.ax3.stem(arange(plot_element["back"]["shift"], plot_element["back"]["shift"] + len(plot_element["back"]["JJIVs"])), plot_element["back"]["JJIVs"], markerfmt="ko", linefmt=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JJIV) self.ax0.autoscale(False) self.ax2.autoscale(False) self.ax0.set_xlim(self.ax0_xlime) self.ax0.set_ylim(self.ax0_ylime) self.ax2.set_xlim(self.ax2_xlime) self.ax2.set_ylim(self.ax2_ylime) self.canvas.draw() return Result().success(info=Constants.DRAW_FINISHED) return Result().failure(info=Constants.DRAW_FAILURE) def redraw_correlation_align(self, plot_element=None): if plot_element is not None and plot_element["mode"] == "select": if self.selected_point0 is not None: self.selected_point0.remove() self.selected_point0 = None if self.selected_point1 is not None: self.selected_point1.remove() self.selected_point1 = None if self.selected_point2 is not None: self.selected_point2.remove() self.selected_point2 = None if self.selected_point3 is not None: self.selected_point3.remove() self.selected_point3 = None if len(self.data.correlation_align_point_match_ECG) > 0: self.selected_point0, = self.ax4.plot( [self.data.correlation_align_point_match_ECG[0], self.data.correlation_align_point_match_ECG[0]], [plot_element["b"], plot_element["a"]], 'b--') if len(self.data.correlation_align_point_match_ECG) == 2: self.selected_point1, = self.ax4.plot( [self.data.correlation_align_point_match_ECG[1], self.data.correlation_align_point_match_ECG[1]], [plot_element["b"], plot_element["a"]], 'b--') if len(self.data.correlation_align_point_match_BCG) > 0: self.selected_point2, = self.ax4.plot( [self.data.correlation_align_point_match_BCG[0], self.data.correlation_align_point_match_BCG[0]], [plot_element["b"], plot_element["a"]], 'r--') if len(self.data.correlation_align_point_match_BCG) == 2: self.selected_point3, = self.ax4.plot( [self.data.correlation_align_point_match_BCG[1], self.data.correlation_align_point_match_BCG[1]], [plot_element["b"], plot_element["a"]], 'r--') self.ax4.autoscale(False) self.ax4.set_xlim(self.ax4_xlime) self.ax4.set_ylim(self.ax4_ylime) self.canvas.draw() return Result().success(info=Constants.DRAW_FINISHED) return Result().failure(info=Constants.DRAW_FAILURE) def toggle_home(self): if self.ax0 is not None: self.ax0.autoscale(True) self.ax0.relim() self.ax0.autoscale_view() self.canvas.draw() self.ax0.autoscale(False) if self.ax1 is not None: self.ax1.autoscale(True) self.ax1.relim() self.ax1.autoscale_view() self.canvas.draw() self.ax1.autoscale(False) if self.ax2 is not None: self.ax2.autoscale(True) self.ax2.relim() self.ax2.autoscale_view() self.canvas.draw() self.ax2.autoscale(False) if self.ax3 is not None: self.ax3.autoscale(True) self.ax3.relim() self.ax3.autoscale_view() self.canvas.draw() self.ax3.autoscale(False) if self.ax4 is not None: self.ax4.autoscale(True) self.ax4.relim() self.ax4.autoscale_view() self.canvas.draw() self.ax4.autoscale(False) PublicFunc.text_output(self.ui, Constants.PRECISELY_ALIGN_RECOVER_SCALE, Constants.TIPS_TYPE_INFO) def toggle_getRange(self, state): if state: self.deactivate_figToolbar_buttons() self.figToolbar.action_Get_Range.setChecked(True) self.figToolbar.cid_mouse_press = self.canvas.mpl_connect( "button_press_event", self.on_click) self.figToolbar.cid_mouse_release = self.canvas.mpl_connect( "button_release_event", self.on_release) self.figToolbar.cid_mouse_hold = self.canvas.mpl_connect( "motion_notify_event", self.on_hold) else: if self.figToolbar.cid_mouse_press is not None: self.canvas.mpl_disconnect(self.figToolbar.cid_mouse_press) self.figToolbar.cid_mouse_press = None if self.figToolbar.cid_mouse_release is not None: self.canvas.mpl_disconnect(self.figToolbar.cid_mouse_release) self.figToolbar.cid_mouse_release = None if self.figToolbar.cid_mouse_hold: self.canvas.mpl_disconnect(self.figToolbar.cid_mouse_hold) self.figToolbar.cid_mouse_hold = None def deactivate_figToolbar_buttons(self): for action in self.figToolbar._actions.values(): if action.isChecked() == True: if action == self.figToolbar._actions['pan']: self.figToolbar.pan() if action == self.figToolbar._actions['zoom']: self.figToolbar.zoom() def on_click(self, event): if self.figToolbar.action_Get_Range.isChecked(): if event.button == 1: self.is_left_button_pressed = True self.figToolbar.rect_start_x = event.xdata # 如果矩形patch已存在,先移除 if self.ui.radioButton_BCG_front.isChecked(): if self.figToolbar.ax0_BCG_rectangle_front is not None: self.figToolbar.ax0_BCG_rectangle_front.remove() self.figToolbar.ax0_BCG_rectangle_front = None elif self.ui.radioButton_BCG_back.isChecked(): if self.figToolbar.ax0_BCG_rectangle_back is not None: self.figToolbar.ax0_BCG_rectangle_back.remove() self.figToolbar.ax0_BCG_rectangle_back = None elif self.ui.radioButton_ECG_front.isChecked(): if self.figToolbar.ax1_ECG_rectangle_front is not None: self.figToolbar.ax1_ECG_rectangle_front.remove() self.figToolbar.ax1_ECG_rectangle_front = None elif self.ui.radioButton_ECG_back.isChecked(): if self.figToolbar.ax1_ECG_rectangle_back is not None: self.figToolbar.ax1_ECG_rectangle_back.remove() self.figToolbar.ax1_ECG_rectangle_back = None else: raise ValueError("模式未选择") self.canvas.draw() def on_release(self, event): if self.figToolbar.action_Get_Range.isChecked(): if self.figToolbar.rect_start_x is not None: self.figToolbar.rect_end_x = event.xdata if self.figToolbar.rect_start_x is not None and self.figToolbar.rect_end_x is not None: if self.figToolbar.rect_start_x < self.figToolbar.rect_end_x: rect_left = self.figToolbar.rect_start_x rect_right = self.figToolbar.rect_end_x elif self.figToolbar.rect_start_x > self.figToolbar.rect_end_x: rect_left = self.figToolbar.rect_end_x rect_right = self.figToolbar.rect_start_x else: rect_left = self.figToolbar.rect_start_x rect_right = self.figToolbar.rect_start_x else: rect_left = self.figToolbar.rect_start_x rect_right = self.figToolbar.rect_start_x if event.button == 1 and self.is_left_button_pressed: self.is_left_button_pressed = False if self.ui.radioButton_BCG_front.isChecked() or self.ui.radioButton_BCG_back.isChecked(): if rect_left < 0: rect_left = 0 elif rect_left >= len(self.data.raw_BCG): rect_left = 0 rect_right = 0 if rect_right >= len(self.data.raw_BCG): rect_right = len(self.data.raw_BCG) - 1 elif rect_right < 0: rect_left = 0 rect_right = 0 indices = where((self.data.Jpeak[:-2] >= rect_left) & (self.data.Jpeak[:-2] <= rect_right))[0] if indices is None or len(indices) <= 0: if self.ui.radioButton_BCG_front.isChecked(): Config["IV_Coordinate"]["BCG_front_1"] = 0 Config["IV_Coordinate"]["BCG_front_2"] = 0 Config["Coordinate"]["BCG_front_1"] = 0 Config["Coordinate"]["BCG_front_2"] = 0 elif self.ui.radioButton_BCG_back.isChecked(): Config["IV_Coordinate"]["BCG_back_1"] = 0 Config["IV_Coordinate"]["BCG_back_2"] = 0 Config["Coordinate"]["BCG_back_1"] = 0 Config["Coordinate"]["BCG_back_2"] = 0 PublicFunc.text_output(self.ui, Constants.PRECISELY_ALIGN_NO_POINT_IN_THE_INTERVAL, Constants.TIPS_TYPE_INFO) else: if self.ui.radioButton_BCG_front.isChecked(): Config["IV_Coordinate"]["BCG_front_1"] = indices[0] Config["IV_Coordinate"]["BCG_front_2"] = indices[-1] Config["Coordinate"]["BCG_front_1"] = self.data.Jpeak[:-2][indices[0]] Config["Coordinate"]["BCG_front_2"] = self.data.Jpeak[:-2][indices[-1]] estimate_ECG_front_1 = self.data.get_corresponding_interval( Config["Coordinate"]["BCG_front_1"]).data["new_point"] estimate_ECG_front_2 = self.data.get_corresponding_interval( Config["Coordinate"]["BCG_front_2"]).data["new_point"] PublicFunc.text_output( self.ui, Constants.CORRESPONDING_INTERVAL_PROMOTE_TEMPLATE.format( Config["Coordinate"]["BCG_front_1"], Config["Coordinate"]["BCG_front_2"], estimate_ECG_front_1, estimate_ECG_front_2 ), Constants.TIPS_TYPE_INFO) if self.ui.checkBox_ECG_autoset.isChecked(): extend_second = int(self.ui.spinBox_ECG_expend_second.value()) reponse = self.data.get_rriv_from_ecg_point( estimate_ECG_front_1, extend_second, "right").data Config["IV_Coordinate"]["ECG_front_1"] = reponse["estimate_RRIV"] Config["Coordinate"]["ECG_front_1"] = reponse["extend_point"] reponse = self.data.get_rriv_from_ecg_point( estimate_ECG_front_2, extend_second, "left").data Config["IV_Coordinate"]["ECG_front_2"] = reponse["estimate_RRIV"] Config["Coordinate"]["ECG_front_2"] = reponse["extend_point"] elif self.ui.radioButton_BCG_back.isChecked(): Config["IV_Coordinate"]["BCG_back_1"] = indices[0] Config["IV_Coordinate"]["BCG_back_2"] = indices[-1] Config["Coordinate"]["BCG_back_1"] = self.data.Jpeak[:-2][indices[0]] Config["Coordinate"]["BCG_back_2"] = self.data.Jpeak[:-2][indices[-1]] estimate_ECG_back_1 = self.data.get_corresponding_interval( Config["Coordinate"]["BCG_back_1"]).data["new_point"] estimate_ECG_back_2 = self.data.get_corresponding_interval( Config["Coordinate"]["BCG_back_2"]).data["new_point"] PublicFunc.text_output( self.ui, Constants.CORRESPONDING_INTERVAL_PROMOTE_TEMPLATE.format( Config["Coordinate"]["BCG_back_1"], Config["Coordinate"]["BCG_back_2"], estimate_ECG_back_1, estimate_ECG_back_2 ), Constants.TIPS_TYPE_INFO) if self.ui.checkBox_ECG_autoset.isChecked(): extend_second = int(self.ui.spinBox_ECG_expend_second.value()) reponse = self.data.get_rriv_from_ecg_point( estimate_ECG_back_1, extend_second, "right").data Config["IV_Coordinate"]["ECG_back_1"] = reponse["estimate_RRIV"] Config["Coordinate"]["ECG_back_1"] = reponse["extend_point"] reponse = self.data.get_rriv_from_ecg_point( estimate_ECG_back_2, extend_second, "left").data Config["IV_Coordinate"]["ECG_back_2"] = reponse["estimate_RRIV"] Config["Coordinate"]["ECG_back_2"] = reponse["extend_point"] elif self.ui.radioButton_ECG_front.isChecked() or self.ui.radioButton_ECG_back.isChecked(): if rect_left < 0: rect_left = 0 elif rect_left >= len(self.data.raw_ECG): rect_left = 0 rect_right = 0 if rect_right >= len(self.data.raw_ECG): rect_right = len(self.data.raw_ECG) - 1 elif rect_right < 0: rect_left = 0 rect_right = 0 indices = where((self.data.Rpeak[:-2] >= rect_left) & (self.data.Rpeak[:-2] <= rect_right))[0] if indices is None or len(indices) <= 0: if self.ui.radioButton_ECG_front.isChecked(): Config["IV_Coordinate"]["ECG_front_1"] = 0 Config["IV_Coordinate"]["ECG_front_2"] = 0 Config["Coordinate"]["ECG_front_1"] = 0 Config["Coordinate"]["ECG_front_2"] = 0 elif self.ui.radioButton_ECG_back.isChecked(): Config["IV_Coordinate"]["ECG_back_1"] = 0 Config["IV_Coordinate"]["ECG_back_2"] = 0 Config["Coordinate"]["ECG_back_1"] = 0 Config["Coordinate"]["ECG_back_2"] = 0 PublicFunc.text_output(self.ui, Constants.PRECISELY_ALIGN_NO_POINT_IN_THE_INTERVAL, Constants.TIPS_TYPE_INFO) else: if self.ui.radioButton_ECG_front.isChecked(): Config["IV_Coordinate"]["ECG_front_1"] = indices[0] Config["IV_Coordinate"]["ECG_front_2"] = indices[-1] Config["Coordinate"]["ECG_front_1"] = self.data.Rpeak[:-2][indices[0]] Config["Coordinate"]["ECG_front_2"] = self.data.Rpeak[:-2][indices[-1]] elif self.ui.radioButton_ECG_back.isChecked(): Config["IV_Coordinate"]["ECG_back_1"] = indices[0] Config["IV_Coordinate"]["ECG_back_2"] = indices[-1] Config["Coordinate"]["ECG_back_1"] = self.data.Rpeak[:-2][indices[0]] Config["Coordinate"]["ECG_back_2"] = self.data.Rpeak[:-2][indices[-1]] self.figToolbar.rect_start_x = None self.figToolbar.rect_end_x = None self.__update_info__() self.canvas.draw() def on_hold(self, event): if event.button == 1: if self.figToolbar.rect_start_x is not None and event.xdata is not None: self.figToolbar.rect_end_x = event.xdata # 更新矩形patch的位置和大小 x_start = self.figToolbar.rect_start_x x_end = self.figToolbar.rect_end_x # 如果矩形patch不存在,则创建一个新的 if self.figToolbar.ax0_BCG_rectangle_front is None and self.is_left_button_pressed: self.figToolbar.ax0_BCG_rectangle_front = patches.Rectangle((0, 0), 1, 1, fill=True, alpha=Params.PRECISELY_ALIGN_LABEL_TRANSPARENCY, color=Constants.PLOT_COLOR_PINK) self.ax0.add_patch(self.figToolbar.ax0_BCG_rectangle_front) if self.figToolbar.ax0_BCG_rectangle_back is None and self.is_left_button_pressed: self.figToolbar.ax0_BCG_rectangle_back = patches.Rectangle((0, 0), 1, 1, fill=True, alpha=Params.PRECISELY_ALIGN_LABEL_TRANSPARENCY, color=Constants.PLOT_COLOR_PINK) self.ax0.add_patch(self.figToolbar.ax0_BCG_rectangle_back) if self.figToolbar.ax1_ECG_rectangle_front is None and self.is_left_button_pressed: self.figToolbar.ax1_ECG_rectangle_front = patches.Rectangle((0, 0), 1, 1, fill=True, alpha=Params.PRECISELY_ALIGN_LABEL_TRANSPARENCY, color=Constants.PLOT_COLOR_PINK) self.ax1.add_patch(self.figToolbar.ax1_ECG_rectangle_front) if self.figToolbar.ax1_ECG_rectangle_back is None and self.is_left_button_pressed: self.figToolbar.ax1_ECG_rectangle_back = patches.Rectangle((0, 0), 1, 1, fill=True, alpha=Params.PRECISELY_ALIGN_LABEL_TRANSPARENCY, color=Constants.PLOT_COLOR_PINK) self.ax1.add_patch(self.figToolbar.ax1_ECG_rectangle_back) if self.ui.radioButton_BCG_front.isChecked(): self.figToolbar.ax0_BCG_rectangle_front.set_xy((min(x_start, x_end), self.rect_down)) self.figToolbar.ax0_BCG_rectangle_front.set_width(abs(x_end - x_start)) self.figToolbar.ax0_BCG_rectangle_front.set_height(self.rect_up - self.rect_down) elif self.ui.radioButton_BCG_back.isChecked(): self.figToolbar.ax0_BCG_rectangle_back.set_xy((min(x_start, x_end), self.rect_down)) self.figToolbar.ax0_BCG_rectangle_back.set_width(abs(x_end - x_start)) self.figToolbar.ax0_BCG_rectangle_back.set_height(self.rect_up - self.rect_down) elif self.ui.radioButton_ECG_front.isChecked(): self.figToolbar.ax1_ECG_rectangle_front.set_xy((min(x_start, x_end), self.rect_down)) self.figToolbar.ax1_ECG_rectangle_front.set_width(abs(x_end - x_start)) self.figToolbar.ax1_ECG_rectangle_front.set_height(self.rect_up - self.rect_down) elif self.ui.radioButton_ECG_back.isChecked(): self.figToolbar.ax1_ECG_rectangle_back.set_xy((min(x_start, x_end), self.rect_down)) self.figToolbar.ax1_ECG_rectangle_back.set_width(abs(x_end - x_start)) self.figToolbar.ax1_ECG_rectangle_back.set_height(self.rect_up - self.rect_down) self.canvas.draw() def on_pick(self, event): this_line = event.artist if this_line.axes == self.ax0: xdata = this_line.get_xdata() ind = event.ind[-1] shift_front = int(xdata[ind]) self.__slot_btn_calculate_correlation__(shift_front=shift_front) elif this_line.axes == self.ax2: xdata = this_line.get_xdata() ind = event.ind[-1] shift_back = int(xdata[ind]) self.__slot_btn_calculate_correlation__(shift_back=shift_back) elif this_line.axes == self.ax4: xdata = this_line.get_xdata() ind = event.ind[-1] nm = int(xdata[ind]) if this_line == self.point0: if nm in self.data.correlation_align_point_match_ECG: self.data.correlation_align_point_match_ECG = delete(self.data.correlation_align_point_match_ECG, where( self.data.correlation_align_point_match_ECG == nm)[ 0]) elif len(self.data.correlation_align_point_match_ECG) < 2: self.data.correlation_align_point_match_ECG = append(self.data.correlation_align_point_match_ECG, nm) elif this_line == self.point1: if nm in self.data.correlation_align_point_match_BCG: self.data.correlation_align_point_match_BCG = delete(self.data.correlation_align_point_match_BCG, where( self.data.correlation_align_point_match_BCG == nm)[ 0]) elif len(self.data.correlation_align_point_match_BCG) < 2: self.data.correlation_align_point_match_BCG = append(self.data.correlation_align_point_match_BCG, nm) else: raise ValueError("this_line不存在") self.__slot_btn_correlation_align__() class Data: def __init__(self): self.raw_orgBcg = None self.raw_BCG = None self.Jpeak = None self.Jpeak_y = None self.raw_ECG = None self.Rpeak = None self.Rpeak_y = None self.approximately_align_pos = None self.approximately_align_estimated_freq = None self.approximately_align_slope = None self.approximately_align_intercept = None self.BCG_early = None self.res_orgBcg = None self.res_BCG = None self.cut_ECG = None self.cut_Jpeak = None self.cut_Rpeak = None self.RRIs = None self.JJIs = None self.JJIs0_front = None self.RRIs0_front = None self.JJIVs_front = None self.RRIVs_front = None self.JJIs0_back = None self.RRIs0_back = None self.JJIVs_back = None self.RRIVs_back = None self.corre_front = None self.corre_back = None self.correlation_align_point_match_ECG = array([]).astype(int) self.correlation_align_point_match_BCG = array([]).astype(int) self.argmax_BCG = None self.argmax_ECG = None def open_file(self): if Path(Config["Path"]["Input_OrgBCG"]).is_file(): Config["Path"]["Input_OrgBCG"] = str(Path(Config["Path"]["Input_OrgBCG"]).parent) if Path(Config["Path"]["Input_BCG"]).is_file(): Config["Path"]["Input_BCG"] = str(Path(Config["Path"]["Input_BCG"]).parent) if Path(Config["Path"]["Input_Jpeak"]).is_file(): Config["Path"]["Input_Jpeak"] = str(Path(Config["Path"]["Input_Jpeak"]).parent) if Path(Config["Path"]["Input_ECG"]).is_file(): Config["Path"]["Input_ECG"] = str(Path(Config["Path"]["Input_ECG"]).parent) if Path(Config["Path"]["Input_Rpeak"]).is_file(): Config["Path"]["Input_Rpeak"] = str(Path(Config["Path"]["Input_Rpeak"]).parent) if Path(Config["Path"]["Input_Approximately_Align"]).is_file(): Config["Path"]["Input_Approximately_Align"] = str(Path(Config["Path"]["Input_Approximately_Align"]).parent) result = PublicFunc.examine_file(Config["Path"]["Input_OrgBCG"], Filename.ORGBCG_RAW, Params.ENDSWITH_TXT) if result.status: Config["Path"]["Input_OrgBCG"] = result.data["path"] Config["InputConfig"]["orgBcgFreq"] = result.data["freq"] else: return result Config["Path"]["Input_Approximately_Align"] = str( Path(Config["Path"]["Input_Approximately_Align"]) / Path( Filename.APPROXIMATELY_ALIGN_INFO + Params.ENDSWITH_CSV)) Config["Path"]["Save_AlignInfo"] = str( Path(Config["Path"]["Save_AlignInfo"]) / Path( Filename.PRECISELY_ALIGN_INFO + Params.ENDSWITH_TXT)) Config["Path"]["Save_OrgBCG"] = str( Path(Config["Path"]["Save_OrgBCG"]) / Path( Filename.ORGBCG_SYNC + str(Config["InputConfig"]["orgBcgFreq"]) + Params.ENDSWITH_TXT)) result = PublicFunc.examine_file(Config["Path"]["Input_BCG"], Filename.BCG_FILTER, Params.ENDSWITH_TXT) if result.status: Config["Path"]["Input_BCG"] = result.data["path"] Config["InputConfig"]["BCGFreq"] = result.data["freq"] else: return result Config["Path"]["Input_Jpeak"] = str( Path(Config["Path"]["Input_Jpeak"]) / Path(Filename.JPEAK_REVISE_CORRECTED + str( Config["InputConfig"]["BCGFreq"]) + Params.ENDSWITH_TXT)) Config["Path"]["Save_BCG"] = str( Path(Config["Path"]["Save_BCG"]) / Path( Filename.BCG_SYNC + str(Config["InputConfig"]["BCGFreq"]) + Params.ENDSWITH_TXT)) Config["Path"]["Save_Jpeak"] = str( Path(Config["Path"]["Save_Jpeak"]) / Path( Filename.JPEAK_SYNC + str(Config["InputConfig"]["BCGFreq"]) + Params.ENDSWITH_TXT)) result = PublicFunc.examine_file(Config["Path"]["Input_ECG"], Filename.ECG_FILTER, Params.ENDSWITH_TXT) if result.status: Config["Path"]["Input_ECG"] = result.data["path"] Config["InputConfig"]["ECGFreq"] = result.data["freq"] else: return result Config["Path"]["Input_Rpeak"] = str( Path(Config["Path"]["Input_Rpeak"]) / Path( Filename.RPEAK_FINAL_CORRECTED + str(Config["InputConfig"]["ECGFreq"]) + Params.ENDSWITH_TXT)) Config["Path"]["Save_ECG"] = str( Path(Config["Path"]["Save_ECG"]) / Path( Filename.ECG_SYNC + str(Config["InputConfig"]["ECGFreq"]) + Params.ENDSWITH_TXT)) Config["Path"]["Save_Rpeak"] = str( Path(Config["Path"]["Save_Rpeak"]) / Path( Filename.RPEAK_SYNC + str(Config["InputConfig"]["ECGFreq"]) + Params.ENDSWITH_TXT)) if not Path(Config["Path"]["Input_Jpeak"]).exists(): return Result().failure(info=Constants.INPUT_FAILURE + "\n" + Filename.JPEAK_REVISE_CORRECTED + ":" + Config["Path"]["Input_Jpeak"] + Constants.FAILURE_REASON["Path_Not_Exist"]) if not Path(Config["Path"]["Input_Rpeak"]).exists(): return Result().failure(info=Constants.INPUT_FAILURE + "\n" + Filename.RPEAK_FINAL_CORRECTED + ":" + Config["Path"]["Input_Rpeak"] + Constants.FAILURE_REASON["Path_Not_Exist"]) if not Path(Config["Path"]["Input_Approximately_Align"]).exists(): return Result().failure(info=Constants.INPUT_FAILURE + "\n" + Filename.APPROXIMATELY_ALIGN_INFO + ":" + Config["Path"]["Input_Approximately_Align"] + Constants.FAILURE_REASON["Path_Not_Exist"]) try: self.raw_orgBcg = read_csv(Config["Path"]["Input_OrgBCG"], encoding=Params.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.raw_BCG = read_csv(Config["Path"]["Input_BCG"], encoding=Params.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.Jpeak = read_csv(Config["Path"]["Input_Jpeak"], encoding=Params.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.raw_ECG = read_csv(Config["Path"]["Input_ECG"], encoding=Params.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.Rpeak = read_csv(Config["Path"]["Input_Rpeak"], encoding=Params.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.argmax_BCG = np_argmax(self.raw_BCG) self.argmax_ECG = np_argmax(self.raw_ECG) except Exception as e: return Result().failure(info=Constants.INPUT_FAILURE + Constants.FAILURE_REASON["Open_Data_Exception"] + "\n" + format_exc()) try: df = read_csv(Config["Path"]["Input_Approximately_Align"]) pos = df["pos"].values[-1] ApplyFrequency = df["ApplyFrequency"].values[-1] self.approximately_align_pos = int(pos * (Config["InputConfig"]["UseFreq"] / ApplyFrequency)) if self.approximately_align_pos > 0: self.BCG_early = False elif self.approximately_align_pos < 0: self.BCG_early = True self.approximately_align_pos = - self.approximately_align_pos else: self.approximately_align_pos = 0 self.BCG_early = None self.approximately_align_estimated_freq = df["estimate_freq"].values[-1] self.approximately_align_slope = df["estimate_slope"].values[-1] / \ Params.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["Second_PerEpoch"] self.approximately_align_intercept = df["estimate_intercept"].values[-1] * Config["InputConfig"]["UseFreq"] except Exception: self.approximately_align_pos = 0 return Result().success(info=Constants.INPUT_FINISHED) def resample(self): if self.raw_orgBcg is None or self.raw_BCG is None or self.raw_ECG is None: return Result().failure(info=Constants.RESAMPLE_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"]) try: if Config["InputConfig"]["orgBcgFreq"] != Config["InputConfig"]["UseFreq"]: self.raw_orgBcg = resample(self.raw_orgBcg, int(len(self.raw_orgBcg) * (Config["InputConfig"]["UseFreq"] / Config["InputConfig"][ "orgBcgFreq"]))) else: return Result().success(info=Constants.RESAMPLE_NO_NEED) except Exception as e: return Result().failure(info=Constants.RESAMPLE_FAILURE + Constants.FAILURE_REASON["Resample_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.RESAMPLE_FINISHED) def data_process_for_calculate_correlation(self): if self.Jpeak is None or self.Rpeak is None: return Result().failure(info=Constants.PRECISELY_ALIGN_PROCESS_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"]) try: self.JJIs = diff(self.Jpeak) JJIVs = diff(self.JJIs) self.RRIs = diff(self.Rpeak) RRIVs = diff(self.RRIs) result = {"JJIVs": JJIVs, "RRIVs": RRIVs} except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_PROCESS_FAILURE + Constants.FAILURE_REASON["Process_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_PROCESS_FINISHED, data=result) def calculate_correlation_front(self, mode, shift_front=None): if ((Config["IV_Coordinate"]["BCG_front_1"] == Config["IV_Coordinate"]["BCG_front_2"]) or (Config["IV_Coordinate"]["ECG_front_1"] == Config["IV_Coordinate"]["ECG_front_2"])): return Result().failure(info=Constants.PRECISELY_ALIGN_CALCULATE_FAILURE_FRONT + Constants.FAILURE_REASON["Calculate_Correlation_Value_Equal"]) if ((Config["IV_Coordinate"]["BCG_front_2"] - Config["IV_Coordinate"]["BCG_front_1"]) >= (Config["IV_Coordinate"]["ECG_front_2"] - Config["IV_Coordinate"]["ECG_front_1"])): return Result().failure(info=Constants.PRECISELY_ALIGN_CALCULATE_FAILURE_FRONT + Constants.FAILURE_REASON["Calculate_Correlation_JJIVRange_too_Large"]) try: if mode == "init": self.JJIs0_front = self.JJIs[ Config["IV_Coordinate"]["BCG_front_1"]:Config["IV_Coordinate"]["BCG_front_2"]] self.RRIs0_front = self.RRIs[ Config["IV_Coordinate"]["ECG_front_1"]:Config["IV_Coordinate"]["ECG_front_2"]] self.JJIVs_front = diff(self.JJIs0_front) self.RRIVs_front = diff(self.RRIs0_front) self.corre_front = correlate(self.RRIVs_front, self.JJIVs_front, 'valid') shift = np_argmax(self.corre_front) else: if shift_front is None: shift = Config["front"]["shift"] else: shift = shift_front RRIVs_cut = self.RRIVs_front[shift:shift + len(self.JJIVs_front)] RRIs_cut = self.RRIs0_front[shift:shift + len(self.JJIs0_front)] correlation = corrcoef(RRIVs_cut, self.JJIVs_front) correlation_IIV = (correlation[0, 1] + correlation[1, 0]) / 2 correlation = corrcoef(RRIs_cut, self.JJIs0_front) correlation_II = (correlation[0, 1] + correlation[1, 0]) / 2 same_sign_rate = np_sum(RRIVs_cut * self.JJIVs_front > 0) / len(self.JJIVs_front) total_time_ratio = np_sum(self.JJIs0_front) / np_sum(self.RRIs0_front[shift:shift + len(self.JJIs0_front)]) Jpeak_cut = self.Jpeak[Config["IV_Coordinate"]["BCG_front_1"]:Config["IV_Coordinate"]["BCG_front_2"] + 2] Rpeak_cut = self.Rpeak[Config["IV_Coordinate"]["ECG_front_1"]:Config["IV_Coordinate"]["ECG_front_2"] + 2] RRI = diff(Rpeak_cut) offset_interval = np_sum(RRI[:shift]) - (Jpeak_cut[0] - Rpeak_cut[0]) anchor_J = self.Jpeak[Config["IV_Coordinate"]["BCG_front_1"]] anchor_R = self.Rpeak[Config["IV_Coordinate"]["ECG_front_1"] + shift] offset_interval_duration = offset_interval / Config["InputConfig"]["UseFreq"] Config["front"]["shift"] = shift Config["front"]["offset_interval"] = offset_interval Config["front"]["anchor_J"] = anchor_J Config["front"]["anchor_R"] = anchor_R Config["front"]["offset_interval_duration"] = offset_interval_duration result = { "front": { "correlation_IIV": correlation_IIV, "correlation_II": correlation_II, "same_sign_rate": same_sign_rate, "total_time_ratio": total_time_ratio, "shift": shift, "corre": self.corre_front, "JJIVs": self.JJIVs_front, "RRIVs": self.RRIVs_front, "offset_interval": offset_interval, "anchor_J": anchor_J, "anchor_R": anchor_R, "offset_interval_duration": offset_interval_duration } } except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_CALCULATE_FAILURE_FRONT + Constants.FAILURE_REASON[ "Calculate_Correlation_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_CALCULATE_FINISHED_FRONT, data=result) def calculate_correlation_back(self, mode, shift_back=None): if ((Config["IV_Coordinate"]["BCG_back_1"] == Config["IV_Coordinate"]["BCG_back_2"]) or (Config["IV_Coordinate"]["ECG_back_1"] == Config["IV_Coordinate"]["ECG_back_2"])): return Result().failure(info=Constants.PRECISELY_ALIGN_CALCULATE_FAILURE_BACK + Constants.FAILURE_REASON["Calculate_Correlation_Value_Equal"]) if ((Config["IV_Coordinate"]["BCG_back_2"] - Config["IV_Coordinate"]["BCG_back_1"]) >= (Config["IV_Coordinate"]["ECG_back_2"] - Config["IV_Coordinate"]["ECG_back_1"])): return Result().failure(info=Constants.PRECISELY_ALIGN_CALCULATE_FAILURE_BACK + Constants.FAILURE_REASON["Calculate_Correlation_JJIVRange_too_Large"]) try: if mode == "init": self.JJIs0_back = self.JJIs[Config["IV_Coordinate"]["BCG_back_1"]:Config["IV_Coordinate"]["BCG_back_2"]] self.RRIs0_back = self.RRIs[Config["IV_Coordinate"]["ECG_back_1"]:Config["IV_Coordinate"]["ECG_back_2"]] self.JJIVs_back = diff(self.JJIs0_back) self.RRIVs_back = diff(self.RRIs0_back) self.corre_back = correlate(self.RRIVs_back, self.JJIVs_back, 'valid') shift = np_argmax(self.corre_back) else: if shift_back is None: shift = Config["back"]["shift"] else: shift = shift_back RRIVs_cut = self.RRIVs_back[shift:shift + len(self.JJIVs_back)] RRIs_cut = self.RRIs0_back[shift:shift + len(self.JJIs0_back)] correlation = corrcoef(RRIVs_cut, self.JJIVs_back) correlation_IIV = (correlation[0, 1] + correlation[1, 0]) / 2 correlation = corrcoef(RRIs_cut, self.JJIs0_back) correlation_II = (correlation[0, 1] + correlation[1, 0]) / 2 same_sign_rate = np_sum(RRIVs_cut * self.JJIVs_back > 0) / len(self.JJIVs_back) total_time_ratio = np_sum(self.JJIs0_back) / np_sum(self.RRIs0_back[shift:shift + len(self.JJIs0_back)]) Jpeak_cut = self.Jpeak[Config["IV_Coordinate"]["BCG_back_1"]:Config["IV_Coordinate"]["BCG_back_2"] + 2] Rpeak_cut = self.Rpeak[Config["IV_Coordinate"]["ECG_back_1"]:Config["IV_Coordinate"]["ECG_back_2"] + 2] RRI = diff(Rpeak_cut) offset_interval = np_sum(RRI[:shift]) - (Jpeak_cut[0] - Rpeak_cut[0]) anchor_J = self.Jpeak[Config["IV_Coordinate"]["BCG_back_1"]] anchor_R = self.Rpeak[Config["IV_Coordinate"]["ECG_back_1"] + shift] offset_interval_duration = offset_interval / Config["InputConfig"]["UseFreq"] Config["back"]["shift"] = shift Config["back"]["offset_interval"] = offset_interval Config["back"]["anchor_J"] = anchor_J Config["back"]["anchor_R"] = anchor_R Config["back"]["offset_interval_duration"] = offset_interval_duration result = { "back": { "correlation_IIV": correlation_IIV, "correlation_II": correlation_II, "same_sign_rate": same_sign_rate, "total_time_ratio": total_time_ratio, "shift": shift, "corre": self.corre_back, "JJIVs": self.JJIVs_back, "RRIVs": self.RRIVs_back, "offset_interval": offset_interval, "anchor_J": anchor_J, "anchor_R": anchor_R, "offset_interval_duration": offset_interval_duration } } except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_CALCULATE_FAILURE_BACK + Constants.FAILURE_REASON[ "Calculate_Correlation_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_CALCULATE_FINISHED_BACK, data=result) def correlation_align(self, mode): try: if mode == "init": Config["orgfs"] = ((int(Config["back"]["anchor_J"]) - int(Config["front"]["anchor_J"])) * Config["InputConfig"]["UseFreq"] / (int(Config["back"]["anchor_R"]) - int(Config["front"]["anchor_R"]))) Config["offset_anchor"] = Config["front"]["anchor_R"] - Config["front"]["anchor_J"] self.res_orgBcg = self.raw_orgBcg.copy() self.res_BCG = self.raw_BCG.copy() self.cut_ECG = self.raw_ECG.copy() self.cut_Rpeak = self.Rpeak.copy() if Config["offset_anchor"] > 0: self.cut_ECG = self.cut_ECG[Config["offset_anchor"]:] Config["front"]["anchor_R"] = Config["front"]["anchor_R"] - Config["offset_anchor"] Config["back"]["anchor_R"] = Config["back"]["anchor_R"] - Config["offset_anchor"] idxs = where(self.cut_Rpeak > Config["offset_anchor"])[0] self.cut_Rpeak = self.cut_Rpeak[idxs] - Config["offset_anchor"] else: self.res_BCG = self.res_BCG[-Config["offset_anchor"]:] self.res_orgBcg = self.res_orgBcg[-Config["offset_anchor"]:] Config["front"]["anchor_J"] = Config["front"]["anchor_J"] + Config["offset_anchor"] Config["back"]["anchor_J"] = Config["back"]["anchor_J"] + Config["offset_anchor"] self.res_BCG = resample(self.res_BCG, Config["orgfs"], Config["InputConfig"]["UseFreq"]) self.res_orgBcg = resample(self.res_orgBcg, Config["orgfs"], Config["InputConfig"]["UseFreq"]) Config["front"]["anchor_J"] = round(int(Config["front"]["anchor_J"]) * Config["InputConfig"]["UseFreq"] / Config["orgfs"]) Config["back"]["anchor_J"] = round(int(Config["back"]["anchor_J"]) * Config["InputConfig"]["UseFreq"] / Config["orgfs"]) Config["offset_anchor"] = Config["back"]["anchor_R"] - Config["back"]["anchor_J"] if Config["offset_anchor"] > 0: self.cut_ECG = self.cut_ECG[Config["offset_anchor"]:] Config["front"]["anchor_R"] = Config["front"]["anchor_R"] - Config["offset_anchor"] Config["back"]["anchor_R"] = Config["back"]["anchor_R"] - Config["offset_anchor"] idxs = where(self.cut_Rpeak > Config["offset_anchor"])[0] self.cut_Rpeak = self.cut_Rpeak[idxs] - Config["offset_anchor"] else: self.res_BCG = self.res_BCG[-Config["offset_anchor"]:] self.res_orgBcg = self.res_orgBcg[-Config["offset_anchor"]:] Config["front"]["anchor_J"] = Config["front"]["anchor_J"] + Config["offset_anchor"] Config["back"]["anchor_J"] = Config["back"]["anchor_J"] + Config["offset_anchor"] datalen = np_min([len(self.cut_ECG), len(self.res_BCG)]) self.cut_ECG = self.cut_ECG[:datalen] self.res_BCG = self.res_BCG[:datalen] self.res_orgBcg = self.res_orgBcg[:datalen] a = np_max([np_max(self.cut_ECG), np_max(self.res_BCG)]) b = np_min([np_min(self.cut_ECG), np_min(self.res_BCG)]) peak_ECG, _ = find_peaks(self.cut_ECG) peak_BCG, _ = find_peaks(self.res_BCG) idxs = where(self.cut_Rpeak < datalen)[0] self.cut_Rpeak = self.cut_Rpeak[idxs] result = { "res_BCG": self.res_BCG, "cut_ECG": self.cut_ECG, "anchor00": Config["front"]["anchor_R"], "anchor10": Config["back"]["anchor_R"], "a": a, "b": b, "peak_ECG": peak_ECG, "peak_BCG": peak_BCG } elif mode == "select": a = np_max([np_max(self.raw_ECG), np_max(self.raw_BCG)]) b = np_min([np_min(self.raw_ECG), np_min(self.raw_BCG)]) result = { "a": a, "b": b } else: raise ValueError("模式不存在") except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_ALIGN_CORRELATION_FAILURE + Constants.FAILURE_REASON["Correlation_Align_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_ALIGN_CORRELATION_FINISHED, data=result) def data_postprocess(self): try: if len(self.correlation_align_point_match_ECG) != 2 and len(self.correlation_align_point_match_BCG) != 2: Config["offset_anchor"] = 0 else: self.correlation_align_point_match_ECG.sort() self.correlation_align_point_match_BCG.sort() Config["offset_anchor"] = round(((int(self.correlation_align_point_match_ECG[1]) - int( self.correlation_align_point_match_BCG[1])) + (int(self.correlation_align_point_match_ECG[0]) - int( self.correlation_align_point_match_BCG[0]))) / 2) if Config["offset_anchor"] > 0: self.cut_ECG = self.cut_ECG[Config["offset_anchor"]:] Config["front"]["anchor_R"] = Config["front"]["anchor_R"] - Config["offset_anchor"] Config["back"]["anchor_R"] = Config["back"]["anchor_R"] - Config["offset_anchor"] self.cut_Rpeak = self.cut_Rpeak[where(self.cut_Rpeak > Config["offset_anchor"])[0]] - Config["offset_anchor"] else: self.res_BCG = self.res_BCG[-Config["offset_anchor"]:] self.res_orgBcg = self.res_orgBcg[-Config["offset_anchor"]:] Config["front"]["anchor_J"] = Config["front"]["anchor_J"] + Config["offset_anchor"] Config["back"]["anchor_J"] = Config["back"]["anchor_J"] + Config["offset_anchor"] datalen = np_min([len(self.cut_ECG), len(self.res_BCG)]) self.cut_ECG = self.cut_ECG[:datalen] self.res_BCG = self.res_BCG[:datalen] self.res_orgBcg = self.res_orgBcg[:datalen] idxs = where(self.cut_Rpeak < datalen)[0] self.cut_Rpeak = self.cut_Rpeak[idxs] self.cut_Jpeak = [] peaks, _ = find_peaks(self.res_BCG) for i in self.cut_Rpeak: tmp = np_abs(peaks - i) idx = np_argmin(tmp) self.cut_Jpeak.append(peaks[idx]) self.cut_Jpeak = asarray(self.cut_Jpeak).astype(int) frontcut_index_BCG = int( (self.argmax_BCG - np_argmax(self.res_BCG) / Config["InputConfig"]["UseFreq"] * Config["orgfs"])) backcut_index_BCG = int(len(self.res_BCG) / Config["InputConfig"]["UseFreq"] * Config["orgfs"] + np_argmax( self.raw_BCG) - np_argmax(self.res_BCG) / Config["InputConfig"]["UseFreq"] * Config["orgfs"]) frontcut_index_ECG = self.argmax_ECG - np_argmax(self.cut_ECG) backcut_index_ECG = len(self.cut_ECG) + self.argmax_ECG - np_argmax(self.cut_ECG) print("修正前的值:front_BCG:{}, back_BCG:{}, front_ECG:{}, back_ECG:{}".format(frontcut_index_BCG, backcut_index_BCG, frontcut_index_ECG, backcut_index_ECG)) if frontcut_index_BCG < 0: backcut_index_BCG = np_abs(frontcut_index_BCG) + backcut_index_BCG frontcut_index_BCG = 0 if frontcut_index_ECG < 0: backcut_index_ECG = np_abs(frontcut_index_ECG) + backcut_index_ECG frontcut_index_ECG = 0 print("修正后的值:front_BCG:{}, back_BCG:{}, front_ECG:{}, back_ECG:{}".format(frontcut_index_BCG, backcut_index_BCG, frontcut_index_ECG, backcut_index_ECG)) Config["frontcut_index_BCG"] = frontcut_index_BCG Config["backcut_index_BCG"] = backcut_index_BCG Config["frontcut_index_ECG"] = frontcut_index_ECG Config["backcut_index_ECG"] = backcut_index_ECG except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_POSTPROCESS_VIEW_FAILURE + Constants.FAILURE_REASON["PostProcess_Align_Exception"] + "\n" + format_exc()) info = f"{Constants.PRECISELY_ALIGN_POSTPROCESS_VIEW_FINISHED},BCG前后段被切割的坐标值为[{frontcut_index_BCG}, {backcut_index_BCG}],ECG前后段被切割的坐标值为[{frontcut_index_ECG}, {backcut_index_ECG}]" return Result().success(info=info) def save_alignInfo(self): try: save_data = { "front": { "N_JJIV": [Config["IV_Coordinate"]["BCG_front_1"], Config["IV_Coordinate"]["BCG_front_2"]], "BCG_coordinate": [Config["Coordinate"]["BCG_front_1"], Config["Coordinate"]["BCG_front_2"]], "N_RRIV": [Config["IV_Coordinate"]["ECG_front_1"], Config["IV_Coordinate"]["ECG_front_2"]], "ECG_coordinate": [Config["Coordinate"]["ECG_front_1"], Config["Coordinate"]["ECG_front_2"]], "shift": Config["front"]["shift"], "offset_interval": Config["front"]["offset_interval"], "offset_interval_duration": Config["front"]["offset_interval_duration"], "anchorsRJ": [Config["front"]["anchor_R"], Config["front"]["anchor_J"]] }, "back": { "N_JJIV": [Config["IV_Coordinate"]["BCG_back_1"], Config["IV_Coordinate"]["BCG_back_2"]], "BCG_coordinate": [Config["Coordinate"]["BCG_back_1"], Config["Coordinate"]["BCG_back_2"]], "N_RRIV": [Config["IV_Coordinate"]["ECG_back_1"], Config["IV_Coordinate"]["ECG_back_2"]], "ECG_coordinate": [Config["Coordinate"]["ECG_back_1"], Config["Coordinate"]["ECG_back_2"]], "shift": Config["back"]["shift"], "offset_interval": Config["back"]["offset_interval"], "offset_interval_duration": Config["back"]["offset_interval_duration"], "anchorsRJ": [Config["back"]["anchor_R"], Config["back"]["anchor_J"]] }, "offset_anchor": Config["offset_anchor"], "orgfs": Config["orgfs"], "cut_index": { "front_BCG": Config["frontcut_index_BCG"], "back_BCG": Config["backcut_index_BCG"], "front_ECG": Config["frontcut_index_ECG"], "back_ECG": Config["backcut_index_ECG"] } } save_data = [str(save_data)] DataFrame(save_data).to_csv(Config["Path"]["Save_AlignInfo"], index=False, header=False) except PermissionError as e: return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"]) except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_ALIGNINFO_FAILURE + Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_SAVING_ALIGNINFO_FINISHED) def save_res_orgBcg(self, chunk): if (not Path(Config["Path"]["Save_OrgBCG"]).parent.exists()) or ( not Path(Config["Path"]["Save_OrgBCG"]).parent.is_dir()): Path(Config["Path"]["Save_OrgBCG"]).parent.mkdir(parents=True, exist_ok=True) if Config["InputConfig"]["orgBcgFreq"] != Config["InputConfig"]["UseFreq"]: self.res_orgBcg = resample(self.res_orgBcg, int(len(self.res_orgBcg) * (Config["InputConfig"]["orgBcgFreq"] / Config["InputConfig"]["UseFreq"]))) if self.res_orgBcg is None: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_RES_ORGBCG_FAILURE + Constants.FAILURE_REASON["res_orgBcg_Not_Exist"]) try: chunk.to_csv(Config["Path"]["Save_OrgBCG"], mode='a', index=False, header=False) except PermissionError as e: return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"]) except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_RES_ORGBCG_FAILURE + Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_SAVING_RES_ORGBCG_FINISHED) def save_res_BCG(self, chunk): if (not Path(Config["Path"]["Save_BCG"]).parent.exists()) or ( not Path(Config["Path"]["Save_BCG"]).parent.is_dir()): Path(Config["Path"]["Save_BCG"]).parent.mkdir(parents=True, exist_ok=True) if self.res_BCG is None: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_RES_BCG_FAILURE + Constants.FAILURE_REASON["res_BCG_Not_Exist"]) try: chunk.to_csv(Config["Path"]["Save_BCG"], mode='a', index=False, header=False) except PermissionError as e: return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"]) except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_RES_BCG_FAILURE + Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_SAVING_RES_BCG_FINISHED) def save_cut_ECG(self, chunk): if (not Path(Config["Path"]["Save_ECG"]).parent.exists()) or ( not Path(Config["Path"]["Save_ECG"]).parent.is_dir()): Path(Config["Path"]["Save_ECG"]).parent.mkdir(parents=True, exist_ok=True) if self.cut_ECG is None: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_CUT_ECG_FAILURE + Constants.FAILURE_REASON["cut_ECG_Not_Exist"]) try: chunk.to_csv(Config["Path"]["Save_ECG"], mode='a', index=False, header=False) except PermissionError as e: return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"]) except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_CUT_ECG_FAILURE + Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_SAVING_CUT_ECG_FINISHED) def save_Jpeak(self, chunk): if (not Path(Config["Path"]["Save_ECG"]).parent.exists()) or ( not Path(Config["Path"]["Save_ECG"]).parent.is_dir()): Path(Config["Path"]["Save_ECG"]).parent.mkdir(parents=True, exist_ok=True) if self.cut_Jpeak is None: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_CUT_JPEAK_FAILURE + Constants.FAILURE_REASON["cut_Jpeak_Not_Exist"]) try: chunk.to_csv(Config["Path"]["Save_Jpeak"], mode='a', index=False, header=False) except PermissionError as e: return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"]) except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_CUT_JPEAK_FAILURE + Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_SAVING_CUT_JPEAK_FINISHED) def save_Rpeak(self, chunk): if (not Path(Config["Path"]["Save_Rpeak"]).parent.exists()) or ( not Path(Config["Path"]["Save_Rpeak"]).parent.is_dir()): Path(Config["Path"]["Save_Rpeak"]).parent.mkdir(parents=True, exist_ok=True) if self.cut_Rpeak is None: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_CUT_RPEAK_FAILURE + Constants.FAILURE_REASON["cut_Rpeak_Not_Exist"]) try: chunk.to_csv(Config["Path"]["Save_Rpeak"], mode='a', index=False, header=False) except PermissionError as e: return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"]) except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_CUT_RPEAK_FAILURE + Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_SAVING_CUT_RPEAK_FINISHED) def get_corresponding_interval(self, point, bcg2ecg=True): try: if self.BCG_early: pos = -self.approximately_align_pos else: pos = self.approximately_align_pos if bcg2ecg: new_point = point / self.approximately_align_estimated_freq + pos - self.approximately_align_intercept else: new_point = (point - pos + self.approximately_align_intercept) * self.approximately_align_estimated_freq result = { "new_point": int(new_point) } except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_GET_CORRESPONDING_INTERVAL_FAILURE + Constants.FAILURE_REASON[ "Get_Corresponding_Interval_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_GET_CORRESPONDING_INTERVAL_FINISHED, data=result) def get_rriv_from_ecg_point(self, point:int, expend_second:int, side="left"): extend_point = expend_second * Config["InputConfig"]["UseFreq"] if side == "left": extend_point = max(point + extend_point, 0) elif side == "right": extend_point = min(point - extend_point, max(self.Rpeak)) else: return Result().failure(info=Constants.PRECISELY_ALIGN_GET_RRIV_FROM_ECG_POINT_FAILURE + Constants.FAILURE_REASON["Invalid_Side_Parameter"]) extend_estimate_RRIV = searchsorted(self.Rpeak[:-2], extend_point, side) result = { "estimate_RRIV": extend_estimate_RRIV, "extend_point": extend_point } return Result().success(info=Constants.PRECISELY_ALIGN_GET_RRIV_FROM_ECG_POINT_FINISHED, data=result) class CustomNavigationToolbar(NavigationToolbar2QT): def __init__(self, canvas, parent): super().__init__(canvas, parent) # 初始化画框工具栏 self.action_Get_Range = QAction(Constants.PRECISELY_ALIGN_ACTION_GET_RANGE_NAME, self) self.action_Get_Range.setFont(QFont(Params.FONT, 14)) self.action_Get_Range.setCheckable(True) self.action_Get_Range.setShortcut(QCoreApplication.translate( "MainWindow", Params.PRECISELY_ALIGN_ACTION_GET_RANGE_SHORTCUT_KEY)) self.insertAction(self._actions['pan'], self.action_Get_Range) self._actions['pan'].setShortcut(QCoreApplication.translate( "MainWindow", Params.ACTION_PAN_SHORTCUT_KEY)) self._actions['zoom'].setShortcut(QCoreApplication.translate( "MainWindow", Params.ACTION_ZOOM_SHORTCUT_KEY)) # 用于存储事件连接ID self.cid_mouse_press = None self.cid_mouse_release = None self.cid_mouse_hold = None # 初始化矩形选择区域 self.rect_start_x = None self.rect_end_x = None self.ax0_BCG_rectangle_front = None self.ax0_BCG_rectangle_back = None self.ax1_ECG_rectangle_front = None self.ax1_ECG_rectangle_back = None def home(self, *args): pass def zoom(self, *args): super().zoom(*args) self.deactivate_figToorbar_getRange_mode() def pan(self, *args): super().pan(*args) self.deactivate_figToorbar_getRange_mode() def deactivate_figToorbar_getRange_mode(self): if self.action_Get_Range.isChecked(): self.action_Get_Range.setChecked(False) if self.cid_mouse_press is not None: self.canvas.mpl_disconnect(self.cid_mouse_press) self.cid_mouse_press = None if self.cid_mouse_release is not None: self.canvas.mpl_disconnect(self.cid_mouse_release) self.cid_mouse_release = None if self.cid_mouse_hold is not None: self.canvas.mpl_disconnect(self.cid_mouse_hold) self.cid_mouse_hold = None