from gc import collect from pathlib import Path import matplotlib.pyplot as plt from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication from matplotlib.backends.backend_qt import NavigationToolbar2QT from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure from numba import prange, njit from numpy import repeat, convolve, ones, mean, std, empty, int64, sum as np_sum, pad, zeros, array, argmax, linspace, \ diff from overrides import overrides from pandas import read_csv, DataFrame from scipy.signal import find_peaks, resample, butter, sosfiltfilt from yaml import dump, load, FullLoader from func.utils.PublicFunc import PublicFunc from func.utils.Constants import Constants, ConfigParams from func.utils.Result import Result from ui.MainWindow.MainWindow_approximately_align import Ui_MainWindow_approximately_align from ui.setting.approximately_align_input_setting import Ui_MainWindow_approximately_align_input_setting Config = { } ButtonState = { "Default": { "pushButton_input_setting": True, "pushButton_input": True, "pushButton_Standardize": False, "pushButton_CutOff": False, "pushButton_GetPos": False, "pushButton_JUMP": False, "pushButton_EM1": False, "pushButton_EM10": False, "pushButton_EM100": False, "pushButton_EP1": False, "pushButton_EP10": False, "pushButton_EP100": False, "pushButton_save": False }, "Current": { "pushButton_input_setting": True, "pushButton_input": True, "pushButton_Standardize": False, "pushButton_CutOff": False, "pushButton_GetPos": False, "pushButton_JUMP": False, "pushButton_EM1": False, "pushButton_EM10": False, "pushButton_EM100": False, "pushButton_EP1": False, "pushButton_EP10": False, "pushButton_EP100": False, "pushButton_save": False } } class SettingWindow(QMainWindow): def __init__(self, root_path, sampID): super(SettingWindow, self).__init__() self.ui = Ui_MainWindow_approximately_align_input_setting() self.ui.setupUi(self) self.root_path = root_path self.sampID = sampID self.config = None self.__read_config__() self.ui.spinBox_input_orgBcg_freq.valueChanged.connect(self.__update_ui__) self.ui.spinBox_input_Tho_freq.valueChanged.connect(self.__update_ui__) self.ui.spinBox_input_Abd_freq.valueChanged.connect(self.__update_ui__) self.ui.spinBox_bandpassOrder.valueChanged.connect(self.__update_ui__) self.ui.doubleSpinBox_bandpassLow.valueChanged.connect(self.__update_ui__) self.ui.doubleSpinBox_bandpassHigh.valueChanged.connect(self.__update_ui__) self.ui.spinBox_display_freq.valueChanged.connect(self.__update_ui__) self.ui.pushButton_confirm.clicked.connect(self.__write_config__) self.ui.pushButton_cancel.clicked.connect(self.__rollback_config__) self.ui.pushButton_cancel.clicked.connect(self.close) def __read_config__(self): if not Path(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_FILE_PATH).exists(): with open(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_FILE_PATH, "w") as f: dump(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT, f) with open(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_FILE_PATH, "r") as f: file_config = load(f.read(), Loader=FullLoader) Config.update(file_config) self.config = file_config Config.update({ "Path": { "Input_orgBcg": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_ORGBCG_FILENAME + str(Config["InputConfig"]["orgBcgFreq"]) + ConfigParams.ENDSWITH_TXT))), "Input_Tho": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_THO_FILENAME + str(Config["InputConfig"]["ThoFreq"]) + ConfigParams.ENDSWITH_TXT))), "Input_Abd": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_ABD_FILENAME + str(Config["InputConfig"]["AbdFreq"]) + ConfigParams.ENDSWITH_TXT))), "Save": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_SAVE_FILENAME + ConfigParams.ENDSWITH_CSV))) }, "orgBcgConfig": {}, "PSGConfig": {} }) # 数据回显 self.ui.spinBox_input_orgBcg_freq.setValue(Config["InputConfig"]["orgBcgFreq"]) self.ui.spinBox_input_Tho_freq.setValue(Config["InputConfig"]["ThoFreq"]) self.ui.spinBox_input_Abd_freq.setValue(Config["InputConfig"]["AbdFreq"]) self.ui.spinBox_bandpassOrder.setValue(Config["Filter"]["BandPassOrder"]) self.ui.doubleSpinBox_bandpassLow.setValue(Config["Filter"]["BandPassLow"]) self.ui.doubleSpinBox_bandpassHigh.setValue(Config["Filter"]["BandPassHigh"]) self.ui.spinBox_display_freq.setValue(Config["ApplyFrequency"]) self.ui.plainTextEdit_file_path_input_orgBcg.setPlainText(Config["Path"]["Input_orgBcg"]) self.ui.plainTextEdit_file_path_input_Tho.setPlainText(Config["Path"]["Input_Tho"]) self.ui.plainTextEdit_file_path_input_Abd.setPlainText(Config["Path"]["Input_Abd"]) self.ui.plainTextEdit_file_path_save.setPlainText(Config["Path"]["Save"]) def __write_config__(self): # 从界面写入配置 Config["InputConfig"]["orgBcgFreq"] = self.ui.spinBox_input_orgBcg_freq.value() Config["InputConfig"]["ThoFreq"] = self.ui.spinBox_input_Tho_freq.value() Config["InputConfig"]["AbdFreq"] = self.ui.spinBox_input_Abd_freq.value() Config["ApplyFrequency"] = self.ui.spinBox_display_freq.value() Config["Filter"]["BandPassOrder"] = self.ui.spinBox_bandpassOrder.value() Config["Filter"]["BandPassLow"] = self.ui.doubleSpinBox_bandpassLow.value() Config["Filter"]["BandPassHigh"] = self.ui.doubleSpinBox_bandpassHigh.value() Config["Path"]["Input_orgBcg"] = self.ui.plainTextEdit_file_path_input_orgBcg.toPlainText() Config["Path"]["Input_Tho"] = self.ui.plainTextEdit_file_path_input_Tho.toPlainText() Config["Path"]["Input_Abd"] = self.ui.plainTextEdit_file_path_input_Abd.toPlainText() Config["Path"]["Save"] = self.ui.plainTextEdit_file_path_save.toPlainText() # 保存配置到文件 self.config["InputConfig"]["orgBcgFreq"] = self.ui.spinBox_input_orgBcg_freq.value() self.config["InputConfig"]["ThoFreq"] = self.ui.spinBox_input_Tho_freq.value() self.config["InputConfig"]["AbdFreq"] = self.ui.spinBox_input_Abd_freq.value() self.config["ApplyFrequency"] = self.ui.spinBox_display_freq.value() self.config["Filter"]["BandPassOrder"] = self.ui.spinBox_bandpassOrder.value() self.config["Filter"]["BandPassLow"] = self.ui.doubleSpinBox_bandpassLow.value() self.config["Filter"]["BandPassHigh"] = self.ui.doubleSpinBox_bandpassHigh.value() with open(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_FILE_PATH, "w") as f: dump(self.config, f) self.close() def __rollback_config__(self): self.__read_config__() def __update_ui__(self): self.ui.plainTextEdit_file_path_input_orgBcg.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_ORGBCG_FILENAME + str(self.ui.spinBox_input_orgBcg_freq.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_input_Tho.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_THO_FILENAME + str(self.ui.spinBox_input_Tho_freq.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_input_Abd.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_ABD_FILENAME + str(self.ui.spinBox_input_Abd_freq.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_save.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_SAVE_FILENAME + ConfigParams.ENDSWITH_CSV)))) class MainWindow_approximately_align(QMainWindow): def __init__(self): super(MainWindow_approximately_align, self).__init__() self.ui = Ui_MainWindow_approximately_align() self.ui.setupUi(self) self.root_path = None self.sampID = None self.data = None self.setting = None # 初始化进度条 self.progressbar = None PublicFunc.add_progressbar(self) #初始化画框 self.fig = None self.canvas = None self.msgBox = QMessageBox() self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE) @overrides def show(self, root_path, sampID): super().show() self.root_path = root_path self.sampID = sampID self.setting = SettingWindow(root_path, sampID) self.fig = Figure(figsize=(12, 9), dpi=100) self.fig.subplots_adjust(left=0.05, right=0.98, top=0.95, bottom=0.05) self.canvas = FigureCanvas(self.fig) self.figToolbar = NavigationToolbar2QT(self.canvas) self.ui.verticalLayout_canvas.addWidget(self.canvas) self.ui.verticalLayout_canvas.addWidget(self.figToolbar) PublicFunc.__resetAllButton__(self, ButtonState) self.ui.groupBox_align_position.setEnabled(False) self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__) self.ui.pushButton_input_setting.clicked.connect(self.setting.show) self.ui.pushButton_Standardize.clicked.connect(self.__slot_btn_Standardize__) self.ui.pushButton_CutOff.clicked.connect(self.__slot_btn_CutOff__) self.ui.pushButton_GetPos.clicked.connect(self.__slot_btn_GetPosition__) self.ui.pushButton_JUMP.clicked.connect(self.__slot_btn_jump__) self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__) self.ui.pushButton_EM1.clicked.connect(self.__EpochChange__) self.ui.pushButton_EM10.clicked.connect(self.__EpochChange__) self.ui.pushButton_EM100.clicked.connect(self.__EpochChange__) self.ui.pushButton_EP1.clicked.connect(self.__EpochChange__) self.ui.pushButton_EP10.clicked.connect(self.__EpochChange__) self.ui.pushButton_EP100.clicked.connect(self.__EpochChange__) self.ui.radioButton_NTHO.clicked.connect(self.__enableAlign__) self.ui.radioButton_PTHO.clicked.connect(self.__enableAlign__) self.ui.radioButton_PABD.clicked.connect(self.__enableAlign__) self.ui.radioButton_NABD.clicked.connect(self.__enableAlign__) self.ui.radioButton_custom.clicked.connect(self.__enableAlign__) @overrides def closeEvent(self, event): reply = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply == QMessageBox.Yes: PublicFunc.__disableAllButton__(self, ButtonState) PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN)) QApplication.processEvents() # 释放资源 del self.data self.fig.clf() plt.close(self.fig) self.deleteLater() collect() self.canvas = None event.accept() else: event.ignore() def __reset__(self): ButtonState["Current"].update(ButtonState["Default"].copy()) ButtonState["Current"]["pushButton_Standardize"] = True self.fig.clf() self.fig.canvas.draw() self.ui.spinBox_PSGPreA.setValue(0) self.ui.spinBox_PSGPreCut.setValue(0) self.ui.spinBox_PSGPostCut.setValue(0) self.ui.spinBox_orgBcgPreA.setValue(0) self.ui.spinBox_orgBcgPreCut.setValue(0) self.ui.spinBox_orgBcgPostCut.setValue(0) self.ui.radioButton_NABD.setChecked(False) self.ui.radioButton_NTHO.setChecked(False) self.ui.radioButton_PABD.setChecked(False) self.ui.radioButton_PTHO.setChecked(False) self.ui.radioButton_custom.setChecked(False) self.ui.spinBox_SelectEpoch.setValue(0) self.ui.spinBox_custom.setValue(0) self.ui.spinBox_SelectEpoch.setToolTip("") self.ui.radioButton_PABD.setText("备选3") self.ui.radioButton_PTHO.setText("备选1") self.ui.radioButton_NABD.setText("备选4") self.ui.radioButton_NTHO.setText("备选2") self.ui.groupBox_align_position.setEnabled(False) self.ui.spinBox_SelectEpoch.setMinimum(0) def __plot__(self, *args, **kwargs): sender = self.sender() self.fig.clf() result = None try: if sender == self.ui.pushButton_Standardize: result = self.DrawPicRawOverview() elif sender == self.ui.pushButton_CutOff: result = self.DrawPicOverviewWithCutOff() elif sender == self.ui.pushButton_GetPos: result = self.DrawPicCorrelate(*args, **kwargs) elif sender == self.ui.radioButton_NTHO: result = self.DrawPicTryAlign() elif sender == self.ui.radioButton_NABD: result = self.DrawPicTryAlign() elif sender == self.ui.radioButton_PTHO: result = self.DrawPicTryAlign() elif sender == self.ui.radioButton_PABD: result = self.DrawPicTryAlign() elif sender == self.ui.radioButton_custom: result = self.DrawPicTryAlign() elif sender == self.ui.pushButton_JUMP: result = self.DrawPicByEpoch(*args, **kwargs) elif sender == self.ui.pushButton_EM1: result = self.DrawPicByEpoch(*args, **kwargs) elif sender == self.ui.pushButton_EM10: result = self.DrawPicByEpoch(*args, **kwargs) elif sender == self.ui.pushButton_EM100: result = self.DrawPicByEpoch(*args, **kwargs) elif sender == self.ui.pushButton_EP1: result = self.DrawPicByEpoch(*args, **kwargs) elif sender == self.ui.pushButton_EP10: result = self.DrawPicByEpoch(*args, **kwargs) elif sender == self.ui.pushButton_EP100: result = self.DrawPicByEpoch(*args, **kwargs) except Exception: return Result().failure(info=Constants.DRAWING_FAILURE) if result is None: return Result().failure(info=Constants.DRAWING_FAILURE) else: return result def __slot_btn_input__(self): PublicFunc.__disableAllButton__(self, ButtonState) self.data = Data() # 导入数据 PublicFunc.progressbar_update(self, 1, 3, Constants.INPUTTING_DATA, 0) result = self.data.open_file() if not result.status: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO) orgBcg_minutes = round(self.data.raw_orgBcg.shape[0] / Config["InputConfig"]["orgBcgFreq"] / 60) PSG_minutes = round(self.data.raw_Tho.shape[0] / Config["InputConfig"]["ThoFreq"] / 60) Config.update({ "orgBcg_minutes": orgBcg_minutes, "PSG_minutes": PSG_minutes }) self.ui.label_orgBcg_length.setText(str(orgBcg_minutes)) self.ui.label_PSG_length.setText(str(PSG_minutes)) self.__reset__() PublicFunc.finish_operation(self, ButtonState) def __slot_btn_save__(self): PublicFunc.__disableAllButton__(self, ButtonState) # 保存对齐信息 PublicFunc.progressbar_update(self, 1, 1, Constants.SAVING_DATA, 0) epoch = self.ui.spinBox_SelectEpoch.value() result = self.data.save(epoch) if not result.status: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO) PublicFunc.msgbox_output(self, result.info, Constants.TIPS_TYPE_INFO) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_Standardize__(self): PublicFunc.__disableAllButton__(self, ButtonState) Config["RawSignal"] = self.ui.checkBox_RawSignal.isChecked() Config["orgBcgConfig"].update({ "orgBcgDelBase": self.ui.checkBox_orgBcgDelBase.isChecked(), "orgBcgZScore": self.ui.checkBox_orgBcgZScore.isChecked() }) Config["PSGConfig"].update({ "PSGDelBase": self.ui.checkBox_PSGDelBase.isChecked(), "PSGZScore": self.ui.checkBox_PSGZScore.isChecked() }) if Config["RawSignal"]: # 仅重采样 PublicFunc.progressbar_update(self, 1, 1, Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLING, 0) result = self.data.Standardize_0() if not result.status: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO) else: # 呼吸提取 PublicFunc.progressbar_update(self, 1, 5, Constants.APPROXIMATELY_RESP_GETTING, 0) result = self.data.Standardize_1() if not result.status: PublicFunc.text_output(self.ui, "(1/5)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/5)" + result.info, Constants.TIPS_TYPE_INFO) # 预重采样 PublicFunc.progressbar_update(self, 2, 5, Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLING, 20) result = self.data.Standardize_2() if not result.status: PublicFunc.text_output(self.ui, "(2/5)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/5)" + result.info, Constants.TIPS_TYPE_INFO) # 去基线 PublicFunc.progressbar_update(self, 3, 5, Constants.APPROXIMATELY_DELETING_BASE, 40) result = self.data.Standardize_3() if not result.status: PublicFunc.text_output(self.ui, "(3/5)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(3/5)" + result.info, Constants.TIPS_TYPE_INFO) # 标准化 PublicFunc.progressbar_update(self, 4, 5, Constants.APPROXIMATELY_STANDARDIZING, 60) result = self.data.Standardize_4() if not result.status: PublicFunc.text_output(self.ui, "(4/5)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(4/5)" + result.info, Constants.TIPS_TYPE_INFO) # 重采样 PublicFunc.progressbar_update(self, 5, 5, Constants.APPROXIMATELY_ALIGN_RESAMPLING, 80) result = self.data.Standardize_5() if not result.status: PublicFunc.text_output(self.ui, "(5/5)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(5/5)" + result.info, Constants.TIPS_TYPE_INFO) self.__plot__() ButtonState["Current"]["pushButton_CutOff"] = True PublicFunc.finish_operation(self, ButtonState) def __slot_btn_CutOff__(self): PublicFunc.__disableAllButton__(self, ButtonState) Config["orgBcgConfig"].update({"PreA": self.ui.spinBox_orgBcgPreA.value(), "PreCut": self.ui.spinBox_orgBcgPreCut.value(), "PostCut": self.ui.spinBox_orgBcgPostCut.value()}) Config["PSGConfig"].update({"PreA": self.ui.spinBox_PSGPreA.value(), "PreCut": self.ui.spinBox_PSGPreCut.value(), "PostCut": self.ui.spinBox_PSGPostCut.value()}) PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0) result = self.__plot__() if not result.status: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO) ButtonState["Current"]["pushButton_GetPos"] = True PublicFunc.finish_operation(self, ButtonState) def __slot_btn_GetPosition__(self): PublicFunc.__disableAllButton__(self, ButtonState) # 计算互相关1/2 PublicFunc.progressbar_update(self, 1, 4, Constants.APPROXIMATELY_CORRELATION_CALCULATING1, 0) result1 = self.data.calculate_correlation1() if not result1.status: PublicFunc.text_output(self.ui, "(1/4)" + result1.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result1.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/4)" + result1.info, Constants.TIPS_TYPE_INFO) # 计算互相关2/2 PublicFunc.progressbar_update(self, 2, 4, Constants.APPROXIMATELY_CORRELATION_CALCULATING2, 25) result2 = self.data.calculate_correlation2() if not result2.status: PublicFunc.text_output(self.ui, "(2/4)" + result2.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result2.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/4)" + result2.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 3, 4, Constants.DRAWING_DATA, 50) result = self.__plot__(result1.data["tho_relate"], result1.data["tho_relate2"], result2.data["abd_relate"], result2.data["abd_relate2"]) if not result.status: PublicFunc.text_output(self.ui, "(3/4)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(3/4)" + result.info, Constants.TIPS_TYPE_INFO) # 计算最大值位置 PublicFunc.progressbar_update(self, 4, 4, Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATING, 90) result = self.data.calculate_maxvalue_pos(result1.data["tho_relate"], result1.data["tho_relate2"], result2.data["abd_relate"], result2.data["abd_relate2"]) if not result.status: PublicFunc.text_output(self.ui, "(4/4)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(4/4)" + result.info, Constants.TIPS_TYPE_INFO) self.ui.radioButton_PABD.setText(str(result.data["abd_max"] + result.data["bias"])) self.ui.radioButton_PTHO.setText(str(result.data["tho_max"] + result.data["bias"])) self.ui.radioButton_NABD.setText(str(result.data["abd_max2"] + result.data["bias"])) self.ui.radioButton_NTHO.setText(str(result.data["tho_max2"] + result.data["bias"])) self.ui.groupBox_align_position.setEnabled(True) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_jump__(self): PublicFunc.__disableAllButton__(self, ButtonState) epoch = self.ui.spinBox_SelectEpoch.value() # 绘图 PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0) result = self.__plot__(epoch=epoch) if not result.status: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO) PublicFunc.finish_operation(self, ButtonState) def __EpochChange__(self): # 获取当前值 value = self.ui.spinBox_SelectEpoch.value() # 判断按键 if self.sender() == self.ui.pushButton_EM1: epoch = max(self.ui.spinBox_SelectEpoch.minimum(), value - 1) elif self.sender() == self.ui.pushButton_EM10: epoch = max(self.ui.spinBox_SelectEpoch.minimum(), value - 10) elif self.sender() == self.ui.pushButton_EM100: epoch = max(self.ui.spinBox_SelectEpoch.minimum(), value - 100) elif self.sender() == self.ui.pushButton_EP1: epoch = min(self.ui.spinBox_SelectEpoch.maximum(), value + 1) elif self.sender() == self.ui.pushButton_EP10: epoch = min(self.ui.spinBox_SelectEpoch.maximum(), value + 10) elif self.sender() == self.ui.pushButton_EP100: epoch = min(self.ui.spinBox_SelectEpoch.maximum(), value + 100) else: return self.ui.spinBox_SelectEpoch.setValue(epoch) QApplication.processEvents() self.__slot_btn_jump__() def __enableAlign__(self): PublicFunc.__disableAllButton__(self, ButtonState) if self.ui.radioButton_NTHO.isChecked(): relate = int(self.ui.radioButton_NTHO.text()) reverse = -1 elif self.ui.radioButton_PTHO.isChecked(): relate = int(self.ui.radioButton_PTHO.text()) reverse = 1 elif self.ui.radioButton_PABD.isChecked(): relate = int(self.ui.radioButton_PABD.text()) reverse = 1 elif self.ui.radioButton_NABD.isChecked(): relate = int(self.ui.radioButton_NABD.text()) reverse = -1 elif self.ui.radioButton_custom.isChecked(): relate = int(self.ui.spinBox_custom.value()) reverse = 1 else: return # 最大相关系数值相对于PSG的位置 Config.update({"pos": relate}) # 设置epoch上下限 Config.update({"orgBcg_reverse": reverse}) PublicFunc.text_output(self.ui, "相对位置:{}".format(Config["pos"]), Constants.TIPS_TYPE_INFO) # 获取epoch PublicFunc.progressbar_update(self, 1, 2, Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATING, 0) result3 = self.data.get_epoch() if not result3.status: PublicFunc.text_output(self.ui, "(1/2)" + result3.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result3.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/2)" + result3.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 2, 2, Constants.DRAWING_DATA, 30) result = self.__plot__() if not result.status: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_INFO) self.ui.spinBox_SelectEpoch.setMinimum(result3.data["epoch_min"]) self.ui.spinBox_SelectEpoch.setMaximum(result3.data["epoch_max"]) self.ui.spinBox_SelectEpoch.setValue(result3.data["epoch_min"]) self.ui.spinBox_SelectEpoch.setToolTip("最小值:{}\n最大值:{}".format(result3.data["epoch_min"], result3.data["epoch_max"])) self.ui.spinBox_SelectEpoch.setEnabled(True) ButtonState["Current"]["pushButton_JUMP"] = True ButtonState["Current"]["pushButton_EM1"] = True ButtonState["Current"]["pushButton_EM10"] = True ButtonState["Current"]["pushButton_EM100"] = True ButtonState["Current"]["pushButton_EP1"] = True ButtonState["Current"]["pushButton_EP10"] = True ButtonState["Current"]["pushButton_EP100"] = True ButtonState["Current"]["pushButton_save"] = True PublicFunc.finish_operation(self, ButtonState) def DrawPicRawOverview(self): try: max_x = max(self.data.processed_Tho.shape[0], self.data.processed_Abd.shape[0], self.data.processed_orgBcg.shape[0]) ax1 = self.fig.add_subplot(311) ax1.plot(self.data.processed_Tho, color='blue') ax1.set_xlim(0, max_x) ax1.set_title("THO") ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1) ax2.plot(self.data.processed_orgBcg, color='blue') ax2.set_xlim(0, max_x) ax2.set_title("orgBcg") ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1) ax3.plot(self.data.processed_Abd, color='blue') ax3.set_xlim(0, max_x) ax3.set_title("ABD") self.fig.canvas.draw() except Exception: return Result().failure(info=Constants.DRAWING_FAILURE) # 返回图片以便存到QPixImage return Result().success(info=Constants.DRAWING_FINISHED) def DrawPicOverviewWithCutOff(self): try: max_x = max(self.data.processed_Tho.shape[0] + Config["PSGConfig"]["PreA"], self.data.processed_orgBcg.shape[0] + Config["orgBcgConfig"]["PreA"]) min_x = min(Config["PSGConfig"]["PreA"], Config["orgBcgConfig"]["PreA"], 0) ax1 = self.fig.add_subplot(311) ax1.plot( linspace(Config["PSGConfig"]["PreA"], len(self.data.processed_Tho) + Config["PSGConfig"]["PreA"], len(self.data.processed_Tho)), self.data.processed_Tho, color='blue') # 绘制x = PreCut的线 和 x = PostCut的虚线 ax1.axvline(x=Config["PSGConfig"]["PreCut"] + Config["PSGConfig"]["PreA"], color='red', linestyle='--') ax1.axvline( x=len(self.data.processed_Tho) - Config["PSGConfig"]["PostCut"] + Config["PSGConfig"]["PreA"], color='red', linestyle='--') ax1.set_xlim(min_x, max_x) ax1.set_title("THO") ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1) ax2.plot( linspace(Config["orgBcgConfig"]["PreA"], len(self.data.processed_orgBcg) + Config["orgBcgConfig"]["PreA"], len(self.data.processed_orgBcg)), self.data.processed_orgBcg, color='blue') ax2.axvline(x=Config["orgBcgConfig"]["PreCut"] + Config["orgBcgConfig"]["PreA"], color='red', linestyle='--') ax2.axvline(x=len(self.data.processed_orgBcg) - Config["orgBcgConfig"]["PostCut"] + Config["orgBcgConfig"]["PreA"], color='red', linestyle='--') ax2.set_xlim(min_x, max_x) ax2.set_title("orgBcg") ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1) ax3.plot( linspace(Config["PSGConfig"]["PreA"], len(self.data.processed_Abd) + Config["PSGConfig"]["PreA"], len(self.data.processed_Abd)), self.data.processed_Abd, color='blue') ax3.axvline(x=Config["PSGConfig"]["PreCut"] + Config["PSGConfig"]["PreA"], color='red', linestyle='--') ax3.axvline( x=len(self.data.processed_Tho) - Config["PSGConfig"]["PostCut"] + Config["PSGConfig"]["PreA"], color='red', linestyle='--') ax3.set_xlim(min_x, max_x) ax3.set_title("ABD") self.fig.canvas.draw() except Exception: return Result().failure(info=Constants.DRAWING_FAILURE) # 返回图片以便存到QPixImage return Result().success(info=Constants.DRAWING_FINISHED) def DrawPicCorrelate(self, tho_pxx, tho_nxx, abd_pxx, abd_nxx): try: ax1 = self.fig.add_subplot(221) ax1.plot(tho_pxx, color='blue') ax1.set_title("The Correlation of THO and orgBcg") ax2 = self.fig.add_subplot(222, sharex=ax1, sharey=ax1) ax2.plot(tho_nxx, color='blue') ax2.set_title("The Correlation of THO and Reverse orgBcg") ax3 = self.fig.add_subplot(223, sharex=ax1, sharey=ax1) ax3.plot(abd_pxx, color='blue') ax3.set_title("The Correlation of ABD and orgBcg") ax4 = self.fig.add_subplot(224, sharex=ax1, sharey=ax1) ax4.plot(abd_nxx, color='blue') ax4.set_title("The Correlation of ABD and Reverse orgBcg") self.fig.canvas.draw() except Exception: return Result().failure(info=Constants.DRAWING_FAILURE) # 返回图片以便存到QPixImage return Result().success(info=Constants.DRAWING_FINISHED) def DrawPicTryAlign(self): try: max_x = max(self.data.processed_Tho.shape[0], self.data.processed_orgBcg.shape[0] + Config["pos"]) min_x = min(Config["PSGConfig"]["PreA"], Config["orgBcgConfig"]["PreA"] + Config["pos"], 0) ax1 = self.fig.add_subplot(311) ax1.plot( linspace(0, len(self.data.processed_Tho), len(self.data.processed_Tho)), self.data.processed_Tho, color='blue') # 绘制x = PreCut的线 和 x = PostCut的虚线 ax1.set_xlim(min_x, max_x) ax1.set_title("THO") ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1) ax2.plot(linspace(Config["pos"], len(self.data.processed_orgBcg) + Config["pos"], len(self.data.processed_orgBcg)), self.data.processed_orgBcg, color='blue') ax2.set_xlim(min_x, max_x) ax2.set_title("orgBcg") ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1) ax3.plot( linspace(0, len(self.data.processed_Abd), len(self.data.processed_Abd)), self.data.processed_Abd, color='blue') ax3.set_xlim(min_x, max_x) ax3.set_title("ABD") self.fig.canvas.draw() except Exception: return Result().failure(info=Constants.DRAWING_FAILURE) # 返回图片以便存到QPixImage return Result().success(info=Constants.DRAWING_FINISHED) def DrawPicByEpoch(self, epoch): try: PSG_SP = epoch * 30 * Config["ApplyFrequency"] PSG_EP = (epoch + 6) * 30 * Config["ApplyFrequency"] orgBcg_SP = PSG_SP - Config["pos"] orgBcg_EP = PSG_EP - Config["pos"] tho_seg = self.data.processed_Tho[PSG_SP:PSG_EP] orgBcg_seg = self.data.processed_orgBcg[orgBcg_SP:orgBcg_EP] * Config["orgBcg_reverse"] abd_seg = self.data.processed_Abd[PSG_SP:PSG_EP] # 根据PSG来和绘制 ax1 = self.fig.add_subplot(321) ax1.plot(linspace(PSG_SP, PSG_EP, len(tho_seg)), tho_seg) tho_peaks, _ = find_peaks(tho_seg, prominence=0, distance=3 * Config["ApplyFrequency"]) ax1.plot(linspace(PSG_SP, PSG_EP, len(tho_seg))[tho_peaks], tho_seg[tho_peaks], "x") ax3 = self.fig.add_subplot(323) ax3.plot(linspace(orgBcg_SP, orgBcg_EP, len(orgBcg_seg)), orgBcg_seg) orgBcg_peaks, _ = find_peaks(orgBcg_seg, prominence=0, distance=3 * Config["ApplyFrequency"]) ax3.plot(linspace(orgBcg_SP, orgBcg_EP, len(orgBcg_seg))[orgBcg_peaks], orgBcg_seg[orgBcg_peaks], "x") ax2 = self.fig.add_subplot(325) ax2.plot(linspace(PSG_SP, PSG_EP, len(abd_seg)), abd_seg) abd_peaks, _ = find_peaks(abd_seg, prominence=0, distance=3 * Config["ApplyFrequency"]) ax2.plot(linspace(PSG_SP, PSG_EP, len(abd_seg))[abd_peaks], abd_seg[abd_peaks], "x") # 绘制间期 ax4 = self.fig.add_subplot(322) ax4.plot(linspace(PSG_SP, PSG_EP, len(diff(tho_peaks).repeat(Config["ApplyFrequency"]))), diff(tho_peaks).repeat(Config["ApplyFrequency"]), alpha=0.5, label="tho") ax4.plot(linspace(PSG_SP, PSG_EP, len(diff(orgBcg_peaks).repeat(Config["ApplyFrequency"]))), diff(orgBcg_peaks).repeat(Config["ApplyFrequency"]), label="resp") ax4.set_title("tho_interval") ax4.legend() ax4.set_ylim((10, 50)) ax5 = self.fig.add_subplot(324) ax5.plot(linspace(orgBcg_SP, orgBcg_EP, len(diff(tho_peaks).repeat(Config["ApplyFrequency"]))), diff(tho_peaks).repeat(Config["ApplyFrequency"])) ax5.plot(linspace(orgBcg_SP, orgBcg_EP, len(diff(orgBcg_peaks).repeat(Config["ApplyFrequency"]))), diff(orgBcg_peaks).repeat(Config["ApplyFrequency"]), label="resp") ax5.set_title("resp_interval") ax5.set_ylim((10, 50)) ax6 = self.fig.add_subplot(326) ax6.plot(linspace(PSG_SP, PSG_EP, len(diff(abd_peaks).repeat(Config["ApplyFrequency"]))), diff(abd_peaks).repeat(Config["ApplyFrequency"])) ax6.plot(linspace(PSG_SP, PSG_EP, len(diff(orgBcg_peaks).repeat(Config["ApplyFrequency"]))), diff(orgBcg_peaks).repeat(Config["ApplyFrequency"]), label="resp") ax6.set_title("abd_interval") ax6.set_ylim((10, 50)) self.fig.canvas.draw() except Exception: return Result().failure(info=Constants.DRAWING_FAILURE) # 返回图片以便存到QPixImage return Result().success(info=Constants.DRAWING_FINISHED) class Data: def __init__(self): self.raw_orgBcg = None self.raw_Tho = None self.raw_Abd = None self.processed_orgBcg = None self.processed_Tho = None self.processed_Abd = None def open_file(self): if ((not Path(Config["Path"]["Input_orgBcg"]).exists()) or (not Path(Config["Path"]["Input_Tho"]).exists()) or (not Path(Config["Path"]["Input_Abd"]).exists())): return Result().failure(info=Constants.INPUT_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Data_Path_Not_Exist"]) try: self.raw_orgBcg = read_csv(Config["Path"]["Input_orgBcg"], encoding=ConfigParams.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.raw_Tho = read_csv(Config["Path"]["Input_Tho"], encoding=ConfigParams.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.raw_Abd = read_csv(Config["Path"]["Input_Abd"], encoding=ConfigParams.UTF8_ENCODING, header=None).to_numpy().reshape(-1) except Exception: return Result().failure(info=Constants.INPUT_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Read_Data_Exception"]) return Result().success(info=Constants.INPUT_FINISHED) def save(self, epoch): try: pos = Config["pos"] ApplyFrequency = Config["ApplyFrequency"] # 保存到csv中 df = DataFrame({"pos": [pos], "epoch": [epoch], "ApplyFrequency": [ApplyFrequency]}) df.to_csv(Path(Config["Path"]["Save"]), mode="w", header=True, index=False) except Exception: return Result().failure(info=Constants.SAVING_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Save_Exception"]) return Result().success(info=Constants.SAVING_FINISHED) def Standardize_0(self): # 仅重采样 if self.raw_orgBcg is None or self.raw_Tho is None or self.raw_Abd is None: return Result().failure(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Raw_Data_Not_Exist"]) try: self.processed_orgBcg = resample(self.raw_orgBcg, int(Config["orgBcg_minutes"] * 60 * Config["ApplyFrequency"])) self.processed_Tho = resample(self.raw_Tho, int(Config["PSG_minutes"] * 60 * Config["ApplyFrequency"])) self.processed_Abd = resample(self.raw_Abd, int(Config["PSG_minutes"] * 60 * Config["ApplyFrequency"])) except Exception: return Result().failure(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Only_Resample_Exception"]) return Result().success(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FINISHED) def Standardize_1(self): # 呼吸提取 def butter_bandpass_filter(data, lowCut, highCut, fs, order): low = lowCut / (fs * 0.5) high = highCut / (fs * 0.5) sos = butter(order, [low, high], btype="bandpass", output='sos') return sosfiltfilt(sos, data) if self.raw_orgBcg is None or self.raw_Tho is None or self.raw_Abd is None: return Result().failure(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Raw_Data_Not_Exist"]) try: # 滤波 self.processed_orgBcg = butter_bandpass_filter(self.raw_orgBcg, Config["Filter"]["BandPassLow"], Config["Filter"]["BandPassHigh"], Config["InputConfig"]["orgBcgFreq"], Config["Filter"]["BandPassOrder"]) self.processed_Tho = butter_bandpass_filter(self.raw_Tho, Config["Filter"]["BandPassLow"], Config["Filter"]["BandPassHigh"], Config["InputConfig"]["ThoFreq"], Config["Filter"]["BandPassOrder"]) self.processed_Abd = butter_bandpass_filter(self.raw_Abd, Config["Filter"]["BandPassLow"], Config["Filter"]["BandPassHigh"], Config["InputConfig"]["AbdFreq"], Config["Filter"]["BandPassOrder"]) except Exception: return Result().failure(info=Constants.APPROXIMATELY_RESP_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Resp_Get_Exception"]) return Result().success(info=Constants.APPROXIMATELY_RESP_GET_FINISHED) def Standardize_2(self): # 预重采样 try: # 如果orgBcg采样率大于PSG采样率,那么orgBcg重采样到PSG采样率 if Config["InputConfig"]["orgBcgFreq"] > Config["InputConfig"]["ThoFreq"]: # 用[::]完成 self.processed_orgBcg = self.processed_orgBcg[::int(Config["InputConfig"]["orgBcgFreq"] / Config["InputConfig"]["ThoFreq"])] # 如果orgBcg采样率小于PSG采样率,那么orgBcg重采样到PSG采样率 elif Config["InputConfig"]["orgBcgFreq"] < Config["InputConfig"]["ThoFreq"] < 100: # 用repeat完成 self.processed_orgBcg = repeat(self.processed_orgBcg, int(Config["InputConfig"]["ThoFreq"] / Config["InputConfig"]["orgBcgFreq"]), axis=0) # 修改Config Config.update({"TempFrequency": Config["InputConfig"]["ThoFreq"]}) except Exception: return Result().failure(info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Pre_Resample_Exception"]) return Result().success(info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FINISHED) def Standardize_3(self): # 去基线 try: temp_frequency = Config["TempFrequency"] if Config["PSGConfig"]["PSGDelBase"]: # 减去四秒钟平均滤波 self.processed_Tho = self.processed_Tho - convolve(self.processed_Tho, ones(int(4 * temp_frequency)) / int(4 * temp_frequency), mode='same') self.processed_Abd = self.processed_Abd - convolve(self.processed_Abd, ones(int(4 * temp_frequency)) / int(4 * temp_frequency), mode='same') if Config["orgBcgConfig"]["orgBcgDelBase"]: self.processed_orgBcg = self.processed_orgBcg - convolve(self.processed_orgBcg, ones(int(4 * temp_frequency)) / int(4 * temp_frequency), mode='same') except Exception: return Result().failure(info=Constants.APPROXIMATELY_DELETE_BASE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Delete_Base_Exception"]) return Result().success(info=Constants.APPROXIMATELY_DELETE_BASE_FINISHED) def Standardize_4(self): # 标准化 try: # 判断是否标准化 if Config["PSGConfig"]["PSGZScore"]: self.processed_Tho = (self.processed_Tho - mean(self.processed_Tho)) / std(self.processed_Tho) self.processed_Abd = (self.processed_Abd - mean(self.processed_Abd)) / std(self.processed_Abd) if Config["orgBcgConfig"]["orgBcgZScore"]: self.processed_orgBcg = (self.processed_orgBcg - mean(self.processed_orgBcg)) / std(self.processed_orgBcg) except Exception: return Result().failure(info=Constants.APPROXIMATELY_STANDARDIZE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Standardize_Exception"]) return Result().success(info=Constants.APPROXIMATELY_STANDARDIZE_FINISHED) def Standardize_5(self): # 重采样 try: # 用[::]完成 temp_frequency = Config["TempFrequency"] self.processed_Tho = self.processed_Tho[::int(temp_frequency / Config["ApplyFrequency"])] self.processed_Abd = self.processed_Abd[::int(temp_frequency / Config["ApplyFrequency"])] self.processed_orgBcg = self.processed_orgBcg[::int(temp_frequency / Config["ApplyFrequency"])] except Exception: return Result().failure(info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Resample_Exception"]) return Result().success(info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FINISHED) def calculate_correlation1(self): # 计算互相关1/2 try: a = self.processed_Tho[Config["PSGConfig"]["PreCut"]:len(self.processed_Tho) - Config["PSGConfig"]["PostCut"]].copy() v = self.processed_orgBcg[Config["orgBcgConfig"]["PreCut"]:len(self.processed_orgBcg) - Config["orgBcgConfig"]["PostCut"]].copy() a *= 100 v *= 100 a = a.astype(int64) v = v.astype(int64) a = pad(a, (len(v) - 1, len(v) - 1), mode='constant') tho_relate = zeros(len(a) - len(v) - 1, dtype=int64) len_calc = len(a) - len(v) - 1 # 将序列分成一百份来计算 for i in range(100): tho_relate[i * len_calc // 100:(i + 1) * len_calc // 100] = Data.get_Correlate(a, v, array( [i * len_calc // 100, (i + 1) * len_calc // 100])) tho_relate = tho_relate / 10000 tho_relate2 = - tho_relate result = {"tho_relate": tho_relate, "tho_relate2": tho_relate2} except Exception: return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Calculate_Correlation1_Exception"]) return Result().success(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FINISHED, data=result) def calculate_correlation2(self): # 计算互相关2/2 try: a = self.processed_Abd[Config["PSGConfig"]["PreCut"]:len(self.processed_Abd) - Config["PSGConfig"]["PostCut"]].copy() v = self.processed_orgBcg[Config["orgBcgConfig"]["PreCut"]:len(self.processed_orgBcg) - Config["orgBcgConfig"]["PostCut"]].copy() a *= 100 v *= 100 a = a.astype(int64) v = v.astype(int64) a = pad(a, (len(v) - 1, len(v) - 1), mode='constant') abd_relate = zeros(len(a) - len(v) - 1, dtype=int64) len_calc = len(a) - len(v) - 1 # 将序列分成一百份来计算 for i in range(100): abd_relate[i * len_calc // 100:(i + 1) * len_calc // 100] = Data.get_Correlate(a, v, array( [i * len_calc // 100, (i + 1) * len_calc // 100])) abd_relate = abd_relate / 10000 abd_relate2 = - abd_relate result = {"abd_relate": abd_relate, "abd_relate2": abd_relate2} except Exception: return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Calculate_Correlation2_Exception"]) return Result().success(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FINISHED, data=result) def calculate_maxvalue_pos(self, tho_relate, tho_relate2, abd_relate, abd_relate2): # 计算最大值位置 try: tho_max = argmax(tho_relate) tho_max2 = argmax(tho_relate2) abd_max = argmax(abd_relate) abd_max2 = argmax(abd_relate2) pre = Config["PSGConfig"]["PreCut"] + Config["orgBcgConfig"]["PostCut"] bias = pre - len(self.processed_orgBcg) + 1 result = {"tho_max": tho_max, "tho_max2": tho_max2, "abd_max": abd_max, "abd_max2": abd_max2, "bias": bias} except Exception: return Result().failure(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Calculate_Maxvalue_Pos_Exception"]) return Result().success(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FINISHED, data=result) def get_epoch(self): # 获取epoch try: epoch_min = max(0, Config["pos"] // 30 // Config["ApplyFrequency"] + 1) epoch_max = min(len(self.processed_Tho) // 30 // Config["ApplyFrequency"] - 1, (len(self.processed_orgBcg) + Config["pos"]) // 30 // Config["ApplyFrequency"] - 1) result = {"epoch_min": epoch_min, "epoch_max": epoch_max} except Exception: return Result().failure(info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Get_Epoch_Exception"]) return Result().success(info=Constants.APPROXIMATELY_EPOCH_GET_FINISHED, data=result) @staticmethod @njit("int64[:](int64[:],int64[:], int32[:])", nogil=True, parallel=True) def get_Correlate(a, v, between): begin, end = between if end == 0: end = len(a) - len(v) - 1 result = empty(end - begin, dtype=int64) for i in prange(end - begin): result[i] = np_sum(a[begin + i: begin + i + len(v)] * v) return result