"""Preprocessing window: load a raw BCG/ECG signal, resample, band-pass filter, plot and save it."""
from gc import collect
from pathlib import Path
from traceback import format_exc

import matplotlib.pyplot as plt
from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication
from matplotlib import gridspec
from matplotlib.backends.backend_qt import NavigationToolbar2QT
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg
from numpy import column_stack, arange
from overrides import overrides
from pandas import read_csv, DataFrame
from scipy.signal import resample
from vispy import scene
from vispy.scene import visuals
from yaml import dump, load, FullLoader

from func.utils.ConfigParams import Filename, Params
from func.utils.PublicFunc import PublicFunc
from func.utils.Constants import Constants
from func.Filters.Preprocessing import Butterworth_for_BCG_PreProcess, Butterworth_for_ECG_PreProcess
from func.utils.Result import Result
from func.utils.FloatingImagePanel import add_floating_image_panel

from ui.MainWindow.MainWindow_preprocess import Ui_MainWindow_preprocess
from ui.setting.preprocess_input_setting import Ui_MainWindow_preprocess_input_setting


# Module-wide runtime configuration. It is filled from the YAML config file by
# SettingWindow.__read_config__ and then shared by both windows and by Data.
Config = {

}

# Enabled/disabled state of the main-window buttons: "Default" is the initial
# state, "Current" is the state that PublicFunc applies after each operation.
ButtonState = {
    "Default": {
        "pushButton_input_setting": True,
        "pushButton_input": True,
        "pushButton_view": False,
        "pushButton_save": False
    },
    "Current": {
        "pushButton_input_setting": True,
        "pushButton_input": True,
        "pushButton_view": False,
        "pushButton_save": False
    }
}


