Initial Commit.

This commit is contained in:
Yorusora
2025-04-28 11:33:05 +08:00
parent f0eb8083b1
commit f928fa4d9c
28 changed files with 7126 additions and 0 deletions

View File

@ -0,0 +1,198 @@
# encoding:utf-8
import warnings
from os import listdir
from numpy import array, arange, zeros, percentile, append, full
from numpy import max as np_max
from numpy import max as np_min
from pandas import read_csv, DataFrame, concat
from scipy.signal import butter, filtfilt
from torch.utils.data import Dataset
warnings.filterwarnings("ignore")
class BCGDataset(Dataset):
def __init__(self, train=True):
if train:
self.data = array(read_csv("./in_data/train.txt").iloc[:,arange(1000)])
self.label = array(read_csv("./in_data/train.txt").iloc[:,arange(1000,2000)])
else:
self.data = array(read_csv("./in_data/test.txt").iloc[:, arange(1000)])
self.label = array(read_csv("./in_data/test.txt").iloc[:, arange(1000, 2000)])
def __getitem__(self, index):
return self.data[index], self.label[index]
def __len__(self):
return len(self.label)
class BCG_Operation():
def __init__(self, sample_rate=1000):
self.sample_rate = sample_rate
def down_sample(self,data=None, down_radio=10):
if data is None:
raise ValueError("data is None, please given an real value!")
length_before = len(data)
length_after = length_before//down_radio
data = data[:length_after*down_radio]
data = data.reshape(-1,down_radio)
data = data[:,0]
self.sample_rate = self.sample_rate/down_radio
return data
def Splitwin(self, data=None, len_win=None, coverage=1.0,calculate_to_end=False):
"""
分窗
:param len_win: length of window
:return: signal windows
"""
if ( len_win is None) or (data is None):
raise ValueError("length of window or data is None, please given an real value!")
else:
length = len_win * self.sample_rate # number point of a window
# step of split windows
step = length*coverage
start = 0
Splitdata = []
while (len(data)-start>=length):
Splitdata.append( data[int(start):int(start+length)] )
start += step
if calculate_to_end and (len(data)-start>2000):
remain = len(data)-start
start = start - step
step = int(remain/2000)
start = start + step*2000
Splitdata.append(data[int(start):int(start+length)])
return array(Splitdata), step
elif calculate_to_end :
return array(Splitdata), 0
else:
return array(Splitdata)
def Butterworth(self,data, type, low_cut = 0.0, high_cut = 0.0, order = 10):
"""
:param type: Type of Butter. filter, lowpass, bandpass, ...
:param lowcut: Low cutoff frequency
:param highcut: High cutoff frequency
:param order: Order of filter
:return: Signal after filtering
"""
if type == "lowpass": # 低通滤波处理
b, a = butter(order, low_cut / (self.sample_rate * 0.5), btype='lowpass')
return filtfilt(b, a, array(data))
elif type == "bandpass": # 带通滤波处理
low = low_cut / (self.sample_rate * 0.5)
high = high_cut / (self.sample_rate * 0.5)
b, a = butter(order, [low, high], btype='bandpass')
return filtfilt(b, a, array(data))
elif type == "highpass": # 高通滤波处理
b, a = butter(order, high_cut / (self.sample_rate * 0.5), btype='highpass')
return filtfilt(b, a, array(data))
else: # 警告,滤波器类型必须有
raise ValueError("Please choose a type of fliter")
def AmpMovement(self, data, win_size, threshold=20, get_judge_line=False):
"""
基于幅值方法检测体动:
1.将输入信号按win_size切分
2.将每个win_size信号片段分窗每个窗2s步长为2s
3.计算一分钟所有信号窗的最大峰谷值差,获取中位数和均值
4.所有2s时间窗内大于中位数/均值的2.2倍视为体动
5.体动间间隔过短的信号,同样标记为体动
:param data: Input signal
:param win_size: Size of the win(Must be a multiple of 2)
:return: State of signal
"""
Dataframe, cover_num = self.Splitwin(data, len_win=win_size, coverage=1.0, calculate_to_end=True)
state_all = array([])
Amp_list = array([])
for win in range(Dataframe.shape[0]):
state = array([])
# two seconds window
data_win = self.Splitwin(Dataframe[win], len_win=2, coverage=1.0)
Amp = zeros(data_win.shape[0])
for i in range(data_win.shape[0]):
Amp[i] = np_max(data_win[i]) - np_min(data_win[i]) # max - min
# 取..位数
Median_Amp = percentile(Amp, 20) # 20%
if get_judge_line:
Amp_list = append(Amp_list, full(win_size * self.sample_rate, 2.3 * Median_Amp))
for i in range(len(Amp)):
if (Amp[i] > 2.1 * Median_Amp):
state = append(state, "Movement")
elif Amp[i] < threshold:
state = append(state, "Nobody")
else:
state = append(state, "Sleep")
if win == Dataframe.shape[0] - 1 and cover_num > 0:
state = state[-int(cover_num):]
state_all = append(state_all, state)
if get_judge_line:
return state_all, Amp_list
else:
return state_all
def preprocess1(self):
# ----------------------------------------------------------
data_dir = "../in_data/"
dir_list = listdir(data_dir)
data_list = [data_dir + dir + "/orgData.txt" for dir in dir_list]
label_list = [data_dir + dir + "/label.txt" for dir in dir_list]
print(data_list)
print(label_list)
for i in range(len(data_list)):
orgBCG = array(read_csv(data_list[i], header=None)).reshape(-1)
orgLabel = array(read_csv(label_list[i])).reshape(-1)
# ---------------------Movement Detection-------------------------
operation = BCG_Operation()
BCG = operation.Butterworth(data=orgBCG, type="bandpass", low_cut=2.5, high_cut=10, order=2)
state_win60 = operation.AmpMovement(orgBCG, win_size=60)
visual_state = array([])
for num in range(state_win60.shape[0]):
print("state_num/all_state: ", num, '/', state_win60.shape[0])
if state_win60[num] == "Movement":
visual_state = append(visual_state, full(2000, 1))
else:
visual_state = append(visual_state, full(2000, 0))
# ------------------------------------------------------------------
downBCG = operation.down_sample(data=orgBCG, down_radio=10)
downLabel = operation.down_sample(data=orgLabel, down_radio=10)
downState = operation.down_sample(data=visual_state, down_radio=10)
length_before = len(downState)
length_after = length_before // 1000
downBCG = downBCG[:length_after * 1000]
downLabel = downLabel[:length_after * 1000]
downState = downState[:length_after * 1000]
downBCG = downBCG.reshape(-1, 1000)
downLabel = downLabel.reshape(-1, 1000)
downState = downState.reshape(-1, 1000)
downState = np_max(downState, axis=1)
df_BCG = DataFrame(downBCG)
df_label = DataFrame(downLabel)
df_state = DataFrame(downState, columns=["state"])
df_BCG.to_csv()
df_all = concat([df_BCG, df_label, df_state], axis=1)
df_all.to_csv(data_dir + "/data" + str(i + 1) + ".txt", index=False)
def read_all_data(data_dir):
df_all = read_csv(data_dir)
df_clean = df_all[ df_all["state"]==0.0 ]
df_artifact = df_all[ df_all["state"]==1.0 ]
data_clean = df_clean.iloc[:,arange(1000)]
label_clean = df_clean.iloc[:,arange(1000,2000)]
data_artifact = df_artifact.iloc[:,arange(1000)]
label_artifact = df_artifact.iloc[:,arange(1000,2000)]
return array(data_clean),array(label_clean),array(data_artifact),array(label_artifact)

View File

@ -0,0 +1 @@
from .Dataset_operation import BCGDataset,BCG_Operation,read_all_data

1744
func/Deep_Model/Unet.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1 @@
from .Unet import Unet,ResUNet,DUNet,LSTM_UNet,R2U_Net,AttU_Net,R2AttU_Net,Unet_lstm,deep_Unet,Fivelayer_Unet,Fourlayer_LUnet,Sixlayer_Unet,Threelayer_Unet,Sixlayer_Lstm_Unet,Fivelayer_Lstm_Unet,Fourlayer_Lstm_Unet

View File

@ -0,0 +1,56 @@
# encoding:utf-8
"""
@ date: 2020-09-16
@ author: jingxian
@ illustration: Pre-processing
"""
from matplotlib import pyplot as plt
from numpy import array
from pandas import read_csv
from scipy.signal import butter, filtfilt, sosfiltfilt
def Butterworth_for_ECG_PreProcess(data, sample_rate, type, low_cut=0.0, high_cut=0.0, order=10):
"""
:param type: Type of Butter. filter, lowpass, bandpass, ...
:param lowcut: Low cutoff frequency
:param highcut: High cutoff frequency
:param order: Order of filter
:return: Signal after filtering
"""
if type == "lowpass": # 低通滤波处理
b, a = butter(order, low_cut / (sample_rate * 0.5), btype='lowpass', output='ba')
return filtfilt(b, a, data)
elif type == "bandpass": # 带通滤波处理
low = low_cut / (sample_rate * 0.5)
high = high_cut / (sample_rate * 0.5)
b, a = butter(order, [low, high], btype='bandpass', output='ba')
return filtfilt(b, a, data)
elif type == "highpass": # 高通滤波处理
b, a = butter(order, high_cut / (sample_rate * 0.5), btype='highpass', output='ba')
return filtfilt(b, a, data)
else: # 警告,滤波器类型必须有
raise ValueError("Please choose a type of fliter")
def Butterworth_for_BCG_PreProcess(data, sample_rate, type, low_cut=0.0, high_cut=0.0, order=10):
"""
:param type: Type of Butter. filter, lowpass, bandpass, ...
:param lowcut: Low cutoff frequency
:param highcut: High cutoff frequency
:param order: Order of filter
:return: Signal after filtering
"""
if type == "lowpass": # 低通滤波处理
sos = butter(order, low_cut / (sample_rate * 0.5), btype='lowpass', output='sos')
return sosfiltfilt(sos, array(data))
elif type == "bandpass": # 带通滤波处理
low = low_cut / (sample_rate * 0.5)
high = high_cut / (sample_rate * 0.5)
sos = butter(order, [low, high], btype='bandpass', output='sos')
return sosfiltfilt(sos, array(data))
elif type == "highpass": # 高通滤波处理
sos = butter(order, high_cut / (sample_rate * 0.5), btype='highpass', output='sos')
return sosfiltfilt(sos, array(data))
else: # 警告,滤波器类型必须有
raise ValueError("Please choose a type of fliter")

584
func/Module_detect_Jpeak.py Normal file
View File

