"""Preprocessing window: load a raw BCG/ECG signal, resample it, band-pass
filter it, plot raw vs. processed data and save the processed signal."""

from gc import collect
from pathlib import Path
from traceback import format_exc

# NOTE: third-party import order is kept as in the original file.
import matplotlib.pyplot as plt
from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication
from matplotlib import gridspec
from matplotlib.backends.backend_qt import NavigationToolbar2QT
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg
from overrides import overrides
from pandas import read_csv, DataFrame
from scipy.signal import resample
from yaml import dump, load, FullLoader

from func.utils.ConfigParams import Filename, Params
from func.utils.PublicFunc import PublicFunc
from func.utils.Constants import Constants
from func.Filters.Preprocessing import Butterworth_for_BCG_PreProcess, Butterworth_for_ECG_PreProcess
from func.utils.Result import Result

from ui.MainWindow.MainWindow_preprocess import Ui_MainWindow_preprocess
from ui.setting.preprocess_input_setting import Ui_MainWindow_preprocess_input_setting


# Module-wide runtime configuration; filled by SettingWindow.__read_config__
# and read/updated by MainWindow_preprocess and Data.
Config = {

}

# Enabled/disabled state of the main-window buttons: "Default" is the initial
# state, "Current" is the state restored after an operation finishes.
ButtonState = {
    "Default": {
        "pushButton_input_setting": True,
        "pushButton_input": True,
        "pushButton_view": False,
        "pushButton_save": False
    },
    "Current": {
        "pushButton_input_setting": True,
        "pushButton_input": True,
        "pushButton_view": False,
        "pushButton_save": False
    }
}


