优化<数据预处理>的代码

优化<BCG的J峰算法定位>的代码
This commit is contained in:
Yorusora
2025-05-20 17:05:00 +08:00
parent fada84e115
commit ec16546f47
5 changed files with 106 additions and 96 deletions

View File

@ -71,12 +71,9 @@ class SettingWindow(QMainWindow):
Config.update({
"Path": {
"Input": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.DETECT_JPEAK_INPUT_BCG_FILENAME +
str(Config["InputConfig"]["Freq"]) +
ConfigParams.ENDSWITH_TXT))),
Path(str(self.sampID)))),
"Save": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.DETECT_JPEAK_SAVE_FILENAME +
ConfigParams.ENDSWITH_TXT)))
Path(str(self.sampID))))
}
})
@ -110,7 +107,7 @@ class SettingWindow(QMainWindow):
str((Path(self.root_path) /
ConfigParams.PUBLIC_PATH_ORGBCG_TEXT /
Path(str(self.sampID)) /
Path(ConfigParams.DETECT_JPEAK_INPUT_BCG_FILENAME +
Path(ConfigParams.BCG_FILTER +
str(self.ui.spinBox_input_freq.value()) +
ConfigParams.ENDSWITH_TXT))))
@ -295,7 +292,7 @@ class MainWindow_detect_Jpeak(QMainWindow):
PublicFunc.__disableAllButton__(self, ButtonState)
# 数据预处理
PublicFunc.progressbar_update(self, 1, 3, Constants.DETECT_JPEAK_PROCESSING_DATA, 0)
PublicFunc.progressbar_update(self, 1, 3, Constants.PREPROCESSING_DATA, 0)
result = self.data.preprocess()
if not result.status:
PublicFunc.text_output(self.ui, "(1/3)" + result.info, Constants.TIPS_TYPE_ERROR)
@ -390,21 +387,35 @@ class Data:
self.interval = None
def open_file(self):
if Path(Config["Path"]["Input"]).is_file():
Config["Path"]["Input"] = str(Path(Config["Path"]["Input"]).parent)
if not Path(Config["Path"]["Input"]).exists():
return Result().failure(info=Constants.INPUT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Data_Path_Not_Exist"])
return Result().failure(info=Constants.INPUT_FAILURE + Constants.FAILURE_REASON["Path_Not_Exist"])
result = PublicFunc.examine_file(Config["Path"]["Input"], ConfigParams.BCG_FILTER)
if result.status:
Config["Path"]["Input"] = result.data["path"]
Config["InputConfig"]["Freq"] = result.data["freq"]
Config["Path"]["Save"] = str(
Path(Config["Path"]["Save"]) / Path(ConfigParams.JPEAK_REVISE + str(Config["InputConfig"]["Freq"]) + ConfigParams.ENDSWITH_TXT))
else:
return result
try:
self.raw_data = read_csv(Config["Path"]["Input"],
encoding=ConfigParams.UTF8_ENCODING,
header=None).to_numpy().reshape(-1)
except Exception as e:
return Result().failure(info=Constants.INPUT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Read_Data_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.INPUT_FAILURE +
Constants.FAILURE_REASON["Open_Data_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.INPUT_FINISHED)
def preprocess(self):
if self.raw_data is None:
return Result().failure(info=Constants.DETECT_JPEAK_PROCESS_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Raw_Data_Not_Exist"])
return Result().failure(info=Constants.PREPROCESS_FAILURE +
Constants.FAILURE_REASON["Data_Not_Exist"])
try:
self.processed_data = preprocess(self.raw_data,
@ -413,16 +424,19 @@ class Data:
Config["Filter"]["BandPassHigh"],
Config["AmpValue"])
except Exception as e:
return Result().failure(info=Constants.DETECT_JPEAK_PROCESS_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Filter_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.PREPROCESS_FAILURE +
Constants.FAILURE_REASON["Preprocess_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.DETECT_JPEAK_PROCESS_FINISHED)
return Result().success(info=Constants.PREPROCESS_FINISHED)
def predict_Jpeak(self, model):
if not (Path(model.model_folder_path) / Path(model.selected_model)).exists():
return Result().failure(info=Constants.DETECT_JPEAK_PREDICT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Model_File_Not_Exist"])
return Result().failure(info=Constants.DETECT_JPEAK_PREDICT_FAILURE +
Constants.FAILURE_REASON["Model_File_Not_Exist"])
if self.processed_data is None:
return Result().failure(info=Constants.DETECT_JPEAK_PREDICT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Processed_Data_Not_Exist"])
return Result().failure(info=Constants.DETECT_JPEAK_PREDICT_FAILURE +
Constants.FAILURE_REASON["Data_Not_Exist"])
try:
self.peak, self.interval = Jpeak_Detection(model.selected_model,
@ -434,18 +448,20 @@ class Data:
Config["PeaksValue"],
Config["UseCPU"])
except Exception as e:
return Result().failure(info=Constants.DETECT_JPEAK_PREDICT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Predict_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.DETECT_JPEAK_PREDICT_FAILURE +
Constants.FAILURE_REASON["Predict_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.DETECT_JPEAK_PREDICT_FINISHED)
def save(self, chunk):
if self.peak is None:
return Result().failure(info=Constants.SAVING_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Peak_Not_Exist"])
return Result().failure(info=Constants.SAVING_FAILURE + Constants.FAILURE_REASON["Data_Not_Exist"])
try:
chunk.to_csv(Config["Path"]["Save"], mode='a', index=False, header=False)
except Exception as e:
return Result().failure(info=Constants.SAVING_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Save_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.SAVING_FAILURE +
Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.SAVING_FINISHED)
@ -460,13 +476,13 @@ class Model:
def seek_model(self):
if not Path(Config["ModelFolderPath"]).exists():
return Result().failure(info=Constants.DETECT_JPEAK_LOAD_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Model_Path_Not_Exist"])
return Result().failure(info=Constants.DETECT_JPEAK_LOAD_FAILURE + Constants.FAILURE_REASON["Model_Path_Not_Exist"])
try:
self.model_list = [file.name for file in Path(Config["ModelFolderPath"]).iterdir() if file.is_file()]
if len(self.model_list) == 0:
return Result().failure(info=Constants.DETECT_JPEAK_FAILURE_REASON["Model_File_Not_Exist"])
return Result().failure(info=Constants.FAILURE_REASON["Model_File_Not_Exist"])
except Exception as e:
return Result().failure(info=Constants.DETECT_JPEAK_LOAD_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Read_Model_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.DETECT_JPEAK_LOAD_FAILURE + Constants.FAILURE_REASON["Read_Model_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.DETECT_JPEAK_LOAD_FINISHED)

View File

@ -316,7 +316,7 @@ class MainWindow_preprocess(QMainWindow):
PublicFunc.__disableAllButton__(self, ButtonState)
# 数据预处理
PublicFunc.progressbar_update(self, 1, 2, Constants.PREPROCESS_PROCESSING_DATA, 0)
PublicFunc.progressbar_update(self, 1, 2, Constants.PREPROCESSING_DATA, 0)
result = self.data.preprocess()
if not result.status:
PublicFunc.text_output(self.ui, "(1/2)" + result.info, Constants.TIPS_TYPE_ERROR)
@ -388,8 +388,10 @@ class Data:
def open_file(self):
if Config["Mode"] == "BCG":
signal = ConfigParams.ORGBCG_RAW
save = ConfigParams.BCG_FILTER
elif Config["Mode"] == "ECG":
signal = ConfigParams.ECG_RAW
save = ConfigParams.ECG_FILTER
else:
raise ValueError("模式不存在")
if Path(Config["Path"]["Input"]).is_file():
@ -398,31 +400,14 @@ class Data:
if not Path(Config["Path"]["Input"]).exists():
return Result().failure(info=Constants.INPUT_FAILURE + Constants.FAILURE_REASON["Path_Not_Exist"])
temp_path = list(
Path(Config["Path"]["Input"]).glob(f"{signal}*"))
if len(temp_path) == 0:
return Result().failure(
info=Constants.INPUT_FAILURE + "\n" +
signal + "" +
Config["Path"]["Input"] +
Constants.FAILURE_REASON["File_Not_Exist"])
elif len(temp_path) > 1:
return Result().failure(
info=Constants.INPUT_FAILURE + "\n" +
signal + "" +
Config["Path"]["Input"] +
Constants.FAILURE_REASON["Data_File_More_Than_One"])
result = PublicFunc.examine_file(Config["Path"]["Input"], signal)
if result.status:
Config["Path"]["Input"] = result.data["path"]
Config["InputConfig"]["Freq"] = result.data["freq"]
Config["Path"]["Save"] = str(
Path(Config["Path"]["Save"]) / Path(save + str(Config["OutputConfig"]["Freq"]) + ConfigParams.ENDSWITH_TXT))
else:
path = temp_path[0]
Config["Path"]["Input"] = str(path)
freq = path.stem.split("_")[-1]
# 验证是否为整数或是否存在
if not freq.isdigit():
return Result().failure(
info=Constants.INPUT_FAILURE + "\n" +
signal + "" +
Constants.FAILURE_REASON["Data_Frequency_Not_In_Filename"])
Config["InputConfig"]["Freq"] = int(freq)
return result
try:
self.raw_data = read_csv(Config["Path"]["Input"],

View File

@ -36,6 +36,7 @@ class ConfigParams:
BCG_FILTER: str = "BCG_Filter_"
ECG_FILTER: str = "ECG_Filter_"
JPEAK_REVISE: str = "JPeak_revise_"

View File

@ -89,12 +89,36 @@ class Constants:
"File_More_Than_One": "(数据文件超过一个)",
"Frequency_Not_In_Filename": "(数据频率不在文件名中)",
"Data_Not_Exist": "(数据不存在)",
"Model_File_Not_Exist": "(模型文件不存在)",
"Open_Data_Exception": "(打开数据异常)",
"Preprocess_Exception": "(预处理异常)",
"Save_Exception": "(保存异常)"
"Save_Exception": "(保存异常)",
"Predict_Exception": "(预测异常)",
"Read_Model_Exception": "(读取模型异常)"
}
# 预处理
PREPROCESS_PLOT_LABEL_ORIGINAL_DATA: str = "Original_Data"
PREPROCESS_PLOT_LABEL_PROCESSED_DATA: str = "Processed_Data"
PREPROCESS_OUTPUT_INPUT_AMP_OFFSET: int = 1850
# BCG的J峰算法定位
DETECT_JPEAK_LOADING_MODEL: str = "正在读取模型"
DETECT_JPEAK_LOAD_FINISHED: str = "读取完成"
DETECT_JPEAK_LOAD_FAILURE: str = "读取失败"
DETECT_JPEAK_PREDICTING_PEAK: str = "正在预测峰值"
DETECT_JPEAK_PREDICT_FINISHED: str = "预测完成"
DETECT_JPEAK_PREDICT_FAILURE: str = "预测失败"
DETECT_JPEAK_DATA_LENGTH_POINTS: str = "数据长度(点数):"
DETECT_JPEAK_DURATION_MIN: str = "数据时长(分钟):"
DETECT_JPEAK_PEAK_AMOUNT: str = "J峰个数"
DETECT_JPEAK_PLOT_LABEL_BCG: str = "BCG_Processed"
DETECT_JPEAK_PLOT_LABEL_J_PEAKS: str = "J_Peaks"
DETECT_JPEAK_PLOT_LABEL_INTERVAL: str = "Interval"
# 数据粗同步
APPROXIMATELY_ONLY_ALIGN_RESAMPLING: str = "正在仅重采样"
APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FINISHED: str = "仅重采样完成"
@ -157,55 +181,6 @@ class Constants:
"Save_Exception": "(保存异常)"
}
# 预处理
PREPROCESS_PROCESSING_DATA: str = "正在处理数据"
PREPROCESS_PROCESS_FINISHED: str = "处理完成"
PREPROCESS_PROCESS_FAILURE: str = "处理失败"
PREPROCESS_FAILURE_REASON = {
"Data_Path_Not_Exist": "(路径不存在)",
"Read_Data_Exception": "(读取数据异常)",
"Raw_Data_Not_Exist": "(原始数据不存在)",
"Filter_Exception": "(滤波器异常)",
"Processed_Data_Not_Exist": "(处理后数据不存在)",
"Save_Exception": "(保存异常)"
}
PREPROCESS_PLOT_LABEL_ORIGINAL_DATA: str = "Original_Data"
PREPROCESS_PLOT_LABEL_PROCESSED_DATA: str = "Processed_Data"
PREPROCESS_OUTPUT_INPUT_AMP_OFFSET: int = 1850
# BCG的J峰算法定位
DETECT_JPEAK_LOADING_MODEL: str = "正在读取模型"
DETECT_JPEAK_LOAD_FINISHED: str = "读取完成"
DETECT_JPEAK_LOAD_FAILURE: str = "读取失败"
DETECT_JPEAK_PREDICTING_PEAK: str = "正在预测峰值"
DETECT_JPEAK_PREDICT_FINISHED: str = "预测完成"
DETECT_JPEAK_PREDICT_FAILURE: str = "预测失败"
DETECT_JPEAK_FAILURE_REASON = {
"Data_Path_Not_Exist": "(数据路径不存在)",
"Read_Data_Exception": "(读取数据异常)",
"Model_Path_Not_Exist": "(模型路径不存在)",
"Model_File_Not_Exist": "(模型文件不存在)",
"Read_Model_Exception": "(读取模型异常)",
"Predict_Exception": "(峰值预测异常)",
"Raw_Data_Not_Exist": "(原始数据不存在)",
"Filter_Exception": "(滤波器异常)",
"Processed_Data_Not_Exist": "(处理后数据不存在)",
"Peak_Not_Exist": "(预测的峰值不存在)",
"Save_Exception": "(保存异常)"
}
DETECT_JPEAK_DATA_LENGTH_POINTS: str = "数据长度(点数):"
DETECT_JPEAK_DURATION_MIN: str = "数据时长(分钟):"
DETECT_JPEAK_PEAK_AMOUNT: str = "J峰个数"
DETECT_JPEAK_PLOT_LABEL_BCG: str = "BCG_Processed"
DETECT_JPEAK_PLOT_LABEL_J_PEAKS: str = "J_Peaks"
DETECT_JPEAK_PLOT_LABEL_INTERVAL: str = "Interval"
# ECG的R峰算法定位
DETECT_RPEAK_LOADING_METHOD: str = "正在读取方法"

View File

@ -1,10 +1,12 @@
from datetime import datetime
from logging import error, info
from pathlib import Path
from PySide6.QtWidgets import QMessageBox, QWidget, QPushButton, QProgressBar, QApplication, QRadioButton
from func.utils.Constants import Constants
from func.utils.CustomException import TipsTypeValueNotExistError, MsgBoxTypeValueNotExistError
from func.utils.Result import Result
class PublicFunc:
@ -173,3 +175,34 @@ class PublicFunc:
PublicFunc.statusbar_show_msg(mainWindow, PublicFunc.format_status_msg(f"({str(current)}/{str(total)}){msg}"))
mainWindow.progressbar.setValue(progressbarState)
QApplication.processEvents()
@staticmethod
def examine_file(path_str, filename):
temp_path = list(
Path(path_str).glob(f"{filename}*"))
if len(temp_path) == 0:
return Result().failure(
info=Constants.INPUT_FAILURE + "\n" +
filename + "" +
path_str +
Constants.FAILURE_REASON["File_Not_Exist"])
elif len(temp_path) > 1:
return Result().failure(
info=Constants.INPUT_FAILURE + "\n" +
filename + "" +
path_str +
Constants.FAILURE_REASON["File_More_Than_One"])
else:
path = temp_path[0]
freq = path.stem.split("_")[-1]
# 验证是否为整数或是否存在
if not freq.isdigit():
return Result().failure(
info=Constants.INPUT_FAILURE + "\n" +
filename + "" +
Constants.FAILURE_REASON["Data_Frequency_Not_In_Filename"])
data = {
"path": str(path),
"freq": int(freq)
}
return Result().success(data=data)