@ -0,0 +1,584 @@
from gc import collect
from pathlib import Path
import matplotlib.pyplot as plt
from PySide6.QtWidgets import QMessageBox, QMainWindow, QWidget, QPushButton, QProgressBar, QApplication
from matplotlib import gridspec
from matplotlib.backends.backend_qt import NavigationToolbar2QT
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg
from overrides import overrides
from pandas import read_csv, DataFrame
from yaml import dump, load, FullLoader
from func.utils.PublicFunc import PublicFunc
from func.utils.Constants import Constants, ConfigParams
from func.utils.detect_Jpeak import preprocess, Jpeak_Detection
from ui.MainWindow.MainWindow_detect_Jpeak import Ui_MainWindow_detect_Jpeak
from ui.setting.detect_Jpeak_input_setting import Ui_MainWindow_detect_Jpeak_input_setting
Config = {
}
ButtonState = {
"Default": {
"pushButton_input_setting": True,
"pushButton_input": True,
"pushButton_view": False,
"pushButton_save": False
},
"Current": {
"pushButton_input_setting": True,
"pushButton_input": True,
"pushButton_view": False,
"pushButton_save": False
}
}
class SettingWindow(QMainWindow):
def __init__(self, root_path, sampID):
super(SettingWindow, self).__init__()
self.ui = Ui_MainWindow_detect_Jpeak_input_setting()
self.ui.setupUi(self)
self.root_path = root_path
self.sampID = sampID
self.config = None
self.__read_config__()
self.ui.spinBox_input_freq.valueChanged.connect(self.__update_ui__)
self.ui.pushButton_confirm.clicked.connect(self.__write_config__)
self.ui.pushButton_cancel.clicked.connect(self.__rollback_config__)
self.ui.pushButton_cancel.clicked.connect(self.close)
def __read_config__(self):
if not Path(ConfigParams.DETECT_JPEAK_CONFIG_FILE_PATH).exists():
with open(ConfigParams.DETECT_JPEAK_CONFIG_FILE_PATH, "w") as f:
dump(ConfigParams.DETECT_JPEAK_CONFIG_NEW_CONTENT, f)
with open(ConfigParams.DETECT_JPEAK_CONFIG_FILE_PATH, "r") as f:
file_config = load(f.read(), Loader=FullLoader)
Config.update(file_config)
self.config = file_config
Config.update({
"Path": {
"Input": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.DETECT_JPEAK_INPUT_BCG_FILENAME +
str(Config["InputConfig"]["Freq"]) +
ConfigParams.ENDSWITH_TXT))),
"Save": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.DETECT_JPEAK_SAVE_FILENAME +
ConfigParams.ENDSWITH_TXT)))
}
})
# 数据回显
self.ui.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])
self.ui.plainTextEdit_file_path_input.setPlainText(Config["Path"]["Input"])
self.ui.plainTextEdit_deepmodel_path.setPlainText(Config["ModelFolderPath"])
self.ui.plainTextEdit_file_path_save.setPlainText(Config["Path"]["Save"])
def __write_config__(self):
# 从界面写入配置
Config["InputConfig"]["Freq"] = self.ui.spinBox_input_freq.value()
Config["Path"]["Input"] = self.ui.plainTextEdit_file_path_input.toPlainText()
Config["Path"]["Save"] = self.ui.plainTextEdit_file_path_save.toPlainText()
Config["ModelFolderPath"] = self.ui.plainTextEdit_deepmodel_path.toPlainText()
# 保存配置到文件
self.config["InputConfig"]["Freq"] = self.ui.spinBox_input_freq.value()
self.config["ModelFolderPath"] = self.ui.plainTextEdit_deepmodel_path.toPlainText()
with open(ConfigParams.DETECT_JPEAK_CONFIG_FILE_PATH, "w") as f:
dump(self.config, f)
self.close()
def __rollback_config__(self):
self.__read_config__()
def __update_ui__(self):
self.ui.plainTextEdit_file_path_input.setPlainText(
str((Path(self.root_path) /
ConfigParams.PUBLIC_PATH_ORGBCG_TEXT /
Path(str(self.sampID)) /
Path(ConfigParams.DETECT_JPEAK_INPUT_BCG_FILENAME +
str(self.ui.spinBox_input_freq.value()) +
ConfigParams.ENDSWITH_TXT))))
class MainWindow_detect_Jpeak(QMainWindow):
def __init__(self):
super(MainWindow_detect_Jpeak, self).__init__()
self.ui = Ui_MainWindow_detect_Jpeak()
self.ui.setupUi(self)
self.root_path = None
self.sampID = None
self.data = None
self.model = None
self.setting = None
# 初始化进度条
self.progressbar = None
self.add_progressbar()
#初始化画框
self.fig = None
self.canvas = None
self.figToolbar = None
self.gs = None
self.ax0 = None
self.line_data = None
self.point_peak = None
self.line_interval = None
self.msgBox = QMessageBox()
self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)
@overrides
def show(self, root_path, sampID):
super().show()
self.root_path = root_path
self.sampID = sampID
self.setting = SettingWindow(root_path, sampID)
# 初始化画框
self.fig = plt.figure(figsize=(12, 9), dpi=100)
self.canvas = FigureCanvasQTAgg(self.fig)
self.figToolbar = NavigationToolbar2QT(self.canvas)
for action in self.figToolbar.actions():
if action.text() == "Subplots" or action.text() == "Customize":
self.figToolbar.removeAction(action)
self.ui.verticalLayout_canvas.addWidget(self.canvas)
self.ui.verticalLayout_canvas.addWidget(self.figToolbar)
self.gs = gridspec.GridSpec(1, 1, height_ratios=[1])
self.fig.subplots_adjust(top=0.98, bottom=0.05, right=0.98, left=0.1, hspace=0, wspace=0)
self.ax0 = self.fig.add_subplot(self.gs[0])
self.ax0.grid(True)
self.ax0.xaxis.set_major_formatter(ConfigParams.FORMATTER)
self.__resetAllButton__()
self.ui.doubleSpinBox_bandPassLow.setValue(Config["Filter"]["BandPassLow"])
self.ui.doubleSpinBox_bandPassHigh.setValue(Config["Filter"]["BandPassHigh"])
self.ui.spinBox_peaksValue.setValue(Config["PeaksValue"])
self.ui.doubleSpinBox_ampValue.setValue(Config["AmpValue"])
self.ui.spinBox_intervalLow.setValue(Config["IntervalLow"])
self.ui.spinBox_intervalHigh.setValue(Config["IntervalHigh"])
self.ui.checkBox_useCPU.setChecked(Config["UseCPU"])
self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__)
self.ui.pushButton_input_setting.clicked.connect(self.setting.show)
self.ui.pushButton_view.clicked.connect(self.__slot_btn_view__)
self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__)
self.ui.doubleSpinBox_bandPassLow.editingFinished.connect(self.__update_config__)
self.ui.doubleSpinBox_bandPassHigh.editingFinished.connect(self.__update_config__)
self.ui.spinBox_peaksValue.editingFinished.connect(self.__update_config__)
self.ui.doubleSpinBox_ampValue.editingFinished.connect(self.__update_config__)
self.ui.spinBox_intervalLow.editingFinished.connect(self.__update_config__)
self.ui.spinBox_intervalHigh.editingFinished.connect(self.__update_config__)
self.ui.checkBox_useCPU.stateChanged.connect(self.__update_config__)
self.ui.comboBox_model.currentTextChanged.connect(self.__update_config__)
@overrides
def closeEvent(self, event):
self.__disableAllButton__()
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
QApplication.processEvents()
# 清空画框
if self.line_data and self.point_peak:
del self.line_data
del self.point_peak
del self.line_interval
self.canvas.draw()
# 释放资源
del self.data
del self.model
self.fig.clf()
plt.close(self.fig)
self.deleteLater()
collect()
self.canvas = None
event.accept()
@staticmethod
def __reset__():
ButtonState["Current"].update(ButtonState["Default"].copy())
ButtonState["Current"]["pushButton_view"] = True
def __plot__(self):
# 清空画框
if self.line_data and self.point_peak and self.line_interval:
try:
self.line_data.remove()
self.point_peak.remove()
self.line_interval.remove()
except ValueError:
pass
sender = self.sender()
if sender == self.ui.pushButton_view:
self.line_data, = self.ax0.plot(self.data.processed_data,
color=Constants.PLOT_COLOR_BLUE,
label=Constants.DETECT_J_PEAK_PLOT_LABEL_BCG)
self.point_peak, = self.ax0.plot(self.data.peak, self.data.processed_data[self.data.peak],
'r.',
label=Constants.DETECT_J_PEAK_PLOT_LABEL_J_PEAKS)
self.line_interval, = self.ax0.plot(self.data.interval,
color=Constants.PLOT_COLOR_ORANGE,
label=Constants.DETECT_J_PEAK_PLOT_LABEL_INTERVAL)
self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT)
status = True
info = Constants.DRAWING_FINISHED
else:
status = False
info = Constants.DRAWING_FAILURE
self.canvas.draw()
return status, info
def __disableAllButton__(self):
# 禁用所有按钮
all_widgets = self.centralWidget().findChildren(QWidget)
# 迭代所有部件,查找按钮并禁用它们
for widget in all_widgets:
if isinstance(widget, QPushButton):
if widget.objectName() in ButtonState["Current"].keys():
widget.setEnabled(False)
def __enableAllButton__(self):
# 启用按钮
all_widgets = self.centralWidget().findChildren(QWidget)
# 迭代所有部件,查找按钮并启用它们
for widget in all_widgets:
if isinstance(widget, QPushButton):
if widget.objectName() in ButtonState["Current"].keys():
widget.setEnabled(ButtonState["Current"][widget.objectName()])
def __resetAllButton__(self):
# 启用按钮
all_widgets = self.centralWidget().findChildren(QWidget)
# 迭代所有部件,查找按钮并启用它们
for widget in all_widgets:
if isinstance(widget, QPushButton):
if widget.objectName() in ButtonState["Default"].keys():
widget.setEnabled(ButtonState["Default"][widget.objectName()])
def __slot_btn_input__(self):
self.__disableAllButton__()
# 清空画框
if self.line_data and self.point_peak and self.line_interval:
try:
self.line_data.remove()
self.point_peak.remove()
self.line_interval.remove()
except ValueError:
pass
self.canvas.draw()
# 清空模型列表
self.ui.comboBox_model.clear()
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.LOADING_MODEL))
self.progressbar.setValue(0)
QApplication.processEvents()
# 寻找模型
self.model = Model()
status, info = self.model.seek_model()
if not status:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
self.update_ui_comboBox_model(self.model.model_list)
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.INPUTTING_DATA))
self.progressbar.setValue(10)
QApplication.processEvents()
# 导入数据
self.data = Data()
status, info = self.data.open_file()
if not status:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
MainWindow_detect_Jpeak.__reset__()
self.finish_operation()
def __slot_btn_view__(self):
self.__disableAllButton__()
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.DETECT_JPEAK_PROCESSING_DATA))
self.progressbar.setValue(0)
QApplication.processEvents()
# 数据预处理
status, info = self.data.preprocess()
if not status:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.DETECT_JPEAK_PREDICTING_PEAK))
self.progressbar.setValue(10)
QApplication.processEvents()
# 预测峰值
self.model.selected_model = Config["DetectMethod"]
status, info = self.data.predict_Jpeak(self.model)
if not status:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
PublicFunc.text_output(self.ui, Constants.DETECT_J_PEAK_DATA_LENGTH_POINTS + str(len(self.data.data)),
Constants.TIPS_TYPE_INFO)
PublicFunc.text_output(self.ui, Constants.DETECT_J_PEAK_DURATION_MIN +
str((len(self.data.data) / Config["InputConfig"]["Freq"] / 60)),
Constants.TIPS_TYPE_INFO)
PublicFunc.text_output(self.ui, Constants.DETECT_J_PEAK_JPEAK_AMOUNT + str(len(self.data.peak)),
Constants.TIPS_TYPE_INFO)
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.DRAWING_DATA))
self.progressbar.setValue(70)
QApplication.processEvents()
# 绘图
status, info = self.__plot__()
if not status:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
ButtonState["Current"]["pushButton_save"] = True
self.finish_operation()
def __slot_btn_save__(self):
reply = QMessageBox.question(self, Constants.QUESTION_TITLE,
Constants.QUESTION_CONTENT + Config["Path"]["Save"],
QMessageBox.Yes | QMessageBox.No,
QMessageBox.Yes)
if reply == QMessageBox.Yes:
self.__disableAllButton__()
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.SAVING_DATA))
self.progressbar.setValue(0)
QApplication.processEvents()
# 保存
# status, info = self.data.save()
total_rows = len(DataFrame(self.data.peak.reshape(-1)))
chunk_size = ConfigParams.PREPROCESS_SAVE_CHUNK_SIZE
with open(Config["Path"]["Save"], 'w') as f:
for start in range(0, total_rows, chunk_size):
end = min(start + chunk_size, total_rows)
chunk = DataFrame(self.data.peak.reshape(-1)).iloc[start:end]
status, info = self.data.save(chunk)
progress = int((end / total_rows) * 100)
self.progressbar.setValue(progress)
QApplication.processEvents()
if not status:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
PublicFunc.msgbox_output(self, info, Constants.TIPS_TYPE_INFO)
self.finish_operation()
def __update_config__(self):
Config["Filter"]["BandPassLow"] = self.ui.doubleSpinBox_bandPassLow.value()
Config["Filter"]["BandPassHigh"] = self.ui.doubleSpinBox_bandPassHigh.value()
Config["PeaksValue"] = self.ui.spinBox_peaksValue.value()
Config["AmpValue"] = self.ui.doubleSpinBox_ampValue.value()
Config["IntervalLow"] = self.ui.spinBox_intervalLow.value()
Config["IntervalHigh"] = self.ui.spinBox_intervalHigh.value()
Config["UseCPU"] = self.ui.checkBox_useCPU.isChecked()
Config["DetectMethod"] = self.ui.comboBox_model.currentText()
def update_ui_comboBox_model(self, model_list):
self.ui.comboBox_model.clear()
self.ui.comboBox_model.addItems(model_list)
def finish_operation(self):
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.OPERATION_FINISHED))
self.progressbar.setValue(100)
QApplication.processEvents()
self.__enableAllButton__()
def add_progressbar(self):
self.progressbar = QProgressBar()
self.progressbar.setRange(0, 100)
self.progressbar.setValue(0)
self.progressbar.setStyleSheet(Constants.PROGRESSBAR_STYLE)
self.ui.statusBar.addPermanentWidget(self.progressbar)
def statusbar_show_msg(self, msg):
self.ui.statusBar.showMessage(msg)
def statusbar_clear_msg(self):
self.ui.statusBar.clearMessage()
class Data:
def __init__(self):
self.file_path_input = Config["Path"]["Input"]
self.file_path_save = Config["Path"]["Save"]
self.data = None
self.processed_data = None
self.peak = None
self.interval = None
def open_file(self):
if not Path(Config["Path"]["Input"]).exists():
return False, Constants.INPUT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Data_Path_Not_Exist"]
try:
self.data = read_csv(self.file_path_input,
encoding=ConfigParams.UTF8_ENCODING,
header=None).to_numpy().reshape(-1)
except Exception:
return False, Constants.INPUT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Read_File_Exception"]
return True, Constants.INPUT_FINISHED
def preprocess(self):
if self.data is None:
return False, Constants.DETECT_JPEAK_PROCESS_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Raw_Data_Not_Exist"]
try:
self.processed_data = preprocess(self.data,
Config["InputConfig"]["Freq"],
Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["AmpValue"])
except Exception:
return False, Constants.DETECT_JPEAK_PROCESS_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Filter_Exception"]
return True, Constants.DETECT_JPEAK_PROCESS_FINISHED
def predict_Jpeak(self, model):
if not (Path(model.model_folder_path) / Path(model.selected_model)).exists():
return False, Constants.DETECT_JPEAK_PREDICT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Model_File_Not_Exist"]
if self.processed_data is None:
return False, Constants.DETECT_JPEAK_PREDICT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Processed_Data_Not_Exist"]
try:
self.peak, self.interval = Jpeak_Detection(model.selected_model,
Path(model.model_folder_path) / Path(model.selected_model),
self.processed_data,
Config["InputConfig"]["Freq"],
Config["IntervalHigh"],
Config["IntervalLow"],
Config["PeaksValue"],
Config["UseCPU"])
except Exception:
return False, Constants.DETECT_JPEAK_PREDICT_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Predict_Exception"]
return True, Constants.DETECT_JPEAK_PREDICT_FINISHED
def save(self, chunk):
if self.peak is None:
return False, Constants.SAVING_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Peak_Not_Exist"]
try:
# DataFrame(self.processed_data.reshape(-1)).to_csv(self.file_path_save,
# index=False,
# header=False,
# float_format='%.4f')
chunk.to_csv(self.file_path_save, mode='a', index=False, header=False)
except Exception:
return False, Constants.SAVING_FAILURE + Constants.PREPROCESS_FAILURE_REASON["Save_Exception"]
return True, Constants.SAVING_FINISHED
class Model:
def __init__(self):
self.model_folder_path = Config["ModelFolderPath"]
self.model_list = None
self.selected_model_path = None
self.selected_model = None
def seek_model(self):
if not Path(Config["ModelFolderPath"]).exists():
return False, Constants.LOAD_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Model_Path_Not_Exist"]
try:
self.model_list = [file.name for file in Path(Config["ModelFolderPath"]).iterdir() if file.is_file()]
if len(self.model_list) == 0:
return False, Constants.DETECT_JPEAK_FAILURE_REASON["Model_File_Not_Exist"]
except Exception:
return False, Constants.LOAD_FAILURE + Constants.DETECT_JPEAK_FAILURE_REASON["Read_Model_Exception"]
return True, Constants.LOAD_FINISHED

108
func/Module_mainwindow.py Normal file
View File

@ -0,0 +1,108 @@
from pathlib import Path
from PySide6.QtWidgets import QMainWindow, QMessageBox, QFileDialog
from matplotlib import use
from yaml import dump, load, FullLoader
from func.utils.PublicFunc import PublicFunc
from ui.MainWindow.MainWindow_menu import Ui_Signal_Label
from func.Module_preprocess import MainWindow_preprocess
from func.Module_detect_Jpeak import MainWindow_detect_Jpeak
from func.utils.Constants import Constants, ConfigParams
use("QtAgg")
Config = {
}
class MainWindow(QMainWindow, Ui_Signal_Label):
def __init__(self):
super(MainWindow, self).__init__()
self.ui = Ui_Signal_Label()
self.ui.setupUi(self)
self.__read_config__()
self.ui.plainTextEdit_root_path.setPlainText(Config["Path"]["Root"])
self.seek_sampID(Path(Config["Path"]["Root"]) / Path(ConfigParams.PUBLIC_PATH_ORGBCG_TEXT))
self.preprocess = None
# 消息弹窗初始化
self.msgBox = QMessageBox()
self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)
# 绑定槽函数
self.ui.pushButton_open.clicked.connect(self.__slot_btn_open__)
self.ui.pushButton_preprocess_BCG.clicked.connect(self.__slot_btn_preprocess__)
self.ui.pushButton_preprocess_ECG.clicked.connect(self.__slot_btn_preprocess__)
self.ui.pushButton_detect_Jpeak.clicked.connect(self.__slot_btn_detect_Jpeak__)
@staticmethod
def __read_config__():
if not Path(ConfigParams.PUBLIC_CONFIG_FILE_PATH).exists():
with open(ConfigParams.PUBLIC_CONFIG_FILE_PATH, "w") as f:
dump(ConfigParams.PUBLIC_CONFIG_NEW_CONTENT, f)
with open(ConfigParams.PUBLIC_CONFIG_FILE_PATH, "r") as f:
file_config = load(f.read(), Loader=FullLoader)
Config.update(file_config)
@staticmethod
def __write_config__():
with open(Path(ConfigParams.PUBLIC_CONFIG_FILE_PATH), "w") as f:
dump(Config, f)
def __slot_btn_open__(self):
file_dialog = QFileDialog()
file_dialog.setFileMode(QFileDialog.Directory)
file_dialog.setOption(QFileDialog.ShowDirsOnly, True)
if file_dialog.exec_() == QFileDialog.Accepted:
self.seek_sampID(Path(file_dialog.selectedFiles()[0]) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT)
self.ui.plainTextEdit_root_path.setPlainText(file_dialog.selectedFiles()[0])
# 修改配置
Config["Path"]["Root"] = str(file_dialog.selectedFiles()[0])
self.__write_config__()
else:
PublicFunc.msgbox_output(self, Constants.OPERATION_CANCELED, Constants.MSGBOX_TYPE_INFO)
def __slot_btn_preprocess__(self):
self.preprocess = MainWindow_preprocess()
sender = self.sender()
root_path = self.ui.plainTextEdit_root_path.toPlainText()
sampID = int(self.ui.comboBox_sampID.currentText())
if sender == self.ui.pushButton_preprocess_BCG:
mode = "BCG"
self.preprocess.show(mode, root_path, sampID)
elif sender == self.ui.pushButton_preprocess_ECG:
mode = "ECG"
self.preprocess.show(mode, root_path, sampID)
def __slot_btn_detect_Jpeak__(self):
self.detect_Jpeak = MainWindow_detect_Jpeak()
root_path = self.ui.plainTextEdit_root_path.toPlainText()
sampID = int(self.ui.comboBox_sampID.currentText())
self.detect_Jpeak.show(root_path, sampID)
def seek_sampID(self, path):
if not Path(path).exists():
PublicFunc.msgbox_output(self, Constants.MAINWINDOW_ROOT_PATH_NOT_EXIST, Constants.MSGBOX_TYPE_ERROR)
return
sub_folders = [item.name for item in Path(path).iterdir() if item.is_dir()]
self.ui.comboBox_sampID.addItems(sub_folders)

