# NOTE: import order is kept as in the original file; the Qt binding is
# imported before the matplotlib Qt backends.
from gc import collect
from pathlib import Path

import matplotlib.pyplot as plt
from PySide6.QtWidgets import QMessageBox, QMainWindow, QWidget, QPushButton, QProgressBar, QApplication
from matplotlib import gridspec
from matplotlib.backends.backend_qt import NavigationToolbar2QT
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg
from overrides import overrides
from pandas import read_csv, DataFrame
from yaml import dump, load, FullLoader

from func.utils.PublicFunc import PublicFunc
from func.utils.Constants import Constants, ConfigParams
from func.utils.detect_Jpeak import preprocess, Jpeak_Detection
from ui.MainWindow.MainWindow_detect_Jpeak import Ui_MainWindow_detect_Jpeak
from ui.setting.detect_Jpeak_input_setting import Ui_MainWindow_detect_Jpeak_input_setting


# Module-wide runtime configuration. It is filled by
# SettingWindow.__read_config__ and read by every class in this module.
Config = {

}

# Enabled/disabled state of the main-window buttons, keyed by Qt objectName.
# "Default" is the state right after the window opens; "Current" is the state
# restored by MainWindow_detect_Jpeak.finish_operation().
ButtonState = {
    "Default": {
        "pushButton_input_setting": True,
        "pushButton_input": True,
        "pushButton_view": False,
        "pushButton_save": False
    },
    "Current": {
        "pushButton_input_setting": True,
        "pushButton_input": True,
        "pushButton_view": False,
        "pushButton_save": False
    }
}


class SettingWindow(QMainWindow):
    """Dialog that edits the input settings (sample rate, paths, model folder)."""

    def __init__(self, root_path, sampID):
        """Build the UI, load the configuration and wire the buttons.

        Args:
            root_path: Root folder under which the sample data lives.
            sampID: Sample identifier; it is converted with ``str()`` to
                form the sample sub-folder name.
        """
        super(SettingWindow, self).__init__()
        self.ui = Ui_MainWindow_detect_Jpeak_input_setting()
        self.ui.setupUi(self)

        self.root_path = root_path
        self.sampID = sampID

        # Content of the YAML file (without the derived "Path" entry).
        self.config = None
        self.__read_config__()

        self.ui.spinBox_input_freq.valueChanged.connect(self.__update_ui__)
        self.ui.pushButton_confirm.clicked.connect(self.__write_config__)
        self.ui.pushButton_cancel.clicked.connect(self.__rollback_config__)
        self.ui.pushButton_cancel.clicked.connect(self.close)

    def __sample_dir__(self):
        """Return the folder that holds this sample's BCG text files."""
        return Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / Path(str(self.sampID))

    def __input_path__(self, freq):
        """Return the input file path (as ``str``) for the sample rate *freq*."""
        file_name = ConfigParams.DETECT_JPEAK_INPUT_BCG_FILENAME + str(freq) + ConfigParams.ENDSWITH_TXT
        return str(self.__sample_dir__() / Path(file_name))

    def __read_config__(self):
        """Load the YAML configuration into ``Config`` and echo it to the UI.

        A configuration file with the default content is created first when
        none exists.
        """
        if not Path(ConfigParams.DETECT_JPEAK_CONFIG_FILE_PATH).exists():
            with open(ConfigParams.DETECT_JPEAK_CONFIG_FILE_PATH, "w") as f:
                dump(ConfigParams.DETECT_JPEAK_CONFIG_NEW_CONTENT, f)

        # NOTE(review): FullLoader is not meant for untrusted input. This file
        # is presumably a local, application-owned config; confirm before
        # switching to a safe loader.
        with open(ConfigParams.DETECT_JPEAK_CONFIG_FILE_PATH, "r") as f:
            file_config = load(f.read(), Loader=FullLoader)
        Config.update(file_config)
        self.config = file_config

        # The paths are derived at runtime and are not part of the YAML file.
        save_name = ConfigParams.DETECT_JPEAK_SAVE_FILENAME + ConfigParams.ENDSWITH_TXT
        Config.update({
            "Path": {
                "Input": self.__input_path__(Config["InputConfig"]["Freq"]),
                "Save": str(self.__sample_dir__() / Path(save_name))
            }
        })

        # Echo the loaded values back to the widgets.
        self.ui.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])
        self.ui.plainTextEdit_file_path_input.setPlainText(Config["Path"]["Input"])
        self.ui.plainTextEdit_deepmodel_path.setPlainText(Config["ModelFolderPath"])
        self.ui.plainTextEdit_file_path_save.setPlainText(Config["Path"]["Save"])

    def __write_config__(self):
        """Copy the widget values into ``Config``, persist them and close."""
        freq = self.ui.spinBox_input_freq.value()
        model_folder = self.ui.plainTextEdit_deepmodel_path.toPlainText()

        # Write the UI values into the runtime configuration.
        Config["InputConfig"]["Freq"] = freq
        Config["Path"]["Input"] = self.ui.plainTextEdit_file_path_input.toPlainText()
        Config["Path"]["Save"] = self.ui.plainTextEdit_file_path_save.toPlainText()
        Config["ModelFolderPath"] = model_folder

        # Persist the configuration to the file (the paths are not persisted).
        self.config["InputConfig"]["Freq"] = freq
        self.config["ModelFolderPath"] = model_folder

        with open(ConfigParams.DETECT_JPEAK_CONFIG_FILE_PATH, "w") as f:
            dump(self.config, f)

        self.close()

    def __rollback_config__(self):
        """Discard unsaved edits by re-reading the configuration file."""
        self.__read_config__()

    def __update_ui__(self):
        """Refresh the input-path field after the sample rate spin box changed."""
        self.ui.plainTextEdit_file_path_input.setPlainText(
            self.__input_path__(self.ui.spinBox_input_freq.value()))