class SettingWindow(QMainWindow):
    """Dialog for editing the input/output sampling frequencies and the input/save paths."""

    def __init__(self, mode, root_path, sampID):
        """Build the dialog, load the config file and detect the input frequency.

        Args:
            mode: Signal type, "BCG" or "ECG".
            root_path: Root directory of the data set.
            sampID: Sample identifier; used as a sub-directory name.

        Raises:
            ValueError: If ``mode`` is neither "BCG" nor "ECG".
        """
        super(SettingWindow, self).__init__()
        self.ui = Ui_MainWindow_preprocess_input_setting()
        self.ui.setupUi(self)

        self.mode = mode
        self.root_path = root_path
        self.sampID = sampID

        self.msgBox = QMessageBox()
        self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)

        self.config = None
        self.__read_config__()
        self.__examine_freq__()

        self.ui.spinBox_input_freq.valueChanged.connect(self.__update_ui__)
        self.ui.spinBox_output_freq.valueChanged.connect(self.__update_ui__)
        self.ui.pushButton_confirm.clicked.connect(self.__write_config__)
        self.ui.pushButton_cancel.clicked.connect(self.__rollback_config__)
        self.ui.pushButton_cancel.clicked.connect(self.close)

    def __read_config__(self):
        """Load the YAML config (creating it with default content if missing) into ``Config`` and the UI.

        Raises:
            ValueError: If ``self.mode`` is neither "BCG" nor "ECG".
        """
        config_path = Path(Params.PREPROCESS_CONFIG_FILE_PATH)
        if not config_path.exists():
            with open(config_path, "w") as f:
                dump(Params.PREPROCESS_CONFIG_NEW_CONTENT, f)

        # NOTE(review): FullLoader can construct more than plain data types. This
        # reads a local application config file; if that file can ever come from
        # an untrusted source, switch to yaml.safe_load.
        with open(config_path, "r") as f:
            file_config = load(f.read(), Loader=FullLoader)
        Config.update(file_config)
        self.config = file_config

        if self.mode == "BCG":
            signal_dir = Filename.PATH_ORGBCG_TEXT
        elif self.mode == "ECG":
            signal_dir = Filename.PATH_PSG_TEXT
        else:
            raise ValueError("模式不存在")

        # Input and save locations both start out as the sample's directory.
        sample_dir = str(Path(self.root_path) / signal_dir / Path(str(self.sampID)))
        Config.update({
            "Path": {
                "Input": sample_dir,
                "Save": sample_dir
            },
            "Mode": self.mode
        })

        # Echo the loaded values back into the widgets.
        self.ui.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])
        self.ui.spinBox_output_freq.setValue(Config["OutputConfig"]["Freq"])
        self.ui.plainTextEdit_file_path_input.setPlainText(Config["Path"]["Input"])
        self.ui.plainTextEdit_file_path_save.setPlainText(Config["Path"]["Save"])

    def __write_config__(self):
        """Copy the widget values into ``Config``, persist the frequencies to the config file and close."""
        input_freq = self.ui.spinBox_input_freq.value()
        output_freq = self.ui.spinBox_output_freq.value()

        # Write the UI values into the runtime configuration.
        Config["InputConfig"]["Freq"] = input_freq
        Config["OutputConfig"]["Freq"] = output_freq
        Config["Path"]["Input"] = self.ui.plainTextEdit_file_path_input.toPlainText()
        Config["Path"]["Save"] = self.ui.plainTextEdit_file_path_save.toPlainText()

        # Persist to file; only the frequencies are stored, the paths are not.
        self.config["InputConfig"]["Freq"] = input_freq
        self.config["OutputConfig"]["Freq"] = output_freq
        with open(Params.PREPROCESS_CONFIG_FILE_PATH, "w") as f:
            dump(self.config, f)

        self.close()

    def __rollback_config__(self):
        """Discard unsaved edits by reloading the config file."""
        self.__read_config__()

    def __update_ui__(self):
        """Rebuild the displayed input file path from the current input-frequency spin box value.

        Raises:
            ValueError: If ``self.mode`` is neither "BCG" nor "ECG".
        """
        if self.mode == "BCG":
            signal_dir = Filename.PATH_ORGBCG_TEXT
            raw_name = Filename.ORGBCG_RAW
        elif self.mode == "ECG":
            signal_dir = Filename.PATH_PSG_TEXT
            raw_name = Filename.ECG_RAW
        else:
            raise ValueError("模式不存在")

        file_name = raw_name + str(self.ui.spinBox_input_freq.value()) + Params.ENDSWITH_TXT
        self.ui.plainTextEdit_file_path_input.setPlainText(
            str(Path(self.root_path) / signal_dir / Path(str(self.sampID)) / Path(file_name)))

    def __examine_freq__(self):
        """Detect the input frequency via ``PublicFunc.examine_file`` and show it in the UI.

        On failure an error message box is shown and the configured frequency is kept.

        Raises:
            ValueError: If ``Config["Mode"]`` is neither "BCG" nor "ECG".
        """
        if Config["Mode"] == "BCG":
            signal = Filename.ORGBCG_RAW
        elif Config["Mode"] == "ECG":
            signal = Filename.ECG_RAW
        else:
            raise ValueError("模式不存在")

        # examine_file is given a directory; reduce a file path to its parent.
        if Path(Config["Path"]["Input"]).is_file():
            Config["Path"]["Input"] = str(Path(Config["Path"]["Input"]).parent)

        result = PublicFunc.examine_file(Config["Path"]["Input"], signal, Params.ENDSWITH_TXT)
        if result.status:
            Config["InputConfig"]["Freq"] = result.data["freq"]
        else:
            PublicFunc.msgbox_output(self, signal + Constants.FAILURE_REASON["Get_Freq_Not_Correct"],
                                     Constants.MSGBOX_TYPE_ERROR)

        # Echo the (possibly updated) frequency back into the widget.
        self.ui.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])


