317 lines
15 KiB
Python
317 lines
15 KiB
Python
from ast import literal_eval
|
|
from gc import collect
|
|
from math import floor
|
|
from pathlib import Path
|
|
from traceback import format_exc
|
|
|
|
from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication
|
|
from numpy import array
|
|
from overrides import overrides
|
|
from pandas import read_csv, DataFrame
|
|
from yaml import dump, load, FullLoader
|
|
|
|
from func.utils.PublicFunc import PublicFunc
|
|
from func.utils.Constants import Constants, ConfigParams
|
|
from func.utils.Result import Result
|
|
|
|
from ui.MainWindow.MainWindow_cut_PSG import Ui_MainWindow_cut_PSG
|
|
|
|
|
|
Config = {
|
|
|
|
}
|
|
|
|
ButtonState = {
|
|
"Default": {
|
|
"pushButton_execute": True
|
|
},
|
|
"Current": {
|
|
"pushButton_execute": True
|
|
}
|
|
}
|
|
|
|
|
|
class MainWindow_cut_PSG(QMainWindow):
|
|
|
|
def __init__(self):
|
|
super(MainWindow_cut_PSG, self).__init__()
|
|
self.ui = Ui_MainWindow_cut_PSG()
|
|
self.ui.setupUi(self)
|
|
|
|
self.root_path = None
|
|
self.sampID = None
|
|
|
|
self.__read_config__()
|
|
|
|
self.data = None
|
|
|
|
# 初始化进度条
|
|
self.ui.progressbar.setStyleSheet(Constants.PROGRESSBAR_STYLE)
|
|
self.progressbar = self.ui.progressbar
|
|
|
|
self.msgBox = QMessageBox()
|
|
self.msgBox.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)
|
|
|
|
@overrides
|
|
def show(self, root_path, sampID):
|
|
super().show()
|
|
self.root_path = root_path
|
|
self.sampID = sampID
|
|
|
|
PublicFunc.__resetAllButton__(self, ButtonState)
|
|
|
|
Config.update({
|
|
"Path": {
|
|
"InputFolder": str(Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / Path(str(self.sampID))),
|
|
"SaveFolder": str(Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_ALIGNED / Path(str(self.sampID))),
|
|
"InputAlignInfo": str(Path(self.root_path) / ConfigParams.PUBLIC_PATH_LABEL / Path(str(self.sampID)) / (ConfigParams.CUT_PSG_SAVE_ECG_ALIGNINFO_FILENAME + ConfigParams.ENDSWITH_TXT))
|
|
}
|
|
})
|
|
|
|
self.ui.plainTextEdit_channel.setPlainText(', '.join(Config["ChannelInput"].keys()))
|
|
self.ui.plainTextEdit_label.setPlainText(', '.join(Config["LabelInput"].keys()))
|
|
|
|
self.ui.pushButton_execute.clicked.connect(self.__slot_btn_execute__)
|
|
|
|
@overrides
|
|
def closeEvent(self, event):
|
|
reply = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
|
|
if reply == QMessageBox.Yes:
|
|
PublicFunc.__disableAllButton__(self, ButtonState)
|
|
|
|
PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
|
|
QApplication.processEvents()
|
|
|
|
# 释放资源
|
|
del self.data
|
|
self.deleteLater()
|
|
collect()
|
|
event.accept()
|
|
else:
|
|
event.ignore()
|
|
|
|
def __reset__(self):
|
|
ButtonState["Current"].update(ButtonState["Default"].copy())
|
|
|
|
def __read_config__(self):
|
|
if not Path(ConfigParams.CUT_PSG_CONFIG_FILE_PATH).exists():
|
|
with open(ConfigParams.CUT_PSG_CONFIG_FILE_PATH, "w") as f:
|
|
dump(ConfigParams.CUT_PSG_CONFIG_NEW_CONTENT, f)
|
|
|
|
with open(ConfigParams.CUT_PSG_CONFIG_FILE_PATH, "r") as f:
|
|
file_config = load(f.read(), Loader=FullLoader)
|
|
Config.update(file_config)
|
|
|
|
# 数据回显
|
|
self.ui.spinBox_ECGFreq.setValue(Config["ECGFreq"])
|
|
|
|
def __slot_btn_execute__(self):
|
|
PublicFunc.__disableAllButton__(self, ButtonState)
|
|
|
|
self.data = Data()
|
|
Config["ECGFreq"] = self.ui.spinBox_ECGFreq.value()
|
|
|
|
# 检查文件是否存在并获取其数据采样率
|
|
PublicFunc.progressbar_update(self, 1, 5, Constants.CUT_PSG_GETTING_FILE_AND_FREQ, 0)
|
|
result = self.data.get_file_and_freq()
|
|
if not result.status:
|
|
PublicFunc.text_output(self.ui, "(1/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
|
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
|
PublicFunc.finish_operation(self, ButtonState)
|
|
return
|
|
else:
|
|
PublicFunc.text_output(self.ui, "(1/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
|
|
|
PublicFunc.finish_operation(self, ButtonState)
|
|
|
|
# 导入数据
|
|
PublicFunc.progressbar_update(self, 2, 5, Constants.INPUTTING_DATA, 10)
|
|
result = self.data.open_file()
|
|
if not result.status:
|
|
PublicFunc.text_output(self.ui, "(2/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
|
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
|
PublicFunc.finish_operation(self, ButtonState)
|
|
return
|
|
else:
|
|
PublicFunc.text_output(self.ui, "(2/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
|
|
|
PublicFunc.finish_operation(self, ButtonState)
|
|
|
|
# 切割数据
|
|
PublicFunc.progressbar_update(self, 3, 5, Constants.CUT_PSG_CUTTING_DATA, 40)
|
|
result = self.data.cut_data()
|
|
if not result.status:
|
|
PublicFunc.text_output(self.ui, "(3/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
|
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
|
PublicFunc.finish_operation(self, ButtonState)
|
|
return
|
|
else:
|
|
PublicFunc.text_output(self.ui, "(3/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
|
|
|
PublicFunc.finish_operation(self, ButtonState)
|
|
|
|
# 标签映射
|
|
PublicFunc.progressbar_update(self, 4, 5, Constants.CUT_PSG_ALIGNING_LABEL, 60)
|
|
result = self.data.align_label()
|
|
if not result.status:
|
|
PublicFunc.text_output(self.ui, "(4/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
|
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
|
PublicFunc.finish_operation(self, ButtonState)
|
|
return
|
|
else:
|
|
PublicFunc.text_output(self.ui, "(4/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
|
|
|
PublicFunc.finish_operation(self, ButtonState)
|
|
|
|
# 保存数据
|
|
PublicFunc.progressbar_update(self, 5, 5, Constants.SAVING_DATA, 70)
|
|
result = self.data.save()
|
|
|
|
if not result.status:
|
|
PublicFunc.text_output(self.ui, "(5/5)" + result.info, Constants.TIPS_TYPE_ERROR)
|
|
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
|
|
PublicFunc.finish_operation(self, ButtonState)
|
|
return
|
|
else:
|
|
PublicFunc.text_output(self.ui, "(5/5)" + result.info, Constants.TIPS_TYPE_INFO)
|
|
for key, raw in self.data.raw.items():
|
|
info = "保存{}的长度为{},采样率为{}Hz".format(key, str(len(raw)), str(self.data.freq[key]))
|
|
PublicFunc.text_output(self.ui, info, Constants.TIPS_TYPE_INFO)
|
|
QApplication.processEvents()
|
|
|
|
PublicFunc.msgbox_output(self, result.info, Constants.TIPS_TYPE_INFO)
|
|
PublicFunc.finish_operation(self, ButtonState)
|
|
|
|
|
|
class Data:
|
|
|
|
def __init__(self):
|
|
self.alignInfo = None
|
|
|
|
self.raw = {key: array([]) for key in Config["ChannelInput"]}
|
|
self.freq = {key: 0 for key in Config["ChannelInput"]}
|
|
|
|
self.SALabel = None
|
|
self.startTime = None
|
|
|
|
def get_file_and_freq(self):
|
|
try:
|
|
for file_path in Path(Config["Path"]["InputFolder"]).glob('*'):
|
|
if file_path.is_file():
|
|
file_stem = Path(file_path).stem
|
|
for key, prefix in Config["ChannelInput"].items():
|
|
if file_stem.startswith(prefix):
|
|
freq_str = file_stem.rsplit('_', 1)[1]
|
|
try:
|
|
freq = int(freq_str)
|
|
self.freq[key] = freq
|
|
except ValueError:
|
|
return Result().failure(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE + Constants.CUT_PSG_FAILURE_REASON["Filename_Format_not_Correct"])
|
|
for value in self.freq.values():
|
|
if value == 0:
|
|
return Result().failure(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE + Constants.CUT_PSG_FAILURE_REASON["Filename_Format_not_Correct"])
|
|
if not any((Config["LabelInput"]["SA Label"] + Config["EndWith"]["SA Label"]) in str(file) for file in Path(Config["Path"]["InputFolder"]).glob('*')):
|
|
return Result().failure(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE + Constants.CUT_PSG_FAILURE_REASON["File_not_Exist"])
|
|
if not any((Config["StartTime"] + Config["EndWith"]["StartTime"]) in str(file) for file in Path(Config["Path"]["InputFolder"]).glob('*')):
|
|
return Result().failure(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE + Constants.CUT_PSG_FAILURE_REASON["File_not_Exist"])
|
|
if not Path(Config["Path"]["InputAlignInfo"]).exists():
|
|
return Result().failure(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE + Constants.CUT_PSG_FAILURE_REASON["File_not_Exist"])
|
|
except Exception as e:
|
|
return Result().failure(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE + Constants.CUT_PSG_GET_FILE_AND_FREQ_FAILURE["Get_File_and_Freq_Excepetion"] + "\n" + format_exc())
|
|
|
|
return Result().success(info=Constants.CUT_PSG_GET_FILE_AND_FREQ_FINISHED)
|
|
|
|
def open_file(self):
|
|
try:
|
|
for key in Config["ChannelInput"].keys():
|
|
self.raw[key] = read_csv(Path(Config["Path"]["InputFolder"]) / Path((Config["ChannelInput"][key] + str(self.freq[key]) + Config["EndWith"][key])),
|
|
encoding=ConfigParams.UTF8_ENCODING,
|
|
header=None).to_numpy().reshape(-1)
|
|
self.SALabel = read_csv(Path(Config["Path"]["InputFolder"]) / Path((Config["LabelInput"]["SA Label"] + Config["EndWith"]["SA Label"])),
|
|
encoding=ConfigParams.GBK_ENCODING)
|
|
self.startTime = read_csv(Path(Config["Path"]["InputFolder"]) / Path((Config["StartTime"] + Config["EndWith"]["StartTime"])),
|
|
encoding=ConfigParams.UTF8_ENCODING,
|
|
header=None).to_numpy().reshape(-1)
|
|
self.alignInfo = read_csv(Path(Config["Path"]["InputAlignInfo"]),
|
|
encoding=ConfigParams.UTF8_ENCODING,
|
|
header=None).to_numpy().reshape(-1)
|
|
self.alignInfo = literal_eval(self.alignInfo[0])
|
|
except Exception as e:
|
|
return Result().failure(info=Constants.INPUT_FAILURE + Constants.CUT_PSG_FAILURE_REASON["Read_Data_Exception"] + "\n" + format_exc())
|
|
|
|
return Result().success(info=Constants.INPUT_FINISHED)
|
|
|
|
def cut_data(self):
|
|
try:
|
|
for key, raw in self.raw.items():
|
|
# 转换切割点
|
|
ECG_freq = Config["ECGFreq"]
|
|
raw_freq = self.freq[key]
|
|
duration_second = ((self.alignInfo["cut_index"]["back_ECG"] - self.alignInfo["cut_index"]["front_ECG"]) // 1000) + 1
|
|
start_index_cut = floor(self.alignInfo["cut_index"]["front_ECG"] * (raw_freq / ECG_freq))
|
|
end_index_cut = start_index_cut + (duration_second * raw_freq)
|
|
|
|
try:
|
|
# 切割信号
|
|
self.raw[key] = self.raw[key][start_index_cut:end_index_cut]
|
|
except Exception:
|
|
return Result().failure(info=Constants.CUT_PSG_CUT_DATA_FAILURE + Constants.CUT_PSG_FAILURE_REASON["Cut_Data_Length_not_Correct"])
|
|
except Exception as e:
|
|
return Result().failure(info=Constants.CUT_PSG_CUT_DATA_FAILURE + Constants.CUT_PSG_FAILURE_REASON["Cut_Data_Exception"] + "\n" + format_exc())
|
|
|
|
return Result().success(info=Constants.CUT_PSG_CUT_DATA_FINISHED)
|
|
|
|
def align_label(self):
|
|
try:
|
|
# 读取SA标签
|
|
self.SALabel = self.SALabel.loc[:, ~self.SALabel.columns.str.contains("^Unnamed")]
|
|
self.SALabel = self.SALabel[self.SALabel["Event type"].isin(ConfigParams.CUT_PSG_SALABEL_EVENT)]
|
|
self.SALabel["Duration"] = self.SALabel["Duration"].astype(str)
|
|
self.SALabel["Duration"] = self.SALabel["Duration"].str.replace(r' \(.*?\)', '', regex=True)
|
|
except Exception:
|
|
return Result().failure(info=Constants.CUT_PSG_ALIGN_LABEL_FAILURE + Constants.CUT_PSG_FAILURE_REASON["Align_Label_SALabel_Format_not_Correct"])
|
|
|
|
try:
|
|
# 获取记录开始时间
|
|
start_time = str(self.startTime[0]).split(" ")[1]
|
|
start_time = Data.get_time_to_seconds(start_time)
|
|
|
|
# 计算起始时间秒数和终止时间秒数
|
|
self.SALabel["Start"] = (self.SALabel["Time"].apply(self.get_time_to_seconds) - start_time).apply(
|
|
lambda x: x + 24 * 3600 if x < 0 else x).astype(int)
|
|
self.SALabel["End"] = self.SALabel["Start"] + self.SALabel["Duration"].astype(float).round(0).astype(int)
|
|
|
|
# 标签映射
|
|
ECG_length = self.alignInfo["cut_index"]["back_ECG"] - self.alignInfo["cut_index"]["front_ECG"]
|
|
self.SALabel["Start"] = self.SALabel["Start"] - round((self.alignInfo["cut_index"]["front_ECG"] / 1000))
|
|
self.SALabel["End"] = self.SALabel["End"] - round((self.alignInfo["cut_index"]["front_ECG"] / 1000))
|
|
self.SALabel = self.SALabel[self.SALabel["End"] >= 0]
|
|
self.SALabel.loc[self.SALabel["Start"] < 0, "Start"] = 0
|
|
self.SALabel = self.SALabel[self.SALabel["Start"] < ECG_length]
|
|
self.SALabel.loc[self.SALabel["End"] >= ECG_length, "End"] = ECG_length - 1
|
|
except Exception as e:
|
|
return Result().failure(info=Constants.CUT_PSG_ALIGN_LABEL_FAILURE + Constants.CUT_PSG_FAILURE_REASON["Align_Label_Exception"] + "\n" + format_exc())
|
|
|
|
return Result().success(info=Constants.CUT_PSG_ALIGN_LABEL_FINISHED)
|
|
|
|
def save(self):
|
|
for raw in self.raw.values():
|
|
if len(raw) == 0:
|
|
return Result().failure(info=Constants.SAVING_FAILURE + Constants.CUT_PSG_FAILURE_REASON["Save_Data_not_Exist"])
|
|
|
|
try:
|
|
for key, raw in self.raw.items():
|
|
DataFrame(raw.reshape(-1)).to_csv(Path(Config["Path"]["SaveFolder"]) / Path((Config["ChannelSave"][key] + str(self.freq[key]) + Config["EndWith"][key])),
|
|
index=False, header=False)
|
|
self.SALabel.to_csv(Path(Config["Path"]["SaveFolder"]) / Path((Config["LabelSave"]["SA Label"] + Config["EndWith"]["SA Label"])),
|
|
index=False,
|
|
encoding="gbk")
|
|
except Exception as e:
|
|
return Result().failure(info=Constants.SAVING_FAILURE + Constants.CUT_PSG_FAILURE_REASON["Save_Exception"] + "\n" + format_exc())
|
|
|
|
return Result().success(info=Constants.SAVING_FINISHED)
|
|
|
|
@staticmethod
|
|
def get_time_to_seconds(time_str):
|
|
h, m, s = map(int, time_str.split(":"))
|
|
return h * 3600 + m * 60 + s |