class MainWindow_detect_Jpeak(QMainWindow):
    """Main window: load data and models, detect J peaks, plot and save them."""

    def __init__(self):
        super(MainWindow_detect_Jpeak, self).__init__()
        self.ui = Ui_MainWindow_detect_Jpeak()
        self.ui.setupUi(self)

        self.root_path = None
        self.sampID = None

        self.data = None
        self.model = None

        self.setting = None

        # Initialise the progress bar.
        self.progressbar = None
        self.add_progressbar()

        # Canvas objects; they are created in show().
        self.fig = None
        self.canvas = None
        self.figToolbar = None
        self.gs = None
        self.ax0 = None

        # Artists currently drawn on ax0 (None when nothing is plotted).
        self.line_data = None
        self.point_peak = None
        self.line_interval = None

        self.msgBox = QMessageBox()
        self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)

    @overrides
    def show(self, root_path, sampID):
        """Show the window for one sample and set up canvas, widgets and signals.

        Args:
            root_path: Root folder under which the sample data lives.
            sampID: Sample identifier.
        """
        super().show()
        self.root_path = root_path
        self.sampID = sampID

        # Creating the settings window also loads the configuration into Config.
        self.setting = SettingWindow(root_path, sampID)

        # Initialise the canvas.
        self.fig = plt.figure(figsize=(12, 9), dpi=100)
        self.canvas = FigureCanvasQTAgg(self.fig)
        self.figToolbar = NavigationToolbar2QT(self.canvas)
        for action in self.figToolbar.actions():
            if action.text() in ("Subplots", "Customize"):
                self.figToolbar.removeAction(action)
        self.ui.verticalLayout_canvas.addWidget(self.canvas)
        self.ui.verticalLayout_canvas.addWidget(self.figToolbar)
        self.gs = gridspec.GridSpec(1, 1, height_ratios=[1])
        self.fig.subplots_adjust(top=0.98, bottom=0.05, right=0.98, left=0.1, hspace=0, wspace=0)
        self.ax0 = self.fig.add_subplot(self.gs[0])
        self.ax0.grid(True)
        self.ax0.xaxis.set_major_formatter(ConfigParams.FORMATTER)

        self.__resetAllButton__()

        # Echo the configured parameters to the widgets.
        self.ui.doubleSpinBox_bandPassLow.setValue(Config["Filter"]["BandPassLow"])
        self.ui.doubleSpinBox_bandPassHigh.setValue(Config["Filter"]["BandPassHigh"])
        self.ui.spinBox_peaksValue.setValue(Config["PeaksValue"])
        self.ui.doubleSpinBox_ampValue.setValue(Config["AmpValue"])
        self.ui.spinBox_intervalLow.setValue(Config["IntervalLow"])
        self.ui.spinBox_intervalHigh.setValue(Config["IntervalHigh"])
        self.ui.checkBox_useCPU.setChecked(Config["UseCPU"])

        self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__)
        self.ui.pushButton_input_setting.clicked.connect(self.setting.show)
        self.ui.pushButton_view.clicked.connect(self.__slot_btn_view__)
        self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__)

        # Any parameter edit is mirrored into Config.
        self.ui.doubleSpinBox_bandPassLow.editingFinished.connect(self.__update_config__)
        self.ui.doubleSpinBox_bandPassHigh.editingFinished.connect(self.__update_config__)
        self.ui.spinBox_peaksValue.editingFinished.connect(self.__update_config__)
        self.ui.doubleSpinBox_ampValue.editingFinished.connect(self.__update_config__)
        self.ui.spinBox_intervalLow.editingFinished.connect(self.__update_config__)
        self.ui.spinBox_intervalHigh.editingFinished.connect(self.__update_config__)
        self.ui.checkBox_useCPU.stateChanged.connect(self.__update_config__)
        self.ui.comboBox_model.currentTextChanged.connect(self.__update_config__)

    @overrides
    def closeEvent(self, event):
        """Release plot, data and model resources, then accept the close event."""
        self.__disableAllButton__()
        self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
        QApplication.processEvents()

        # Clear the canvas. The canvas only exists after show() was called.
        if self.__clear_plot__() and self.canvas is not None:
            self.canvas.draw()

        # Release resources.
        self.data = None
        self.model = None
        if self.fig is not None:
            self.fig.clf()
            plt.close(self.fig)
        self.deleteLater()
        collect()
        self.canvas = None
        event.accept()

    @staticmethod
    def __reset__():
        """Reset the current button state to the default, with "view" enabled."""
        ButtonState["Current"].update(ButtonState["Default"].copy())
        ButtonState["Current"]["pushButton_view"] = True

    def __clear_plot__(self):
        """Remove the plotted artists from the axes and forget them.

        Returns:
            True when at least one artist was present, False otherwise.
        """
        cleared = False
        for attr in ("line_data", "point_peak", "line_interval"):
            artist = getattr(self, attr)
            if artist is None:
                continue
            try:
                artist.remove()
            except ValueError:
                # The artist was already detached from the axes.
                pass
            setattr(self, attr, None)
            cleared = True
        return cleared

    def __plot__(self):
        """Draw the processed signal, the detected peaks and the interval curve.

        Drawing only happens when the call was triggered by the "view" button.

        Returns:
            A ``(status, info)`` tuple: success flag and status message.
        """
        # Clear the canvas.
        self.__clear_plot__()

        if self.sender() == self.ui.pushButton_view:
            self.line_data, = self.ax0.plot(self.data.processed_data,
                                            color=Constants.PLOT_COLOR_BLUE,
                                            label=Constants.DETECT_JPEAK_PLOT_LABEL_BCG)
            self.point_peak, = self.ax0.plot(self.data.peak, self.data.processed_data[self.data.peak],
                                             'r.',
                                             label=Constants.DETECT_JPEAK_PLOT_LABEL_J_PEAKS)
            self.line_interval, = self.ax0.plot(self.data.interval,
                                                color=Constants.PLOT_COLOR_ORANGE,
                                                label=Constants.DETECT_JPEAK_PLOT_LABEL_INTERVAL)
            self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT)
            status, info = True, Constants.DRAWING_FINISHED
        else:
            status, info = False, Constants.DRAWING_FAILURE

        self.canvas.draw()
        return status, info

    def __apply_button_state__(self, states, enabled=None):
        """Set the enabled flag of every push button whose name is in *states*.

        Args:
            states: Mapping of button objectName to its enabled flag.
            enabled: When not None, this value is used for every listed
                button instead of the value stored in *states*.
        """
        for button in self.centralWidget().findChildren(QPushButton):
            name = button.objectName()
            if name in states:
                button.setEnabled(states[name] if enabled is None else enabled)

    def __disableAllButton__(self):
        """Disable every managed button (used while an operation is running)."""
        self.__apply_button_state__(ButtonState["Current"], enabled=False)

    def __enableAllButton__(self):
        """Restore every managed button to its "Current" state."""
        self.__apply_button_state__(ButtonState["Current"])

    def __resetAllButton__(self):
        """Put every managed button into its "Default" state."""
        self.__apply_button_state__(ButtonState["Default"])

    def __update_config__(self):
        """Mirror the parameter widgets into ``Config``."""
        Config["Filter"]["BandPassLow"] = self.ui.doubleSpinBox_bandPassLow.value()
        Config["Filter"]["BandPassHigh"] = self.ui.doubleSpinBox_bandPassHigh.value()
        Config["PeaksValue"] = self.ui.spinBox_peaksValue.value()
        Config["AmpValue"] = self.ui.doubleSpinBox_ampValue.value()
        Config["IntervalLow"] = self.ui.spinBox_intervalLow.value()
        Config["IntervalHigh"] = self.ui.spinBox_intervalHigh.value()
        Config["UseCPU"] = self.ui.checkBox_useCPU.isChecked()
        Config["DetectMethod"] = self.ui.comboBox_model.currentText()

    def finish_operation(self):
        """Show the "finished" status, fill the progress bar, re-enable buttons."""
        self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.OPERATION_FINISHED))
        self.progressbar.setValue(100)
        QApplication.processEvents()

        self.__enableAllButton__()

    def add_progressbar(self):
        """Create the progress bar and attach it to the status bar."""
        self.progressbar = QProgressBar()
        self.progressbar.setRange(0, 100)
        self.progressbar.setValue(0)
        self.progressbar.setStyleSheet(Constants.PROGRESSBAR_STYLE)
        self.ui.statusbar.addPermanentWidget(self.progressbar)

    def statusbar_show_msg(self, msg):
        """Show *msg* in the status bar."""
        self.ui.statusbar.showMessage(msg)

    def statusbar_clear_msg(self):
        """Clear the status bar message."""
        self.ui.statusbar.clearMessage()

    def __begin_step__(self, step, message, progress):
        """Show the status message of a step and move the progress bar."""
        self.statusbar_show_msg(PublicFunc.format_status_msg(step + message))
        self.progressbar.setValue(progress)
        QApplication.processEvents()

    def __report_step__(self, step, status, info):
        """Log the outcome of a step; on failure also alert and finish.

        Args:
            step: Step prefix such as ``"(1/2)"`` prepended to the log text.
            status: Success flag returned by the step.
            info: Message returned by the step.

        Returns:
            *status*, so callers can write ``if not ...: return``.
        """
        if status:
            PublicFunc.text_output(self.ui, step + info, Constants.TIPS_TYPE_INFO)
            return True

        PublicFunc.text_output(self.ui, step + info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
        self.finish_operation()
        return False

    def __slot_btn_input__(self):
        """Slot of the "input" button: list the models, then load the data."""
        self.__disableAllButton__()

        # Clear the canvas.
        if self.__clear_plot__():
            self.canvas.draw()

        # Clear the model list.
        self.ui.comboBox_model.clear()

        self.__begin_step__("(1/2)", Constants.DETECT_JPEAK_LOADING_MODEL, 0)

        # Look for models.
        self.model = Model()
        status, info = self.model.seek_model()
        if not self.__report_step__("(1/2)", status, info):
            return

        self.update_ui_comboBox_model(self.model.model_list)

        self.__begin_step__("(2/2)", Constants.INPUTTING_DATA, 10)

        # Import the data.
        self.data = Data()
        status, info = self.data.open_file()
        if not self.__report_step__("(2/2)", status, info):
            return

        MainWindow_detect_Jpeak.__reset__()
        self.finish_operation()

    def __slot_btn_view__(self):
        """Slot of the "view" button: preprocess, predict the peaks and plot."""
        self.__disableAllButton__()

        self.__begin_step__("(1/3)", Constants.DETECT_JPEAK_PROCESSING_DATA, 0)

        # Preprocess the data.
        status, info = self.data.preprocess()
        if not self.__report_step__("(1/3)", status, info):
            return

        self.__begin_step__("(2/3)", Constants.DETECT_JPEAK_PREDICTING_PEAK, 10)

        # Predict the peaks.
        self.model.selected_model = Config["DetectMethod"]
        status, info = self.data.predict_Jpeak(self.model)
        if not self.__report_step__("(2/3)", status, info):
            return

        sample_count = len(self.data.raw_data)
        PublicFunc.text_output(self.ui, Constants.DETECT_JPEAK_DATA_LENGTH_POINTS + str(sample_count),
                               Constants.TIPS_TYPE_INFO)
        PublicFunc.text_output(self.ui, Constants.DETECT_JPEAK_DURATION_MIN +
                               str((sample_count / Config["InputConfig"]["Freq"] / 60)),
                               Constants.TIPS_TYPE_INFO)
        PublicFunc.text_output(self.ui, Constants.DETECT_JPEAK_PEAK_AMOUNT + str(len(self.data.peak)),
                               Constants.TIPS_TYPE_INFO)

        self.__begin_step__("(3/3)", Constants.DRAWING_DATA, 70)

        # Plot.
        status, info = self.__plot__()
        if not self.__report_step__("(3/3)", status, info):
            return

        ButtonState["Current"]["pushButton_save"] = True
        self.finish_operation()

    def __save_peaks__(self, save_path):
        """Write the detected peaks to *save_path* chunk by chunk.

        The file is truncated first and each chunk is appended through
        ``Data.save``; the progress bar is updated after every chunk.

        Returns:
            A ``(status, info)`` tuple: success flag and status message.
        """
        if self.data is None or self.data.peak is None:
            return False, Constants.SAVING_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Peak_Not_Exist"]

        # Data.save() appends to Data.file_path_save, which was captured when
        # the data was loaded. Point it at the file the user just confirmed so
        # that the file being truncated and the file being written are the same.
        self.data.file_path_save = save_path

        # Truncate the target, and close it before pandas re-opens it.
        try:
            with open(save_path, 'w'):
                pass
        except OSError:
            return False, Constants.SAVING_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Save_Exception"]

        peaks = DataFrame(self.data.peak.reshape(-1))
        total_rows = len(peaks)
        chunk_size = ConfigParams.DETECT_JPEAK_SAVE_CHUNK_SIZE

        # With zero peaks the loop does not run; the empty file is the result.
        status, info = True, Constants.SAVING_FINISHED
        for start in range(0, total_rows, chunk_size):
            end = min(start + chunk_size, total_rows)
            status, info = self.data.save(peaks.iloc[start:end])
            if not status:
                # Stop at the first failure so it is not masked by later chunks.
                break
            self.progressbar.setValue(int((end / total_rows) * 100))
            QApplication.processEvents()
        return status, info

    def __slot_btn_save__(self):
        """Slot of the "save" button: confirm, then save the detected peaks."""
        save_path = Config["Path"]["Save"]
        reply = QMessageBox.question(self, Constants.QUESTION_TITLE,
                                     Constants.QUESTION_CONTENT + save_path,
                                     QMessageBox.Yes | QMessageBox.No,
                                     QMessageBox.Yes)
        if reply != QMessageBox.Yes:
            return

        self.__disableAllButton__()

        self.__begin_step__("(1/1)", Constants.SAVING_DATA, 0)

        # Save.
        status, info = self.__save_peaks__(save_path)
        if not self.__report_step__("(1/1)", status, info):
            return

        # NOTE(review): the error dialogs pass Constants.MSGBOX_TYPE_ERROR but
        # this one passes TIPS_TYPE_INFO; kept as in the original, confirm it
        # is intended.
        PublicFunc.msgbox_output(self, info, Constants.TIPS_TYPE_INFO)

        self.finish_operation()

    def update_ui_comboBox_model(self, model_list):
        """Replace the entries of the model combo box with *model_list*."""
        self.ui.comboBox_model.clear()
        self.ui.comboBox_model.addItems(model_list)


class Data:
    """Holds one recording: raw signal, processed signal, peaks and intervals."""

    def __init__(self):
        # Paths are captured from Config at construction time.
        self.file_path_input = Config["Path"]["Input"]
        self.file_path_save = Config["Path"]["Save"]

        self.raw_data = None
        self.processed_data = None
        self.peak = None
        self.interval = None

    def open_file(self):
        """Read the input file into ``raw_data`` as a flat array.

        Returns:
            A ``(status, info)`` tuple: success flag and status message.
        """
        if not Path(self.file_path_input).exists():
            return False, Constants.INPUT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Data_Path_Not_Exist"]

        try:
            self.raw_data = read_csv(self.file_path_input,
                                     encoding=ConfigParams.UTF8_ENCODING,
                                     header=None).to_numpy().reshape(-1)
        except Exception:
            # Any read/parse error is reported to the user as a status message.
            return False, Constants.INPUT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Read_Data_Exception"]

        return True, Constants.INPUT_FINISHED

    def preprocess(self):
        """Run ``preprocess`` on ``raw_data`` with the configured parameters.

        Returns:
            A ``(status, info)`` tuple: success flag and status message.
        """
        if self.raw_data is None:
            return False, Constants.DETECT_JPEAK_PROCESS_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Raw_Data_Not_Exist"]

        try:
            self.processed_data = preprocess(self.raw_data,
                                             Config["InputConfig"]["Freq"],
                                             Config["Filter"]["BandPassLow"],
                                             Config["Filter"]["BandPassHigh"],
                                             Config["AmpValue"])
        except Exception:
            return False, Constants.DETECT_JPEAK_PROCESS_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Filter_Exception"]

        return True, Constants.DETECT_JPEAK_PROCESS_FINISHED

    def predict_Jpeak(self, model):
        """Detect the peaks in ``processed_data`` with the selected model.

        Args:
            model: A ``Model`` whose ``model_folder_path`` and
                ``selected_model`` identify the model file.

        Returns:
            A ``(status, info)`` tuple: success flag and status message.
        """
        # An unset or empty selection cannot name a model file.
        if not model.selected_model:
            return False, Constants.DETECT_JPEAK_PREDICT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Model_File_Not_Exist"]

        model_path = Path(model.model_folder_path) / Path(model.selected_model)
        if not model_path.exists():
            return False, Constants.DETECT_JPEAK_PREDICT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Model_File_Not_Exist"]

        if self.processed_data is None:
            return False, Constants.DETECT_JPEAK_PREDICT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Processed_Data_Not_Exist"]

        try:
            self.peak, self.interval = Jpeak_Detection(model.selected_model,
                                                       model_path,
                                                       self.processed_data,
                                                       Config["InputConfig"]["Freq"],
                                                       Config["IntervalHigh"],
                                                       Config["IntervalLow"],
                                                       Config["PeaksValue"],
                                                       Config["UseCPU"])
        except Exception:
            return False, Constants.DETECT_JPEAK_PREDICT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Predict_Exception"]

        return True, Constants.DETECT_JPEAK_PREDICT_FINISHED

    def save(self, chunk):
        """Append *chunk* to ``file_path_save`` as CSV without header or index.

        Args:
            chunk: DataFrame slice to append.

        Returns:
            A ``(status, info)`` tuple: success flag and status message.
        """
        if self.peak is None:
            return False, Constants.SAVING_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Peak_Not_Exist"]

        try:
            chunk.to_csv(self.file_path_save, mode='a', index=False, header=False)
        except Exception:
            return False, Constants.SAVING_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Save_Exception"]

        return True, Constants.SAVING_FINISHED


class Model:
    """Describes the model folder and the model chosen for detection."""

    def __init__(self):
        # The folder is captured from Config at construction time.
        self.model_folder_path = Config["ModelFolderPath"]
        self.model_list = None
        self.selected_model_path = None
        self.selected_model = None

    def seek_model(self):
        """Collect the names of the files in the model folder into ``model_list``.

        Returns:
            A ``(status, info)`` tuple: success flag and status message.
        """
        folder = Path(self.model_folder_path)
        if not folder.exists():
            return False, Constants.DETECT_JPEAK_LOAD_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Model_Path_Not_Exist"]

        try:
            self.model_list = [file.name for file in folder.iterdir() if file.is_file()]
        except Exception:
            return False, Constants.DETECT_JPEAK_LOAD_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Read_Model_Exception"]

        if not self.model_list:
            # Same failure prefix as every other failure of this method.
            return False, Constants.DETECT_JPEAK_LOAD_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Model_File_Not_Exist"]

        return True, Constants.DETECT_JPEAK_LOAD_FINISHED