525
func/Module_preprocess.py Normal file
View File

@ -0,0 +1,525 @@
from gc import collect
from pathlib import Path
import matplotlib.pyplot as plt
from PySide6.QtWidgets import QMessageBox, QMainWindow, QWidget, QPushButton, QProgressBar, QApplication
from matplotlib import gridspec
from matplotlib.backends.backend_qt import NavigationToolbar2QT
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg
from overrides import overrides
from pandas import read_csv, DataFrame
from yaml import dump, load, FullLoader
from func.utils.PublicFunc import PublicFunc
from func.utils.Constants import Constants, ConfigParams
from func.Filters.Preprocessing import Butterworth_for_BCG_PreProcess, Butterworth_for_ECG_PreProcess
from ui.MainWindow.MainWindow_preprocess import Ui_MainWindow_preprocess
from ui.setting.preprocess_input_setting import Ui_MainWindow_preprocess_input_setting
Config = {
}
ButtonState = {
"Default": {
"pushButton_input_setting": True,
"pushButton_input": True,
"pushButton_view": False,
"pushButton_save": False
},
"Current": {
"pushButton_input_setting": True,
"pushButton_input": True,
"pushButton_view": False,
"pushButton_save": False
}
}
class SettingWindow(QMainWindow):
def __init__(self, mode, root_path, sampID):
super(SettingWindow, self).__init__()
self.ui = Ui_MainWindow_preprocess_input_setting()
self.ui.setupUi(self)
self.mode = mode
self.root_path = root_path
self.sampID = sampID
self.config = None
self.__read_config__()
self.ui.spinBox_input_freq.valueChanged.connect(self.__update_ui__)
self.ui.pushButton_confirm.clicked.connect(self.__write_config__)
self.ui.pushButton_cancel.clicked.connect(self.__rollback_config__)
self.ui.pushButton_cancel.clicked.connect(self.close)
def __read_config__(self):
if not Path(ConfigParams.PREPROCESS_CONFIG_FILE_PATH).exists():
with open(ConfigParams.PREPROCESS_CONFIG_FILE_PATH, "w") as f:
dump(ConfigParams.PREPROCESS_CONFIG_NEW_CONTENT, f)
with open(ConfigParams.PREPROCESS_CONFIG_FILE_PATH, "r") as f:
file_config = load(f.read(), Loader=FullLoader)
Config.update(file_config)
self.config = file_config
if self.mode == "BCG":
Config.update({
"Path": {
"Input": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.PREPROCESS_INPUT_BCG_FILENAME +
str(Config["InputConfig"]["Freq"]) +
ConfigParams.ENDSWITH_TXT))),
"Save": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.PREPROCESS_SAVE_BCG_FILENAME +
str(Config["InputConfig"]["Freq"]) +
ConfigParams.ENDSWITH_TXT)))
},
"Mode": self.mode
})
else:
Config.update({
"Path": {
"Input": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.PREPROCESS_INPUT_ECG_FILENAME +
str(Config["InputConfig"]["Freq"]) +
ConfigParams.ENDSWITH_TXT))),
"Save": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.PREPROCESS_SAVE_ECG_FILENAME +
str(Config["InputConfig"]["Freq"]) +
ConfigParams.ENDSWITH_TXT)))
},
"Mode": self.mode
})
# 数据回显
self.ui.spinBox_input_freq.setValue(Config["InputConfig"]["Freq"])
self.ui.plainTextEdit_file_path_input.setPlainText(Config["Path"]["Input"])
self.ui.plainTextEdit_file_path_save.setPlainText(Config["Path"]["Save"])
def __write_config__(self):
# 从界面写入配置
Config["InputConfig"]["Freq"] = self.ui.spinBox_input_freq.value()
Config["Path"]["Input"] = self.ui.plainTextEdit_file_path_input.toPlainText()
Config["Path"]["Save"] = self.ui.plainTextEdit_file_path_save.toPlainText()
# 保存配置到文件
self.config["InputConfig"]["Freq"] = self.ui.spinBox_input_freq.value()
with open(ConfigParams.PREPROCESS_CONFIG_FILE_PATH, "w") as f:
dump(self.config, f)
self.close()
def __rollback_config__(self):
self.__read_config__()
def __update_ui__(self):
if self.mode == "BCG":
self.ui.plainTextEdit_file_path_input.setPlainText(
str((Path(self.root_path) /
ConfigParams.PUBLIC_PATH_ORGBCG_TEXT /
Path(str(self.sampID)) /
Path(ConfigParams.PREPROCESS_INPUT_BCG_FILENAME +
str(self.ui.spinBox_input_freq.value()) +
ConfigParams.ENDSWITH_TXT))))
self.ui.plainTextEdit_file_path_save.setPlainText(
str((Path(self.root_path) /
ConfigParams.PUBLIC_PATH_ORGBCG_TEXT /
Path(str(self.sampID)) /
Path(ConfigParams.PREPROCESS_SAVE_BCG_FILENAME +
str(self.ui.spinBox_input_freq.value()) +
ConfigParams.ENDSWITH_TXT))))
else:
self.ui.plainTextEdit_file_path_input.setPlainText(
str((Path(self.root_path) /
ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) /
Path(ConfigParams.PREPROCESS_INPUT_ECG_FILENAME +
str(self.ui.spinBox_input_freq.value()) +
ConfigParams.ENDSWITH_TXT))))
self.ui.plainTextEdit_file_path_save.setPlainText(
str((Path(self.root_path) /
ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) /
Path(ConfigParams.PREPROCESS_SAVE_ECG_FILENAME +
str(self.ui.spinBox_input_freq.value()) +
ConfigParams.ENDSWITH_TXT))))
class MainWindow_preprocess(QMainWindow):
def __init__(self):
super(MainWindow_preprocess, self).__init__()
self.ui = Ui_MainWindow_preprocess()
self.ui.setupUi(self)
self.mode = None
self.root_path = None
self.sampID = None
self.data = None
self.setting = None
# 初始化进度条
self.progressbar = None
self.add_progressbar()
#初始化画框
self.fig = None
self.canvas = None
self.figToolbar = None
self.gs = None
self.ax0 = None
self.line_raw_data = None
self.line_processed_data = None
self.msgBox = QMessageBox()
self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)
@overrides
def show(self, mode, root_path, sampID):
super().show()
self.mode = mode
self.root_path = root_path
self.sampID = sampID
self.setting = SettingWindow(mode, root_path, sampID)
# 初始化画框
self.fig = plt.figure(figsize=(12, 9), dpi=100)
self.canvas = FigureCanvasQTAgg(self.fig)
self.figToolbar = NavigationToolbar2QT(self.canvas)
for action in self.figToolbar.actions():
if action.text() == "Subplots" or action.text() == "Customize":
self.figToolbar.removeAction(action)
self.ui.verticalLayout_canvas.addWidget(self.canvas)
self.ui.verticalLayout_canvas.addWidget(self.figToolbar)
self.gs = gridspec.GridSpec(1, 1, height_ratios=[1])
self.fig.subplots_adjust(top=0.98, bottom=0.05, right=0.98, left=0.1, hspace=0, wspace=0)
self.ax0 = self.fig.add_subplot(self.gs[0])
self.ax0.grid(True)
self.ax0.xaxis.set_major_formatter(ConfigParams.FORMATTER)
self.__resetAllButton__()
self.ui.label_mode.setText(self.mode)
if self.mode == "BCG":
self.ui.spinBox_bandPassOrder.setValue(Config["Filter"]["BCGBandPassOrder"])
self.ui.doubleSpinBox_bandPassLow.setValue(Config["Filter"]["BCGBandPassLow"])
self.ui.doubleSpinBox_bandPassHigh.setValue(Config["Filter"]["BCGBandPassHigh"])
else:
self.ui.spinBox_bandPassOrder.setValue(Config["Filter"]["ECGBandPassOrder"])
self.ui.doubleSpinBox_bandPassLow.setValue(Config["Filter"]["ECGBandPassLow"])
self.ui.doubleSpinBox_bandPassHigh.setValue(Config["Filter"]["ECGBandPassHigh"])
self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__)
self.ui.pushButton_input_setting.clicked.connect(self.setting.show)
self.ui.pushButton_view.clicked.connect(self.__slot_btn_view__)
self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__)
self.ui.spinBox_bandPassOrder.editingFinished.connect(self.__update_config__)
self.ui.doubleSpinBox_bandPassLow.editingFinished.connect(self.__update_config__)
self.ui.doubleSpinBox_bandPassHigh.editingFinished.connect(self.__update_config__)
@overrides
def closeEvent(self, event):
self.__disableAllButton__()
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
QApplication.processEvents()
# 清空画框
if self.line_raw_data and self.line_processed_data:
del self.line_raw_data
del self.line_processed_data
self.canvas.draw()
# 释放资源
del self.data
self.fig.clf()
plt.close(self.fig)
self.deleteLater()
collect()
self.canvas = None
event.accept()
@staticmethod
def __reset__():
ButtonState["Current"].update(ButtonState["Default"].copy())
ButtonState["Current"]["pushButton_view"] = True
def __plot__(self):
# 清空画框
if self.line_raw_data and self.line_processed_data:
try:
self.line_raw_data.remove()
self.line_processed_data.remove()
except ValueError:
pass
sender = self.sender()
if sender == self.ui.pushButton_view:
self.line_raw_data, = self.ax0.plot(self.data.raw_data,
color=Constants.PLOT_COLOR_RED,
label=Constants.PREPROCESS_PLOT_LABEL_ORIGINAL_DATA)
self.line_processed_data, = self.ax0.plot(self.data.processed_data + Constants.PREPROCESS_OUTPUT_INPUT_AMP_OFFSET,
color=Constants.PLOT_COLOR_BLUE,
label=Constants.PREPROCESS_PLOT_LABEL_PROCESSED_DATA)
self.ax0.legend(loc=Constants.PLOT_UPPER_RIGHT)
status = True
info = Constants.DRAWING_FINISHED
else:
status = False
info = Constants.DRAWING_FAILURE
self.canvas.draw()
return status, info
def __disableAllButton__(self):
# 禁用所有按钮
all_widgets = self.centralWidget().findChildren(QWidget)
# 迭代所有部件,查找按钮并禁用它们
for widget in all_widgets:
if isinstance(widget, QPushButton):
if widget.objectName() in ButtonState["Current"].keys():
widget.setEnabled(False)
def __enableAllButton__(self):
# 启用按钮
all_widgets = self.centralWidget().findChildren(QWidget)
# 迭代所有部件,查找按钮并启用它们
for widget in all_widgets:
if isinstance(widget, QPushButton):
if widget.objectName() in ButtonState["Current"].keys():
widget.setEnabled(ButtonState["Current"][widget.objectName()])
def __resetAllButton__(self):
# 启用按钮
all_widgets = self.centralWidget().findChildren(QWidget)
# 迭代所有部件,查找按钮并启用它们
for widget in all_widgets:
if isinstance(widget, QPushButton):
if widget.objectName() in ButtonState["Default"].keys():
widget.setEnabled(ButtonState["Default"][widget.objectName()])
def __slot_btn_input__(self):
self.__disableAllButton__()
# 清空画框
if self.line_raw_data and self.line_processed_data:
try:
self.line_raw_data.remove()
self.line_processed_data.remove()
except ValueError:
pass
self.canvas.draw()
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.INPUTTING_DATA))
self.progressbar.setValue(0)
QApplication.processEvents()
# 导入数据
self.data = Data()
status, info = self.data.open_file()
if not status:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
MainWindow_preprocess.__reset__()
self.finish_operation()
def __slot_btn_view__(self):
self.__disableAllButton__()
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.PREPROCESS_PROCESSING_DATA))
self.progressbar.setValue(0)
QApplication.processEvents()
# 数据预处理
status, info = self.data.preprocess()
if not status:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.DRAWING_DATA))
self.progressbar.setValue(50)
QApplication.processEvents()
# 绘图
status, info = self.__plot__()
if not status:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
ButtonState["Current"]["pushButton_save"] = True
self.finish_operation()
def __slot_btn_save__(self):
reply = QMessageBox.question(self, Constants.QUESTION_TITLE,
Constants.QUESTION_CONTENT + Config["Path"]["Save"],
QMessageBox.Yes | QMessageBox.No,
QMessageBox.Yes)
if reply == QMessageBox.Yes:
self.__disableAllButton__()
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.SAVING_DATA))
self.progressbar.setValue(0)
QApplication.processEvents()
# 保存
# status, info = self.data.save()
total_rows = len(DataFrame(self.data.processed_data.reshape(-1)))
chunk_size = ConfigParams.PREPROCESS_SAVE_CHUNK_SIZE
with open(Config["Path"]["Save"], 'w') as f:
for start in range(0, total_rows, chunk_size):
end = min(start + chunk_size, total_rows)
chunk = DataFrame(self.data.processed_data.reshape(-1)).iloc[start:end]
status, info = self.data.save(chunk)
progress = int((end / total_rows) * 100)
self.progressbar.setValue(progress)
QApplication.processEvents()
if not status:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, info, Constants.MSGBOX_TYPE_ERROR)
self.finish_operation()
return
else:
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
PublicFunc.msgbox_output(self, info, Constants.TIPS_TYPE_INFO)
self.finish_operation()
def __update_config__(self):
if self.mode == "BCG":
Config["Filter"]["BCGBandPassOrder"] = self.ui.spinBox_bandPassOrder.value()
Config["Filter"]["BCGBandPassLow"] = self.ui.doubleSpinBox_bandPassLow.value()
Config["Filter"]["BCGBandPassHigh"] = self.ui.doubleSpinBox_bandPassHigh.value()
else:
Config["Filter"]["ECGBandPassOrder"] = self.ui.spinBox_bandPassOrder.value()
Config["Filter"]["ECGBandPassLow"] = self.ui.doubleSpinBox_bandPassLow.value()
Config["Filter"]["ECGBandPassHigh"] = self.ui.doubleSpinBox_bandPassHigh.value()
def finish_operation(self):
self.statusbar_show_msg(PublicFunc.format_status_msg(Constants.OPERATION_FINISHED))
self.progressbar.setValue(100)
QApplication.processEvents()
self.__enableAllButton__()
def add_progressbar(self):
self.progressbar = QProgressBar()
self.progressbar.setRange(0, 100)
self.progressbar.setValue(0)
self.progressbar.setStyleSheet(Constants.PROGRESSBAR_STYLE)
self.ui.statusBar.addPermanentWidget(self.progressbar)
def statusbar_show_msg(self, msg):
self.ui.statusBar.showMessage(msg)
def statusbar_clear_msg(self):
self.ui.statusBar.clearMessage()
class Data:
def __init__(self):
self.file_path_input = Config["Path"]["Input"]
self.file_path_save = Config["Path"]["Save"]
self.raw_data = None
self.processed_data = None
def open_file(self):
if not Path(Config["Path"]["Input"]).exists():
return False, Constants.INPUT_FAILURE + Constants.PREPROCESS_FAILURE_REASON["Data_Path_Not_Exist"]
try:
self.raw_data = read_csv(self.file_path_input,
encoding=ConfigParams.UTF8_ENCODING,
header=None).to_numpy().reshape(-1)
except Exception:
return False, Constants.INPUT_FAILURE + Constants.PREPROCESS_FAILURE_REASON["Read_Data_Exception"]
return True, Constants.INPUT_FINISHED
def preprocess(self):
if self.raw_data is None:
return False, Constants.PREPROCESS_PROCESS_FAILURE + Constants.PREPROCESS_FAILURE_REASON["Raw_Data_Not_Exist"]
try:
if Config["Mode"] == "BCG":
self.processed_data = Butterworth_for_BCG_PreProcess(self.raw_data, type='bandpass',
low_cut=Config["Filter"]["BCGBandPassLow"],
high_cut=Config["Filter"]["BCGBandPassHigh"],
order=Config["Filter"]["BCGBandPassOrder"],
sample_rate=Config["InputConfig"]["Freq"])
else:
self.processed_data = Butterworth_for_ECG_PreProcess(self.raw_data, type='bandpass',
low_cut=Config["Filter"]["ECGBandPassLow"],
high_cut=Config["Filter"]["ECGBandPassHigh"],
order=Config["Filter"]["ECGBandPassOrder"],
sample_rate=Config["InputConfig"]["Freq"])
except Exception:
return False, Constants.PREPROCESS_PROCESS_FAILURE + Constants.PREPROCESS_FAILURE_REASON["Filter_Exception"]
return True, Constants.PREPROCESS_PROCESS_FINISHED
def save(self, chunk):
if self.processed_data is None:
return False, Constants.SAVING_FAILURE + Constants.PREPROCESS_FAILURE_REASON["Processed_Data_Not_Exist"]
try:
# DataFrame(self.processed_data.reshape(-1)).to_csv(self.file_path_save,
# index=False,
# header=False,
# float_format='%.4f')
chunk.to_csv(self.file_path_save, mode='a', index=False, header=False, float_format='%.4f')
except Exception:
return False, Constants.SAVING_FAILURE + Constants.PREPROCESS_FAILURE_REASON["Save_Exception"]
return True, Constants.SAVING_FINISHED

