Merge remote-tracking branch 'origin/master'

# Conflicts:
#	func/Module_approximately_align.py
This commit is contained in:
Yorusora
2025-05-20 15:26:43 +08:00
8 changed files with 604 additions and 246 deletions

View File

@ -20,7 +20,6 @@ from func.utils.Result import Result
from ui.MainWindow.MainWindow_approximately_align import Ui_MainWindow_approximately_align
from ui.setting.approximately_align_input_setting import Ui_MainWindow_approximately_align_input_setting
Config = {
}
@ -32,6 +31,7 @@ ButtonState = {
"pushButton_Standardize": False,
"pushButton_CutOff": False,
"pushButton_GetPos": False,
"pushButton_ChangeView": False,
"pushButton_JUMP": False,
"pushButton_EM1": False,
"pushButton_EM10": False,
@ -39,7 +39,12 @@ ButtonState = {
"pushButton_EP1": False,
"pushButton_EP10": False,
"pushButton_EP100": False,
"pushButton_save": False
"pushButton_save": False,
"radioButton_PTHO": False,
"radioButton_PABD": False,
"radioButton_NTHO": False,
"radioButton_NABD": False,
"radioButton_custom": False
},
"Current": {
"pushButton_input_setting": True,
@ -47,6 +52,7 @@ ButtonState = {
"pushButton_Standardize": False,
"pushButton_CutOff": False,
"pushButton_GetPos": False,
"pushButton_ChangeView": False,
"pushButton_JUMP": False,
"pushButton_EM1": False,
"pushButton_EM10": False,
@ -54,7 +60,12 @@ ButtonState = {
"pushButton_EP1": False,
"pushButton_EP10": False,
"pushButton_EP100": False,
"pushButton_save": False
"pushButton_save": False,
"radioButton_PTHO": False,
"radioButton_PABD": False,
"radioButton_NTHO": False,
"radioButton_NABD": False,
"radioButton_custom": False
}
}
@ -93,30 +104,26 @@ class SettingWindow(QMainWindow):
Config.update(file_config)
self.config = file_config
# 更新配置
Config.update({
"Path": {
"Input_orgBcg": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_ORGBCG_FILENAME +
str(Config["InputConfig"]["OrgBCGFreq"]) +
ConfigParams.ENDSWITH_TXT))),
Path(str(self.sampID)))),
"Input_Tho": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_THO_FILENAME +
str(Config["InputConfig"]["ThoFreq"]) +
ConfigParams.ENDSWITH_TXT))),
Path(str(self.sampID)))),
"Input_Abd": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_ABD_FILENAME +
str(Config["InputConfig"]["AbdFreq"]) +
ConfigParams.ENDSWITH_TXT))),
"Save": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_LABEL /
Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_SAVE_FILENAME +
ConfigParams.ENDSWITH_CSV)))
Path(str(self.sampID)))),
"Save": str((Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT /
Path(str(self.sampID)) / Path(ConfigParams.APPROXIMATELY_ALIGN_SAVE_FILENAME +
ConfigParams.ENDSWITH_CSV)))
},
"orgBcgConfig": {},
"PSGConfig": {}
})
# 数据回显
self.ui.spinBox_input_orgBcg_freq.setValue(Config["InputConfig"]["OrgBCGFreq"])
self.ui.spinBox_input_orgBcg_freq.setValue(Config["InputConfig"]["orgBcgFreq"])
self.ui.spinBox_input_Tho_freq.setValue(Config["InputConfig"]["ThoFreq"])
self.ui.spinBox_input_Abd_freq.setValue(Config["InputConfig"]["AbdFreq"])
self.ui.spinBox_bandpassOrder.setValue(Config["Filter"]["BandPassOrder"])
@ -130,7 +137,7 @@ class SettingWindow(QMainWindow):
def __write_config__(self):
# 从界面写入配置
Config["InputConfig"]["OrgBCGFreq"] = self.ui.spinBox_input_orgBcg_freq.value()
Config["InputConfig"]["orgBcgFreq"] = self.ui.spinBox_input_orgBcg_freq.value()
Config["InputConfig"]["ThoFreq"] = self.ui.spinBox_input_Tho_freq.value()
Config["InputConfig"]["AbdFreq"] = self.ui.spinBox_input_Abd_freq.value()
Config["ApplyFrequency"] = self.ui.spinBox_display_freq.value()
@ -143,7 +150,7 @@ class SettingWindow(QMainWindow):
Config["Path"]["Save"] = self.ui.plainTextEdit_file_path_save.toPlainText()
# 保存配置到文件
self.config["InputConfig"]["OrgBCGFreq"] = self.ui.spinBox_input_orgBcg_freq.value()
self.config["InputConfig"]["orgBcgFreq"] = self.ui.spinBox_input_orgBcg_freq.value()
self.config["InputConfig"]["ThoFreq"] = self.ui.spinBox_input_Tho_freq.value()
self.config["InputConfig"]["AbdFreq"] = self.ui.spinBox_input_Abd_freq.value()
self.config["ApplyFrequency"] = self.ui.spinBox_display_freq.value()
@ -207,7 +214,7 @@ class MainWindow_approximately_align(QMainWindow):
self.progressbar = None
PublicFunc.add_progressbar(self)
#初始化画框
# 初始化画框
self.fig = None
self.canvas = None
@ -231,13 +238,14 @@ class MainWindow_approximately_align(QMainWindow):
PublicFunc.__resetAllButton__(self, ButtonState)
self.ui.groupBox_align_position.setEnabled(False)
# self.ui.groupBox_align_position.setEnabled(False)
self.ui.pushButton_input.clicked.connect(self.__slot_btn_input__)
self.ui.pushButton_input_setting.clicked.connect(self.setting.show)
self.ui.pushButton_Standardize.clicked.connect(self.__slot_btn_Standardize__)
self.ui.pushButton_CutOff.clicked.connect(self.__slot_btn_CutOff__)
self.ui.pushButton_GetPos.clicked.connect(self.__slot_btn_GetPosition__)
self.ui.pushButton_ChangeView.clicked.connect(self.__slot_btn_changeview__)
self.ui.pushButton_JUMP.clicked.connect(self.__slot_btn_jump__)
self.ui.pushButton_save.clicked.connect(self.__slot_btn_save__)
self.ui.pushButton_EM1.clicked.connect(self.__EpochChange__)
@ -252,6 +260,7 @@ class MainWindow_approximately_align(QMainWindow):
self.ui.radioButton_NABD.clicked.connect(self.__enableAlign__)
self.ui.radioButton_custom.clicked.connect(self.__enableAlign__)
@overrides
def closeEvent(self, event):
reply = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
@ -295,7 +304,6 @@ class MainWindow_approximately_align(QMainWindow):
self.ui.radioButton_PTHO.setText("备选1")
self.ui.radioButton_NABD.setText("备选4")
self.ui.radioButton_NTHO.setText("备选2")
self.ui.groupBox_align_position.setEnabled(False)
self.ui.spinBox_SelectEpoch.setMinimum(0)
def __plot__(self, *args, **kwargs):
@ -319,6 +327,8 @@ class MainWindow_approximately_align(QMainWindow):
result = self.DrawPicTryAlign()
elif sender == self.ui.radioButton_custom:
result = self.DrawPicTryAlign()
elif sender == self.ui.pushButton_ChangeView:
result = self.DrawAlignScatter()
elif sender == self.ui.pushButton_JUMP:
result = self.DrawPicByEpoch(*args, **kwargs)
elif sender == self.ui.pushButton_EM1:
@ -357,14 +367,14 @@ class MainWindow_approximately_align(QMainWindow):
else:
PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO)
orgBcg_minutes = round(self.data.raw_orgBcg.shape[0] / Config["InputConfig"]["OrgBCGFreq"] / 60)
PSG_minutes = round(self.data.raw_Tho.shape[0] / Config["InputConfig"]["ThoFreq"] / 60)
orgBcg_seconds = round(self.data.raw_orgBcg.shape[0] / Config["InputConfig"]["orgBcgFreq"])
PSG_seconds = round(self.data.raw_Tho.shape[0] / Config["InputConfig"]["ThoFreq"])
Config.update({
"orgBcg_minutes": orgBcg_minutes,
"PSG_minutes": PSG_minutes
"orgBcg_seconds": orgBcg_seconds,
"PSG_seconds": PSG_seconds
})
self.ui.label_orgBcg_length.setText(str(orgBcg_minutes))
self.ui.label_PSG_length.setText(str(PSG_minutes))
self.ui.label_orgBcg_length.setText(str(orgBcg_seconds))
self.ui.label_PSG_length.setText(str(PSG_seconds))
self.__reset__()
PublicFunc.finish_operation(self, ButtonState)
@ -474,14 +484,15 @@ class MainWindow_approximately_align(QMainWindow):
PublicFunc.__disableAllButton__(self, ButtonState)
Config["orgBcgConfig"].update({"PreA": self.ui.spinBox_orgBcgPreA.value(),
"PreCut": self.ui.spinBox_orgBcgPreCut.value(),
"PostCut": self.ui.spinBox_orgBcgPostCut.value()})
"PreCut": self.ui.spinBox_orgBcgPreCut.value(),
"PostCut": self.ui.spinBox_orgBcgPostCut.value()})
Config["PSGConfig"].update({"PreA": self.ui.spinBox_PSGPreA.value(),
"PreCut": self.ui.spinBox_PSGPreCut.value(),
"PostCut": self.ui.spinBox_PSGPostCut.value()})
"PreCut": self.ui.spinBox_PSGPreCut.value(),
"PostCut": self.ui.spinBox_PSGPostCut.value()})
PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0)
result = self.__plot__()
if not result.status:
PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
@ -489,8 +500,8 @@ class MainWindow_approximately_align(QMainWindow):
return
else:
PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO)
ButtonState["Current"]["pushButton_GetPos"] = True
ButtonState["Current"]["pushButton_GetPos"] = True
PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_GetPosition__(self):
@ -520,7 +531,8 @@ class MainWindow_approximately_align(QMainWindow):
# 绘图
PublicFunc.progressbar_update(self, 3, 4, Constants.DRAWING_DATA, 50)
result = self.__plot__(result1.data["tho_relate"], result1.data["tho_relate2"], result2.data["abd_relate"], result2.data["abd_relate2"])
result = self.__plot__(result1.data["tho_relate"], result1.data["tho_relate2"], result2.data["abd_relate"],
result2.data["abd_relate2"])
if not result.status:
PublicFunc.text_output(self.ui, "(3/4)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
@ -531,7 +543,8 @@ class MainWindow_approximately_align(QMainWindow):
# 计算最大值位置
PublicFunc.progressbar_update(self, 4, 4, Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATING, 90)
result = self.data.calculate_maxvalue_pos(result1.data["tho_relate"], result1.data["tho_relate2"], result2.data["abd_relate"], result2.data["abd_relate2"])
result = self.data.calculate_maxvalue_pos(result1.data["tho_relate"], result1.data["tho_relate2"],
result2.data["abd_relate"], result2.data["abd_relate2"])
if not result.status:
PublicFunc.text_output(self.ui, "(4/4)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
@ -544,7 +557,28 @@ class MainWindow_approximately_align(QMainWindow):
self.ui.radioButton_PTHO.setText(str(result.data["tho_max"] + result.data["bias"]))
self.ui.radioButton_NABD.setText(str(result.data["abd_max2"] + result.data["bias"]))
self.ui.radioButton_NTHO.setText(str(result.data["tho_max2"] + result.data["bias"]))
self.ui.groupBox_align_position.setEnabled(True)
ButtonState["Current"]["radioButton_PTHO"] = True
ButtonState["Current"]["radioButton_PABD"] = True
ButtonState["Current"]["radioButton_NTHO"] = True
ButtonState["Current"]["radioButton_NABD"] = True
ButtonState["Current"]["radioButton_custom"] = True
# self.ui.groupBox_align_position.setEnabled(True)
PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_changeview__(self):
PublicFunc.__disableAllButton__(self, ButtonState)
# 绘图
PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0)
result = self.__plot__()
if not result.status:
PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR)
PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
PublicFunc.finish_operation(self, ButtonState)
return
else:
PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO)
PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_jump__(self):
@ -638,7 +672,8 @@ class MainWindow_approximately_align(QMainWindow):
self.ui.spinBox_SelectEpoch.setMinimum(result3.data["epoch_min"])
self.ui.spinBox_SelectEpoch.setMaximum(result3.data["epoch_max"])
self.ui.spinBox_SelectEpoch.setValue(result3.data["epoch_min"])
self.ui.spinBox_SelectEpoch.setToolTip("最小值:{}\n最大值:{}".format(result3.data["epoch_min"], result3.data["epoch_max"]))
self.ui.spinBox_SelectEpoch.setToolTip(
"最小值:{}\n最大值:{}".format(result3.data["epoch_min"], result3.data["epoch_max"]))
self.ui.spinBox_SelectEpoch.setEnabled(True)
ButtonState["Current"]["pushButton_JUMP"] = True
ButtonState["Current"]["pushButton_EM1"] = True
@ -648,23 +683,25 @@ class MainWindow_approximately_align(QMainWindow):
ButtonState["Current"]["pushButton_EP10"] = True
ButtonState["Current"]["pushButton_EP100"] = True
ButtonState["Current"]["pushButton_save"] = True
ButtonState["Current"]["pushButton_ChangeView"] = True
PublicFunc.finish_operation(self, ButtonState)
def DrawPicRawOverview(self):
try:
max_x = max(self.data.processed_Tho.shape[0], self.data.processed_Abd.shape[0], self.data.processed_orgBcg.shape[0])
max_x = max(self.data.processed_downsample_Tho.shape[0], self.data.processed_downsample_Abd.shape[0],
self.data.processed_downsample_orgBcg.shape[0])
ax1 = self.fig.add_subplot(311)
ax1.plot(self.data.processed_Tho, color='blue')
ax1.plot(self.data.processed_downsample_Tho, color='blue')
ax1.set_xlim(0, max_x)
ax1.set_title("THO")
ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1)
ax2.plot(self.data.processed_orgBcg, color='blue')
ax2.plot(self.data.processed_downsample_orgBcg, color='blue')
ax2.set_xlim(0, max_x)
ax2.set_title("orgBcg")
ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1)
ax3.plot(self.data.processed_Abd, color='blue')
ax3.plot(self.data.processed_downsample_Abd, color='blue')
ax3.set_xlim(0, max_x)
ax3.set_title("ABD")
@ -676,19 +713,20 @@ class MainWindow_approximately_align(QMainWindow):
def DrawPicOverviewWithCutOff(self):
try:
max_x = max(self.data.processed_Tho.shape[0] + Config["PSGConfig"]["PreA"],
self.data.processed_orgBcg.shape[0] + Config["orgBcgConfig"]["PreA"])
max_x = max(self.data.processed_downsample_Tho.shape[0] + Config["PSGConfig"]["PreA"],
self.data.processed_downsample_orgBcg.shape[0] + Config["orgBcgConfig"]["PreA"])
min_x = min(Config["PSGConfig"]["PreA"], Config["orgBcgConfig"]["PreA"], 0)
ax1 = self.fig.add_subplot(311)
ax1.plot(
linspace(Config["PSGConfig"]["PreA"],
len(self.data.processed_Tho) + Config["PSGConfig"]["PreA"],
len(self.data.processed_Tho)), self.data.processed_Tho, color='blue')
len(self.data.processed_downsample_Tho) + Config["PSGConfig"]["PreA"],
len(self.data.processed_downsample_Tho)), self.data.processed_downsample_Tho, color='blue')
# 绘制x = PreCut的线 和 x = PostCut的虚线
ax1.axvline(x=Config["PSGConfig"]["PreCut"] + Config["PSGConfig"]["PreA"], color='red',
linestyle='--')
ax1.axvline(
x=len(self.data.processed_Tho) - Config["PSGConfig"]["PostCut"] + Config["PSGConfig"]["PreA"],
x=len(self.data.processed_downsample_Tho) - Config["PSGConfig"]["PostCut"] + Config["PSGConfig"][
"PreA"],
color='red', linestyle='--')
ax1.set_xlim(min_x, max_x)
@ -696,24 +734,29 @@ class MainWindow_approximately_align(QMainWindow):
ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1)
ax2.plot(
linspace(Config["orgBcgConfig"]["PreA"], len(self.data.processed_orgBcg) + Config["orgBcgConfig"]["PreA"],
len(self.data.processed_orgBcg)), self.data.processed_orgBcg, color='blue')
linspace(Config["orgBcgConfig"]["PreA"],
len(self.data.processed_downsample_orgBcg) + Config["orgBcgConfig"]["PreA"],
len(self.data.processed_downsample_orgBcg)), self.data.processed_downsample_orgBcg,
color='blue')
ax2.axvline(x=Config["orgBcgConfig"]["PreCut"] + Config["orgBcgConfig"]["PreA"], color='red',
linestyle='--')
ax2.axvline(x=len(self.data.processed_orgBcg) - Config["orgBcgConfig"]["PostCut"] + Config["orgBcgConfig"]["PreA"],
color='red', linestyle='--')
ax2.axvline(
x=len(self.data.processed_downsample_orgBcg) - Config["orgBcgConfig"]["PostCut"] +
Config["orgBcgConfig"]["PreA"],
color='red', linestyle='--')
ax2.set_xlim(min_x, max_x)
ax2.set_title("orgBcg")
ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1)
ax3.plot(
linspace(Config["PSGConfig"]["PreA"],
len(self.data.processed_Abd) + Config["PSGConfig"]["PreA"],
len(self.data.processed_Abd)), self.data.processed_Abd, color='blue')
len(self.data.processed_downsample_Abd) + Config["PSGConfig"]["PreA"],
len(self.data.processed_downsample_Abd)), self.data.processed_downsample_Abd, color='blue')
ax3.axvline(x=Config["PSGConfig"]["PreCut"] + Config["PSGConfig"]["PreA"], color='red',
linestyle='--')
ax3.axvline(
x=len(self.data.processed_Tho) - Config["PSGConfig"]["PostCut"] + Config["PSGConfig"]["PreA"],
x=len(self.data.processed_downsample_Tho) - Config["PSGConfig"]["PostCut"] + Config["PSGConfig"][
"PreA"],
color='red', linestyle='--')
ax3.set_xlim(min_x, max_x)
ax3.set_title("ABD")
@ -750,29 +793,30 @@ class MainWindow_approximately_align(QMainWindow):
def DrawPicTryAlign(self):
try:
max_x = max(self.data.processed_Tho.shape[0],
self.data.processed_orgBcg.shape[0] + Config["pos"])
max_x = max(self.data.processed_downsample_Tho.shape[0],
self.data.processed_downsample_orgBcg.shape[0] + Config["pos"])
min_x = min(Config["PSGConfig"]["PreA"], Config["orgBcgConfig"]["PreA"] + Config["pos"], 0)
ax1 = self.fig.add_subplot(311)
ax1.plot(
linspace(0, len(self.data.processed_Tho),
len(self.data.processed_Tho)), self.data.processed_Tho, color='blue')
linspace(0, len(self.data.processed_downsample_Tho),
len(self.data.processed_downsample_Tho)), self.data.processed_downsample_Tho, color='blue')
# 绘制x = PreCut的线 和 x = PostCut的虚线
ax1.set_xlim(min_x, max_x)
ax1.set_title("THO")
ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1)
ax2.plot(linspace(Config["pos"],
len(self.data.processed_orgBcg) + Config["pos"],
len(self.data.processed_orgBcg)), self.data.processed_orgBcg, color='blue')
len(self.data.processed_downsample_orgBcg) + Config["pos"],
len(self.data.processed_downsample_orgBcg)), self.data.processed_downsample_orgBcg,
color='blue')
ax2.set_xlim(min_x, max_x)
ax2.set_title("orgBcg")
ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1)
ax3.plot(
linspace(0, len(self.data.processed_Abd),
len(self.data.processed_Abd)), self.data.processed_Abd, color='blue')
linspace(0, len(self.data.processed_downsample_Abd),
len(self.data.processed_downsample_Abd)), self.data.processed_downsample_Abd, color='blue')
ax3.set_xlim(min_x, max_x)
ax3.set_title("ABD")
@ -790,9 +834,9 @@ class MainWindow_approximately_align(QMainWindow):
orgBcg_SP = PSG_SP - Config["pos"]
orgBcg_EP = PSG_EP - Config["pos"]
tho_seg = self.data.processed_Tho[PSG_SP:PSG_EP]
orgBcg_seg = self.data.processed_orgBcg[orgBcg_SP:orgBcg_EP] * Config["orgBcg_reverse"]
abd_seg = self.data.processed_Abd[PSG_SP:PSG_EP]
tho_seg = self.data.processed_downsample_Tho[PSG_SP:PSG_EP]
orgBcg_seg = self.data.processed_downsample_orgBcg[orgBcg_SP:orgBcg_EP] * Config["orgBcg_reverse"]
abd_seg = self.data.processed_downsample_Abd[PSG_SP:PSG_EP]
# 根据PSG来和绘制
ax1 = self.fig.add_subplot(321)
@ -842,6 +886,29 @@ class MainWindow_approximately_align(QMainWindow):
# 返回图片以便存到QPixImage
return Result().success(info=Constants.DRAWING_FINISHED)
def DrawAlignScatter(self):
try:
response = self.data.get_corr_by_epoch()
epoch_min = response.data["epoch_min"]
epoch_max = response.data["epoch_max"]
tho_bias_list = response.data["tho_bias_list"]
abd_bias_list = response.data["abd_bias_list"]
ax1 = self.fig.add_subplot(211)
ax1.scatter(linspace(epoch_min, epoch_max, len(tho_bias_list)), tho_bias_list, alpha=0.2)
ax1.set_title("THO")
ax2 = self.fig.add_subplot(212)
ax2.scatter(linspace(epoch_min, epoch_max, len(abd_bias_list)), abd_bias_list, alpha=0.2)
ax2.set_title("ABD")
self.fig.canvas.draw()
return Result().success(info=Constants.DRAWING_FINISHED)
except Exception as e:
return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc())
class Data:
@ -852,22 +919,90 @@ class Data:
self.processed_orgBcg = None
self.processed_Tho = None
self.processed_Abd = None
self.processed_downsample_orgBcg = None
self.processed_downsample_Tho = None
self.processed_downsample_Abd = None
self.relate_list = None
self.relate_point = None
def open_file(self):
if not Path(Config["Path"]["Input_orgBcg"]).exists():
return Result().failure(
info=Constants.INPUT_FAILURE + "orgBcg: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_Path_Not_Exist"])
temp_orgBcg_path = list(
Path(Config["Path"]["Input_orgBcg"]).glob(f"{ConfigParams.APPROXIMATELY_ALIGN_INPUT_ORGBCG_FILENAME}*"))
if len(temp_orgBcg_path) == 0:
return Result().failure(
info=Constants.INPUT_FAILURE + "orgBcg: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_File_Not_Exist"])
elif len(temp_orgBcg_path) > 1:
return Result().failure(
info=Constants.INPUT_FAILURE + "orgBcg: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_File_More_Than_One"])
else:
orgBcg_path = temp_orgBcg_path[0]
Config["Path"]["Input_orgBcg"] = str(orgBcg_path)
orgBcg_freq = orgBcg_path.stem.split("_")[-1]
# 验证是否为整数或是否存在
if not orgBcg_freq.isdigit():
return Result().failure(
info=Constants.INPUT_FAILURE + "orgBcg: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_Frequency_Not_In_Filename"])
Config["InputConfig"]["orgBcgFreq"] = int(orgBcg_freq)
if not Path(Config["Path"]["Input_Tho"]).exists():
return Result().failure(
info=Constants.INPUT_FAILURE + "Tho: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_Path_Not_Exist"])
temp_Tho_path = list(
Path(Config["Path"]["Input_Tho"]).glob(f"{ConfigParams.APPROXIMATELY_ALIGN_INPUT_THO_FILENAME}*"))
if len(temp_Tho_path) == 0:
return Result().failure(
info=Constants.INPUT_FAILURE + "Tho: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_File_Not_Exist"])
elif len(temp_Tho_path) > 1:
return Result().failure(
info=Constants.INPUT_FAILURE + "Tho: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_File_More_Than_One"])
else:
tho_path = temp_Tho_path[0]
Config["Path"]["Input_Tho"] = str(tho_path)
tho_freq = tho_path.stem.split("_")[-1]
# 验证是否为整数或是否存在
if not tho_freq.isdigit():
return Result().failure(
info=Constants.INPUT_FAILURE + "Tho: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_Frequency_Not_In_Filename"])
Config["InputConfig"]["ThoFreq"] = int(tho_freq)
if not Path(Config["Path"]["Input_Abd"]).exists():
return Result().failure(
info=Constants.INPUT_FAILURE + "Abd: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_Path_Not_Exist"])
temp_Abd_path = list(
Path(Config["Path"]["Input_Abd"]).glob(f"{ConfigParams.APPROXIMATELY_ALIGN_INPUT_ABD_FILENAME}*"))
if len(temp_Abd_path) == 0:
return Result().failure(
info=Constants.INPUT_FAILURE + "Abd: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_File_Not_Exist"])
elif len(temp_Abd_path) > 1:
return Result().failure(
info=Constants.INPUT_FAILURE + "Abd: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_File_More_Than_One"])
else:
abd_path = temp_Abd_path[0]
Config["Path"]["Input_Abd"] = str(abd_path)
abd_freq = abd_path.stem.split("_")[-1]
# 验证是否为整数或是否存在
if not abd_freq.isdigit():
return Result().failure(
info=Constants.INPUT_FAILURE + "Abd: " + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Data_Frequency_Not_In_Filename"])
Config["InputConfig"]["AbdFreq"] = int(abd_freq)
try:
self.raw_orgBcg = read_csv(Config["Path"]["Input_orgBcg"],
encoding=ConfigParams.UTF8_ENCODING,
@ -876,10 +1011,11 @@ class Data:
encoding=ConfigParams.UTF8_ENCODING,
header=None).to_numpy().reshape(-1)
self.raw_Abd = read_csv(Config["Path"]["Input_Abd"],
encoding=ConfigParams.UTF8_ENCODING,
header=None).to_numpy().reshape(-1)
encoding=ConfigParams.UTF8_ENCODING,
header=None).to_numpy().reshape(-1)
except Exception as e:
return Result().failure(info=Constants.INPUT_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Read_Data_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.INPUT_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Read_Data_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.INPUT_FINISHED)
@ -891,20 +1027,31 @@ class Data:
df = DataFrame({"pos": [pos], "epoch": [epoch], "ApplyFrequency": [ApplyFrequency]})
df.to_csv(Path(Config["Path"]["Save"]), mode="w", header=True, index=False)
except Exception as e:
return Result().failure(info=Constants.SAVING_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Save_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.SAVING_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Save_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.SAVING_FINISHED)
def Standardize_0(self):
# 仅重采样
if self.raw_orgBcg is None or self.raw_Tho is None or self.raw_Abd is None:
return Result().failure(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Raw_Data_Not_Exist"])
return Result().failure(
info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Raw_Data_Not_Exist"])
try:
self.processed_orgBcg = resample(self.raw_orgBcg, int(Config["orgBcg_minutes"] * 60 * Config["ApplyFrequency"]))
self.processed_Tho = resample(self.raw_Tho, int(Config["PSG_minutes"] * 60 * Config["ApplyFrequency"]))
self.processed_Abd = resample(self.raw_Abd, int(Config["PSG_minutes"] * 60 * Config["ApplyFrequency"]))
# 按照秒数进行截断
self.raw_orgBcg = self.raw_orgBcg[:int(Config["orgBcg_seconds"] * Config["InputConfig"]["orgBcgFreq"])]
self.raw_Tho = self.raw_Tho[:int(Config["PSG_seconds"] * Config["InputConfig"]["ThoFreq"])]
self.raw_Abd = self.raw_Abd[:int(Config["PSG_seconds"] * Config["InputConfig"]["AbdFreq"])]
self.processed_orgBcg = resample(self.raw_orgBcg,
int(Config["orgBcg_seconds"] * Config["ApplyFrequency"]))
self.processed_Tho = resample(self.raw_Tho, int(Config["PSG_seconds"] * Config["ApplyFrequency"]))
self.processed_Abd = resample(self.raw_Abd, int(Config["PSG_seconds"] * Config["ApplyFrequency"]))
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Only_Resample_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Only_Resample_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FINISHED)
@ -915,24 +1062,32 @@ class Data:
high = highCut / (fs * 0.5)
sos = butter(order, [low, high], btype="bandpass", output='sos')
return sosfiltfilt(sos, data)
if self.raw_orgBcg is None or self.raw_Tho is None or self.raw_Abd is None:
return Result().failure(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Raw_Data_Not_Exist"])
return Result().failure(
info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Raw_Data_Not_Exist"])
try:
# 滤波
self.processed_orgBcg = butter_bandpass_filter(self.raw_orgBcg, Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["InputConfig"]["OrgBCGFreq"],
Config["Filter"]["BandPassOrder"])
self.processed_Tho = butter_bandpass_filter(self.raw_Tho, Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["InputConfig"]["ThoFreq"],
Config["Filter"]["BandPassOrder"])
self.processed_Abd = butter_bandpass_filter(self.raw_Abd, Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["InputConfig"]["AbdFreq"],
Config["Filter"]["BandPassOrder"])
self.processed_orgBcg = butter_bandpass_filter(
self.raw_orgBcg, Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["InputConfig"]["orgBcgFreq"],
Config["Filter"]["BandPassOrder"])
self.processed_Tho = butter_bandpass_filter(
self.raw_Tho, Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["InputConfig"]["ThoFreq"],
Config["Filter"]["BandPassOrder"])
self.processed_Abd = butter_bandpass_filter(
self.raw_Abd, Config["Filter"]["BandPassLow"],
Config["Filter"]["BandPassHigh"],
Config["InputConfig"]["AbdFreq"],
Config["Filter"]["BandPassOrder"])
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_RESP_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Resp_Get_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_RESP_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Resp_Get_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_RESP_GET_FINISHED)
@ -940,27 +1095,24 @@ class Data:
# 预重采样
try:
# TODO这里的采样率处理如果THO和ABD的采样率不同可能还是会导致之后的ApplyFrequency出问题最后导致得到的粗同步坐标不正确
# 修改Config
if Config["InputConfig"]["ThoFreq"] < Config["InputConfig"]["AbdFreq"]:
Config.update({"TempFrequency": Config["InputConfig"]["ThoFreq"]})
self.processed_Abd = self.processed_Abd[::int(Config["InputConfig"]["AbdFreq"] / Config["TempFrequency"])]
elif Config["InputConfig"]["ThoFreq"] > Config["InputConfig"]["AbdFreq"]:
Config.update({"TempFrequency": Config["InputConfig"]["AbdFreq"]})
self.processed_Tho = self.processed_Tho[::int(Config["InputConfig"]["ThoFreq"] / Config["TempFrequency"])]
else:
Config.update({"TempFrequency": Config["InputConfig"]["ThoFreq"]})
#
if Config["InputConfig"]["ThoFreq"] != Config["TempFrequency"]:
print(int(Config["InputConfig"]["ThoFreq"]), int(Config["TempFrequency"]))
self.processed_Tho = resample(self.processed_Tho,
int(Config["PSG_seconds"] * Config["TempFrequency"]))
# 如果orgBcg采样率大于PSG采样率那么orgBcg重采样到PSG采样率
if Config["InputConfig"]["OrgBCGFreq"] > Config["TempFrequency"]:
# 用[::]完成
self.processed_orgBcg = self.processed_orgBcg[::int(Config["InputConfig"]["OrgBCGFreq"] / Config["TempFrequency"])]
# 如果orgBcg采样率小于PSG采样率那么orgBcg重采样到PSG采样率
elif Config["InputConfig"]["OrgBCGFreq"] < Config["TempFrequency"]:
# 用repeat完成
self.processed_orgBcg = repeat(self.processed_orgBcg, int(Config["TempFrequency"] / Config["InputConfig"]["OrgBCGFreq"]), axis=0)
if Config["InputConfig"]["AbdFreq"] != Config["TempFrequency"]:
self.processed_Abd = resample(self.processed_Abd,
int(Config["PSG_seconds"] * Config["TempFrequency"]))
if Config["InputConfig"]["orgBcgFreq"] != Config["TempFrequency"]:
self.processed_orgBcg = resample(self.processed_orgBcg,
int(Config["orgBcg_seconds"] * Config["TempFrequency"]))
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Pre_Resample_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Pre_Resample_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FINISHED)
@ -970,18 +1122,17 @@ class Data:
temp_frequency = Config["TempFrequency"]
if Config["PSGConfig"]["PSGDelBase"]:
# 减去四秒钟平均滤波
self.processed_Tho = self.processed_Tho - convolve(self.processed_Tho,
ones(int(4 * temp_frequency)) / int(4 * temp_frequency),
mode='same')
self.processed_Abd = self.processed_Abd - convolve(self.processed_Abd,
ones(int(4 * temp_frequency)) / int(4 * temp_frequency),
mode='same')
self.processed_Tho = self.processed_Tho - convolve(
self.processed_Tho, ones(int(4 * temp_frequency)) / int(4 * temp_frequency), mode='same')
self.processed_Abd = self.processed_Abd - convolve(
self.processed_Abd, ones(int(4 * temp_frequency)) / int(4 * temp_frequency), mode='same')
if Config["orgBcgConfig"]["orgBcgDelBase"]:
self.processed_orgBcg = self.processed_orgBcg - convolve(self.processed_orgBcg,
ones(int(4 * temp_frequency)) / int(4 * temp_frequency),
mode='same')
self.processed_orgBcg = self.processed_orgBcg - convolve(
self.processed_orgBcg, ones(int(4 * temp_frequency)) / int(4 * temp_frequency), mode='same')
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_DELETE_BASE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Delete_Base_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_DELETE_BASE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Delete_Base_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_DELETE_BASE_FINISHED)
@ -993,9 +1144,12 @@ class Data:
self.processed_Tho = (self.processed_Tho - mean(self.processed_Tho)) / std(self.processed_Tho)
self.processed_Abd = (self.processed_Abd - mean(self.processed_Abd)) / std(self.processed_Abd)
if Config["orgBcgConfig"]["orgBcgZScore"]:
self.processed_orgBcg = (self.processed_orgBcg - mean(self.processed_orgBcg)) / std(self.processed_orgBcg)
self.processed_orgBcg = (self.processed_orgBcg - mean(self.processed_orgBcg)) / std(
self.processed_orgBcg)
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_STANDARDIZE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Standardize_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_STANDARDIZE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Standardize_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_STANDARDIZE_FINISHED)
@ -1004,11 +1158,13 @@ class Data:
try:
# 用[::]完成
temp_frequency = Config["TempFrequency"]
self.processed_Tho = self.processed_Tho[::int(temp_frequency / Config["ApplyFrequency"])]
self.processed_Abd = self.processed_Abd[::int(temp_frequency / Config["ApplyFrequency"])]
self.processed_orgBcg = self.processed_orgBcg[::int(temp_frequency / Config["ApplyFrequency"])]
self.processed_downsample_Tho = self.processed_Tho[::int(temp_frequency / Config["ApplyFrequency"])]
self.processed_downsample_Abd = self.processed_Abd[::int(temp_frequency / Config["ApplyFrequency"])]
self.processed_downsample_orgBcg = self.processed_orgBcg[::int(temp_frequency / Config["ApplyFrequency"])]
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Resample_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Resample_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FINISHED)
@ -1017,8 +1173,12 @@ class Data:
try:
# 计算因子
MULTIPLE_FACTOER = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["Multiple_Factor"]
a = self.processed_Tho[Config["PSGConfig"]["PreCut"]:len(self.processed_Tho) - Config["PSGConfig"]["PostCut"]].copy()
v = self.processed_orgBcg[Config["orgBcgConfig"]["PreCut"]:len(self.processed_orgBcg) - Config["orgBcgConfig"]["PostCut"]].copy()
a = self.processed_downsample_Tho[
Config["PSGConfig"]["PreCut"]:len(self.processed_downsample_Tho) - Config["PSGConfig"][
"PostCut"]].copy()
v = self.processed_downsample_orgBcg[
Config["orgBcgConfig"]["PreCut"]:len(self.processed_downsample_orgBcg) - Config["orgBcgConfig"][
"PostCut"]].copy()
a *= MULTIPLE_FACTOER
v *= MULTIPLE_FACTOER
a = a.astype(int64)
@ -1029,15 +1189,21 @@ class Data:
result = {"tho_relate": tho_relate, "tho_relate2": tho_relate2}
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Calculate_Correlation1_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FAILURE +
Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Calculate_Correlation1_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FINISHED, data=result)
def calculate_correlation2(self):
# 计算互相关2/2
try:
a = self.processed_Abd[Config["PSGConfig"]["PreCut"]:len(self.processed_Abd) - Config["PSGConfig"]["PostCut"]].copy()
v = self.processed_orgBcg[Config["orgBcgConfig"]["PreCut"]:len(self.processed_orgBcg) - Config["orgBcgConfig"]["PostCut"]].copy()
a = self.processed_downsample_Abd[
Config["PSGConfig"]["PreCut"]:len(self.processed_downsample_Abd) - Config["PSGConfig"][
"PostCut"]].copy()
v = self.processed_downsample_orgBcg[
Config["orgBcgConfig"]["PreCut"]:len(self.processed_downsample_orgBcg) - Config["orgBcgConfig"][
"PostCut"]].copy()
a *= 100
v *= 100
a = a.astype(int64)
@ -1048,7 +1214,9 @@ class Data:
result = {"abd_relate": abd_relate, "abd_relate2": abd_relate2}
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Calculate_Correlation2_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FAILURE +
Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Calculate_Correlation2_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FINISHED, data=result)
@ -1061,11 +1229,13 @@ class Data:
abd_max2 = argmax(abd_relate2)
pre = Config["PSGConfig"]["PreCut"] + Config["orgBcgConfig"]["PostCut"]
bias = pre - len(self.processed_orgBcg) + 1
bias = pre - len(self.processed_downsample_orgBcg) + 1
result = {"tho_max": tho_max, "tho_max2": tho_max2, "abd_max": abd_max, "abd_max2": abd_max2, "bias": bias}
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Calculate_Maxvalue_Pos_Exception"] + "\n" + format_exc())
return Result().failure(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FAILURE +
Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Calculate_Maxvalue_Pos_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FINISHED, data=result)
@ -1073,12 +1243,64 @@ class Data:
# 获取epoch
try:
epoch_min = max(0, Config["pos"] // 30 // Config["ApplyFrequency"] + 1)
epoch_max = min(len(self.processed_Tho) // 30 // Config["ApplyFrequency"] - 1,
(len(self.processed_orgBcg) + Config["pos"]) // 30 // Config["ApplyFrequency"] - 1)
epoch_max = min(len(self.processed_downsample_Tho) // 30 // Config["ApplyFrequency"] - 1,
(len(self.processed_downsample_orgBcg) + Config["pos"]) // 30 // Config[
"ApplyFrequency"] - 1)
result = {"epoch_min": epoch_min, "epoch_max": epoch_max}
except Exception as e:
return Result().failure(info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON["Get_Epoch_Exception"] + "\n" + format_exc())
return Result().failure(
info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Get_Epoch_Exception"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_EPOCH_GET_FINISHED, data=result)
def get_corr_by_epoch(self):
# 获取相关系数
try:
# 获取 epoch 区间
response = self.get_epoch()
if not response.status:
raise Exception(response.info)
epoch_min = response.data["epoch_min"]
epoch_max = response.data["epoch_max"]
temp_freq = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["TempFrequency"]
window_epoch = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["CorrByEpoch"]["window_epoch"]
tho_bias_list = []
abd_bias_list = []
# pos采样率转换
pos = Config["pos"] * temp_freq // Config["ApplyFrequency"]
for epoch in range(epoch_min, epoch_max):
SP = epoch * 30 * temp_freq
EP = (epoch + window_epoch) * 30 * temp_freq
tho_seg = self.processed_Tho[SP:EP]
abd_seg = self.processed_Abd[SP:EP]
orgBcg_seg = self.processed_orgBcg[SP - pos:EP - pos] * Config["orgBcg_reverse"]
tho_relate_seg = correlate(tho_seg, orgBcg_seg, mode='full')
abd_relate_seg = correlate(abd_seg, orgBcg_seg, mode='full')
tho_seg_pos = argmax(tho_relate_seg) - len(orgBcg_seg)
abd_seg_pos = argmax(abd_relate_seg) - len(orgBcg_seg)
tho_bias_list.append(tho_seg_pos // temp_freq)
abd_bias_list.append(abd_seg_pos // temp_freq)
result = {
"tho_bias_list": tho_bias_list,
"abd_bias_list": abd_bias_list,
"epoch_min": epoch_min,
"epoch_max": epoch_max
}
except Exception as e:
return Result().failure(
info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
"Get_Corr_By_Epoch"] + "\n" + format_exc())
return Result().success(info=Constants.APPROXIMATELY_EPOCH_GET_FINISHED, data=result)

View File

@ -38,12 +38,17 @@ class ConfigParams:
"AbdFreq": 100
},
"ApplyFrequency": 5,
"TempFrequency": 100,
"Filter": {
"BandPassOrder": 4,
"BandPassLow": 0.01,
"BandPassHigh": 0.7
},
"Multiple_Factor":100
"Multiple_Factor":100,
"CorrByEpoch":
{
"window_epoch": 6
}
}
APPROXIMATELY_ALIGN_INPUT_ORGBCG_FILENAME: str = "OrgBCG_Raw_"
APPROXIMATELY_ALIGN_INPUT_THO_FILENAME: str = "Effort Tho_Raw_"

View File

@ -122,6 +122,9 @@ class Constants:
APPROXIMATELY_ALIGN_FAILURE_REASON = {
"Data_Path_Not_Exist": "(路径不存在)",
"Data_File_Not_Exist": "(数据文件不存在)",
"Data_File_More_Than_One": "(数据文件超过一个)",
"Data_Frequency_Not_In_Filename": "(数据频率不在文件名中)",
"Read_Data_Exception": "(读取数据异常)",
"Raw_Data_Not_Exist": "(原始数据不存在)",
"Only_Resample_Exception": "(仅重采样异常)",

View File

@ -1,7 +1,7 @@
from datetime import datetime
from logging import error, info
from PySide6.QtWidgets import QMessageBox, QWidget, QPushButton, QProgressBar, QApplication
from PySide6.QtWidgets import QMessageBox, QWidget, QPushButton, QProgressBar, QApplication, QRadioButton
from func.utils.Constants import Constants
from func.utils.CustomException import TipsTypeValueNotExistError, MsgBoxTypeValueNotExistError
@ -106,6 +106,10 @@ class PublicFunc:
if widget.objectName() in buttonState["Current"].keys():
widget.setEnabled(False)
if isinstance(widget, QRadioButton):
if widget.objectName() in buttonState["Current"].keys():
widget.setEnabled(False)
@staticmethod
def __enableAllButton__(mainWindow, buttonState):
# 启用按钮
@ -117,6 +121,10 @@ class PublicFunc:
if widget.objectName() in buttonState["Current"].keys():
widget.setEnabled(buttonState["Current"][widget.objectName()])
if isinstance(widget, QRadioButton):
if widget.objectName() in buttonState["Current"].keys():
widget.setEnabled(buttonState["Current"][widget.objectName()])
@staticmethod
def __resetAllButton__(mainWindow, buttonState):
# 启用按钮
@ -128,6 +136,10 @@ class PublicFunc:
if widget.objectName() in buttonState["Default"].keys():
widget.setEnabled(buttonState["Default"][widget.objectName()])
if isinstance(widget, QRadioButton):
if widget.objectName() in buttonState["Default"].keys():
widget.setEnabled(buttonState["Default"][widget.objectName()])
@staticmethod
def add_progressbar(mainWindow):
mainWindow.progressbar = QProgressBar()