from gc import collect from pathlib import Path from traceback import format_exc import matplotlib.pyplot as plt from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication from matplotlib.backends.backend_qt import NavigationToolbar2QT from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure from numpy import convolve, ones, mean, std, int64, argmax, linspace, diff from overrides import overrides from pandas import read_csv, DataFrame from scipy.signal import find_peaks, resample, butter, sosfiltfilt, correlate from yaml import dump, load, FullLoader from func.utils.PublicFunc import PublicFunc from func.utils.Constants import Constants, ConfigParams from func.utils.Result import Result from ui.MainWindow.MainWindow_approximately_align import Ui_MainWindow_approximately_align from ui.setting.approximately_align_input_setting import Ui_MainWindow_approximately_align_input_setting Config = { } ButtonState = { "Default": { "pushButton_input_setting": True, "pushButton_input": True, "pushButton_Standardize": False, "pushButton_CutOff": False, "pushButton_GetPos": False, "pushButton_ChangeView": False, "pushButton_JUMP": False, "pushButton_EM1": False, "pushButton_EM10": False, "pushButton_EM100": False, "pushButton_EP1": False, "pushButton_EP10": False, "pushButton_EP100": False, "pushButton_save": False, "radioButton_PTHO": False, "radioButton_PABD": False, "radioButton_NTHO": False, "radioButton_NABD": False, "radioButton_custom": False }, "Current": { "pushButton_input_setting": True, "pushButton_input": True, "pushButton_Standardize": False, "pushButton_CutOff": False, "pushButton_GetPos": False, "pushButton_ChangeView": False, "pushButton_JUMP": False, "pushButton_EM1": False, "pushButton_EM10": False, "pushButton_EM100": False, "pushButton_EP1": False, "pushButton_EP10": False, "pushButton_EP100": False, "pushButton_save": False, "radioButton_PTHO": False, "radioButton_PABD": False, "radioButton_NTHO": False, "radioButton_NABD": False, "radioButton_custom": False } } class SettingWindow(QMainWindow): def __init__(self, root_path, sampID): super(SettingWindow, self).__init__() self.ui = Ui_MainWindow_approximately_align_input_setting() self.ui.setupUi(self) self.root_path = root_path self.sampID = sampID self.config = None self.__read_config__() self.ui.spinBox_input_orgBcg_freq.valueChanged.connect(self.__update_ui__) self.ui.spinBox_input_Tho_freq.valueChanged.connect(self.__update_ui__) self.ui.spinBox_input_Abd_freq.valueChanged.connect(self.__update_ui__) self.ui.spinBox_bandpassOrder.valueChanged.connect(self.__update_ui__) self.ui.doubleSpinBox_bandpassLow.valueChanged.connect(self.__update_ui__) self.ui.doubleSpinBox_bandpassHigh.valueChanged.connect(self.__update_ui__) self.ui.spinBox_display_freq.valueChanged.connect(self.__update_ui__) self.ui.pushButton_confirm.clicked.connect(self.__write_config__) self.ui.pushButton_cancel.clicked.connect(self.__rollback_config__) self.ui.pushButton_cancel.clicked.connect(self.close) def __read_config__(self): if not Path(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_FILE_PATH).exists(): with open(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_FILE_PATH, "w") as f: dump(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT, f) with open(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_FILE_PATH, "r") as f: file_config = load(f.read(), Loader=FullLoader) Config.update(file_config) self.config = file_config # 更新配置 Config.update({ "Path": { "Input_orgBcg": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / Path(str(self.sampID)))), "Input_Tho": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)))), "Input_Abd": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)))), "Save": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INFO + ConfigParams.ENDSWITH_CSV))) }, "orgBcgConfig": {}, "PSGConfig": {} }) # 数据回显 self.ui.spinBox_input_orgBcg_freq.setValue(Config["InputConfig"]["orgBcgFreq"]) self.ui.spinBox_input_Tho_freq.setValue(Config["InputConfig"]["ThoFreq"]) self.ui.spinBox_input_Abd_freq.setValue(Config["InputConfig"]["AbdFreq"]) self.ui.spinBox_bandpassOrder.setValue(Config["Filter"]["BandPassOrder"]) self.ui.doubleSpinBox_bandpassLow.setValue(Config["Filter"]["BandPassLow"]) self.ui.doubleSpinBox_bandpassHigh.setValue(Config["Filter"]["BandPassHigh"]) self.ui.spinBox_display_freq.setValue(Config["ApplyFrequency"]) self.ui.plainTextEdit_file_path_input_orgBcg.setPlainText(Config["Path"]["Input_orgBcg"]) self.ui.plainTextEdit_file_path_input_Tho.setPlainText(Config["Path"]["Input_Tho"]) self.ui.plainTextEdit_file_path_input_Abd.setPlainText(Config["Path"]["Input_Abd"]) self.ui.plainTextEdit_file_path_save.setPlainText(Config["Path"]["Save"]) def __write_config__(self): # 从界面写入配置 Config["InputConfig"]["orgBcgFreq"] = self.ui.spinBox_input_orgBcg_freq.value() Config["InputConfig"]["ThoFreq"] = self.ui.spinBox_input_Tho_freq.value() Config["InputConfig"]["AbdFreq"] = self.ui.spinBox_input_Abd_freq.value() Config["ApplyFrequency"] = self.ui.spinBox_display_freq.value() Config["Filter"]["BandPassOrder"] = self.ui.spinBox_bandpassOrder.value() Config["Filter"]["BandPassLow"] = self.ui.doubleSpinBox_bandpassLow.value() Config["Filter"]["BandPassHigh"] = self.ui.doubleSpinBox_bandpassHigh.value() Config["Path"]["Input_orgBcg"] = self.ui.plainTextEdit_file_path_input_orgBcg.toPlainText() Config["Path"]["Input_Tho"] = self.ui.plainTextEdit_file_path_input_Tho.toPlainText() Config["Path"]["Input_Abd"] = self.ui.plainTextEdit_file_path_input_Abd.toPlainText() Config["Path"]["Save"] = self.ui.plainTextEdit_file_path_save.toPlainText() # 保存配置到文件 self.config["InputConfig"]["orgBcgFreq"] = self.ui.spinBox_input_orgBcg_freq.value() self.config["InputConfig"]["ThoFreq"] = self.ui.spinBox_input_Tho_freq.value() self.config["InputConfig"]["AbdFreq"] = self.ui.spinBox_input_Abd_freq.value() self.config["ApplyFrequency"] = self.ui.spinBox_display_freq.value() self.config["Filter"]["BandPassOrder"] = self.ui.spinBox_bandpassOrder.value() self.config["Filter"]["BandPassLow"] = self.ui.doubleSpinBox_bandpassLow.value() self.config["Filter"]["BandPassHigh"] = self.ui.doubleSpinBox_bandpassHigh.value() with open(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_FILE_PATH, "w") as f: dump(self.config, f) self.close() def __rollback_config__(self): self.__read_config__() def __update_ui__(self): self.ui.plainTextEdit_file_path_input_orgBcg.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.ORGBCG_RAW + str(self.ui.spinBox_input_orgBcg_freq.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_input_Tho.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.THO_RAW + str(self.ui.spinBox_input_Tho_freq.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_input_Abd.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.ABD_RAW + str(self.ui.spinBox_input_Abd_freq.value()) + ConfigParams.ENDSWITH_TXT)))) self.ui.plainTextEdit_file_path_save.setPlainText( str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INFO + ConfigParams.ENDSWITH_CSV)))) class MainWindow_approximately_align(QMainWindow): def __init__(self): super(MainWindow_approximately_align, self).__init__() self.ui = Ui_MainWindow_approximately_align() self.ui.setupUi(self) self.root_path = None self.sampID = None self.data = None self.setting = None # 初始化进度条 self.progressbar = None PublicFunc.add_progressbar(self) # 初始化画框 self.fig = None self.canvas = None self.msgBox = QMessageBox() self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE) @overrides def show(self, root_path, sampID): super().show() self.root_path = root_path self.sampID = sampID self.setting = SettingWindow(root_path, sampID) self.fig = Figure(figsize=(12, 9), dpi=100) self.fig.subplots_adjust(left=0.05, right=0.98, top=0.95, bottom=0.05) self.canvas = FigureCanvas(self.fig) self.figToolbar = NavigationToolbar2QT(self.canvas) self.ui.verticalLayout_canvas.addWidget(self.canvas) self.ui.verticalLayout_canvas.addWidget(self.figToolbar) PublicFunc.__resetAllButton__(self, ButtonState) # self.ui.groupBox_align_position.setEnabled(False) self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__) self.ui.pushButton_input_setting.clicked.connect(self.setting.show) self.ui.pushButton_Standardize.clicked.connect(self.__slot_btn_Standardize__) self.ui.pushButton_CutOff.clicked.connect(self.__slot_btn_CutOff__) self.ui.pushButton_GetPos.clicked.connect(self.__slot_btn_GetPosition__) self.ui.pushButton_ChangeView.clicked.connect(self.__slot_btn_changeview__) self.ui.pushButton_JUMP.clicked.connect(self.__slot_btn_jump__) self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__) self.ui.pushButton_EM1.clicked.connect(self.__EpochChange__) self.ui.pushButton_EM10.clicked.connect(self.__EpochChange__) self.ui.pushButton_EM100.clicked.connect(self.__EpochChange__) self.ui.pushButton_EP1.clicked.connect(self.__EpochChange__) self.ui.pushButton_EP10.clicked.connect(self.__EpochChange__) self.ui.pushButton_EP100.clicked.connect(self.__EpochChange__) self.ui.radioButton_NTHO.clicked.connect(self.__enableAlign__) self.ui.radioButton_PTHO.clicked.connect(self.__enableAlign__) self.ui.radioButton_PABD.clicked.connect(self.__enableAlign__) self.ui.radioButton_NABD.clicked.connect(self.__enableAlign__) self.ui.radioButton_custom.clicked.connect(self.__enableAlign__) @overrides def closeEvent(self, event): reply = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply == QMessageBox.Yes: PublicFunc.__disableAllButton__(self, ButtonState) PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN)) QApplication.processEvents() # 释放资源 del self.data self.fig.clf() plt.close(self.fig) self.deleteLater() collect() self.canvas = None event.accept() else: event.ignore() def __reset__(self): ButtonState["Current"].update(ButtonState["Default"].copy()) ButtonState["Current"]["pushButton_Standardize"] = True self.fig.clf() self.fig.canvas.draw() self.ui.spinBox_PSGPreA.setValue(0) self.ui.spinBox_PSGPreCut.setValue(0) self.ui.spinBox_PSGPostCut.setValue(0) self.ui.spinBox_orgBcgPreA.setValue(0) self.ui.spinBox_orgBcgPreCut.setValue(0) self.ui.spinBox_orgBcgPostCut.setValue(0) self.ui.radioButton_NABD.setChecked(False) self.ui.radioButton_NTHO.setChecked(False) self.ui.radioButton_PABD.setChecked(False) self.ui.radioButton_PTHO.setChecked(False) self.ui.radioButton_custom.setChecked(False) self.ui.spinBox_SelectEpoch.setValue(0) self.ui.spinBox_custom.setValue(0) self.ui.spinBox_SelectEpoch.setToolTip("") self.ui.radioButton_PABD.setText("备选3") self.ui.radioButton_PTHO.setText("备选1") self.ui.radioButton_NABD.setText("备选4") self.ui.radioButton_NTHO.setText("备选2") self.ui.spinBox_SelectEpoch.setMinimum(0) def __plot__(self, *args, **kwargs): sender = self.sender() self.fig.clf() result = None try: if sender == self.ui.pushButton_Standardize: result = self.DrawPicRawOverview() elif sender == self.ui.pushButton_CutOff: result = self.DrawPicOverviewWithCutOff() elif sender == self.ui.pushButton_GetPos: result = self.DrawPicCorrelate(*args, **kwargs) elif sender == self.ui.radioButton_NTHO: result = self.DrawPicTryAlign() elif sender == self.ui.radioButton_NABD: result = self.DrawPicTryAlign() elif sender == self.ui.radioButton_PTHO: result = self.DrawPicTryAlign() elif sender == self.ui.radioButton_PABD: result = self.DrawPicTryAlign() elif sender == self.ui.radioButton_custom: result = self.DrawPicTryAlign() elif sender == self.ui.pushButton_ChangeView: result = self.DrawAlignScatter() elif sender == self.ui.pushButton_JUMP: result = self.DrawPicByEpoch(*args, **kwargs) elif sender == self.ui.pushButton_EM1: result = self.DrawPicByEpoch(*args, **kwargs) elif sender == self.ui.pushButton_EM10: result = self.DrawPicByEpoch(*args, **kwargs) elif sender == self.ui.pushButton_EM100: result = self.DrawPicByEpoch(*args, **kwargs) elif sender == self.ui.pushButton_EP1: result = self.DrawPicByEpoch(*args, **kwargs) elif sender == self.ui.pushButton_EP10: result = self.DrawPicByEpoch(*args, **kwargs) elif sender == self.ui.pushButton_EP100: result = self.DrawPicByEpoch(*args, **kwargs) except Exception as e: return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc()) if result is None: return Result().failure(info=Constants.DRAWING_FAILURE) else: return result def __slot_btn_input__(self): PublicFunc.__disableAllButton__(self, ButtonState) self.data = Data() # 导入数据 PublicFunc.progressbar_update(self, 1, 3, Constants.INPUTTING_DATA, 0) result = self.data.open_file() if not result.status: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO) orgBcg_seconds = round(self.data.raw_orgBcg.shape[0] / Config["InputConfig"]["orgBcgFreq"]) PSG_seconds = round(self.data.raw_Tho.shape[0] / Config["InputConfig"]["ThoFreq"]) Config.update({ "orgBcg_seconds": orgBcg_seconds, "PSG_seconds": PSG_seconds }) self.ui.label_orgBcg_length.setText(str(orgBcg_seconds)) self.ui.label_PSG_length.setText(str(PSG_seconds)) self.__reset__() PublicFunc.finish_operation(self, ButtonState) def __slot_btn_save__(self): PublicFunc.__disableAllButton__(self, ButtonState) # 保存对齐信息 PublicFunc.progressbar_update(self, 1, 1, Constants.SAVING_DATA, 0) epoch = self.ui.spinBox_SelectEpoch.value() result = self.data.save(epoch) if not result.status: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO) PublicFunc.msgbox_output(self, result.info, Constants.TIPS_TYPE_INFO) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_Standardize__(self): PublicFunc.__disableAllButton__(self, ButtonState) Config["RawSignal"] = self.ui.checkBox_RawSignal.isChecked() Config["orgBcgConfig"].update({ "orgBcgDelBase": self.ui.checkBox_orgBcgDelBase.isChecked(), "orgBcgZScore": self.ui.checkBox_orgBcgZScore.isChecked() }) Config["PSGConfig"].update({ "PSGDelBase": self.ui.checkBox_PSGDelBase.isChecked(), "PSGZScore": self.ui.checkBox_PSGZScore.isChecked() }) if Config["RawSignal"]: # 仅重采样 PublicFunc.progressbar_update(self, 1, 1, Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLING, 0) result = self.data.Standardize_0() if not result.status: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO) else: # 呼吸提取 PublicFunc.progressbar_update(self, 1, 5, Constants.APPROXIMATELY_RESP_GETTING, 0) result = self.data.Standardize_1() if not result.status: PublicFunc.text_output(self.ui, "(1/5)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/5)" + result.info, Constants.TIPS_TYPE_INFO) # 预重采样 PublicFunc.progressbar_update(self, 2, 5, Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLING, 20) result = self.data.Standardize_2() if not result.status: PublicFunc.text_output(self.ui, "(2/5)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/5)" + result.info, Constants.TIPS_TYPE_INFO) # 去基线 PublicFunc.progressbar_update(self, 3, 5, Constants.APPROXIMATELY_DELETING_BASE, 40) result = self.data.Standardize_3() if not result.status: PublicFunc.text_output(self.ui, "(3/5)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(3/5)" + result.info, Constants.TIPS_TYPE_INFO) # 标准化 PublicFunc.progressbar_update(self, 4, 5, Constants.APPROXIMATELY_STANDARDIZING, 60) result = self.data.Standardize_4() if not result.status: PublicFunc.text_output(self.ui, "(4/5)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(4/5)" + result.info, Constants.TIPS_TYPE_INFO) # 重采样 PublicFunc.progressbar_update(self, 5, 5, Constants.APPROXIMATELY_ALIGN_RESAMPLING, 80) result = self.data.Standardize_5() if not result.status: PublicFunc.text_output(self.ui, "(5/5)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(5/5)" + result.info, Constants.TIPS_TYPE_INFO) self.__plot__() ButtonState["Current"]["pushButton_CutOff"] = True PublicFunc.finish_operation(self, ButtonState) def __slot_btn_CutOff__(self): PublicFunc.__disableAllButton__(self, ButtonState) Config["orgBcgConfig"].update({"PreA": self.ui.spinBox_orgBcgPreA.value(), "PreCut": self.ui.spinBox_orgBcgPreCut.value(), "PostCut": self.ui.spinBox_orgBcgPostCut.value()}) Config["PSGConfig"].update({"PreA": self.ui.spinBox_PSGPreA.value(), "PreCut": self.ui.spinBox_PSGPreCut.value(), "PostCut": self.ui.spinBox_PSGPostCut.value()}) PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0) result = self.__plot__() if not result.status: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO) ButtonState["Current"]["pushButton_GetPos"] = True PublicFunc.finish_operation(self, ButtonState) def __slot_btn_GetPosition__(self): PublicFunc.__disableAllButton__(self, ButtonState) # 计算互相关1/2 PublicFunc.progressbar_update(self, 1, 4, Constants.APPROXIMATELY_CORRELATION_CALCULATING1, 0) result1 = self.data.calculate_correlation1() if not result1.status: PublicFunc.text_output(self.ui, "(1/4)" + result1.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result1.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/4)" + result1.info, Constants.TIPS_TYPE_INFO) # 计算互相关2/2 PublicFunc.progressbar_update(self, 2, 4, Constants.APPROXIMATELY_CORRELATION_CALCULATING2, 25) result2 = self.data.calculate_correlation2() if not result2.status: PublicFunc.text_output(self.ui, "(2/4)" + result2.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result2.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/4)" + result2.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 3, 4, Constants.DRAWING_DATA, 50) result = self.__plot__(result1.data["tho_relate"], result1.data["tho_relate2"], result2.data["abd_relate"], result2.data["abd_relate2"]) if not result.status: PublicFunc.text_output(self.ui, "(3/4)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(3/4)" + result.info, Constants.TIPS_TYPE_INFO) # 计算最大值位置 PublicFunc.progressbar_update(self, 4, 4, Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATING, 90) result = self.data.calculate_maxvalue_pos(result1.data["tho_relate"], result1.data["tho_relate2"], result2.data["abd_relate"], result2.data["abd_relate2"]) if not result.status: PublicFunc.text_output(self.ui, "(4/4)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(4/4)" + result.info, Constants.TIPS_TYPE_INFO) self.ui.radioButton_PABD.setText(str(result.data["abd_max"] + result.data["bias"])) self.ui.radioButton_PTHO.setText(str(result.data["tho_max"] + result.data["bias"])) self.ui.radioButton_NABD.setText(str(result.data["abd_max2"] + result.data["bias"])) self.ui.radioButton_NTHO.setText(str(result.data["tho_max2"] + result.data["bias"])) ButtonState["Current"]["radioButton_PTHO"] = True ButtonState["Current"]["radioButton_PABD"] = True ButtonState["Current"]["radioButton_NTHO"] = True ButtonState["Current"]["radioButton_NABD"] = True ButtonState["Current"]["radioButton_custom"] = True # self.ui.groupBox_align_position.setEnabled(True) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_changeview__(self): PublicFunc.__disableAllButton__(self, ButtonState) # 绘图 PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0) result = self.__plot__() if not result.status: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO) PublicFunc.finish_operation(self, ButtonState) def __slot_btn_jump__(self): PublicFunc.__disableAllButton__(self, ButtonState) epoch = self.ui.spinBox_SelectEpoch.value() # 绘图 PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0) result = self.__plot__(epoch=epoch) if not result.status: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO) PublicFunc.finish_operation(self, ButtonState) def __EpochChange__(self): # 获取当前值 value = self.ui.spinBox_SelectEpoch.value() # 判断按键 if self.sender() == self.ui.pushButton_EM1: epoch = max(self.ui.spinBox_SelectEpoch.minimum(), value - 1) elif self.sender() == self.ui.pushButton_EM10: epoch = max(self.ui.spinBox_SelectEpoch.minimum(), value - 10) elif self.sender() == self.ui.pushButton_EM100: epoch = max(self.ui.spinBox_SelectEpoch.minimum(), value - 100) elif self.sender() == self.ui.pushButton_EP1: epoch = min(self.ui.spinBox_SelectEpoch.maximum(), value + 1) elif self.sender() == self.ui.pushButton_EP10: epoch = min(self.ui.spinBox_SelectEpoch.maximum(), value + 10) elif self.sender() == self.ui.pushButton_EP100: epoch = min(self.ui.spinBox_SelectEpoch.maximum(), value + 100) else: return self.ui.spinBox_SelectEpoch.setValue(epoch) QApplication.processEvents() self.__slot_btn_jump__() def __enableAlign__(self): PublicFunc.__disableAllButton__(self, ButtonState) if self.ui.radioButton_NTHO.isChecked(): relate = int(self.ui.radioButton_NTHO.text()) reverse = -1 elif self.ui.radioButton_PTHO.isChecked(): relate = int(self.ui.radioButton_PTHO.text()) reverse = 1 elif self.ui.radioButton_PABD.isChecked(): relate = int(self.ui.radioButton_PABD.text()) reverse = 1 elif self.ui.radioButton_NABD.isChecked(): relate = int(self.ui.radioButton_NABD.text()) reverse = -1 elif self.ui.radioButton_custom.isChecked(): relate = int(self.ui.spinBox_custom.value()) reverse = 1 else: return # 最大相关系数值相对于PSG的位置 Config.update({"pos": relate}) # 设置epoch上下限 Config.update({"orgBcg_reverse": reverse}) PublicFunc.text_output(self.ui, "相对位置:{}".format(Config["pos"]), Constants.TIPS_TYPE_INFO) # 获取epoch PublicFunc.progressbar_update(self, 1, 2, Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATING, 0) result3 = self.data.get_epoch() if not result3.status: PublicFunc.text_output(self.ui, "(1/2)" + result3.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result3.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(1/2)" + result3.info, Constants.TIPS_TYPE_INFO) # 绘图 PublicFunc.progressbar_update(self, 2, 2, Constants.DRAWING_DATA, 30) result = self.__plot__() if not result.status: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_ERROR) PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR) PublicFunc.finish_operation(self, ButtonState) return else: PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_INFO) self.ui.spinBox_SelectEpoch.setMinimum(result3.data["epoch_min"]) self.ui.spinBox_SelectEpoch.setMaximum(result3.data["epoch_max"]) self.ui.spinBox_SelectEpoch.setValue(result3.data["epoch_min"]) self.ui.spinBox_SelectEpoch.setToolTip( "最小值:{}\n最大值:{}".format(result3.data["epoch_min"], result3.data["epoch_max"])) self.ui.spinBox_SelectEpoch.setEnabled(True) ButtonState["Current"]["pushButton_JUMP"] = True ButtonState["Current"]["pushButton_EM1"] = True ButtonState["Current"]["pushButton_EM10"] = True ButtonState["Current"]["pushButton_EM100"] = True ButtonState["Current"]["pushButton_EP1"] = True ButtonState["Current"]["pushButton_EP10"] = True ButtonState["Current"]["pushButton_EP100"] = True ButtonState["Current"]["pushButton_save"] = True ButtonState["Current"]["pushButton_ChangeView"] = True PublicFunc.finish_operation(self, ButtonState) def DrawPicRawOverview(self): try: max_x = max(self.data.processed_downsample_Tho.shape[0], self.data.processed_downsample_Abd.shape[0], self.data.processed_downsample_orgBcg.shape[0]) ax1 = self.fig.add_subplot(311) ax1.plot(self.data.processed_downsample_Tho, color='blue') ax1.set_xlim(0, max_x) ax1.set_title("THO") ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1) ax2.plot(self.data.processed_downsample_orgBcg, color='blue') ax2.set_xlim(0, max_x) ax2.set_title("orgBcg") ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1) ax3.plot(self.data.processed_downsample_Abd, color='blue') ax3.set_xlim(0, max_x) ax3.set_title("ABD") self.fig.canvas.draw() except Exception as e: return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc()) # 返回图片以便存到QPixImage return Result().success(info=Constants.DRAWING_FINISHED) def DrawPicOverviewWithCutOff(self): try: max_x = max(self.data.processed_downsample_Tho.shape[0] + Config["PSGConfig"]["PreA"], self.data.processed_downsample_orgBcg.shape[0] + Config["orgBcgConfig"]["PreA"]) min_x = min(Config["PSGConfig"]["PreA"], Config["orgBcgConfig"]["PreA"], 0) ax1 = self.fig.add_subplot(311) ax1.plot( linspace(Config["PSGConfig"]["PreA"], len(self.data.processed_downsample_Tho) + Config["PSGConfig"]["PreA"], len(self.data.processed_downsample_Tho)), self.data.processed_downsample_Tho, color='blue') # 绘制x = PreCut的线 和 x = PostCut的虚线 ax1.axvline(x=Config["PSGConfig"]["PreCut"] + Config["PSGConfig"]["PreA"], color='red', linestyle='--') ax1.axvline( x=len(self.data.processed_downsample_Tho) - Config["PSGConfig"]["PostCut"] + Config["PSGConfig"][ "PreA"], color='red', linestyle='--') ax1.set_xlim(min_x, max_x) ax1.set_title("THO") ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1) ax2.plot( linspace(Config["orgBcgConfig"]["PreA"], len(self.data.processed_downsample_orgBcg) + Config["orgBcgConfig"]["PreA"], len(self.data.processed_downsample_orgBcg)), self.data.processed_downsample_orgBcg, color='blue') ax2.axvline(x=Config["orgBcgConfig"]["PreCut"] + Config["orgBcgConfig"]["PreA"], color='red', linestyle='--') ax2.axvline( x=len(self.data.processed_downsample_orgBcg) - Config["orgBcgConfig"]["PostCut"] + Config["orgBcgConfig"]["PreA"], color='red', linestyle='--') ax2.set_xlim(min_x, max_x) ax2.set_title("orgBcg") ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1) ax3.plot( linspace(Config["PSGConfig"]["PreA"], len(self.data.processed_downsample_Abd) + Config["PSGConfig"]["PreA"], len(self.data.processed_downsample_Abd)), self.data.processed_downsample_Abd, color='blue') ax3.axvline(x=Config["PSGConfig"]["PreCut"] + Config["PSGConfig"]["PreA"], color='red', linestyle='--') ax3.axvline( x=len(self.data.processed_downsample_Tho) - Config["PSGConfig"]["PostCut"] + Config["PSGConfig"][ "PreA"], color='red', linestyle='--') ax3.set_xlim(min_x, max_x) ax3.set_title("ABD") self.fig.canvas.draw() except Exception as e: return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc()) # 返回图片以便存到QPixImage return Result().success(info=Constants.DRAWING_FINISHED) def DrawPicCorrelate(self, tho_pxx, tho_nxx, abd_pxx, abd_nxx): try: ax1 = self.fig.add_subplot(221) ax1.plot(tho_pxx, color='blue') ax1.set_title("The Correlation of THO and orgBcg") ax2 = self.fig.add_subplot(222, sharex=ax1, sharey=ax1) ax2.plot(tho_nxx, color='blue') ax2.set_title("The Correlation of THO and Reverse orgBcg") ax3 = self.fig.add_subplot(223, sharex=ax1, sharey=ax1) ax3.plot(abd_pxx, color='blue') ax3.set_title("The Correlation of ABD and orgBcg") ax4 = self.fig.add_subplot(224, sharex=ax1, sharey=ax1) ax4.plot(abd_nxx, color='blue') ax4.set_title("The Correlation of ABD and Reverse orgBcg") self.fig.canvas.draw() except Exception as e: return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc()) # 返回图片以便存到QPixImage return Result().success(info=Constants.DRAWING_FINISHED) def DrawPicTryAlign(self): try: max_x = max(self.data.processed_downsample_Tho.shape[0], self.data.processed_downsample_orgBcg.shape[0] + Config["pos"]) min_x = min(Config["PSGConfig"]["PreA"], Config["orgBcgConfig"]["PreA"] + Config["pos"], 0) ax1 = self.fig.add_subplot(311) ax1.plot( linspace(0, len(self.data.processed_downsample_Tho), len(self.data.processed_downsample_Tho)), self.data.processed_downsample_Tho, color='blue') # 绘制x = PreCut的线 和 x = PostCut的虚线 ax1.set_xlim(min_x, max_x) ax1.set_title("THO") ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1) ax2.plot(linspace(Config["pos"], len(self.data.processed_downsample_orgBcg) + Config["pos"], len(self.data.processed_downsample_orgBcg)), self.data.processed_downsample_orgBcg, color='blue') ax2.set_xlim(min_x, max_x) ax2.set_title("orgBcg") ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1) ax3.plot( linspace(0, len(self.data.processed_downsample_Abd), len(self.data.processed_downsample_Abd)), self.data.processed_downsample_Abd, color='blue') ax3.set_xlim(min_x, max_x) ax3.set_title("ABD") self.fig.canvas.draw() except Exception as e: return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc()) # 返回图片以便存到QPixImage return Result().success(info=Constants.DRAWING_FINISHED) def DrawPicByEpoch(self, epoch): try: PSG_SP = epoch * 30 * Config["ApplyFrequency"] PSG_EP = (epoch + 6) * 30 * Config["ApplyFrequency"] orgBcg_SP = PSG_SP - Config["pos"] orgBcg_EP = PSG_EP - Config["pos"] tho_seg = self.data.processed_downsample_Tho[PSG_SP:PSG_EP] orgBcg_seg = self.data.processed_downsample_orgBcg[orgBcg_SP:orgBcg_EP] * Config["orgBcg_reverse"] abd_seg = self.data.processed_downsample_Abd[PSG_SP:PSG_EP] # 根据PSG来和绘制 ax1 = self.fig.add_subplot(321) ax1.plot(linspace(PSG_SP, PSG_EP, len(tho_seg)), tho_seg) tho_peaks, _ = find_peaks(tho_seg, prominence=0, distance=3 * Config["ApplyFrequency"]) ax1.plot(linspace(PSG_SP, PSG_EP, len(tho_seg))[tho_peaks], tho_seg[tho_peaks], "x") ax3 = self.fig.add_subplot(323) ax3.plot(linspace(orgBcg_SP, orgBcg_EP, len(orgBcg_seg)), orgBcg_seg) orgBcg_peaks, _ = find_peaks(orgBcg_seg, prominence=0, distance=3 * Config["ApplyFrequency"]) ax3.plot(linspace(orgBcg_SP, orgBcg_EP, len(orgBcg_seg))[orgBcg_peaks], orgBcg_seg[orgBcg_peaks], "x") ax2 = self.fig.add_subplot(325) ax2.plot(linspace(PSG_SP, PSG_EP, len(abd_seg)), abd_seg) abd_peaks, _ = find_peaks(abd_seg, prominence=0, distance=3 * Config["ApplyFrequency"]) ax2.plot(linspace(PSG_SP, PSG_EP, len(abd_seg))[abd_peaks], abd_seg[abd_peaks], "x") # 绘制间期 ax4 = self.fig.add_subplot(322) ax4.plot(linspace(PSG_SP, PSG_EP, len(diff(tho_peaks).repeat(Config["ApplyFrequency"]))), diff(tho_peaks).repeat(Config["ApplyFrequency"]), alpha=0.5, label="tho") ax4.plot(linspace(PSG_SP, PSG_EP, len(diff(orgBcg_peaks).repeat(Config["ApplyFrequency"]))), diff(orgBcg_peaks).repeat(Config["ApplyFrequency"]), label="resp") ax4.set_title("tho_interval") ax4.legend() ax4.set_ylim((10, 50)) ax5 = self.fig.add_subplot(324) ax5.plot(linspace(orgBcg_SP, orgBcg_EP, len(diff(tho_peaks).repeat(Config["ApplyFrequency"]))), diff(tho_peaks).repeat(Config["ApplyFrequency"])) ax5.plot(linspace(orgBcg_SP, orgBcg_EP, len(diff(orgBcg_peaks).repeat(Config["ApplyFrequency"]))), diff(orgBcg_peaks).repeat(Config["ApplyFrequency"]), label="resp") ax5.set_title("resp_interval") ax5.set_ylim((10, 50)) ax6 = self.fig.add_subplot(326) ax6.plot(linspace(PSG_SP, PSG_EP, len(diff(abd_peaks).repeat(Config["ApplyFrequency"]))), diff(abd_peaks).repeat(Config["ApplyFrequency"])) ax6.plot(linspace(PSG_SP, PSG_EP, len(diff(orgBcg_peaks).repeat(Config["ApplyFrequency"]))), diff(orgBcg_peaks).repeat(Config["ApplyFrequency"]), label="resp") ax6.set_title("abd_interval") ax6.set_ylim((10, 50)) self.fig.canvas.draw() except Exception as e: return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc()) # 返回图片以便存到QPixImage return Result().success(info=Constants.DRAWING_FINISHED) def DrawAlignScatter(self): try: response = self.data.get_corr_by_epoch() epoch_min = response.data["epoch_min"] epoch_max = response.data["epoch_max"] tho_bias_list = response.data["tho_bias_list"] abd_bias_list = response.data["abd_bias_list"] ax1 = self.fig.add_subplot(211) ax1.scatter(linspace(epoch_min, epoch_max, len(tho_bias_list)), tho_bias_list, alpha=0.2) ax1.set_title("THO") ax2 = self.fig.add_subplot(212) ax2.scatter(linspace(epoch_min, epoch_max, len(abd_bias_list)), abd_bias_list, alpha=0.2) ax2.set_title("ABD") self.fig.canvas.draw() return Result().success(info=Constants.DRAWING_FINISHED) except Exception as e: return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc()) class Data: def __init__(self): self.raw_orgBcg = None self.raw_Tho = None self.raw_Abd = None self.processed_orgBcg = None self.processed_Tho = None self.processed_Abd = None self.processed_downsample_orgBcg = None self.processed_downsample_Tho = None self.processed_downsample_Abd = None self.relate_list = None self.relate_point = None def open_file(self): if Path(Config["Path"]["Input_orgBcg"]).is_file(): Config["Path"]["Input_orgBcg"] = str(Path(Config["Path"]["Input_orgBcg"]).parent) if Path(Config["Path"]["Input_Tho"]).is_file(): Config["Path"]["Input_Tho"] = str(Path(Config["Path"]["Input_Tho"]).parent) if Path(Config["Path"]["Input_Abd"]).is_file(): Config["Path"]["Input_Abd"] = str(Path(Config["Path"]["Input_Abd"]).parent) result = PublicFunc.examine_file(Config["Path"]["Input_orgBcg"], ConfigParams.ORGBCG_RAW) if result.status: Config["Path"]["Input_orgBcg"] = result.data["path"] Config["InputConfig"]["orgBcgFreq"] = result.data["freq"] else: return result result = PublicFunc.examine_file(Config["Path"]["Input_Tho"], ConfigParams.THO_RAW) if result.status: Config["Path"]["Input_Tho"] = result.data["path"] Config["InputConfig"]["ThoFreq"] = result.data["freq"] else: return result result = PublicFunc.examine_file(Config["Path"]["Input_Abd"], ConfigParams.ABD_RAW) if result.status: Config["Path"]["Input_Abd"] = result.data["path"] Config["InputConfig"]["AbdFreq"] = result.data["freq"] else: return result try: self.raw_orgBcg = read_csv(Config["Path"]["Input_orgBcg"], encoding=ConfigParams.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.raw_Tho = read_csv(Config["Path"]["Input_Tho"], encoding=ConfigParams.UTF8_ENCODING, header=None).to_numpy().reshape(-1) self.raw_Abd = read_csv(Config["Path"]["Input_Abd"], encoding=ConfigParams.UTF8_ENCODING, header=None).to_numpy().reshape(-1) except Exception as e: return Result().failure(info=Constants.INPUT_FAILURE + Constants.FAILURE_REASON[ "Open_Data_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.INPUT_FINISHED) def save(self, epoch): try: pos = Config["pos"] ApplyFrequency = Config["ApplyFrequency"] # 保存到csv中 df = DataFrame({"pos": [pos], "epoch": [epoch], "ApplyFrequency": [ApplyFrequency]}) df.to_csv(Path(Config["Path"]["Save"]), mode="w", header=True, index=False) except Exception as e: return Result().failure(info=Constants.SAVING_FAILURE + Constants.FAILURE_REASON[ "Save_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.SAVING_FINISHED) def Standardize_0(self): # 仅重采样 if self.raw_orgBcg is None or self.raw_Tho is None or self.raw_Abd is None: return Result().failure( info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.FAILURE_REASON[ "Data_Not_Exist"]) try: # 按照秒数进行截断 self.raw_orgBcg = self.raw_orgBcg[:int(Config["orgBcg_seconds"] * Config["InputConfig"]["orgBcgFreq"])] self.raw_Tho = self.raw_Tho[:int(Config["PSG_seconds"] * Config["InputConfig"]["ThoFreq"])] self.raw_Abd = self.raw_Abd[:int(Config["PSG_seconds"] * Config["InputConfig"]["AbdFreq"])] self.processed_orgBcg = resample(self.raw_orgBcg, int(Config["orgBcg_seconds"] * Config["ApplyFrequency"])) self.processed_Tho = resample(self.raw_Tho, int(Config["PSG_seconds"] * Config["ApplyFrequency"])) self.processed_Abd = resample(self.raw_Abd, int(Config["PSG_seconds"] * Config["ApplyFrequency"])) except Exception as e: return Result().failure( info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.FAILURE_REASON[ "Only_Resample_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FINISHED) def Standardize_1(self): # 呼吸提取 def butter_bandpass_filter(data, lowCut, highCut, fs, order): low = lowCut / (fs * 0.5) high = highCut / (fs * 0.5) sos = butter(order, [low, high], btype="bandpass", output='sos') return sosfiltfilt(sos, data) if self.raw_orgBcg is None or self.raw_Tho is None or self.raw_Abd is None: return Result().failure( info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.FAILURE_REASON[ "Data_Not_Exist"]) try: # 滤波 self.processed_orgBcg = butter_bandpass_filter( self.raw_orgBcg, Config["Filter"]["BandPassLow"], Config["Filter"]["BandPassHigh"], Config["InputConfig"]["orgBcgFreq"], Config["Filter"]["BandPassOrder"]) self.processed_Tho = butter_bandpass_filter( self.raw_Tho, Config["Filter"]["BandPassLow"], Config["Filter"]["BandPassHigh"], Config["InputConfig"]["ThoFreq"], Config["Filter"]["BandPassOrder"]) self.processed_Abd = butter_bandpass_filter( self.raw_Abd, Config["Filter"]["BandPassLow"], Config["Filter"]["BandPassHigh"], Config["InputConfig"]["AbdFreq"], Config["Filter"]["BandPassOrder"]) except Exception as e: return Result().failure( info=Constants.APPROXIMATELY_RESP_GET_FAILURE + Constants.FAILURE_REASON[ "Resp_Get_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.APPROXIMATELY_RESP_GET_FINISHED) def Standardize_2(self): # 预重采样 try: # TODO:这里的采样率处理,如果THO和ABD的采样率不同,可能还是会导致之后的ApplyFrequency出问题,最后导致得到的粗同步坐标不正确 # if Config["InputConfig"]["ThoFreq"] != Config["TempFrequency"]: print(int(Config["InputConfig"]["ThoFreq"]), int(Config["TempFrequency"])) self.processed_Tho = resample(self.processed_Tho, int(Config["PSG_seconds"] * Config["TempFrequency"])) if Config["InputConfig"]["AbdFreq"] != Config["TempFrequency"]: self.processed_Abd = resample(self.processed_Abd, int(Config["PSG_seconds"] * Config["TempFrequency"])) if Config["InputConfig"]["orgBcgFreq"] != Config["TempFrequency"]: self.processed_orgBcg = resample(self.processed_orgBcg, int(Config["orgBcg_seconds"] * Config["TempFrequency"])) except Exception as e: return Result().failure( info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FAILURE + Constants.FAILURE_REASON[ "Pre_Resample_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FINISHED) def Standardize_3(self): # 去基线 try: temp_frequency = Config["TempFrequency"] if Config["PSGConfig"]["PSGDelBase"]: # 减去四秒钟平均滤波 self.processed_Tho = self.processed_Tho - convolve( self.processed_Tho, ones(int(4 * temp_frequency)) / int(4 * temp_frequency), mode='same') self.processed_Abd = self.processed_Abd - convolve( self.processed_Abd, ones(int(4 * temp_frequency)) / int(4 * temp_frequency), mode='same') if Config["orgBcgConfig"]["orgBcgDelBase"]: self.processed_orgBcg = self.processed_orgBcg - convolve( self.processed_orgBcg, ones(int(4 * temp_frequency)) / int(4 * temp_frequency), mode='same') except Exception as e: return Result().failure( info=Constants.APPROXIMATELY_DELETE_BASE_FAILURE + Constants.FAILURE_REASON[ "Delete_Base_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.APPROXIMATELY_DELETE_BASE_FINISHED) def Standardize_4(self): # 标准化 try: # 判断是否标准化 if Config["PSGConfig"]["PSGZScore"]: self.processed_Tho = (self.processed_Tho - mean(self.processed_Tho)) / std(self.processed_Tho) self.processed_Abd = (self.processed_Abd - mean(self.processed_Abd)) / std(self.processed_Abd) if Config["orgBcgConfig"]["orgBcgZScore"]: self.processed_orgBcg = (self.processed_orgBcg - mean(self.processed_orgBcg)) / std( self.processed_orgBcg) except Exception as e: return Result().failure( info=Constants.APPROXIMATELY_STANDARDIZE_FAILURE + Constants.FAILURE_REASON[ "Standardize_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.APPROXIMATELY_STANDARDIZE_FINISHED) def Standardize_5(self): # 重采样 try: # 用[::]完成 temp_frequency = Config["TempFrequency"] self.processed_downsample_Tho = self.processed_Tho[::int(temp_frequency / Config["ApplyFrequency"])] self.processed_downsample_Abd = self.processed_Abd[::int(temp_frequency / Config["ApplyFrequency"])] self.processed_downsample_orgBcg = self.processed_orgBcg[::int(temp_frequency / Config["ApplyFrequency"])] except Exception as e: return Result().failure( info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FAILURE + Constants.FAILURE_REASON[ "Resample_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FINISHED) def calculate_correlation1(self): # 计算互相关1/2 try: # 计算因子 MULTIPLE_FACTOER = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["Multiple_Factor"] a = self.processed_downsample_Tho[ Config["PSGConfig"]["PreCut"]:len(self.processed_downsample_Tho) - Config["PSGConfig"][ "PostCut"]].copy() v = self.processed_downsample_orgBcg[ Config["orgBcgConfig"]["PreCut"]:len(self.processed_downsample_orgBcg) - Config["orgBcgConfig"][ "PostCut"]].copy() a *= MULTIPLE_FACTOER v *= MULTIPLE_FACTOER a = a.astype(int64) v = v.astype(int64) tho_relate = correlate(a, v, mode='full') tho_relate = tho_relate / (MULTIPLE_FACTOER ** 2) tho_relate2 = - tho_relate result = {"tho_relate": tho_relate, "tho_relate2": tho_relate2} except Exception as e: return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FAILURE + Constants.FAILURE_REASON[ "Calculate_Correlation1_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FINISHED, data=result) def calculate_correlation2(self): # 计算互相关2/2 try: a = self.processed_downsample_Abd[ Config["PSGConfig"]["PreCut"]:len(self.processed_downsample_Abd) - Config["PSGConfig"][ "PostCut"]].copy() v = self.processed_downsample_orgBcg[ Config["orgBcgConfig"]["PreCut"]:len(self.processed_downsample_orgBcg) - Config["orgBcgConfig"][ "PostCut"]].copy() a *= 100 v *= 100 a = a.astype(int64) v = v.astype(int64) abd_relate = correlate(a, v, mode='full') abd_relate = abd_relate / 10000 abd_relate2 = - abd_relate result = {"abd_relate": abd_relate, "abd_relate2": abd_relate2} except Exception as e: return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FAILURE + Constants.FAILURE_REASON[ "Calculate_Correlation2_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FINISHED, data=result) def calculate_maxvalue_pos(self, tho_relate, tho_relate2, abd_relate, abd_relate2): # 计算最大值位置 try: tho_max = argmax(tho_relate) tho_max2 = argmax(tho_relate2) abd_max = argmax(abd_relate) abd_max2 = argmax(abd_relate2) pre = Config["PSGConfig"]["PreCut"] + Config["orgBcgConfig"]["PostCut"] bias = pre - len(self.processed_downsample_orgBcg) + 1 result = {"tho_max": tho_max, "tho_max2": tho_max2, "abd_max": abd_max, "abd_max2": abd_max2, "bias": bias} except Exception as e: return Result().failure(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FAILURE + Constants.FAILURE_REASON[ "Calculate_Maxvalue_Pos_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FINISHED, data=result) def get_epoch(self): # 获取epoch try: epoch_min = max(0, Config["pos"] // 30 // Config["ApplyFrequency"] + 1) epoch_max = min(len(self.processed_downsample_Tho) // 30 // Config["ApplyFrequency"] - 1, (len(self.processed_downsample_orgBcg) + Config["pos"]) // 30 // Config[ "ApplyFrequency"] - 1) result = {"epoch_min": epoch_min, "epoch_max": epoch_max} except Exception as e: return Result().failure( info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.FAILURE_REASON[ "Get_Epoch_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.APPROXIMATELY_EPOCH_GET_FINISHED, data=result) def get_corr_by_epoch(self): # 获取相关系数 try: # 获取 epoch 区间 response = self.get_epoch() if not response.status: raise Exception(response.info) epoch_min = response.data["epoch_min"] epoch_max = response.data["epoch_max"] temp_freq = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["TempFrequency"] window_epoch = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["CorrByEpoch"]["window_epoch"] tho_bias_list = [] abd_bias_list = [] # pos采样率转换 pos = Config["pos"] * temp_freq // Config["ApplyFrequency"] for epoch in range(epoch_min, epoch_max): SP = epoch * 30 * temp_freq EP = (epoch + window_epoch) * 30 * temp_freq tho_seg = self.processed_Tho[SP:EP] abd_seg = self.processed_Abd[SP:EP] orgBcg_seg = self.processed_orgBcg[SP - pos:EP - pos] * Config["orgBcg_reverse"] tho_relate_seg = correlate(tho_seg, orgBcg_seg, mode='full') abd_relate_seg = correlate(abd_seg, orgBcg_seg, mode='full') tho_seg_pos = argmax(tho_relate_seg) - len(orgBcg_seg) abd_seg_pos = argmax(abd_relate_seg) - len(orgBcg_seg) tho_bias_list.append(tho_seg_pos // temp_freq) abd_bias_list.append(abd_seg_pos // temp_freq) result = { "tho_bias_list": tho_bias_list, "abd_bias_list": abd_bias_list, "epoch_min": epoch_min, "epoch_max": epoch_max } except Exception as e: return Result().failure( info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.FAILURE_REASON[ "Get_Corr_By_Epoch_Exception"] + "\n" + format_exc()) return Result().success(info=Constants.APPROXIMATELY_EPOCH_GET_FINISHED, data=result)