270
func/utils/ConfigParams.py Normal file
View File

@ -0,0 +1,270 @@
from PySide6.QtGui import QIntValidator, QDoubleValidator
from matplotlib.ticker import FuncFormatter
class ConfigParams:
# 公共
PUBLIC_CONFIG_FILE_PATH: str = "./config/Config_public.yaml"
PUBLIC_PATH_ORGBCG_TEXT: str = "OrgBCG_Text"
PUBLIC_PATH_PSG_TEXT: str = "PSG_Text"
PUBLIC_PATH_ORGBCG_ALIGNED: str = "OrgBCG_Aligned"
PUBLIC_PATH_PSG_ALIGNED: str = "PSG_Aligned"
PUBLIC_PATH_LABEL: str = "Label"
PUBLIC_CONFIG_NEW_CONTENT = {
"Path": {
"Root": ""
}
}
UTF8_ENCODING: str = "utf-8"
GBK_ENCODING: str = "gbk"
ENDSWITH_TXT: str = ".txt"
ENDSWITH_CSV: str = ".csv"
ENDSWITH_EDF: str = ".edf"
FORMATTER = FuncFormatter(lambda x, p: f"{x:.0f}")
ACTION_PAN_SHORTCUT_KEY: str = "X"
ACTION_ZOOM_SHORTCUT_KEY: str = "C"
# 数据粗同步
# 预处理
PREPROCESS_CONFIG_FILE_PATH: str = "./config/Config_preprocess.yaml"
PREPROCESS_CONFIG_NEW_CONTENT = {
"InputConfig": {
"Freq": 1000
},
"Filter": {
"BCGBandPassOrder": 4,
"BCGBandPassLow": 2,
"BCGBandPassHigh": 10,
"ECGBandPassOrder": 3,
"ECGBandPassLow": 1,
"ECGBandPassHigh": 25
}
}
PREPROCESS_INPUT_BCG_FILENAME: str = "orgBcg_Raw_"
PREPROCESS_INPUT_ECG_FILENAME: str = "ECG I_"
PREPROCESS_SAVE_BCG_FILENAME: str = "DSbcg_sig_"
PREPROCESS_SAVE_ECG_FILENAME: str = "ECG_filter_"
PREPROCESS_SAVE_CHUNK_SIZE: int = 1000000
# BCG的J峰算法定位
DETECT_JPEAK_CONFIG_FILE_PATH: str = "./config/Config_detect_Jpeak.yaml"
DETECT_JPEAK_CONFIG_NEW_CONTENT = {
"InputConfig": {
"Freq": 1000
},
"Filter": {
"BandPassLow": 2,
"BandPassHigh": 10
},
"ModelFolderPath": "./func/detect_Jpeak_model",
"PeaksValue": 100,
"AmpValue": 5,
"IntervalLow": 50,
"IntervalHigh": 140,
"UseCPU": False,
"DetectMethod": ""
}
DETECT_JPEAK_INPUT_BCG_FILENAME: str = "DSbcg_sig_"
DETECT_JPEAK_SAVE_FILENAME: str = "JPeak_revise"
DETECT_JPEAK_SAVE_CHUNK_SIZE: int = 100
# TODO弃用
# 通用
# 目前用到这个编码的地方:
# <BCG的质量评估打标>里的保存和读取csv文件的地方注意的是读取原始数据时依然使用UTF-8
VALIDATOR_INTEGER = QIntValidator(-2**31, 2**31 - 1)
VALIDATOR_DOUBLE = QDoubleValidator(-1e100, 1e100, 10)
FONT: str = "Microsoft YaHei UI"
# 菜单界面
MATPLOTLIB_PLOT_PRECISION_PARAM: int = 10000
# 数据粗同步
APPROXIMATELY_ALIGN_INPUT_ORGBCG_FILENAME: str = "orgBcg_Raw_"
APPROXIMATELY_ALIGN_INPUT_PSG_FILENAME: str = "A"
APPROXIMATELY_ALIGN_SAVE_FILENAME: str = "Approximately_Align_Info"
APPROXIMATELY_ALIGN_INPUT_ORGBCG_DEFAULT_FS: int = 1000
APPROXIMATELY_ALIGN_INPUT_PSG_DEFAULT_FS: int = 100
APPROXIMATELY_ALIGN_THO_CUSTOM_CHANNEL_DEFAULT: int = 3
APPROXIMATELY_ALIGN_ABD_CUSTOM_CHANNE_DEFAULT: int = 4
APPROXIMATELY_ALIGN_BUTTERORDER_DEFAULT: int = 4
APPROXIMATELY_ALIGN_BUTTERLOWPASSFREQ_CHANNE_DEFAULT: float = 0.01
APPROXIMATELY_ALIGN_BUTTERHIGHPASSFREQ_DEFAULT: float = 0.70
APPROXIMATELY_ALIGN_APPLYFREQ_DEFAULT: float = 5
# 预处理
# PREPROCESS_INPUT_BCG_FILENAME: str = "orgBcg_Raw_"
# PREPROCESS_INPUT_ECG_FILENAME: str = "ECG I_"
# PREPROCESS_SAVE_BCG_FILENAME: str = "DSbcg_sig_"
# PREPROCESS_SAVE_ECG_FILENAME: str = "ECG_filter_"
# PREPROCESS_INPUT_BCG_DEFAULT_FS: int = 1000
# PREPROCESS_INPUT_BCG_SAVE_DEFAULT_FS: int = 1000
# PREPROCESS_INPUT_ECG_DEFAULT_FS: int = 1000
# PREPROCESS_INPUT_ECG_SAVE_DEFAULT_FS: int = 1000
#
# PREPROCESS_BANDPASS_LOW_DEFAULT: int = 2
# PREPROCESS_BANDPASS_HIGH_DEFAULT: int = 10
# PREPROCESS_FILTER_ORDER_DEFAULT: int = 4
#
# PREPROCESS_FILTER_BCG: str = "bandpass"
# PREPROCESS_FILTER_ECG: str = "bandpass"
# ECG的R峰算法定位
DETECT_R_PEAK_INPUT_ECG_FILENAME: str = "ECG_filter_"
DETECT_R_PEAK_SAVE_RPEAK_FILENAME: str = "final_Rpeak"
DETECT_R_PEAK_INPUT_ECG_DEFAULT_FS: int = 1000
DETECT_R_PEAK_PEAKS_VALUE_DEFAULT: int = 200
DETECT_R_PEAK_BANDPASS_LOW_DEFAULT: int = 2
DETECT_R_PEAK_BANDPASS_HIGH_DEFAULT: int = 15
DETECT_R_PEAK_DETECT_METHOD_PT: str = "pt"
DETECT_R_PEAK_DETECT_METHOD_TA: str = "ta"
DETECT_R_PEAK_DETECT_METHOD_WT: str = "Wt"
DETECT_R_PEAK_DETECT_METHOD_HAMILTON: str = "Hamilton"
DETECT_R_PEAK_DETECT_METHOD_ENGZEE: str = "Engzee"
# BCG的J峰算法定位
DETECT_J_PEAK_INPUT_BCG_FILENAME: str = "DSbcg_sig_"
DETECT_J_PEAK_SAVE_JPEAK_FILENAME: str = "JPeak_revise"
DETECT_J_PEAK_INPUT_BCG_DEFAULT_FS: int = 1000
DETECT_J_PEAK_BANDPASS_LOW_DEFAULT: int = 2
DETECT_J_PEAK_BANDPASS_HIGH_DEFAULT: int = 10
DETECT_J_PEAK_PEAKS_VALUE_DEFAULT: int = 100
DETECT_J_PEAK_AMP_VALUE_DEFAULT: int = 5
DETECT_J_PEAK_INTERVAL_LOW_DEFAULT: int = 50
DETECT_J_PEAK_INTERVAL_HIGH_DEFAULT: int = 140
DETECT_J_PEAK_UNET_MODEL1_PKL_PATH: str = "./func/result/Fivelayer_Unet/1.pkl"
DETECT_J_PEAK_UNET_MODEL2_PKL_PATH: str = "./func/result/Fivelayer_Unet/2.pkl"
DETECT_J_PEAK_LSTMUNET_MODEL1_PKL_PATH: str = "./func/result/Fivelayer_Lstm_Unet/1.pkl"
DETECT_J_PEAK_LSTMUNET_MODEL2_PKL_PATH: str = "./func/result/Fivelayer_Lstm_Unet/2.pkl"
DETECT_J_PEAK_UNET_MODEL1_NAME: str = "Fivelayer_Unet_1"
DETECT_J_PEAK_UNET_MODEL2_NAME: str = "Fivelayer_Unet_2"
DETECT_J_PEAK_LSTMUNET_MODEL1_NAME: str = "Fivelayer_Lstm_Unet_1"
DETECT_J_PEAK_LSTMUNET_MODEL2_NAME: str = "Fivelayer_Lstm_Unet_2"
# 人工纠正
LABEL_CHECK_INPUT_BCG_FILENAME: str = "DSbcg_sig_"
LABEL_CHECK_INPUT_JPEAK_FILENAME: str = "JPeak_revise"
LABEL_CHECK_SAVE_JPEAK_FILENAME: str = "JPeak_revise_corrected"
LABEL_CHECK_INPUT_ECG_FILENAME: str = "ECG_filter_"
LABEL_CHECK_INPUT_RPEAK_FILENAME: str = "final_Rpeak"
LABEL_CHECK_SAVE_RPEAK_FILENAME: str = "final_Rpeak_corrected"
LABEL_CHECK_INPUT_DEFAULT_FS: int = 1000
LABEL_CHECK_DATA1_FILTER_ORDER_DEFAULT: int = 2
LABEL_CHECK_DATA1_BANDPASS_LOW_DEFAULT: int = 2
LABEL_CHECK_DATA1_BANDPASS_HIGH_DEFAULT: int = 10
LABEL_CHECK_DATA2_FILTER_ORDER_DEFAULT: int = 2
LABEL_CHECK_DATA2_BANDPASS_LOW_DEFAULT: int = 2
LABEL_CHECK_DATA2_BANDPASS_HIGH_DEFAULT: int = 15
LABEL_CHECK_FINDPEAKS_MIN_INTERVAL_DEFAULT: int = 1000
LABEL_CHECK_FINDPEAKS_MIN_HEIGHT_DEFAULT: int = 0.5
LABEL_CHECK_MOVELENGTH_DEFAULT: int = 15000
LABEL_CHECK_MAXRANGE_DEFAULT: int = 60000
LABEL_CHECK_MOVESPEED_DEFAULT: int = 1000
LABEL_CHECK_FILTER: str = "bandpass"
LABEL_CHECK_LABEL_TRANSPARENCY: float = 0.2
LABEL_CHECK_ACTION_LABEL_MULTIPLE_SHORTCUT_KEY: str = "Z"
# 体动打标
ARTIFACT_LABEL_INPUT_BCG_FILENAME: str = "BCG_sync_"
ARTIFACT_LABEL_INPUT_XINXIAO_FILENAME: str = "orgBcg_sync_"
ARTIFACT_LABEL_SAVE_TXT_ARTIFACT_FILENAME: str = "Artifact_a"
ARTIFACT_LABEL_SAVE_TXT_ARTIFACT_AMOUNT_FILENAME: str = "Artifact_b"
ARTIFACT_LABEL_SAVE_CSV_ARTIFACT_FILENAME: str = "Artifact_c"
ARTIFACT_LABEL_INPUT_XINXIAO_DEFAULT_FS: int = 1000
ARTIFACT_LABEL_INPUT_BCG_DEFAULT_FS: int = 1000
ARTIFACT_LABEL_MOVELENGTH_DEFAULT: int = 15000
ARTIFACT_LABEL_MAXRANGE_DEFAULT: int = 60000
ARTIFACT_LABEL_MOVESPEED_DEFAULT: int = 1000
ARTIFACT_LABEL_LABEL_TRANSPARENCY: float = 0.3
ARTIFACT_LABEL_ACTION_LABEL_ARTIFACT_SHORTCUT_KEY: str = "Z"
# 质量打标
BCG_QUALITY_LABEL_INPUT_BCG_FILENAME: str = "BCG_sync_"
BCG_QUALITY_LABEL_INPUT_ARTIFACT_FILENAME: str = "Artifact_a"
BCG_QUALITY_LABEL_SAVE_FILENAME: str = "SQ_label_"
BCG_QUALITY_LABEL_INPUT_DEFAULT_FS: int = 1000
BCG_QUALITY_LABEL_SAVE_MODE_10S: str = "10s"
BCG_QUALITY_LABEL_SAVE_MODE_30S: str = "30s"
BCG_QUALITY_LABEL_MODE_10S_LENGTH = 10 * BCG_QUALITY_LABEL_INPUT_DEFAULT_FS
BCG_QUALITY_LABEL_MODE_30S_LENGTH = 30 * BCG_QUALITY_LABEL_INPUT_DEFAULT_FS
# 呼吸可用性及间期标注
RESP_QUALITY_LABEL_INPUT_XINXIAO_FILENAME: str = "orgBcg_sync_"
RESP_QUALITY_LABEL_INPUT_THO_FILENAME: str = "Effort_Tho_sync_"
RESP_QUALITY_LABEL_INPUT_ARTIFACT_FILENAME: str = "Artifact_a"
RESP_QUALITY_LABEL_SAVE_RESP_QUALITY_LABNEL_FILENAME: str = "Resp_quality_label"
RESP_QUALITY_LABEL_SAVE_THO_PEAK_FILENAME: str = "Tho_peak"
RESP_QUALITY_LABEL_INPUT_XINXIAO_DEFAULT_FS: int = 1000
RESP_QUALITY_LABEL_INPUT_THO_DEFAULT_FS: int = 200
RESP_QUALITY_LABEL_PARTS_TIME_SEC: int = 30
RESP_QUALITY_LABEL_PREPROCESS_FC: int = 1
RESP_QUALITY_LABEL_THRESHOLD1_DEFAULT: float = 0.65
RESP_QUALITY_LABEL_THRESHOLD2_DEFAULT: float = 0.8
RESP_QUALITY_LABEL_FINDPEAKS_MIN_INTERVAL_DEFAULT: int = 300
RESP_QUALITY_LABEL_FINDPEAKS_MIN_HEIGHT_DEFAULT: float = 0.1
RESP_QUALITY_LABEL_CUSTOM_LOW_DEFAULT: float = 0.1
RESP_QUALITY_LABEL_CUSTOM_HIGH_DEFAULT: float = 1
RESP_QUALITY_LABEL_LABEL_TRANSPARENCY: float = 0.2
RESP_QUALITY_LABEL_ACTION_LABEL_MULTIPLE_SHORTCUT_KEY: str = "Z"
# 睡眠呼吸暂停事件打标
# 禁止实例化
def __new__(cls):
raise TypeError("Constants class cannot be instantiated")
# 禁止修改常量
@classmethod
def __setattr__(cls, key, value):
raise AttributeError("Cannot modify constants")

