Files
Signal_Label_Reborn/func/Module_approximately_align.py

1258 lines
60 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from gc import collect
from pathlib import Path
from traceback import format_exc
import matplotlib.pyplot as plt
from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication
from matplotlib.backends.backend_qt import NavigationToolbar2QT
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
from numpy import convolve, ones, mean, std, int64, argmax, linspace, diff
from overrides import overrides
from pandas import read_csv, DataFrame
from scipy.signal import find_peaks, resample, butter, sosfiltfilt, correlate
from yaml import dump, load, FullLoader
from func.utils.PublicFunc import PublicFunc
from func.utils.Constants import Constants, ConfigParams
from func.utils.Result import Result
from ui.MainWindow.MainWindow_approximately_align import Ui_MainWindow_approximately_align
from ui.setting.approximately_align_input_setting import Ui_MainWindow_approximately_align_input_setting
# Runtime configuration shared by the classes in this module. It starts empty
# and is populated by SettingWindow.__read_config__ and the main-window slots.
Config = {}

# Widgets that are usable as soon as the window opens.
_ENABLED_AT_START = (
    "pushButton_input_setting",
    "pushButton_input",
)

# Widgets that stay disabled until an earlier processing step has succeeded.
_DISABLED_AT_START = (
    "pushButton_Standardize",
    "pushButton_CutOff",
    "pushButton_GetPos",
    "pushButton_ChangeView",
    "pushButton_JUMP",
    "pushButton_EM1",
    "pushButton_EM10",
    "pushButton_EM100",
    "pushButton_EP1",
    "pushButton_EP10",
    "pushButton_EP100",
    "pushButton_save",
    "radioButton_PTHO",
    "radioButton_PABD",
    "radioButton_NTHO",
    "radioButton_NABD",
    "radioButton_custom",
)


def _initial_button_state():
    """Return a fresh widget-name -> enabled mapping for the start-up state."""
    state = dict.fromkeys(_ENABLED_AT_START, True)
    state.update(dict.fromkeys(_DISABLED_AT_START, False))
    return state


# "Default" is the pristine start-up state; "Current" is the working copy the
# slots mutate. They are separate dict objects with identical initial content.
ButtonState = {
    "Default": _initial_button_state(),
    "Current": _initial_button_state(),
}
class SettingWindow(QMainWindow):
    """Input-settings window of the approximate-alignment tool.

    The window loads the YAML settings file into the module-level ``Config``
    dict, echoes the values in its widgets, and writes them back to ``Config``
    and to the file when the user confirms.
    """

    def __init__(self, root_path, sampID):
        """Build the UI, load the stored settings and wire up the signals.

        :param root_path: data root directory used to derive the file paths.
        :param sampID: sample identifier; used as a sub-directory name.
        """
        super().__init__()
        self.ui = Ui_MainWindow_approximately_align_input_setting()
        self.ui.setupUi(self)

        self.root_path = root_path
        self.sampID = sampID
        self.config = None

        # Load the settings before any signal is connected, so the initial
        # setValue() calls below do not trigger __update_ui__.
        self.__read_config__()

        ui = self.ui
        value_editors = (
            ui.spinBox_input_orgBcg_freq,
            ui.spinBox_input_Tho_freq,
            ui.spinBox_input_Abd_freq,
            ui.spinBox_bandpassOrder,
            ui.doubleSpinBox_bandpassLow,
            ui.doubleSpinBox_bandpassHigh,
            ui.spinBox_display_freq,
        )
        for editor in value_editors:
            editor.valueChanged.connect(self.__update_ui__)
        ui.pushButton_confirm.clicked.connect(self.__write_config__)
        # Cancel first restores the stored settings, then closes the window.
        ui.pushButton_cancel.clicked.connect(self.__rollback_config__)
        ui.pushButton_cancel.clicked.connect(self.close)

    def __read_config__(self):
        """Load the settings file into ``Config`` and show it in the widgets.

        The file is first created from
        ``ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT`` when it does
        not exist. ``Config["Path"]``, ``Config["orgBcgConfig"]`` and
        ``Config["PSGConfig"]`` are (re)initialised on every call.
        """
        config_file = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_FILE_PATH
        if not Path(config_file).exists():
            with open(config_file, "w") as f:
                dump(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT, f)
        with open(config_file, "r") as f:
            file_config = load(f.read(), Loader=FullLoader)
        Config.update(file_config)
        # Only the file-backed keys are kept here; this is what gets dumped
        # back to disk in __write_config__.
        self.config = file_config

        # Refresh the derived entries of the shared configuration.
        root = Path(self.root_path)
        sample = str(self.sampID)
        save_name = ConfigParams.APPROXIMATELY_ALIGN_INFO + ConfigParams.ENDSWITH_CSV
        Config.update({
            "Path": {
                "Input_orgBcg": str(root / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / sample),
                "Input_Tho": str(root / ConfigParams.PUBLIC_PATH_PSG_TEXT / sample),
                "Input_Abd": str(root / ConfigParams.PUBLIC_PATH_PSG_TEXT / sample),
                "Save": str(root / ConfigParams.PUBLIC_PATH_LABEL / sample / save_name)
            },
            "orgBcgConfig": {},
            "PSGConfig": {}
        })

        # Echo the loaded values in the widgets.
        ui = self.ui
        input_cfg = Config["InputConfig"]
        filter_cfg = Config["Filter"]
        paths = Config["Path"]
        ui.spinBox_input_orgBcg_freq.setValue(input_cfg["orgBcgFreq"])
        ui.spinBox_input_Tho_freq.setValue(input_cfg["ThoFreq"])
        ui.spinBox_input_Abd_freq.setValue(input_cfg["AbdFreq"])
        ui.spinBox_bandpassOrder.setValue(filter_cfg["BandPassOrder"])
        ui.doubleSpinBox_bandpassLow.setValue(filter_cfg["BandPassLow"])
        ui.doubleSpinBox_bandpassHigh.setValue(filter_cfg["BandPassHigh"])
        ui.spinBox_display_freq.setValue(Config["ApplyFrequency"])
        ui.plainTextEdit_file_path_input_orgBcg.setPlainText(paths["Input_orgBcg"])
        ui.plainTextEdit_file_path_input_Tho.setPlainText(paths["Input_Tho"])
        ui.plainTextEdit_file_path_input_Abd.setPlainText(paths["Input_Abd"])
        ui.plainTextEdit_file_path_save.setPlainText(paths["Save"])

    def __write_config__(self):
        """Copy the widget values into ``Config``, save the file and close."""
        ui = self.ui

        def store_numeric(target):
            # Write the seven file-backed settings into ``target``.
            target["InputConfig"]["orgBcgFreq"] = ui.spinBox_input_orgBcg_freq.value()
            target["InputConfig"]["ThoFreq"] = ui.spinBox_input_Tho_freq.value()
            target["InputConfig"]["AbdFreq"] = ui.spinBox_input_Abd_freq.value()
            target["ApplyFrequency"] = ui.spinBox_display_freq.value()
            target["Filter"]["BandPassOrder"] = ui.spinBox_bandpassOrder.value()
            target["Filter"]["BandPassLow"] = ui.doubleSpinBox_bandpassLow.value()
            target["Filter"]["BandPassHigh"] = ui.doubleSpinBox_bandpassHigh.value()

        # Shared in-memory configuration: settings plus the (editable) paths.
        store_numeric(Config)
        paths = Config["Path"]
        paths["Input_orgBcg"] = ui.plainTextEdit_file_path_input_orgBcg.toPlainText()
        paths["Input_Tho"] = ui.plainTextEdit_file_path_input_Tho.toPlainText()
        paths["Input_Abd"] = ui.plainTextEdit_file_path_input_Abd.toPlainText()
        paths["Save"] = ui.plainTextEdit_file_path_save.toPlainText()

        # Persisted configuration: only the file-backed settings.
        store_numeric(self.config)
        with open(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_FILE_PATH, "w") as f:
            dump(self.config, f)
        self.close()

    def __rollback_config__(self):
        """Discard unsaved edits by reloading the settings file."""
        self.__read_config__()

    def __update_ui__(self):
        """Rebuild the four path fields from the current frequency values."""
        ui = self.ui
        root = Path(self.root_path)
        sample = str(self.sampID)
        psg_dir = root / ConfigParams.PUBLIC_PATH_PSG_TEXT / sample

        orgBcg_name = (ConfigParams.ORGBCG_RAW +
                       str(ui.spinBox_input_orgBcg_freq.value()) +
                       ConfigParams.ENDSWITH_TXT)
        ui.plainTextEdit_file_path_input_orgBcg.setPlainText(
            str(root / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / sample / orgBcg_name))

        tho_name = (ConfigParams.THO_RAW +
                    str(ui.spinBox_input_Tho_freq.value()) +
                    ConfigParams.ENDSWITH_TXT)
        ui.plainTextEdit_file_path_input_Tho.setPlainText(str(psg_dir / tho_name))

        abd_name = (ConfigParams.ABD_RAW +
                    str(ui.spinBox_input_Abd_freq.value()) +
                    ConfigParams.ENDSWITH_TXT)
        ui.plainTextEdit_file_path_input_Abd.setPlainText(str(psg_dir / abd_name))

        save_name = ConfigParams.APPROXIMATELY_ALIGN_INFO + ConfigParams.ENDSWITH_CSV
        ui.plainTextEdit_file_path_save.setPlainText(
            str(root / ConfigParams.PUBLIC_PATH_LABEL / sample / save_name))
