# File: Signal_Label_Reborn/func/Module_preprocess.py
# Size: 602 lines, 27 KiB
# Language: Python

from gc import collect
from pathlib import Path
from traceback import format_exc
import matplotlib.pyplot as plt
from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication
from matplotlib import gridspec
from matplotlib.backends.backend_qt import NavigationToolbar2QT
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg
from numpy import column_stack, arange
from overrides import overrides
from pandas import read_csv, DataFrame
from scipy.signal import resample
from vispy import scene
from vispy.scene import visuals
from yaml import dump, load, FullLoader
from func.utils.ConfigParams import Filename, Params
from func.utils.PublicFunc import PublicFunc
from func.utils.Constants import Constants
from func.Filters.Preprocessing import Butterworth_for_BCG_PreProcess, Butterworth_for_ECG_PreProcess
from func.utils.Result import Result
from ui.MainWindow.MainWindow_preprocess import Ui_MainWindow_preprocess
from ui.setting.preprocess_input_setting import Ui_MainWindow_preprocess_input_setting
# Module-wide runtime configuration. It starts out empty and is filled in
# place by SettingWindow.__read_config__ (YAML content plus "Path"/"Mode").
Config = dict()

# Enabled flags for the four push buttons of the preprocessing window.
# "Default" is the initial layout; "Current" is the live state that the slots
# modify before handing the table to the PublicFunc button helpers.
ButtonState = dict(
    Default=dict(
        pushButton_input_setting=True,
        pushButton_input=True,
        pushButton_view=False,
        pushButton_save=False,
    ),
    Current=dict(
        pushButton_input_setting=True,
        pushButton_input=True,
        pushButton_view=False,
        pushButton_save=False,
    ),
)
class SettingWindow(QMainWindow):
    """Settings window for the input/output frequencies and file paths.

    The window mirrors its widgets into the module-level ``Config`` dict and
    persists the two frequency values to the YAML file named by
    ``Params.PREPROCESS_CONFIG_FILE_PATH``.

    Args:
        mode: Signal type, either ``"BCG"`` or ``"ECG"``.
        root_path: Root directory of the data set.
        sampID: Sample identifier; used as the last directory component.

    Raises:
        ValueError: If ``mode`` is neither ``"BCG"`` nor ``"ECG"``.
    """

    def __init__(self, mode, root_path, sampID):
        super().__init__()
        self.ui = Ui_MainWindow_preprocess_input_setting()
        self.ui.setupUi(self)

        self.mode = mode
        self.root_path = root_path
        self.sampID = sampID

        self.msgBox = QMessageBox()
        self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)

        # Populate Config and the widgets before any signal is connected, so
        # the initial setValue() calls do not trigger __update_ui__.
        self.config = None
        self.__read_config__()
        self.__examine_freq__()

        widgets = self.ui
        widgets.spinBox_input_freq.valueChanged.connect(self.__update_ui__)
        widgets.spinBox_output_freq.valueChanged.connect(self.__update_ui__)
        widgets.pushButton_confirm.clicked.connect(self.__write_config__)
        # Cancel first restores the configuration, then closes the window.
        widgets.pushButton_cancel.clicked.connect(self.__rollback_config__)
        widgets.pushButton_cancel.clicked.connect(self.close)

    def __read_config__(self):
        """Load the YAML config into ``Config`` and echo it to the widgets.

        The config file is created from ``Params.PREPROCESS_CONFIG_NEW_CONTENT``
        when it does not exist yet. ``Config["Path"]`` and ``Config["Mode"]``
        are then derived from the mode, root path and sample ID.
        """
        config_file = Params.PREPROCESS_CONFIG_FILE_PATH
        if not Path(config_file).exists():
            with open(config_file, "w") as handle:
                dump(Params.PREPROCESS_CONFIG_NEW_CONTENT, handle)

        with open(config_file, "r") as handle:
            loaded = load(handle.read(), Loader=FullLoader)

        Config.update(loaded)
        self.config = loaded

        if self.mode == "BCG":
            sub_dir = Filename.PATH_ORGBCG_TEXT
        elif self.mode == "ECG":
            sub_dir = Filename.PATH_PSG_TEXT
        else:
            raise ValueError("模式不存在")

        # Input and save locations both start out as the sample directory.
        sample_dir = Path(self.root_path) / sub_dir / Path(str(self.sampID))
        Config.update({
            "Path": {
                "Input": str(sample_dir),
                "Save": str(sample_dir)
            },
            "Mode": self.mode
        })

        # Echo the values back into the widgets.
        widgets = self.ui
        widgets.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])
        widgets.spinBox_output_freq.setValue(Config["OutputConfig"]["Freq"])
        widgets.plainTextEdit_file_path_input.setPlainText(Config["Path"]["Input"])
        widgets.plainTextEdit_file_path_save.setPlainText(Config["Path"]["Save"])

    def __write_config__(self):
        """Copy the widget values into ``Config``, persist them, and close."""
        widgets = self.ui
        input_freq = widgets.spinBox_input_freq.value()
        output_freq = widgets.spinBox_output_freq.value()

        # Runtime configuration, taken from the widgets.
        Config["InputConfig"]["Freq"] = input_freq
        Config["OutputConfig"]["Freq"] = output_freq
        Config["Path"]["Input"] = widgets.plainTextEdit_file_path_input.toPlainText()
        Config["Path"]["Save"] = widgets.plainTextEdit_file_path_save.toPlainText()

        # Persisted configuration: only the two frequencies are updated here.
        self.config["InputConfig"]["Freq"] = input_freq
        self.config["OutputConfig"]["Freq"] = output_freq
        with open(Params.PREPROCESS_CONFIG_FILE_PATH, "w") as handle:
            dump(self.config, handle)

        self.close()

    def __rollback_config__(self):
        """Discard pending edits by re-reading the configuration file."""
        self.__read_config__()

    def __update_ui__(self):
        """Rebuild the displayed input file path from the input frequency.

        Raises:
            ValueError: If the mode is neither ``"BCG"`` nor ``"ECG"``.
        """
        if self.mode == "BCG":
            sub_dir, prefix = Filename.PATH_ORGBCG_TEXT, Filename.ORGBCG_RAW
        elif self.mode == "ECG":
            sub_dir, prefix = Filename.PATH_PSG_TEXT, Filename.ECG_RAW
        else:
            raise ValueError("模式不存在")

        file_name = prefix + str(self.ui.spinBox_input_freq.value()) + Params.ENDSWITH_TXT
        full_path = Path(self.root_path) / sub_dir / Path(str(self.sampID)) / Path(file_name)
        self.ui.plainTextEdit_file_path_input.setPlainText(str(full_path))

    def __examine_freq__(self):
        """Ask ``PublicFunc.examine_file`` for the input frequency and show it.

        On success ``Config["InputConfig"]["Freq"]`` is replaced by the
        reported value; on failure an error box is shown and the configured
        value is kept.

        Raises:
            ValueError: If ``Config["Mode"]`` is neither ``"BCG"`` nor ``"ECG"``.
        """
        current_mode = Config["Mode"]
        if current_mode == "BCG":
            signal = Filename.ORGBCG_RAW
        elif current_mode == "ECG":
            signal = Filename.ECG_RAW
        else:
            raise ValueError("模式不存在")

        # examine_file is handed a directory: strip the file name if the
        # configured input path already points at a file.
        configured_input = Path(Config["Path"]["Input"])
        if configured_input.is_file():
            Config["Path"]["Input"] = str(configured_input.parent)

        result = PublicFunc.examine_file(Config["Path"]["Input"], signal, Params.ENDSWITH_TXT)
        if not result.status:
            PublicFunc.msgbox_output(self, signal + Constants.FAILURE_REASON["Get_Freq_Not_Correct"], Constants.MSGBOX_TYPE_ERROR)
        else:
            Config["InputConfig"]["Freq"] = result.data["freq"]

        # Echo the value back into the widget.
        self.ui.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])