411
func/utils/Constants.py Normal file
View File

@ -0,0 +1,411 @@
from func.utils.ConfigParams import ConfigParams
class Constants:
# 公共
TIPS_TYPE_INFO: str = "Info"
TIPS_TYPE_ERROR: str = "Error"
MSGBOX_TYPE_INFO: str = "Info"
MSGBOX_TYPE_WARNING: str = "Warning"
MSGBOX_TYPE_ERROR: str = "Error"
MSGBOX_TYPE_QUESTION: str = "Question"
INPUTTING_DATA: str = "正在导入数据"
INPUT_FINISHED: str = "导入完成"
INPUT_FAILURE: str = "导入失败"
LOADING_MODEL: str = "正在读取模型"
LOAD_FINISHED: str = "读取完成"
LOAD_FAILURE: str = "读取失败"
DRAWING_DATA: str = "正在绘制图形"
DRAWING_FINISHED: str = "绘制完成"
DRAWING_FAILURE: str = "绘制失败"
SAVING_DATA: str = "正在保存数据"
SAVING_FINISHED: str = "保存完成"
SAVING_FAILURE: str = "保存失败"
OPERATION_FINISHED: str = "操作完成"
OPERATION_FAILURE: str = "操作失败"
UNKNOWN_ERROR: str = "未知错误"
SHUTTING_DOWN: str = "正在关闭窗口"
QUESTION_TITLE: str = "警告:确认操作"
QUESTION_CONTENT: str = "你确定要保存结果到"
PLOT_UPPER_RIGHT: str = "upper right"
STRING_IS_EMPTY: str = ""
STRING_IS_NAN: str = "nan"
PLOT_COLOR_RED: str = "r"
PLOT_COLOR_GREEN: str = "g"
PLOT_COLOR_BLUE: str = "b"
PLOT_COLOR_ORANGE: str = "orange"
PLOT_COLOR_WHITE: str = "white"
PLOT_COLOR_BLACK: str = "black"
PLOT_COLOR_PINK: str = "#ff00ff"
PLOT_COLOR_PURPLE: str = "m"
PLOT_COLOR_GRAY: str = "gray"
PLOT_COLOR_DEEP_YELLOW: str = "#ffa500"
PLOT_COLOR_YELLOW: str = "#ffff00"
PLOT_COLOR_AQUA: str = "#00ffff"
PLOT_COLOR_PURPLE_PINK: str = "#ee82ee"
PLOT_COLOR_DEEP_GREY: str = "#808080"
PROGRESSBAR_STYLE: str = """
QProgressBar {
border: 1px solid #020066;
border-radius: 6px;
font-size: 16px;
color: black;
text-align: center;
height: 20px;
background: #E5E4E4;
}
QProgressBar::chunk {
background: qlineargradient(x1:0, y1:0, x2:1, y2:0, stop:0 #9AFF99, stop:1 #9A9AFE);
border-radius: 6px;
}
"""
# 预处理
PREPROCESS_PROCESSING_DATA: str = "正在处理数据"
PREPROCESS_PROCESS_FINISHED: str = "处理完成"
PREPROCESS_PROCESS_FAILURE: str = "处理失败"
PREPROCESS_FAILURE_REASON = {
"Data_Path_Not_Exist": "(路径不存在)",
"Read_Data_Exception": "(读取数据异常)",
"Raw_Data_Not_Exist": "(原始数据不存在)",
"Filter_Exception": "(滤波器异常)",
"Processed_Data_Not_Exist": "(处理后数据不存在)",
"Save_Exception": "(保存异常)"
}
PREPROCESS_PLOT_LABEL_ORIGINAL_DATA: str = "Original Data"
PREPROCESS_PLOT_LABEL_PROCESSED_DATA: str = "Processed Data"
PREPROCESS_OUTPUT_INPUT_AMP_OFFSET: int = 1850
# BCG的J峰算法定位
DETECT_JPEAK_PROCESSING_DATA: str = "正在处理数据"
DETECT_JPEAK_PROCESS_FINISHED: str = "处理完成"
DETECT_JPEAK_PROCESS_FAILURE: str = "处理失败"
DETECT_JPEAK_PREDICTING_PEAK: str = "正在预测峰值"
DETECT_JPEAK_PREDICT_FINISHED: str = "预测完成"
DETECT_JPEAK_PREDICT_FAILURE: str = "预测失败"
DETECT_JPEAK_FAILURE_REASON = {
"Data_Path_Not_Exist": "(数据路径不存在)",
"Read_Data_Exception": "(读取数据异常)",
"Model_Path_Not_Exist": "(模型路径不存在)",
"Model_File_Not_Exist": "(模型文件不存在)",
"Read_Model_Exception": "(读取模型异常)",
"Predict_Exception": "(模型预测异常)",
"Raw_Data_Not_Exist": "(原始数据不存在)",
"Filter_Exception": "(滤波器异常)",
"Processed_Data_Not_Exist": "(处理后数据不存在)",
"Peak_Not_Exist": "(预测的峰值不存在)",
"Save_Exception": "(保存异常)"
}
DETECT_J_PEAK_DATA_LENGTH_POINTS: str = "数据长度(点数):"
DETECT_J_PEAK_DURATION_MIN: str = "数据时长(分钟):"
DETECT_J_PEAK_JPEAK_AMOUNT: str = "J峰个数"
DETECT_J_PEAK_PLOT_LABEL_BCG: str = "BCG_Processed"
DETECT_J_PEAK_PLOT_LABEL_J_PEAKS: str = "J_Peaks"
DETECT_J_PEAK_PLOT_LABEL_INTERVAL: str = "Interval"
# TODO弃用
# 通用
FOLDER_DIR_NOT_EXIST_THEN_CREATE: str = "检测到保存路径所指向的文件夹不存在,已创建相应文件夹"
# 菜单界面
MAINWINDOW_ROOT_PATH_NOT_EXIST: str = "根目录路径输入错误"
MAINWINDOW_MSGBOX_TITLE: str = "消息"
MAINWINDOW_DIALOG_TITLE: str = "确认数据的采样率"
MAINWINDOW_BACK_TO_MENU: str = "返回主菜单"
MAINWINDOW_QUESTION_BACK_TO_MENU: str = "确定要返回主菜单吗"
# 数据粗同步
APPROXIMATELY_ALIGN_FILES_NOT_FOUND: str = f"无法找到{ConfigParams.APPROXIMATELY_ALIGN_INPUT_ORGBCG_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.APPROXIMATELY_ALIGN_INPUT_PSG_FILENAME}{ConfigParams.ENDSWITH_EDF},无法执行<数据粗同步>"
APPROXIMATELY_ALIGN_FILES_FOUND: str = f"找到{ConfigParams.APPROXIMATELY_ALIGN_INPUT_ORGBCG_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.APPROXIMATELY_ALIGN_INPUT_PSG_FILENAME}{ConfigParams.ENDSWITH_EDF}"
APPROXIMATELY_ALIGN_RUNNING: str = "开始执行任务<数据粗同步>"
APPROXIMATELY_RECORD_NOT_FOUND: str = "没有保存记录"
# ECG的R峰算法定位
DETECT_R_PEAK_FILES_NOT_FOUND: str = f"无法找到{ConfigParams.DETECT_R_PEAK_INPUT_ECG_FILENAME}{ConfigParams.ENDSWITH_TXT},无法执行<R峰提取>"
DETECT_R_PEAK_FILES_FOUND: str = f"找到{ConfigParams.DETECT_R_PEAK_INPUT_ECG_FILENAME}{ConfigParams.ENDSWITH_TXT}"
DETECT_R_PEAK_RUNNING: str = "开始执行任务<ECG的R峰算法定位>"
DETECT_R_PEAK_PLOT_LABEL_RRIV: str = "RRIV"
DETECT_R_PEAK_PLOT_LABEL_ECG: str = "ECG"
DETECT_R_PEAK_PLOT_LABEL_R_PEAKS: str = "R_peaks"
DETECT_R_PEAK_PLOT_LABEL_INTERVAL: str = "Interval"
DETECT_R_PEAK_DATA_LENGTH_POINTS: str = "数据长度(点数):"
DETECT_R_PEAK_DURATION_MIN: str = "数据时长(分钟):"
DETECT_R_PEAK_RPEAK_AMOUNT: str = "R峰个数"
# 人工纠正
LABEL_CHECK_FILES_BCG_NOT_FOUND: str = f"无法找到{ConfigParams.LABEL_CHECK_INPUT_BCG_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.LABEL_CHECK_INPUT_JPEAK_FILENAME}{ConfigParams.ENDSWITH_TXT},无法执行<BCG的J峰人工纠正>"
LABEL_CHECK_FILES_BCG_FOUND: str = f"找到{ConfigParams.LABEL_CHECK_INPUT_BCG_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.LABEL_CHECK_INPUT_JPEAK_FILENAME}{ConfigParams.ENDSWITH_TXT}"
LABEL_CHECK_FILES_ECG_NOT_FOUND: str = f"无法找到{ConfigParams.LABEL_CHECK_INPUT_ECG_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.LABEL_CHECK_INPUT_RPEAK_FILENAME}{ConfigParams.ENDSWITH_TXT},无法执行<ECG的R峰人工纠正>"
LABEL_CHECK_FILES_ECG_FOUND: str = f"找到{ConfigParams.LABEL_CHECK_INPUT_ECG_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.LABEL_CHECK_INPUT_RPEAK_FILENAME}{ConfigParams.ENDSWITH_TXT}"
LABEL_CHECK_HISTORICAL_SAVE_FOUND: str = f"找到历史存档文件{ConfigParams.LABEL_CHECK_SAVE_JPEAK_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.LABEL_CHECK_SAVE_RPEAK_FILENAME}{ConfigParams.ENDSWITH_TXT},已成功读取"
LABEL_CHECK_RUNNING: str = "开始执行任务<人工纠正>"
LABEL_CHECK_BCG_MODE: str = "BCG_MODE"
LABEL_CHECK_ECG_MODE: str = "ECG_MODE"
LABEL_CHECK_PLOT_LABEL_DATA1: str = "Data 1"
LABEL_CHECK_PLOT_LABEL_DATA2: str = "Data 2"
LABEL_CHECK_PLOT_LABEL_LABEL1: str = "Label 1"
LABEL_CHECK_PLOT_LABEL_LABEL2: str = "Label 2"
LABEL_CHECK_PLOT_LABEL_VLINE: str = "vline"
LABEL_CHECK_PLOT_LABEL_HLINE: str = "hline"
LABEL_CHECK_AUTOPLAY_LEFT: str = "LEFT"
LABEL_CHECK_AUTOPLAY_PAUSE: str = "PAUSE"
LABEL_CHECK_AUTOPLAY_RIGHT: str = "RIGHT"
LABEL_CHECK_AUTOPLAY_LEFT_INFO: str = "开始自动播放-向左"
LABEL_CHECK_AUTOPLAY_PAUSE_INFO: str = "暂停自动播放"
LABEL_CHECK_AUTOPLAY_RIGHT_INFO: str = "开始自动播放-向右"
LABEL_CHECK_AUTOPLAY_PRESET1_INFO: str = "切换到自动播放-预设1"
LABEL_CHECK_AUTOPLAY_PRESET2_INFO: str = "切换到自动播放-预设2"
LABEL_CHECK_AUTOPLAY_PRESET3_INFO: str = "切换到自动播放-预设3"
LABEL_CHECK_AUTOPLAY_PRESET_CUSTOM_INFO: str = "切换到自动播放-自定义"
LABEL_CHECK_AUTOPLAY_PRESET_CUSTOM_WARNING: str = "自定义的输入参数未做任何检查,请斟酌输入参数,否则可能会导致程序异常"
LABEL_CHECK_JUMP_X_INDEX: str = "跳转到x坐标: "
LABEL_CHECK_RECOVER_SCALE: str = "尺度恢复"
LABEL_CHECK_BUTTON_PRESS_EVENT: str = "button_press_event"
LABEL_CHECK_BUTTON_RELEASE_EVENT: str = "button_release_event"
LABEL_CHECK_MOTION_NOTIFY_EVENT: str = "motion_notify_event"
LABEL_CHECK_ADD_POINTS_SUCCESSFULLY: str = "成功新增点,横坐标:"
LABEL_CHECK_REMOVE_POINTS_SUCCESSFULLY: str = "成功删除点,横坐标:"
LABEL_CHECK_NO_POINT_IN_THE_INTERVAL: str = "所选区间内无新增或删除点"
LABEL_CHECK_CUSTOM_NAVIGATIONTOOLBAR_WIDGET_NAME: str = "MainWindow"
LABEL_CHECK_ACTION_LABEL_MULTIPLE_NAME: str = f"批量更改标签({ConfigParams.LABEL_CHECK_ACTION_LABEL_MULTIPLE_SHORTCUT_KEY})"
# 体动打标
ARTIFACT_LABEL_FILES_NOT_FOUND: str = f"无法找到{ConfigParams.ARTIFACT_LABEL_INPUT_BCG_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.ARTIFACT_LABEL_INPUT_XINXIAO_FILENAME}{ConfigParams.ENDSWITH_TXT},无法执行<体动标注>"
ARTIFACT_LABEL_FILES_FOUND: str = f"找到{ConfigParams.ARTIFACT_LABEL_INPUT_BCG_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.ARTIFACT_LABEL_INPUT_XINXIAO_FILENAME}{ConfigParams.ENDSWITH_TXT}"
ARTIFACT_LABEL_HISTORICAL_SAVE_FOUND: str = "找到历史存档文件,已成功读取"
ARTIFACT_LABEL_RUNNING: str = "开始执行任务<体动标注>"
ARTIFACT_LABEL_INPUT_FAILURE_LENGTH: str = "导入失败,两个输入信号的长度不相等"
ARTIFACT_LABEL_INPUT_ARTIFACT_FAILURE_FORMAT: str = "导入体动失败,请检查体动标签格式"
ARTIFACT_LABEL_INPUT_ARTIFACT_FAILURE_LENGTH: str = "导入体动失败请检查体动长度是否为4的倍数"
ARTIFACT_LABEL_DELETE_ARTIFACT_SUCCESSFULLY: str = "体动被删除"
ARTIFACT_LABEL_DELETE_ARTIFACT_FAILURE: str = "需要被删除的体动不存在"
ARTIFACT_LABEL_JUMP_ARTIFACT: str = "跳转到体动"
ARTIFACT_LABEL_MISS_ARGS: str = "打标参数未填写"
ARTIFACT_LABEL_OVERLAPPING: str = "当前所打标的片段存在重合,重合片段序号:"
ARTIFACT_LABEL_COLUMN_ORGBCG_SYNC: str = "orgBcg_sync"
ARTIFACT_LABEL_COLUMN_BCG_SYNC: str = "BCG_sync"
ARTIFACT_LABEL_AUTOPLAY_LEFT: str = "LEFT"
ARTIFACT_LABEL_AUTOPLAY_PAUSE: str = "PAUSE"
ARTIFACT_LABEL_AUTOPLAY_RIGHT: str = "RIGHT"
ARTIFACT_LABEL_AUTOPLAY_LEFT_INFO: str = "开始自动播放-向左"
ARTIFACT_LABEL_AUTOPLAY_PAUSE_INFO: str = "暂停自动播放"
ARTIFACT_LABEL_AUTOPLAY_RIGHT_INFO: str = "开始自动播放-向右"
ARTIFACT_LABEL_RECOVER_SCALE: str = "尺度恢复"
ARTIFACT_LABEL_BUTTON_PRESS_EVENT: str = "button_press_event"
ARTIFACT_LABEL_BUTTON_RELEASE_EVENT: str = "button_release_event"
ARTIFACT_LABEL_MOTION_NOTIFY_EVENT: str = "motion_notify_event"
ARTIFACT_LABEL_AUTOPLAY_PRESET1_INFO: str = "切换到自动播放-预设1"
ARTIFACT_LABEL_AUTOPLAY_PRESET2_INFO: str = "切换到自动播放-预设2"
ARTIFACT_LABEL_AUTOPLAY_PRESET3_INFO: str = "切换到自动播放-预设3"
ARTIFACT_LABEL_AUTOPLAY_PRESET_CUSTOM_INFO: str = "切换到自动播放-自定义"
ARTIFACT_LABEL_CUSTOM_NAVIGATIONTOOLBAR_WIDGET_NAME: str = "MainWindow"
ARTIFACT_LABEL_ACTION_LABEL_ARTIFACT_NAME: str = f"标注体动({ConfigParams.ARTIFACT_LABEL_ACTION_LABEL_ARTIFACT_SHORTCUT_KEY})"
ARTIFACT_LABEL_LABELBTN_STYLE_1: str = """
QPushButton {
background-color: #ffa500; /* 设置背景颜色 */
padding: 10px; /* 设置内边距 */
border: 2px solid darkblue; /* 设置边框 */
border-radius: 10px; /* 设置圆角 */
}
QPushButton:hover {
background-color: #00ff00; /* 鼠标悬停时的背景颜色 */
}"""
ARTIFACT_LABEL_LABELBTN_STYLE_2: str = """
QPushButton {
background-color: #ffff00; /* 设置背景颜色 */
padding: 10px; /* 设置内边距 */
border: 2px solid darkblue; /* 设置边框 */
border-radius: 10px; /* 设置圆角 */
}
QPushButton:hover {
background-color: #00ff00; /* 鼠标悬停时的背景颜色 */
}"""
ARTIFACT_LABEL_LABELBTN_STYLE_3: str = """
QPushButton {
background-color: #00ffff; /* 设置背景颜色 */
padding: 10px; /* 设置内边距 */
border: 2px solid darkblue; /* 设置边框 */
border-radius: 10px; /* 设置圆角 */
}
QPushButton:hover {
background-color: #00ff00; /* 鼠标悬停时的背景颜色 */
}"""
ARTIFACT_LABEL_LABELBTN_STYLE_4: str = """
QPushButton {
background-color: #ee82ee; /* 设置背景颜色 */
padding: 10px; /* 设置内边距 */
border: 2px solid darkblue; /* 设置边框 */
border-radius: 10px; /* 设置圆角 */
}
QPushButton:hover {
background-color: #00ff00; /* 鼠标悬停时的背景颜色 */
}"""
ARTIFACT_LABEL_LABELBTN_STYLE_5: str = """
QPushButton {
background-color: #808080; /* 设置背景颜色 */
padding: 10px; /* 设置内边距 */
border: 2px solid darkblue; /* 设置边框 */
border-radius: 10px; /* 设置圆角 */
}
QPushButton:hover {
background-color: #00ff00; /* 鼠标悬停时的背景颜色 */
}"""
# 质量打标
BCG_QUALITY_LABEL_FILES_NOT_FOUND: str = f"无法找到{ConfigParams.BCG_QUALITY_LABEL_INPUT_BCG_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.BCG_QUALITY_LABEL_INPUT_ARTIFACT_FILENAME}{ConfigParams.ENDSWITH_TXT},无法执行<BCG的质量标注>"
BCG_QUALITY_LABEL_FILES_FOUND: str = f"找到{ConfigParams.BCG_QUALITY_LABEL_INPUT_BCG_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.BCG_QUALITY_LABEL_INPUT_ARTIFACT_FILENAME}{ConfigParams.ENDSWITH_TXT}"
BCG_QUALITY_LABEL_HISTORICAL_SAVE_FOUND: str = f"找到历史存档文件{ConfigParams.BCG_QUALITY_LABEL_SAVE_FILENAME}{ConfigParams.BCG_QUALITY_LABEL_SAVE_MODE_10S}{ConfigParams.ENDSWITH_CSV}{ConfigParams.BCG_QUALITY_LABEL_SAVE_FILENAME}{ConfigParams.BCG_QUALITY_LABEL_SAVE_MODE_30S}{ConfigParams.ENDSWITH_CSV},已成功读取"
BCG_QUALITY_LABEL_MODE_UNSELECTED: str = "显示模式未选择"
BCG_QUALITY_LABEL_INPUT_SIGNAL_FAILURE: str = "导入信号失败,请检查信号长度"
BCG_QUALITY_LABEL_INPUT_ARTIFACT_FAILURE_FORMAT: str = "导入体动失败,请检查体动标签格式"
BCG_QUALITY_LABEL_INPUT_ARTIFACT_FAILURE_LENGTH: str = "导入体动失败请检查体动长度是否为4的倍数"
BCG_QUALITY_LABEL_RUNNING: str = "开始执行任务<BCG的质量评估标注>"
BCG_QUALITY_LABEL_10S_MODE: str = f"{ConfigParams.BCG_QUALITY_LABEL_SAVE_MODE_10S}_MODE"
BCG_QUALITY_LABEL_30S_MODE: str = f"{ConfigParams.BCG_QUALITY_LABEL_SAVE_MODE_30S}_MODE"
BCG_QUALITY_LABEL_COLUMN_LABEL: str = "label"
BCG_QUALITY_LABEL_COLUMN_REMARK: str = "remark"
BCG_QUALITY_LABEL_VIEWING_THE_FIRST_PART: str = "你正在查看第1段信号"
BCG_QUALITY_LABEL_VIEWING_THE_LAST_PART: str = "你正在查看最后1段信号"
BCG_QUALITY_LABEL_VIEWING_THE_FIRST_PART_UNLABELED: str = "前面的片段都被打标将跳转至第1段信号"
BCG_QUALITY_LABEL_VIEWING_THE_LAST_PART_UNLABELED: str = "后面的片段都被打标将跳转至最后1段信号"
BCG_QUALITY_LABEL_LABELED_FINISHED: str = "该份数据打标已全部完成"
BCG_QUALITY_LABEL_VIEWING_PART: str = "正在查看信号段"
BCG_QUALITY_LABEL_JUMP_PART: str = "跳转到片段"
BCG_QUALITY_LABEL_CLICKED_CHECKBOX_HIGHLIGHT_LONGEST_CONTINUOUS: str = "点击了<高亮最长连续>"
BCG_QUALITY_LABEL_CLICKED_CHECKBOX_DISPLAY_AFTERFILTER: str = "点击了<去除工频噪声>"
BCG_QUALITY_LABEL_CLICKED_CHECKBOX_EXAMINE_TOBOLABELED: str = "点击了<仅查未标片段>"
BCG_QUALITY_LABEL_LABEL_ALL_TO_TYPE_C_QUESTION_CONTENT: str = "你确定要将所有片段标记为类型C"
BCG_QUALITY_LABEL_LABEL_ALL_TO_TYPE_C: str = "已将所有片段标记为类型C"
BCG_QUALITY_LABEL_LABEL_ARTIFACT_TO_TYPE_C_QUESTION_CONTENT: str = "你确定要将所有带有体动的片段标记为类型C"
BCG_QUALITY_LABEL_LABEL_ARTIFACT_TO_TYPE_C: str = "已将所有带有体动的片段标记为类型C"
BCG_QUALITY_LABEL_PLOT_LABEL_SIGNAL: str = "BCG"
BCG_QUALITY_LABEL_PLOT_LABEL_ARTIFACT: str = "Artifact"
BCG_QUALITY_LABEL_PLOT_LABEL_LONGEST_CONTINUOUS: str = "Longest_Continuous"
BCG_QUALITY_LABEL_10S_A: str = "a"
BCG_QUALITY_LABEL_10S_B: str = "b"
BCG_QUALITY_LABEL_10S_C: str = "c"
BCG_QUALITY_LABEL_10S_A_LIST: str = "label_a"
BCG_QUALITY_LABEL_10S_B_LIST: str = "label_b"
BCG_QUALITY_LABEL_10S_C_LIST: str = "label_c"
BCG_QUALITY_LABEL_30S_A1: str = "a"
BCG_QUALITY_LABEL_30S_A2: str = "b"
BCG_QUALITY_LABEL_30S_B1: str = "c"
BCG_QUALITY_LABEL_30S_B2: str = "d"
BCG_QUALITY_LABEL_30S_C: str = "e"
BCG_QUALITY_LABEL_30S_A1_LIST: str = "label_a1"
BCG_QUALITY_LABEL_30S_A2_LIST: str = "label_a2"
BCG_QUALITY_LABEL_30S_B1_LIST: str = "label_b1"
BCG_QUALITY_LABEL_30S_B2_LIST: str = "label_b2"
BCG_QUALITY_LABEL_30S_C_LIST: str = "label_c"
BCG_QUALITY_LABEL_tobeLabeled: str = "f"
BCG_QUALITY_LABEL_tobeLabeled_LIST: str = "label_tobeLabeled"
BCG_QUALITY_LABEL_LABELBTN_STYLE: str = """
QPushButton {
background-color: orange; /* 设置背景颜色 */
padding: 10px; /* 设置内边距 */
border: 2px solid darkblue; /* 设置边框 */
border-radius: 10px; /* 设置圆角 */
}
QPushButton:hover {
background-color: yellow; /* 鼠标悬停时的背景颜色 */
}"""
# 呼吸可用性及间期标注
RESP_QUALITY_LABEL_FILES_NOT_FOUND: str = f"无法找到{ConfigParams.RESP_QUALITY_LABEL_INPUT_XINXIAO_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.RESP_QUALITY_LABEL_INPUT_THO_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.RESP_QUALITY_LABEL_INPUT_ARTIFACT_FILENAME}{ConfigParams.ENDSWITH_TXT},无法执行<呼吸可用性及间期标注>"
RESP_QUALITY_LABEL_FILES_FOUND: str = f"找到{ConfigParams.RESP_QUALITY_LABEL_INPUT_XINXIAO_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.RESP_QUALITY_LABEL_INPUT_THO_FILENAME}{ConfigParams.ENDSWITH_TXT}{ConfigParams.RESP_QUALITY_LABEL_INPUT_ARTIFACT_FILENAME}{ConfigParams.ENDSWITH_TXT}"
RESP_QUALITY_LABEL_HISTORICAL_SAVE1_FOUND: str = f"找到历史存档文件{ConfigParams.RESP_QUALITY_LABEL_SAVE_RESP_QUALITY_LABNEL_FILENAME}{ConfigParams.ENDSWITH_TXT},已成功读取"
RESP_QUALITY_LABEL_HISTORICAL_SAVE2_FOUND: str = f"找到历史存档文件{ConfigParams.RESP_QUALITY_LABEL_SAVE_THO_PEAK_FILENAME}{ConfigParams.ENDSWITH_TXT},已成功读取"
RESP_QUALITY_LABEL_INPUT_SIGNAL_FAILURE: str = "导入信号失败,请检查信号长度"
RESP_QUALITY_LABEL_INPUT_SUCCESSFULLY: str = "导入数据成功"
RESP_QUALITY_LABEL_PREPROCESS_SUCCESSFULLY: str = "导入数据成功"
RESP_QUALITY_LABEL_INPUT_ARTIFACT_FAILURE_FORMAT: str = "导入体动失败,请检查体动标签格式"
RESP_QUALITY_LABEL_INPUT_ARTIFACT_FAILURE_LENGTH: str = "导入体动失败请检查体动长度是否为4的倍数"
RESP_QUALITY_LABEL_RUNNING: str = "开始执行任务<呼吸可用性及间期标注>"
RESP_QUALITY_LABEL_PLOT_LABEL_ORGBCG: str = "BDR_sync by filter orgBcg_sync"
RESP_QUALITY_LABEL_PLOT_LABEL_THO: str = "THO_sync after preprocess"
RESP_QUALITY_LABEL_PLOT_LABEL_THO_PEAKS: str = "THO_peak"
RESP_QUALITY_LABEL_PLOT_LABEL_ARTIFACT: str = "Artifact"
RESP_QUALITY_LABEL_VIEWING_THE_FIRST_PART: str = "你正在查看第1段信号"
RESP_QUALITY_LABEL_VIEWING_THE_LAST_PART: str = "你正在查看最后1段信号"
RESP_QUALITY_LABEL_ACTION_LABEL_MULTIPLE_NAME: str = f"批量更改标签({ConfigParams.RESP_QUALITY_LABEL_ACTION_LABEL_MULTIPLE_SHORTCUT_KEY})"
RESP_QUALITY_LABEL_CUSTOM_NAVIGATIONTOOLBAR_WIDGET_NAME: str = "MainWindow"
RESP_QUALITY_LABEL_BUTTON_PRESS_EVENT: str = "button_press_event"
RESP_QUALITY_LABEL_BUTTON_RELEASE_EVENT: str = "button_release_event"
RESP_QUALITY_LABEL_MOTION_NOTIFY_EVENT: str = "motion_notify_event"
RESP_QUALITY_LABEL_ADD_POINTS_SUCCESSFULLY: str = "成功新增点,横坐标:"
RESP_QUALITY_LABEL_REMOVE_POINTS_SUCCESSFULLY: str = "成功删除点,横坐标:"
RESP_QUALITY_LABEL_NO_POINT_IN_THE_INTERVAL: str = "所选区间内无新增或删除点"
RESP_QUALITY_LABEL_SAVE_PEAKS_SUCCESSFULLY: str = "保存峰值成功"
RESP_QUALITY_LABEL_DATA_NOT_FOUND: str = "数据未导入"
RESP_QUALITY_LABEL_LABEL_SUCCESSFULLY: str = "片段标注并保存成功"
RESP_QUALITY_LABEL_RESET_SUCCESSFULLY: str = "片段重置并保存成功"
RESP_QUALITY_LABEL_PLOT_LABEL_VLINE: str = "vline"
RESP_QUALITY_LABEL_PLOT_LABEL_HLINE: str = "hline"
RESP_QUALITY_LABEL_A_QUALITY: int = 1
RESP_QUALITY_LABEL_B_QUALITY: int = 0
RESP_QUALITY_LABEL_C_QUALITY: int = -1
RESP_QUALITY_LABEL_LABELED: str = "已标注"
RESP_QUALITY_LABEL_TOBELABELED: str = "未标注"
RESP_QUALITY_LABEL_SPECTRUM_BDR_TITLE: str = "Spectrum of BDR_sync by filter orgBcg_sync"
RESP_QUALITY_LABEL_SPECTRUM_THO_TITLE: str = "Spectrum of THO_sync after preprocess"
RESP_QUALITY_LABEL_SPECTRUM_ORGBCG_LABEL: str = "orgBcg"
RESP_QUALITY_LABEL_SPECTRUM_BDR_LABEL: str = "BDR"
RESP_QUALITY_LABEL_SPECTRUM_THO_LABEL: str = "THO"
RESP_QUALITY_LABEL_CUSTOM_FILTER_ARGS_ERROR: str = "orgBcg带通滤波频率设置范围应为数字范围是0~1"
RESP_QUALITY_LABEL_AUTOLABEL_ARGS_ERROR: str = "人工标注阈值设置范围应为数字范围是0~1"
RESP_QUALITY_LABEL_CHECK_ARGS_QUESTION_CONTENT: str = "你确定要执行此操作吗,请确保参数输入正确"
RESP_QUALITY_LABEL_KEY_VALUE = {
1: "Good",
0: "Bad",
-1: "None"
}
# 睡眠呼吸暂停事件打标
SA_LABEL_CHANNEL_NAME_FLOWT: str = "Flow T"
SA_LABEL_CHANNEL_NAME_FLOWP: str = "Flow P"
SA_LABEL_CHANNEL_NAME_EFFORTTHO: str = "Effort Tho"
SA_LABEL_CHANNEL_NAME_EFFORTABD: str = "Effort Abd"
SA_LABEL_CHANNEL_NAME_SPO2: str = "SpO2"
# 禁止实例化
def __new__(cls):
raise TypeError("Constants class cannot be instantiated")
# 禁止修改常量
@classmethod
def __setattr__(cls, key, value):
raise AttributeError("Cannot modify constants")