class MainWindow_approximately_align(QMainWindow):
def __init__(self):
    """Create the widget tree and set all per-sample state to empty."""
    super().__init__()
    self.ui = Ui_MainWindow_approximately_align()
    self.ui.setupUi(self)

    # Per-sample state; assigned later by show() and the input slot.
    self.root_path = None
    self.sampID = None
    self.data = None
    self.setting = None

    # Progress bar: the attribute is declared first, then the shared helper
    # is asked to attach one to this window.
    self.progressbar = None
    PublicFunc.add_progressbar(self)

    # Drawing surface; the real figure and canvas are created in show().
    self.fig = None
    self.canvas = None

    box = QMessageBox()
    box.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)
    self.msgBox = box
@overrides
def show(self, root_path, sampID):
    """Show the window for one sample and connect every control.

    :param root_path: data root directory, forwarded to the settings window.
    :param sampID: sample identifier, forwarded to the settings window.
    """
    super().show()
    self.root_path = root_path
    self.sampID = sampID
    self.setting = SettingWindow(root_path, sampID)

    # Embed a matplotlib figure and its navigation toolbar in the layout.
    figure = Figure(figsize=(12, 9), dpi=100)
    figure.subplots_adjust(left=0.05, right=0.98, top=0.95, bottom=0.05)
    self.fig = figure
    self.canvas = FigureCanvas(figure)
    self.figToolbar = NavigationToolbar2QT(self.canvas)
    for widget in (self.canvas, self.figToolbar):
        self.ui.verticalLayout_canvas.addWidget(widget)

    PublicFunc.__resetAllButton__(self, ButtonState)

    ui = self.ui
    # Processing pipeline buttons.
    ui.pushButton_input.clicked.connect(self.__slot_btn_input__)
    ui.pushButton_input_setting.clicked.connect(self.setting.show)
    ui.pushButton_Standardize.clicked.connect(self.__slot_btn_Standardize__)
    ui.pushButton_CutOff.clicked.connect(self.__slot_btn_CutOff__)
    ui.pushButton_GetPos.clicked.connect(self.__slot_btn_GetPosition__)
    ui.pushButton_ChangeView.clicked.connect(self.__slot_btn_changeview__)
    ui.pushButton_JUMP.clicked.connect(self.__slot_btn_jump__)
    ui.pushButton_save.clicked.connect(self.__slot_btn_save__)

    # Epoch step buttons all share one slot, which inspects the sender.
    epoch_buttons = (
        ui.pushButton_EM1, ui.pushButton_EM10, ui.pushButton_EM100,
        ui.pushButton_EP1, ui.pushButton_EP10, ui.pushButton_EP100,
    )
    for button in epoch_buttons:
        button.clicked.connect(self.__EpochChange__)

    # Every alignment candidate triggers the same "try this offset" slot.
    align_radios = (
        ui.radioButton_NTHO, ui.radioButton_PTHO, ui.radioButton_PABD,
        ui.radioButton_NABD, ui.radioButton_custom,
    )
    for radio in align_radios:
        radio.clicked.connect(self.__enableAlign__)
@overrides
def closeEvent(self, event):
    """Ask for confirmation, then release the data and figure and close."""
    answer = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
    if answer != QMessageBox.Yes:
        # The user backed out: keep the window open.
        event.ignore()
        return

    PublicFunc.__disableAllButton__(self, ButtonState)
    PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
    # Let Qt paint the status message before the clean-up starts.
    QApplication.processEvents()

    # Release resources.
    del self.data
    self.fig.clf()
    plt.close(self.fig)
    self.deleteLater()
    collect()
    self.canvas = None
    event.accept()
def __reset__(self):
    """Return the window to its state right after a successful data import.

    Restores the default button states (with Standardize enabled), clears
    the figure, zeroes the cut-off and epoch spin boxes, unchecks the
    alignment candidates and restores their placeholder captions.
    """
    ButtonState["Current"].update(ButtonState["Default"])
    ButtonState["Current"]["pushButton_Standardize"] = True

    self.fig.clf()
    self.fig.canvas.draw()

    ui = self.ui
    for spin_box in (ui.spinBox_PSGPreA, ui.spinBox_PSGPreCut, ui.spinBox_PSGPostCut,
                     ui.spinBox_orgBcgPreA, ui.spinBox_orgBcgPreCut, ui.spinBox_orgBcgPostCut):
        spin_box.setValue(0)

    # NOTE(review): if these radio buttons are auto-exclusive, Qt may refuse
    # to uncheck the currently checked one - confirm against the .ui file.
    for radio in (ui.radioButton_NABD, ui.radioButton_NTHO, ui.radioButton_PABD,
                  ui.radioButton_PTHO, ui.radioButton_custom):
        radio.setChecked(False)

    # The minimum must be reset BEFORE the value: __enableAlign__ may have
    # raised it for the previous recording, and a spin box clamps setValue()
    # to its current range, so setValue(0) would otherwise be ignored.
    ui.spinBox_SelectEpoch.setMinimum(0)
    ui.spinBox_SelectEpoch.setValue(0)
    ui.spinBox_custom.setValue(0)
    ui.spinBox_SelectEpoch.setToolTip("")

    # Placeholder captions shown until the correlation maxima are known.
    ui.radioButton_PABD.setText("备选3")
    ui.radioButton_PTHO.setText("备选1")
    ui.radioButton_NABD.setText("备选4")
    ui.radioButton_NTHO.setText("备选2")
def __plot__(self, *args, **kwargs):
    """Clear the figure and draw the view that matches the signal sender.

    Extra arguments are forwarded to the drawing routines that need them
    (the correlation plot and the per-epoch plot).

    :return: the Result of the drawing routine, or a failure Result when the
        sender is not a known control or the routine raised.
    """
    sender = self.sender()
    self.fig.clf()
    result = None
    try:
        ui = self.ui
        if sender == ui.pushButton_Standardize:
            result = self.DrawPicRawOverview()
        elif sender == ui.pushButton_CutOff:
            result = self.DrawPicOverviewWithCutOff()
        elif sender == ui.pushButton_GetPos:
            result = self.DrawPicCorrelate(*args, **kwargs)
        elif sender in (ui.radioButton_NTHO, ui.radioButton_NABD, ui.radioButton_PTHO,
                        ui.radioButton_PABD, ui.radioButton_custom):
            # Every alignment candidate uses the same try-align preview.
            result = self.DrawPicTryAlign()
        elif sender == ui.pushButton_ChangeView:
            result = self.DrawAlignScatter()
        elif sender in (ui.pushButton_JUMP,
                        ui.pushButton_EM1, ui.pushButton_EM10, ui.pushButton_EM100,
                        ui.pushButton_EP1, ui.pushButton_EP10, ui.pushButton_EP100):
            # Jump and all epoch step buttons redraw the selected epoch.
            result = self.DrawPicByEpoch(*args, **kwargs)
    except Exception:
        return Result().failure(info=Constants.DRAW_FAILURE + "\n" + format_exc())

    if result is not None:
        return result
    # No branch matched the sender, so nothing was drawn.
    return Result().failure(info=Constants.DRAW_FAILURE)
