修改粗对齐文件读取后时间显示为秒数

修改计算对齐按钮位置到对齐板块
添加数据文件相关错误提示
添加粗对齐文件读取自动识别文件名称中的采样率
添加粗对齐选择对其位置后逐epoch拟合视图接口
This commit is contained in:
marques
2025-05-19 09:42:17 +08:00
parent 0be88d9be7
commit adb90c258c
8 changed files with 557 additions and 242 deletions

View File

@ -20,7 +20,6 @@ from func.utils.Result import Result
from ui.MainWindow.MainWindow_approximately_align import Ui_MainWindow_approximately_align
from ui.setting.approximately_align_input_setting import Ui_MainWindow_approximately_align_input_setting
Config = {
}
@ -32,6 +31,7 @@ ButtonState = {
"pushButton_Standardize": False,
"pushButton_CutOff": False,
"pushButton_GetPos": False,
"pushButton_ChangeView": False,
"pushButton_JUMP": False,
"pushButton_EM1": False,
"pushButton_EM10": False,
@ -39,7 +39,12 @@ ButtonState = {
"pushButton_EP1": False,
"pushButton_EP10": False,
"pushButton_EP100": False,
"pushButton_save": False
"pushButton_save": False,
"radioButton_PTHO": False,
"radioButton_PABD": False,
"radioButton_NTHO": False,
"radioButton_NABD": False,
"radioButton_custom": False
},
"Current": {
"pushButton_input_setting": True,
@ -47,6 +52,7 @@ ButtonState = {
"pushButton_Standardize": False,
"pushButton_CutOff": False,
"pushButton_GetPos": False,
"pushButton_ChangeView": False,
"pushButton_JUMP": False,
"pushButton_EM1": False,
"pushButton_EM10": False,
@ -54,7 +60,12 @@ ButtonState = {
"pushButton_EP1": False,
"pushButton_EP10": False,
"pushButton_EP100": False,
"pushButton_save": False
"pushButton_save": False,
"radioButton_PTHO": False,
"radioButton_PABD": False,
"radioButton_NTHO": False,
"radioButton_NABD": False,
"radioButton_custom": False
}
}
@ -93,23 +104,19 @@ class SettingWindow(QMainWindow):
Config.update(file_config)
self.config = file_config
# 更新配置
Config.update({
"Path": {
"Input_orgBcg": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_ORGBCG_FILENAME +
str(Config["InputConfig"]["orgBcgFreq"]) +
ConfigParams.ENDSWITH_TXT))),
Path(str(self.sampID)))),
"Input_Tho": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_THO_FILENAME +
str(Config["InputConfig"]["ThoFreq"]) +
ConfigParams.ENDSWITH_TXT))),
Path(str(self.sampID)))),
"Input_Abd": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_ABD_FILENAME +
str(Config["InputConfig"]["AbdFreq"]) +
ConfigParams.ENDSWITH_TXT))),
Path(str(self.sampID)))),
"Save": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_SAVE_FILENAME +
ConfigParams.ENDSWITH_CSV)))
Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_SAVE_FILENAME +
ConfigParams.ENDSWITH_CSV)))
},
"orgBcgConfig": {},
"PSGConfig": {}
@ -207,7 +214,7 @@ class MainWindow_approximately_align(QMainWindow):
self.progressbar = None
PublicFunc.add_progressbar(self)
#初始化画框
# 初始化画框
self.fig = None
self.canvas = None
@ -231,7 +238,7 @@ class MainWindow_approximately_align(QMainWindow):
PublicFunc.__resetAllButton__(self, ButtonState)
self.ui.groupBox_align_position.setEnabled(False)
# self.ui.groupBox_align_position.setEnabled(False)
self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__)
self.ui.pushButton_input_setting.clicked.connect(self.setting.show)
@ -295,7 +302,6 @@ class MainWindow_approximately_align(QMainWindow):
self.ui.radioButton_PTHO.setText("备选1")
self.ui.radioButton_NABD.setText("备选4")
self.ui.radioButton_NTHO.setText("备选2")
self.ui.groupBox_align_position.setEnabled(False)
self.ui.spinBox_SelectEpoch.setMinimum(0)
def __plot__(self, *args, **kwargs):
@ -357,14 +363,14 @@ class MainWindow_approximately_align(QMainWindow):
else:
PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO)
orgBcg_minutes = round(self.data.raw_orgBcg.shape[0] / Config["InputConfig"]["orgBcgFreq"] / 60)
PSG_minutes = round(self.data.raw_Tho.shape[0] / Config["InputConfig"]["ThoFreq"] / 60)
orgBcg_seconds = round(self.data.raw_orgBcg.shape[0] / Config["InputConfig"]["orgBcgFreq"])
PSG_seconds = round(self.data.raw_Tho.shape[0] / Config["InputConfig"]["ThoFreq"])
Config.update({
"orgBcg_minutes": orgBcg_minutes,
"PSG_minutes": PSG_minutes
"orgBcg_seconds": orgBcg_seconds,
"PSG_seconds": PSG_seconds
})
self.ui.label_orgBcg_length.setText(str(orgBcg_minutes))
self.ui.label_PSG_length.setText(str(PSG_minutes))
self.ui.label_orgBcg_length.setText(str(orgBcg_seconds))
self.ui.label_PSG_length.setText(str(PSG_seconds))
self.__reset__()
PublicFunc.finish_operation(self, ButtonState)
@ -474,14 +480,15 @@ class MainWindow_approximately_align(QMainWindow):
PublicFunc.__disableAllButton__(self, ButtonState)
Config["orgBcgConfig"].update({"PreA": self.ui.spinBox_orgBcgPreA.value(),
"PreCut": self.ui.spinBox_orgBcgPreCut.value(),
"PostCut": self.ui.spinBox_orgBcgPostCut.value()})
"PreCut": self.ui.spinBox_orgBcgPreCut.value(),
"PostCut": self.ui.spinBox_orgBcgPostCut.value()})
Config["PSGConfig"].update({"PreA": self.ui.spinBox_PSGPreA.value(),
"PreCut": self.ui.spinBox_PSGPreCut.value(),
"PostCut": self.ui.spinBox_PSGPostCut.value()})
"PreCut": self.ui.spinBox_PSGPreCut.value(),
"PostCut": self.ui.spinBox_PSGPostCut.value()})
PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0)
result = self.__plot__()
if not result.status:
PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
@ -489,8 +496,8 @@ class MainWindow_approximately_align(QMainWindow):
return
else:
PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO)
ButtonState["Current"]["pushButton_GetPos"] = True
ButtonState["Current"]["pushButton_GetPos"] = True
PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_GetPosition__(self):
@ -520,7 +527,8 @@ class MainWindow_approximately_align(QMainWindow):
# 绘图
PublicFunc.progressbar_update(self, 3, 4, Constants.DRAWING_DATA, 50)
result = self.__plot__(result1.data["tho_relate"], result1.data["tho_relate2"], result2.data["abd_relate"], result2.data["abd_relate2"])
result = self.__plot__(result1.data["tho_relate"], result1.data["tho_relate2"], result2.data["abd_relate"],
result2.data["abd_relate2"])
if not result.status:
PublicFunc.text_output(self.ui, "(3/4)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
@ -531,7 +539,8 @@ class MainWindow_approximately_align(QMainWindow):
# 计算最大值位置
PublicFunc.progressbar_update(self, 4, 4, Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATING, 90)
result = self.data.calculate_maxvalue_pos(result1.data["tho_relate"], result1.data["tho_relate2"], result2.data["abd_relate"], result2.data["abd_relate2"])
result = self.data.calculate_maxvalue_pos(result1.data["tho_relate"], result1.data["tho_relate2"],
result2.data["abd_relate"], result2.data["abd_relate2"])
if not result.status:
PublicFunc.text_output(self.ui, "(4/4)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
@ -544,7 +553,12 @@ class MainWindow_approximately_align(QMainWindow):
self.ui.radioButton_PTHO.setText(str(result.data["tho_max"] + result.data["bias"]))
self.ui.radioButton_NABD.setText(str(result.data["abd_max2"] + result.data["bias"]))
self.ui.radioButton_NTHO.setText(str(result.data["tho_max2"] + result.data["bias"]))
self.ui.groupBox_align_position.setEnabled(True)
ButtonState["Current"]["radioButton_PTHO"] = True
ButtonState["Current"]["radioButton_PABD"] = True
ButtonState["Current"]["radioButton_NTHO"] = True
ButtonState["Current"]["radioButton_NABD"] = True
ButtonState["Current"]["radioButton_custom"] = True
# self.ui.groupBox_align_position.setEnabled(True)
PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_jump__(self):
@ -638,7 +652,8 @@ class MainWindow_approximately_align(QMainWindow):
self.ui.spinBox_SelectEpoch.setMinimum(result3.data["epoch_min"])
self.ui.spinBox_SelectEpoch.setMaximum(result3.data["epoch_max"])
self.ui.spinBox_SelectEpoch.setValue(result3.data["epoch_min"])
self.ui.spinBox_SelectEpoch.setToolTip("最小值:{}\n最大值:{}".format(result3.data["epoch_min"], result3.data["epoch_max"]))
self.ui.spinBox_SelectEpoch.setToolTip(
"最小值:{}\n最大值:{}".format(result3.data["epoch_min"], result3.data["epoch_max"]))
self.ui.spinBox_SelectEpoch.setEnabled(True)
ButtonState["Current"]["pushButton_JUMP"] = True
ButtonState["Current"]["pushButton_EM1"] = True
@ -648,23 +663,25 @@ class MainWindow_approximately_align(QMainWindow):
ButtonState["Current"]["pushButton_EP10"] = True
ButtonState["Current"]["pushButton_EP100"] = True
ButtonState["Current"]["pushButton_save"] = True
ButtonState["Current"]["pushButton_ChangeView"] = True
PublicFunc.finish_operation(self, ButtonState)
def DrawPicRawOverview(self):
try:
max_x = max(self.data.processed_Tho.shape[0], self.data.processed_Abd.shape[0], self.data.processed_orgBcg.shape[0])
max_x = max(self.data.processed_downsample_Tho.shape[0], self.data.processed_downsample_Abd.shape[0],
self.data.processed_downsample_orgBcg.shape[0])
ax1 = self.fig.add_subplot(311)
ax1.plot(self.data.processed_Tho, color='blue')
ax1.plot(self.data.processed_downsample_Tho, color='blue')
ax1.set_xlim(0, max_x)
ax1.set_title("THO")
ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1)
ax2.plot(self.data.processed_orgBcg, color='blue')
ax2.plot(self.data.processed_downsample_orgBcg, color='blue')
ax2.set_xlim(0, max_x)
ax2.set_title("orgBcg")
ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1)
ax3.plot(self.data.processed_Abd, color='blue')
ax3.plot(self.data.processed_downsample_Abd, color='blue')
ax3.set_xlim(0, max_x)
ax3.set_title("ABD")
@ -676,19 +693,20 @@ class MainWindow_approximately_align(QMainWindow):
def DrawPicOverviewWithCutOff(self):
try:
max_x = max(self.data.processed_Tho.shape[0] + Config["PSGConfig"]["PreA"],
self.data.processed_orgBcg.shape[0] + Config["orgBcgConfig"]["PreA"])
max_x = max(self.data.processed_downsample_Tho.shape[0] + Config["PSGConfig"]["PreA"],
self.data.processed_downsample_orgBcg.shape[0] + Config["orgBcgConfig"]["PreA"])
min_x = min(Config["PSGConfig"]["PreA"], Config["orgBcgConfig"]["PreA"], 0)
ax1 = self.fig.add_subplot(311)
ax1.plot(
linspace(Config["PSGConfig"]["PreA"],
len(self.data.processed_Tho) + Config["PSGConfig"]["PreA"],
len(self.data.processed_Tho)), self.data.processed_Tho, color='blue')
len(self.data.processed_downsample_Tho) + Config["PSGConfig"]["PreA"],
len(self.data.processed_downsample_Tho)), self.data.processed_downsample_Tho, color='blue')
# 绘制x = PreCut的线 和 x = PostCut的虚线
ax1.axvline(x=Config["PSGConfig"]["PreCut"] + Config["PSGConfig"]["PreA"], color='red',
linestyle='--')
ax1.axvline(
x=len(self.data.processed_Tho) - Config["PSGConfig"]["PostCut"] + Config["PSGConfig"]["PreA"],
x=len(self.data.processed_downsample_Tho) - Config["PSGConfig"]["PostCut"] + Config["PSGConfig"][
"PreA"],
color='red', linestyle='--')
ax1.set_xlim(min_x, max_x)
@ -696,24 +714,29 @@ class MainWindow_approximately_align(QMainWindow):
ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1)
ax2.plot(
linspace(Config["orgBcgConfig"]["PreA"], len(self.data.processed_orgBcg) + Config["orgBcgConfig"]["PreA"],
len(self.data.processed_orgBcg)), self.data.processed_orgBcg, color='blue')
linspace(Config["orgBcgConfig"]["PreA"],
len(self.data.processed_downsample_orgBcg) + Config["orgBcgConfig"]["PreA"],
len(self.data.processed_downsample_orgBcg)), self.data.processed_downsample_orgBcg,
color='blue')
ax2.axvline(x=Config["orgBcgConfig"]["PreCut"] + Config["orgBcgConfig"]["PreA"], color='red',
linestyle='--')
ax2.axvline(x=len(self.data.processed_orgBcg) - Config["orgBcgConfig"]["PostCut"] + Config["orgBcgConfig"]["PreA"],
color='red', linestyle='--')
ax2.axvline(
x=len(self.data.processed_downsample_orgBcg) - Config["orgBcgConfig"]["PostCut"] +
Config["orgBcgConfig"]["PreA"],
color='red', linestyle='--')
ax2.set_xlim(min_x, max_x)
ax2.set_title("orgBcg")
ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1)
ax3.plot(
linspace(Config["PSGConfig"]["PreA"],
len(self.data.processed_Abd) + Config["PSGConfig"]["PreA"],
len(self.data.processed_Abd)), self.data.processed_Abd, color='blue')
len(self.data.processed_downsample_Abd) + Config["PSGConfig"]["PreA"],
len(self.data.processed_downsample_Abd)), self.data.processed_downsample_Abd, color='blue')
ax3.axvline(x=Config["PSGConfig"]["PreCut"] + Config["PSGConfig"]["PreA"], color='red',
linestyle='--')
ax3.axvline(
x=len(self.data.processed_Tho) - Config["PSGConfig"]["PostCut"] + Config["PSGConfig"]["PreA"],
x=len(self.data.processed_downsample_Tho) - Config["PSGConfig"]["PostCut"] + Config["PSGConfig"][
"PreA"],
color='red', linestyle='--')
ax3.set_xlim(min_x, max_x)
ax3.set_title("ABD")
@ -750,29 +773,30 @@ class MainWindow_approximately_align(QMainWindow):
def DrawPicTryAlign(self):
try:
max_x = max(self.data.processed_Tho.shape[0],
self.data.processed_orgBcg.shape[0] + Config["pos"])
max_x = max(self.data.processed_downsample_Tho.shape[0],
self.data.processed_downsample_orgBcg.shape[0] + Config["pos"])
min_x = min(Config["PSGConfig"]["PreA"], Config["orgBcgConfig"]["PreA"] + Config["pos"], 0)
ax1 = self.fig.add_subplot(311)
ax1.plot(
linspace(0, len(self.data.processed_Tho),
len(self.data.processed_Tho)), self.data.processed_Tho, color='blue')
linspace(0, len(self.data.processed_downsample_Tho),
len(self.data.processed_downsample_Tho)), self.data.processed_downsample_Tho, color='blue')
# 绘制x = PreCut的线 和 x = PostCut的虚线
ax1.set_xlim(min_x, max_x)
ax1.set_title("THO")
ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1)
ax2.plot(linspace(Config["pos"],
len(self.data.processed_orgBcg) + Config["pos"],
len(self.data.processed_orgBcg)), self.data.processed_orgBcg, color='blue')
len(self.data.processed_downsample_orgBcg) + Config["pos"],
len(self.data.processed_downsample_orgBcg)), self.data.processed_downsample_orgBcg,
color='blue')
ax2.set_xlim(min_x, max_x)
ax2.set_title("orgBcg")
ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1)
ax3.plot(
linspace(0, len(self.data.processed_Abd),
len(self.data.processed_Abd)), self.data.processed_Abd, color='blue')
linspace(0, len(self.data.processed_downsample_Abd),
len(self.data.processed_downsample_Abd)), self.data.processed_downsample_Abd, color='blue')
ax3.set_xlim(min_x, max_x)
ax3.set_title("ABD")
@ -790,9 +814,9 @@ class MainWindow_approximately_align(QMainWindow):
orgBcg_SP = PSG_SP - Config["pos"]
orgBcg_EP = PSG_EP - Config["pos"]
tho_seg = self.data.processed_Tho[PSG_SP:PSG_EP]
orgBcg_seg = self.data.processed_orgBcg[orgBcg_SP:orgBcg_EP] * Config["orgBcg_reverse"]
abd_seg = self.data.processed_Abd[PSG_SP:PSG_EP]
tho_seg = self.data.processed_downsample_Tho[PSG_SP:PSG_EP]
orgBcg_seg = self.data.processed_downsample_orgBcg[orgBcg_SP:orgBcg_EP] * Config["orgBcg_reverse"]
abd_seg = self.data.processed_downsample_Abd[PSG_SP:PSG_EP]
# 根据PSG来和绘制
ax1 = self.fig.add_subplot(321)
@ -842,6 +866,9 @@ class MainWindow_approximately_align(QMainWindow):
# 返回图片以便存到QPixImage
return Result().success(info=Constants.DRAWING_FINISHED)
def DrawAlignScatter(self):
pass
class Data:
@ -852,22 +879,90 @@ class Data:
self.processed_orgBcg = None
self.processed_Tho = None
self.processed_Abd = None
self.processed_downsample_orgBcg = None
self.processed_downsample_Tho = None
self.processed_downsample_Abd = None
self.relate_list = None
self.relate_point = None
def open_file(self):
if not Path(Config["Path"]["Input_orgBcg"]).exists():
return Result().failure(
info=Constants.INPUT_FAILURE + "orgBcg: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_Path_Not_Exist"])
temp_orgBcg_path = list(
Path(Config["Path"]["Input_orgBcg"]).glob(f"{ConfigParams.APPROXIMATELY_ALIGN_INPUT_ORGBCG_FILENAME}*"))
if len(temp_orgBcg_path) == 0:
return Result().failure(
info=Constants.INPUT_FAILURE + "orgBcg: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_File_Not_Exist"])
elif len(temp_orgBcg_path) > 1:
return Result().failure(
info=Constants.INPUT_FAILURE + "orgBcg: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_File_More_Than_One"])
else:
orgBcg_path = temp_orgBcg_path[0]
Config["Path"]["Input_orgBcg"] = str(orgBcg_path)
orgBcg_freq = orgBcg_path.stem.split("_")[-1]
# 验证是否为整数或是否存在
if not orgBcg_freq.isdigit():
return Result().failure(
info=Constants.INPUT_FAILURE + "orgBcg: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_Frequency_Not_In_Filename"])
Config["InputConfig"]["orgBcgFreq"] = int(orgBcg_freq)
if not Path(Config["Path"]["Input_Tho"]).exists():
return Result().failure(
info=Constants.INPUT_FAILURE + "Tho: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_Path_Not_Exist"])
temp_Tho_path = list(
Path(Config["Path"]["Input_Tho"]).glob(f"{ConfigParams.APPROXIMATELY_ALIGN_INPUT_THO_FILENAME}*"))
if len(temp_Tho_path) == 0:
return Result().failure(
info=Constants.INPUT_FAILURE + "Tho: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_File_Not_Exist"])
elif len(temp_Tho_path) > 1:
return Result().failure(
info=Constants.INPUT_FAILURE + "Tho: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_File_More_Than_One"])
else:
tho_path = temp_Tho_path[0]
Config["Path"]["Input_Tho"] = str(tho_path)
tho_freq = tho_path.stem.split("_")[-1]
# 验证是否为整数或是否存在
if not tho_freq.isdigit():
return Result().failure(
info=Constants.INPUT_FAILURE + "Tho: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_Frequency_Not_In_Filename"])
Config["InputConfig"]["ThoFreq"] = int(tho_freq)
if not Path(Config["Path"]["Input_Abd"]).exists():
return Result().failure(
info=Constants.INPUT_FAILURE + "Abd: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_Path_Not_Exist"])
temp_Abd_path = list(
Path(Config["Path"]["Input_Abd"]).glob(f"{ConfigParams.APPROXIMATELY_ALIGN_INPUT_ABD_FILENAME}*"))
if len(temp_Abd_path) == 0:
return Result().failure(
info=Constants.INPUT_FAILURE + "Abd: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_File_Not_Exist"])
elif len(temp_Abd_path) > 1:
return Result().failure(
info=Constants.INPUT_FAILURE + "Abd: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_File_More_Than_One"])
else:
abd_path = temp_Abd_path[0]
Config["Path"]["Input_Abd"] = str(abd_path)
abd_freq = abd_path.stem.split("_")[-1]
# 验证是否为整数或是否存在
if not abd_freq.isdigit():
return Result().failure(
info=Constants.INPUT_FAILURE + "Abd: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_Frequency_Not_In_Filename"])
Config["InputConfig"]["AbdFreq"] = int(abd_freq)
try:
self.raw_orgBcg = read_csv(Config["Path"]["Input_orgBcg"],
encoding=ConfigParams.UTF8_ENCODING,
@ -876,10 +971,11 @@ class Data:
encoding=ConfigParams.UTF8_ENCODING,
header=None).to_numpy().reshape(-1)
self.raw_Abd = read_csv(Config["Path"]["Input_Abd"],
encoding=ConfigParams.UTF8_ENCODING,
header=None).to_numpy().reshape(-1)
encoding=ConfigParams.UTF8_ENCODING,
header=None).to_numpy().reshape(-1)
except Exception as e:
return Result().failure(info=Constants.INPUT_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Read_Data_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.INPUT_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Read_Data_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.INPUT_FINISHED)
@ -891,20 +987,31 @@ class Data:
df = DataFrame({"pos": [pos], "epoch": [epoch], "ApplyFrequency": [ApplyFrequency]})
df.to_csv(Path(Config["Path"]["Save"]), mode="w", header=True, index=False)
except Exception as e:
return Result().failure(info=Constants.SAVING_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Save_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.SAVING_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Save_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.SAVING_FINISHED)
def Standardize_0(self):
# 仅重采样
if self.raw_orgBcg is None or self.raw_Tho is None or self.raw_Abd is None:
return Result().failure(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Raw_Data_Not_Exist"])
return Result().failure(
info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Raw_Data_Not_Exist"])
try:
self.processed_orgBcg = resample(self.raw_orgBcg, int(Config["orgBcg_minutes"] * 60 * Config["ApplyFrequency"]))
self.processed_Tho = resample(self.raw_Tho, int(Config["PSG_minutes"] * 60 * Config["ApplyFrequency"]))
self.processed_Abd = resample(self.raw_Abd, int(Config["PSG_minutes"] * 60 * Config["ApplyFrequency"]))
# 按照秒数进行截断
self.raw_orgBcg = self.raw_orgBcg[:int(Config["orgBcg_seconds"] * Config["InputConfig"]["orgBcgFreq"])]
self.raw_Tho = self.raw_Tho[:int(Config["PSG_seconds"] * Config["InputConfig"]["ThoFreq"])]
self.raw_Abd = self.raw_Abd[:int(Config["PSG_seconds"] * Config["InputConfig"]["AbdFreq"])]
self.processed_orgBcg = resample(self.raw_orgBcg,
int(Config["orgBcg_seconds"] * Config["ApplyFrequency"]))
self.processed_Tho = resample(self.raw_Tho, int(Config["PSG_seconds"] * Config["ApplyFrequency"]))
self.processed_Abd = resample(self.raw_Abd, int(Config["PSG_seconds"] * Config["ApplyFrequency"]))
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Only_Resample_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Only_Resample_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FINISHED)
@ -915,24 +1022,32 @@ class Data:
high = highCut / (fs * 0.5)
sos = butter(order, [low, high], btype="bandpass", output='sos')
return sosfiltfilt(sos, data)
if self.raw_orgBcg is None or self.raw_Tho is None or self.raw_Abd is None:
return Result().failure(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Raw_Data_Not_Exist"])
return Result().failure(
info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Raw_Data_Not_Exist"])
try:
# 滤波
self.processed_orgBcg = butter_bandpass_filter(self.raw_orgBcg, Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["InputConfig"]["orgBcgFreq"],
Config["Filter"]["BandPassOrder"])
self.processed_Tho = butter_bandpass_filter(self.raw_Tho, Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["InputConfig"]["ThoFreq"],
Config["Filter"]["BandPassOrder"])
self.processed_Abd = butter_bandpass_filter(self.raw_Abd, Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["InputConfig"]["AbdFreq"],
Config["Filter"]["BandPassOrder"])
self.processed_orgBcg = butter_bandpass_filter(
self.raw_orgBcg, Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["InputConfig"]["orgBcgFreq"],
Config["Filter"]["BandPassOrder"])
self.processed_Tho = butter_bandpass_filter(
self.raw_Tho, Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["InputConfig"]["ThoFreq"],
Config["Filter"]["BandPassOrder"])
self.processed_Abd = butter_bandpass_filter(
self.raw_Abd, Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["InputConfig"]["AbdFreq"],
Config["Filter"]["BandPassOrder"])
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_RESP_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Resp_Get_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_RESP_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Resp_Get_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_RESP_GET_FINISHED)
@ -940,27 +1055,24 @@ class Data:
# 预重采样
try:
# TODO这里的采样率处理如果THO和ABD的采样率不同可能还是会导致之后的ApplyFrequency出问题最后导致得到的粗同步坐标不正确
# 修改Config
if Config["InputConfig"]["ThoFreq"] < Config["InputConfig"]["AbdFreq"]:
Config.update({"TempFrequency": Config["InputConfig"]["ThoFreq"]})
self.processed_Abd = self.processed_Abd[::int(Config["InputConfig"]["AbdFreq"] / Config["TempFrequency"])]
elif Config["InputConfig"]["ThoFreq"] > Config["InputConfig"]["AbdFreq"]:
Config.update({"TempFrequency": Config["InputConfig"]["AbdFreq"]})
self.processed_Tho = self.processed_Tho[::int(Config["InputConfig"]["ThoFreq"] / Config["TempFrequency"])]
else:
Config.update({"TempFrequency": Config["InputConfig"]["ThoFreq"]})
#
if Config["InputConfig"]["ThoFreq"] != Config["TempFrequency"]:
print(int(Config["InputConfig"]["ThoFreq"]), int(Config["TempFrequency"]))
self.processed_Tho = resample(self.processed_Tho,
int(Config["PSG_seconds"] * Config["TempFrequency"]))
# 如果orgBcg采样率大于PSG采样率那么orgBcg重采样到PSG采样率
if Config["InputConfig"]["orgBcgFreq"] > Config["TempFrequency"]:
# 用[::]完成
self.processed_orgBcg = self.processed_orgBcg[::int(Config["InputConfig"]["orgBcgFreq"] / Config["TempFrequency"])]
# 如果orgBcg采样率小于PSG采样率那么orgBcg重采样到PSG采样率
elif Config["InputConfig"]["orgBcgFreq"] < Config["TempFrequency"]:
# 用repeat完成
self.processed_orgBcg = repeat(self.processed_orgBcg, int(Config["TempFrequency"] / Config["InputConfig"]["orgBcgFreq"]), axis=0)
if Config["InputConfig"]["AbdFreq"] != Config["TempFrequency"]:
self.processed_Abd = resample(self.processed_Abd,
int(Config["PSG_seconds"] * Config["TempFrequency"]))
if Config["InputConfig"]["orgBcgFreq"] != Config["TempFrequency"]:
self.processed_orgBcg = resample(self.processed_orgBcg,
int(Config["orgBcg_seconds"] * Config["TempFrequency"]))
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Pre_Resample_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Pre_Resample_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FINISHED)
@ -970,18 +1082,17 @@ class Data:
temp_frequency = Config["TempFrequency"]
if Config["PSGConfig"]["PSGDelBase"]:
# 减去四秒钟平均滤波
self.processed_Tho = self.processed_Tho - convolve(self.processed_Tho,
ones(int(4 * temp_frequency)) / int(4 * temp_frequency),
mode='same')
self.processed_Abd = self.processed_Abd - convolve(self.processed_Abd,
ones(int(4 * temp_frequency)) / int(4 * temp_frequency),
mode='same')
self.processed_Tho = self.processed_Tho - convolve(
self.processed_Tho, ones(int(4 * temp_frequency)) / int(4 * temp_frequency), mode='same')
self.processed_Abd = self.processed_Abd - convolve(
self.processed_Abd, ones(int(4 * temp_frequency)) / int(4 * temp_frequency), mode='same')
if Config["orgBcgConfig"]["orgBcgDelBase"]:
self.processed_orgBcg = self.processed_orgBcg - convolve(self.processed_orgBcg,
ones(int(4 * temp_frequency)) / int(4 * temp_frequency),
mode='same')
self.processed_orgBcg = self.processed_orgBcg - convolve(
self.processed_orgBcg, ones(int(4 * temp_frequency)) / int(4 * temp_frequency), mode='same')
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_DELETE_BASE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Delete_Base_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_DELETE_BASE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Delete_Base_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_DELETE_BASE_FINISHED)
@ -993,9 +1104,12 @@ class Data:
self.processed_Tho = (self.processed_Tho - mean(self.processed_Tho)) / std(self.processed_Tho)
self.processed_Abd = (self.processed_Abd - mean(self.processed_Abd)) / std(self.processed_Abd)
if Config["orgBcgConfig"]["orgBcgZScore"]:
self.processed_orgBcg = (self.processed_orgBcg - mean(self.processed_orgBcg)) / std(self.processed_orgBcg)
self.processed_orgBcg = (self.processed_orgBcg - mean(self.processed_orgBcg)) / std(
self.processed_orgBcg)
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_STANDARDIZE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Standardize_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_STANDARDIZE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Standardize_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_STANDARDIZE_FINISHED)
@ -1004,11 +1118,13 @@ class Data:
try:
# 用[::]完成
temp_frequency = Config["TempFrequency"]
self.processed_Tho = self.processed_Tho[::int(temp_frequency / Config["ApplyFrequency"])]
self.processed_Abd = self.processed_Abd[::int(temp_frequency / Config["ApplyFrequency"])]
self.processed_orgBcg = self.processed_orgBcg[::int(temp_frequency / Config["ApplyFrequency"])]
self.processed_downsample_Tho = self.processed_Tho[::int(temp_frequency / Config["ApplyFrequency"])]
self.processed_downsample_Abd = self.processed_Abd[::int(temp_frequency / Config["ApplyFrequency"])]
self.processed_downsample_orgBcg = self.processed_orgBcg[::int(temp_frequency / Config["ApplyFrequency"])]
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Resample_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Resample_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FINISHED)
@ -1017,8 +1133,12 @@ class Data:
try:
# 计算因子
MULTIPLE_FACTOER = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["Multiple_Factor"]
a = self.processed_Tho[Config["PSGConfig"]["PreCut"]:len(self.processed_Tho) - Config["PSGConfig"]["PostCut"]].copy()
v = self.processed_orgBcg[Config["orgBcgConfig"]["PreCut"]:len(self.processed_orgBcg) - Config["orgBcgConfig"]["PostCut"]].copy()
a = self.processed_downsample_Tho[
Config["PSGConfig"]["PreCut"]:len(self.processed_downsample_Tho) - Config["PSGConfig"][
"PostCut"]].copy()
v = self.processed_downsample_orgBcg[
Config["orgBcgConfig"]["PreCut"]:len(self.processed_downsample_orgBcg) - Config["orgBcgConfig"][
"PostCut"]].copy()
a *= MULTIPLE_FACTOER
v *= MULTIPLE_FACTOER
a = a.astype(int64)
@ -1029,15 +1149,21 @@ class Data:
result = {"tho_relate": tho_relate, "tho_relate2": tho_relate2}
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Calculate_Correlation1_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FAILURE +
Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Calculate_Correlation1_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FINISHED, data=result)
def calculate_correlation2(self):
# 计算互相关2/2
try:
a = self.processed_Abd[Config["PSGConfig"]["PreCut"]:len(self.processed_Abd) - Config["PSGConfig"]["PostCut"]].copy()
v = self.processed_orgBcg[Config["orgBcgConfig"]["PreCut"]:len(self.processed_orgBcg) - Config["orgBcgConfig"]["PostCut"]].copy()
a = self.processed_downsample_Abd[
Config["PSGConfig"]["PreCut"]:len(self.processed_downsample_Abd) - Config["PSGConfig"][
"PostCut"]].copy()
v = self.processed_downsample_orgBcg[
Config["orgBcgConfig"]["PreCut"]:len(self.processed_downsample_orgBcg) - Config["orgBcgConfig"][
"PostCut"]].copy()
a *= 100
v *= 100
a = a.astype(int64)
@ -1048,7 +1174,9 @@ class Data:
result = {"abd_relate": abd_relate, "abd_relate2": abd_relate2}
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Calculate_Correlation2_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FAILURE +
Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Calculate_Correlation2_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FINISHED, data=result)
@ -1061,11 +1189,13 @@ class Data:
abd_max2 = argmax(abd_relate2)
pre = Config["PSGConfig"]["PreCut"] + Config["orgBcgConfig"]["PostCut"]
bias = pre - len(self.processed_orgBcg) + 1
bias = pre - len(self.processed_downsample_orgBcg) + 1
result = {"tho_max": tho_max, "tho_max2": tho_max2, "abd_max": abd_max, "abd_max2": abd_max2, "bias": bias}
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Calculate_Maxvalue_Pos_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FAILURE +
Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Calculate_Maxvalue_Pos_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FINISHED, data=result)
@ -1073,12 +1203,61 @@ class Data:
# 获取epoch
try:
epoch_min = max(0, Config["pos"] // 30 // Config["ApplyFrequency"] + 1)
epoch_max = min(len(self.processed_Tho) // 30 // Config["ApplyFrequency"] - 1,
(len(self.processed_orgBcg) + Config["pos"]) // 30 // Config["ApplyFrequency"] - 1)
epoch_max = min(len(self.processed_downsample_Tho) // 30 // Config["ApplyFrequency"] - 1,
(len(self.processed_downsample_orgBcg) + Config["pos"]) // 30 // Config[
"ApplyFrequency"] - 1)
result = {"epoch_min": epoch_min, "epoch_max": epoch_max}
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Get_Epoch_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Get_Epoch_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_EPOCH_GET_FINISHED, data=result)
def get_corr_by_epoch(self):
# 获取相关系数
try:
# 获取 epoch 区间
response = self.get_epoch()
if not response.status:
raise Exception(response.info)
epoch_min = response.data["epoch_min"]
epoch_max = response.data["epoch_max"]
"""
bias_list = []
samp_rate4 = 100
for epoch in tqdm(range(epoch_min, epoch_max)):
win1 = 30 * epoch
win2 = win1 + 30 * 3
SP = win1 * samp_rate4
EP = win2 * samp_rate4
tho_seg = tho_pn * tho_low[SP: EP]
abd_seg = abd_pn * abd_low[SP: EP]
resp_seg = resp_pn * resp_low[SP - tho_pos *
samp_rate4: EP - tho_pos * samp_rate4]
abd_relate_seg = correlate(abd_seg, resp_seg, mode="full")
abd_seg_pos = np.argmax(abd_relate_seg) - len(resp_seg)
bias_list.append(abd_seg_pos)
plt.figure()
plt.scatter(np.linspace(0, len(bias_list), len(bias_list)), bias_list, alpha=0.2)
"""
for epoch in range(epoch_min, epoch_max):
SP = epoch * 30
result = None
except Exception as e:
return Result().failure(
info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Get_Corr_By_Epoch"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_EPOCH_GET_FINISHED, data=result)