重构数据切割和标签映射模块,新增OrgBCG和ECG通道支持,优化文件路径和异常处理逻辑,调整UI组件名称和布局
This commit is contained in:
@ -1,358 +0,0 @@
|
||||
from ast import literal_eval
|
||||
from gc import collect
|
||||
from math import floor
|
||||
from pathlib import Path
|
||||
from traceback import format_exc
|
||||
|
||||
from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication
|
||||
from numpy import array
|
||||
from overrides import overrides
|
||||
from pandas import read_csv, DataFrame
|
||||
from yaml import dump, load, FullLoader
|
||||
|
||||
from func.utils.ConfigParams import Filename, Params
|
||||
from func.utils.PublicFunc import PublicFunc
|
||||
from func.utils.Constants import Constants
|
||||
from func.utils.Result import Result
|
||||
|
||||
from ui.MainWindow.MainWindow_cut_PSG import Ui_MainWindow_cut_PSG
|
||||
|
||||
|
||||
Config = {
|
||||
|
||||
}
|
||||
|
||||
ButtonState = {
|
||||
"Default": {
|
||||
"pushButton_execute": True
|
||||
},
|
||||
"Current": {
|
||||
"pushButton_execute": True
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class MainWindow_cut_PSG(QMainWindow):
|
||||
|
||||
def __init__(self):
|
||||
super(MainWindow_cut_PSG, self).__init__()
|
||||
self.ui = Ui_MainWindow_cut_PSG()
|
||||
self.ui.setupUi(self)
|
||||
|
||||
self.root_path = None
|
||||
self.sampID = None
|
||||
|
||||
self.__read_config__()
|
||||
|
||||
self.data = None
|
||||
|
||||
self.ui.textBrowser_info.setStyleSheet("QTextBrowser { background-color: rgb(255, 255, 200); }")
|
||||
PublicFunc.__styleAllButton__(self, ButtonState)
|
||||
|
||||
# 初始化进度条
|
||||
self.ui.progressbar.setStyleSheet(Constants.PROGRESSBAR_STYLE)
|
||||
self.progressbar = self.ui.progressbar
|
||||
|
||||
self.msgBox = QMessageBox()
|
||||
self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)
|
||||
|
||||
@overrides
|
||||
def show(self, root_path, sampID):
|
||||
super().show()
|
||||
self.root_path = root_path
|
||||
self.sampID = sampID
|
||||
|
||||
PublicFunc.__resetAllButton__(self, ButtonState)
|
||||
|
||||
Config.update({
|
||||
"Path": {
|
||||
"InputFolder": str(Path(self.root_path) / Filename.PATH_PSG_TEXT / Path(str(self.sampID))),
|
||||
"SaveFolder": str(Path(self.root_path) / Filename.PATH_PSG_ALIGNED / Path(str(self.sampID))),
|
||||
"InputAlignInfo": str(Path(self.root_path) / Filename.PATH_LABEL / Path(str(self.sampID)))
|
||||
}
|
||||
})
|
||||
|
||||
self.ui.plainTextEdit_channel.setPlainText(', '.join(Config["ChannelInput"].keys()))
|
||||
self.ui.plainTextEdit_label.setPlainText(', '.join(Config["LabelInput"].keys()))
|
||||
|
||||
self.ui.pushButton_execute.clicked.connect(self.__slot_btn_execute__)
|
||||
|
||||
@overrides
|
||||
def closeEvent(self, event):
|
||||
reply = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
|
||||
if reply == QMessageBox.Yes:
|
||||
PublicFunc.__disableAllButton__(self, ButtonState)
|
||||
|
||||
PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
|
||||
QApplication.processEvents()
|
||||
|
||||
# 释放资源
|
||||
del self.data
|
||||
self.deleteLater()
|
||||
collect()
|
||||
event.accept()
|
||||
else:
|
||||
event.ignore()
|
||||
|
||||
def __reset__(self):
|
||||
ButtonState["Current"].update(ButtonState["Default"].copy())
|
||||
|
||||
def __read_config__(self):
|
||||
if not Path(Params.CUT_PSG_CONFIG_FILE_PATH).exists():
|
||||
with open(Params.CUT_PSG_CONFIG_FILE_PATH, "w") as f:
|
||||
dump(Params.CUT_PSG_CONFIG_NEW_CONTENT, f)
|
||||
|
||||
with open(Params.CUT_PSG_CONFIG_FILE_PATH, "r") as f:
|
||||
file_config = load(f.read(), Loader=FullLoader)
|
||||
Config.update(file_config)
|
||||
|
||||
# 数据回显
|
||||
self.ui.spinBox_ECGFreq.setValue(Config["ECGFreq"])
|
||||
|
||||
def __slot_btn_execute__(self):
|
||||
PublicFunc.__disableAllButton__(self, ButtonState)
|
||||
|
||||
self.data = Data(self.root_path, self.sampID)
|
||||
Config["ECGFreq"] = self.ui.spinBox_ECGFreq.value()
|
||||
|
||||
# 检查文件是否存在并获取其数据采样率
|
||||
PublicFunc.progressbar_update(self, 1, 5, Constants.CUT_PSG_GETTING_FILE_AND_FREQ, 0)
|
||||
result = self.data.get_file_and_freq()
|
||||
if not result.status:
|
||||
PublicFunc.text_output(self.ui, "(1/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
||||
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
return
|
||||
else:
|
||||
PublicFunc.text_output(self.ui, "(1/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
||||
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
|
||||
# 导入数据
|
||||
PublicFunc.progressbar_update(self, 2, 5, Constants.INPUTTING_DATA, 10)
|
||||
result = self.data.open_file()
|
||||
if not result.status:
|
||||
PublicFunc.text_output(self.ui, "(2/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
||||
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
return
|
||||
else:
|
||||
PublicFunc.text_output(self.ui, "(2/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
||||
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
|
||||
# 切割数据
|
||||
PublicFunc.progressbar_update(self, 3, 5, Constants.CUT_PSG_CUTTING_DATA, 40)
|
||||
result = self.data.cut_data()
|
||||
if not result.status:
|
||||
PublicFunc.text_output(self.ui, "(3/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
||||
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
return
|
||||
else:
|
||||
PublicFunc.text_output(self.ui, "(3/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
||||
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
|
||||
# 标签映射
|
||||
PublicFunc.progressbar_update(self, 4, 5, Constants.CUT_PSG_ALIGNING_LABEL, 60)
|
||||
result = self.data.align_label()
|
||||
if not result.status:
|
||||
PublicFunc.text_output(self.ui, "(4/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
||||
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
return
|
||||
else:
|
||||
PublicFunc.text_output(self.ui, "(4/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
||||
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
|
||||
# 保存数据
|
||||
PublicFunc.progressbar_update(self, 5, 5, Constants.SAVING_DATA, 70)
|
||||
result = self.data.save()
|
||||
|
||||
if not result.status:
|
||||
PublicFunc.text_output(self.ui, "(5/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
||||
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
return
|
||||
else:
|
||||
PublicFunc.text_output(self.ui, "(5/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
||||
for key, raw in self.data.raw.items():
|
||||
info = "保存{}的长度为{},采样率为{}Hz".format(key, str(len(raw)), str(self.data.freq[key]))
|
||||
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
|
||||
QApplication.processEvents()
|
||||
|
||||
PublicFunc.msgbox_output(self, result.info, Constants.TIPS_TYPE_INFO)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
|
||||
|
||||
class Data:
|
||||
|
||||
def __init__(self, root_path, sampID):
|
||||
self.alignInfo = None
|
||||
|
||||
self.raw = {key: array([]) for key in Config["ChannelInput"]}
|
||||
self.freq = {key: 0 for key in Config["ChannelInput"]}
|
||||
|
||||
self.SALabel = None
|
||||
self.startTime = None
|
||||
|
||||
self.root_path = root_path
|
||||
self.sampID = sampID
|
||||
|
||||
def get_file_and_freq(self):
|
||||
try:
|
||||
for file_path in Path(Config["Path"]["InputFolder"]).glob('*'):
|
||||
if file_path.is_file():
|
||||
file_stem = Path(file_path).stem
|
||||
for key, prefix in Config["ChannelInput"].items():
|
||||
if file_stem.startswith(prefix):
|
||||
freq_str = file_stem.rsplit('_', 1)[1]
|
||||
try:
|
||||
freq = int(freq_str)
|
||||
self.freq[key] = freq
|
||||
except ValueError:
|
||||
return Result().failure(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["Filename_Format_not_Correct"] + f"\n{Config['ChannelInput']}")
|
||||
for value in self.freq.values():
|
||||
if value == 0:
|
||||
return Result().failure(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["Filename_Format_not_Correct"] + f"\n{Config['ChannelInput']}")
|
||||
if not any((Config["LabelInput"]["SA Label"] + Config["EndWith"]["SA Label"]) in str(file) for file in Path(Config["Path"]["InputFolder"]).glob('*')):
|
||||
return Result().failure(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["File_Not_Exist"])
|
||||
if not any((Config["StartTime"] + Config["EndWith"]["StartTime"]) in str(file) for file in Path(Config["Path"]["InputFolder"]).glob('*')):
|
||||
return Result().failure(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["File_Not_Exist"])
|
||||
if not Path(Config["Path"]["InputAlignInfo"]).exists():
|
||||
return Result().failure(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["File_Not_Exist"])
|
||||
except Exception as e:
|
||||
return Result().failure(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["Get_File_and_Freq_Excepetion"] + "\n" + format_exc())
|
||||
|
||||
return Result().success(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FINISHED)
|
||||
|
||||
def open_file(self):
|
||||
path = str(Path(self.root_path) / Filename.PATH_PSG_TEXT / Path(str(self.sampID)))
|
||||
for value in Config["ChannelInput"].values():
|
||||
result = PublicFunc.examine_file(path, value, Params.ENDSWITH_TXT)
|
||||
if not result.status:
|
||||
return result
|
||||
|
||||
if Path(Config["Path"]["InputAlignInfo"]).is_file():
|
||||
Config["Path"]["InputAlignInfo"] = str(Path(Config["Path"]["InputAlignInfo"]).parent)
|
||||
|
||||
Config["Path"]["InputAlignInfo"] = str(
|
||||
Path(Config["Path"]["InputAlignInfo"]) / Path(
|
||||
Filename.PRECISELY_ALIGN_INFO + Params.ENDSWITH_TXT))
|
||||
|
||||
try:
|
||||
for key in Config["ChannelInput"].keys():
|
||||
self.raw[key] = read_csv(Path(Config["Path"]["InputFolder"]) / Path((Config["ChannelInput"][key] + str(self.freq[key]) + Config["EndWith"][key])),
|
||||
encoding=Params.UTF8_ENCODING,
|
||||
header=None).to_numpy().reshape(-1)
|
||||
self.SALabel = read_csv(Path(Config["Path"]["InputFolder"]) / Path((Config["LabelInput"]["SA Label"] + Config["EndWith"]["SA Label"])),
|
||||
encoding=Params.GBK_ENCODING)
|
||||
self.startTime = read_csv(Path(Config["Path"]["InputFolder"]) / Path((Config["StartTime"] + Config["EndWith"]["StartTime"])),
|
||||
encoding=Params.UTF8_ENCODING,
|
||||
header=None).to_numpy().reshape(-1)
|
||||
self.alignInfo = read_csv(Path(Config["Path"]["InputAlignInfo"]),
|
||||
encoding=Params.UTF8_ENCODING,
|
||||
header=None).to_numpy().reshape(-1)
|
||||
self.alignInfo = literal_eval(self.alignInfo[0])
|
||||
except Exception as e:
|
||||
return Result().failure(info=Constants.INPUT_FAILURE +
|
||||
Constants.FAILURE_REASON["Open_Data_Exception"] + "\n" + format_exc())
|
||||
|
||||
return Result().success(info=Constants.INPUT_FINISHED)
|
||||
|
||||
def cut_data(self):
|
||||
try:
|
||||
for key, raw in self.raw.items():
|
||||
# 转换切割点
|
||||
ECG_freq = Config["ECGFreq"]
|
||||
raw_freq = self.freq[key]
|
||||
duration_second = ((self.alignInfo["cut_index"]["back_ECG"] - self.alignInfo["cut_index"]["front_ECG"]) // ECG_freq) + 1
|
||||
start_index_cut = floor(self.alignInfo["cut_index"]["front_ECG"] * (raw_freq / ECG_freq))
|
||||
end_index_cut = start_index_cut + (duration_second * raw_freq)
|
||||
|
||||
try:
|
||||
# 切割信号
|
||||
self.raw[key] = self.raw[key][start_index_cut:end_index_cut]
|
||||
except Exception:
|
||||
return Result().failure(info=Constants.CUT_PSG_CUT_DATA_FAILURE +
|
||||
Constants.FAILURE_REASON["Cut_Data_Length_not_Correct"])
|
||||
except Exception as e:
|
||||
return Result().failure(info=Constants.CUT_PSG_CUT_DATA_FAILURE +
|
||||
Constants.FAILURE_REASON["Cut_Data_Exception"] + "\n" + format_exc())
|
||||
|
||||
return Result().success(info=Constants.CUT_PSG_CUT_DATA_FINISHED)
|
||||
|
||||
def align_label(self):
|
||||
try:
|
||||
# 读取SA标签
|
||||
self.SALabel = self.SALabel.loc[:, ~self.SALabel.columns.str.contains("^Unnamed")]
|
||||
self.SALabel = self.SALabel[self.SALabel["Event type"].isin(Params.CUT_PSG_SALABEL_EVENT)]
|
||||
self.SALabel["Duration"] = self.SALabel["Duration"].astype(str)
|
||||
self.SALabel["Duration"] = self.SALabel["Duration"].str.replace(r' \(.*?\)', '', regex=True)
|
||||
except Exception:
|
||||
return Result().failure(info=Constants.CUT_PSG_ALIGN_LABEL_FAILURE +
|
||||
Constants.FAILURE_REASON["Align_Label_SALabel_Format_not_Correct"])
|
||||
|
||||
try:
|
||||
# 获取记录开始时间
|
||||
start_time = str(self.startTime[0]).split(" ")[1]
|
||||
start_time = Data.get_time_to_seconds(start_time)
|
||||
|
||||
# 计算起始时间秒数和终止时间秒数
|
||||
self.SALabel["Start"] = (self.SALabel["Time"].apply(self.get_time_to_seconds) - start_time).apply(
|
||||
lambda x: x + 24 * 3600 if x < 0 else x).astype(int)
|
||||
self.SALabel["End"] = self.SALabel["Start"] + self.SALabel["Duration"].astype(float).round(0).astype(int)
|
||||
|
||||
# 标签映射
|
||||
ECG_length = self.alignInfo["cut_index"]["back_ECG"] - self.alignInfo["cut_index"]["front_ECG"]
|
||||
self.SALabel["Start"] = self.SALabel["Start"] - round((self.alignInfo["cut_index"]["front_ECG"] / 1000))
|
||||
self.SALabel["End"] = self.SALabel["End"] - round((self.alignInfo["cut_index"]["front_ECG"] / 1000))
|
||||
self.SALabel = self.SALabel[self.SALabel["End"] >= 0]
|
||||
self.SALabel.loc[self.SALabel["Start"] < 0, "Start"] = 0
|
||||
self.SALabel = self.SALabel[self.SALabel["Start"] < ECG_length]
|
||||
self.SALabel.loc[self.SALabel["End"] >= ECG_length, "End"] = ECG_length - 1
|
||||
except Exception as e:
|
||||
return Result().failure(info=Constants.CUT_PSG_ALIGN_LABEL_FAILURE +
|
||||
Constants.FAILURE_REASON["Align_Label_Exception"] + "\n" + format_exc())
|
||||
|
||||
return Result().success(info=Constants.CUT_PSG_ALIGN_LABEL_FINISHED)
|
||||
|
||||
def save(self):
|
||||
for raw in self.raw.values():
|
||||
if len(raw) == 0:
|
||||
return Result().failure(info=Constants.SAVE_FAILURE +
|
||||
Constants.FAILURE_REASON["Data_not_Exist"])
|
||||
|
||||
try:
|
||||
for key, raw in self.raw.items():
|
||||
DataFrame(raw.reshape(-1)).to_csv(Path(Config["Path"]["SaveFolder"]) / Path((Config["ChannelSave"][key] + str(self.freq[key]) + Config["EndWith"][key])),
|
||||
index=False, header=False)
|
||||
# 重排index,从1开始,并给index命名
|
||||
self.SALabel.sort_values(by=["Start"], inplace=True)
|
||||
self.SALabel.reset_index(drop=True, inplace=True)
|
||||
self.SALabel.index = self.SALabel.index + 1
|
||||
self.SALabel.index.name = "Index"
|
||||
self.SALabel.to_csv(Path(Config["Path"]["SaveFolder"]) / Path((Config["LabelSave"]["SA Label"] + Config["EndWith"]["SA Label"])),
|
||||
encoding="gbk")
|
||||
except PermissionError as e:
|
||||
return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"])
|
||||
except FileNotFoundError as e:
|
||||
return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_File_Not_Found"])
|
||||
except Exception as e:
|
||||
return Result().failure(info=Constants.SAVE_FAILURE +
|
||||
Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc())
|
||||
|
||||
return Result().success(info=Constants.SAVE_FINISHED)
|
||||
|
||||
@staticmethod
|
||||
def get_time_to_seconds(time_str):
|
||||
h, m, s = map(int, time_str.split(":"))
|
||||
return h * 3600 + m * 60 + s
|
||||
564
func/Module_cut_pair_file.py
Normal file
564
func/Module_cut_pair_file.py
Normal file
@ -0,0 +1,564 @@
|
||||
from ast import literal_eval
|
||||
from gc import collect
|
||||
from math import floor
|
||||
from pathlib import Path
|
||||
from traceback import format_exc
|
||||
|
||||
from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication
|
||||
from numpy import array
|
||||
from overrides import overrides
|
||||
from pandas import read_csv, DataFrame
|
||||
from yaml import dump, load, FullLoader
|
||||
|
||||
from func.utils.ConfigParams import Filename, Params
|
||||
from func.utils.PublicFunc import PublicFunc
|
||||
from func.utils.Constants import Constants
|
||||
from func.utils.Result import Result
|
||||
|
||||
from ui.MainWindow.MainWindow_cut_PAIR_FILE import Ui_MainWindow_cut_PAIR_FILE
|
||||
|
||||
|
||||
Config = {
|
||||
|
||||
}
|
||||
|
||||
ButtonState = {
|
||||
"Default": {
|
||||
"checkBox_roughCut": False,
|
||||
"pushButton_deleteRoughCut": False,
|
||||
"pushButton_execute": True
|
||||
},
|
||||
"Current": {
|
||||
"checkBox_roughCut": False,
|
||||
"pushButton_deleteRoughCut": False,
|
||||
"pushButton_execute": True
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class MainWindow_cut_PAIR_FILE(QMainWindow):
|
||||
|
||||
def __init__(self):
|
||||
super(MainWindow_cut_PAIR_FILE, self).__init__()
|
||||
self.ui = Ui_MainWindow_cut_PAIR_FILE()
|
||||
self.ui.setupUi(self)
|
||||
|
||||
self.root_path = None
|
||||
self.sampID = None
|
||||
|
||||
self.__read_config__()
|
||||
|
||||
self.data = None
|
||||
|
||||
self.ui.textBrowser_info.setStyleSheet("QTextBrowser { background-color: rgb(255, 255, 200); }")
|
||||
PublicFunc.__styleAllButton__(self, ButtonState)
|
||||
|
||||
# 初始化进度条
|
||||
self.ui.progressbar.setStyleSheet(Constants.PROGRESSBAR_STYLE)
|
||||
self.progressbar = self.ui.progressbar
|
||||
|
||||
self.msgBox = QMessageBox()
|
||||
self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)
|
||||
|
||||
@overrides
|
||||
def show(self, root_path, sampID):
|
||||
super().show()
|
||||
self.root_path = root_path
|
||||
self.sampID = sampID
|
||||
|
||||
PublicFunc.__resetAllButton__(self, ButtonState)
|
||||
|
||||
Config.update({
|
||||
"Path": {
|
||||
"InputPSGFolder": str(Path(self.root_path) / Filename.PATH_PSG_TEXT / Path(str(self.sampID))),
|
||||
"SavePSGFolder": str(Path(self.root_path) / Filename.PATH_PSG_ALIGNED / Path(str(self.sampID))),
|
||||
"InputAlignInfo": str(Path(self.root_path) / Filename.PATH_LABEL / Path(str(self.sampID))),
|
||||
"InputOrgBCGFolder": str(Path(self.root_path) / Filename.PATH_ORGBCG_TEXT / Path(str(self.sampID))),
|
||||
"SaveOrgBCGFolder": str(Path(self.root_path) / Filename.PATH_ORGBCG_ALIGNED / Path(str(self.sampID)))
|
||||
}
|
||||
})
|
||||
|
||||
self.ui.plainTextEdit_channel.setPlainText(', '.join(Config["ChannelInput"].keys()))
|
||||
self.ui.plainTextEdit_label.setPlainText(', '.join(Config["LabelInput"].keys()))
|
||||
|
||||
self.ui.pushButton_execute.clicked.connect(self.__slot_btn_execute__)
|
||||
self.ui.checkBox_roughCut.stateChanged.connect(self.__change_approximate_align_mode__)
|
||||
|
||||
|
||||
@overrides
|
||||
def closeEvent(self, event):
|
||||
reply = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
|
||||
if reply == QMessageBox.Yes:
|
||||
PublicFunc.__disableAllButton__(self, ButtonState)
|
||||
|
||||
PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
|
||||
QApplication.processEvents()
|
||||
|
||||
# 释放资源
|
||||
del self.data
|
||||
self.deleteLater()
|
||||
collect()
|
||||
event.accept()
|
||||
else:
|
||||
event.ignore()
|
||||
|
||||
def __reset__(self):
|
||||
ButtonState["Current"].update(ButtonState["Default"].copy())
|
||||
|
||||
def __read_config__(self):
|
||||
if not Path(Params.CUT_PAIR_FILE_CONFIG_FILE_PATH).exists():
|
||||
with open(Params.CUT_PAIR_FILE_CONFIG_FILE_PATH, "w") as f:
|
||||
dump(Params.CUT_PAIR_FILE_CONFIG_NEW_CONTENT, f)
|
||||
|
||||
with open(Params.CUT_PAIR_FILE_CONFIG_FILE_PATH, "r") as f:
|
||||
file_config = load(f.read(), Loader=FullLoader)
|
||||
Config.update(file_config)
|
||||
|
||||
# 数据回显
|
||||
self.ui.spinBox_ECGFreq.setValue(Config["ECGFreq"])
|
||||
|
||||
|
||||
def __change_approximate_align_mode__(self):
|
||||
# ChannelInput 添加OrgBCGCHannelInput
|
||||
if self.ui.checkBox_roughCut.isChecked():
|
||||
Config["ChannelInput"].update(Config["OrgBCGChannelInput"])
|
||||
Config["ChannelInput"].update(Config["ECGChannelInput"])
|
||||
|
||||
# 修改ChannelSave中的Sync为RoughCut
|
||||
for key in Config["ChannelSave"].keys():
|
||||
if "Sync" in Config["ChannelSave"][key]:
|
||||
Config["ChannelSave"][key] = Config["ChannelSave"][key].replace("Sync", "RoughCut")
|
||||
|
||||
ButtonState["Default"]["pushButton_deleteRoughCut"] = True
|
||||
self.ui.plainTextEdit_channel.setPlainText(', '.join(Config["ChannelInput"].keys()))
|
||||
|
||||
else:
|
||||
# ChannelInput 移除OrgBCGCHannelInput
|
||||
for key in Config["OrgBCGChannelInput"].keys():
|
||||
if key in Config["ChannelInput"]:
|
||||
Config["ChannelInput"].pop(key)
|
||||
for key in Config["ECGChannelInput"].keys():
|
||||
if key in Config["ChannelInput"]:
|
||||
Config["ChannelInput"].pop(key)
|
||||
# print(Config["ChannelInput"])
|
||||
# 修改ChannelSave中的RoughCut为Sync
|
||||
for key in Config["ChannelSave"].keys():
|
||||
if "RoughCut" in Config["ChannelSave"][key]:
|
||||
Config["ChannelSave"][key] = Config["ChannelSave"][key].replace("RoughCut", "Sync")
|
||||
ButtonState["Default"]["pushButton_deleteRoughCut"] = False
|
||||
self.ui.plainTextEdit_channel.setPlainText(', '.join(Config["ChannelInput"].keys()))
|
||||
|
||||
|
||||
def __get_approximately_align_info__(self):
|
||||
time_bias = self.data.get_approximately_align_info()
|
||||
if time_bias is not None:
|
||||
info = "已成功获取粗对齐信息,时间偏移量为{:.3f}秒".format(time_bias)
|
||||
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
|
||||
QApplication.processEvents()
|
||||
return True
|
||||
else:
|
||||
info = "获取粗对齐信息失败,系统将尝试计算粗对齐信息"
|
||||
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_WARNING)
|
||||
QApplication.processEvents()
|
||||
|
||||
align_info = self.data.calc_approximately_align_info()
|
||||
if align_info is not None:
|
||||
info = "已成功计算粗对齐信息,切割点为:ECG前端去除{}样本点,ECG后端去除{}样本点,BCG前端去除{}样本点,BCG后端去除{}样本点".format(
|
||||
align_info["cut_index"]["front_ECG"],
|
||||
align_info["cut_index"]["back_ECG"],
|
||||
align_info["cut_index"]["front_BCG"],
|
||||
align_info["cut_index"]["back_BCG"]
|
||||
)
|
||||
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
|
||||
QApplication.processEvents()
|
||||
return True
|
||||
else:
|
||||
info = "计算粗对齐信息失败,无法进行后续操作"
|
||||
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_ERROR)
|
||||
QApplication.processEvents()
|
||||
return False
|
||||
|
||||
|
||||
def __slot_btn_execute__(self):
|
||||
PublicFunc.__disableAllButton__(self, ButtonState)
|
||||
|
||||
self.data = Data(self.root_path, self.sampID)
|
||||
Config["ECGFreq"] = self.ui.spinBox_ECGFreq.value()
|
||||
|
||||
# 检查文件是否存在并获取其数据采样率
|
||||
PublicFunc.progressbar_update(self, 1, 5, Constants.CUT_PAIR_FILE_GETTING_FILE_AND_FREQ, 0)
|
||||
result = self.data.get_file_and_freq()
|
||||
if not result.status:
|
||||
PublicFunc.text_output(self.ui, "(1/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
||||
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
return
|
||||
else:
|
||||
if self.ui.checkBox_roughCut.isChecked():
|
||||
Config["BCGFreq"] = self.data.freq["OrgBCG"]
|
||||
PublicFunc.text_output(self.ui, "(1/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
||||
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
|
||||
# 导入数据
|
||||
PublicFunc.progressbar_update(self, 2, 5, Constants.INPUTTING_DATA, 10)
|
||||
result = self.data.open_file()
|
||||
if not result.status:
|
||||
PublicFunc.text_output(self.ui, "(2/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
||||
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
return
|
||||
else:
|
||||
PublicFunc.text_output(self.ui, "(2/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
||||
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
|
||||
if self.ui.checkBox_roughCut.isChecked():
|
||||
# 获取或计算粗对齐信息
|
||||
PublicFunc.progressbar_update(self, 3, 5, Constants.CUT_PAIR_FILE_GETTING_APPROXIMATE_ALIGN_INFO, 20)
|
||||
result_approximate = self.__get_approximately_align_info__()
|
||||
if not result_approximate:
|
||||
PublicFunc.msgbox_output(self, Constants.CUT_PAIR_FILE_GETTING_APPROXIMATE_ALIGN_INFO_FAILURE, Constants.MSGBOX_TYPE_ERROR)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
return
|
||||
else:
|
||||
PublicFunc.text_output(self.ui, "(3/5)" + Constants.CUT_PAIR_FILE_GETTING_APPROXIMATE_ALIGN_INFO_FINISHED, Constants.TIPS_TYPE_INFO)
|
||||
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
|
||||
# 切割数据
|
||||
PublicFunc.progressbar_update(self, 3, 5, Constants.CUT_PAIR_FILE_CUTTING_DATA, 40)
|
||||
result = self.data.cut_data()
|
||||
if not result.status:
|
||||
PublicFunc.text_output(self.ui, "(3/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
||||
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
return
|
||||
else:
|
||||
PublicFunc.text_output(self.ui, "(3/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
||||
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
|
||||
# 标签映射
|
||||
PublicFunc.progressbar_update(self, 4, 5, Constants.CUT_PAIR_FILE_ALIGNING_LABEL, 60)
|
||||
result = self.data.align_label()
|
||||
if not result.status:
|
||||
PublicFunc.text_output(self.ui, "(4/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
||||
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
return
|
||||
else:
|
||||
PublicFunc.text_output(self.ui, "(4/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
||||
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
|
||||
# 保存数据
|
||||
PublicFunc.progressbar_update(self, 5, 5, Constants.SAVING_DATA, 70)
|
||||
|
||||
result = self.data.save()
|
||||
|
||||
if not result.status:
|
||||
PublicFunc.text_output(self.ui, "(5/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
||||
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
return
|
||||
else:
|
||||
PublicFunc.text_output(self.ui, "(5/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
||||
for key, raw in self.data.raw.items():
|
||||
info = "保存{}的长度为{},采样率为{}Hz".format(key, str(len(raw)), str(self.data.freq[key]))
|
||||
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
|
||||
QApplication.processEvents()
|
||||
|
||||
PublicFunc.msgbox_output(self, result.info, Constants.TIPS_TYPE_INFO)
|
||||
PublicFunc.finish_operation(self, ButtonState)
|
||||
|
||||
|
||||
class Data:
|
||||
|
||||
def __init__(self, root_path, sampID):
|
||||
self.TimeBiasSecond = None
|
||||
self.alignInfo = None
|
||||
|
||||
self.raw = {key: array([]) for key in Config["ChannelInput"]}
|
||||
self.freq = {key: 0 for key in Config["ChannelInput"]}
|
||||
|
||||
self.SALabel = None
|
||||
self.startTime = None
|
||||
|
||||
self.root_path = root_path
|
||||
self.sampID = sampID
|
||||
|
||||
def get_file_and_freq(self):
|
||||
try:
|
||||
for file_path in Path(Config["Path"]["InputPSGFolder"]).glob('*'):
|
||||
if file_path.is_file():
|
||||
file_stem = Path(file_path).stem
|
||||
for key, prefix in Config["ChannelInput"].items():
|
||||
if not prefix.startswith("PSG:"):
|
||||
continue
|
||||
prefix = prefix[len("PSG:"):]
|
||||
if file_stem.startswith(prefix):
|
||||
freq_str = file_stem.rsplit('_', 1)[1]
|
||||
try:
|
||||
freq = int(freq_str)
|
||||
self.freq[key] = freq
|
||||
except ValueError:
|
||||
return Result().failure(info=Constants.CUT_PAIR_FILE_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["Filename_Format_not_Correct"] + f"\n{Config['ChannelInput']}")
|
||||
|
||||
for file_path in Path(Config["Path"]["InputOrgBCGFolder"]).glob('*'):
|
||||
if file_path.is_file():
|
||||
file_stem = Path(file_path).stem
|
||||
for key, prefix in Config["ChannelInput"].items():
|
||||
if not prefix.startswith("OrgBCG:"):
|
||||
continue
|
||||
prefix = prefix[len("OrgBCG:"):]
|
||||
if file_stem.startswith(prefix):
|
||||
freq_str = file_stem.rsplit('_', 1)[1]
|
||||
try:
|
||||
freq = int(freq_str)
|
||||
self.freq[key] = freq
|
||||
except ValueError:
|
||||
return Result().failure(info=Constants.CUT_PAIR_FILE_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["Filename_Format_not_Correct"] + f"\n{Config['ChannelInput']}")
|
||||
|
||||
for value in self.freq.values():
|
||||
if value == 0:
|
||||
return Result().failure(info=Constants.CUT_PAIR_FILE_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["Filename_Format_not_Correct"] + f"\n{Config['ChannelInput']}")
|
||||
if not any((Config["LabelInput"]["SA Label"] + Config["EndWith"]["SA Label"]) in str(file) for file in Path(Config["Path"]["InputPSGFolder"]).glob('*')):
|
||||
return Result().failure(info=Constants.CUT_PAIR_FILE_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["File_Not_Exist"])
|
||||
if not any((Config["StartTime"] + Config["EndWith"]["StartTime"]) in str(file) for file in Path(Config["Path"]["InputPSGFolder"]).glob('*')):
|
||||
return Result().failure(info=Constants.CUT_PAIR_FILE_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["File_Not_Exist"])
|
||||
if not Path(Config["Path"]["InputAlignInfo"]).exists():
|
||||
return Result().failure(info=Constants.CUT_PAIR_FILE_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["File_Not_Exist"])
|
||||
except Exception as e:
|
||||
return Result().failure(info=Constants.CUT_PAIR_FILE_GET_FILE_AND_FREQ_FAILURE +
|
||||
Constants.FAILURE_REASON["Get_File_and_Freq_Excepetion"] + "\n" + format_exc())
|
||||
|
||||
return Result().success(info=Constants.CUT_PAIR_FILE_GET_FILE_AND_FREQ_FINISHED)
|
||||
|
||||
def open_file(self):
|
||||
psg_path = str(Path(self.root_path) / Filename.PATH_PSG_TEXT / Path(str(self.sampID)))
|
||||
bcg_path = str(Path(self.root_path) / Filename.PATH_ORGBCG_TEXT / Path(str(self.sampID)))
|
||||
for value in Config["ChannelInput"].values():
|
||||
if value.startswith("PSG:"):
|
||||
value = value[len("PSG:"):]
|
||||
result = PublicFunc.examine_file(psg_path, value, Params.ENDSWITH_TXT)
|
||||
if not result.status:
|
||||
return result
|
||||
elif value.startswith("OrgBCG:"):
|
||||
value = value[len("OrgBCG:"):]
|
||||
result = PublicFunc.examine_file(bcg_path, value, Params.ENDSWITH_TXT)
|
||||
if not result.status:
|
||||
return result
|
||||
|
||||
if Path(Config["Path"]["InputAlignInfo"]).is_file():
|
||||
Config["Path"]["InputAlignInfo"] = str(Path(Config["Path"]["InputAlignInfo"]).parent)
|
||||
|
||||
Config["Path"]["Input_Approximately_Align"] = str(
|
||||
Path(Config["Path"]["InputAlignInfo"]) / Path(
|
||||
Filename.APPROXIMATELY_ALIGN_INFO + Params.ENDSWITH_CSV))
|
||||
|
||||
Config["Path"]["InputAlignInfo"] = str(
|
||||
Path(Config["Path"]["InputAlignInfo"]) / Path(
|
||||
Filename.PRECISELY_ALIGN_INFO + Params.ENDSWITH_TXT))
|
||||
|
||||
try:
|
||||
for key in Config["ChannelInput"].keys():
|
||||
if not Config["ChannelInput"][key].startswith("OrgBCG"):
|
||||
self.raw[key] = read_csv(Path(Config["Path"]["InputPSGFolder"]) / Path((Config["ChannelInput"][key][len("PSG:"):] + str(self.freq[key]) + Config["EndWith"][key])),
|
||||
encoding=Params.UTF8_ENCODING,
|
||||
header=None).to_numpy().reshape(-1)
|
||||
|
||||
elif Config["ChannelInput"][key].startswith("OrgBCG"):
|
||||
self.raw[key] = read_csv(Path(Config["Path"]["InputOrgBCGFolder"]) / Path((Config["ChannelInput"][key][len("OrgBCG:"):] + str(self.freq[key]) + Config["EndWith"][key])),
|
||||
encoding=Params.UTF8_ENCODING,
|
||||
header=None).to_numpy().reshape(-1)
|
||||
|
||||
self.SALabel = read_csv(Path(Config["Path"]["InputPSGFolder"]) / Path((Config["LabelInput"]["SA Label"] + Config["EndWith"]["SA Label"])),
|
||||
encoding=Params.GBK_ENCODING)
|
||||
self.startTime = read_csv(Path(Config["Path"]["InputPSGFolder"]) / Path((Config["StartTime"] + Config["EndWith"]["StartTime"])),
|
||||
encoding=Params.UTF8_ENCODING,
|
||||
header=None).to_numpy().reshape(-1)
|
||||
self.alignInfo = read_csv(Path(Config["Path"]["InputAlignInfo"]),
|
||||
encoding=Params.UTF8_ENCODING,
|
||||
header=None).to_numpy().reshape(-1)
|
||||
self.alignInfo = literal_eval(self.alignInfo[0])
|
||||
|
||||
except Exception as e:
|
||||
return Result().failure(info=Constants.INPUT_FAILURE +
|
||||
Constants.FAILURE_REASON["Open_Data_Exception"] + "\n" + format_exc())
|
||||
|
||||
return Result().success(info=Constants.INPUT_FINISHED)
|
||||
|
||||
|
||||
def get_approximately_align_info(self):
|
||||
try:
|
||||
df = read_csv(Config["Path"]["Input_Approximately_Align"])
|
||||
pos = df["pos"].values[-1]
|
||||
ApplyFrequency = df["ApplyFrequency"].values[-1]
|
||||
self.TimeBiasSecond = pos / ApplyFrequency
|
||||
return self.TimeBiasSecond
|
||||
|
||||
except Exception as e:
|
||||
self.TimeBiasSecond = 0
|
||||
return None
|
||||
|
||||
def calc_approximately_align_info(self):
|
||||
try:
|
||||
# 获取BCG长度
|
||||
BCG_freq = Config["BCGFreq"]
|
||||
BCG_second = len(self.raw["BCG"]) // BCG_freq
|
||||
|
||||
|
||||
# 计算ECG长度
|
||||
ECG_freq = Config["ECGFreq"]
|
||||
ECG_second = len(self.raw["ECG"]) // ECG_freq
|
||||
|
||||
|
||||
pos = self.TimeBiasSecond
|
||||
|
||||
# 如果pos<0,表示BCG信号比ECG信号提前,需要在开头去除掉一部分BCG信号
|
||||
if pos < 0:
|
||||
front_BCG = floor(-pos)
|
||||
front_ECG = 0
|
||||
else:
|
||||
front_BCG = 0
|
||||
front_ECG = floor(pos)
|
||||
|
||||
# 计算结束位置
|
||||
if (BCG_second - front_BCG) > (ECG_second - front_ECG):
|
||||
back_ECG = ECG_second + front_ECG
|
||||
back_BCG = back_ECG - floor(pos)
|
||||
else:
|
||||
back_BCG = BCG_second + front_BCG
|
||||
back_ECG = back_BCG + floor(pos)
|
||||
|
||||
self.alignInfo = {
|
||||
"cut_index": {
|
||||
"front_ECG": front_ECG * ECG_freq,
|
||||
"back_ECG": back_ECG * ECG_freq,
|
||||
"front_BCG": front_BCG * BCG_freq,
|
||||
"back_BCG": back_BCG * BCG_freq
|
||||
}
|
||||
}
|
||||
return self.alignInfo
|
||||
|
||||
|
||||
except Exception as e:
|
||||
return None
|
||||
|
||||
|
||||
def cut_data(self):
|
||||
try:
|
||||
for key, raw in self.raw.items():
|
||||
if Config["ChannelInput"][key].startswith("PSG:"):
|
||||
# 转换切割点
|
||||
ECG_freq = Config["ECGFreq"]
|
||||
raw_freq = self.freq[key]
|
||||
duration_second = ((self.alignInfo["cut_index"]["back_ECG"] - self.alignInfo["cut_index"]["front_ECG"]) // ECG_freq) + 1
|
||||
start_index_cut = floor(self.alignInfo["cut_index"]["front_ECG"] * (raw_freq / ECG_freq))
|
||||
end_index_cut = start_index_cut + (duration_second * raw_freq)
|
||||
|
||||
try:
|
||||
# 切割信号
|
||||
self.raw[key] = self.raw[key][start_index_cut:end_index_cut]
|
||||
except Exception:
|
||||
return Result().failure(info=Constants.CUT_PAIR_FILE_CUT_DATA_FAILURE +
|
||||
Constants.FAILURE_REASON["Cut_Data_Length_not_Correct"])
|
||||
elif Config["ChannelInput"][key].startswith("OrgBCG:"):
|
||||
# 转换切割点
|
||||
BCG_freq = Config["BCGFreq"]
|
||||
raw_freq = self.freq[key]
|
||||
duration_second = ((self.alignInfo["cut_index"]["back_BCG"] - self.alignInfo["cut_index"]["front_BCG"]) // BCG_freq) + 1
|
||||
start_index_cut = floor(self.alignInfo["cut_index"]["front_BCG"] * (raw_freq / BCG_freq))
|
||||
end_index_cut = start_index_cut + (duration_second * raw_freq)
|
||||
|
||||
try:
|
||||
# 切割信号
|
||||
self.raw[key] = self.raw[key][start_index_cut:end_index_cut]
|
||||
except Exception:
|
||||
return Result().failure(info=Constants.CUT_PAIR_FILE_CUT_DATA_FAILURE +
|
||||
Constants.FAILURE_REASON["Cut_Data_Length_not_Correct"])
|
||||
except Exception as e:
|
||||
return Result().failure(info=Constants.CUT_PAIR_FILE_CUT_DATA_FAILURE +
|
||||
Constants.FAILURE_REASON["Cut_Data_Exception"] + "\n" + format_exc())
|
||||
|
||||
return Result().success(info=Constants.CUT_PAIR_FILE_CUT_DATA_FINISHED)
|
||||
|
||||
def align_label(self):
|
||||
try:
|
||||
# 读取SA标签
|
||||
self.SALabel = self.SALabel.loc[:, ~self.SALabel.columns.str.contains("^Unnamed")]
|
||||
self.SALabel = self.SALabel[self.SALabel["Event type"].isin(Params.CUT_PAIR_FILE_SALABEL_EVENT)]
|
||||
self.SALabel["Duration"] = self.SALabel["Duration"].astype(str)
|
||||
self.SALabel["Duration"] = self.SALabel["Duration"].str.replace(r' \(.*?\)', '', regex=True)
|
||||
except Exception:
|
||||
return Result().failure(info=Constants.CUT_PAIR_FILE_ALIGN_LABEL_FAILURE +
|
||||
Constants.FAILURE_REASON["Align_Label_SALabel_Format_not_Correct"])
|
||||
|
||||
try:
|
||||
# 获取记录开始时间
|
||||
start_time = str(self.startTime[0]).split(" ")[1]
|
||||
start_time = Data.get_time_to_seconds(start_time)
|
||||
|
||||
# 计算起始时间秒数和终止时间秒数
|
||||
self.SALabel["Start"] = (self.SALabel["Time"].apply(self.get_time_to_seconds) - start_time).apply(
|
||||
lambda x: x + 24 * 3600 if x < 0 else x).astype(int)
|
||||
self.SALabel["End"] = self.SALabel["Start"] + self.SALabel["Duration"].astype(float).round(0).astype(int)
|
||||
|
||||
# 标签映射
|
||||
ECG_length = self.alignInfo["cut_index"]["back_ECG"] - self.alignInfo["cut_index"]["front_ECG"]
|
||||
self.SALabel["Start"] = self.SALabel["Start"] - round((self.alignInfo["cut_index"]["front_ECG"] / 1000))
|
||||
self.SALabel["End"] = self.SALabel["End"] - round((self.alignInfo["cut_index"]["front_ECG"] / 1000))
|
||||
self.SALabel = self.SALabel[self.SALabel["End"] >= 0]
|
||||
self.SALabel.loc[self.SALabel["Start"] < 0, "Start"] = 0
|
||||
self.SALabel = self.SALabel[self.SALabel["Start"] < ECG_length]
|
||||
self.SALabel.loc[self.SALabel["End"] >= ECG_length, "End"] = ECG_length - 1
|
||||
except Exception as e:
|
||||
return Result().failure(info=Constants.CUT_PAIR_FILE_ALIGN_LABEL_FAILURE +
|
||||
Constants.FAILURE_REASON["Align_Label_Exception"] + "\n" + format_exc())
|
||||
|
||||
return Result().success(info=Constants.CUT_PAIR_FILE_ALIGN_LABEL_FINISHED)
|
||||
|
||||
def save(self):
|
||||
for raw in self.raw.values():
|
||||
if len(raw) == 0:
|
||||
return Result().failure(info=Constants.SAVE_FAILURE +
|
||||
Constants.FAILURE_REASON["Data_not_Exist"])
|
||||
|
||||
try:
|
||||
for key, raw in self.raw.items():
|
||||
if Config["ChannelInput"][key].startswith("PSG:"):
|
||||
# print(f"Saving PSG channel: {key} to {Config['Path']['SavePSGFolder']} / {Config['ChannelSave'][key] + str(self.freq[key]) + Config['EndWith'][key]}")
|
||||
DataFrame(raw.reshape(-1)).to_csv(Path(Config["Path"]["SavePSGFolder"]) / Path((Config["ChannelSave"][key][len("PSG:"):] + str(self.freq[key]) + Config["EndWith"][key])),
|
||||
index=False, header=False)
|
||||
elif Config["ChannelInput"][key].startswith("OrgBCG:"):
|
||||
# print(f"Saving OrgBCG channel: {key} to {Config['Path']['SaveOrgBCGFolder']} / {Config['ChannelSave'][key] + str(self.freq[key]) + Config['EndWith'][key]}")
|
||||
DataFrame(raw.reshape(-1)).to_csv(Path(Config["Path"]["SaveOrgBCGFolder"]) / Path((Config["ChannelSave"][key][len("OrgBCG:"):] + str(self.freq[key]) + Config["EndWith"][key])),
|
||||
index=False, header=False)
|
||||
# 重排index,从1开始,并给index命名
|
||||
self.SALabel.sort_values(by=["Start"], inplace=True)
|
||||
self.SALabel.reset_index(drop=True, inplace=True)
|
||||
self.SALabel.index = self.SALabel.index + 1
|
||||
self.SALabel.index.name = "Index"
|
||||
self.SALabel.to_csv(Path(Config["Path"]["SavePSGFolder"]) / Path((Config["LabelSave"]["SA Label"] + Config["EndWith"]["SA Label"])),
|
||||
encoding="gbk")
|
||||
except PermissionError as e:
|
||||
return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_Permission_Denied"])
|
||||
except FileNotFoundError as e:
|
||||
return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON["Save_File_Not_Found"])
|
||||
except Exception as e:
|
||||
return Result().failure(info=Constants.SAVE_FAILURE +
|
||||
Constants.FAILURE_REASON["Save_Exception"] + "\n" + format_exc())
|
||||
|
||||
return Result().success(info=Constants.SAVE_FINISHED)
|
||||
|
||||
@staticmethod
|
||||
def get_time_to_seconds(time_str):
|
||||
h, m, s = map(int, time_str.split(":"))
|
||||
return h * 3600 + m * 60 + s
|
||||
@ -16,7 +16,7 @@ from func.Module_detect_Jpeak import MainWindow_detect_Jpeak
|
||||
from func.Module_detect_Rpeak import MainWindow_detect_Rpeak
|
||||
from func.Module_label_check import MainWindow_label_check
|
||||
from func.Module_precisely_align import MainWindow_precisely_align
|
||||
from func.Module_cut_PSG import MainWindow_cut_PSG
|
||||
from func.Module_cut_pair_file import MainWindow_cut_PAIR_FILE
|
||||
from func.Module_artifact_label import MainWindow_artifact_label
|
||||
from func.Module_bcg_quality_label import MainWindow_bcg_quality_label
|
||||
from func.Module_resp_quality_label import MainWindow_resp_quality_label
|
||||
@ -266,7 +266,7 @@ class MainWindow(QMainWindow, Ui_Signal_Label):
|
||||
self.check_save_path_and_mkdir(root_path, sampID)
|
||||
|
||||
def __slot_btn_cut_PSG__(self):
|
||||
self.cut_PSG = MainWindow_cut_PSG()
|
||||
self.cut_PSG = MainWindow_cut_PAIR_FILE()
|
||||
root_path = self.ui.plainTextEdit_root_path.toPlainText()
|
||||
sampID = self.ui.comboBox_sampID.currentText()
|
||||
if not self.check_root_path():
|
||||
|
||||
@ -220,30 +220,39 @@ class Params:
|
||||
PRECISELY_ALIGN_LABEL_TRANSPARENCY: float = 0.2
|
||||
|
||||
# 冗余数据切割和标签映射
|
||||
CUT_PSG_CONFIG_FILE_PATH: str = "./config/Config_cut_PSG.yaml"
|
||||
CUT_PSG_CONFIG_NEW_CONTENT: dict = {
|
||||
CUT_PAIR_FILE_CONFIG_FILE_PATH: str = "./config/Config_cut_PAIR_FILE.yaml"
|
||||
CUT_PAIR_FILE_CONFIG_NEW_CONTENT: dict = {
|
||||
"ECGFreq": 1000,
|
||||
"BCGFreq": 1000,
|
||||
"ChannelInput": {
|
||||
"Effort Tho": Filename.THO_RAW,
|
||||
"Effort Abd": Filename.ABD_RAW,
|
||||
"Flow T": Filename.FLOWT_RAW,
|
||||
"Flow P": Filename.FLOWP_RAW,
|
||||
"Snore": Filename.SNORE_RAW,
|
||||
"SpO2": Filename.SPO2_RAW,
|
||||
"5_class": Filename.FIVE_CLASS_RAW
|
||||
"Effort Tho": "PSG:" + Filename.THO_RAW,
|
||||
"Effort Abd": "PSG:" + Filename.ABD_RAW,
|
||||
"Flow T": "PSG:" + Filename.FLOWT_RAW,
|
||||
"Flow P": "PSG:" + Filename.FLOWP_RAW,
|
||||
"Snore": "PSG:" + Filename.SNORE_RAW,
|
||||
"SpO2": "PSG:" + Filename.SPO2_RAW,
|
||||
"5_class": "PSG:" + Filename.FIVE_CLASS_RAW
|
||||
},
|
||||
"OrgBCGChannelInput":{
|
||||
"OrgBCG": "OrgBCG:OrgBCG_Raw_"
|
||||
},
|
||||
"ECGChannelInput": {
|
||||
"ECG": "PSG:" + Filename.ECG_RAW
|
||||
},
|
||||
"LabelInput": {
|
||||
"SA Label": Filename.SA_LABEL_RAW
|
||||
},
|
||||
"StartTime": Filename.STARTTIME_RAW,
|
||||
"ChannelSave": {
|
||||
"Effort Tho": Filename.THO_SYNC,
|
||||
"Effort Abd": Filename.ABD_SYNC,
|
||||
"Flow T": Filename.FLOWT_SYNC,
|
||||
"Flow P": Filename.FLOWP_SYNC,
|
||||
"Snore": Filename.SNORE_SYNC,
|
||||
"SpO2": Filename.SPO2_SYNC,
|
||||
"5_class": Filename.FIVE_CLASS_SYNC
|
||||
"Effort Tho": "PSG:" + Filename.THO_SYNC,
|
||||
"Effort Abd": "PSG:" + Filename.ABD_SYNC,
|
||||
"Flow T": "PSG:" + Filename.FLOWT_SYNC,
|
||||
"Flow P": "PSG:" + Filename.FLOWP_SYNC,
|
||||
"Snore": "PSG:" + Filename.SNORE_SYNC,
|
||||
"SpO2": "PSG:" + Filename.SPO2_SYNC,
|
||||
"5_class": "PSG:" + Filename.FIVE_CLASS_SYNC,
|
||||
"OrgBCG": "OrgBCG:" + Filename.ORGBCG_SYNC,
|
||||
"ECG": "PSG:" + Filename.ECG_SYNC
|
||||
},
|
||||
"LabelSave": {
|
||||
"SA Label": Filename.SA_LABEL_SYNC
|
||||
@ -258,10 +267,12 @@ class Params:
|
||||
"SpO2": ENDSWITH_TXT,
|
||||
"5_class": ENDSWITH_TXT,
|
||||
"SA Label": ENDSWITH_CSV,
|
||||
"StartTime": ENDSWITH_TXT
|
||||
"StartTime": ENDSWITH_TXT,
|
||||
"OrgBCG": ENDSWITH_TXT,
|
||||
"ECG": ENDSWITH_TXT
|
||||
},
|
||||
}
|
||||
CUT_PSG_SALABEL_EVENT: list = ["Hypopnea", "Central apnea", "Obstructive apnea", "Mixed apnea"]
|
||||
CUT_PAIR_FILE_SALABEL_EVENT: list = ["Hypopnea", "Central apnea", "Obstructive apnea", "Mixed apnea"]
|
||||
|
||||
# 体动标注
|
||||
ARTIFACT_LABEL_CONFIG_FILE_PATH: str = "./config/Config_artifact_label.yaml"
|
||||
|
||||
@ -5,6 +5,7 @@ class Constants:
|
||||
|
||||
# 公共
|
||||
TIPS_TYPE_INFO: str = "Info"
|
||||
TIPS_TYPE_WARNING: str = "Warning"
|
||||
TIPS_TYPE_ERROR: str = "Error"
|
||||
MSGBOX_TYPE_INFO: str = "Info"
|
||||
MSGBOX_TYPE_WARNING: str = "Warning"
|
||||
@ -352,17 +353,22 @@ class Constants:
|
||||
PRECISELY_ALIGN_ACTION_GET_RANGE_NAME: str = f"设置范围({Params.PRECISELY_ALIGN_ACTION_GET_RANGE_SHORTCUT_KEY})"
|
||||
|
||||
# 冗余数据切割和标签映射
|
||||
CUT_PSG_GETTING_FILE_AND_FREQ: str = "正在获取文件及其采样率"
|
||||
CUT_PSG_GET_FILE_AND_FREQ_FINISHED: str = "获取文件及其采样率完成"
|
||||
CUT_PSG_GET_FILE_AND_FREQ_FAILURE: str = "获取文件及其采样率失败"
|
||||
CUT_PAIR_FILE_GETTING_FILE_AND_FREQ: str = "正在获取文件及其采样率"
|
||||
CUT_PAIR_FILE_GET_FILE_AND_FREQ_FINISHED: str = "获取文件及其采样率完成"
|
||||
CUT_PAIR_FILE_GET_FILE_AND_FREQ_FAILURE: str = "获取文件及其采样率失败"
|
||||
|
||||
CUT_PSG_CUTTING_DATA: str = "正在切割数据"
|
||||
CUT_PSG_CUT_DATA_FINISHED: str = "切割数据完成"
|
||||
CUT_PSG_CUT_DATA_FAILURE: str = "切割数据失败"
|
||||
CUT_PAIR_FILE_CUTTING_DATA: str = "正在切割数据"
|
||||
CUT_PAIR_FILE_CUT_DATA_FINISHED: str = "切割数据完成"
|
||||
CUT_PAIR_FILE_CUT_DATA_FAILURE: str = "切割数据失败"
|
||||
|
||||
CUT_PAIR_FILE_ALIGNING_LABEL: str = "正在映射标签"
|
||||
CUT_PAIR_FILE_ALIGN_LABEL_FINISHED: str = "映射标签完成"
|
||||
CUT_PAIR_FILE_ALIGN_LABEL_FAILURE: str = "映射标签失败"
|
||||
|
||||
CUT_PAIR_FILE_GETTING_APPROXIMATE_ALIGN_INFO: str = "正在获取粗对齐信息"
|
||||
CUT_PAIR_FILE_GETTING_APPROXIMATE_ALIGN_INFO_FAILURE: str = "获取粗对齐信息失败"
|
||||
CUT_PAIR_FILE_GETTING_APPROXIMATE_ALIGN_INFO_FINISHED: str = "获取粗对齐信息完成"
|
||||
|
||||
CUT_PSG_ALIGNING_LABEL: str = "正在映射标签"
|
||||
CUT_PSG_ALIGN_LABEL_FINISHED: str = "映射标签完成"
|
||||
CUT_PSG_ALIGN_LABEL_FAILURE: str = "映射标签失败"
|
||||
|
||||
# 体动标注
|
||||
ARTIFACT_LABEL_PLOT_LABEL_ORGBCG_SYNC: str = "OrgBCG_Sync"
|
||||
|
||||
Reference in New Issue
Block a user