def __slot_btn_input__(self):
    """Load the input signals and reset the window for a new run."""
    PublicFunc.__disableAllButton__(self, ButtonState)
    self.data = Data()

    # Import the data.
    # NOTE(review): the progress bar is told "step 1 of 3" while the log
    # prefix says "(1/1)" - confirm which total is intended.
    PublicFunc.progressbar_update(self, 1, 3, Constants.INPUTTING_DATA, 0)
    outcome = self.data.open_file()
    if not outcome.status:
        PublicFunc.text_output(self.ui, "(1/1)" + outcome.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, outcome.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return
    PublicFunc.text_output(self.ui, "(1/1)" + outcome.info, Constants.TIPS_TYPE_INFO)

    # Recording lengths in seconds: sample count divided by sampling rate.
    # The PSG length is taken from the THO channel only.
    input_cfg = Config["InputConfig"]
    orgBcg_seconds = round(self.data.raw_orgBcg.shape[0] / input_cfg["orgBcgFreq"])
    PSG_seconds = round(self.data.raw_Tho.shape[0] / input_cfg["ThoFreq"])
    Config["orgBcg_seconds"] = orgBcg_seconds
    Config["PSG_seconds"] = PSG_seconds
    self.ui.label_orgBcg_length.setText(str(orgBcg_seconds))
    self.ui.label_PSG_length.setText(str(PSG_seconds))

    self.__reset__()
    PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_save__(self):
    """Save the alignment information for the currently selected epoch."""
    PublicFunc.__disableAllButton__(self, ButtonState)

    # Save the alignment information.
    PublicFunc.progressbar_update(self, 1, 1, Constants.SAVING_DATA, 0)
    selected_epoch = self.ui.spinBox_SelectEpoch.value()
    outcome = self.data.save(selected_epoch)
    if not outcome.status:
        PublicFunc.text_output(self.ui, "(1/1)" + outcome.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, outcome.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return

    PublicFunc.text_output(self.ui, "(1/1)" + outcome.info, Constants.TIPS_TYPE_INFO)
    # NOTE(review): the message box is given a TIPS_* type here while the
    # error path uses a MSGBOX_* type - confirm this is intended.
    PublicFunc.msgbox_output(self, outcome.info, Constants.TIPS_TYPE_INFO)
    PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_Standardize__(self):
    """Run the standardisation pipeline and draw the raw overview.

    The check boxes are copied into ``Config`` first. With "raw signal"
    checked only ``Standardize_0`` runs; otherwise ``Standardize_1`` to
    ``Standardize_5`` run in order. The first failing step, or a failing
    overview plot, is reported and aborts the operation without enabling
    the CutOff button.
    """
    PublicFunc.__disableAllButton__(self, ButtonState)

    Config["RawSignal"] = self.ui.checkBox_RawSignal.isChecked()
    Config["orgBcgConfig"].update({
        "orgBcgDelBase": self.ui.checkBox_orgBcgDelBase.isChecked(),
        "orgBcgZScore": self.ui.checkBox_orgBcgZScore.isChecked()
    })
    Config["PSGConfig"].update({
        "PSGDelBase": self.ui.checkBox_PSGDelBase.isChecked(),
        "PSGZScore": self.ui.checkBox_PSGZScore.isChecked()
    })

    # Each entry: (status message, progress percentage, step to run).
    if Config["RawSignal"]:
        # Resampling only.
        steps = (
            (Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLING, 0, self.data.Standardize_0),
        )
    else:
        steps = (
            # Respiration extraction.
            (Constants.APPROXIMATELY_RESP_GETTING, 0, self.data.Standardize_1),
            # Pre-resampling.
            (Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLING, 20, self.data.Standardize_2),
            # Baseline removal.
            (Constants.APPROXIMATELY_DELETING_BASE, 40, self.data.Standardize_3),
            # Standardisation.
            (Constants.APPROXIMATELY_STANDARDIZING, 60, self.data.Standardize_4),
            # Resampling.
            (Constants.APPROXIMATELY_ALIGN_RESAMPLING, 80, self.data.Standardize_5),
        )

    total = len(steps)
    for index, (message, percent, run_step) in enumerate(steps, start=1):
        PublicFunc.progressbar_update(self, index, total, message, percent)
        result = run_step()
        prefix = "({}/{})".format(index, total)
        if not result.status:
            PublicFunc.text_output(self.ui, prefix + result.info, Constants.TIPS_TYPE_ERROR)
            PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
            PublicFunc.finish_operation(self, ButtonState)
            return
        PublicFunc.text_output(self.ui, prefix + result.info, Constants.TIPS_TYPE_INFO)

    # A failed overview plot used to be ignored silently; report it like the
    # other slots do, and keep the next stage locked.
    result = self.__plot__()
    if not result.status:
        PublicFunc.text_output(self.ui, result.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return

    ButtonState["Current"]["pushButton_CutOff"] = True
    PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_CutOff__(self):
    """Store the cut-off spin-box values and redraw the overview with them."""
    PublicFunc.__disableAllButton__(self, ButtonState)

    ui = self.ui
    orgBcg_cut = {"PreA": ui.spinBox_orgBcgPreA.value(),
                  "PreCut": ui.spinBox_orgBcgPreCut.value(),
                  "PostCut": ui.spinBox_orgBcgPostCut.value()}
    Config["orgBcgConfig"].update(orgBcg_cut)
    PSG_cut = {"PreA": ui.spinBox_PSGPreA.value(),
               "PreCut": ui.spinBox_PSGPreCut.value(),
               "PostCut": ui.spinBox_PSGPostCut.value()}
    Config["PSGConfig"].update(PSG_cut)

    PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0)
    outcome = self.__plot__()
    if not outcome.status:
        PublicFunc.text_output(self.ui, "(1/1)" + outcome.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, outcome.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return
    PublicFunc.text_output(self.ui, "(1/1)" + outcome.info, Constants.TIPS_TYPE_INFO)

    # The cut-offs are in place, so the position search becomes available.
    ButtonState["Current"]["pushButton_GetPos"] = True
    PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_GetPosition__(self):
    """Compute the correlations, plot them and offer the peak positions.

    On success the four candidate radio buttons are captioned with the
    position of each correlation maximum plus the returned bias, and all
    five alignment radio buttons are enabled.
    """
    PublicFunc.__disableAllButton__(self, ButtonState)

    def step_ok(prefix, outcome):
        # Log the step; on failure also pop up the error and unlock the UI.
        if outcome.status:
            PublicFunc.text_output(self.ui, prefix + outcome.info, Constants.TIPS_TYPE_INFO)
            return True
        PublicFunc.text_output(self.ui, prefix + outcome.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, outcome.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return False

    # Cross-correlation, part 1 of 2.
    PublicFunc.progressbar_update(self, 1, 4, Constants.APPROXIMATELY_CORRELATION_CALCULATING1, 0)
    result1 = self.data.calculate_correlation1()
    if not step_ok("(1/4)", result1):
        return

    # Cross-correlation, part 2 of 2.
    PublicFunc.progressbar_update(self, 2, 4, Constants.APPROXIMATELY_CORRELATION_CALCULATING2, 25)
    result2 = self.data.calculate_correlation2()
    if not step_ok("(2/4)", result2):
        return

    # Plot the four correlation curves.
    PublicFunc.progressbar_update(self, 3, 4, Constants.DRAWING_DATA, 50)
    plotted = self.__plot__(result1.data["tho_relate"], result1.data["tho_relate2"],
                            result2.data["abd_relate"], result2.data["abd_relate2"])
    if not step_ok("(3/4)", plotted):
        return

    # Locate the maximum of each curve.
    PublicFunc.progressbar_update(self, 4, 4, Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATING, 90)
    peaks = self.data.calculate_maxvalue_pos(result1.data["tho_relate"], result1.data["tho_relate2"],
                                             result2.data["abd_relate"], result2.data["abd_relate2"])
    if not step_ok("(4/4)", peaks):
        return

    bias = peaks.data["bias"]
    self.ui.radioButton_PABD.setText(str(peaks.data["abd_max"] + bias))
    self.ui.radioButton_PTHO.setText(str(peaks.data["tho_max"] + bias))
    self.ui.radioButton_NABD.setText(str(peaks.data["abd_max2"] + bias))
    self.ui.radioButton_NTHO.setText(str(peaks.data["tho_max2"] + bias))

    for name in ("radioButton_PTHO", "radioButton_PABD", "radioButton_NTHO",
                 "radioButton_NABD", "radioButton_custom"):
        ButtonState["Current"][name] = True
    PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_changeview__(self):
    """Redraw the figure with the view chosen for the ChangeView button."""
    PublicFunc.__disableAllButton__(self, ButtonState)

    # Draw.
    PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0)
    outcome = self.__plot__()
    if not outcome.status:
        PublicFunc.text_output(self.ui, "(1/1)" + outcome.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, outcome.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return

    PublicFunc.text_output(self.ui, "(1/1)" + outcome.info, Constants.TIPS_TYPE_INFO)
    PublicFunc.finish_operation(self, ButtonState)
def __slot_btn_jump__(self):
    """Draw the epoch currently selected in the epoch spin box."""
    PublicFunc.__disableAllButton__(self, ButtonState)
    selected_epoch = self.ui.spinBox_SelectEpoch.value()

    # Draw.
    PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0)
    outcome = self.__plot__(epoch=selected_epoch)
    if not outcome.status:
        PublicFunc.text_output(self.ui, "(1/1)" + outcome.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, outcome.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return

    PublicFunc.text_output(self.ui, "(1/1)" + outcome.info, Constants.TIPS_TYPE_INFO)
    PublicFunc.finish_operation(self, ButtonState)
def __EpochChange__(self):
    """Step the selected epoch by the pressed button's amount and redraw.

    The new value is clamped to the spin box's current minimum/maximum.
    Senders other than the six step buttons are ignored.
    """
    ui = self.ui
    spin_box = ui.spinBox_SelectEpoch
    current = spin_box.value()
    source = self.sender()

    # Step size associated with each button (negative = backwards).
    step_table = (
        (ui.pushButton_EM1, -1),
        (ui.pushButton_EM10, -10),
        (ui.pushButton_EM100, -100),
        (ui.pushButton_EP1, 1),
        (ui.pushButton_EP10, 10),
        (ui.pushButton_EP100, 100),
    )
    for button, delta in step_table:
        if source == button:
            break
    else:
        return

    if delta < 0:
        epoch = max(spin_box.minimum(), current + delta)
    else:
        epoch = min(spin_box.maximum(), current + delta)

    spin_box.setValue(epoch)
    QApplication.processEvents()
    self.__slot_btn_jump__()
def __enableAlign__(self):
    """Apply the chosen alignment candidate and preview the result.

    The offset comes from the checked radio button's caption (or from the
    custom spin box) and is stored in ``Config["pos"]``; the "N..."
    candidates set ``Config["orgBcg_reverse"]`` to -1, all others to 1.
    The valid epoch range is then fetched, the try-align view is drawn, and
    the epoch navigation, save and change-view controls are enabled.
    """
    PublicFunc.__disableAllButton__(self, ButtonState)
    ui = self.ui

    # (radio button, orgBcg sign) in the order the candidates are checked.
    candidates = (
        (ui.radioButton_NTHO, -1),
        (ui.radioButton_PTHO, 1),
        (ui.radioButton_PABD, 1),
        (ui.radioButton_NABD, -1),
    )
    for radio, sign in candidates:
        if radio.isChecked():
            relate, reverse = int(radio.text()), sign
            break
    else:
        if ui.radioButton_custom.isChecked():
            relate, reverse = int(ui.spinBox_custom.value()), 1
        else:
            # Nothing is selected. The buttons were disabled above, so they
            # must be restored before leaving or the window stays locked.
            PublicFunc.finish_operation(self, ButtonState)
            return

    # Position of the correlation maximum relative to the PSG signal.
    Config.update({"pos": relate})
    # Sign applied to the orgBcg signal for the chosen candidate.
    Config.update({"orgBcg_reverse": reverse})
    PublicFunc.text_output(self.ui, "相对位置:{}".format(Config["pos"]), Constants.TIPS_TYPE_INFO)

    # Fetch the valid epoch range.
    PublicFunc.progressbar_update(self, 1, 2, Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATING, 0)
    result3 = self.data.get_epoch()
    if not result3.status:
        PublicFunc.text_output(self.ui, "(1/2)" + result3.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, result3.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return
    PublicFunc.text_output(self.ui, "(1/2)" + result3.info, Constants.TIPS_TYPE_INFO)

    # Draw the try-align preview.
    PublicFunc.progressbar_update(self, 2, 2, Constants.DRAWING_DATA, 30)
    result = self.__plot__()
    if not result.status:
        PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return
    PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_INFO)

    # Restrict the epoch selector to the valid range and start at its
    # lower end.
    epoch_min = result3.data["epoch_min"]
    epoch_max = result3.data["epoch_max"]
    ui.spinBox_SelectEpoch.setMinimum(epoch_min)
    ui.spinBox_SelectEpoch.setMaximum(epoch_max)
    ui.spinBox_SelectEpoch.setValue(epoch_min)
    ui.spinBox_SelectEpoch.setToolTip(
        "最小值:{}\n最大值:{}".format(epoch_min, epoch_max))
    ui.spinBox_SelectEpoch.setEnabled(True)

    for name in ("pushButton_JUMP",
                 "pushButton_EM1", "pushButton_EM10", "pushButton_EM100",
                 "pushButton_EP1", "pushButton_EP10", "pushButton_EP100",
                 "pushButton_save", "pushButton_ChangeView"):
        ButtonState["Current"][name] = True
    PublicFunc.finish_operation(self, ButtonState)
def DrawPicRawOverview(self):
    """Draw THO, orgBcg and ABD as three stacked plots with shared axes.

    :return: a success Result, or a failure Result carrying the traceback
        when drawing raised.
    """
    try:
        tho = self.data.processed_downsample_Tho
        abd = self.data.processed_downsample_Abd
        orgBcg = self.data.processed_downsample_orgBcg
        max_x = max(tho.shape[0], abd.shape[0], orgBcg.shape[0])

        panels = ((311, tho, "THO"), (312, orgBcg, "orgBcg"), (313, abd, "ABD"))
        first_axis = None
        for position, signal, title in panels:
            if first_axis is None:
                axis = first_axis = self.fig.add_subplot(position)
            else:
                # The lower panels follow the first panel's pan and zoom.
                axis = self.fig.add_subplot(position, sharex=first_axis, sharey=first_axis)
            axis.plot(signal, color='blue')
            axis.set_xlim(0, max_x)
            axis.set_title(title)
        self.fig.canvas.draw()
    except Exception:
        return Result().failure(info=Constants.DRAW_FAILURE + "\n" + format_exc())
    return Result().success(info=Constants.DRAW_FINISHED)
def DrawPicOverviewWithCutOff(self):
    """Draw the three signals shifted by PreA with dashed cut-off markers.

    Each panel gets one red dashed line at ``PreCut + PreA`` and one at
    ``length - PostCut + PreA``.

    :return: a success Result, or a failure Result carrying the traceback
        when drawing raised.
    """
    try:
        PSG_cfg = Config["PSGConfig"]
        orgBcg_cfg = Config["orgBcgConfig"]
        tho = self.data.processed_downsample_Tho
        orgBcg = self.data.processed_downsample_orgBcg
        abd = self.data.processed_downsample_Abd

        max_x = max(tho.shape[0] + PSG_cfg["PreA"],
                    orgBcg.shape[0] + orgBcg_cfg["PreA"])
        min_x = min(PSG_cfg["PreA"], orgBcg_cfg["PreA"], 0)

        def draw_panel(axis, signal, cfg, post_cut_length, title):
            # Plot the shifted signal, then the pre- and post-cut markers.
            shift = cfg["PreA"]
            axis.plot(linspace(shift, len(signal) + shift, len(signal)), signal, color='blue')
            axis.axvline(x=cfg["PreCut"] + shift, color='red', linestyle='--')
            axis.axvline(x=post_cut_length - cfg["PostCut"] + shift, color='red', linestyle='--')
            axis.set_xlim(min_x, max_x)
            axis.set_title(title)

        ax1 = self.fig.add_subplot(311)
        draw_panel(ax1, tho, PSG_cfg, len(tho), "THO")

        ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1)
        draw_panel(ax2, orgBcg, orgBcg_cfg, len(orgBcg), "orgBcg")

        ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1)
        # NOTE(review): the ABD post-cut marker is positioned with the THO
        # length, not the ABD length - confirm this is intended.
        draw_panel(ax3, abd, PSG_cfg, len(tho), "ABD")

        self.fig.canvas.draw()
    except Exception:
        return Result().failure(info=Constants.DRAW_FAILURE + "\n" + format_exc())
    return Result().success(info=Constants.DRAW_FINISHED)
def DrawPicCorrelate(self, tho_pxx, tho_nxx, abd_pxx, abd_nxx):
    """Draw the four correlation curves in a 2x2 grid with shared axes.

    :param tho_pxx: curve plotted as THO vs. orgBcg.
    :param tho_nxx: curve plotted as THO vs. reversed orgBcg.
    :param abd_pxx: curve plotted as ABD vs. orgBcg.
    :param abd_nxx: curve plotted as ABD vs. reversed orgBcg.
    :return: a success Result, or a failure Result carrying the traceback
        when drawing raised.
    """
    try:
        panels = (
            (221, tho_pxx, "The Correlation of THO and orgBcg"),
            (222, tho_nxx, "The Correlation of THO and Reverse orgBcg"),
            (223, abd_pxx, "The Correlation of ABD and orgBcg"),
            (224, abd_nxx, "The Correlation of ABD and Reverse orgBcg"),
        )
        first_axis = None
        for position, curve, title in panels:
            if first_axis is None:
                axis = first_axis = self.fig.add_subplot(position)
            else:
                axis = self.fig.add_subplot(position, sharex=first_axis, sharey=first_axis)
            axis.plot(curve, color='blue')
            axis.set_title(title)
        self.fig.canvas.draw()
    except Exception:
        return Result().failure(info=Constants.DRAW_FAILURE + "\n" + format_exc())
    return Result().success(info=Constants.DRAW_FINISHED)
def DrawPicTryAlign(self):
    """Draw THO and ABD at x=0 and orgBcg shifted by ``Config["pos"]``.

    :return: a success Result, or a failure Result carrying the traceback
        when drawing raised.
    """
    try:
        tho = self.data.processed_downsample_Tho
        orgBcg = self.data.processed_downsample_orgBcg
        abd = self.data.processed_downsample_Abd
        offset = Config["pos"]

        max_x = max(tho.shape[0], orgBcg.shape[0] + offset)
        min_x = min(Config["PSGConfig"]["PreA"], Config["orgBcgConfig"]["PreA"] + offset, 0)

        def draw_panel(axis, signal, start, title):
            # Plot ``signal`` beginning at x = ``start``.
            axis.plot(linspace(start, len(signal) + start, len(signal)), signal, color='blue')
            axis.set_xlim(min_x, max_x)
            axis.set_title(title)

        ax1 = self.fig.add_subplot(311)
        draw_panel(ax1, tho, 0, "THO")

        ax2 = self.fig.add_subplot(312, sharex=ax1, sharey=ax1)
        draw_panel(ax2, orgBcg, offset, "orgBcg")

        ax3 = self.fig.add_subplot(313, sharex=ax1, sharey=ax1)
        draw_panel(ax3, abd, 0, "ABD")

        self.fig.canvas.draw()
    except Exception:
        return Result().failure(info=Constants.DRAW_FAILURE + "\n" + format_exc())
    return Result().success(info=Constants.DRAW_FINISHED)
def DrawPicByEpoch(self, epoch):
    """Plot a six-epoch window of THO / orgBcg / ABD with peaks and peak intervals.

    The window starts at `epoch` and spans six epochs of
    30 * Config["ApplyFrequency"] samples each. The orgBcg window is the PSG
    window shifted by Config["pos"] and multiplied by Config["orgBcg_reverse"].
    Left column (321/323/325): the signals with detected peaks marked.
    Right column (322/324/326): peak-to-peak distances, each repeated
    ApplyFrequency times, overlaid with the orgBcg ones.

    Args:
        epoch: index of the first epoch of the window.

    Returns:
        Result: success with Constants.DRAW_FINISHED, or failure carrying
        Constants.DRAW_FAILURE plus the formatted traceback.
    """
    try:
        rate = Config["ApplyFrequency"]
        psg_start = epoch * 30 * rate
        psg_stop = (epoch + 6) * 30 * rate
        bcg_start = psg_start - Config["pos"]
        bcg_stop = psg_stop - Config["pos"]

        tho_seg = self.data.processed_downsample_Tho[psg_start:psg_stop]
        orgBcg_seg = self.data.processed_downsample_orgBcg[bcg_start:bcg_stop] * Config["orgBcg_reverse"]
        abd_seg = self.data.processed_downsample_Abd[psg_start:psg_stop]

        def plot_with_peaks(code, start, stop, segment):
            # Draw one signal and mark its peaks; return the peak indices.
            axis = self.fig.add_subplot(code)
            xs = linspace(start, stop, len(segment))
            axis.plot(xs, segment)
            peak_idx, _ = find_peaks(segment, prominence=0, distance=3 * rate)
            axis.plot(xs[peak_idx], segment[peak_idx], "x")
            return peak_idx

        def stretched(start, stop, values):
            # x coordinates spreading `values` evenly over [start, stop].
            return linspace(start, stop, len(values))

        # Signals, plotted against the PSG axis (orgBcg against its own shifted axis).
        tho_peaks = plot_with_peaks(321, psg_start, psg_stop, tho_seg)
        orgBcg_peaks = plot_with_peaks(323, bcg_start, bcg_stop, orgBcg_seg)
        abd_peaks = plot_with_peaks(325, psg_start, psg_stop, abd_seg)

        # Peak-to-peak intervals.
        tho_gaps = diff(tho_peaks).repeat(rate)
        bcg_gaps = diff(orgBcg_peaks).repeat(rate)
        abd_gaps = diff(abd_peaks).repeat(rate)

        tho_panel = self.fig.add_subplot(322)
        tho_panel.plot(stretched(psg_start, psg_stop, tho_gaps), tho_gaps, alpha=0.5, label="tho")
        tho_panel.plot(stretched(psg_start, psg_stop, bcg_gaps), bcg_gaps, label="resp")
        tho_panel.set_title("tho_interval")
        tho_panel.legend()
        tho_panel.set_ylim((10, 50))

        resp_panel = self.fig.add_subplot(324)
        resp_panel.plot(stretched(bcg_start, bcg_stop, tho_gaps), tho_gaps)
        resp_panel.plot(stretched(bcg_start, bcg_stop, bcg_gaps), bcg_gaps, label="resp")
        resp_panel.set_title("resp_interval")
        resp_panel.set_ylim((10, 50))

        abd_panel = self.fig.add_subplot(326)
        abd_panel.plot(stretched(psg_start, psg_stop, abd_gaps), abd_gaps)
        abd_panel.plot(stretched(psg_start, psg_stop, bcg_gaps), bcg_gaps, label="resp")
        abd_panel.set_title("abd_interval")
        abd_panel.set_ylim((10, 50))

        self.fig.canvas.draw()
    except Exception:
        return Result().failure(info=Constants.DRAW_FAILURE + "\n" + format_exc())
    # Original note: return the picture so it can be stored in a QPixImage
    # (only a status Result is returned here).
    return Result().success(info=Constants.DRAW_FINISHED)
def DrawAlignScatter(self):
    """Scatter the per-epoch THO and ABD bias values returned by the data model.

    Calls self.data.get_corr_by_epoch() and plots "tho_bias_list" on the top
    axis (211) and "abd_bias_list" on the bottom axis (212), with x values
    spread evenly between "epoch_min" and "epoch_max".

    Returns:
        Result: success with Constants.DRAW_FINISHED; failure carrying
        Constants.DRAW_FAILURE plus either the upstream failure info (when
        the correlation step failed) or the formatted traceback.
    """
    try:
        response = self.data.get_corr_by_epoch()
        if not response.status:
            # Report the real cause instead of letting the indexing below
            # fail on whatever a failed Result carries as data.
            return Result().failure(info=Constants.DRAW_FAILURE + "\n" + response.info)

        epoch_min = response.data["epoch_min"]
        epoch_max = response.data["epoch_max"]
        panels = (
            (211, "THO", response.data["tho_bias_list"]),
            (212, "ABD", response.data["abd_bias_list"]),
        )
        for code, title, bias_list in panels:
            axis = self.fig.add_subplot(code)
            axis.scatter(linspace(epoch_min, epoch_max, len(bias_list)), bias_list, alpha=0.2)
            axis.set_title(title)
        self.fig.canvas.draw()
        return Result().success(info=Constants.DRAW_FINISHED)
    except Exception:
        return Result().failure(info=Constants.DRAW_FAILURE + "\n" + format_exc())
class Data:
def __init__(self):
    """Create an empty container; every signal slot starts as None."""
    # Signals as read from disk (filled by open_file).
    self.raw_orgBcg = self.raw_Tho = self.raw_Abd = None
    # Signals after the Standardize_* steps.
    self.processed_orgBcg = self.processed_Tho = self.processed_Abd = None
    # Processed signals after down-sampling (filled by Standardize_5).
    self.processed_downsample_orgBcg = None
    self.processed_downsample_Tho = None
    self.processed_downsample_Abd = None
    # Not assigned anywhere else in the visible part of the file.
    self.relate_list = self.relate_point = None
def open_file(self):
    """Resolve the three input files and load them as flat arrays.

    Steps, in order:
      1. Any configured input path that points at a file is replaced by its
         parent directory.
      2. PublicFunc.examine_file is run for orgBcg, THO and ABD in turn; on
         success the resolved path and frequency are written back into
         Config, on failure that Result is returned immediately.
      3. Each file is read with read_csv (no header) and flattened to 1-D.

    Returns:
        Result: success with Constants.INPUT_FINISHED; the failing
        examine_file Result; or a failure carrying Constants.INPUT_FAILURE,
        the "Open_Data_Exception" reason and the formatted traceback.
    """
    paths = Config["Path"]
    # (path key in Config["Path"], frequency key in Config["InputConfig"])
    channels = (
        ("Input_orgBcg", "orgBcgFreq"),
        ("Input_Tho", "ThoFreq"),
        ("Input_Abd", "AbdFreq"),
    )

    # Pass 1: turn file paths into their containing directories.
    for path_key, _ in channels:
        if Path(paths[path_key]).is_file():
            paths[path_key] = str(Path(paths[path_key]).parent)

    # Pass 2: locate each raw file and pick up its sampling frequency.
    raw_names = {
        "Input_orgBcg": ConfigParams.ORGBCG_RAW,
        "Input_Tho": ConfigParams.THO_RAW,
        "Input_Abd": ConfigParams.ABD_RAW,
    }
    for path_key, freq_key in channels:
        result = PublicFunc.examine_file(paths[path_key], raw_names[path_key], ConfigParams.ENDSWITH_TXT)
        if not result.status:
            return result
        paths[path_key] = result.data["path"]
        Config["InputConfig"][freq_key] = result.data["freq"]

    def load(path_key):
        # Read one headerless text file and flatten it to a 1-D array.
        frame = read_csv(paths[path_key], encoding=ConfigParams.UTF8_ENCODING, header=None)
        return frame.to_numpy().reshape(-1)

    # Pass 3: read the data.
    try:
        self.raw_orgBcg = load("Input_orgBcg")
        self.raw_Tho = load("Input_Tho")
        self.raw_Abd = load("Input_Abd")
    except Exception:
        return Result().failure(info=Constants.INPUT_FAILURE + Constants.FAILURE_REASON[
            "Open_Data_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.INPUT_FINISHED)
def save(self, epoch):
    """Write the alignment result to the CSV at Config["Path"]["Save"].

    The file is overwritten and holds one row with the columns "pos",
    "epoch" and "ApplyFrequency". The parent directory is created if needed.

    Args:
        epoch: value stored in the "epoch" column.

    Returns:
        Result: success with Constants.SAVE_FINISHED, or failure carrying
        Constants.SAVE_FAILURE, the "Save_Exception" reason and the
        formatted traceback.
    """
    try:
        target = Path(Config["Path"]["Save"])
        # Kept inside the try so that a directory-creation error (e.g. the
        # parent path is an existing regular file, or permission is denied)
        # is reported as a failure Result instead of propagating.
        # exist_ok=True makes this a no-op when the directory already exists.
        target.parent.mkdir(parents=True, exist_ok=True)

        # Save to CSV.
        frame = DataFrame({
            "pos": [Config["pos"]],
            "epoch": [epoch],
            "ApplyFrequency": [Config["ApplyFrequency"]],
        })
        frame.to_csv(target, mode="w", header=True, index=False)
    except Exception:
        return Result().failure(info=Constants.SAVE_FAILURE + Constants.FAILURE_REASON[
            "Save_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.SAVE_FINISHED)
def Standardize_0(self):
    """Resample-only path: truncate the raw signals and resample them.

    Each raw signal is cut to its configured duration in seconds
    (Config["orgBcg_seconds"] for orgBcg, Config["PSG_seconds"] for THO and
    ABD) at its input frequency, then resampled to that duration at
    Config["ApplyFrequency"]. The truncated arrays replace self.raw_*.

    Returns:
        Result: success with APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FINISHED, or
        failure with APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE plus either
        the "Data_Not_Exist" reason or the "Only_Resample_Exception" reason
        and the formatted traceback.
    """
    if any(raw is None for raw in (self.raw_orgBcg, self.raw_Tho, self.raw_Abd)):
        return Result().failure(
            info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.FAILURE_REASON[
                "Data_Not_Exist"])
    try:
        # Truncate by number of seconds.
        bcg_seconds = Config["orgBcg_seconds"]
        self.raw_orgBcg = self.raw_orgBcg[:int(bcg_seconds * Config["InputConfig"]["orgBcgFreq"])]
        psg_seconds = Config["PSG_seconds"]
        self.raw_Tho = self.raw_Tho[:int(psg_seconds * Config["InputConfig"]["ThoFreq"])]
        self.raw_Abd = self.raw_Abd[:int(psg_seconds * Config["InputConfig"]["AbdFreq"])]

        # Resample each signal to its duration at the target frequency.
        target = Config["ApplyFrequency"]
        self.processed_orgBcg = resample(self.raw_orgBcg, int(bcg_seconds * target))
        psg_samples = int(psg_seconds * target)
        self.processed_Tho = resample(self.raw_Tho, psg_samples)
        self.processed_Abd = resample(self.raw_Abd, psg_samples)
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.FAILURE_REASON[
                "Only_Resample_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FINISHED)
def Standardize_1(self):
    """Respiration extraction: band-pass filter the three raw signals.

    Each raw signal is filtered with a zero-phase Butterworth band-pass
    (second-order sections, applied with sosfiltfilt) using the band limits
    and order from Config["Filter"] and the signal's own input frequency.
    Results are stored in self.processed_orgBcg / processed_Tho /
    processed_Abd.

    Returns:
        Result: success with APPROXIMATELY_RESP_GET_FINISHED, or failure
        with APPROXIMATELY_RESP_GET_FAILURE plus either the "Data_Not_Exist"
        reason or the "Resp_Get_Exception" reason and the formatted
        traceback.
    """
    def butter_bandpass_filter(data, lowCut, highCut, fs, order):
        # Cut-offs are normalised by the Nyquist frequency (fs / 2).
        nyquist = fs * 0.5
        sos = butter(order, [lowCut / nyquist, highCut / nyquist], btype="bandpass", output='sos')
        return sosfiltfilt(sos, data)

    if self.raw_orgBcg is None or self.raw_Tho is None or self.raw_Abd is None:
        # Use this step's own failure prefix, matching the except branch
        # below, rather than the resample-only prefix of Standardize_0.
        return Result().failure(
            info=Constants.APPROXIMATELY_RESP_GET_FAILURE + Constants.FAILURE_REASON[
                "Data_Not_Exist"])
    try:
        # Filtering.
        self.processed_orgBcg = butter_bandpass_filter(
            self.raw_orgBcg, Config["Filter"]["BandPassLow"],
            Config["Filter"]["BandPassHigh"],
            Config["InputConfig"]["orgBcgFreq"],
            Config["Filter"]["BandPassOrder"])
        self.processed_Tho = butter_bandpass_filter(
            self.raw_Tho, Config["Filter"]["BandPassLow"],
            Config["Filter"]["BandPassHigh"],
            Config["InputConfig"]["ThoFreq"],
            Config["Filter"]["BandPassOrder"])
        self.processed_Abd = butter_bandpass_filter(
            self.raw_Abd, Config["Filter"]["BandPassLow"],
            Config["Filter"]["BandPassHigh"],
            Config["InputConfig"]["AbdFreq"],
            Config["Filter"]["BandPassOrder"])
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_RESP_GET_FAILURE + Constants.FAILURE_REASON[
                "Resp_Get_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.APPROXIMATELY_RESP_GET_FINISHED)
def Standardize_2(self):
    """Pre-resampling: bring every processed signal to Config["TempFrequency"].

    A signal is resampled only when its input frequency differs from
    Config["TempFrequency"]. The target length is the configured duration
    (Config["PSG_seconds"] for THO/ABD, Config["orgBcg_seconds"] for orgBcg)
    times the temporary frequency.

    Returns:
        Result: success with APPROXIMATELY_PRE_ALIGN_RESAMPLE_FINISHED, or
        failure with APPROXIMATELY_PRE_ALIGN_RESAMPLE_FAILURE, the
        "Pre_Resample_Exception" reason and the formatted traceback.
    """
    try:
        # TODO: with this sample-rate handling, if THO and ABD have different
        # sample rates, ApplyFrequency may still go wrong later and the
        # resulting coarse-alignment coordinate may be incorrect.
        temp_frequency = Config["TempFrequency"]
        input_freqs = Config["InputConfig"]
        # (A leftover debug print of the THO/temp frequencies was removed
        # here; it wrote to stdout for the THO branch only.)
        if input_freqs["ThoFreq"] != temp_frequency:
            self.processed_Tho = resample(self.processed_Tho,
                                          int(Config["PSG_seconds"] * temp_frequency))
        if input_freqs["AbdFreq"] != temp_frequency:
            self.processed_Abd = resample(self.processed_Abd,
                                          int(Config["PSG_seconds"] * temp_frequency))
        if input_freqs["orgBcgFreq"] != temp_frequency:
            self.processed_orgBcg = resample(self.processed_orgBcg,
                                             int(Config["orgBcg_seconds"] * temp_frequency))
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FAILURE + Constants.FAILURE_REASON[
                "Pre_Resample_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FINISHED)
def Standardize_3(self):
    """Baseline removal: subtract a moving average from the processed signals.

    The moving average is a box filter of int(4 * Config["TempFrequency"])
    samples applied with convolve(mode='same'). THO and ABD are processed
    when Config["PSGConfig"]["PSGDelBase"] is truthy, orgBcg when
    Config["orgBcgConfig"]["orgBcgDelBase"] is truthy.

    Returns:
        Result: success with APPROXIMATELY_DELETE_BASE_FINISHED, or failure
        with APPROXIMATELY_DELETE_BASE_FAILURE, the "Delete_Base_Exception"
        reason and the formatted traceback.
    """
    try:
        temp_frequency = Config["TempFrequency"]

        def strip_baseline(signal):
            # Subtract the four-second moving average.
            width = int(4 * temp_frequency)
            return signal - convolve(signal, ones(width) / width, mode='same')

        if Config["PSGConfig"]["PSGDelBase"]:
            self.processed_Tho = strip_baseline(self.processed_Tho)
            self.processed_Abd = strip_baseline(self.processed_Abd)
        if Config["orgBcgConfig"]["orgBcgDelBase"]:
            self.processed_orgBcg = strip_baseline(self.processed_orgBcg)
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_DELETE_BASE_FAILURE + Constants.FAILURE_REASON[
                "Delete_Base_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.APPROXIMATELY_DELETE_BASE_FINISHED)
def Standardize_4(self):
    """Standardization: optionally z-score the processed signals.

    THO and ABD are replaced by (x - mean) / std when
    Config["PSGConfig"]["PSGZScore"] is truthy; orgBcg likewise when
    Config["orgBcgConfig"]["orgBcgZScore"] is truthy.

    Returns:
        Result: success with APPROXIMATELY_STANDARDIZE_FINISHED, or failure
        with APPROXIMATELY_STANDARDIZE_FAILURE, the "Standardize_Exception"
        reason and the formatted traceback.
    """
    try:
        def zscore(signal):
            return (signal - mean(signal)) / std(signal)

        # Decide per signal group whether to standardize.
        if Config["PSGConfig"]["PSGZScore"]:
            self.processed_Tho = zscore(self.processed_Tho)
            self.processed_Abd = zscore(self.processed_Abd)
        if Config["orgBcgConfig"]["orgBcgZScore"]:
            self.processed_orgBcg = zscore(self.processed_orgBcg)
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_STANDARDIZE_FAILURE + Constants.FAILURE_REASON[
                "Standardize_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.APPROXIMATELY_STANDARDIZE_FINISHED)
def Standardize_5(self):
    """Final resampling: decimate the processed signals by plain slicing.

    Every int(Config["TempFrequency"] / Config["ApplyFrequency"])-th sample
    of each processed signal is kept and stored in the matching
    self.processed_downsample_* attribute.

    Returns:
        Result: success with APPROXIMATELY_ALIGN_RESAMPLE_FINISHED, or
        failure with APPROXIMATELY_ALIGN_RESAMPLE_FAILURE, the
        "Resample_Exception" reason and the formatted traceback.
    """
    try:
        # Done with [::step] slicing.
        step = int(Config["TempFrequency"] / Config["ApplyFrequency"])
        self.processed_downsample_Tho = self.processed_Tho[::step]
        self.processed_downsample_Abd = self.processed_Abd[::step]
        self.processed_downsample_orgBcg = self.processed_orgBcg[::step]
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FAILURE + Constants.FAILURE_REASON[
                "Resample_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FINISHED)
def calculate_correlation1(self):
    """Cross-correlation 1/2: down-sampled THO against down-sampled orgBcg.

    Both signals are trimmed by their configured "PreCut"/"PostCut" sample
    counts, multiplied by the configured "Multiple_Factor", cast to int64
    and cross-correlated in 'full' mode. The result is divided by the factor
    squared.

    Returns:
        Result: success with APPROXIMATELY_CORRELATION_CALCULATE1_FINISHED
        and data {"tho_relate": correlation, "tho_relate2": -correlation};
        or failure with APPROXIMATELY_CORRELATION_CALCULATE1_FAILURE, the
        "Calculate_Correlation1_Exception" reason and the traceback.
    """
    try:
        # Scaling factor applied before the integer cast.
        factor = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["Multiple_Factor"]

        tho = self.processed_downsample_Tho
        reference = tho[Config["PSGConfig"]["PreCut"]:len(tho) - Config["PSGConfig"]["PostCut"]].copy()
        bcg = self.processed_downsample_orgBcg
        moving = bcg[Config["orgBcgConfig"]["PreCut"]:len(bcg) - Config["orgBcgConfig"]["PostCut"]].copy()

        # In-place scaling on the copies, then integer cast.
        reference *= factor
        moving *= factor
        reference = reference.astype(int64)
        moving = moving.astype(int64)

        correlation = correlate(reference, moving, mode='full') / (factor ** 2)
        result = {"tho_relate": correlation, "tho_relate2": -correlation}
    except Exception:
        return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FAILURE +
                                     Constants.FAILURE_REASON[
                                         "Calculate_Correlation1_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FINISHED, data=result)
def calculate_correlation2(self):
    """Cross-correlation 2/2: down-sampled ABD against down-sampled orgBcg.

    Mirrors calculate_correlation1: both signals are trimmed by their
    configured "PreCut"/"PostCut" sample counts, multiplied by the
    configured "Multiple_Factor", cast to int64 and cross-correlated in
    'full' mode; the result is divided by the factor squared.

    Returns:
        Result: success with APPROXIMATELY_CORRELATION_CALCULATE2_FINISHED
        and data {"abd_relate": correlation, "abd_relate2": -correlation};
        or failure with APPROXIMATELY_CORRELATION_CALCULATE2_FAILURE, the
        "Calculate_Correlation2_Exception" reason and the traceback.
    """
    try:
        # Use the same configured scaling factor as calculate_correlation1
        # instead of a hard-coded 100 / 10000, so both correlations are
        # quantised identically whatever the configured value is.
        factor = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["Multiple_Factor"]

        abd = self.processed_downsample_Abd
        a = abd[Config["PSGConfig"]["PreCut"]:len(abd) - Config["PSGConfig"]["PostCut"]].copy()
        bcg = self.processed_downsample_orgBcg
        v = bcg[Config["orgBcgConfig"]["PreCut"]:len(bcg) - Config["orgBcgConfig"]["PostCut"]].copy()

        a *= factor
        v *= factor
        a = a.astype(int64)
        v = v.astype(int64)

        abd_relate = correlate(a, v, mode='full') / (factor ** 2)
        abd_relate2 = - abd_relate
        result = {"abd_relate": abd_relate, "abd_relate2": abd_relate2}
    except Exception:
        return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FAILURE +
                                     Constants.FAILURE_REASON[
                                         "Calculate_Correlation2_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FINISHED, data=result)
def calculate_maxvalue_pos(self, tho_relate, tho_relate2, abd_relate, abd_relate2):
    """Locate the maximum of each correlation curve and compute the index bias.

    Args:
        tho_relate: THO/orgBcg correlation.
        tho_relate2: negated THO/orgBcg correlation.
        abd_relate: ABD/orgBcg correlation.
        abd_relate2: negated ABD/orgBcg correlation.

    Returns:
        Result: success with APPROXIMATELY_MAXVALUE_POS_CALCULATE_FINISHED
        and data holding the four argmax indices ("tho_max", "tho_max2",
        "abd_max", "abd_max2") plus "bias" = PSG PreCut + orgBcg PostCut
        - len(processed_downsample_orgBcg) + 1; or failure with
        APPROXIMATELY_MAXVALUE_POS_CALCULATE_FAILURE, the
        "Calculate_Maxvalue_Pos_Exception" reason and the traceback.
    """
    try:
        # Position of the maximum of each curve.
        result = {
            "tho_max": argmax(tho_relate),
            "tho_max2": argmax(tho_relate2),
            "abd_max": argmax(abd_relate),
            "abd_max2": argmax(abd_relate2),
        }
        # NOTE(review): presumably added to an argmax index by the caller to
        # get the orgBcg offset in untrimmed coordinates; confirm there.
        trimmed = Config["PSGConfig"]["PreCut"] + Config["orgBcgConfig"]["PostCut"]
        result["bias"] = trimmed - len(self.processed_downsample_orgBcg) + 1
    except Exception:
        return Result().failure(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FAILURE +
                                     Constants.FAILURE_REASON[
                                         "Calculate_Maxvalue_Pos_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FINISHED, data=result)
def get_epoch(self):
    """Compute the epoch index range used for epoch-wise inspection.

    An epoch is 30 * Config["ApplyFrequency"] down-sampled samples.
    "epoch_min" is the epoch index of Config["pos"] plus one, but never
    below 0. "epoch_max" is the smaller of the epoch counts of THO and of
    the orgBcg end shifted by Config["pos"], each minus one.

    Returns:
        Result: success with APPROXIMATELY_EPOCH_GET_FINISHED and data
        {"epoch_min": ..., "epoch_max": ...}; or failure with
        APPROXIMATELY_EPOCH_GET_FAILURE, the "Get_Epoch_Exception" reason
        and the formatted traceback.
    """
    try:
        def whole_epochs(sample_count):
            # Floor-divide a sample count into whole epochs.
            return sample_count // 30 // Config["ApplyFrequency"]

        first = max(0, whole_epochs(Config["pos"]) + 1)
        tho_last = whole_epochs(len(self.processed_downsample_Tho)) - 1
        bcg_last = whole_epochs(len(self.processed_downsample_orgBcg) + Config["pos"]) - 1
        result = {"epoch_min": first, "epoch_max": min(tho_last, bcg_last)}
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.FAILURE_REASON[
                "Get_Epoch_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.APPROXIMATELY_EPOCH_GET_FINISHED, data=result)
def get_corr_by_epoch(self):
    """Estimate the residual THO/ABD-vs-orgBcg lag for every epoch window.

    For each epoch in [epoch_min, epoch_max) from get_epoch(), a window of
    "window_epoch" epochs is cut from processed THO and ABD, and the window
    shifted by the current alignment position is cut from processed orgBcg
    (multiplied by Config["orgBcg_reverse"]). Each PSG window is
    cross-correlated with the orgBcg window in 'full' mode; the lag of the
    maximum, floor-divided by the temporary frequency, is recorded.

    Returns:
        Result: success with APPROXIMATELY_EPOCH_GET_FINISHED and data
        {"tho_bias_list", "abd_bias_list", "epoch_min", "epoch_max"}; or
        failure with APPROXIMATELY_EPOCH_GET_FAILURE, the
        "Get_Corr_By_Epoch_Exception" reason and the formatted traceback.
    """
    try:
        # Get the epoch range.
        response = self.get_epoch()
        if not response.status:
            raise Exception(response.info)
        epoch_min = response.data["epoch_min"]
        epoch_max = response.data["epoch_max"]

        # NOTE(review): the Standardize_* steps read Config["TempFrequency"],
        # while this reads the value from ConfigParams; confirm the two
        # always agree.
        temp_freq = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["TempFrequency"]
        window_epoch = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["CorrByEpoch"]["window_epoch"]
        tho_bias_list = []
        abd_bias_list = []

        # Convert pos from ApplyFrequency samples to temp_freq samples.
        pos = Config["pos"] * temp_freq // Config["ApplyFrequency"]
        reverse = Config["orgBcg_reverse"]

        for epoch in range(epoch_min, epoch_max):
            SP = epoch * 30 * temp_freq
            EP = (epoch + window_epoch) * 30 * temp_freq
            tho_seg = self.processed_Tho[SP:EP]
            abd_seg = self.processed_Abd[SP:EP]
            orgBcg_seg = self.processed_orgBcg[SP - pos:EP - pos] * reverse

            tho_relate_seg = correlate(tho_seg, orgBcg_seg, mode='full')
            abd_relate_seg = correlate(abd_seg, orgBcg_seg, mode='full')

            # In 'full' mode, index k corresponds to lag k - (len(in2) - 1).
            # Subtracting len(in2) instead reports a perfectly aligned
            # window as -1 sample, which floor-divides to -1 second.
            zero_lag_index = len(orgBcg_seg) - 1
            tho_seg_pos = argmax(tho_relate_seg) - zero_lag_index
            abd_seg_pos = argmax(abd_relate_seg) - zero_lag_index
            tho_bias_list.append(tho_seg_pos // temp_freq)
            abd_bias_list.append(abd_seg_pos // temp_freq)

        result = {
            "tho_bias_list": tho_bias_list,
            "abd_bias_list": abd_bias_list,
            "epoch_min": epoch_min,
            "epoch_max": epoch_max
        }
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.FAILURE_REASON[
                "Get_Corr_By_Epoch_Exception"] + "\n" + format_exc())
    return Result().success(info=Constants.APPROXIMATELY_EPOCH_GET_FINISHED, data=result)