View File

@ -0,0 +1,61 @@
from logging import error
class TipsTypeValueNotExistError(Exception):
"""日志类型参数不存在异常"""
def __init__(self):
super().__init__()
error("日志类型参数不存在异常")
class MsgBoxTypeValueNotExistError(Exception):
"""MsgBox弹窗类型参数不存在异常"""
def __init__(self):
super().__init__()
error("MsgBox弹窗类型参数不存在异常")
class PreprocessModeNotExistError(Exception):
"""预处理类型不存在异常"""
def __init__(self):
super().__init__()
error("预处理类型不存在异常")
class RPeakDetectMethodNotExistError(Exception):
"""R峰算法定位检测方法不存在异常"""
def __init__(self):
super().__init__()
error("R峰提取检测方法不存在异常")
class JPeakDetectMethodsError(Exception):
"""J峰算法定位检测方法选择异常"""
def __init__(self):
super().__init__()
error("J峰算法定位检测方法选择异常")
class LabelCheckModeNotExistError(Exception):
"""人工纠正类型不存在异常"""
def __init__(self):
super().__init__()
error("人工纠正类型不存在异常")
class ArtifactLabelUnknownError(Exception):
"""体动标注未知异常"""
def __init__(self):
super().__init__()
error("体动打标未知异常")
class BCGQualityLabelTableWidgetNotExistError(Exception):
"""BCG质量标注表格元素不存在异常"""
def __init__(self):
super().__init__()
error("BCG质量标注表格元素不存在异常")
class RespQualityLabelOutOfIndexError(Exception):
"""呼吸可用性及间期标注数组越界异常"""
def __init__(self):
super().__init__()
error("呼吸可用性及间期标注数组越界异常")
class RespQualityLabelTableWidgetNotExistError(Exception):
"""呼吸可用性及间期标注表格元素不存在异常"""
def __init__(self):
super().__init__()
error("呼吸可用性及间期标注表格元素不存在异常")