class MainWindow_preprocess(QMainWindow):
    """Main window that loads, resamples, filters, plots and saves a signal.

    The plotting backend is chosen by ``plot_mode`` (``"matplotlib"`` or
    ``"vispy"``). Widgets for the chosen backend are created in :meth:`show`,
    not in ``__init__``.

    Args:
        plot_mode: Name of the plotting backend.
    """

    def __init__(self, plot_mode):
        super(MainWindow_preprocess, self).__init__()
        self.ui = Ui_MainWindow_preprocess()
        self.ui.setupUi(self)

        self.mode = None
        self.root_path = None
        self.sampID = None
        self.data = None
        self.setting = None

        # Progress bar placeholder; PublicFunc.add_progressbar is given the window.
        self.progressbar = None
        PublicFunc.add_progressbar(self)

        self.plot_mode = plot_mode
        if self.plot_mode == "matplotlib":
            # Plot objects are created in show().
            self.fig = None
            self.canvas = None
            self.figToolbar = None
            self.gs = None
            self.ax0 = None
        elif self.plot_mode == "vispy":
            # Plot objects are created in show().
            self.canvas = None
            self.main_grid = None
            self.view = None
            self.y_axis = None
            self.x_axis = None
            self.grid_lines = None
            self.line_original = None
            self.line_processed = None

        self.ui.textBrowser_info.setStyleSheet("QTextBrowser { background-color: rgb(255, 255, 200); }")
        PublicFunc.__styleAllButton__(self, ButtonState)

        self.msgBox = QMessageBox()
        self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)

    @overrides
    def show(self, mode, root_path, sampID):
        """Show the window and build the plot area for one sample.

        Args:
            mode: Signal type, ``"BCG"`` or ``"ECG"``.
            root_path: Root directory of the data set.
            sampID: Sample identifier.

        Raises:
            ValueError: If ``mode`` is neither ``"BCG"`` nor ``"ECG"``.
        """
        # NOTE(review): every call adds a new canvas and connects the signals
        # again, so this looks intended to run once per instance - confirm.
        super().show()
        self.mode = mode
        self.root_path = root_path
        self.sampID = sampID
        self.setting = SettingWindow(mode, root_path, sampID)

        if self.plot_mode == "matplotlib":
            self.fig = plt.figure(figsize=(12, 9), dpi=100)
            self.canvas = FigureCanvasQTAgg(self.fig)
            self.figToolbar = NavigationToolbar2QT(self.canvas)
            # Drop the toolbar entries labelled "Subplots" and "Customize".
            for action in self.figToolbar.actions():
                if action.text() == "Subplots" or action.text() == "Customize":
                    self.figToolbar.removeAction(action)
            self.ui.verticalLayout_canvas.addWidget(self.canvas)
            self.ui.verticalLayout_canvas.addWidget(self.figToolbar)
            self.gs = gridspec.GridSpec(1, 1, height_ratios=[1])
            self.fig.subplots_adjust(top=0.98, bottom=0.05, right=0.98, left=0.1, hspace=0, wspace=0)
            self.ax0 = self.fig.add_subplot(self.gs[0])
            self.ax0.grid(True)
            self.ax0.xaxis.set_major_formatter(Params.FORMATTER)
        elif self.plot_mode == "vispy":
            self.canvas = scene.SceneCanvas(keys='interactive', show=True, bgcolor='white')
            self.ui.verticalLayout_canvas.addWidget(self.canvas.native)
            self.main_grid = self.canvas.central_widget.add_grid(spacing=0)
            self.y_axis = scene.AxisWidget(orientation='left',
                                           axis_color='black',
                                           tick_color='black',
                                           text_color='black',
                                           axis_font_size=4,
                                           tick_font_size=4,
                                           tick_label_margin=20)
            self.y_axis.width_max = 80
            self.x_axis = scene.AxisWidget(orientation='bottom',
                                           axis_color='black',
                                           tick_color='black',
                                           text_color='black',
                                           axis_font_size=4,
                                           tick_font_size=4,
                                           tick_label_margin=20)
            self.x_axis.height_max = 40
            # Grid layout: y axis at (0, 0), view at (0, 1), x axis at (1, 1).
            self.view = self.main_grid.add_view(row=0, col=1, camera='panzoom')
            self.main_grid.add_widget(self.y_axis, row=0, col=0)
            self.main_grid.add_widget(self.x_axis, row=1, col=1)
            self.x_axis.link_view(self.view)
            self.y_axis.link_view(self.view)
            self.grid_lines = visuals.GridLines(color=(0.3, 0.3, 0.3, 0.5), parent=self.view.scene)
            self.grid_lines.set_gl_state(line_width=1)
            self.grid_lines.order = 100

        PublicFunc.__resetAllButton__(self, ButtonState)

        # Show the mode and its band-pass settings from Config.
        self.ui.label_mode.setText(self.mode)
        if self.mode == "BCG":
            self.ui.spinBox_bandPassOrder.setValue(Config["Filter"]["BCGBandPassOrder"])
            self.ui.doubleSpinBox_bandPassLow.setValue(Config["Filter"]["BCGBandPassLow"])
            self.ui.doubleSpinBox_bandPassHigh.setValue(Config["Filter"]["BCGBandPassHigh"])
        elif self.mode == "ECG":
            self.ui.spinBox_bandPassOrder.setValue(Config["Filter"]["ECGBandPassOrder"])
            self.ui.doubleSpinBox_bandPassLow.setValue(Config["Filter"]["ECGBandPassLow"])
            self.ui.doubleSpinBox_bandPassHigh.setValue(Config["Filter"]["ECGBandPassHigh"])
        else:
            raise ValueError("模式不存在")

        self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__)
        self.ui.pushButton_input_setting.clicked.connect(self.setting.show)
        self.ui.pushButton_view.clicked.connect(self.__slot_btn_view__)
        self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__)
        self.ui.spinBox_bandPassOrder.editingFinished.connect(self.__update_config__)
        self.ui.doubleSpinBox_bandPassLow.editingFinished.connect(self.__update_config__)
        self.ui.doubleSpinBox_bandPassHigh.editingFinished.connect(self.__update_config__)

    @overrides
    def closeEvent(self, event):
        """Ask for confirmation, then release plot resources and close.

        Objects that are only created in :meth:`show` are checked for ``None``
        so that closing a window that was never shown does not raise.
        """
        reply = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
        if reply != QMessageBox.Yes:
            event.ignore()
            return

        PublicFunc.__disableAllButton__(self, ButtonState)
        PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
        QApplication.processEvents()

        # Clear the plot area.
        if self.plot_mode == "matplotlib":
            if self.ax0 is not None:
                self.ax0.clear()
            if self.fig is not None:
                self.fig.clf()
                plt.close(self.fig)
        elif self.plot_mode == "vispy":
            if self.canvas is not None:
                self.canvas.close()

        # Release resources.
        if self.setting is not None:
            self.setting.close()
        self.data = None
        self.deleteLater()
        collect()
        self.canvas = None
        event.accept()

    def __reset__(self):
        """Reset the live button state to the default layout."""
        ButtonState["Current"].update(ButtonState["Default"].copy())

    def __plot__(self):
        """Draw raw and processed data when triggered by the view button.

        The processed trace is shifted by
        ``Constants.PREPROCESS_OUTPUT_INPUT_AMP_OFFSET`` before plotting.

        Returns:
            A ``Result``: success after drawing, failure when the sender is
            not the view button. ``None`` if ``plot_mode`` is neither backend.
        """
        if self.plot_mode == "matplotlib":
            # Clear the plot area.
            self.reset_axes()
            sender = self.sender()
            if sender == self.ui.pushButton_view:
                self.ax0.plot(self.data.raw_data,
                              color=Constants.PLOT_COLOR_RED,
                              label=Constants.PREPROCESS_PLOT_LABEL_ORIGINAL_DATA)
                self.ax0.plot(self.data.processed_data + Constants.PREPROCESS_OUTPUT_INPUT_AMP_OFFSET,
                              color=Constants.PLOT_COLOR_BLUE,
                              label=Constants.PREPROCESS_PLOT_LABEL_PROCESSED_DATA)
                self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT)
                self.canvas.draw()
                return Result().success(info=Constants.DRAW_FINISHED)
            else:
                self.canvas.draw()
                return Result().failure(info=Constants.DRAW_FAILURE)
        elif self.plot_mode == "vispy":
            sender = self.sender()
            # Detach the previously drawn lines from the scene.
            if self.line_original is not None:
                self.line_original.parent = None
            if self.line_processed is not None:
                self.line_processed.parent = None
            if sender == self.ui.pushButton_view:
                # Sample index is used as the x coordinate.
                t = arange(len(self.data.raw_data))
                raw_data_2d = column_stack((t, self.data.raw_data))
                self.line_original = visuals.Line(raw_data_2d,
                                                  color=Constants.PLOT_COLOR_RED,
                                                  parent=self.view.scene,
                                                  antialias=False,
                                                  width=2,
                                                  method='gl')
                processed_y = self.data.processed_data + Constants.PREPROCESS_OUTPUT_INPUT_AMP_OFFSET
                processed_data_2d = column_stack((t, processed_y))
                self.line_processed = visuals.Line(processed_data_2d,
                                                   color=Constants.PLOT_COLOR_BLUE,
                                                   parent=self.view.scene,
                                                   antialias=False,
                                                   width=2,
                                                   method='gl')
                self.line_original.order = 0
                self.line_processed.order = -1
                self.view.camera.set_range()
                self.canvas.update()
                return Result().success(info=Constants.DRAW_FINISHED)
            else:
                return Result().failure(info=Constants.DRAW_FAILURE)

    def __update_config__(self):
        """Copy the band-pass widgets into ``Config["Filter"]`` for the mode.

        Raises:
            ValueError: If the mode is neither ``"BCG"`` nor ``"ECG"``.
        """
        if self.mode == "BCG":
            Config["Filter"]["BCGBandPassOrder"] = self.ui.spinBox_bandPassOrder.value()
            Config["Filter"]["BCGBandPassLow"] = self.ui.doubleSpinBox_bandPassLow.value()
            Config["Filter"]["BCGBandPassHigh"] = self.ui.doubleSpinBox_bandPassHigh.value()
        elif self.mode == "ECG":
            Config["Filter"]["ECGBandPassOrder"] = self.ui.spinBox_bandPassOrder.value()
            Config["Filter"]["ECGBandPassLow"] = self.ui.doubleSpinBox_bandPassLow.value()
            Config["Filter"]["ECGBandPassHigh"] = self.ui.doubleSpinBox_bandPassHigh.value()
        else:
            raise ValueError("模式不存在")

    def __report_step__(self, step_label, result):
        """Write a step result to the info box and report whether it succeeded.

        On failure an error box is also shown and
        ``PublicFunc.finish_operation`` is called, so the caller only has to
        return.

        Args:
            step_label: Prefix such as ``"(1/2)"`` put in front of the message.
            result: ``Result`` of the step.

        Returns:
            ``True`` if ``result.status`` is truthy, ``False`` otherwise.
        """
        if result.status:
            PublicFunc.text_output(self.ui, step_label + result.info, Constants.TIPS_TYPE_INFO)
            return True
        PublicFunc.text_output(self.ui, step_label + result.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return False

    def __slot_btn_input__(self):
        """Load the input file and resample it (two steps)."""
        PublicFunc.__disableAllButton__(self, ButtonState)

        if self.plot_mode == "matplotlib":
            # Clear the plot area.
            self.reset_axes()
            self.canvas.draw()
        elif self.plot_mode == "vispy":
            self.canvas.update()

        self.data = Data()

        # Step 1 of 2: load the data. The labels used to read "(1/1)" followed
        # by "(2/2)"; they now match the two-step numbering of the view slot.
        PublicFunc.progressbar_update(self, 1, 2, Constants.INPUTTING_DATA, 0)
        if not self.__report_step__("(1/2)", self.data.open_file()):
            return

        # Step 2 of 2: resample.
        PublicFunc.progressbar_update(self, 2, 2, Constants.RESAMPLING_DATA, 0)
        if not self.__report_step__("(2/2)", self.data.resample()):
            return

        self.setting.close()
        ButtonState["Current"]["pushButton_input_setting"] = False
        ButtonState["Current"]["pushButton_input"] = False
        ButtonState["Current"]["pushButton_view"] = True
        ButtonState["Current"]["pushButton_save"] = False
        PublicFunc.finish_operation(self, ButtonState)

    def __slot_btn_view__(self):
        """Filter the loaded data and plot it (two steps)."""
        PublicFunc.__disableAllButton__(self, ButtonState)

        # Step 1 of 2: preprocess.
        PublicFunc.progressbar_update(self, 1, 2, Constants.PREPROCESSING_DATA, 0)
        if not self.__report_step__("(1/2)", self.data.preprocess()):
            return

        # Step 2 of 2: plot.
        PublicFunc.progressbar_update(self, 2, 2, Constants.DRAWING_DATA, 50)
        if not self.__report_step__("(2/2)", self.__plot__()):
            return

        ButtonState["Current"]["pushButton_save"] = True
        PublicFunc.finish_operation(self, ButtonState)

    def __save_in_chunks__(self):
        """Write the processed data to ``Config["Path"]["Save"]`` chunk by chunk.

        The target file is emptied first because ``Data.save`` appends. Writing
        stops at the first failed chunk so that a later successful chunk
        cannot hide the error.

        Returns:
            A ``Result``: the first failure, otherwise the result of the last
            chunk (or a plain success when there are no rows to write).
        """
        if self.data is None or self.data.processed_data is None:
            return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"])

        # Build the frame once instead of once per chunk.
        frame = DataFrame(self.data.processed_data.reshape(-1))
        total_rows = len(frame)
        chunk_size = Params.PREPROCESS_SAVE_CHUNK_SIZE

        try:
            # Truncate (or create) the file and close it again before appending.
            with open(Config["Path"]["Save"], 'w'):
                pass
        except FileNotFoundError:
            return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_File_Not_Found"])
        except PermissionError:
            return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"])

        # Covers the zero-row case, where the loop body never runs.
        result = Result().success(info=Constants.SAVE_FINISHED)
        for start in range(0, total_rows, chunk_size):
            end = min(start + chunk_size, total_rows)
            result = self.data.save(frame.iloc[start:end])
            if not result.status:
                break
            self.progressbar.setValue(int((end / total_rows) * 100))
            # Let the event loop run so the progress bar can update.
            QApplication.processEvents()
        return result

    def __slot_btn_save__(self):
        """Confirm with the user, then save the processed data."""
        reply = QMessageBox.question(self, Constants.QUESTION_TITLE,
                                     Constants.QUESTION_CONTENT + Config["Path"]["Save"],
                                     QMessageBox.Yes | QMessageBox.No,
                                     QMessageBox.Yes)
        if reply != QMessageBox.Yes:
            return

        PublicFunc.__disableAllButton__(self, ButtonState)

        # Single step: save.
        PublicFunc.progressbar_update(self, 1, 1, Constants.SAVING_DATA, 0)
        result = self.__save_in_chunks__()
        if not self.__report_step__("(1/1)", result):
            return

        # NOTE(review): the other msgbox_output calls pass a MSGBOX_TYPE_*
        # constant; TIPS_TYPE_INFO is kept from the original - confirm.
        PublicFunc.msgbox_output(self, result.info, Constants.TIPS_TYPE_INFO)
        PublicFunc.finish_operation(self, ButtonState)

    def reset_axes(self):
        """Clear the matplotlib axes and restore grid and x-axis formatter."""
        if self.ax0 is not None:
            self.ax0.clear()
            self.ax0.grid(True)
            self.ax0.xaxis.set_major_formatter(Params.FORMATTER)
class Data:
    """Holds one signal and runs the load, resample, filter and save steps.

    All settings are read from the module-level ``Config`` dict. Every step
    returns a ``Result`` instead of raising, except for an unknown mode in
    :meth:`open_file`.
    """

    def __init__(self):
        self.raw_data = None        # signal as loaded (and resampled)
        self.processed_data = None  # signal after filtering

    def open_file(self):
        """Locate and read the input file; set the final input and save paths.

        Returns:
            A ``Result``: the failure reported by ``PublicFunc.examine_file``,
            a failure if reading raises, otherwise success.

        Raises:
            ValueError: If ``Config["Mode"]`` is neither ``"BCG"`` nor ``"ECG"``.
        """
        if Config["Mode"] == "BCG":
            signal = Filename.ORGBCG_RAW
            save = Filename.BCG_FILTER
        elif Config["Mode"] == "ECG":
            signal = Filename.ECG_RAW
            save = Filename.ECG_FILTER
        else:
            raise ValueError("模式不存在")

        # examine_file is handed a directory: strip the file name if the
        # configured input path already points at a file.
        input_path = Path(Config["Path"]["Input"])
        if input_path.is_file():
            Config["Path"]["Input"] = str(input_path.parent)

        result = PublicFunc.examine_file(Config["Path"]["Input"], signal, Params.ENDSWITH_TXT)
        if not result.status:
            return result

        Config["Path"]["Input"] = result.data["path"]
        Config["InputConfig"]["Freq"] = result.data["freq"]

        # The save path may already carry an output file name from an earlier
        # call (e.g. a retry after a failed read). Strip it first so the name
        # is not appended a second time ("dir/file.txt/file.txt").
        save_dir = Path(Config["Path"]["Save"])
        looks_like_output_file = (save_dir.name.startswith(save) and
                                  save_dir.name.endswith(Params.ENDSWITH_TXT))
        if save_dir.is_file() or looks_like_output_file:
            save_dir = save_dir.parent
        Config["Path"]["Save"] = str(
            save_dir / Path(save + str(Config["OutputConfig"]["Freq"]) + Params.ENDSWITH_TXT))

        try:
            self.raw_data = read_csv(Config["Path"]["Input"],
                                     encoding=Params.UTF8_ENCODING,
                                     header=None).to_numpy().reshape(-1)
        except Exception:
            return Result().failure(info=Constants.INPUT_FAILURE +
                                    Constants.FAILURE_REASON["Open_Data_Exception"] + "\n" + format_exc())
        return Result().success(info=Constants.INPUT_FINISHED)

    def resample(self):
        """Resample ``raw_data`` from the input to the output frequency.

        Returns:
            A ``Result``: failure if no data is loaded or resampling raises,
            success (with a "no need" message) when both frequencies are
            equal, otherwise success after resampling.
        """
        if self.raw_data is None:
            return Result().failure(info=Constants.RESAMPLE_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"])

        try:
            input_freq = Config["InputConfig"]["Freq"]
            output_freq = Config["OutputConfig"]["Freq"]
            if input_freq == output_freq:
                return Result().success(info=Constants.RESAMPLE_NO_NEED)
            # Multiply before dividing: the former float ratio could land just
            # below a whole number and lose one sample when truncated
            # (e.g. 100 * (29 / 100) -> 28.999... -> 28).
            target_length = int(len(self.raw_data) * output_freq // input_freq)
            self.raw_data = resample(self.raw_data, target_length)
        except Exception:
            return Result().failure(info=Constants.RESAMPLE_FAILURE +
                                    Constants.FAILURE_REASON["Resample_Exception"] + "\n" + format_exc())
        return Result().success(info=Constants.RESAMPLE_FINISHED)

    def preprocess(self):
        """Apply the mode's Butterworth band-pass filter to ``raw_data``.

        An order of 0 skips filtering; ``processed_data`` then refers to the
        same array as ``raw_data``.

        Returns:
            A ``Result``: failure if no data is loaded, the cut-offs are
            invalid (``low >= high`` or either ``<= 0``), the mode is unknown
            or filtering raises; otherwise success.
        """
        if self.raw_data is None:
            return Result().failure(info=Constants.PREPROCESS_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"])

        try:
            mode = Config["Mode"]
            if mode == "BCG":
                butterworth = Butterworth_for_BCG_PreProcess
            elif mode == "ECG":
                butterworth = Butterworth_for_ECG_PreProcess
            else:
                raise ValueError("模式不存在")

            # Filter keys are prefixed with the mode name, e.g. "BCGBandPassLow".
            filter_config = Config["Filter"]
            order = filter_config[mode + "BandPassOrder"]
            if order == 0:
                self.processed_data = self.raw_data
            else:
                low_cut = filter_config[mode + "BandPassLow"]
                high_cut = filter_config[mode + "BandPassHigh"]
                if low_cut >= high_cut or low_cut <= 0 or high_cut <= 0:
                    return Result().failure(
                        info=Constants.PREPROCESS_FAILURE + Constants.FAILURE_REASON["Filter_Args_Not_Correct"])
                self.processed_data = butterworth(self.raw_data, type='bandpass',
                                                  low_cut=low_cut,
                                                  high_cut=high_cut,
                                                  order=order,
                                                  sample_rate=Config["OutputConfig"]["Freq"])
        except Exception:
            return Result().failure(info=Constants.PREPROCESS_FAILURE +
                                    Constants.FAILURE_REASON["Preprocess_Exception"] + "\n" + format_exc())
        return Result().success(info=Constants.PREPROCESS_FINISHED)

    def save(self, chunk):
        """Append one chunk to the file at ``Config["Path"]["Save"]``.

        Args:
            chunk: Object with a pandas-style ``to_csv`` method; written
                without index or header, floats formatted as ``%.4f``.

        Returns:
            A ``Result``: failure if there is no processed data or writing
            raises, otherwise success.
        """
        if self.processed_data is None:
            return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"])

        try:
            chunk.to_csv(Config["Path"]["Save"], mode='a', index=False, header=False, float_format='%.4f')
        except PermissionError:
            return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"])
        except Exception:
            return Result().failure(info=Constants.SAVE_FAILURE +
                                    Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc())
        # Keyword form, matching every other Result().success(...) call here.
        return Result().success(info=Constants.SAVE_FINISHED)