class SettingWindow(QMainWindow):
    """Dialog for editing input/output sampling frequency and file paths."""

    def __init__(self, mode, root_path, sampID):
        """Build the UI, load the config file and detect the input frequency.

        Args:
            mode: "BCG" or "ECG"; any other value raises ValueError.
            root_path: root directory of the data set.
            sampID: sample identifier, used as the sample sub-directory name.
        """
        super(SettingWindow, self).__init__()
        self.ui = Ui_MainWindow_preprocess_input_setting()
        self.ui.setupUi(self)

        self.mode = mode
        self.root_path = root_path
        self.sampID = sampID

        self.msgBox = QMessageBox()
        self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)

        self.config = None
        self.__read_config__()
        self.__examine_freq__()

        self.ui.spinBox_input_freq.valueChanged.connect(self.__update_ui__)
        self.ui.spinBox_output_freq.valueChanged.connect(self.__update_ui__)
        self.ui.pushButton_confirm.clicked.connect(self.__write_config__)
        self.ui.pushButton_cancel.clicked.connect(self.__rollback_config__)
        self.ui.pushButton_cancel.clicked.connect(self.close)

    def __read_config__(self):
        """Load the config file (creating it if missing) into Config and the UI.

        Raises:
            ValueError: if ``self.mode`` is neither "BCG" nor "ECG".
        """
        config_path = Params.PREPROCESS_CONFIG_FILE_PATH
        if not Path(config_path).exists():
            with open(config_path, "w") as f:
                dump(Params.PREPROCESS_CONFIG_NEW_CONTENT, f)

        with open(config_path, "r") as f:
            # NOTE(review): FullLoader can build more than plain data types;
            # consider yaml.safe_load if this file may come from an untrusted source.
            file_config = load(f.read(), Loader=FullLoader)
        Config.update(file_config)
        self.config = file_config

        if self.mode == "BCG":
            data_dir = Filename.PATH_ORGBCG_TEXT
        elif self.mode == "ECG":
            data_dir = Filename.PATH_PSG_TEXT
        else:
            raise ValueError("模式不存在")

        # Input and save locations both start out as the sample directory.
        sample_dir = str(Path(self.root_path) / data_dir / Path(str(self.sampID)))
        Config.update({
            "Path": {
                "Input": sample_dir,
                "Save": sample_dir
            },
            "Mode": self.mode
        })

        # Echo the loaded values back into the widgets.
        self.ui.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])
        self.ui.spinBox_output_freq.setValue(Config["OutputConfig"]["Freq"])
        self.ui.plainTextEdit_file_path_input.setPlainText(Config["Path"]["Input"])
        self.ui.plainTextEdit_file_path_save.setPlainText(Config["Path"]["Save"])

    def __write_config__(self):
        """Copy the widget values into Config, persist them and close the window."""
        input_freq = self.ui.spinBox_input_freq.value()
        output_freq = self.ui.spinBox_output_freq.value()

        # Widgets -> in-memory configuration.
        Config["InputConfig"]["Freq"] = input_freq
        Config["OutputConfig"]["Freq"] = output_freq
        Config["Path"]["Input"] = self.ui.plainTextEdit_file_path_input.toPlainText()
        Config["Path"]["Save"] = self.ui.plainTextEdit_file_path_save.toPlainText()

        # In-memory configuration -> config file (paths are not persisted).
        self.config["InputConfig"]["Freq"] = input_freq
        self.config["OutputConfig"]["Freq"] = output_freq
        with open(Params.PREPROCESS_CONFIG_FILE_PATH, "w") as f:
            dump(self.config, f)

        self.close()

    def __rollback_config__(self):
        """Discard unsaved edits by reloading the configuration from file."""
        self.__read_config__()

    def __update_ui__(self):
        """Refresh the displayed input file path after a frequency spin box changes.

        Raises:
            ValueError: if ``self.mode`` is neither "BCG" nor "ECG".
        """
        if self.mode == "BCG":
            data_dir = Filename.PATH_ORGBCG_TEXT
            raw_name = Filename.ORGBCG_RAW
        elif self.mode == "ECG":
            data_dir = Filename.PATH_PSG_TEXT
            raw_name = Filename.ECG_RAW
        else:
            raise ValueError("模式不存在")

        file_name = raw_name + str(self.ui.spinBox_input_freq.value()) + Params.ENDSWITH_TXT
        self.ui.plainTextEdit_file_path_input.setPlainText(
            str(Path(self.root_path) / data_dir / Path(str(self.sampID)) / Path(file_name)))

    def __examine_freq__(self):
        """Detect the input frequency via PublicFunc.examine_file and show it.

        On failure an error message box is shown and the configured frequency
        is left unchanged.

        Raises:
            ValueError: if ``Config["Mode"]`` is neither "BCG" nor "ECG".
        """
        if Config["Mode"] == "BCG":
            signal = Filename.ORGBCG_RAW
        elif Config["Mode"] == "ECG":
            signal = Filename.ECG_RAW
        else:
            raise ValueError("模式不存在")

        # examine_file is given a directory; strip the file name if one is set.
        if Path(Config["Path"]["Input"]).is_file():
            Config["Path"]["Input"] = str(Path(Config["Path"]["Input"]).parent)

        result = PublicFunc.examine_file(Config["Path"]["Input"], signal, Params.ENDSWITH_TXT)
        if result.status:
            Config["InputConfig"]["Freq"] = result.data["freq"]
        else:
            PublicFunc.msgbox_output(self, signal + Constants.FAILURE_REASON["Get_Freq_Not_Correct"],
                                     Constants.MSGBOX_TYPE_ERROR)

        # Echo the (possibly updated) frequency back into the widget.
        self.ui.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])