96
func/utils/PublicFunc.py Normal file
View File

@ -0,0 +1,96 @@
from datetime import datetime
from logging import error, info
from PySide6.QtWidgets import QMessageBox
from func.utils.Constants import Constants
from func.utils.CustomException import TipsTypeValueNotExistError, MsgBoxTypeValueNotExistError
class PublicFunc:
@staticmethod
def get_current_localtime() -> str:
"""
获取当前本地时间
Parameters:
Returns:
str格式化为"%H:%M:%S"的当前本地时间
Raises:
"""
return str(datetime.now().strftime("%H:%M:%S"))
@staticmethod
def format_status_msg(msg) -> str:
"""
格式化状态栏信息
Parameters:
msg - str需要被格式化的字符串
Returns:
str格式化为"%H:%M:%S"的当前本地时间 + 信息
Raises:
"""
return str(datetime.now().strftime("%H:%M:%S")) + " " + msg
@staticmethod
def text_output(main_window: object, content: str, tips_type: str) -> None:
"""
更新textBrowser中的内容同时输出日志
Parameters:
main_window - objectQt的含有textBrowser属性的对象这里一般指的是mainWindow里面有唯一的textBrowser属性
content - str需要输出的内容
tips_type - 日志输出的类型
Returns:
Raises:
TipsTypeValueNotExistError
"""
if tips_type is Constants.TIPS_TYPE_INFO:
info(f"{tips_type}: {content}")
main_window.textBrowser_info.append(
f"<font color='black'>{PublicFunc.get_current_localtime()} {tips_type}: {content}</font>")
elif tips_type is Constants.TIPS_TYPE_ERROR:
error(f"{tips_type}: {content}")
main_window.textBrowser_info.append(
f"<font color='red'>{PublicFunc.get_current_localtime()} {tips_type}: {content}</font>")
else:
raise TipsTypeValueNotExistError()
main_window.textBrowser_info.verticalScrollBar().setValue(
main_window.textBrowser_info.verticalScrollBar().maximum())
@staticmethod
def msgbox_output(main_window: object, content: str, msg_box_type: str) -> None:
"""
更新messageBox中的内容并弹出
Parameters:
main_window - objectQt的含有messageBox属性的对象这里一般指的是mainWindow里面有唯一的messageBox属性
content - str需要输出的内容
msg_box_type - strmessageBox弹窗的类型
Returns:
Raises:
MsgBoxTypeValueNotExistError
"""
main_window.msgBox.setText(f"{msg_box_type}: {content}")
if msg_box_type is Constants.MSGBOX_TYPE_INFO:
main_window.msgBox.setIcon(QMessageBox.Information)
elif msg_box_type is Constants.MSGBOX_TYPE_WARNING:
main_window.msgBox.setIcon(QMessageBox.Warning)
elif msg_box_type is Constants.MSGBOX_TYPE_ERROR:
main_window.msgBox.setIcon(QMessageBox.Critical)
elif msg_box_type is Constants.MSGBOX_TYPE_QUESTION:
main_window.msgBox.setIcon(QMessageBox.Question)
else:
raise MsgBoxTypeValueNotExistError()
main_window.msgBox.exec()

295
func/utils/detect_Jpeak.py Normal file
View File