class MainWindow_preprocess(QMainWindow):
    """Main preprocessing window: import -> resample -> filter -> plot -> save."""

    def __init__(self, plot_mode):
        """Create the window skeleton; the canvas itself is built later in :meth:`show`.

        Args:
            plot_mode: Plotting backend, "matplotlib" or "vispy".
        """
        super(MainWindow_preprocess, self).__init__()
        self.ui = Ui_MainWindow_preprocess()
        self.ui.setupUi(self)

        self.mode = None
        self.root_path = None
        self.sampID = None

        self.data = None

        self.setting = None

        # Initialise the progress bar.
        self.progressbar = None
        PublicFunc.add_progressbar(self)

        # Built with pathlib so the relative path is not tied to the Windows
        # path separator.
        self.image_overlay = add_floating_image_panel(
            self.centralWidget(),
            title="参考图例",
            img_path=str(Path("image") / "legend_preprocess.png")
        )

        self.plot_mode = plot_mode
        if self.plot_mode == "matplotlib":
            # Placeholders for the matplotlib canvas.
            self.fig = None
            self.canvas = None
            self.figToolbar = None
            self.gs = None
            self.ax0 = None
        elif self.plot_mode == "vispy":
            # Placeholders for the vispy canvas.
            self.canvas = None
            self.main_grid = None
            self.view = None
            self.y_axis = None
            self.x_axis = None
            self.grid_lines = None
            self.line_original = None
            self.line_processed = None

        self.ui.textBrowser_info.setStyleSheet("QTextBrowser { background-color: rgb(255, 255, 200); }")
        PublicFunc.__styleAllButton__(self, ButtonState)

        self.msgBox = QMessageBox()
        self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)

    @overrides
    def show(self, mode, root_path, sampID):
        """Show the window for one sample, build the plot canvas and wire up the signals.

        Args:
            mode: Signal type, "BCG" or "ECG".
            root_path: Root directory of the data set.
            sampID: Sample identifier.

        Raises:
            ValueError: If ``mode`` is neither "BCG" nor "ECG".
        """
        super().show()
        self.mode = mode
        self.root_path = root_path
        self.sampID = sampID

        self.setting = SettingWindow(mode, root_path, sampID)

        if self.plot_mode == "matplotlib":
            # Build the matplotlib canvas and toolbar.
            self.fig = plt.figure(figsize=(12, 9), dpi=100)
            self.canvas = FigureCanvasQTAgg(self.fig)
            self.figToolbar = NavigationToolbar2QT(self.canvas)
            for action in self.figToolbar.actions():
                if action.text() in ("Subplots", "Customize"):
                    self.figToolbar.removeAction(action)
            self.ui.verticalLayout_canvas.addWidget(self.canvas)
            self.ui.verticalLayout_canvas.addWidget(self.figToolbar)
            self.gs = gridspec.GridSpec(1, 1, height_ratios=[1])
            self.fig.subplots_adjust(top=0.98, bottom=0.05, right=0.98, left=0.1, hspace=0, wspace=0)
            self.ax0 = self.fig.add_subplot(self.gs[0])
            self.ax0.grid(True)
            self.ax0.xaxis.set_major_formatter(Params.FORMATTER)
        elif self.plot_mode == "vispy":
            # Build the vispy canvas: one pan/zoom view with linked x/y axes.
            self.canvas = scene.SceneCanvas(keys='interactive', show=True, bgcolor='white')
            self.ui.verticalLayout_canvas.addWidget(self.canvas.native)
            self.main_grid = self.canvas.central_widget.add_grid(spacing=0)
            self.y_axis = scene.AxisWidget(orientation='left', axis_color='black', tick_color='black',
                                           text_color='black', axis_font_size=4, tick_font_size=4,
                                           tick_label_margin=20)
            self.y_axis.width_max = 80
            self.x_axis = scene.AxisWidget(orientation='bottom', axis_color='black', tick_color='black',
                                           text_color='black', axis_font_size=4, tick_font_size=4,
                                           tick_label_margin=20)
            self.x_axis.height_max = 40
            self.view = self.main_grid.add_view(row=0, col=1, camera='panzoom')
            self.main_grid.add_widget(self.y_axis, row=0, col=0)
            self.main_grid.add_widget(self.x_axis, row=1, col=1)
            self.x_axis.link_view(self.view)
            self.y_axis.link_view(self.view)
            self.grid_lines = visuals.GridLines(color=(0.3, 0.3, 0.3, 0.5), parent=self.view.scene)
            self.grid_lines.set_gl_state(line_width=1)
            self.grid_lines.order = 100

        PublicFunc.__resetAllButton__(self, ButtonState)

        self.ui.label_mode.setText(self.mode)
        if self.mode == "BCG":
            self.ui.spinBox_bandPassOrder.setValue(Config["Filter"]["BCGBandPassOrder"])
            self.ui.doubleSpinBox_bandPassLow.setValue(Config["Filter"]["BCGBandPassLow"])
            self.ui.doubleSpinBox_bandPassHigh.setValue(Config["Filter"]["BCGBandPassHigh"])
        elif self.mode == "ECG":
            self.ui.spinBox_bandPassOrder.setValue(Config["Filter"]["ECGBandPassOrder"])
            self.ui.doubleSpinBox_bandPassLow.setValue(Config["Filter"]["ECGBandPassLow"])
            self.ui.doubleSpinBox_bandPassHigh.setValue(Config["Filter"]["ECGBandPassHigh"])
        else:
            raise ValueError("模式不存在")

        self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__)
        self.ui.pushButton_input_setting.clicked.connect(self.setting.show)
        self.ui.pushButton_view.clicked.connect(self.__slot_btn_view__)
        self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__)
        self.ui.spinBox_bandPassOrder.editingFinished.connect(self.__update_config__)
        self.ui.doubleSpinBox_bandPassLow.editingFinished.connect(self.__update_config__)
        self.ui.doubleSpinBox_bandPassHigh.editingFinished.connect(self.__update_config__)

    @overrides
    def closeEvent(self, event):
        """Ask for confirmation, then tear down the canvas and release resources."""
        reply = QMessageBox.question(self, '确认', '确认退出吗?',
                                     QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply != QMessageBox.Yes:
            event.ignore()
            return

        PublicFunc.__disableAllButton__(self, ButtonState)
        PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
        QApplication.processEvents()

        # Clear the canvas. Every object is None-checked because the window can
        # be closed before show() has built the canvas.
        if self.plot_mode == "matplotlib":
            if self.ax0 is not None:
                self.ax0.clear()
            if self.fig is not None:
                self.fig.clf()
                plt.close(self.fig)
        elif self.plot_mode == "vispy":
            if self.canvas is not None:
                self.canvas.close()

        # Release resources.
        if self.setting is not None:
            self.setting.close()
        self.data = None
        self.deleteLater()
        collect()
        self.canvas = None
        event.accept()

    def __reset__(self):
        """Restore the current button state to the default state."""
        ButtonState["Current"].update(ButtonState["Default"])

    def __plot__(self):
        """Draw the raw and the processed signal on the active canvas.

        Drawing only happens when the call was triggered by the "view" button.

        Returns:
            Result: success after drawing, failure otherwise (including an
            unknown ``plot_mode``).
        """
        triggered_by_view = self.sender() == self.ui.pushButton_view

        if self.plot_mode == "matplotlib":
            # Clear the axes before drawing.
            self.reset_axes()
            if not triggered_by_view:
                self.canvas.draw()
                return Result().failure(info=Constants.DRAW_FAILURE)

            self.ax0.plot(self.data.raw_data,
                          color=Constants.PLOT_COLOR_RED,
                          label=Constants.PREPROCESS_PLOT_LABEL_ORIGINAL_DATA)
            # The processed trace is shifted vertically so both stay visible.
            self.ax0.plot(self.data.processed_data + Constants.PREPROCESS_OUTPUT_INPUT_AMP_OFFSET,
                          color=Constants.PLOT_COLOR_BLUE,
                          label=Constants.PREPROCESS_PLOT_LABEL_PROCESSED_DATA)
            self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT)
            self.canvas.draw()
            return Result().success(info=Constants.DRAW_FINISHED)

        if self.plot_mode == "vispy":
            # Detach the previously drawn lines from the scene.
            if self.line_original is not None:
                self.line_original.parent = None
            if self.line_processed is not None:
                self.line_processed.parent = None
            if not triggered_by_view:
                return Result().failure(info=Constants.DRAW_FAILURE)

            t = arange(len(self.data.raw_data))
            self.line_original = visuals.Line(column_stack((t, self.data.raw_data)),
                                              color=Constants.PLOT_COLOR_RED, parent=self.view.scene,
                                              antialias=False, width=2, method='gl')
            processed_y = self.data.processed_data + Constants.PREPROCESS_OUTPUT_INPUT_AMP_OFFSET
            self.line_processed = visuals.Line(column_stack((t, processed_y)),
                                               color=Constants.PLOT_COLOR_BLUE, parent=self.view.scene,
                                               antialias=False, width=2, method='gl')
            self.line_original.order = 0
            self.line_processed.order = -1
            self.view.camera.set_range()
            self.canvas.update()
            return Result().success(info=Constants.DRAW_FINISHED)

        # Unknown backend: report a failure instead of returning None, which the
        # caller would otherwise dereference.
        return Result().failure(info=Constants.DRAW_FAILURE)

    def __update_config__(self):
        """Copy the band-pass widgets into ``Config["Filter"]`` for the active mode.

        Raises:
            ValueError: If ``self.mode`` is neither "BCG" nor "ECG".
        """
        if self.mode == "BCG":
            Config["Filter"]["BCGBandPassOrder"] = self.ui.spinBox_bandPassOrder.value()
            Config["Filter"]["BCGBandPassLow"] = self.ui.doubleSpinBox_bandPassLow.value()
            Config["Filter"]["BCGBandPassHigh"] = self.ui.doubleSpinBox_bandPassHigh.value()
        elif self.mode == "ECG":
            Config["Filter"]["ECGBandPassOrder"] = self.ui.spinBox_bandPassOrder.value()
            Config["Filter"]["ECGBandPassLow"] = self.ui.doubleSpinBox_bandPassLow.value()
            Config["Filter"]["ECGBandPassHigh"] = self.ui.doubleSpinBox_bandPassHigh.value()
        else:
            raise ValueError("模式不存在")

    def _report_result(self, step, result):
        """Log ``result`` under the ``step`` prefix and return whether it succeeded.

        On failure an error message box is also shown and the operation is
        finished (buttons restored), so the caller only has to return.
        """
        if result.status:
            PublicFunc.text_output(self.ui, step + result.info, Constants.TIPS_TYPE_INFO)
            return True
        PublicFunc.text_output(self.ui, step + result.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return False

    def __slot_btn_input__(self):
        """Slot of the "input" button: load the raw file, then resample it."""
        PublicFunc.__disableAllButton__(self, ButtonState)
        if self.plot_mode == "matplotlib":
            # Clear the axes.
            self.reset_axes()
            self.canvas.draw()
        elif self.plot_mode == "vispy":
            self.canvas.update()

        self.data = Data()

        # Step 1 of 2: import the data.
        PublicFunc.progressbar_update(self, 1, 2, Constants.INPUTTING_DATA, 0)
        if not self._report_result("(1/2)", self.data.open_file()):
            return

        # Step 2 of 2: resample.
        PublicFunc.progressbar_update(self, 2, 2, Constants.RESAMPLING_DATA, 0)
        if not self._report_result("(2/2)", self.data.resample()):
            return

        self.setting.close()
        ButtonState["Current"]["pushButton_input_setting"] = False
        ButtonState["Current"]["pushButton_input"] = False
        ButtonState["Current"]["pushButton_view"] = True
        ButtonState["Current"]["pushButton_save"] = False
        PublicFunc.finish_operation(self, ButtonState)

    def __slot_btn_view__(self):
        """Slot of the "view" button: filter the data, then plot raw vs. processed."""
        PublicFunc.__disableAllButton__(self, ButtonState)

        # Step 1 of 2: preprocess the data.
        PublicFunc.progressbar_update(self, 1, 2, Constants.PREPROCESSING_DATA, 0)
        if not self._report_result("(1/2)", self.data.preprocess()):
            return

        # Step 2 of 2: draw.
        PublicFunc.progressbar_update(self, 2, 2, Constants.DRAWING_DATA, 50)
        if not self._report_result("(2/2)", self.__plot__()):
            return

        ButtonState["Current"]["pushButton_save"] = True
        PublicFunc.finish_operation(self, ButtonState)

    def __slot_btn_save__(self):
        """Slot of the "save" button: after confirmation, write the processed data in chunks."""
        reply = QMessageBox.question(self, Constants.QUESTION_TITLE,
                                     Constants.QUESTION_CONTENT + Config["Path"]["Save"],
                                     QMessageBox.Yes | QMessageBox.No,
                                     QMessageBox.Yes)
        if reply != QMessageBox.Yes:
            return

        PublicFunc.__disableAllButton__(self, ButtonState)

        # Save.
        PublicFunc.progressbar_update(self, 1, 1, Constants.SAVING_DATA, 0)

        # Build the frame once; it does not change between chunks.
        frame = DataFrame(self.data.processed_data.reshape(-1))
        total_rows = len(frame)
        chunk_size = Params.PREPROCESS_SAVE_CHUNK_SIZE

        # Default outcome, so that an empty data set still yields a Result.
        result = Result().success(info=Constants.SAVE_FINISHED)
        try:
            # Truncate (or create) the target first: Data.save appends, so any
            # stale content must be removed. The handle is closed before the
            # chunks are appended through their own handles.
            with open(Config["Path"]["Save"], 'w'):
                pass
        except FileNotFoundError:
            result = Result().failure(info=Constants.SAVE_FAILURE +
                                           Constants.FAILURE_REASON["Save_File_Not_Found"])
        except PermissionError:
            result = Result().failure(info=Constants.SAVE_FAILURE +
                                           Constants.FAILURE_REASON["Save_Permission_Denied"])
        except OSError:
            result = Result().failure(info=Constants.SAVE_FAILURE +
                                           Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc())
        else:
            for start in range(0, total_rows, chunk_size):
                end = min(start + chunk_size, total_rows)
                result = self.data.save(frame.iloc[start:end])
                if not result.status:
                    # Stop at the first failed chunk so that a later successful
                    # chunk cannot mask the error.
                    break
                self.progressbar.setValue(int((end / total_rows) * 100))
                QApplication.processEvents()

        if not self._report_result("(1/1)", result):
            return
        PublicFunc.msgbox_output(self, result.info, Constants.TIPS_TYPE_INFO)
        PublicFunc.finish_operation(self, ButtonState)

    def reset_axes(self):
        """Clear the matplotlib axes (if created) and restore grid and x-axis formatter."""
        if self.ax0 is not None:
            self.ax0.clear()
            self.ax0.grid(True)
            self.ax0.xaxis.set_major_formatter(Params.FORMATTER)

    def update_overlay_position(self):
        """Place the floating image panel in the top-right corner with a fixed margin."""
        margin = 20
        x = self.width() - self.image_overlay.width() - margin
        y = margin
        self.image_overlay.move(x, y)

    def resizeEvent(self, event):
        """Keep the floating panel in the top-right corner when the window is resized."""
        super().resizeEvent(event)
        self.update_overlay_position()


class Data:
    """Holds the raw and the processed signal and implements the processing steps."""

    def __init__(self):
        self.raw_data = None
        self.processed_data = None

    def open_file(self) -> Result:
        """Locate and read the raw signal file described by ``Config``.

        On success ``Config["Path"]["Input"]``, ``Config["InputConfig"]["Freq"]``
        and ``Config["Path"]["Save"]`` are updated.

        Returns:
            Result: success, the failed ``examine_file`` result, or a failure
            carrying the traceback if reading the file raised.

        Raises:
            ValueError: If ``Config["Mode"]`` is neither "BCG" nor "ECG".
        """
        if Config["Mode"] == "BCG":
            signal = Filename.ORGBCG_RAW
            save = Filename.BCG_FILTER
        elif Config["Mode"] == "ECG":
            signal = Filename.ECG_RAW
            save = Filename.ECG_FILTER
        else:
            raise ValueError("模式不存在")

        # examine_file is given a directory; reduce a file path to its parent.
        if Path(Config["Path"]["Input"]).is_file():
            Config["Path"]["Input"] = str(Path(Config["Path"]["Input"]).parent)

        result = PublicFunc.examine_file(Config["Path"]["Input"], signal, Params.ENDSWITH_TXT)
        if not result.status:
            return result

        Config["Path"]["Input"] = result.data["path"]
        Config["InputConfig"]["Freq"] = result.data["freq"]

        # Keep this step idempotent: if a previous (failed and retried) call
        # already appended the output file name, strip it before appending
        # again, otherwise the path would become ".../file.txt/file.txt".
        save_dir = Path(Config["Path"]["Save"])
        if save_dir.name.startswith(save) and save_dir.name.endswith(Params.ENDSWITH_TXT):
            save_dir = save_dir.parent
        Config["Path"]["Save"] = str(
            save_dir / Path(save + str(Config["OutputConfig"]["Freq"]) + Params.ENDSWITH_TXT))

        try:
            self.raw_data = read_csv(Config["Path"]["Input"],
                                     encoding=Params.UTF8_ENCODING,
                                     header=None).to_numpy().reshape(-1)
        except Exception:
            return Result().failure(info=Constants.INPUT_FAILURE +
                                         Constants.FAILURE_REASON["Open_Data_Exception"] + "\n" + format_exc())

        return Result().success(info=Constants.INPUT_FINISHED)

    def resample(self) -> Result:
        """Resample ``raw_data`` from the input to the output frequency, if they differ.

        Returns:
            Result: success (resampled or not needed), or failure if no data is
            loaded or resampling raised.
        """
        if self.raw_data is None:
            return Result().failure(info=Constants.RESAMPLE_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"])

        try:
            input_freq = Config["InputConfig"]["Freq"]
            output_freq = Config["OutputConfig"]["Freq"]
            if input_freq == output_freq:
                return Result().success(info=Constants.RESAMPLE_NO_NEED)
            self.raw_data = resample(self.raw_data,
                                     int(len(self.raw_data) * (output_freq / input_freq)))
        except Exception:
            return Result().failure(info=Constants.RESAMPLE_FAILURE +
                                         Constants.FAILURE_REASON["Resample_Exception"] + "\n" + format_exc())

        return Result().success(info=Constants.RESAMPLE_FINISHED)

    def preprocess(self) -> Result:
        """Band-pass filter ``raw_data`` into ``processed_data`` using the active mode's settings.

        A filter order of 0 means "no filtering": the raw data is passed through.

        Returns:
            Result: success, or failure if no data is loaded, the cut-off
            frequencies are invalid, the mode is unknown or filtering raised.
        """
        if self.raw_data is None:
            return Result().failure(info=Constants.PREPROCESS_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"])

        try:
            mode = Config["Mode"]
            if mode == "BCG":
                bandpass = Butterworth_for_BCG_PreProcess
            elif mode == "ECG":
                bandpass = Butterworth_for_ECG_PreProcess
            else:
                raise ValueError("模式不存在")

            # The keys are "<mode>BandPassOrder" / "...Low" / "...High".
            filter_config = Config["Filter"]
            order = filter_config[mode + "BandPassOrder"]
            if order == 0:
                self.processed_data = self.raw_data
            else:
                low_cut = filter_config[mode + "BandPassLow"]
                high_cut = filter_config[mode + "BandPassHigh"]
                if low_cut >= high_cut or low_cut <= 0 or high_cut <= 0:
                    return Result().failure(
                        info=Constants.PREPROCESS_FAILURE + Constants.FAILURE_REASON["Filter_Args_Not_Correct"])
                self.processed_data = bandpass(self.raw_data, type='bandpass',
                                               low_cut=low_cut,
                                               high_cut=high_cut,
                                               order=order,
                                               sample_rate=Config["OutputConfig"]["Freq"])
        except Exception:
            return Result().failure(info=Constants.PREPROCESS_FAILURE +
                                         Constants.FAILURE_REASON["Preprocess_Exception"] + "\n" + format_exc())

        return Result().success(info=Constants.PREPROCESS_FINISHED)

    def save(self, chunk) -> Result:
        """Append one chunk of processed data to ``Config["Path"]["Save"]`` as CSV.

        Args:
            chunk: DataFrame slice to append (no index, no header, 4 decimals).

        Returns:
            Result: success, or failure if there is no processed data or the
            write raised.
        """
        if self.processed_data is None:
            return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"])

        try:
            chunk.to_csv(Config["Path"]["Save"], mode='a', index=False, header=False, float_format='%.4f')
        except PermissionError:
            return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"])
        except Exception:
            return Result().failure(info=Constants.SAVE_FAILURE +
                                         Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc())

        return Result().success(info=Constants.SAVE_FINISHED)