class MainWindow_preprocess(QMainWindow):
    """Main window driving the input -> preprocess/plot -> save workflow."""

    def __init__(self):
        """Build the static UI; per-session state is set up in ``show``."""
        super(MainWindow_preprocess, self).__init__()
        self.ui = Ui_MainWindow_preprocess()
        self.ui.setupUi(self)

        self.mode = None
        self.root_path = None
        self.sampID = None

        self.data = None

        self.setting = None

        # Progress bar is attached by PublicFunc.add_progressbar.
        self.progressbar = None
        PublicFunc.add_progressbar(self)

        # Plot objects are created in show().
        self.fig = None
        self.canvas = None
        self.figToolbar = None
        self.gs = None
        self.ax0 = None

        self.ui.textBrowser_info.setStyleSheet("QTextBrowser { background-color: rgb(255, 255, 200); }")
        PublicFunc.__styleAllButton__(self, ButtonState)

        self.msgBox = QMessageBox()
        self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)

    @overrides
    def show(self, mode, root_path, sampID):
        """Show the window for one sample and wire up plot, widgets and slots.

        Args:
            mode: "BCG" or "ECG".
            root_path: root directory of the data set.
            sampID: sample identifier.

        Raises:
            ValueError: if ``mode`` is neither "BCG" nor "ECG".
        """
        super().show()
        self.mode = mode
        self.root_path = root_path
        self.sampID = sampID

        self.setting = SettingWindow(mode, root_path, sampID)

        # Create the figure, canvas and toolbar.
        self.fig = plt.figure(figsize=(12, 9), dpi=100)
        self.canvas = FigureCanvasQTAgg(self.fig)
        self.figToolbar = NavigationToolbar2QT(self.canvas)
        for action in self.figToolbar.actions():
            if action.text() in ("Subplots", "Customize"):
                self.figToolbar.removeAction(action)
        self.ui.verticalLayout_canvas.addWidget(self.canvas)
        self.ui.verticalLayout_canvas.addWidget(self.figToolbar)
        self.gs = gridspec.GridSpec(1, 1, height_ratios=[1])
        self.fig.subplots_adjust(top=0.98, bottom=0.05, right=0.98, left=0.1, hspace=0, wspace=0)
        self.ax0 = self.fig.add_subplot(self.gs[0])
        self.ax0.grid(True)
        self.ax0.xaxis.set_major_formatter(Params.FORMATTER)

        PublicFunc.__resetAllButton__(self, ButtonState)

        self.ui.label_mode.setText(self.mode)
        if self.mode not in ("BCG", "ECG"):
            raise ValueError("模式不存在")
        # Filter keys are "<mode>BandPassOrder/Low/High".
        filter_config = Config["Filter"]
        self.ui.spinBox_bandPassOrder.setValue(filter_config[self.mode + "BandPassOrder"])
        self.ui.doubleSpinBox_bandPassLow.setValue(filter_config[self.mode + "BandPassLow"])
        self.ui.doubleSpinBox_bandPassHigh.setValue(filter_config[self.mode + "BandPassHigh"])

        self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__)
        self.ui.pushButton_input_setting.clicked.connect(self.setting.show)
        self.ui.pushButton_view.clicked.connect(self.__slot_btn_view__)
        self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__)
        self.ui.spinBox_bandPassOrder.editingFinished.connect(self.__update_config__)
        self.ui.doubleSpinBox_bandPassLow.editingFinished.connect(self.__update_config__)
        self.ui.doubleSpinBox_bandPassHigh.editingFinished.connect(self.__update_config__)

    @overrides
    def closeEvent(self, event):
        """Ask for confirmation, then release plot/data resources and close."""
        reply = QMessageBox.question(self, '确认', '确认退出吗?',
                                     QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply != QMessageBox.Yes:
            event.ignore()
            return

        PublicFunc.__disableAllButton__(self, ButtonState)

        PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
        QApplication.processEvents()

        # Clear the axes.
        if self.ax0 is not None:
            self.ax0.clear()

        # Release resources. The attribute is reset rather than deleted so a
        # later access cannot raise AttributeError.
        self.data = None
        if self.fig is not None:
            self.fig.clf()
            plt.close(self.fig)
        self.deleteLater()
        collect()
        self.canvas = None
        event.accept()

    def __reset__(self):
        """Restore the current button state to the defaults."""
        ButtonState["Current"].update(ButtonState["Default"].copy())

    def __report_result__(self, step, result):
        """Log a step result; on failure also show a message box and finish.

        Args:
            step: step prefix such as "(1/2)" prepended to the log text.
            result: Result object of the step.

        Returns:
            True when the step succeeded, False otherwise.
        """
        if result.status:
            PublicFunc.text_output(self.ui, step + result.info, Constants.TIPS_TYPE_INFO)
            return True
        PublicFunc.text_output(self.ui, step + result.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return False

    def __plot__(self):
        """Redraw the axes; plot raw and processed data when triggered by "view".

        Returns:
            A success Result when the view button triggered the call, otherwise
            a failure Result carrying Constants.DRAW_FAILURE.
        """
        # Clear the axes.
        self.reset_axes()

        if self.sender() != self.ui.pushButton_view:
            self.canvas.draw()
            # Fixed: this path previously reported the failure as a success.
            return Result().failure(info=Constants.DRAW_FAILURE)

        self.ax0.plot(self.data.raw_data,
                      color=Constants.PLOT_COLOR_RED,
                      label=Constants.PREPROCESS_PLOT_LABEL_ORIGINAL_DATA)
        # The processed trace is shifted by a constant offset before plotting.
        self.ax0.plot(self.data.processed_data + Constants.PREPROCESS_OUTPUT_INPUT_AMP_OFFSET,
                      color=Constants.PLOT_COLOR_BLUE,
                      label=Constants.PREPROCESS_PLOT_LABEL_PROCESSED_DATA)
        self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT)
        self.canvas.draw()
        return Result().success(info=Constants.DRAW_FINISHED)

    def __update_config__(self):
        """Store the band-pass widget values into Config for the current mode.

        Raises:
            ValueError: if ``self.mode`` is neither "BCG" nor "ECG".
        """
        if self.mode not in ("BCG", "ECG"):
            raise ValueError("模式不存在")
        filter_config = Config["Filter"]
        filter_config[self.mode + "BandPassOrder"] = self.ui.spinBox_bandPassOrder.value()
        filter_config[self.mode + "BandPassLow"] = self.ui.doubleSpinBox_bandPassLow.value()
        filter_config[self.mode + "BandPassHigh"] = self.ui.doubleSpinBox_bandPassHigh.value()

    def __slot_btn_input__(self):
        """Slot for the input button: load the raw file, then resample it."""
        PublicFunc.__disableAllButton__(self, ButtonState)

        # Clear the axes.
        self.reset_axes()
        self.canvas.draw()

        self.data = Data()

        # Step 1 of 2: import the data (previously mislabelled as "1/1").
        PublicFunc.progressbar_update(self, 1, 2, Constants.INPUTTING_DATA, 0)
        if not self.__report_result__("(1/2)", self.data.open_file()):
            return

        # Step 2 of 2: resample.
        PublicFunc.progressbar_update(self, 2, 2, Constants.RESAMPLING_DATA, 0)
        if not self.__report_result__("(2/2)", self.data.resample()):
            return

        ButtonState["Current"]["pushButton_input_setting"] = False
        ButtonState["Current"]["pushButton_input"] = False
        ButtonState["Current"]["pushButton_view"] = True
        ButtonState["Current"]["pushButton_save"] = False
        PublicFunc.finish_operation(self, ButtonState)

    def __slot_btn_view__(self):
        """Slot for the view button: filter the data, then plot it."""
        PublicFunc.__disableAllButton__(self, ButtonState)

        # Step 1 of 2: preprocess.
        PublicFunc.progressbar_update(self, 1, 2, Constants.PREPROCESSING_DATA, 0)
        if not self.__report_result__("(1/2)", self.data.preprocess()):
            return

        # Step 2 of 2: plot.
        PublicFunc.progressbar_update(self, 2, 2, Constants.DRAWING_DATA, 50)
        if not self.__report_result__("(2/2)", self.__plot__()):
            return

        ButtonState["Current"]["pushButton_save"] = True
        PublicFunc.finish_operation(self, ButtonState)

    def __slot_btn_save__(self):
        """Slot for the save button: write the processed data in chunks."""
        save_path = Config["Path"]["Save"]
        reply = QMessageBox.question(self, Constants.QUESTION_TITLE,
                                     Constants.QUESTION_CONTENT + save_path,
                                     QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
        if reply != QMessageBox.Yes:
            return

        PublicFunc.__disableAllButton__(self, ButtonState)

        # Save.
        PublicFunc.progressbar_update(self, 1, 1, Constants.SAVING_DATA, 0)

        # Build the frame once instead of once per chunk.
        frame = DataFrame(self.data.processed_data.reshape(-1))
        total_rows = len(frame)
        chunk_size = Params.PREPROCESS_SAVE_CHUNK_SIZE

        # Default outcome, so that empty data cannot leave `result` unbound.
        result = Result().success(info=Constants.SAVE_FINISHED)
        try:
            # Truncate the target and close it again before Data.save appends.
            with open(save_path, 'w'):
                pass
        except FileNotFoundError:
            result = Result().failure(info=Constants.SAVE_FAILURE +
                                           Constants.FAILURE_REASON["Save_File_Not_Found"])
        except PermissionError:
            result = Result().failure(info=Constants.SAVE_FAILURE +
                                           Constants.FAILURE_REASON["Save_Permission_Denied"])
        else:
            for start in range(0, total_rows, chunk_size):
                end = min(start + chunk_size, total_rows)
                result = self.data.save(frame.iloc[start:end])
                if not result.status:
                    # Stop at the first failure so later chunks cannot mask it.
                    break
                self.progressbar.setValue(int((end / total_rows) * 100))
                QApplication.processEvents()

        if not self.__report_result__("(1/1)", result):
            return
        # NOTE(review): passes a TIPS_* constant where the other msgbox_output
        # calls pass MSGBOX_TYPE_*; kept as in the original, please confirm.
        PublicFunc.msgbox_output(self, result.info, Constants.TIPS_TYPE_INFO)

        PublicFunc.finish_operation(self, ButtonState)

    def reset_axes(self):
        """Clear the axes (if created) and re-apply grid and x-axis formatter."""
        if self.ax0 is None:
            return
        self.ax0.clear()
        self.ax0.grid(True)
        self.ax0.xaxis.set_major_formatter(Params.FORMATTER)


class Data:
    """Holds the raw and processed signal and implements the processing steps.

    Every step returns a Result whose ``status`` tells success from failure.
    """

    def __init__(self):
        self.raw_data = None
        self.processed_data = None

    def open_file(self):
        """Locate and read the raw signal file into ``raw_data``.

        Also updates the input path, input frequency and save path in Config.

        Returns:
            Result of the operation; the examine_file result is returned as is
            when the file cannot be located.

        Raises:
            ValueError: if ``Config["Mode"]`` is neither "BCG" nor "ECG".
        """
        if Config["Mode"] == "BCG":
            signal = Filename.ORGBCG_RAW
            save = Filename.BCG_FILTER
        elif Config["Mode"] == "ECG":
            signal = Filename.ECG_RAW
            save = Filename.ECG_FILTER
        else:
            raise ValueError("模式不存在")

        # examine_file is given a directory; strip the file name if one is set.
        if Path(Config["Path"]["Input"]).is_file():
            Config["Path"]["Input"] = str(Path(Config["Path"]["Input"]).parent)

        result = PublicFunc.examine_file(Config["Path"]["Input"], signal, Params.ENDSWITH_TXT)
        if not result.status:
            return result

        Config["Path"]["Input"] = result.data["path"]
        Config["InputConfig"]["Freq"] = result.data["freq"]
        save_name = save + str(Config["OutputConfig"]["Freq"]) + Params.ENDSWITH_TXT
        save_path = Path(Config["Path"]["Save"])
        # Append the file name only once: a retry after a failed read must not
        # produce ".../<name>/<name>".
        if save_path.name != save_name:
            save_path = save_path / Path(save_name)
        Config["Path"]["Save"] = str(save_path)

        try:
            self.raw_data = read_csv(Config["Path"]["Input"],
                                     encoding=Params.UTF8_ENCODING,
                                     header=None).to_numpy().reshape(-1)
        except Exception:
            return Result().failure(info=Constants.INPUT_FAILURE +
                                         Constants.FAILURE_REASON["Open_Data_Exception"] + "\n" + format_exc())

        return Result().success(info=Constants.INPUT_FINISHED)

    def resample(self):
        """Resample ``raw_data`` from the input to the output frequency.

        Returns:
            Failure when no data is loaded or resampling raises; success
            otherwise (with RESAMPLE_NO_NEED when both frequencies are equal).
        """
        if self.raw_data is None:
            return Result().failure(info=Constants.RESAMPLE_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"])

        try:
            input_freq = Config["InputConfig"]["Freq"]
            output_freq = Config["OutputConfig"]["Freq"]
            if input_freq == output_freq:
                return Result().success(info=Constants.RESAMPLE_NO_NEED)
            self.raw_data = resample(self.raw_data,
                                     int(len(self.raw_data) * (output_freq / input_freq)))
        except Exception:
            return Result().failure(info=Constants.RESAMPLE_FAILURE +
                                         Constants.FAILURE_REASON["Resample_Exception"] + "\n" + format_exc())

        return Result().success(info=Constants.RESAMPLE_FINISHED)

    def preprocess(self):
        """Band-pass filter ``raw_data`` into ``processed_data``.

        An order of 0 disables filtering (the raw data is used unchanged).

        Returns:
            Failure when no data is loaded, the cut-off frequencies are invalid
            or filtering raises; success otherwise.
        """
        if self.raw_data is None:
            return Result().failure(info=Constants.PREPROCESS_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"])

        try:
            mode = Config["Mode"]
            if mode == "BCG":
                band_pass = Butterworth_for_BCG_PreProcess
            elif mode == "ECG":
                band_pass = Butterworth_for_ECG_PreProcess
            else:
                # Caught below and reported as a preprocess exception.
                raise ValueError("模式不存在")

            # Filter keys are "<mode>BandPassOrder/Low/High".
            filter_config = Config["Filter"]
            order = filter_config[mode + "BandPassOrder"]
            if order == 0:
                self.processed_data = self.raw_data
            else:
                low_cut = filter_config[mode + "BandPassLow"]
                high_cut = filter_config[mode + "BandPassHigh"]
                if low_cut >= high_cut or low_cut <= 0 or high_cut <= 0:
                    return Result().failure(
                        info=Constants.PREPROCESS_FAILURE + Constants.FAILURE_REASON["Filter_Args_Not_Correct"])
                self.processed_data = band_pass(self.raw_data, type='bandpass',
                                                low_cut=low_cut,
                                                high_cut=high_cut,
                                                order=order,
                                                sample_rate=Config["OutputConfig"]["Freq"])
        except Exception:
            return Result().failure(info=Constants.PREPROCESS_FAILURE +
                                         Constants.FAILURE_REASON["Preprocess_Exception"] + "\n" + format_exc())

        return Result().success(info=Constants.PREPROCESS_FINISHED)

    def save(self, chunk):
        """Append one chunk of processed data to the save file.

        Args:
            chunk: DataFrame slice to append, written with 4 decimal places.

        Returns:
            Failure when no processed data exists or writing raises; success
            otherwise.
        """
        if self.processed_data is None:
            return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"])

        try:
            chunk.to_csv(Config["Path"]["Save"], mode='a', index=False, header=False, float_format='%.4f')
        except PermissionError:
            return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"])
        except Exception:
            return Result().failure(info=Constants.SAVE_FAILURE +
                                         Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc())

        return Result().success(info=Constants.SAVE_FINISHED)