@ -0,0 +1,295 @@
from numpy import diff, argwhere, argmax, where, delete, insert, mean, array, full, nan
from numpy import min as np_min
from numpy import max as np_max
from torch import FloatTensor, no_grad, load
from torch import device as torch_device
from torch.cuda import is_available, empty_cache
from torch.nn.functional import sigmoid
from func.BCGDataset import BCG_Operation
from func.Deep_Model import Unet,Fivelayer_Lstm_Unet,Fivelayer_Unet,Sixlayer_Unet
def evaluate(test_data, model,fs,useCPU):
orgBCG = test_data
operation = BCG_Operation()
# 降采样
orgBCG = operation.down_sample(orgBCG, down_radio=int(fs//100)).copy() #一开始没加.copy()会报错,后来加了就没事了,结果没影响
# plt.figure()
# plt.plot(orgBCG)
# plt.show()
orgBCG = orgBCG.reshape(-1, 1000)
# test dataset
orgData = FloatTensor(orgBCG).unsqueeze(1)
# predict
if useCPU == True:
gpu = False
device = torch_device("cpu")
else:
gpu = is_available()
device = torch_device("cuda" if is_available() else "cpu")
# if gpu:
# orgData = orgData.cuda()
# model.cuda()
orgData = orgData.to(device)
model = model.to(device)
with no_grad():
y_hat = model(orgData)
y_prob = sigmoid(y_hat)
beat = (y_prob>0.5).float().view(-1).cpu().data.numpy()
beat_diff = diff(beat)
up_index = argwhere(beat_diff==1)
down_index = argwhere(beat_diff==-1)
return beat,up_index,down_index,y_prob
def find_TPeak(data,peaks,th=50):
"""
找出真实的J峰或R峰
:param data: BCG或ECG数据
:param peaks: 初步峰值从label中导出的location_R
:param th: 范围阈值
:return: 真实峰值
"""
return_peak = []
for peak in peaks:
if peak>len(data):continue
min_win,max_win = max(0,int(peak-th)),min(len(data),int(peak+th))
return_peak.append(argmax(data[min_win:max_win])+min_win)
return return_peak
def new_calculate_beat(y,predict,th=0.5,up=10,th1=100,th2=45): #通过预测计算回原来J峰的坐标 输入y_prob,predict=ture,up*10,降采样多少就乘多少
"""
加上不应期算法,消除误判的峰
:param y: 预测输出值或者标签值label
:param predict: ture or false
:param up: 降采样为多少就多少
:return: 预测的J峰位置
"""
if predict:
beat = where(y>th,1,0)
else:
beat = y
beat_diff = diff(beat) #一阶差分
up_index = argwhere(beat_diff == 1).reshape(-1)
down_index = argwhere(beat_diff == -1).reshape(-1)
# print(up_index,down_index)
# print(y)
# print(y[up_index[4]+1:down_index[4]+1])
if len(up_index)==0:
return [0]
if up_index[0] > down_index[0]:
down_index = delete(down_index, 0)
if up_index[-1] > down_index[-1]:
up_index = delete(up_index, -1)
"""
加上若大于130点都没有一个心跳时降低阈值重新判决一次一般降到0.3就可以了;; 但是对于体动片段降低阈值可能又会造成误判,而且出现体动的话会被丢弃,间隔时间也长
"""
# print("初始:",up_index.shape,down_index.shape)
i = 0
lenth1 = len(up_index)
while i < len(up_index)-1:
if abs(up_index[i+1]-up_index[i]) > th1:
re_prob = y[down_index[i]+15:up_index[i+1]-15] #原本按正常应该是两个都+1的但是由于Unet输出低于0.6时把阈值调小后会在附近一两个点也变为1会影响判断
# print(re_prob.shape)
beat1 = where(re_prob > 0.1, 1, 0)
# print(beat1)
if sum(beat1) != 0 and beat1[0] != 1 and beat1[-1] != 1:
insert_up_index,insert_down_index = add_beat(re_prob,th=0.1)
# print(insert_up_index,insert_down_index,i)
if len(insert_up_index) > 1:
l = i+1
for u,d in zip(insert_up_index,insert_down_index):
up_index = insert(up_index,l,u+down_index[i]+1+15) #np.insert(arr, obj, values, axis) arr原始数组可一可多obj插入元素位置values是插入内容axis是按行按列插入。
down_index = insert(down_index,l,d+down_index[i]+1+15)
l = l+1
# print('l=', l)
elif len(insert_up_index) == 1:
# print(i)
up_index = insert(up_index,i+1,down_index[i]+insert_up_index+1+15)
down_index = insert(down_index,i+1,down_index[i]+insert_down_index+1+15)
i = i + len(insert_up_index) + 1
else:
i = i+1
continue
else:
i = i+1
# print("最终:",up_index.shape,down_index.shape)
"""
添加不应期
"""
new_up_index = up_index
new_down_index = down_index
flag = 0
i = 0
lenth = len(up_index)
while i < lenth:
if abs(up_index[i+1]-up_index[i]) < th2:
prob_forward = y[up_index[i]+1:down_index[i]+1]
prob_backward = y[up_index[i+1]+1:down_index[i+1]+1]
forward_score = 0
back_score = 0
forward_count = down_index[i] - up_index[i]
back_count = down_index[i+1] - up_index[i+1]
forward_max = np_max(prob_forward)
back_max = np_max(prob_backward)
forward_min = np_min(prob_forward)
back_min = np_min(prob_backward)
forward_average = mean(prob_forward)
back_average = mean(prob_backward)
if forward_count > back_count:
forward_score = forward_score + 1
else:back_score = back_score + 1
if forward_max > back_max:
forward_score = forward_score + 1
else:back_score = back_score + 1
if forward_min < back_min:
forward_score = forward_score + 1
else:back_score = back_score + 1
if forward_average > back_average:
forward_score = forward_score + 1
else:back_score = back_score + 1
if forward_score >=3:
up_index = delete(up_index, i+1)
down_index = delete(down_index, i+1)
flag = 1
elif back_score >=3:
up_index = delete(up_index, i)
down_index = delete(down_index, i)
flag = 1
elif forward_score == back_score:
if forward_average > back_average:
up_index = delete(up_index, i + 1)
down_index = delete(down_index, i + 1)
flag = 1
else:
up_index = delete(up_index, i)
down_index = delete(down_index, i)
flag = 1
if flag == 1:
i = i
flag = 0
else: i = i+1
else:i = i + 1
if i > len(up_index)-2:
break
# elif abs(up_index[i+1]-up_index[i]) > 120:
# print("全部处理之后",up_index.shape,down_index.shape)
predict_J = (up_index.reshape(-1) + down_index.reshape(-1)) // 2*up
# predict_J = predict_J.astype(int)
return predict_J
def add_beat(y,th=0.2): #通过预测计算回原来J峰的坐标 输入y_prob,predict=ture,up*10,降采样多少就乘多少
"""
:param y: 预测输出值或者标签值label
:param predict: ture or false
:param up: 降采样为多少就多少
:return: 预测的J峰位置
"""
beat1 = where(y>th,1,0)
beat_diff1 = diff(beat1) #一阶差分
add_up_index = argwhere(beat_diff1 == 1).reshape(-1)
add_down_index = argwhere(beat_diff1 == -1).reshape(-1)
# print(beat1)
# print(add_up_index,add_down_index)
if len(add_up_index) > 0:
if add_up_index[0] > add_down_index[0]:
add_down_index = delete(add_down_index, 0)
if add_up_index[-1] > add_down_index[-1]:
add_up_index = delete(add_up_index, -1)
return add_up_index, add_down_index
else:
return 0
def calculate_beat(y,predict,th=0.5,up=10): #通过预测计算回原来J峰的坐标 输入y_prob,predict=ture,up*10,降采样多少就乘多少
"""
:param y: 预测输出值或者标签值label
:param predict: ture or false
:param up: 降采样为多少就多少
:return: 预测的J峰位置
"""
if predict:
beat = where(y>th,1,0)
else:
beat = y
beat_diff = diff(beat) #一阶差分
up_index = argwhere(beat_diff == 1).reshape(-1)
down_index = argwhere(beat_diff == -1).reshape(-1)
if len(up_index)==0:
return [0]
if up_index[0] > down_index[0]:
down_index = delete(down_index, 0)
if up_index[-1] > down_index[-1]:
up_index = delete(up_index, -1)
predict_J = (up_index.reshape(-1) + down_index.reshape(-1)) // 2*up
# predict_J = predict_J.astype(int)
return predict_J
def preprocess(raw_bcg, fs, low_cut, high_cut, amp_value):
bcg_data = raw_bcg[:len(raw_bcg) // (fs * 10) * fs * 10]
preprocessing = BCG_Operation(sample_rate=fs)
bcg = preprocessing.Butterworth(bcg_data, "bandpass", low_cut=low_cut, high_cut=high_cut, order=3) * amp_value
return bcg
def Jpeak_Detection(model_name, model_path, bcg_data, fs, interval_high, interval_low, peaks_value, useCPU):
model_name = get_model_name(str(model_name))
if model_name == "Fivelayer_Unet":
model = Fivelayer_Unet()
elif model_name == "Fivelayer_Lstm_Unet":
model = Fivelayer_Lstm_Unet()
elif model_name == "Sixlayer_Unet":
model = Sixlayer_Unet()
elif model_name == "U_net":
model = Unet()
else:
raise Exception
model.load_state_dict(load(model_path, map_location=torch_device('cpu')))
model.eval()
# J峰预测
beat, up_index, down_index, y_prob = evaluate(bcg_data, model=model, fs=fs, useCPU=useCPU)
y_prob = y_prob.cpu().reshape(-1).data.numpy()
predict_J = new_calculate_beat(y_prob, 1, th=0.6, up=fs // 100, th1=interval_high, th2=interval_low)
predict_J = find_TPeak(bcg_data, predict_J, th=int(peaks_value * fs / 1000))
predict_J = array(predict_J)
Interval = full(len(bcg_data), nan)
for i in range(len(predict_J) - 1):
Interval[predict_J[i]: predict_J[i + 1]] = predict_J[i + 1] - predict_J[i]
empty_cache()
return predict_J, Interval
def get_model_name(input_string):
# 找到最后一个 "_" 的位置
last_underscore_index = input_string.rfind('_')
# 如果没有找到 "_"
if last_underscore_index == -1:
return input_string # 返回整个字符串
# 返回最后一个 "_" 之前的部分
return input_string[:last_underscore_index]

View File

@ -0,0 +1,71 @@
from ecgdetectors import Detectors
from numpy import quantile, delete, array, argmax, full, nan
from func.BCGDataset import BCG_Operation
def refinement( data, peak):
if len(data) == 0 or len(peak) <=2 : return None
firstPeak = peak[0]
lastPeak = peak[-1]
meanPeak = quantile( data[peak[1:-1]], 0.2 )
if data[firstPeak] < meanPeak * 0.6 :
peak = delete(peak, 0)
if data[lastPeak] < meanPeak * 0.6 :
peak = delete(peak, -1)
return array(peak)
def find_TPeak(data,peaks,th=50):
"""
找出真实的J峰或R峰
:param data: BCG或ECG数据
:param peaks: 初步峰值从label中导出的location_R
:param th: 范围阈值
:return: 真实峰值
"""
return_peak = []
for peak in peaks:
if peak>len(data):continue
min_win,max_win = max(0,int(peak-th)),min(len(data),int(peak+th))
return_peak.append(argmax(data[min_win:max_win])+min_win)
return array(return_peak)
def Rpeak_Detection(raw_ecg,fs,low_cut,high_cut,th1,detector_method):
detectors = Detectors(sampling_frequency=fs)
method_dic = {'pt': detectors.pan_tompkins_detector,
'ta': detectors.two_average_detector,
"Engzee": detectors.engzee_detector,
"Wt": detectors.swt_detector,
"Christov": detectors.christov_detector,
"Hamilton": detectors.hamilton_detector
}
detectormethods = method_dic[detector_method]
# raw_ecg = raw_ecg[200*sample_rate:]
preprocessing = BCG_Operation(sample_rate=fs) # 对ECG做了降采样处理
raw_ecg = preprocessing.Butterworth(raw_ecg, "bandpass", low_cut=low_cut, high_cut=high_cut, order=3) * 4
#######################限制幅值处理############################################
# for i in range(len(raw_ecg)):
# if raw_ecg[i] > 300 or raw_ecg[i] < -300:
# raw_ecg[i] = 0
##############################################################################
R_peak = array(detectormethods(raw_ecg)) - 100
# R_peak = np.array(detectors.pan_tompkins_detector(raw_ecg))-100
R_peak = find_TPeak(raw_ecg, R_peak, th=int(th1 * fs / 1000))
R_peak = refinement(raw_ecg, R_peak)
RR_Interval = full(len(R_peak) - 1, nan)
for i in range(len(R_peak) - 1):
RR_Interval[i] = R_peak[i + 1] - R_peak[i]
RRIV = full(len(RR_Interval) - 1, nan)
for i in range(len(RR_Interval) - 1):
RRIV[i] = RR_Interval[i + 1] - RR_Interval[i]
Interval = full(len(raw_ecg), nan)
for i in range(len(R_peak) - 1):
Interval[R_peak[i]: R_peak[i + 1]] = R_peak[i + 1] - R_peak[i]
return R_peak, Interval, RRIV

View File

@ -0,0 +1,41 @@
from matplotlib.mlab import detrend
from numpy import argmax
from func.utils.resp_quality_label_filter import fft1, bpf, spectral_cosine_similarity
def get_slice(biosignal, seg_idx, duration, fs):
"""数据切片"""
start = (seg_idx - 1) * duration * fs
end = min(seg_idx * duration * fs, len(biosignal))
return biosignal[start:end]
def pre_process(biosignal,fs,fc):
"""数据预处理"""
biosignal = detrend(biosignal)
biosignal = bpf(biosignal, fs, 0.1,fc)
return biosignal
def calculate_spectral_cohere(BCG, THO, fs_bcg,fs_tho):
"""计算频谱相似度"""
fft_THO, freq_tho = fft1(THO, fs_tho)
peak_freq = freq_tho[argmax(fft_THO)]
f_low = min(peak_freq-0.2,0.1)
f_high = max(peak_freq+0.2,0.1)
# 计算频谱相似度
Similarity = spectral_cosine_similarity(BCG, THO, fs_bcg, fs_tho,[f_low,f_high])
return Similarity
def evaluate_quality(BCG,THO,fs_bcg,fs_tho,artifact_flag,thresholds):
"""评估信号质量"""
# 计算信号余弦相似度
cohere_value = calculate_spectral_cohere(BCG, THO, fs_bcg, fs_tho)
# 质量判断逻辑
if cohere_value > thresholds[1]:
return 1 if artifact_flag == 0 else -1
elif cohere_value < thresholds[0]:
BCG = pre_process(BCG, fs_bcg, 0.7)
THO = pre_process(THO, fs_tho, 0.7)
cohere_value2 = calculate_spectral_cohere(BCG, THO, fs_bcg, fs_tho)
return 0 if cohere_value2 < thresholds[0] else -1
return -1

View File

@ -0,0 +1,259 @@
from numpy import abs as np_abs, ceil, floor, ndarray, argmax, dot, linalg, where, arange
from numpy.fft import fft, fftfreq
from scipy.signal import detrend, butter, filtfilt
def fft1(x, fs):
"""
x: 输入信号
fs: 采样频率
返回:
magnitude[mask]0-1hz信号频谱
freq[mask]:0-1hz信号对应频率数组
"""
N = len(x)
# 计算FFT
fft_result = fft(x)
# 计算幅度谱取前N//2点避免镜像
magnitude = np_abs(fft_result)[:N // 2] * 2 / N # 归一化
# 生成频率轴
freqs = fftfreq(N, 1 / fs)[:N // 2]
# 提取0-1Hz范围内的数据
mask = freqs <= 1.0 # 筛选条件
return magnitude[mask] / max(magnitude[mask]), freqs[mask]
def lpf(input_signal, Fs, Fc):
"""
巴特沃斯低通滤波器 (零相位滤波)
参数:
input_signal : 输入信号
Fs : float - 采样频率 (Hz)
Fc : float - 截止频率 (Hz)
返回:
output : ndarray - 滤波后信号
"""
# 滤波器参数
order = 3 # 3阶滤波器
nyq = 0.5 * Fs # 奈奎斯特频率
# 检查截止频率是否有效
if Fc <= 0 or Fc >= nyq:
raise ValueError("截止频率必须满足 0 < Fc < {} Hz".format(nyq))
# 去趋势处理
data = detrend(input_signal, type='linear')
# 生成巴特沃斯滤波器系数
normal_cutoff = Fc / nyq
b, a = butter(order, normal_cutoff, btype='low', analog=False)
# 零相位滤波
output = filtfilt(b, a, data)
return output
def bpf(input_signal, fs, f_low, f_high):
"""
巴特沃斯带通滤波器 (零相位滤波)
参数:
signal : 输入信号
fs : float - 采样频率 (Hz)
f_low : float - 下截止频率 (Hz)
f_high:float - 上截止频率Hz)
返回:
output : ndarray - 滤波后信号
"""
# 滤波器参数
order = 2 # 3阶滤波器
nyq = 0.5 * fs # 奈奎斯特频率
# 检查截止频率是否有效
if f_low <= 0 or f_high >= nyq:
raise ValueError("截止频率必须满足 0 < Fc < {} Hz".format(nyq))
if f_low >= f_high:
raise ValueError("f_low must be less than f_high")
# 去趋势处理
data = detrend(input_signal, type='linear')
# 生成巴特沃斯滤波器系数
fc1 = f_low / nyq
fc2 = f_high / nyq
b, a = butter(order, [fc1, fc2], btype='bandpass', analog=False)
# 零相位滤波
output = filtfilt(b, a, data)
return output
def detect_peak_bottoms(frequencies, spectrum, peak_idx, left_bound, right_bound):
"""
检测信号呼吸峰谷
参数:
frequencies: 频率数组
spectrum: 频谱数组
peak_idx: 峰值索引
left_bound: 左边界
right_bound: 右边界
返回:
f_low峰谷索引
f_high峰谷索引
"""
normalized = spectrum / max(spectrum) # 归一化频谱
n = len(frequencies)
peak_idx = int(peak_idx * n)
left_bound = int(left_bound * n)
right_bound = int(right_bound * n)
# 左侧峰底检测
left_min = int(ceil(left_bound))
for i in range(peak_idx, left_bound - 1, -1):
if i == 0:
break # 避免索引超出范围
if normalized[i] < 0.2 * normalized[peak_idx]:
left_min = i
break
if normalized[i] < normalized[i - 1]:
left_min = i
# 右侧峰底检测
right_min = int(floor(right_bound))
for i in range(peak_idx, right_bound + 1):
if i == n - 1:
break # 避免索引超出范围
if normalized[i] < 0.2 * normalized[peak_idx]:
right_min = i
break
if normalized[i] > normalized[i + 1] and i != right_bound:
right_min = i + 1
# 结果
f_low = max(frequencies[left_min], 0.1)
f_high = min(frequencies[right_min], 1)
return f_low, f_high
def spectral_cosine_similarity(signal1, signal2, fs1, fs2, freq_range):
"""
计算两个信号在指定频段内的频谱余弦相似度
参数:
signal1: 信号1
signal2: 信号2
fs1: 信号1的采样率 (Hz)
fs2: 信号2的采样率 (Hz)
freq_range: 目标频段 [f_low, f_high] (单位Hz)
返回:
similarity: 指定频段内的频谱余弦相似度 (0~1)
"""
# 参数检查
if len(freq_range) != 2 or freq_range[0] >= freq_range[1]:
raise ValueError('freq_range应为升序的二元向量 [f_low, f_high]')
f_low, f_high = freq_range
# 计算FFT并获取频率轴
# 信号1
nfft1 = len(signal1)
fft_1 = fft(signal1)
freq1 = arange(nfft1) * (fs1 / nfft1) # 完整频率轴
mag1 = np_abs(fft_1)
# 信号2
nfft2 = len(signal2)
fft_2 = fft(signal2)
freq2 = arange(nfft2) * (fs2 / nfft2)
mag2 = np_abs(fft_2)
# 提取目标频段的幅度谱
idx1 = where((freq1 >= f_low) & (freq1 <= f_high))[0]
idx2 = where((freq2 >= f_low) & (freq2 <= f_high))[0]
# 对齐长度(取最小公共频段)
min_len = min(len(idx1), len(idx2))
mag1_band = mag1[idx1[:min_len]]
mag2_band = mag2[idx2[:min_len]]
# 计算余弦相似度
dot_product = dot(mag1_band, mag2_band)
norm1 = linalg.norm(mag1_band)
norm2 = linalg.norm(mag2_band)
if norm1 == 0 or norm2 == 0:
similarity = 0.0 # 避免除以零
else:
similarity = dot_product / (norm1 * norm2)
return similarity
def get_bandpass_bcgsignal(
bcg_signal: ndarray,
tho_signal: ndarray,
bcg_sample_rate: int,
tho_sample_rate: int,
use_custom_band: bool = False,
low_cutoff: float = 0.1,
high_cutoff: float = 1.0
):
"""
绘制带通滤波后的BCG信号和原始THO信号对比图
Args:
bcg_signal: 原始BCG信号数组
tho_signal: 原始THO信号数组
bcg_sample_rate: BCG信号采样率(Hz)
tho_sample_rate: THO信号采样率(Hz)
use_custom_band: 是否使用自定义频带 (默认自动检测)
low_cutoff: 自定义低截止频率(Hz)
high_cutoff: 自定义高截止频率(Hz)
Returns:
filtered_bcg :带通滤波后的BCG信号BDR信号
"""
# 参数校验
if len(bcg_signal) == 0 or len(tho_signal) == 0:
raise ValueError("输入信号不能为空数组")
if low_cutoff >= high_cutoff:
raise ValueError("截止频率范围不合法")
# 频谱分析
tho_spectrum, tho_freq = fft1(tho_signal, tho_sample_rate)
bcg_spectrum, bcg_freq = fft1(bcg_signal, bcg_sample_rate)
# 确定通带范围
if use_custom_band:
band_low = low_cutoff
band_high = high_cutoff
else:
peak_freq = tho_freq[argmax(tho_spectrum)]
band_low, band_high = detect_peak_bottoms(
frequencies=bcg_freq,
spectrum=bcg_spectrum,
peak_idx=peak_freq,
left_bound=max(peak_freq - 0.2, 0.1),
right_bound=min(peak_freq + 0.2, 1.0)
)
# 带通滤波
filtered_bcg = bpf(
bcg_signal,
bcg_sample_rate,
band_low,
band_high
)
return filtered_bcg, band_low, band_high, bcg_spectrum, bcg_freq, tho_spectrum, tho_freq