from gc import collect from pathlib import Path from traceback import format_exc import matplotlib.pyplot as plt from PySide6.QtCore import QCoreApplication from PySide6.QtGui import QAction, QFont from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication, QButtonGroup from matplotlib import gridspec, patches from matplotlib.backends.backend_qt import NavigationToolbar2QT from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg from numpy import (diff, where, correlate, corrcoef, sum as np_sum, max as np_max, min as np_min, arange, array, append, delete, abs as np_abs, argmin as np_argmin, argmax as np_argmax, asarray) from overrides import overrides from pandas import read_csv, DataFrame from resampy import resample from scipy.signal import find_peaks from yaml import dump, load, FullLoader from func.utils.PublicFunc import PublicFunc from func.utils.Constants import Constants, ConfigParams from func.utils.Result import Result from ui.MainWindow.MainWindow_precisely_align import Ui_MainWindow_precisely_align from ui.setting.precisely_align_input_setting import Ui_MainWindow_precisely_align_input_setting Config = { } ButtonState = { "Default": { "pushButton_input_setting": True, "pushButton_input": True, "pushButton_calculate_correlation": False, "pushButton_correlation_align": False, "pushButton_view_align": False, "pushButton_save": False }, "Current": { "pushButton_input_setting": True, "pushButton_input": True, "pushButton_calculate_correlation": False, "pushButton_correlation_align": False, "pushButton_view_align": False, "pushButton_save": False }, "Statue_1": { "pushButton_input_setting": True, "pushButton_input": True, "pushButton_calculate_correlation": True, "pushButton_correlation_align": False, "pushButton_view_align": False, "pushButton_save": False }, "Statue_2": { "pushButton_input_setting": True, "pushButton_input": True, "pushButton_calculate_correlation": True, "pushButton_correlation_align": True, "pushButton_view_align": False, "pushButton_save": False }, "Statue_3": { "pushButton_input_setting": True, "pushButton_input": True, "pushButton_calculate_correlation": False, "pushButton_correlation_align": False, "pushButton_view_align": True, "pushButton_save": False }, "Statue_4": { "pushButton_input_setting": True, "pushButton_input": True, "pushButton_calculate_correlation": False, "pushButton_correlation_align": False, "pushButton_view_align": False, "pushButton_save": True }, } class SettingWindow(QMainWindow): def __init__(self, root_path, sampID): super(SettingWindow, self).__init__() self.ui = Ui_MainWindow_precisely_align_input_setting() self.ui.setupUi(self) self.root_path = root_path self.sampID = sampID self.config = None self.__read_config__() self.ui.spinBox_input_freq_orgBcg.valueChanged.connect(self.__update_ui__) self.ui.spinBox_input_freq_BCG.valueChanged.connect(self.__update_ui__) self.ui.spinBox_input_freq_ECG.valueChanged.connect(self.__update_ui__) self.ui.pushButton_confirm.clicked.connect(self.__write_config__) self.ui.pushButton_cancel.clicked.connect(self.__rollback_config__) self.ui.pushButton_cancel.clicked.connect(self.close) def __read_config__(self): if not Path(ConfigParams.PRECISELY_ALIGN_CONFIG_FILE_PATH).exists(): with open(ConfigParams.PRECISELY_ALIGN_CONFIG_FILE_PATH, "w") as f: dump(ConfigParams.PRECISELY_ALIGN_CONFIG_NEW_CONTENT, f) with open(ConfigParams.PRECISELY_ALIGN_CONFIG_FILE_PATH, "r") as f: file_config = load(f.read(), Loader=FullLoader) Config.update(file_config) self.config = file_config Config.update({ "Path": { "Input_OrgBCG": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / Path(str(self.sampID)))), "Input_BCG": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / Path(str(self.sampID)))), "Input_Jpeak": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / Path(str(self.sampID)))), "Input_ECG": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)))), "Input_Rpeak": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)))), "Input_Approximately_Align": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)))), "Save_AlignInfo": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_LABEL / Path(str(self.sampID)))), "Save_OrgBCG": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_ALIGNED / Path(str(self.sampID)))), "Save_BCG": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_ALIGNED / Path(str(self.sampID)))), "Save_ECG": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_ALIGNED / Path(str(self.sampID)))), "Save_Jpeak": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_ALIGNED / Path(str(self.sampID)))), "Save_Rpeak": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_ALIGNED / Path(str(self.sampID)))) }, "Coordinate": { "BCG_front_1": 0, "BCG_front_2": 0, "BCG_back_1": 0, "BCG_back_2": 0, "ECG_front_1": 0, "ECG_front_2": 0, "ECG_back_1": 0, "ECG_back_2": 0 }, "IV_Coordinate": { "BCG_front_1": 0, "BCG_front_2": 0, "BCG_back_1": 0, "BCG_back_2": 0, "ECG_front_1": 0, "ECG_front_2": 0, "ECG_back_1": 0, "ECG_back_2": 0 }, "front": { "shift": 0, "offset_interval": 0, "anchor_J": 0, "anchor_R": 0, "offset_interval_duration": 0 }, "back": { "shift": 0, "offset_interval": 0, "anchor_J": 0, "anchor_R": 0, "offset_interval_duration": 0 }, "offset_anchor": 0, "orgfs": 0, "offset_correct": 0, "frontcut_index_BCG": 0, "backcut_index_BCG": 0, "frontcut_index_ECG": 0, "backcut_index_ECG": 0 }) # 数据回显 self.ui.spinBox_input_freq_orgBcg.setValue(Config["InputConfig"]["orgBcgFreq"]) self.ui.spinBox_input_freq_BCG.setValue(Config["InputConfig"]["BCGFreq"]) self.ui.spinBox_input_freq_ECG.setValue(Config["InputConfig"]["ECGFreq"]) self.ui.plainTextEdit_file_path_input_orgBcg.setPlainText(Config["Path"]["Input_OrgBCG"]) self.ui.plainTextEdit_file_path_input_BCG.setPlainText(Config["Path"]["Input_BCG"]) self.ui.plainTextEdit_file_path_input_Jpeak.setPlainText(Config["Path"]["Input_Jpeak"]) self.ui.plainTextEdit_file_path_input_ECG.setPlainText(Config["Path"]["Input_ECG"]) self.ui.plainTextEdit_file_path_input_Rpeak.setPlainText(Config["Path"]["Input_Rpeak"]) self.ui.plainTextEdit_file_path_input_approximately_align.setPlainText(Config["Path"]["Input_Approximately_Align"]) self.ui.plainTextEdit_file_path_save_AlignInfo.setPlainText(Config["Path"]["Save_AlignInfo"]) self.ui.plainTextEdit_file_path_save_orgBcg.setPlainText(Config["Path"]["Save_OrgBCG"]) self.ui.plainTextEdit_file_path_save_BCG.setPlainText(Config["Path"]["Save_BCG"]) self.ui.plainTextEdit_file_path_save_ECG.setPlainText(Config["Path"]["Save_ECG"]) self.ui.plainTextEdit_file_path_save_Jpeak.setPlainText(Config["Path"]["Save_Jpeak"]) self.ui.plainTextEdit_file_path_save_Rpeak.setPlainText(Config["Path"]["Save_Rpeak"]) def __write_config__(self): # 从界面写入配置 Config["InputConfig"]["orgBcgFreq"] = self.ui.spinBox_input_freq_orgBcg.value() Config["InputConfig"]["BCGFreq"] = self.ui.spinBox_input_freq_BCG.value() Config["InputConfig"]["ECGFreq"] = self.ui.spinBox_input_freq_ECG.value() Config["Path"]["Input_OrgBCG"] = self.ui.plainTextEdit_file_path_input_orgBcg.toPlainText() Config["Path"]["Input_BCG"] = self.ui.plainTextEdit_file_path_input_BCG.toPlainText() Config["Path"]["Input_Jpeak"] = self.ui.plainTextEdit_file_path_input_Jpeak.toPlainText() Config["Path"]["Input_ECG"] = self.ui.plainTextEdit_file_path_input_ECG.toPlainText() Config["Path"]["Input_Rpeak"] = self.ui.plainTextEdit_file_path_input_Rpeak.toPlainText() Config["Path"]["Input_Approximately_Align"] = self.ui.plainTextEdit_file_path_input_approximately_align.toPlainText() Config["Path"]["Save_AlignInfo"] = self.ui.plainTextEdit_file_path_save_AlignInfo.toPlainText() Config["Path"]["Save_OrgBCG"] = self.ui.plainTextEdit_file_path_save_orgBcg.toPlainText() Config["Path"]["Save_BCG"] = self.ui.plainTextEdit_file_path_save_BCG.toPlainText() Config["Path"]["Save_ECG"] = self.ui.plainTextEdit_file_path_save_ECG.toPlainText() Config["Path"]["Save_Jpeak"] = self.ui.plainTextEdit_file_path_save_Jpeak.toPlainText() Config["Path"]["Save_Rpeak"] = self.ui.plainTextEdit_file_path_save_Rpeak.toPlainText() # 保存配置到文件 self.config["InputConfig"]["orgBcgFreq"] = self.ui.spinBox_input_freq_orgBcg.value() self.config["InputConfig"]["BCGFreq"] = self.ui.spinBox_input_freq_BCG.value() self.config["InputConfig"]["ECGFreq"] = self.ui.spinBox_input_freq_ECG.value() with open(ConfigParams.PRECISELY_ALIGN_CONFIG_FILE_PATH, "w") as f: dump(self.config, f) self.close() def __rollback_config__(self): self.__read_config__() def __update_ui__(self): self.ui.plainTextEdit_file_path_input_orgBcg.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.ORGBCG_RAW + str(self.ui.spinBox_input_freq_orgBcg.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_input_BCG.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.BCG_FILTER + str(self.ui.spinBox_input_freq_BCG.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_input_ECG.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.ECG_FILTER + str(self.ui.spinBox_input_freq_ECG.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_save_orgBcg.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_ALIGNED / Path(str(self.sampID)) / Path(ConfigParams.ORGBCG_SYNC + str(self.ui.spinBox_input_freq_orgBcg.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_save_BCG.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_ALIGNED / Path(str(self.sampID)) / Path(ConfigParams.BCG_SYNC + str(self.ui.spinBox_input_freq_BCG.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_save_ECG.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_ALIGNED / Path(str(self.sampID)) / Path(ConfigParams.ECG_SYNC + str(self.ui.spinBox_input_freq_ECG.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_save_Jpeak.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_ALIGNED / Path(str(self.sampID)) / Path(ConfigParams.JPEAK_SYNC + str(self.ui.spinBox_input_freq_BCG.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_save_Rpeak.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_ALIGNED / Path(str(self.sampID)) / Path(ConfigParams.RPEAK_SYNC + str(self.ui.spinBox_input_freq_ECG.value()) + ConfigParams.ENDSWITH_TXT)))) class MainWindow_precisely_align(QMainWindow): def __init__(self): super(MainWindow_precisely_align, self).__init__() self.ui = Ui_MainWindow_precisely_align() self.ui.setupUi(self) self.root_path = None self.sampID = None self.data = None self.setting = None self.buttonGroup = None # 初始化进度条 self.progressbar = None PublicFunc.add_progressbar(self) # 初始化画框 self.fig = None self.canvas = None self.figToolbar = None self.gs = None self.ax0 = None self.ax1 = None self.ax2 = None self.ax3 = None self.ax4 = None self.is_left_button_pressed = None self.rect_down = None self.rect_up = None self.point0 = None self.point1 = None self.selected_point0 = None self.selected_point1 = None self.selected_point2 = None self.selected_point3 = None self.stem_black0 = None self.stem_black1 = None self.ax0_xlime = None self.ax0_ylime = None self.ax2_xlime = None self.ax2_ylime = None self.ax4_xlime = None self.ax4_ylime = None self.msgBox = QMessageBox() self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE) @overrides def show(self, root_path, sampID): super().show() self.root_path = root_path self.sampID = sampID self.setting = SettingWindow(root_path, sampID) # 初始化画框 self.fig = plt.figure(figsize=(12, 9), dpi=100) self.canvas = FigureCanvasQTAgg(self.fig) self.figToolbar = CustomNavigationToolbar(self.canvas, self) self.figToolbar.action_Get_Range.setEnabled(False) for action in self.figToolbar._actions.values(): action.setEnabled(False) for action in self.figToolbar.actions(): if action.text() == "Subplots" or action.text() == "Customize": self.figToolbar.removeAction(action) self.figToolbar._actions['home'].triggered.connect(self.toggle_home) self.figToolbar.action_Get_Range.triggered.connect(self.toggle_getRange) self.ui.verticalLayout_canvas.addWidget(self.canvas) self.ui.verticalLayout_canvas.addWidget(self.figToolbar) PublicFunc.__resetAllButton__(self, ButtonState) self.buttonGroup = QButtonGroup(self) self.__update_info__() self.buttonGroup.addButton(self.ui.radioButton_BCG_front) self.buttonGroup.addButton(self.ui.radioButton_ECG_front) self.buttonGroup.addButton(self.ui.radioButton_BCG_back) self.buttonGroup.addButton(self.ui.radioButton_ECG_back) self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__) self.ui.pushButton_input_setting.clicked.connect(self.setting.show) self.ui.pushButton_calculate_correlation.clicked.connect(self.__slot_btn_calculate_correlation__) self.ui.pushButton_correlation_align.clicked.connect(self.__slot_btn_correlation_align__) self.ui.pushButton_view_align.clicked.connect(self.__slot_btn_view_align__) self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__) self.canvas.mpl_connect('pick_event', self.on_pick) self.ui.spinBox_BCG_front_JJIV_1.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_BCG_front_JJIV_2.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_BCG_back_JJIV_1.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_BCG_back_JJIV_2.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_ECG_front_RRIV_1.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_ECG_front_RRIV_2.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_ECG_back_RRIV_1.editingFinished.connect(self.__update_coordinate__) self.ui.spinBox_ECG_back_RRIV_2.editingFinished.connect(self.__update_coordinate__) @overrides def closeEvent(self, event): reply = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply == QMessageBox.Yes: PublicFunc.__disableAllButton__(self, ButtonState) PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN)) QApplication.processEvents() # 清空画框 del self.point0 del self.point1 del self.selected_point0 del self.selected_point1 del self.selected_point2 del self.selected_point3 del self.stem_black0 del self.stem_black1 del self.figToolbar.ax0_BCG_rectangle_front del self.figToolbar.ax0_BCG_rectangle_back del self.figToolbar.ax1_ECG_rectangle_front del self.figToolbar.ax1_ECG_rectangle_back if self.ax0 is not None: self.ax0.clear() if self.ax1 is not None: self.ax1.clear() if self.ax2 is not None: self.ax2.clear() if self.ax3 is not None: self.ax3.clear() if self.ax4 is not None: self.ax4.clear() # 释放资源 del self.data self.fig.clf() plt.close(self.fig) self.deleteLater() collect() self.canvas = None event.accept() else: event.ignore() def __reset__(self): ButtonState["Current"].update(ButtonState["Default"].copy()) def __plot__(self, plot_element=None): # 清空画框 if self.figToolbar.ax0_BCG_rectangle_front is not None: self.figToolbar.ax0_BCG_rectangle_front.remove() if self.figToolbar.ax0_BCG_rectangle_back is not None: self.figToolbar.ax0_BCG_rectangle_back.remove() if self.figToolbar.ax1_ECG_rectangle_front is not None: self.figToolbar.ax1_ECG_rectangle_front.remove() if self.figToolbar.ax1_ECG_rectangle_back is not None: self.figToolbar.ax1_ECG_rectangle_back.remove() self.figToolbar.ax0_BCG_rectangle_front = None self.figToolbar.ax0_BCG_rectangle_back = None self.figToolbar.ax1_ECG_rectangle_front = None self.figToolbar.ax1_ECG_rectangle_back = None self.reset_axes() sender = self.sender() if sender == self.ui.pushButton_input: self.gs = gridspec.GridSpec(2, 1, height_ratios=[1, 1]) self.fig.subplots_adjust(top=0.95, bottom=0.05, right=0.98, left=0.05, hspace=0.15, wspace=0) self.ax0 = self.fig.add_subplot(self.gs[0]) self.ax0.grid(True) self.ax0.xaxis.set_major_formatter(ConfigParams.FORMATTER) self.ax1 = self.fig.add_subplot(self.gs[1], sharex=self.ax0, sharey=self.ax0) self.ax1.grid(True) self.ax1.xaxis.set_major_formatter(ConfigParams.FORMATTER) Jpeak = self.data.Jpeak[:-2] Rpeak = self.data.Rpeak[:-2] self.ax0.set_title("JJIV") self.ax0.stem(Jpeak, plot_element["JJIVs"], markerfmt="C0.", linefmt=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JJIV) self.ax1.set_title("RRIV") self.ax1.stem(Rpeak, plot_element["RRIVs"], markerfmt="C0.", linefmt=Constants.PLOT_COLOR_ORANGE, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_RRIV) if self.data.BCG_early is True: self.ax0.axvline(x=self.data.approximately_align_pos, color=Constants.PLOT_COLOR_BLACK, linestyle="--", label="Start Line") self.ax1.axvline(x=0, color=Constants.PLOT_COLOR_BLACK, linestyle="--", label="Start Line") elif self.data.BCG_early is False: self.ax0.axvline(x=0, color=Constants.PLOT_COLOR_BLACK, linestyle="--", label="Start Line") self.ax1.axvline(x=self.data.approximately_align_pos, color=Constants.PLOT_COLOR_BLACK, linestyle="--", label="Start Line") else: self.ax0.axvline(x=self.data.approximately_align_pos, color=Constants.PLOT_COLOR_BLACK, linestyle="--", label="Start Line") self.ax1.axvline(x=self.data.approximately_align_pos, color=Constants.PLOT_COLOR_BLACK, linestyle="--", label="Start Line") self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT) self.ax1.legend(loc=Constants.PLOT_UPPER_RIGHT) self.canvas.draw() return Result().success(info=Constants.DRAWING_FINISHED) elif sender == self.ui.pushButton_calculate_correlation and plot_element is not None and plot_element["mode"] == "init": self.gs = gridspec.GridSpec(2, 2, height_ratios=[1, 1], width_ratios=[1, 1]) self.fig.subplots_adjust(top=0.88, bottom=0.05, right=0.98, left=0.05, hspace=0.15, wspace=0.15) self.ax0 = self.fig.add_subplot(self.gs[0]) self.ax0.grid(True) self.ax0.xaxis.set_major_formatter(ConfigParams.FORMATTER) self.ax1 = self.fig.add_subplot(self.gs[2]) self.ax1.grid(True) self.ax1.xaxis.set_major_formatter(ConfigParams.FORMATTER) self.ax2 = self.fig.add_subplot(self.gs[1]) self.ax2.grid(True) self.ax2.xaxis.set_major_formatter(ConfigParams.FORMATTER) self.ax3 = self.fig.add_subplot(self.gs[3]) self.ax3.grid(True) self.ax3.xaxis.set_major_formatter(ConfigParams.FORMATTER) self.ax0.set_title( "front\ncorre_IIV: {}, corre_II: {}\nsame_sign_rate:{}, total_time_ratio: {}\nshift: {}, alignment offset: {} seconds\noffset_interval: {}, anchor_J: {}, anchor_R: {}".format( plot_element["front"]["correlation_IIV"], plot_element["front"]["correlation_II"], plot_element["front"]["same_sign_rate"], plot_element["front"]["total_time_ratio"], plot_element["front"]["shift"], plot_element["front"]["offset_interval_duration"], plot_element["front"]["offset_interval"], plot_element["front"]["anchor_J"], plot_element["front"]["anchor_R"])) self.ax0.stem(plot_element["front"]["corre"], markerfmt="C0.", label=Constants.PRECISELY_ALIGN_PLOT_LABEL_CORRE_RRIV_JJIV) self.selected_point0, = self.ax0.plot(plot_element["front"]["shift"], plot_element["front"]["corre"][plot_element["front"]["shift"]] + 1, 'v', color=Constants.PLOT_COLOR_RED, picker=True, pickradius=5, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_SELECTED_POINT) self.ax0.plot(plot_element["front"]["corre"], 'o', color=Constants.PLOT_COLOR_BLUE, markersize=3, picker=True, pickradius=5) self.ax1.stem(plot_element["front"]["RRIVs"], markerfmt="b.", linefmt=Constants.PLOT_COLOR_ORANGE, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_RRIV) self.stem_black0 = self.ax1.stem(arange(plot_element["front"]["shift"], plot_element["front"]["shift"] + len(plot_element["front"]["JJIVs"])), plot_element["front"]["JJIVs"], markerfmt="ko", linefmt=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JJIV) self.ax2.set_title( "back\ncorre_IIV: {}, corre_II: {}\nsame_sign_rate:{}, total_time_ratio: {}\nshift: {}, alignment offset: {} seconds\noffset_interval: {}, anchor_J: {}, anchor_R: {}".format( plot_element["back"]["correlation_IIV"], plot_element["back"]["correlation_II"], plot_element["back"]["same_sign_rate"], plot_element["back"]["total_time_ratio"], plot_element["back"]["shift"], plot_element["back"]["offset_interval_duration"], plot_element["back"]["offset_interval"], plot_element["back"]["anchor_J"], plot_element["back"]["anchor_R"])) self.ax2.stem(plot_element["back"]["corre"], markerfmt="C0.", label=Constants.PRECISELY_ALIGN_PLOT_LABEL_CORRE_RRIV_JJIV) self.selected_point1, = self.ax2.plot(plot_element["back"]["shift"], plot_element["back"]["corre"][plot_element["back"]["shift"]] + 1, 'v', color=Constants.PLOT_COLOR_RED, picker=True, pickradius=5, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_SELECTED_POINT) self.ax2.plot(plot_element["back"]["corre"], 'o', color=Constants.PLOT_COLOR_BLUE, markersize=3, picker=True, pickradius=5) self.ax3.stem(plot_element["back"]["RRIVs"], markerfmt="b.", linefmt=Constants.PLOT_COLOR_ORANGE, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_RRIV) self.stem_black1 = self.ax3.stem(arange(plot_element["back"]["shift"], plot_element["back"]["shift"] + len(plot_element["back"]["JJIVs"])), plot_element["back"]["JJIVs"], markerfmt="ko", linefmt=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JJIV) self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT) self.ax1.legend(loc=Constants.PLOT_UPPER_RIGHT) self.ax2.legend(loc=Constants.PLOT_UPPER_RIGHT) self.ax3.legend(loc=Constants.PLOT_UPPER_RIGHT) self.canvas.draw() return Result().success(info=Constants.DRAWING_FINISHED) elif sender == self.ui.pushButton_correlation_align or (plot_element is not None and plot_element["mode"] == "select"): self.gs = gridspec.GridSpec(1, 1, height_ratios=[1]) self.fig.subplots_adjust(top=0.95, bottom=0.05, right=0.98, left=0.05, hspace=0, wspace=0) self.ax4 = self.fig.add_subplot(self.gs[0]) self.ax4.grid(True) self.ax4.xaxis.set_major_formatter(ConfigParams.FORMATTER) self.ax4.set_title("offset correct") self.ax4.plot(plot_element["cut_ECG"], color=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_ECG) self.ax4.plot(plot_element["res_BCG"], color=Constants.PLOT_COLOR_ORANGE, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_BCG) self.ax4.plot([plot_element["anchor00"], plot_element["anchor00"]], [plot_element["b"], plot_element["a"]], 'k--') self.ax4.plot([plot_element["anchor10"], plot_element["anchor10"]], [plot_element["b"], plot_element["a"]], 'k--') self.point0, = self.ax4.plot(plot_element["peak_ECG"], plot_element["cut_ECG"][plot_element["peak_ECG"]], 'o', markersize=3, color=Constants.PLOT_COLOR_BLUE, picker=True, pickradius=5, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_RPEAK) self.point1, = self.ax4.plot(plot_element["peak_BCG"], plot_element["res_BCG"][plot_element["peak_BCG"]], 'o', markersize=3, color=Constants.PLOT_COLOR_RED, picker=True, pickradius=5, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JPEAK) self.ax4.legend(loc=Constants.PLOT_UPPER_RIGHT) self.canvas.draw() return Result().success(info=Constants.DRAWING_FINISHED) elif sender == self.ui.pushButton_view_align: self.gs = gridspec.GridSpec(1, 1, height_ratios=[1]) self.fig.subplots_adjust(top=0.95, bottom=0.05, right=0.98, left=0.05, hspace=0, wspace=0) self.ax4 = self.fig.add_subplot(self.gs[0]) self.ax4.grid(True) self.ax4.xaxis.set_major_formatter(ConfigParams.FORMATTER) self.ax4.set_title("result preview") self.ax4.plot(self.data.cut_ECG, color=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_ECG) self.ax4.plot(self.data.res_BCG, color=Constants.PLOT_COLOR_ORANGE, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_BCG) self.ax4.plot(self.data.cut_Rpeak, self.data.cut_ECG[self.data.cut_Rpeak], 'v', color=Constants.PLOT_COLOR_BLUE, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_RPEAK) self.ax4.plot(self.data.cut_Jpeak, self.data.res_BCG[self.data.cut_Jpeak], 'v', color=Constants.PLOT_COLOR_RED, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JPEAK) self.ax4.legend(loc=Constants.PLOT_UPPER_RIGHT) self.canvas.draw() return Result().success(info=Constants.DRAWING_FINISHED) else: self.canvas.draw() return Result().failure(info=Constants.DRAWING_FAILURE) def __update_info__(self): self.ui.spinBox_BCG_front_JJIV_1.setValue(Config["IV_Coordinate"]["BCG_front_1"]) self.ui.spinBox_BCG_front_JJIV_2.setValue(Config["IV_Coordinate"]["BCG_front_2"]) self.ui.spinBox_BCG_back_JJIV_1.setValue(Config["IV_Coordinate"]["BCG_back_1"]) self.ui.spinBox_BCG_back_JJIV_2.setValue(Config["IV_Coordinate"]["BCG_back_2"]) self.ui.spinBox_ECG_front_RRIV_1.setValue(Config["IV_Coordinate"]["ECG_front_1"]) self.ui.spinBox_ECG_front_RRIV_2.setValue(Config["IV_Coordinate"]["ECG_front_2"]) self.ui.spinBox_ECG_back_RRIV_1.setValue(Config["IV_Coordinate"]["ECG_back_1"]) self.ui.spinBox_ECG_back_RRIV_2.setValue(Config["IV_Coordinate"]["ECG_back_2"]) self.ui.spinBox_BCG_front_Signal_1.setValue(Config["Coordinate"]["BCG_front_1"]) self.ui.spinBox_BCG_front_Signal_2.setValue(Config["Coordinate"]["BCG_front_2"]) self.ui.spinBox_BCG_back_Signal_1.setValue(Config["Coordinate"]["BCG_back_1"]) self.ui.spinBox_BCG_back_Signal_2.setValue(Config["Coordinate"]["BCG_back_2"]) self.ui.spinBox_ECG_front_Signal_1.setValue(Config["Coordinate"]["ECG_front_1"]) self.ui.spinBox_ECG_front_Signal_2.setValue(Config["Coordinate"]["ECG_front_2"]) self.ui.spinBox_ECG_back_Signal_1.setValue(Config["Coordinate"]["ECG_back_1"]) self.ui.spinBox_ECG_back_Signal_2.setValue(Config["Coordinate"]["ECG_back_2"]) def __slot_btn_input__(self): PublicFunc.__disableAllButton__(self, ButtonState) # 清空画框 self.reset_axes() self.canvas.draw() self.data = Data() # 导入数据 PublicFunc.progressbar_update(self, 1, 3, Constants.INPUTTING_DATA, 0) result = self.data.open_file() if not result.status: PublicFunc.text_output(self.ui, "(1/3)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/3)" + result.info, Constants.TIPS_TYPE_INFO) # 处理数据 PublicFunc.progressbar_update(self, 2, 3, Constants.PRECISELY_ALIGN_PROCESSING_DATA, 50) result = self.data.data_process_for_calculate_correlation() if not result.status: PublicFunc.text_output(self.ui, "(2/3)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/3)" + result.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 3, 3, Constants.DRAWING_DATA, 70) result = self.__plot__(result.data) if not result.status: PublicFunc.text_output(self.ui, "(3/3)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(3/3)" + result.info, Constants.TIPS_TYPE_INFO) self.__reset__() self.figToolbar.action_Get_Range.setEnabled(True) self.rect_down = min(self.ax0.get_ylim()[0], self.ax1.get_ylim()[0]) - 10000 self.rect_up = max(self.ax0.get_ylim()[1], self.ax1.get_ylim()[1]) + 10000 for action in self.figToolbar._actions.values(): action.setEnabled(True) ButtonState["Current"].update(ButtonState["Statue_1"].copy()) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_calculate_correlation__(self, test1=None, shift_front=None, shift_back=None): # TODO:这里有个未知的问BUG,虽然不影响功能,但会影响代码整洁性,第一个形参赋值为None时,之后使用变量时将会变成False,不知道为什么 PublicFunc.__disableAllButton__(self, ButtonState) sender = self.sender() if sender == self.ui.pushButton_calculate_correlation: mode = "init" else: mode = "select" self.ax0_xlime = self.ax0.get_xlim() self.ax0_ylime = self.ax0.get_ylim() self.ax2_xlime = self.ax2.get_xlim() self.ax2_ylime = self.ax2.get_ylim() # 计算前段相关性 PublicFunc.progressbar_update(self, 1, 3, Constants.PRECISELY_ALIGN_CALCULATING_CORRELATION_FRONT, 0) result1 = self.data.calculate_correlation_front(mode, shift_front) if not result1.status: PublicFunc.text_output(self.ui, "(1/3)" + result1.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result1.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/3)" + result1.info, Constants.TIPS_TYPE_INFO) # 计算后段相关性 PublicFunc.progressbar_update(self, 2, 3, Constants.PRECISELY_ALIGN_CALCULATING_CORRELATION_BACK, 30) result2 = self.data.calculate_correlation_back(mode, shift_back) if not result2.status: PublicFunc.text_output(self.ui, "(2/3)" + result2.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result2.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/3)" + result2.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 3, 3, Constants.DRAWING_DATA, 60) result = {} result.update(result1.data) result.update(result2.data) result.update({"mode": mode}) if mode == "init": result = self.__plot__(result) elif mode == "select": result = self.redraw_calculate_coordination(result) else: raise ValueError("模式不存在") if not result.status: PublicFunc.text_output(self.ui, "(3/3)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(3/3)" + result.info, Constants.TIPS_TYPE_INFO) self.figToolbar.action_Get_Range.setEnabled(False) self.figToolbar.deactivate_figToorbar_getRange_mode() ButtonState["Current"].update(ButtonState["Statue_2"].copy()) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_correlation_align__(self): PublicFunc.__disableAllButton__(self, ButtonState) sender = self.sender() if sender == self.ui.pushButton_correlation_align: mode = "init" else: mode = "select" self.ax4_xlime = self.ax4.get_xlim() self.ax4_ylime = self.ax4.get_ylim() # 处理相关对齐 PublicFunc.progressbar_update(self, 1, 2, Constants.PRECISELY_ALIGN_ALIGNING_CORRELATION, 0) result = self.data.correlation_align(mode) if not result.status: PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 2, 2, Constants.DRAWING_DATA, 50) result.data.update({"mode": mode}) if mode == "init": result = self.__plot__(result.data) elif mode == "select": result = self.redraw_correlation_align(result.data) else: raise ValueError("模式不存在") if not result.status: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_INFO) ButtonState["Current"].update(ButtonState["Statue_3"].copy()) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_view_align__(self): PublicFunc.__disableAllButton__(self, ButtonState) # 数据后处理 PublicFunc.progressbar_update(self, 1, 2, Constants.PRECISELY_ALIGN_POSTPROCESSING_VIEW, 0) result = self.data.data_postprocess() if not result.status: PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 2, 2, Constants.DRAWING_DATA, 50) result = self.__plot__() if not result.status: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_INFO) ButtonState["Current"].update(ButtonState["Statue_4"].copy()) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_save__(self): reply = QMessageBox.question(self, Constants.QUESTION_TITLE, Constants.QUESTION_CONTENT, QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes) if reply == QMessageBox.Yes: PublicFunc.__disableAllButton__(self, ButtonState) # 保存对齐信息 PublicFunc.progressbar_update(self, 1, 6, Constants.PRECISELY_ALIGN_SAVING_ALIGNINFO, 0) result = self.data.save_alignInfo() if not result.status: PublicFunc.text_output(self.ui, "(1/6)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/6)" + result.info, Constants.TIPS_TYPE_INFO) # 保存切割后orgBcg PublicFunc.progressbar_update(self, 2, 6, Constants.PRECISELY_ALIGN_SAVING_RES_ORGBCG, 0) total_rows = len(DataFrame(self.data.res_orgBcg.reshape(-1))) chunk_size = ConfigParams.PRECISELY_ALIGN_SAVE_CHUNK_SIZE with open(Config["Path"]["Save_OrgBCG"], 'w') as f: for start in range(0, total_rows, chunk_size): end = min(start + chunk_size, total_rows) chunk = DataFrame(self.data.res_orgBcg.reshape(-1)).iloc[start:end] result = self.data.save_res_orgBcg(chunk) progress = int((end / total_rows) * 100) self.progressbar.setValue(progress) QApplication.processEvents() if not result.status: PublicFunc.text_output(self.ui, "(2/6)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/6)" + result.info, Constants.TIPS_TYPE_INFO) # 保存切割后BCG PublicFunc.progressbar_update(self, 3, 6, Constants.PRECISELY_ALIGN_SAVING_RES_BCG, 0) total_rows = len(DataFrame(self.data.res_BCG.reshape(-1))) chunk_size = ConfigParams.PRECISELY_ALIGN_SAVE_CHUNK_SIZE with open(Config["Path"]["Save_BCG"], 'w') as f: for start in range(0, total_rows, chunk_size): end = min(start + chunk_size, total_rows) chunk = DataFrame(self.data.res_BCG.reshape(-1)).iloc[start:end] result = self.data.save_res_BCG(chunk) progress = int((end / total_rows) * 100) self.progressbar.setValue(progress) QApplication.processEvents() if not result.status: PublicFunc.text_output(self.ui, "(3/6)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(3/6)" + result.info, Constants.TIPS_TYPE_INFO) # 保存切割后ECG PublicFunc.progressbar_update(self, 4, 6, Constants.PRECISELY_ALIGN_SAVING_CUT_ECG, 0) total_rows = len(DataFrame(self.data.cut_ECG.reshape(-1))) chunk_size = ConfigParams.PRECISELY_ALIGN_SAVE_CHUNK_SIZE with open(Config["Path"]["Save_ECG"], 'w') as f: for start in range(0, total_rows, chunk_size): end = min(start + chunk_size, total_rows) chunk = DataFrame(self.data.cut_ECG.reshape(-1)).iloc[start:end] result = self.data.save_cut_ECG(chunk) progress = int((end / total_rows) * 100) self.progressbar.setValue(progress) QApplication.processEvents() if not result.status: PublicFunc.text_output(self.ui, "(4/6)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(4/6)" + result.info, Constants.TIPS_TYPE_INFO) # 保存切割后J峰 PublicFunc.progressbar_update(self, 5, 6, Constants.PRECISELY_ALIGN_SAVING_CUT_JPEAK, 0) total_rows = len(DataFrame(self.data.cut_Jpeak.reshape(-1))) chunk_size = ConfigParams.PRECISELY_ALIGN_SAVE_PEAK_CHUNK_SIZE with open(Config["Path"]["Save_Jpeak"], 'w') as f: for start in range(0, total_rows, chunk_size): end = min(start + chunk_size, total_rows) chunk = DataFrame(self.data.cut_Jpeak.reshape(-1)).iloc[start:end] result = self.data.save_Jpeak(chunk) progress = int((end / total_rows) * 100) self.progressbar.setValue(progress) QApplication.processEvents() if not result.status: PublicFunc.text_output(self.ui, "(5/6)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(5/6)" + result.info, Constants.TIPS_TYPE_INFO) # 保存切割后R峰 PublicFunc.progressbar_update(self, 6, 6, Constants.PRECISELY_ALIGN_SAVING_CUT_RPEAK, 0) total_rows = len(DataFrame(self.data.cut_Rpeak.reshape(-1))) chunk_size = ConfigParams.PRECISELY_ALIGN_SAVE_PEAK_CHUNK_SIZE with open(Config["Path"]["Save_Rpeak"], 'w') as f: for start in range(0, total_rows, chunk_size): end = min(start + chunk_size, total_rows) chunk = DataFrame(self.data.cut_Rpeak.reshape(-1)).iloc[start:end] result = self.data.save_Rpeak(chunk) progress = int((end / total_rows) * 100) self.progressbar.setValue(progress) QApplication.processEvents() if not result.status: PublicFunc.text_output(self.ui, "(6/6)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(6/6)" + result.info, Constants.TIPS_TYPE_INFO) PublicFunc.msgbox_output(self, Constants.SAVING_FINISHED, Constants.TIPS_TYPE_INFO) PublicFunc.finish_operation(self, ButtonState) def __update_coordinate__(self): try: if self.data is not None: if self.data.Jpeak is None or self.data.Rpeak is None: PublicFunc.msgbox_output(self, Constants.FAILURE_REASON["Data_Not_Exist"], Constants.MSGBOX_TYPE_ERROR) return sender = self.sender() if sender == self.ui.spinBox_BCG_front_JJIV_1: if self.ui.spinBox_BCG_front_JJIV_1.value() >= len(self.data.Jpeak[:-2]): self.ui.spinBox_BCG_front_JJIV_1.setValue(len(self.data.Jpeak[:-2]) - 1) Config["IV_Coordinate"]["BCG_front_1"] = self.ui.spinBox_BCG_front_JJIV_1.value() Config["Coordinate"]["BCG_front_1"] = self.data.Jpeak[:-2][self.ui.spinBox_BCG_front_JJIV_1.value()] self.ui.spinBox_BCG_front_Signal_1.setValue(Config["Coordinate"]["BCG_front_1"]) elif sender == self.ui.spinBox_BCG_front_JJIV_2: if self.ui.spinBox_BCG_front_JJIV_2.value() >= len(self.data.Jpeak[:-2]): self.ui.spinBox_BCG_front_JJIV_2.setValue(len(self.data.Jpeak[:-2]) - 1) Config["IV_Coordinate"]["BCG_front_2"] = self.ui.spinBox_BCG_front_JJIV_2.value() Config["Coordinate"]["BCG_front_2"] = self.data.Jpeak[:-2][self.ui.spinBox_BCG_front_JJIV_2.value()] self.ui.spinBox_BCG_front_Signal_2.setValue(Config["Coordinate"]["BCG_front_2"]) elif sender == self.ui.spinBox_BCG_back_JJIV_1: if self.ui.spinBox_BCG_back_JJIV_1.value() >= len(self.data.Jpeak[:-2]): self.ui.spinBox_BCG_back_JJIV_1.setValue(len(self.data.Jpeak[:-2]) - 1) Config["IV_Coordinate"]["BCG_back_1"] = self.ui.spinBox_BCG_back_JJIV_1.value() Config["Coordinate"]["BCG_back_1"] = self.data.Jpeak[:-2][self.ui.spinBox_BCG_back_JJIV_1.value()] self.ui.spinBox_BCG_back_Signal_1.setValue(Config["Coordinate"]["BCG_back_1"]) elif sender == self.ui.spinBox_BCG_back_JJIV_2: if self.ui.spinBox_BCG_back_JJIV_2.value() >= len(self.data.Jpeak[:-2]): self.ui.spinBox_BCG_back_JJIV_2.setValue(len(self.data.Jpeak[:-2]) - 1) Config["IV_Coordinate"]["BCG_back_2"] = self.ui.spinBox_BCG_back_JJIV_2.value() Config["Coordinate"]["BCG_back_2"] = self.data.Jpeak[:-2][self.ui.spinBox_BCG_back_JJIV_2.value()] self.ui.spinBox_BCG_back_Signal_2.setValue(Config["Coordinate"]["BCG_back_2"]) elif sender == self.ui.spinBox_ECG_front_RRIV_1: if self.ui.spinBox_ECG_front_RRIV_1.value() >= len(self.data.Rpeak[:-2]): self.ui.spinBox_ECG_front_RRIV_1.setValue(len(self.data.Rpeak[:-2]) - 1) Config["IV_Coordinate"]["ECG_front_1"] = self.ui.spinBox_ECG_front_RRIV_1.value() Config["Coordinate"]["ECG_front_1"] = self.data.Rpeak[:-2][self.ui.spinBox_ECG_front_RRIV_1.value()] self.ui.spinBox_ECG_front_Signal_1.setValue(Config["Coordinate"]["ECG_front_1"]) elif sender == self.ui.spinBox_ECG_front_RRIV_2: if self.ui.spinBox_ECG_front_RRIV_2.value() >= len(self.data.Rpeak[:-2]): self.ui.spinBox_ECG_front_RRIV_2.setValue(len(self.data.Rpeak[:-2]) - 1) Config["IV_Coordinate"]["ECG_front_2"] = self.ui.spinBox_ECG_front_RRIV_2.value() Config["Coordinate"]["ECG_front_2"] = self.data.Rpeak[:-2][self.ui.spinBox_ECG_front_RRIV_2.value()] self.ui.spinBox_ECG_front_Signal_2.setValue(Config["Coordinate"]["ECG_front_2"]) elif sender == self.ui.spinBox_ECG_back_RRIV_1: if self.ui.spinBox_ECG_back_RRIV_1.value() >= len(self.data.Rpeak[:-2]): self.ui.spinBox_ECG_back_RRIV_1.setValue(len(self.data.Rpeak[:-2]) - 1) Config["IV_Coordinate"]["ECG_back_1"] = self.ui.spinBox_ECG_back_RRIV_1.value() Config["Coordinate"]["ECG_back_1"] = self.data.Rpeak[:-2][self.ui.spinBox_ECG_back_RRIV_1.value()] self.ui.spinBox_ECG_back_Signal_1.setValue(Config["Coordinate"]["ECG_back_1"]) elif sender == self.ui.spinBox_ECG_back_RRIV_2: if self.ui.spinBox_ECG_back_RRIV_2.value() >= len(self.data.Rpeak[:-2]): self.ui.spinBox_ECG_back_RRIV_2.setValue(len(self.data.Rpeak[:-2]) - 1) Config["IV_Coordinate"]["ECG_back_2"] = self.ui.spinBox_ECG_back_RRIV_2.value() Config["Coordinate"]["ECG_back_2"] = self.data.Rpeak[:-2][self.ui.spinBox_ECG_back_RRIV_2.value()] self.ui.spinBox_ECG_back_Signal_2.setValue(Config["Coordinate"]["ECG_back_2"]) except AttributeError: pass def reset_axes(self): for ax in self.fig.axes: self.fig.delaxes(ax) if self.ax0 is not None: self.ax0.clear() self.ax0.grid(True) self.ax0.xaxis.set_major_formatter(ConfigParams.FORMATTER) if self.ax1 is not None: self.ax1.clear() self.ax1.grid(True) self.ax1.xaxis.set_major_formatter(ConfigParams.FORMATTER) if self.ax2 is not None: self.ax2.clear() self.ax2.grid(True) self.ax2.xaxis.set_major_formatter(ConfigParams.FORMATTER) if self.ax3 is not None: self.ax3.clear() self.ax3.grid(True) self.ax3.xaxis.set_major_formatter(ConfigParams.FORMATTER) if self.ax4 is not None: self.ax4.clear() self.ax4.grid(True) self.ax4.xaxis.set_major_formatter(ConfigParams.FORMATTER) def redraw_calculate_coordination(self, plot_element=None): if plot_element is not None and plot_element["mode"] == "select": if self.selected_point0 is not None: self.selected_point0.remove() self.selected_point0 = None if self.selected_point1 is not None: self.selected_point1.remove() self.selected_point1 = None if self.stem_black0 is not None: self.stem_black0.remove() self.stem_black0 = None if self.stem_black1 is not None: self.stem_black1.remove() self.stem_black1 = None self.ax0.set_title( "front\ncorre_IIV: {}, corre_II: {}\nsame_sign_rate:{}, total_time_ratio: {}\nshift: {}, alignment offset: {} seconds\noffset_interval: {}, anchor_J: {}, anchor_R: {}".format( plot_element["front"]["correlation_IIV"], plot_element["front"]["correlation_II"], plot_element["front"]["same_sign_rate"], plot_element["front"]["total_time_ratio"], plot_element["front"]["shift"], plot_element["front"]["offset_interval_duration"], plot_element["front"]["offset_interval"], plot_element["front"]["anchor_J"], plot_element["front"]["anchor_R"])) self.ax2.set_title( "back\ncorre_IIV: {}, corre_II: {}\nsame_sign_rate:{}, total_time_ratio: {}\nshift: {}, alignment offset: {} seconds\noffset_interval: {}, anchor_J: {}, anchor_R: {}".format( plot_element["back"]["correlation_IIV"], plot_element["back"]["correlation_II"], plot_element["back"]["same_sign_rate"], plot_element["back"]["total_time_ratio"], plot_element["back"]["shift"], plot_element["back"]["offset_interval_duration"], plot_element["back"]["offset_interval"], plot_element["back"]["anchor_J"], plot_element["back"]["anchor_R"])) self.selected_point0, = self.ax0.plot(plot_element["front"]["shift"], plot_element["front"]["corre"][plot_element["front"]["shift"]] + 1, 'v', color=Constants.PLOT_COLOR_RED, picker=True, pickradius=5, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_SELECTED_POINT) self.stem_black0 = self.ax1.stem(arange(plot_element["front"]["shift"], plot_element["front"]["shift"] + len(plot_element["front"]["JJIVs"])), plot_element["front"]["JJIVs"], markerfmt="ko", linefmt=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JJIV) self.selected_point1, = self.ax2.plot(plot_element["back"]["shift"], plot_element["back"]["corre"][plot_element["back"]["shift"]] + 1, 'v', color=Constants.PLOT_COLOR_RED, picker=True, pickradius=5, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_SELECTED_POINT) self.stem_black1 = self.ax3.stem(arange(plot_element["back"]["shift"], plot_element["back"]["shift"] + len(plot_element["back"]["JJIVs"])), plot_element["back"]["JJIVs"], markerfmt="ko", linefmt=Constants.PLOT_COLOR_GREEN, label=Constants.PRECISELY_ALIGN_PLOT_LABEL_JJIV) self.ax0.autoscale(False) self.ax2.autoscale(False) self.ax0.set_xlim(self.ax0_xlime) self.ax0.set_ylim(self.ax0_ylime) self.ax2.set_xlim(self.ax2_xlime) self.ax2.set_ylim(self.ax2_ylime) self.canvas.draw() return Result().success(info=Constants.DRAWING_FINISHED) return Result().failure(info=Constants.DRAWING_FAILURE) def redraw_correlation_align(self, plot_element=None): if plot_element is not None and plot_element["mode"] == "select": if self.selected_point0 is not None: self.selected_point0.remove() self.selected_point0 = None if self.selected_point1 is not None: self.selected_point1.remove() self.selected_point1 = None if self.selected_point2 is not None: self.selected_point2.remove() self.selected_point2 = None if self.selected_point3 is not None: self.selected_point3.remove() self.selected_point3 = None if len(self.data.correlation_align_point_match_ECG) > 0: self.selected_point0, = self.ax4.plot( [self.data.correlation_align_point_match_ECG[0], self.data.correlation_align_point_match_ECG[0]], [plot_element["b"], plot_element["a"]], 'b--') if len(self.data.correlation_align_point_match_ECG) == 2: self.selected_point1, = self.ax4.plot( [self.data.correlation_align_point_match_ECG[1], self.data.correlation_align_point_match_ECG[1]], [plot_element["b"], plot_element["a"]], 'b--') if len(self.data.correlation_align_point_match_BCG) > 0: self.selected_point2, = self.ax4.plot( [self.data.correlation_align_point_match_BCG[0], self.data.correlation_align_point_match_BCG[0]], [plot_element["b"], plot_element["a"]], 'r--') if len(self.data.correlation_align_point_match_BCG) == 2: self.selected_point3, = self.ax4.plot( [self.data.correlation_align_point_match_BCG[1], self.data.correlation_align_point_match_BCG[1]], [plot_element["b"], plot_element["a"]], 'r--') self.ax4.autoscale(False) self.ax4.set_xlim(self.ax4_xlime) self.ax4.set_ylim(self.ax4_ylime) self.canvas.draw() return Result().success(info=Constants.DRAWING_FINISHED) return Result().failure(info=Constants.DRAWING_FAILURE) def toggle_home(self): if self.ax0 is not None: self.ax0.autoscale(True) self.ax0.relim() self.ax0.autoscale_view() self.canvas.draw() self.ax0.autoscale(False) if self.ax1 is not None: self.ax1.autoscale(True) self.ax1.relim() self.ax1.autoscale_view() self.canvas.draw() self.ax1.autoscale(False) if self.ax2 is not None: self.ax2.autoscale(True) self.ax2.relim() self.ax2.autoscale_view() self.canvas.draw() self.ax2.autoscale(False) if self.ax3 is not None: self.ax3.autoscale(True) self.ax3.relim() self.ax3.autoscale_view() self.canvas.draw() self.ax3.autoscale(False) if self.ax4 is not None: self.ax4.autoscale(True) self.ax4.relim() self.ax4.autoscale_view() self.canvas.draw() self.ax4.autoscale(False) PublicFunc.text_output(self.ui, Constants.PRECISELY_ALIGN_RECOVER_SCALE, Constants.TIPS_TYPE_INFO) def toggle_getRange(self, state): if state: self.deactivate_figToolbar_buttons() self.figToolbar.action_Get_Range.setChecked(True) self.figToolbar.cid_mouse_press = self.canvas.mpl_connect( "button_press_event", self.on_click) self.figToolbar.cid_mouse_release = self.canvas.mpl_connect( "button_release_event", self.on_release) self.figToolbar.cid_mouse_hold = self.canvas.mpl_connect( "motion_notify_event", self.on_hold) else: if self.figToolbar.cid_mouse_press is not None: self.canvas.mpl_disconnect(self.figToolbar.cid_mouse_press) self.figToolbar.cid_mouse_press = None if self.figToolbar.cid_mouse_release is not None: self.canvas.mpl_disconnect(self.figToolbar.cid_mouse_release) self.figToolbar.cid_mouse_release = None if self.figToolbar.cid_mouse_hold: self.canvas.mpl_disconnect(self.figToolbar.cid_mouse_hold) self.figToolbar.cid_mouse_hold = None def deactivate_figToolbar_buttons(self): for action in self.figToolbar._actions.values(): if action.isChecked() == True: if action == self.figToolbar._actions['pan']: self.figToolbar.pan() if action == self.figToolbar._actions['zoom']: self.figToolbar.zoom() def on_click(self, event): if self.figToolbar.action_Get_Range.isChecked(): if event.button == 1: self.is_left_button_pressed = True self.figToolbar.rect_start_x = event.xdata # 如果矩形patch已存在,先移除 if self.ui.radioButton_BCG_front.isChecked(): if self.figToolbar.ax0_BCG_rectangle_front is not None: self.figToolbar.ax0_BCG_rectangle_front.remove() self.figToolbar.ax0_BCG_rectangle_front = None elif self.ui.radioButton_BCG_back.isChecked(): if self.figToolbar.ax0_BCG_rectangle_back is not None: self.figToolbar.ax0_BCG_rectangle_back.remove() self.figToolbar.ax0_BCG_rectangle_back = None elif self.ui.radioButton_ECG_front.isChecked(): if self.figToolbar.ax1_ECG_rectangle_front is not None: self.figToolbar.ax1_ECG_rectangle_front.remove() self.figToolbar.ax1_ECG_rectangle_front = None elif self.ui.radioButton_ECG_back.isChecked(): if self.figToolbar.ax1_ECG_rectangle_back is not None: self.figToolbar.ax1_ECG_rectangle_back.remove() self.figToolbar.ax1_ECG_rectangle_back = None else: raise ValueError("模式未选择") self.canvas.draw() def on_release(self, event): if self.figToolbar.action_Get_Range.isChecked(): if self.figToolbar.rect_start_x is not None: self.figToolbar.rect_end_x = event.xdata if self.figToolbar.rect_start_x is not None and self.figToolbar.rect_end_x is not None: if self.figToolbar.rect_start_x < self.figToolbar.rect_end_x: rect_left = self.figToolbar.rect_start_x rect_right = self.figToolbar.rect_end_x elif self.figToolbar.rect_start_x > self.figToolbar.rect_end_x: rect_left = self.figToolbar.rect_end_x rect_right = self.figToolbar.rect_start_x else: rect_left = self.figToolbar.rect_start_x rect_right = self.figToolbar.rect_start_x else: rect_left = self.figToolbar.rect_start_x rect_right = self.figToolbar.rect_start_x if event.button == 1 and self.is_left_button_pressed: self.is_left_button_pressed = False if self.ui.radioButton_BCG_front.isChecked() or self.ui.radioButton_BCG_back.isChecked(): if rect_left < 0: rect_left = 0 elif rect_left >= len(self.data.raw_BCG): rect_left = 0 rect_right = 0 if rect_right >= len(self.data.raw_BCG): rect_right = len(self.data.raw_BCG) - 1 elif rect_right < 0: rect_left = 0 rect_right = 0 indices = where((self.data.Jpeak[:-2] >= rect_left) & (self.data.Jpeak[:-2] <= rect_right))[0] if indices is None or len(indices) <= 0: if self.ui.radioButton_BCG_front.isChecked(): Config["IV_Coordinate"]["BCG_front_1"] = 0 Config["IV_Coordinate"]["BCG_front_2"] = 0 Config["Coordinate"]["BCG_front_1"] = 0 Config["Coordinate"]["BCG_front_2"] = 0 elif self.ui.radioButton_BCG_back.isChecked(): Config["IV_Coordinate"]["BCG_back_1"] = 0 Config["IV_Coordinate"]["BCG_back_2"] = 0 Config["Coordinate"]["BCG_back_1"] = 0 Config["Coordinate"]["BCG_back_2"] = 0 PublicFunc.text_output(self.ui, Constants.PRECISELY_ALIGN_NO_POINT_IN_THE_INTERVAL, Constants.TIPS_TYPE_INFO) else: if self.ui.radioButton_BCG_front.isChecked(): Config["IV_Coordinate"]["BCG_front_1"] = indices[0] Config["IV_Coordinate"]["BCG_front_2"] = indices[-1] Config["Coordinate"]["BCG_front_1"] = self.data.Jpeak[:-2][indices[0]] Config["Coordinate"]["BCG_front_2"] = self.data.Jpeak[:-2][indices[-1]] elif self.ui.radioButton_BCG_back.isChecked(): Config["IV_Coordinate"]["BCG_back_1"] = indices[0] Config["IV_Coordinate"]["BCG_back_2"] = indices[-1] Config["Coordinate"]["BCG_back_1"] = self.data.Jpeak[:-2][indices[0]] Config["Coordinate"]["BCG_back_2"] = self.data.Jpeak[:-2][indices[-1]] elif self.ui.radioButton_ECG_front.isChecked() or self.ui.radioButton_ECG_back.isChecked(): if rect_left < 0: rect_left = 0 elif rect_left >= len(self.data.raw_ECG): rect_left = 0 rect_right = 0 if rect_right >= len(self.data.raw_ECG): rect_right = len(self.data.raw_ECG) - 1 elif rect_right < 0: rect_left = 0 rect_right = 0 indices = where((self.data.Rpeak[:-2] >= rect_left) & (self.data.Rpeak[:-2] <= rect_right))[0] if indices is None or len(indices) <= 0: if self.ui.radioButton_ECG_front.isChecked(): Config["IV_Coordinate"]["ECG_front_1"] = 0 Config["IV_Coordinate"]["ECG_front_2"] = 0 Config["Coordinate"]["ECG_front_1"] = 0 Config["Coordinate"]["ECG_front_2"] = 0 elif self.ui.radioButton_ECG_back.isChecked(): Config["IV_Coordinate"]["ECG_back_1"] = 0 Config["IV_Coordinate"]["ECG_back_2"] = 0 Config["Coordinate"]["ECG_back_1"] = 0 Config["Coordinate"]["ECG_back_2"] = 0 PublicFunc.text_output(self.ui, Constants.PRECISELY_ALIGN_NO_POINT_IN_THE_INTERVAL, Constants.TIPS_TYPE_INFO) else: if self.ui.radioButton_ECG_front.isChecked(): Config["IV_Coordinate"]["ECG_front_1"] = indices[0] Config["IV_Coordinate"]["ECG_front_2"] = indices[-1] Config["Coordinate"]["ECG_front_1"] = self.data.Rpeak[:-2][indices[0]] Config["Coordinate"]["ECG_front_2"] = self.data.Rpeak[:-2][indices[-1]] elif self.ui.radioButton_ECG_back.isChecked(): Config["IV_Coordinate"]["ECG_back_1"] = indices[0] Config["IV_Coordinate"]["ECG_back_2"] = indices[-1] Config["Coordinate"]["ECG_back_1"] = self.data.Rpeak[:-2][indices[0]] Config["Coordinate"]["ECG_back_2"] = self.data.Rpeak[:-2][indices[-1]] self.figToolbar.rect_start_x = None self.figToolbar.rect_end_x = None self.__update_info__() self.canvas.draw() def on_hold(self, event): if event.button == 1: if self.figToolbar.rect_start_x is not None and event.xdata is not None: self.figToolbar.rect_end_x = event.xdata # 更新矩形patch的位置和大小 x_start = self.figToolbar.rect_start_x x_end = self.figToolbar.rect_end_x # 如果矩形patch不存在,则创建一个新的 if self.figToolbar.ax0_BCG_rectangle_front is None and self.is_left_button_pressed: self.figToolbar.ax0_BCG_rectangle_front = patches.Rectangle((0, 0), 1, 1, fill=True, alpha=ConfigParams.PRECISELY_ALIGN_LABEL_TRANSPARENCY, color=Constants.PLOT_COLOR_PINK) self.ax0.add_patch(self.figToolbar.ax0_BCG_rectangle_front) if self.figToolbar.ax0_BCG_rectangle_back is None and self.is_left_button_pressed: self.figToolbar.ax0_BCG_rectangle_back = patches.Rectangle((0, 0), 1, 1, fill=True, alpha=ConfigParams.PRECISELY_ALIGN_LABEL_TRANSPARENCY, color=Constants.PLOT_COLOR_PINK) self.ax0.add_patch(self.figToolbar.ax0_BCG_rectangle_back) if self.figToolbar.ax1_ECG_rectangle_front is None and self.is_left_button_pressed: self.figToolbar.ax1_ECG_rectangle_front = patches.Rectangle((0, 0), 1, 1, fill=True, alpha=ConfigParams.PRECISELY_ALIGN_LABEL_TRANSPARENCY, color=Constants.PLOT_COLOR_PINK) self.ax1.add_patch(self.figToolbar.ax1_ECG_rectangle_front) if self.figToolbar.ax1_ECG_rectangle_back is None and self.is_left_button_pressed: self.figToolbar.ax1_ECG_rectangle_back = patches.Rectangle((0, 0), 1, 1, fill=True, alpha=ConfigParams.PRECISELY_ALIGN_LABEL_TRANSPARENCY, color=Constants.PLOT_COLOR_PINK) self.ax1.add_patch(self.figToolbar.ax1_ECG_rectangle_back) if self.ui.radioButton_BCG_front.isChecked(): self.figToolbar.ax0_BCG_rectangle_front.set_xy((min(x_start, x_end), self.rect_down)) self.figToolbar.ax0_BCG_rectangle_front.set_width(abs(x_end - x_start)) self.figToolbar.ax0_BCG_rectangle_front.set_height(self.rect_up - self.rect_down) elif self.ui.radioButton_BCG_back.isChecked(): self.figToolbar.ax0_BCG_rectangle_back.set_xy((min(x_start, x_end), self.rect_down)) self.figToolbar.ax0_BCG_rectangle_back.set_width(abs(x_end - x_start)) self.figToolbar.ax0_BCG_rectangle_back.set_height(self.rect_up - self.rect_down) elif self.ui.radioButton_ECG_front.isChecked(): self.figToolbar.ax1_ECG_rectangle_front.set_xy((min(x_start, x_end), self.rect_down)) self.figToolbar.ax1_ECG_rectangle_front.set_width(abs(x_end - x_start)) self.figToolbar.ax1_ECG_rectangle_front.set_height(self.rect_up - self.rect_down) elif self.ui.radioButton_ECG_back.isChecked(): self.figToolbar.ax1_ECG_rectangle_back.set_xy((min(x_start, x_end), self.rect_down)) self.figToolbar.ax1_ECG_rectangle_back.set_width(abs(x_end - x_start)) self.figToolbar.ax1_ECG_rectangle_back.set_height(self.rect_up - self.rect_down) self.canvas.draw() def on_pick(self, event): this_line = event.artist if this_line.axes == self.ax0: xdata = this_line.get_xdata() ind = event.ind[-1] shift_front = int(xdata[ind]) self.__slot_btn_calculate_correlation__(shift_front=shift_front) elif this_line.axes == self.ax2: xdata = this_line.get_xdata() ind = event.ind[-1] shift_back = int(xdata[ind]) self.__slot_btn_calculate_correlation__(shift_back=shift_back) elif this_line.axes == self.ax4: xdata = this_line.get_xdata() ind = event.ind[-1] nm = int(xdata[ind]) if this_line == self.point0: if nm in self.data.correlation_align_point_match_ECG: self.data.correlation_align_point_match_ECG = delete(self.data.correlation_align_point_match_ECG, where(self.data.correlation_align_point_match_ECG == nm)[0]) elif len(self.data.correlation_align_point_match_ECG) < 2: self.data.correlation_align_point_match_ECG = append(self.data.correlation_align_point_match_ECG, nm) elif this_line == self.point1: if nm in self.data.correlation_align_point_match_BCG: self.data.correlation_align_point_match_BCG = delete(self.data.correlation_align_point_match_BCG, where(self.data.correlation_align_point_match_BCG == nm)[0]) elif len(self.data.correlation_align_point_match_BCG) < 2: self.data.correlation_align_point_match_BCG = append(self.data.correlation_align_point_match_BCG, nm) else: raise ValueError("this_line不存在") self.__slot_btn_correlation_align__() class Data: def __init__(self): self.raw_orgBcg = None self.raw_BCG = None self.Jpeak = None self.Jpeak_y = None self.raw_ECG = None self.Rpeak = None self.Rpeak_y = None self.approximately_align_pos = None self.BCG_early = None self.res_orgBcg = None self.res_BCG = None self.cut_ECG = None self.cut_Jpeak = None self.cut_Rpeak = None self.RRIs = None self.JJIs = None self.JJIs0_front = None self.RRIs0_front = None self.JJIVs_front = None self.RRIVs_front = None self.JJIs0_back = None self.RRIs0_back = None self.JJIVs_back = None self.RRIVs_back = None self.corre_front = None self.corre_back = None self.correlation_align_point_match_ECG = array([]).astype(int) self.correlation_align_point_match_BCG = array([]).astype(int) self.argmax_BCG = None self.argmax_ECG = None def open_file(self): if Path(Config["Path"]["Input_OrgBCG"]).is_file(): Config["Path"]["Input_OrgBCG"] = str(Path(Config["Path"]["Input_OrgBCG"]).parent) if Path(Config["Path"]["Input_BCG"]).is_file(): Config["Path"]["Input_BCG"] = str(Path(Config["Path"]["Input_BCG"]).parent) if Path(Config["Path"]["Input_Jpeak"]).is_file(): Config["Path"]["Input_Jpeak"] = str(Path(Config["Path"]["Input_Jpeak"]).parent) if Path(Config["Path"]["Input_ECG"]).is_file(): Config["Path"]["Input_ECG"] = str(Path(Config["Path"]["Input_ECG"]).parent) if Path(Config["Path"]["Input_Rpeak"]).is_file(): Config["Path"]["Input_Rpeak"] = str(Path(Config["Path"]["Input_Rpeak"]).parent) if Path(Config["Path"]["Input_Approximately_Align"]).is_file(): Config["Path"]["Input_Approximately_Align"] = str(Path(Config["Path"]["Input_Approximately_Align"]).parent) result = PublicFunc.examine_file(Config["Path"]["Input_OrgBCG"], ConfigParams.ORGBCG_RAW) if result.status: Config["Path"]["Input_OrgBCG"] = result.data["path"] Config["InputConfig"]["orgBcgFreq"] = result.data["freq"] Config["Path"]["Input_Approximately_Align"] = str( Path(Config["Path"]["Input_Approximately_Align"]) / Path( ConfigParams.APPROXIMATELY_ALIGN_INFO + ConfigParams.ENDSWITH_CSV)) Config["Path"]["Save_AlignInfo"] = str( Path(Config["Path"]["Save_AlignInfo"]) / Path( ConfigParams.PRECISELY_ALIGN_INFO + ConfigParams.ENDSWITH_TXT)) Config["Path"]["Save_OrgBCG"] = str( Path(Config["Path"]["Save_OrgBCG"]) / Path( ConfigParams.ORGBCG_SYNC + str(Config["InputConfig"]["orgBcgFreq"]) + ConfigParams.ENDSWITH_TXT)) else: return result result = PublicFunc.examine_file(Config["Path"]["Input_BCG"], ConfigParams.BCG_FILTER) if result.status: Config["Path"]["Input_BCG"] = result.data["path"] Config["InputConfig"]["BCGFreq"] = result.data["freq"] Config["Path"]["Input_Jpeak"] = str( Path(Config["Path"]["Input_Jpeak"]) / Path(ConfigParams.JPEAK_REVISE_CORRECTED + str(Config["InputConfig"]["BCGFreq"]) + ConfigParams.ENDSWITH_TXT)) Config["Path"]["Save_BCG"] = str( Path(Config["Path"]["Save_BCG"]) / Path( ConfigParams.BCG_SYNC + str(Config["InputConfig"]["BCGFreq"]) + ConfigParams.ENDSWITH_TXT)) Config["Path"]["Save_Jpeak"] = str( Path(Config["Path"]["Save_Jpeak"]) / Path( ConfigParams.JPEAK_SYNC + str(Config["InputConfig"]["BCGFreq"]) + ConfigParams.ENDSWITH_TXT)) else: return result result = PublicFunc.examine_file(Config["Path"]["Input_ECG"], ConfigParams.ECG_FILTER) if result.status: Config["Path"]["Input_ECG"] = result.data["path"] Config["InputConfig"]["ECGFreq"] = result.data["freq"] Config["Path"]["Input_Rpeak"] = str( Path(Config["Path"]["Input_Rpeak"]) / Path(ConfigParams.RPEAK_FINAL_CORRECTED + str(Config["InputConfig"]["ECGFreq"]) + ConfigParams.ENDSWITH_TXT)) Config["Path"]["Save_ECG"] = str( Path(Config["Path"]["Save_ECG"]) / Path( ConfigParams.ECG_SYNC + str(Config["InputConfig"]["ECGFreq"]) + ConfigParams.ENDSWITH_TXT)) Config["Path"]["Save_Rpeak"] = str( Path(Config["Path"]["Save_Rpeak"]) / Path( ConfigParams.RPEAK_SYNC + str(Config["InputConfig"]["ECGFreq"]) + ConfigParams.ENDSWITH_TXT)) else: return result if ((not Path(Config["Path"]["Input_OrgBCG"]).exists()) or (not Path(Config["Path"]["Input_BCG"]).exists()) or (not Path(Config["Path"]["Input_Jpeak"]).exists()) or (not Path(Config["Path"]["Input_ECG"]).exists()) or (not Path(Config["Path"]["Input_Rpeak"]).exists())): return Result().failure(info=Constants.INPUT_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"]) try: self.raw_orgBcg = read_csv(Config["Path"]["Input_OrgBCG"], encoding=ConfigParams.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.raw_BCG = read_csv(Config["Path"]["Input_BCG"], encoding=ConfigParams.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.Jpeak = read_csv(Config["Path"]["Input_Jpeak"], encoding=ConfigParams.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.raw_ECG = read_csv(Config["Path"]["Input_ECG"], encoding=ConfigParams.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.Rpeak = read_csv(Config["Path"]["Input_Rpeak"], encoding=ConfigParams.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.argmax_BCG = np_argmax(self.raw_BCG) self.argmax_ECG = np_argmax(self.raw_ECG) except Exception as e: return Result().failure(info=Constants.INPUT_FAILURE + Constants.FAILURE_REASON["Open_Data_Exception"] + "\n" + format_exc()) try: df = read_csv(Config["Path"]["Input_Approximately_Align"]) pos = df["pos"].values[-1] ApplyFrequency = df["ApplyFrequency"].values[-1] self.approximately_align_pos = int(pos * (Config["InputConfig"]["UseFreq"] / ApplyFrequency)) if self.approximately_align_pos > 0: self.BCG_early = False elif self.approximately_align_pos < 0: self.BCG_early = True else: self.approximately_align_pos = 0 self.BCG_early = None except Exception: self.approximately_align_pos = 0 return Result().success(info=Constants.INPUT_FINISHED) def data_process_for_calculate_correlation(self): if self.Jpeak is None or self.Rpeak is None: return Result().failure(info=Constants.PRECISELY_ALIGN_PROCESS_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"]) try: self.JJIs = diff(self.Jpeak) JJIVs = diff(self.JJIs) self.RRIs = diff(self.Rpeak) RRIVs = diff(self.RRIs) result = {"JJIVs": JJIVs, "RRIVs": RRIVs} except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_PROCESS_FAILURE + Constants.FAILURE_REASON["Process_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_PROCESS_FINISHED, data=result) def calculate_correlation_front(self, mode, shift_front=None): if ((Config["IV_Coordinate"]["BCG_front_1"] == Config["IV_Coordinate"]["BCG_front_2"]) or (Config["IV_Coordinate"]["ECG_front_1"] == Config["IV_Coordinate"]["ECG_front_2"])): return Result().failure(info=Constants.PRECISELY_ALIGN_CALCULATE_FAILURE_FRONT + Constants.FAILURE_REASON["Calculate_Correlation_Value_Equal"]) if ((Config["IV_Coordinate"]["BCG_front_2"] - Config["IV_Coordinate"]["BCG_front_1"]) >= (Config["IV_Coordinate"]["ECG_front_2"] - Config["IV_Coordinate"]["ECG_front_1"])): return Result().failure(info=Constants.PRECISELY_ALIGN_CALCULATE_FAILURE_FRONT + Constants.FAILURE_REASON["Calculate_Correlation_JJIVRange_too_Large"]) try: if mode == "init": self.JJIs0_front = self.JJIs[Config["IV_Coordinate"]["BCG_front_1"]:Config["IV_Coordinate"]["BCG_front_2"]] self.RRIs0_front = self.RRIs[Config["IV_Coordinate"]["ECG_front_1"]:Config["IV_Coordinate"]["ECG_front_2"]] self.JJIVs_front = diff(self.JJIs0_front) self.RRIVs_front = diff(self.RRIs0_front) self.corre_front = correlate(self.RRIVs_front, self.JJIVs_front, 'valid') shift = np_argmax(self.corre_front) else: if shift_front is None: shift = Config["front"]["shift"] else: shift = shift_front RRIVs_cut = self.RRIVs_front[shift:shift + len(self.JJIVs_front)] RRIs_cut = self.RRIs0_front[shift:shift + len(self.JJIs0_front)] correlation = corrcoef(RRIVs_cut, self.JJIVs_front) correlation_IIV = (correlation[0, 1] + correlation[1, 0]) / 2 correlation = corrcoef(RRIs_cut, self.JJIs0_front) correlation_II = (correlation[0, 1] + correlation[1, 0]) / 2 same_sign_rate = np_sum(RRIVs_cut * self.JJIVs_front > 0) / len(self.JJIVs_front) total_time_ratio = np_sum(self.JJIs0_front) / np_sum(self.RRIs0_front[shift:shift + len(self.JJIs0_front)]) Jpeak_cut = self.Jpeak[Config["IV_Coordinate"]["BCG_front_1"]:Config["IV_Coordinate"]["BCG_front_2"] + 2] Rpeak_cut = self.Rpeak[Config["IV_Coordinate"]["ECG_front_1"]:Config["IV_Coordinate"]["ECG_front_2"] + 2] RRI = diff(Rpeak_cut) offset_interval = np_sum(RRI[:shift]) - (Jpeak_cut[0] - Rpeak_cut[0]) anchor_J = self.Jpeak[Config["IV_Coordinate"]["BCG_front_1"]] anchor_R = self.Rpeak[Config["IV_Coordinate"]["ECG_front_1"] + shift] offset_interval_duration = offset_interval / Config["InputConfig"]["UseFreq"] Config["front"]["shift"] = shift Config["front"]["offset_interval"] = offset_interval Config["front"]["anchor_J"] = anchor_J Config["front"]["anchor_R"] = anchor_R Config["front"]["offset_interval_duration"] = offset_interval_duration result = { "front": { "correlation_IIV": correlation_IIV, "correlation_II": correlation_II, "same_sign_rate": same_sign_rate, "total_time_ratio": total_time_ratio, "shift": shift, "corre": self.corre_front, "JJIVs": self.JJIVs_front, "RRIVs": self.RRIVs_front, "offset_interval": offset_interval, "anchor_J": anchor_J, "anchor_R": anchor_R, "offset_interval_duration": offset_interval_duration } } except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_CALCULATE_FAILURE_FRONT + Constants.FAILURE_REASON["Calculate_Correlation_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_CALCULATE_FINISHED_FRONT, data=result) def calculate_correlation_back(self, mode, shift_back=None): if ((Config["IV_Coordinate"]["BCG_back_1"] == Config["IV_Coordinate"]["BCG_back_2"]) or (Config["IV_Coordinate"]["ECG_back_1"] == Config["IV_Coordinate"]["ECG_back_2"])): return Result().failure(info=Constants.PRECISELY_ALIGN_CALCULATE_FAILURE_BACK + Constants.FAILURE_REASON["Calculate_Correlation_Value_Equal"]) if ((Config["IV_Coordinate"]["BCG_back_2"] - Config["IV_Coordinate"]["BCG_back_1"]) >= (Config["IV_Coordinate"]["ECG_back_2"] - Config["IV_Coordinate"]["ECG_back_1"])): return Result().failure(info=Constants.PRECISELY_ALIGN_CALCULATE_FAILURE_BACK + Constants.FAILURE_REASON["Calculate_Correlation_JJIVRange_too_Large"]) try: if mode == "init": self.JJIs0_back = self.JJIs[Config["IV_Coordinate"]["BCG_back_1"]:Config["IV_Coordinate"]["BCG_back_2"]] self.RRIs0_back = self.RRIs[Config["IV_Coordinate"]["ECG_back_1"]:Config["IV_Coordinate"]["ECG_back_2"]] self.JJIVs_back = diff(self.JJIs0_back) self.RRIVs_back = diff(self.RRIs0_back) self.corre_back = correlate(self.RRIVs_back, self.JJIVs_back, 'valid') shift = np_argmax(self.corre_back) else: if shift_back is None: shift = Config["back"]["shift"] else: shift = shift_back RRIVs_cut = self.RRIVs_back[shift:shift + len(self.JJIVs_back)] RRIs_cut = self.RRIs0_back[shift:shift + len(self.JJIs0_back)] correlation = corrcoef(RRIVs_cut, self.JJIVs_back) correlation_IIV = (correlation[0, 1] + correlation[1, 0]) / 2 correlation = corrcoef(RRIs_cut, self.JJIs0_back) correlation_II = (correlation[0, 1] + correlation[1, 0]) / 2 same_sign_rate = np_sum(RRIVs_cut * self.JJIVs_back > 0) / len(self.JJIVs_back) total_time_ratio = np_sum(self.JJIs0_back) / np_sum(self.RRIs0_back[shift:shift + len(self.JJIs0_back)]) Jpeak_cut = self.Jpeak[Config["IV_Coordinate"]["BCG_back_1"]:Config["IV_Coordinate"]["BCG_back_2"] + 2] Rpeak_cut = self.Rpeak[Config["IV_Coordinate"]["ECG_back_1"]:Config["IV_Coordinate"]["ECG_back_2"] + 2] RRI = diff(Rpeak_cut) offset_interval = np_sum(RRI[:shift]) - (Jpeak_cut[0] - Rpeak_cut[0]) anchor_J = self.Jpeak[Config["IV_Coordinate"]["BCG_back_1"]] anchor_R = self.Rpeak[Config["IV_Coordinate"]["ECG_back_1"] + shift] offset_interval_duration = offset_interval / Config["InputConfig"]["UseFreq"] Config["back"]["shift"] = shift Config["back"]["offset_interval"] = offset_interval Config["back"]["anchor_J"] = anchor_J Config["back"]["anchor_R"] = anchor_R Config["back"]["offset_interval_duration"] = offset_interval_duration result = { "back": { "correlation_IIV": correlation_IIV, "correlation_II": correlation_II, "same_sign_rate": same_sign_rate, "total_time_ratio": total_time_ratio, "shift": shift, "corre": self.corre_back, "JJIVs": self.JJIVs_back, "RRIVs": self.RRIVs_back, "offset_interval": offset_interval, "anchor_J": anchor_J, "anchor_R": anchor_R, "offset_interval_duration": offset_interval_duration } } except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_CALCULATE_FAILURE_BACK + Constants.FAILURE_REASON["Calculate_Correlation_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_CALCULATE_FINISHED_BACK, data=result) def correlation_align(self, mode): try: if mode == "init": anchor0 = [Config["front"]["anchor_R"], Config["front"]["anchor_J"]] anchor1 = [Config["back"]["anchor_R"], Config["back"]["anchor_J"]] Config["orgfs"] = ((int(anchor1[1]) - int(anchor0[1])) * Config["InputConfig"]["UseFreq"] / (int(anchor1[0]) - int(anchor0[0]))) Config["offset_anchor"] = anchor0[0] - anchor0[1] orgfs = Config["orgfs"] off = Config["offset_anchor"] self.res_orgBcg = self.raw_orgBcg.copy() self.res_BCG = self.raw_BCG.copy() self.cut_ECG = self.raw_ECG.copy() self.cut_Rpeak = self.Rpeak.copy() if off > 0: self.cut_ECG = self.cut_ECG[off:] anchor0[0] = anchor0[0] - off anchor1[0] = anchor1[0] - off idxs = where(self.cut_Rpeak > off)[0] self.cut_Rpeak = self.cut_Rpeak[idxs] - off else: self.res_BCG = self.res_BCG[-off:] self.res_orgBcg = self.res_orgBcg[-off:] anchor0[1] = anchor0[1] + off anchor1[1] = anchor1[1] + off self.res_BCG = resample(self.res_BCG, orgfs, Config["InputConfig"]["UseFreq"]) self.res_orgBcg = resample(self.res_orgBcg, orgfs, Config["InputConfig"]["UseFreq"]) anchor0[1] = round(int(anchor0[1]) * Config["InputConfig"]["UseFreq"] / orgfs) anchor1[1] = round(int(anchor1[1]) * Config["InputConfig"]["UseFreq"] / orgfs) off = anchor1[0] - anchor1[1] if off > 0: self.cut_ECG = self.cut_ECG[off:] anchor0[0] = anchor0[0] - off anchor1[0] = anchor1[0] - off idxs = where(self.cut_Rpeak > off)[0] self.cut_Rpeak = self.cut_Rpeak[idxs] - off else: self.res_BCG = self.res_BCG[-off:] self.res_orgBcg = self.res_orgBcg[-off:] anchor0[1] = anchor0[1] + off anchor1[1] = anchor1[1] + off datalen = np_min([len(self.cut_ECG), len(self.res_BCG)]) self.cut_ECG = self.cut_ECG[:datalen] self.res_BCG = self.res_BCG[:datalen] self.res_orgBcg = self.res_orgBcg[:datalen] a = np_max([np_max(self.cut_ECG), np_max(self.res_BCG)]) b = np_min([np_min(self.cut_ECG), np_min(self.res_BCG)]) peak_ECG, _ = find_peaks(self.cut_ECG) peak_BCG, _ = find_peaks(self.res_BCG) idxs = where(self.cut_Rpeak < datalen)[0] self.cut_Rpeak = self.cut_Rpeak[idxs] result = { "res_BCG": self.res_BCG, "cut_ECG": self.cut_ECG, "anchor00": anchor0[0], "anchor10": anchor1[0], "a": a, "b": b, "peak_ECG": peak_ECG, "peak_BCG": peak_BCG } elif mode == "select": a = np_max([np_max(self.raw_ECG), np_max(self.raw_BCG)]) b = np_min([np_min(self.raw_ECG), np_min(self.raw_BCG)]) result = { "a": a, "b": b } else: raise ValueError("模式不存在") except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_ALIGN_CORRELATION_FAILURE + Constants.FAILURE_REASON["Correlation_Align_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_ALIGN_CORRELATION_FINISHED, data=result) def data_postprocess(self): try: if len(self.correlation_align_point_match_ECG) != 2 and len(self.correlation_align_point_match_BCG) != 2: off = 0 else: self.correlation_align_point_match_ECG.sort() self.correlation_align_point_match_BCG.sort() off = round(((int(self.correlation_align_point_match_ECG[1]) - int(self.correlation_align_point_match_BCG[1])) + (int(self.correlation_align_point_match_ECG[0]) - int(self.correlation_align_point_match_BCG[0]))) / 2) anchor0 = [Config["front"]["anchor_R"], Config["front"]["anchor_J"]] anchor1 = [Config["back"]["anchor_R"], Config["back"]["anchor_J"]] if off > 0: self.cut_ECG = self.cut_ECG[off:] anchor0[0] = anchor0[0] - off anchor1[0] = anchor1[0] - off self.cut_Rpeak = self.cut_Rpeak[where(self.cut_Rpeak > off)[0]] - off else: self.res_BCG = self.res_BCG[-off:] self.res_orgBcg = self.res_orgBcg[-off:] anchor0[1] = anchor0[1] + off anchor1[1] = anchor1[1] + off datalen = np_min([len(self.cut_ECG), len(self.res_BCG)]) self.cut_ECG = self.cut_ECG[:datalen] self.res_BCG = self.res_BCG[:datalen] self.res_orgBcg = self.res_orgBcg[:datalen] idxs = where(self.cut_Rpeak < datalen)[0] self.cut_Rpeak = self.cut_Rpeak[idxs] Config["offset_correct"] = off self.cut_Jpeak = [] peaks, _ = find_peaks(self.res_BCG) for i in self.cut_Rpeak: tmp = np_abs(peaks - i) idx = np_argmin(tmp) self.cut_Jpeak.append(peaks[idx]) self.cut_Jpeak = asarray(self.cut_Jpeak).astype(int) frontcut_index_BCG = int((self.argmax_BCG - np_argmax(self.res_BCG) / Config["InputConfig"]["UseFreq"] * Config["orgfs"])) backcut_index_BCG = int(len(self.res_BCG) / Config["InputConfig"]["UseFreq"] * Config["orgfs"] + np_argmax(self.raw_BCG) - np_argmax(self.res_BCG) / Config["InputConfig"]["UseFreq"] * Config["orgfs"]) frontcut_index_ECG = self.argmax_ECG - np_argmax(self.cut_ECG) backcut_index_ECG = len(self.cut_ECG) + self.argmax_ECG - np_argmax(self.cut_ECG) Config["frontcut_index_BCG"] = frontcut_index_BCG Config["backcut_index_BCG"] = backcut_index_BCG Config["frontcut_index_ECG"] = frontcut_index_ECG Config["backcut_index_ECG"] = backcut_index_ECG except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_POSTPROCESS_VIEW_FAILURE + Constants.FAILURE_REASON["PostProcess_Align_Exception"] + "\n" + format_exc()) info = f"{Constants.PRECISELY_ALIGN_POSTPROCESS_VIEW_FINISHED},BCG前后段被切割的坐标值为[{frontcut_index_BCG}, {backcut_index_BCG}],ECG前后段被切割的坐标值为[{frontcut_index_ECG}, {backcut_index_ECG}]" return Result().success(info=info) def save_alignInfo(self): try: save_data = { "front": { "N_JJIV": [Config["IV_Coordinate"]["BCG_front_1"], Config["IV_Coordinate"]["BCG_front_2"]], "BCG_coordinate": [Config["Coordinate"]["BCG_front_1"], Config["Coordinate"]["BCG_front_2"]], "N_RRIV": [Config["IV_Coordinate"]["ECG_front_1"], Config["IV_Coordinate"]["ECG_front_2"]], "ECG_coordinate": [Config["Coordinate"]["ECG_front_1"], Config["Coordinate"]["ECG_front_2"]], "shift": Config["front"]["shift"], "offset_interval": Config["front"]["offset_interval"], "offset_interval_duration": Config["front"]["offset_interval_duration"], "anchorsRJ": [Config["front"]["anchor_R"], Config["front"]["anchor_J"]] }, "back": { "N_JJIV": [Config["IV_Coordinate"]["BCG_back_1"], Config["IV_Coordinate"]["BCG_back_2"]], "BCG_coordinate": [Config["Coordinate"]["BCG_back_1"], Config["Coordinate"]["BCG_back_2"]], "N_RRIV": [Config["IV_Coordinate"]["ECG_back_1"], Config["IV_Coordinate"]["ECG_back_2"]], "ECG_coordinate": [Config["Coordinate"]["ECG_back_1"], Config["Coordinate"]["ECG_back_2"]], "shift": Config["back"]["shift"], "offset_interval": Config["back"]["offset_interval"], "offset_interval_duration": Config["back"]["offset_interval_duration"], "anchorsRJ": [Config["back"]["anchor_R"], Config["back"]["anchor_J"]] }, "offset_anchor": Config["offset_anchor"], "orgfs": Config["orgfs"], "cut_index": { "front_BCG": Config["frontcut_index_BCG"], "back_BCG": Config["backcut_index_BCG"], "front_ECG": Config["frontcut_index_ECG"], "back_ECG": Config["backcut_index_ECG"] } } save_data = [str(save_data)] DataFrame(save_data).to_csv(Config["Path"]["Save_AlignInfo"], index=False, header=False) except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_ALIGNINFO_FAILURE + Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_SAVING_ALIGNINFO_FINISHED) def save_res_orgBcg(self, chunk): if self.res_orgBcg is None: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_RES_ORGBCG_FAILURE + Constants.FAILURE_REASON["res_orgBcg_Not_Exist"]) try: chunk.to_csv(Config["Path"]["Save_OrgBCG"], mode='a', index=False, header=False) except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_RES_ORGBCG_FAILURE + Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_SAVING_RES_ORGBCG_FINISHED) def save_res_BCG(self, chunk): if self.res_BCG is None: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_RES_BCG_FAILURE + Constants.FAILURE_REASON["res_BCG_Not_Exist"]) try: chunk.to_csv(Config["Path"]["Save_BCG"], mode='a', index=False, header=False) except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_RES_BCG_FAILURE + Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_SAVING_RES_BCG_FINISHED) def save_cut_ECG(self, chunk): if self.cut_ECG is None: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_CUT_ECG_FAILURE + Constants.FAILURE_REASON["cut_ECG_Not_Exist"]) try: chunk.to_csv(Config["Path"]["Save_ECG"], mode='a', index=False, header=False) except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_CUT_ECG_FAILURE + Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_SAVING_CUT_ECG_FINISHED) def save_Jpeak(self, chunk): if self.cut_Jpeak is None: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_CUT_JPEAK_FAILURE + Constants.FAILURE_REASON["cut_Jpeak_Not_Exist"]) try: chunk.to_csv(Config["Path"]["Save_Jpeak"], mode='a', index=False, header=False) except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_CUT_JPEAK_FAILURE + Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_SAVING_CUT_JPEAK_FINISHED) def save_Rpeak(self, chunk): if self.cut_Rpeak is None: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_CUT_RPEAK_FAILURE + Constants.FAILURE_REASON["cut_Rpeak_Not_Exist"]) try: chunk.to_csv(Config["Path"]["Save_Rpeak"], mode='a', index=False, header=False) except Exception as e: return Result().failure(info=Constants.PRECISELY_ALIGN_SAVING_CUT_RPEAK_FAILURE + Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.PRECISELY_ALIGN_SAVING_CUT_RPEAK_FINISHED) class CustomNavigationToolbar(NavigationToolbar2QT): def __init__(self, canvas, parent): super().__init__(canvas, parent) # 初始化画框工具栏 self.action_Get_Range = QAction(Constants.PRECISELY_ALIGN_ACTION_GET_RANGE_NAME, self) self.action_Get_Range.setFont(QFont(ConfigParams.FONT, 14)) self.action_Get_Range.setCheckable(True) self.action_Get_Range.setShortcut(QCoreApplication.translate( "MainWindow", ConfigParams.PRECISELY_ALIGN_ACTION_GET_RANGE_SHORTCUT_KEY)) self.insertAction(self._actions['pan'], self.action_Get_Range) self._actions['pan'].setShortcut(QCoreApplication.translate( "MainWindow", ConfigParams.ACTION_PAN_SHORTCUT_KEY)) self._actions['zoom'].setShortcut(QCoreApplication.translate( "MainWindow", ConfigParams.ACTION_ZOOM_SHORTCUT_KEY)) # 用于存储事件连接ID self.cid_mouse_press = None self.cid_mouse_release = None self.cid_mouse_hold = None # 初始化矩形选择区域 self.rect_start_x = None self.rect_end_x = None self.ax0_BCG_rectangle_front = None self.ax0_BCG_rectangle_back = None self.ax1_ECG_rectangle_front = None self.ax1_ECG_rectangle_back = None def home(self, *args): pass def zoom(self, *args): super().zoom(*args) self.deactivate_figToorbar_getRange_mode() def pan(self, *args): super().pan(*args) self.deactivate_figToorbar_getRange_mode() def deactivate_figToorbar_getRange_mode(self): if self.action_Get_Range.isChecked(): self.action_Get_Range.setChecked(False) if self.cid_mouse_press is not None: self.canvas.mpl_disconnect(self.cid_mouse_press) self.cid_mouse_press = None if self.cid_mouse_release is not None: self.canvas.mpl_disconnect(self.cid_mouse_release) self.cid_mouse_release = None if self.cid_mouse_hold is not None: self.canvas.mpl_disconnect(self.cid_mouse_hold) self.cid_mouse_hold = None