1307 lines
62 KiB
Python
1307 lines
62 KiB
Python
from gc import collect
|
||
from pathlib import Path
|
||
from traceback import format_exc
|
||
|
||
import matplotlib.pyplot as plt
|
||
from PySide6.QtWidgets import QMessageBox, QMainWindow, QApplication
|
||
from matplotlib.backends.backend_qt import NavigationToolbar2QT
|
||
from matplotlib.backends.backend_qtagg import FigureCanvasQTAgg as FigureCanvas
|
||
from matplotlib.figure import Figure
|
||
from numpy import repeat, convolve, ones, mean, std, int64, argmax, linspace, diff
|
||
from overrides import overrides
|
||
from pandas import read_csv, DataFrame
|
||
from scipy.signal import find_peaks, resample, butter, sosfiltfilt, correlate
|
||
from yaml import dump, load, FullLoader
|
||
|
||
from func.utils.PublicFunc import PublicFunc
|
||
from func.utils.Constants import Constants, ConfigParams
|
||
from func.utils.Result import Result
|
||
|
||
from ui.MainWindow.MainWindow_approximately_align import Ui_MainWindow_approximately_align
|
||
from ui.setting.approximately_align_input_setting import Ui_MainWindow_approximately_align_input_setting
|
||
|
||
# Shared run-time configuration of this module. It starts empty and is filled
# in place by SettingWindow.__read_config__ and by the main-window slots.
Config = {}

# Names of all widgets whose enabled flag is tracked in ButtonState.
_TRACKED_WIDGETS = (
    "pushButton_input_setting",
    "pushButton_input",
    "pushButton_Standardize",
    "pushButton_CutOff",
    "pushButton_GetPos",
    "pushButton_ChangeView",
    "pushButton_JUMP",
    "pushButton_EM1",
    "pushButton_EM10",
    "pushButton_EM100",
    "pushButton_EP1",
    "pushButton_EP10",
    "pushButton_EP100",
    "pushButton_save",
    "radioButton_PTHO",
    "radioButton_PABD",
    "radioButton_NTHO",
    "radioButton_NABD",
    "radioButton_custom",
)

# Only the two input-related buttons are enabled before any data is loaded.
_ENABLED_AT_START = frozenset(("pushButton_input_setting", "pushButton_input"))

# "Default" is the pristine snapshot; "Current" is a separate dict that the
# slots mutate and that __reset__ refreshes from "Default".
ButtonState = {
    snapshot: {widget: widget in _ENABLED_AT_START for widget in _TRACKED_WIDGETS}
    for snapshot in ("Default", "Current")
}
|
||
|
||
|
||
class SettingWindow(QMainWindow):
    """Settings window for the inputs of the approximate-alignment tool.

    It edits the sampling frequencies, the band-pass filter parameters and the
    display frequency, previews the derived input/save paths, and keeps the
    module-level ``Config`` dict and the YAML configuration file in sync.
    """

    def __init__(self, root_path, sampID):
        """Build the UI, load the stored configuration and wire the signals.

        :param root_path: base directory; every path is built below it.
        :param sampID: sample identifier; used as a folder name in every path.
        """
        super().__init__()
        self.ui = Ui_MainWindow_approximately_align_input_setting()
        self.ui.setupUi(self)

        self.root_path = root_path
        self.sampID = sampID

        # Holds the content of the configuration file once it has been read.
        self.config = None
        self.__read_config__()

        # Editing any numeric field refreshes the path previews.
        numeric_editors = (
            self.ui.spinBox_input_orgBcg_freq,
            self.ui.spinBox_input_Tho_freq,
            self.ui.spinBox_input_Abd_freq,
            self.ui.spinBox_bandpassOrder,
            self.ui.doubleSpinBox_bandpassLow,
            self.ui.doubleSpinBox_bandpassHigh,
            self.ui.spinBox_display_freq,
        )
        for editor in numeric_editors:
            editor.valueChanged.connect(self.__update_ui__)

        self.ui.pushButton_confirm.clicked.connect(self.__write_config__)
        # Cancel re-reads the stored configuration and then closes the window.
        cancel_button = self.ui.pushButton_cancel
        cancel_button.clicked.connect(self.__rollback_config__)
        cancel_button.clicked.connect(self.close)

    def __read_config__(self):
        """Load the YAML configuration into ``Config`` and echo it to the UI.

        When the configuration file does not exist yet, it is first created
        from ``ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT``.
        """
        config_file = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_FILE_PATH
        if not Path(config_file).exists():
            with open(config_file, "w") as f:
                dump(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT, f)

        with open(config_file, "r") as f:
            file_config = load(f.read(), Loader=FullLoader)
        Config.update(file_config)
        self.config = file_config

        # Update the configuration: these entries are set on Config only.
        sample_dir = Path(str(self.sampID))
        orgbcg_dir = Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / sample_dir
        psg_dir = Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / sample_dir
        save_file = Path(ConfigParams.APPROXIMATELY_ALIGN_SAVE_FILENAME +
                         ConfigParams.ENDSWITH_CSV)
        Config.update({
            "Path": {
                "Input_orgBcg": str(orgbcg_dir),
                "Input_Tho": str(psg_dir),
                "Input_Abd": str(psg_dir),
                # NOTE(review): the save file is placed in the PSG folder here,
                # while __update_ui__ previews it in the orgBcg folder --
                # confirm which location is intended.
                "Save": str(psg_dir / save_file),
            },
            "orgBcgConfig": {},
            "PSGConfig": {},
        })

        # Echo the loaded values back into the widgets.
        input_cfg = Config["InputConfig"]
        filter_cfg = Config["Filter"]
        path_cfg = Config["Path"]
        self.ui.spinBox_input_orgBcg_freq.setValue(input_cfg["orgBcgFreq"])
        self.ui.spinBox_input_Tho_freq.setValue(input_cfg["ThoFreq"])
        self.ui.spinBox_input_Abd_freq.setValue(input_cfg["AbdFreq"])
        self.ui.spinBox_bandpassOrder.setValue(filter_cfg["BandPassOrder"])
        self.ui.doubleSpinBox_bandpassLow.setValue(filter_cfg["BandPassLow"])
        self.ui.doubleSpinBox_bandpassHigh.setValue(filter_cfg["BandPassHigh"])
        self.ui.spinBox_display_freq.setValue(Config["ApplyFrequency"])
        self.ui.plainTextEdit_file_path_input_orgBcg.setPlainText(path_cfg["Input_orgBcg"])
        self.ui.plainTextEdit_file_path_input_Tho.setPlainText(path_cfg["Input_Tho"])
        self.ui.plainTextEdit_file_path_input_Abd.setPlainText(path_cfg["Input_Abd"])
        self.ui.plainTextEdit_file_path_save.setPlainText(path_cfg["Save"])

    def __write_config__(self):
        """Copy the widget values into ``Config``, save the file, then close."""
        ui = self.ui
        orgbcg_freq = ui.spinBox_input_orgBcg_freq.value()
        tho_freq = ui.spinBox_input_Tho_freq.value()
        abd_freq = ui.spinBox_input_Abd_freq.value()
        display_freq = ui.spinBox_display_freq.value()
        bandpass_order = ui.spinBox_bandpassOrder.value()
        bandpass_low = ui.doubleSpinBox_bandpassLow.value()
        bandpass_high = ui.doubleSpinBox_bandpassHigh.value()

        # Write the values both to the live configuration and to the dict that
        # is dumped to the configuration file.
        for target in (Config, self.config):
            target["InputConfig"]["orgBcgFreq"] = orgbcg_freq
            target["InputConfig"]["ThoFreq"] = tho_freq
            target["InputConfig"]["AbdFreq"] = abd_freq
            target["ApplyFrequency"] = display_freq
            target["Filter"]["BandPassOrder"] = bandpass_order
            target["Filter"]["BandPassLow"] = bandpass_low
            target["Filter"]["BandPassHigh"] = bandpass_high

        # The paths are kept in the live configuration only.
        live_paths = Config["Path"]
        live_paths["Input_orgBcg"] = ui.plainTextEdit_file_path_input_orgBcg.toPlainText()
        live_paths["Input_Tho"] = ui.plainTextEdit_file_path_input_Tho.toPlainText()
        live_paths["Input_Abd"] = ui.plainTextEdit_file_path_input_Abd.toPlainText()
        live_paths["Save"] = ui.plainTextEdit_file_path_save.toPlainText()

        # Save the configuration to the file.
        with open(ConfigParams.APPROXIMATELY_ALIGN_CONFIG_FILE_PATH, "w") as f:
            dump(self.config, f)

        self.close()

    def __rollback_config__(self):
        """Discard the edits by re-reading the configuration file."""
        self.__read_config__()

    def __update_ui__(self):
        """Rebuild the four path previews from the current frequency values."""
        ui = self.ui
        sample_dir = Path(str(self.sampID))
        orgbcg_dir = Path(self.root_path) / ConfigParams.PUBLIC_PATH_ORGBCG_TEXT / sample_dir
        psg_dir = Path(self.root_path) / ConfigParams.PUBLIC_PATH_PSG_TEXT / sample_dir

        # Input file names are "<prefix><frequency><txt suffix>".
        orgbcg_file = Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_ORGBCG_FILENAME +
                           str(ui.spinBox_input_orgBcg_freq.value()) +
                           ConfigParams.ENDSWITH_TXT)
        tho_file = Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_THO_FILENAME +
                        str(ui.spinBox_input_Tho_freq.value()) +
                        ConfigParams.ENDSWITH_TXT)
        abd_file = Path(ConfigParams.APPROXIMATELY_ALIGN_INPUT_ABD_FILENAME +
                        str(ui.spinBox_input_Abd_freq.value()) +
                        ConfigParams.ENDSWITH_TXT)
        save_file = Path(ConfigParams.APPROXIMATELY_ALIGN_SAVE_FILENAME +
                         ConfigParams.ENDSWITH_CSV)

        ui.plainTextEdit_file_path_input_orgBcg.setPlainText(str(orgbcg_dir / orgbcg_file))
        ui.plainTextEdit_file_path_input_Tho.setPlainText(str(psg_dir / tho_file))
        ui.plainTextEdit_file_path_input_Abd.setPlainText(str(psg_dir / abd_file))
        ui.plainTextEdit_file_path_save.setPlainText(str(orgbcg_dir / save_file))
|
||
|
||
|
||
class MainWindow_approximately_align(QMainWindow):
|
||
|
||
def __init__(self):
    """Create the window shell, its progress bar and its message box.

    The settings window, figure and canvas are left as ``None`` here; they
    are created in ``show()``.
    """
    super().__init__()
    self.ui = Ui_MainWindow_approximately_align()
    self.ui.setupUi(self)

    # Filled in by show().
    self.root_path = self.sampID = None

    # Data container (created on import) and settings window (created in show()).
    self.data = None
    self.setting = None

    # Initialise the progress bar.
    self.progressbar = None
    PublicFunc.add_progressbar(self)

    # Initialise the drawing frame.
    self.fig = None
    self.canvas = None

    message_box = QMessageBox()
    message_box.setWindowTitle(Constants.MAINWINDOW_MSGBOX_TITLE)
    self.msgBox = message_box
|
||
|
||
@overrides
def show(self, root_path, sampID):
    """Show the window for one sample and build its run-time widgets.

    Creates the settings window, the matplotlib figure/canvas/toolbar, resets
    the button states and connects every button and radio button to its slot.

    :param root_path: base directory, forwarded to ``SettingWindow``.
    :param sampID: sample identifier, forwarded to ``SettingWindow``.
    """
    super().show()
    self.root_path, self.sampID = root_path, sampID

    self.setting = SettingWindow(root_path, sampID)

    figure = Figure(figsize=(12, 9), dpi=100)
    figure.subplots_adjust(left=0.05, right=0.98, top=0.95, bottom=0.05)
    self.fig = figure
    self.canvas = FigureCanvas(figure)
    self.figToolbar = NavigationToolbar2QT(self.canvas)
    canvas_layout = self.ui.verticalLayout_canvas
    canvas_layout.addWidget(self.canvas)
    canvas_layout.addWidget(self.figToolbar)

    PublicFunc.__resetAllButton__(self, ButtonState)

    ui = self.ui
    # One dedicated slot per workflow button.
    dedicated_slots = (
        (ui.pushButton_input, self.__slot_btn_input__),
        (ui.pushButton_input_setting, self.setting.show),
        (ui.pushButton_Standardize, self.__slot_btn_Standardize__),
        (ui.pushButton_CutOff, self.__slot_btn_CutOff__),
        (ui.pushButton_GetPos, self.__slot_btn_GetPosition__),
        (ui.pushButton_ChangeView, self.__slot_btn_changeview__),
        (ui.pushButton_JUMP, self.__slot_btn_jump__),
        (ui.pushButton_save, self.__slot_btn_save__),
    )
    for button, slot in dedicated_slots:
        button.clicked.connect(slot)

    # The six epoch step buttons share one slot.
    for step_button in (ui.pushButton_EM1, ui.pushButton_EM10, ui.pushButton_EM100,
                        ui.pushButton_EP1, ui.pushButton_EP10, ui.pushButton_EP100):
        step_button.clicked.connect(self.__EpochChange__)

    # All candidate-position radio buttons share one slot.
    for radio in (ui.radioButton_NTHO, ui.radioButton_PTHO, ui.radioButton_PABD,
                  ui.radioButton_NABD, ui.radioButton_custom):
        radio.clicked.connect(self.__enableAlign__)
|
||
|
||
|
||
@overrides
def closeEvent(self, event):
    """Ask for confirmation; on "Yes" release resources and accept the event.

    :param event: the close event; ignored when the user does not confirm.
    """
    answer = QMessageBox.question(self, '确认', '确认退出吗?', QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
    if answer != QMessageBox.Yes:
        event.ignore()
        return

    PublicFunc.__disableAllButton__(self, ButtonState)

    PublicFunc.statusbar_show_msg(self, PublicFunc.format_status_msg(Constants.SHUTTING_DOWN))
    # Let the status message be processed before the cleanup starts.
    QApplication.processEvents()

    # Release resources.
    del self.data
    self.fig.clf()
    plt.close(self.fig)
    self.deleteLater()
    collect()
    self.canvas = None
    event.accept()
|
||
|
||
def __reset__(self):
    """Return the window to the state it has right after data was loaded.

    Restores the default button states (with the standardize button
    enabled), clears the figure, zeroes every spin box, unchecks the
    candidate radio buttons and restores their placeholder captions.
    """
    ButtonState["Current"].update(ButtonState["Default"].copy())
    ButtonState["Current"]["pushButton_Standardize"] = True
    self.fig.clf()
    self.fig.canvas.draw()

    ui = self.ui
    for cut_box in (ui.spinBox_PSGPreA, ui.spinBox_PSGPreCut, ui.spinBox_PSGPostCut,
                    ui.spinBox_orgBcgPreA, ui.spinBox_orgBcgPreCut, ui.spinBox_orgBcgPostCut):
        cut_box.setValue(0)

    for radio in (ui.radioButton_NABD, ui.radioButton_NTHO, ui.radioButton_PABD,
                  ui.radioButton_PTHO, ui.radioButton_custom):
        radio.setChecked(False)

    # Lower the minimum BEFORE resetting the value: QSpinBox.setValue clamps
    # to the current range, so a positive minimum left over from a previous
    # alignment would otherwise keep the epoch from going back to 0.
    ui.spinBox_SelectEpoch.setMinimum(0)
    ui.spinBox_SelectEpoch.setValue(0)
    ui.spinBox_custom.setValue(0)
    ui.spinBox_SelectEpoch.setToolTip("")

    # Placeholder captions shown until candidate positions are computed.
    ui.radioButton_PABD.setText("备选3")
    ui.radioButton_PTHO.setText("备选1")
    ui.radioButton_NABD.setText("备选4")
    ui.radioButton_NTHO.setText("备选2")
|
||
|
||
def __plot__(self, *args, **kwargs):
    """Clear the figure and draw the view that belongs to the signal sender.

    The widget returned by ``self.sender()`` selects the drawing routine;
    ``*args`` / ``**kwargs`` are forwarded to the correlation and per-epoch
    routines only.

    :return: the ``Result`` of the drawing routine, or a failure ``Result``
        when no routine matched the sender or an exception was raised.
    """
    ui = self.ui
    trigger = self.sender()
    self.fig.clf()

    def triggered_by(*widgets):
        # True when the sender equals one of the given widgets.
        return any(trigger == widget for widget in widgets)

    outcome = None
    try:
        if triggered_by(ui.pushButton_Standardize):
            outcome = self.DrawPicRawOverview()
        elif triggered_by(ui.pushButton_CutOff):
            outcome = self.DrawPicOverviewWithCutOff()
        elif triggered_by(ui.pushButton_GetPos):
            outcome = self.DrawPicCorrelate(*args, **kwargs)
        elif triggered_by(ui.radioButton_NTHO, ui.radioButton_NABD, ui.radioButton_PTHO,
                          ui.radioButton_PABD, ui.radioButton_custom):
            outcome = self.DrawPicTryAlign()
        elif triggered_by(ui.pushButton_ChangeView):
            outcome = self.DrawAlignScatter()
        elif triggered_by(ui.pushButton_JUMP, ui.pushButton_EM1, ui.pushButton_EM10,
                          ui.pushButton_EM100, ui.pushButton_EP1, ui.pushButton_EP10,
                          ui.pushButton_EP100):
            outcome = self.DrawPicByEpoch(*args, **kwargs)
    except Exception:
        return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc())

    if outcome is None:
        return Result().failure(info=Constants.DRAWING_FAILURE)
    return outcome
|
||
|
||
def __slot_btn_input__(self):
    """Load the input signals and reset the window for a new session.

    Creates a fresh ``Data`` object, opens the input files, stores and shows
    the signal durations (in seconds) and finally calls ``__reset__``.
    """
    PublicFunc.__disableAllButton__(self, ButtonState)

    self.data = Data()

    # Import the data. This operation has a single step, so the progress bar
    # reports 1 of 1, in agreement with the "(1/1)" prefix of the messages.
    PublicFunc.progressbar_update(self, 1, 1, Constants.INPUTTING_DATA, 0)
    result = self.data.open_file()
    if not result.status:
        PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return
    PublicFunc.text_output(self.ui, "(1/1)" + result.info, Constants.TIPS_TYPE_INFO)

    # Durations are sample counts divided by the configured input frequencies.
    orgBcg_seconds = round(self.data.raw_orgBcg.shape[0] / Config["InputConfig"]["orgBcgFreq"])
    PSG_seconds = round(self.data.raw_Tho.shape[0] / Config["InputConfig"]["ThoFreq"])
    Config.update({
        "orgBcg_seconds": orgBcg_seconds,
        "PSG_seconds": PSG_seconds
    })
    self.ui.label_orgBcg_length.setText(str(orgBcg_seconds))
    self.ui.label_PSG_length.setText(str(PSG_seconds))
    self.__reset__()
    PublicFunc.finish_operation(self, ButtonState)
|
||
|
||
def __slot_btn_save__(self):
    """Save the alignment information for the currently selected epoch."""
    PublicFunc.__disableAllButton__(self, ButtonState)

    # Save the alignment information.
    PublicFunc.progressbar_update(self, 1, 1, Constants.SAVING_DATA, 0)
    selected_epoch = self.ui.spinBox_SelectEpoch.value()
    saved = self.data.save(selected_epoch)

    if saved.status:
        PublicFunc.text_output(self.ui, "(1/1)" + saved.info, Constants.TIPS_TYPE_INFO)
        # NOTE(review): a TIPS_TYPE_* constant is passed to msgbox_output here,
        # while the failure path passes MSGBOX_TYPE_ERROR -- confirm this is
        # the intended message-box type.
        PublicFunc.msgbox_output(self, saved.info, Constants.TIPS_TYPE_INFO)
    else:
        PublicFunc.text_output(self.ui, "(1/1)" + saved.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, saved.info, Constants.MSGBOX_TYPE_ERROR)

    PublicFunc.finish_operation(self, ButtonState)
|
||
|
||
def __slot_btn_Standardize__(self):
    """Run the standardization pipeline and draw the raw overview.

    The check boxes are copied into ``Config`` first. With "RawSignal"
    checked only ``Standardize_0`` runs; otherwise ``Standardize_1`` to
    ``Standardize_5`` run in order. The first failing step is reported and
    aborts the operation. The overview is then drawn and, only when the
    drawing succeeded, the cut-off button is enabled.
    """
    PublicFunc.__disableAllButton__(self, ButtonState)

    Config["RawSignal"] = self.ui.checkBox_RawSignal.isChecked()
    Config["orgBcgConfig"].update({
        "orgBcgDelBase": self.ui.checkBox_orgBcgDelBase.isChecked(),
        "orgBcgZScore": self.ui.checkBox_orgBcgZScore.isChecked()
    })
    Config["PSGConfig"].update({
        "PSGDelBase": self.ui.checkBox_PSGDelBase.isChecked(),
        "PSGZScore": self.ui.checkBox_PSGZScore.isChecked()
    })

    # Each step: (status message, progress-bar value, step callable).
    if Config["RawSignal"]:
        steps = (
            # Resample only.
            (Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLING, 0, self.data.Standardize_0),
        )
    else:
        steps = (
            # Respiration extraction.
            (Constants.APPROXIMATELY_RESP_GETTING, 0, self.data.Standardize_1),
            # Pre-resampling.
            (Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLING, 20, self.data.Standardize_2),
            # Baseline removal.
            (Constants.APPROXIMATELY_DELETING_BASE, 40, self.data.Standardize_3),
            # Standardization.
            (Constants.APPROXIMATELY_STANDARDIZING, 60, self.data.Standardize_4),
            # Resampling.
            (Constants.APPROXIMATELY_ALIGN_RESAMPLING, 80, self.data.Standardize_5),
        )

    total = len(steps)
    for index, (message, progress, run_step) in enumerate(steps, start=1):
        PublicFunc.progressbar_update(self, index, total, message, progress)
        result = run_step()
        prefix = "({}/{})".format(index, total)
        if not result.status:
            PublicFunc.text_output(self.ui, prefix + result.info, Constants.TIPS_TYPE_ERROR)
            PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
            PublicFunc.finish_operation(self, ButtonState)
            return
        PublicFunc.text_output(self.ui, prefix + result.info, Constants.TIPS_TYPE_INFO)

    # A drawing failure used to be dropped silently here; report it the same
    # way the other slots do and keep the next step disabled in that case.
    result = self.__plot__()
    if not result.status:
        PublicFunc.text_output(self.ui, result.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return

    ButtonState["Current"]["pushButton_CutOff"] = True
    PublicFunc.finish_operation(self, ButtonState)
|
||
|
||
def __slot_btn_CutOff__(self):
    """Store the cut-off spin-box values in ``Config`` and redraw the overview.

    On a successful drawing the get-position button is enabled.
    """
    PublicFunc.__disableAllButton__(self, ButtonState)

    ui = self.ui
    orgbcg_cut = {
        "PreA": ui.spinBox_orgBcgPreA.value(),
        "PreCut": ui.spinBox_orgBcgPreCut.value(),
        "PostCut": ui.spinBox_orgBcgPostCut.value(),
    }
    psg_cut = {
        "PreA": ui.spinBox_PSGPreA.value(),
        "PreCut": ui.spinBox_PSGPreCut.value(),
        "PostCut": ui.spinBox_PSGPostCut.value(),
    }
    Config["orgBcgConfig"].update(orgbcg_cut)
    Config["PSGConfig"].update(psg_cut)

    PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0)
    drawn = self.__plot__()

    if drawn.status:
        PublicFunc.text_output(self.ui, "(1/1)" + drawn.info, Constants.TIPS_TYPE_INFO)
        ButtonState["Current"]["pushButton_GetPos"] = True
    else:
        PublicFunc.text_output(self.ui, "(1/1)" + drawn.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, drawn.info, Constants.MSGBOX_TYPE_ERROR)

    PublicFunc.finish_operation(self, ButtonState)
|
||
|
||
def __slot_btn_GetPosition__(self):
    """Compute the correlations, plot them and offer the candidate positions.

    Four steps run in order (two correlation computations, the plot, the
    maximum-position computation); the first failing step is reported and
    aborts the operation. On success the four candidate radio buttons are
    captioned with their position plus bias, and all five radio buttons are
    enabled.
    """
    PublicFunc.__disableAllButton__(self, ButtonState)

    def step_ok(tag, outcome):
        # Log the step outcome; on failure also show the message box and
        # finish the operation. Returns True when the step succeeded.
        if not outcome.status:
            PublicFunc.text_output(self.ui, tag + outcome.info, Constants.TIPS_TYPE_ERROR)
            PublicFunc.msgbox_output(self, outcome.info, Constants.MSGBOX_TYPE_ERROR)
            PublicFunc.finish_operation(self, ButtonState)
            return False
        PublicFunc.text_output(self.ui, tag + outcome.info, Constants.TIPS_TYPE_INFO)
        return True

    # Compute the cross-correlation, part 1 of 2.
    PublicFunc.progressbar_update(self, 1, 4, Constants.APPROXIMATELY_CORRELATION_CALCULATING1, 0)
    tho_outcome = self.data.calculate_correlation1()
    if not step_ok("(1/4)", tho_outcome):
        return

    # Compute the cross-correlation, part 2 of 2.
    PublicFunc.progressbar_update(self, 2, 4, Constants.APPROXIMATELY_CORRELATION_CALCULATING2, 25)
    abd_outcome = self.data.calculate_correlation2()
    if not step_ok("(2/4)", abd_outcome):
        return

    correlations = (
        tho_outcome.data["tho_relate"],
        tho_outcome.data["tho_relate2"],
        abd_outcome.data["abd_relate"],
        abd_outcome.data["abd_relate2"],
    )

    # Draw.
    PublicFunc.progressbar_update(self, 3, 4, Constants.DRAWING_DATA, 50)
    drawn = self.__plot__(*correlations)
    if not step_ok("(3/4)", drawn):
        return

    # Compute the positions of the maximum values.
    PublicFunc.progressbar_update(self, 4, 4, Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATING, 90)
    peaks = self.data.calculate_maxvalue_pos(*correlations)
    if not step_ok("(4/4)", peaks):
        return

    found = peaks.data
    captions = (
        (self.ui.radioButton_PABD, "abd_max"),
        (self.ui.radioButton_PTHO, "tho_max"),
        (self.ui.radioButton_NABD, "abd_max2"),
        (self.ui.radioButton_NTHO, "tho_max2"),
    )
    for radio, key in captions:
        radio.setText(str(found[key] + found["bias"]))

    for radio_name in ("radioButton_PTHO", "radioButton_PABD", "radioButton_NTHO",
                       "radioButton_NABD", "radioButton_custom"):
        ButtonState["Current"][radio_name] = True
    PublicFunc.finish_operation(self, ButtonState)
|
||
|
||
def __slot_btn_changeview__(self):
    """Redraw the figure for the change-view button and report the outcome."""
    PublicFunc.__disableAllButton__(self, ButtonState)

    # Draw.
    PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0)
    drawn = self.__plot__()

    if drawn.status:
        PublicFunc.text_output(self.ui, "(1/1)" + drawn.info, Constants.TIPS_TYPE_INFO)
    else:
        PublicFunc.text_output(self.ui, "(1/1)" + drawn.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, drawn.info, Constants.MSGBOX_TYPE_ERROR)

    PublicFunc.finish_operation(self, ButtonState)
|
||
|
||
def __slot_btn_jump__(self):
    """Draw the epoch currently selected in the epoch spin box."""
    PublicFunc.__disableAllButton__(self, ButtonState)

    target_epoch = self.ui.spinBox_SelectEpoch.value()

    # Draw.
    PublicFunc.progressbar_update(self, 1, 1, Constants.DRAWING_DATA, 0)
    drawn = self.__plot__(epoch=target_epoch)

    if drawn.status:
        PublicFunc.text_output(self.ui, "(1/1)" + drawn.info, Constants.TIPS_TYPE_INFO)
    else:
        PublicFunc.text_output(self.ui, "(1/1)" + drawn.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, drawn.info, Constants.MSGBOX_TYPE_ERROR)

    PublicFunc.finish_operation(self, ButtonState)
|
||
|
||
def __EpochChange__(self):
    """Step the selected epoch by the pressed button's offset and redraw.

    The new value is limited to the spin box's minimum/maximum. Nothing
    happens when the sender is not one of the six step buttons.
    """
    epoch_box = self.ui.spinBox_SelectEpoch
    # Current value.
    current = epoch_box.value()

    # Work out which button was pressed.
    trigger = self.sender()
    offsets = (
        (self.ui.pushButton_EM1, -1),
        (self.ui.pushButton_EM10, -10),
        (self.ui.pushButton_EM100, -100),
        (self.ui.pushButton_EP1, 1),
        (self.ui.pushButton_EP10, 10),
        (self.ui.pushButton_EP100, 100),
    )
    for button, offset in offsets:
        if trigger == button:
            break
    else:
        return

    if offset < 0:
        epoch = max(epoch_box.minimum(), current + offset)
    else:
        epoch = min(epoch_box.maximum(), current + offset)

    epoch_box.setValue(epoch)
    QApplication.processEvents()
    self.__slot_btn_jump__()
|
||
|
||
def __enableAlign__(self):
    """Apply the chosen candidate position and enable the epoch controls.

    The checked radio button supplies the relative position (its caption, or
    the custom spin box value) and whether orgBcg is reversed (-1 for the
    two "N" buttons, 1 otherwise). Both are stored in ``Config``; then the
    epoch range is fetched, the try-align view is drawn, and the epoch spin
    box and the navigation/save/change-view buttons are enabled.
    """
    PublicFunc.__disableAllButton__(self, ButtonState)

    ui = self.ui
    # NOTE(review): int() assumes the captions set by the get-position step
    # are integer strings -- confirm against calculate_maxvalue_pos.
    if ui.radioButton_NTHO.isChecked():
        relate = int(ui.radioButton_NTHO.text())
        reverse = -1
    elif ui.radioButton_PTHO.isChecked():
        relate = int(ui.radioButton_PTHO.text())
        reverse = 1
    elif ui.radioButton_PABD.isChecked():
        relate = int(ui.radioButton_PABD.text())
        reverse = 1
    elif ui.radioButton_NABD.isChecked():
        relate = int(ui.radioButton_NABD.text())
        reverse = -1
    elif ui.radioButton_custom.isChecked():
        relate = int(ui.spinBox_custom.value())
        reverse = 1
    else:
        # Nothing is selected: give the buttons back before leaving,
        # otherwise they would all stay disabled.
        PublicFunc.finish_operation(self, ButtonState)
        return

    # Position of the maximum correlation value relative to the PSG.
    Config.update({"pos": relate})
    # Sign applied to orgBcg for the chosen candidate.
    Config.update({"orgBcg_reverse": reverse})
    PublicFunc.text_output(self.ui, "相对位置:{}".format(Config["pos"]), Constants.TIPS_TYPE_INFO)

    # Get the epoch range.
    PublicFunc.progressbar_update(self, 1, 2, Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATING, 0)
    result3 = self.data.get_epoch()
    if not result3.status:
        PublicFunc.text_output(self.ui, "(1/2)" + result3.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, result3.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return
    PublicFunc.text_output(self.ui, "(1/2)" + result3.info, Constants.TIPS_TYPE_INFO)

    # Draw.
    PublicFunc.progressbar_update(self, 2, 2, Constants.DRAWING_DATA, 30)
    result = self.__plot__()
    if not result.status:
        PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_ERROR)
        PublicFunc.msgbox_output(self, result.info, Constants.MSGBOX_TYPE_ERROR)
        PublicFunc.finish_operation(self, ButtonState)
        return
    PublicFunc.text_output(self.ui, "(2/2)" + result.info, Constants.TIPS_TYPE_INFO)

    # Set the lower and upper epoch limits and start at the first epoch.
    epoch_min = result3.data["epoch_min"]
    epoch_max = result3.data["epoch_max"]
    ui.spinBox_SelectEpoch.setMinimum(epoch_min)
    ui.spinBox_SelectEpoch.setMaximum(epoch_max)
    ui.spinBox_SelectEpoch.setValue(epoch_min)
    ui.spinBox_SelectEpoch.setToolTip(
        "最小值:{}\n最大值:{}".format(epoch_min, epoch_max))
    ui.spinBox_SelectEpoch.setEnabled(True)

    for widget_name in ("pushButton_JUMP", "pushButton_EM1", "pushButton_EM10",
                        "pushButton_EM100", "pushButton_EP1", "pushButton_EP10",
                        "pushButton_EP100", "pushButton_save", "pushButton_ChangeView"):
        ButtonState["Current"][widget_name] = True
    PublicFunc.finish_operation(self, ButtonState)
|
||
|
||
def DrawPicRawOverview(self):
    """Plot the processed THO, orgBcg and ABD signals in three stacked panels.

    All panels share the axes of the first one and span x = 0 up to the
    length of the longest signal.

    :return: a success ``Result``, or a failure ``Result`` carrying the
        traceback when drawing raised an exception.
    """
    try:
        tho = self.data.processed_downsample_Tho
        abd = self.data.processed_downsample_Abd
        bcg = self.data.processed_downsample_orgBcg
        max_x = max(tho.shape[0], abd.shape[0], bcg.shape[0])

        anchor = None
        for position, series, title in ((311, tho, "THO"),
                                        (312, bcg, "orgBcg"),
                                        (313, abd, "ABD")):
            if anchor is None:
                axis = anchor = self.fig.add_subplot(position)
            else:
                axis = self.fig.add_subplot(position, sharex=anchor, sharey=anchor)
            axis.plot(series, color='blue')
            axis.set_xlim(0, max_x)
            axis.set_title(title)

        self.fig.canvas.draw()
    except Exception:
        return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc())
    return Result().success(info=Constants.DRAWING_FINISHED)
|
||
|
||
def DrawPicOverviewWithCutOff(self):
    """Plot THO, orgBcg and ABD with their cut-off markers.

    Every signal is drawn shifted by its "PreA" offset (THO and ABD use
    ``Config["PSGConfig"]``, orgBcg uses ``Config["orgBcgConfig"]``). Two
    dashed red lines per panel mark "PreCut" from the signal start and
    "PostCut" from the signal end.

    Each panel's post-cut marker is derived from the length of the signal
    drawn in that panel (the ABD panel previously used the THO length), and
    the ABD length is included in the shared x-limit.

    :return: a success ``Result``, or a failure ``Result`` carrying the
        traceback when drawing raised an exception.
    """
    try:
        tho = self.data.processed_downsample_Tho
        bcg = self.data.processed_downsample_orgBcg
        abd = self.data.processed_downsample_Abd
        psg_cfg = Config["PSGConfig"]
        bcg_cfg = Config["orgBcgConfig"]

        max_x = max(tho.shape[0] + psg_cfg["PreA"],
                    abd.shape[0] + psg_cfg["PreA"],
                    bcg.shape[0] + bcg_cfg["PreA"])
        min_x = min(psg_cfg["PreA"], bcg_cfg["PreA"], 0)

        anchor = None
        for position, title, series, cfg in ((311, "THO", tho, psg_cfg),
                                             (312, "orgBcg", bcg, bcg_cfg),
                                             (313, "ABD", abd, psg_cfg)):
            if anchor is None:
                axis = anchor = self.fig.add_subplot(position)
            else:
                axis = self.fig.add_subplot(position, sharex=anchor, sharey=anchor)

            length = len(series)
            offset = cfg["PreA"]
            axis.plot(linspace(offset, length + offset, length), series, color='blue')
            # Dashed lines at x = PreCut and at x = (end - PostCut).
            axis.axvline(x=cfg["PreCut"] + offset, color='red', linestyle='--')
            axis.axvline(x=length - cfg["PostCut"] + offset, color='red', linestyle='--')
            axis.set_xlim(min_x, max_x)
            axis.set_title(title)

        self.fig.canvas.draw()
    except Exception:
        return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc())
    return Result().success(info=Constants.DRAWING_FINISHED)
|
||
|
||
def DrawPicCorrelate(self, tho_pxx, tho_nxx, abd_pxx, abd_nxx):
    """Plot four correlation curves on a 2x2 grid and redraw the canvas.

    Args:
        tho_pxx: Curve shown as "The Correlation of THO and orgBcg".
        tho_nxx: Curve shown as "The Correlation of THO and Reverse orgBcg".
        abd_pxx: Curve shown as "The Correlation of ABD and orgBcg".
        abd_nxx: Curve shown as "The Correlation of ABD and Reverse orgBcg".

    Returns:
        ``Result().success(...)`` after the canvas is redrawn, or
        ``Result().failure(...)`` with the traceback appended if anything raises.
    """
    panels = (
        (221, tho_pxx, "The Correlation of THO and orgBcg"),
        (222, tho_nxx, "The Correlation of THO and Reverse orgBcg"),
        (223, abd_pxx, "The Correlation of ABD and orgBcg"),
        (224, abd_nxx, "The Correlation of ABD and Reverse orgBcg"),
    )
    try:
        lead = None
        for slot, curve, caption in panels:
            if lead is None:
                # The first panel is the one every later panel shares its axes with.
                axes = lead = self.fig.add_subplot(slot)
            else:
                axes = self.fig.add_subplot(slot, sharex=lead, sharey=lead)
            axes.plot(curve, color='blue')
            axes.set_title(caption)

        self.fig.canvas.draw()
    except Exception:
        return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc())
    return Result().success(info=Constants.DRAWING_FINISHED)
|
||
|
||
def DrawPicTryAlign(self):
    """Draw THO, orgBcg and ABD on a 3x1 grid with orgBcg shifted by ``Config["pos"]``.

    THO and ABD are plotted starting at x = 0; orgBcg is plotted starting at
    x = ``Config["pos"]``. The three panels share axes and use the same
    x-limits.

    Returns:
        ``Result().success(...)`` after the canvas is redrawn, or
        ``Result().failure(...)`` with the traceback appended if anything raises.
    """
    try:
        shift = Config["pos"]
        tho = self.data.processed_downsample_Tho
        bcg = self.data.processed_downsample_orgBcg
        abd = self.data.processed_downsample_Abd

        right = max(tho.shape[0], bcg.shape[0] + shift)
        left = min(Config["PSGConfig"]["PreA"], Config["orgBcgConfig"]["PreA"] + shift, 0)

        lead = None
        for slot, caption, signal, origin in (
            (311, "THO", tho, 0),
            (312, "orgBcg", bcg, shift),
            (313, "ABD", abd, 0),
        ):
            if lead is None:
                axes = lead = self.fig.add_subplot(slot)
            else:
                axes = self.fig.add_subplot(slot, sharex=lead, sharey=lead)
            count = len(signal)
            axes.plot(linspace(origin, count + origin, count), signal, color='blue')
            axes.set_xlim(left, right)
            axes.set_title(caption)

        self.fig.canvas.draw()
    except Exception:
        return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc())
    return Result().success(info=Constants.DRAWING_FINISHED)
|
||
|
||
def DrawPicByEpoch(self, epoch):
    """Draw a six-epoch window of THO / orgBcg / ABD plus their peak-interval curves.

    Left column (subplots 321/323/325): the signal segments with detected
    peaks marked by "x". Right column (322/324/326): peak-to-peak intervals
    of THO (or ABD) overlaid with those of orgBcg.

    Args:
        epoch: Index of the first 30-second epoch of the window; the window
            spans ``epoch`` .. ``epoch + 6``.

    Returns:
        ``Result().success(...)`` after the canvas is redrawn, or
        ``Result().failure(...)`` with the traceback appended if anything raises.
    """
    try:
        freq = Config["ApplyFrequency"]
        psg_start = epoch * 30 * freq
        psg_stop = (epoch + 6) * 30 * freq

        # The orgBcg window is the PSG window moved back by the alignment offset.
        bcg_start = psg_start - Config["pos"]
        bcg_stop = psg_stop - Config["pos"]

        tho_seg = self.data.processed_downsample_Tho[psg_start:psg_stop]
        bcg_seg = self.data.processed_downsample_orgBcg[bcg_start:bcg_stop] * Config["orgBcg_reverse"]
        abd_seg = self.data.processed_downsample_Abd[psg_start:psg_stop]

        def draw_signal(slot, start, stop, segment):
            # Plot one segment, mark its peaks and hand the peak indices back.
            axes = self.fig.add_subplot(slot)
            xs = linspace(start, stop, len(segment))
            axes.plot(xs, segment)
            found, _ = find_peaks(segment, prominence=0, distance=3 * freq)
            axes.plot(xs[found], segment[found], "x")
            return found

        # Signals, drawn against the PSG axis (orgBcg against its own shifted axis).
        tho_peaks = draw_signal(321, psg_start, psg_stop, tho_seg)
        bcg_peaks = draw_signal(323, bcg_start, bcg_stop, bcg_seg)
        abd_peaks = draw_signal(325, psg_start, psg_stop, abd_seg)

        # Peak-to-peak intervals, each value repeated ApplyFrequency times.
        tho_gap = diff(tho_peaks).repeat(freq)
        bcg_gap = diff(bcg_peaks).repeat(freq)
        abd_gap = diff(abd_peaks).repeat(freq)

        def draw_intervals(slot, start, stop, own_gap, caption, show_legend=False, **own_style):
            # Overlay one interval curve with the orgBcg interval curve.
            axes = self.fig.add_subplot(slot)
            axes.plot(linspace(start, stop, len(own_gap)), own_gap, **own_style)
            axes.plot(linspace(start, stop, len(bcg_gap)), bcg_gap, label="resp")
            axes.set_title(caption)
            if show_legend:
                axes.legend()
            axes.set_ylim((10, 50))

        draw_intervals(322, psg_start, psg_stop, tho_gap, "tho_interval",
                       show_legend=True, alpha=0.5, label="tho")
        draw_intervals(324, bcg_start, bcg_stop, tho_gap, "resp_interval")
        draw_intervals(326, psg_start, psg_stop, abd_gap, "abd_interval")

        self.fig.canvas.draw()
    except Exception:
        return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc())
    return Result().success(info=Constants.DRAWING_FINISHED)
|
||
|
||
def DrawAlignScatter(self):
    """Scatter the per-epoch THO and ABD bias values returned by ``get_corr_by_epoch``.

    The upper panel shows ``tho_bias_list`` and the lower panel shows
    ``abd_bias_list``, each spread evenly between ``epoch_min`` and
    ``epoch_max`` on the x axis.

    Returns:
        ``Result().success(...)`` after the canvas is redrawn. If
        ``get_corr_by_epoch`` reports a failure, a ``Result().failure(...)``
        carrying that failure's info; if drawing raises, a
        ``Result().failure(...)`` carrying the traceback.
    """
    try:
        response = self.data.get_corr_by_epoch()
        # Surface the real reason instead of failing later on response.data
        # with an unrelated subscript error.
        if not response.status:
            return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + str(response.info))

        epoch_min = response.data["epoch_min"]
        epoch_max = response.data["epoch_max"]
        tho_bias_list = response.data["tho_bias_list"]
        abd_bias_list = response.data["abd_bias_list"]

        ax1 = self.fig.add_subplot(211)
        ax1.scatter(linspace(epoch_min, epoch_max, len(tho_bias_list)), tho_bias_list, alpha=0.2)
        ax1.set_title("THO")

        ax2 = self.fig.add_subplot(212)
        ax2.scatter(linspace(epoch_min, epoch_max, len(abd_bias_list)), abd_bias_list, alpha=0.2)
        ax2.set_title("ABD")
        self.fig.canvas.draw()

        return Result().success(info=Constants.DRAWING_FINISHED)

    except Exception:
        return Result().failure(info=Constants.DRAWING_FAILURE + "\n" + format_exc())
|
||
|
||
|
||
|
||
class Data:
|
||
|
||
def __init__(self):
|
||
self.raw_orgBcg = None
|
||
self.raw_Tho = None
|
||
self.raw_Abd = None
|
||
self.processed_orgBcg = None
|
||
self.processed_Tho = None
|
||
self.processed_Abd = None
|
||
self.processed_downsample_orgBcg = None
|
||
self.processed_downsample_Tho = None
|
||
self.processed_downsample_Abd = None
|
||
self.relate_list = None
|
||
self.relate_point = None
|
||
|
||
def open_file(self):
    """Locate the orgBcg, Tho and Abd input files and load them as 1-D arrays.

    For each signal, in that order, the configured input path must exist and
    contain exactly one file whose name starts with the expected prefix. The
    last "_"-separated token of the file stem must be all digits and is
    stored as that signal's input frequency. ``Config["Path"]`` is rewritten
    to point at the file that was found. After all three are resolved the
    files are read with ``read_csv`` (no header) and flattened.

    Returns:
        ``Result().success(...)`` when all three signals are loaded,
        otherwise ``Result().failure(...)`` describing the first check that
        failed (or carrying the traceback if reading raised).
    """

    def reject(prefix, reason_key):
        # Build the failure result for one signal / reason pair.
        return Result().failure(
            info=Constants.INPUT_FAILURE + prefix + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[reason_key])

    sources = (
        ("orgBcg: ", "Input_orgBcg", ConfigParams.APPROXIMATELY_ALIGN_INPUT_ORGBCG_FILENAME, "orgBcgFreq"),
        ("Tho: ", "Input_Tho", ConfigParams.APPROXIMATELY_ALIGN_INPUT_THO_FILENAME, "ThoFreq"),
        ("Abd: ", "Input_Abd", ConfigParams.APPROXIMATELY_ALIGN_INPUT_ABD_FILENAME, "AbdFreq"),
    )

    for prefix, path_key, file_prefix, freq_key in sources:
        location = Path(Config["Path"][path_key])
        if not location.exists():
            return reject(prefix, "Data_Path_Not_Exist")

        matches = list(location.glob(f"{file_prefix}*"))
        if not matches:
            return reject(prefix, "Data_File_Not_Exist")
        if len(matches) > 1:
            return reject(prefix, "Data_File_More_Than_One")

        found = matches[0]
        Config["Path"][path_key] = str(found)

        # The sampling frequency is expected as the trailing "_<digits>" of the stem.
        freq_text = found.stem.split("_")[-1]
        if not freq_text.isdigit():
            return reject(prefix, "Data_Frequency_Not_In_Filename")
        Config["InputConfig"][freq_key] = int(freq_text)

    def load(path_key):
        # Read one headerless CSV and flatten it to a 1-D array.
        table = read_csv(Config["Path"][path_key], encoding=ConfigParams.UTF8_ENCODING, header=None)
        return table.to_numpy().reshape(-1)

    try:
        self.raw_orgBcg = load("Input_orgBcg")
        self.raw_Tho = load("Input_Tho")
        self.raw_Abd = load("Input_Abd")
    except Exception:
        return Result().failure(info=Constants.INPUT_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
            "Read_Data_Exception"] + "\n" + format_exc())

    return Result().success(info=Constants.INPUT_FINISHED)
|
||
|
||
def save(self, epoch):
    """Write the alignment result as a one-row CSV to ``Config["Path"]["Save"]``.

    The row holds the current ``Config["pos"]``, the given ``epoch`` and
    ``Config["ApplyFrequency"]``; an existing file is overwritten.

    Args:
        epoch: Value stored in the "epoch" column.

    Returns:
        ``Result().success(...)`` on success, or ``Result().failure(...)``
        with the traceback appended if building or writing the table raises.
    """
    try:
        offset = Config["pos"]
        apply_rate = Config["ApplyFrequency"]
        record = DataFrame({"pos": [offset], "epoch": [epoch], "ApplyFrequency": [apply_rate]})
        record.to_csv(Path(Config["Path"]["Save"]), mode="w", header=True, index=False)
    except Exception:
        return Result().failure(info=Constants.SAVING_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
            "Save_Exception"] + "\n" + format_exc())
    else:
        return Result().success(info=Constants.SAVING_FINISHED)
|
||
|
||
def Standardize_0(self):
    """Resample only: truncate the raw signals by duration, then resample them.

    Each raw signal is first cut to its configured number of seconds
    (``orgBcg_seconds`` for orgBcg, ``PSG_seconds`` for Tho and Abd) at its
    input frequency, then resampled to ``Config["ApplyFrequency"]``. The
    truncated raw arrays replace the originals on ``self``.

    Returns:
        ``Result().success(...)`` on success, or ``Result().failure(...)`` if
        any raw signal is missing or if truncation/resampling raises.
    """
    if any(signal is None for signal in (self.raw_orgBcg, self.raw_Tho, self.raw_Abd)):
        return Result().failure(
            info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                "Raw_Data_Not_Exist"])

    def clip(signal, seconds_key, freq_key):
        # Keep only the first <seconds> worth of samples at the input rate.
        return signal[:int(Config[seconds_key] * Config["InputConfig"][freq_key])]

    def to_apply_rate(signal, seconds_key):
        # Resample so that <seconds> of signal has ApplyFrequency samples per second.
        return resample(signal, int(Config[seconds_key] * Config["ApplyFrequency"]))

    try:
        # Truncate by duration.
        self.raw_orgBcg = clip(self.raw_orgBcg, "orgBcg_seconds", "orgBcgFreq")
        self.raw_Tho = clip(self.raw_Tho, "PSG_seconds", "ThoFreq")
        self.raw_Abd = clip(self.raw_Abd, "PSG_seconds", "AbdFreq")

        self.processed_orgBcg = to_apply_rate(self.raw_orgBcg, "orgBcg_seconds")
        self.processed_Tho = to_apply_rate(self.raw_Tho, "PSG_seconds")
        self.processed_Abd = to_apply_rate(self.raw_Abd, "PSG_seconds")
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                "Only_Resample_Exception"] + "\n" + format_exc())

    return Result().success(info=Constants.APPROXIMATELY_ONLY_ALIGN_RESAMPLE_FINISHED)
|
||
|
||
def Standardize_1(self):
    """Respiration extraction: band-pass filter the three raw signals.

    Each raw signal is filtered with a Butterworth band-pass (second-order
    sections, applied forward and backward via ``sosfiltfilt``) using the
    cut-offs and order from ``Config["Filter"]`` and the signal's own input
    frequency. The results are stored in ``processed_orgBcg``,
    ``processed_Tho`` and ``processed_Abd``.

    Returns:
        ``Result().success(...)`` on success, or ``Result().failure(...)`` if
        any raw signal is missing or if filtering raises.
    """

    def butter_bandpass_filter(data, lowCut, highCut, fs, order):
        # Cut-offs are normalised by the Nyquist frequency (fs / 2).
        low = lowCut / (fs * 0.5)
        high = highCut / (fs * 0.5)
        sos = butter(order, [low, high], btype="bandpass", output='sos')
        return sosfiltfilt(sos, data)

    if self.raw_orgBcg is None or self.raw_Tho is None or self.raw_Abd is None:
        # Report the failure under this step's own prefix, matching the
        # exception branch below (previously the resample-only prefix was used).
        return Result().failure(
            info=Constants.APPROXIMATELY_RESP_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                "Raw_Data_Not_Exist"])
    try:
        # Filtering
        self.processed_orgBcg = butter_bandpass_filter(
            self.raw_orgBcg, Config["Filter"]["BandPassLow"],
            Config["Filter"]["BandPassHigh"],
            Config["InputConfig"]["orgBcgFreq"],
            Config["Filter"]["BandPassOrder"])
        self.processed_Tho = butter_bandpass_filter(
            self.raw_Tho, Config["Filter"]["BandPassLow"],
            Config["Filter"]["BandPassHigh"],
            Config["InputConfig"]["ThoFreq"],
            Config["Filter"]["BandPassOrder"])
        self.processed_Abd = butter_bandpass_filter(
            self.raw_Abd, Config["Filter"]["BandPassLow"],
            Config["Filter"]["BandPassHigh"],
            Config["InputConfig"]["AbdFreq"],
            Config["Filter"]["BandPassOrder"])
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_RESP_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                "Resp_Get_Exception"] + "\n" + format_exc())

    return Result().success(info=Constants.APPROXIMATELY_RESP_GET_FINISHED)
|
||
|
||
def Standardize_2(self):
    """Pre-resampling: bring the processed signals to ``Config["TempFrequency"]``.

    A signal is resampled only when its input frequency differs from
    ``Config["TempFrequency"]``; the target length is the configured duration
    (``PSG_seconds`` for Tho/Abd, ``orgBcg_seconds`` for orgBcg) times
    ``TempFrequency``. A signal already at that frequency is left untouched.

    Returns:
        ``Result().success(...)`` on success, or ``Result().failure(...)``
        with the traceback appended if resampling raises.
    """
    try:
        # TODO: if THO and ABD have different sampling rates, this handling may
        # still cause problems with ApplyFrequency later on and end up producing
        # an incorrect coarse-alignment position.
        if Config["InputConfig"]["ThoFreq"] != Config["TempFrequency"]:
            self.processed_Tho = resample(self.processed_Tho,
                                          int(Config["PSG_seconds"] * Config["TempFrequency"]))

        if Config["InputConfig"]["AbdFreq"] != Config["TempFrequency"]:
            self.processed_Abd = resample(self.processed_Abd,
                                          int(Config["PSG_seconds"] * Config["TempFrequency"]))

        if Config["InputConfig"]["orgBcgFreq"] != Config["TempFrequency"]:
            self.processed_orgBcg = resample(self.processed_orgBcg,
                                             int(Config["orgBcg_seconds"] * Config["TempFrequency"]))

    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                "Pre_Resample_Exception"] + "\n" + format_exc())

    return Result().success(info=Constants.APPROXIMATELY_PRE_ALIGN_RESAMPLE_FINISHED)
|
||
|
||
def Standardize_3(self):
    """Baseline removal: subtract a four-second moving average from the signals.

    Tho and Abd are processed when ``Config["PSGConfig"]["PSGDelBase"]`` is
    truthy, orgBcg when ``Config["orgBcgConfig"]["orgBcgDelBase"]`` is truthy.
    The moving average is a ``mode='same'`` convolution with a uniform kernel
    of ``int(4 * TempFrequency)`` samples.

    Returns:
        ``Result().success(...)`` on success, or ``Result().failure(...)``
        with the traceback appended if anything raises.
    """
    try:
        rate = Config["TempFrequency"]

        def drop_baseline(signal):
            # Signal minus its four-second running mean.
            span = int(4 * rate)
            return signal - convolve(signal, ones(span) / span, mode='same')

        if Config["PSGConfig"]["PSGDelBase"]:
            self.processed_Tho = drop_baseline(self.processed_Tho)
            self.processed_Abd = drop_baseline(self.processed_Abd)
        if Config["orgBcgConfig"]["orgBcgDelBase"]:
            self.processed_orgBcg = drop_baseline(self.processed_orgBcg)
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_DELETE_BASE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                "Delete_Base_Exception"] + "\n" + format_exc())

    return Result().success(info=Constants.APPROXIMATELY_DELETE_BASE_FINISHED)
|
||
|
||
def Standardize_4(self):
    """Standardisation: optionally z-score the processed signals.

    Tho and Abd are z-scored when ``Config["PSGConfig"]["PSGZScore"]`` is
    truthy, orgBcg when ``Config["orgBcgConfig"]["orgBcgZScore"]`` is truthy.

    Returns:
        ``Result().success(...)`` on success, or ``Result().failure(...)``
        with the traceback appended if anything raises.
    """

    def zscore(signal):
        # (x - mean) / standard deviation, computed over the whole signal.
        return (signal - mean(signal)) / std(signal)

    try:
        # Standardise only the signal groups that are switched on.
        if Config["PSGConfig"]["PSGZScore"]:
            self.processed_Tho = zscore(self.processed_Tho)
            self.processed_Abd = zscore(self.processed_Abd)
        if Config["orgBcgConfig"]["orgBcgZScore"]:
            self.processed_orgBcg = zscore(self.processed_orgBcg)
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_STANDARDIZE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                "Standardize_Exception"] + "\n" + format_exc())

    return Result().success(info=Constants.APPROXIMATELY_STANDARDIZE_FINISHED)
|
||
|
||
def Standardize_5(self):
    """Resampling: downsample the processed signals by plain stride slicing.

    Every ``int(TempFrequency / ApplyFrequency)``-th sample of each processed
    signal is kept and stored in the matching ``processed_downsample_*``
    attribute.

    Returns:
        ``Result().success(...)`` on success, or ``Result().failure(...)``
        with the traceback appended if anything raises.
    """
    try:
        # Done with [::step] slicing.
        stride = int(Config["TempFrequency"] / Config["ApplyFrequency"])
        self.processed_downsample_Tho = self.processed_Tho[::stride]
        self.processed_downsample_Abd = self.processed_Abd[::stride]
        self.processed_downsample_orgBcg = self.processed_orgBcg[::stride]
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                "Resample_Exception"] + "\n" + format_exc())

    return Result().success(info=Constants.APPROXIMATELY_ALIGN_RESAMPLE_FINISHED)
|
||
|
||
def calculate_correlation1(self):
    """Cross-correlation, part 1 of 2: THO against orgBcg.

    Both downsampled signals are trimmed by their configured ``PreCut`` /
    ``PostCut``, scaled by the configured multiple factor, cast to ``int64``
    and cross-correlated in ``'full'`` mode. The result is divided by the
    factor squared to undo the scaling.

    Returns:
        ``Result().success(...)`` whose data is
        ``{"tho_relate": corr, "tho_relate2": -corr}``, or
        ``Result().failure(...)`` with the traceback appended if anything raises.
    """
    try:
        # Scaling factor applied before the integer cast.
        scale = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["Multiple_Factor"]

        tho = self.processed_downsample_Tho
        bcg = self.processed_downsample_orgBcg
        psg_cut = Config["PSGConfig"]
        bcg_cut = Config["orgBcgConfig"]

        # Work on copies so the stored signals are not scaled in place.
        a = tho[psg_cut["PreCut"]:len(tho) - psg_cut["PostCut"]].copy()
        v = bcg[bcg_cut["PreCut"]:len(bcg) - bcg_cut["PostCut"]].copy()
        a *= scale
        v *= scale

        forward = correlate(a.astype(int64), v.astype(int64), mode='full') / (scale ** 2)
        result = {"tho_relate": forward, "tho_relate2": - forward}
    except Exception:
        return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FAILURE +
                                     Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                                         "Calculate_Correlation1_Exception"] + "\n" + format_exc())

    return Result().success(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE1_FINISHED, data=result)
|
||
|
||
def calculate_correlation2(self):
    """Cross-correlation, part 2 of 2: ABD against orgBcg.

    Both downsampled signals are trimmed by their configured ``PreCut`` /
    ``PostCut``, scaled by the configured multiple factor, cast to ``int64``
    and cross-correlated in ``'full'`` mode. The result is divided by the
    factor squared to undo the scaling.

    Returns:
        ``Result().success(...)`` whose data is
        ``{"abd_relate": corr, "abd_relate2": -corr}``, or
        ``Result().failure(...)`` with the traceback appended if anything raises.
    """
    try:
        # Use the same configured factor as calculate_correlation1 rather than
        # a hard-coded 100, so both correlations are quantised identically.
        multiple_factor = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["Multiple_Factor"]
        # Work on copies so the stored signals are not scaled in place.
        a = self.processed_downsample_Abd[
            Config["PSGConfig"]["PreCut"]:len(self.processed_downsample_Abd) - Config["PSGConfig"][
                "PostCut"]].copy()
        v = self.processed_downsample_orgBcg[
            Config["orgBcgConfig"]["PreCut"]:len(self.processed_downsample_orgBcg) - Config["orgBcgConfig"][
                "PostCut"]].copy()
        a *= multiple_factor
        v *= multiple_factor
        a = a.astype(int64)
        v = v.astype(int64)
        abd_relate = correlate(a, v, mode='full')
        abd_relate = abd_relate / (multiple_factor ** 2)
        abd_relate2 = - abd_relate

        result = {"abd_relate": abd_relate, "abd_relate2": abd_relate2}
    except Exception:
        return Result().failure(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FAILURE +
                                     Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                                         "Calculate_Correlation2_Exception"] + "\n" + format_exc())

    return Result().success(info=Constants.APPROXIMATELY_CORRELATION_CALCULATE2_FINISHED, data=result)
|
||
|
||
def calculate_maxvalue_pos(self, tho_relate, tho_relate2, abd_relate, abd_relate2):
    """Find the index of the maximum of each correlation curve, plus the index bias.

    Args:
        tho_relate: Curve whose argmax is returned as "tho_max".
        tho_relate2: Curve whose argmax is returned as "tho_max2".
        abd_relate: Curve whose argmax is returned as "abd_max".
        abd_relate2: Curve whose argmax is returned as "abd_max2".

    Returns:
        ``Result().success(...)`` whose data holds the four argmax values and
        "bias" (PSG ``PreCut`` + orgBcg ``PostCut`` - len(downsampled orgBcg) + 1),
        or ``Result().failure(...)`` with the traceback appended if anything raises.
    """
    try:
        curves = (
            ("tho_max", tho_relate),
            ("tho_max2", tho_relate2),
            ("abd_max", abd_relate),
            ("abd_max2", abd_relate2),
        )
        result = {label: argmax(curve) for label, curve in curves}

        cut_total = Config["PSGConfig"]["PreCut"] + Config["orgBcgConfig"]["PostCut"]
        result["bias"] = cut_total - len(self.processed_downsample_orgBcg) + 1
    except Exception:
        return Result().failure(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FAILURE +
                                     Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                                         "Calculate_Maxvalue_Pos_Exception"] + "\n" + format_exc())

    return Result().success(info=Constants.APPROXIMATELY_MAXVALUE_POS_CALCULATE_FINISHED, data=result)
|
||
|
||
def get_epoch(self):
    """Compute the range of 30-second epochs usable with the current ``Config["pos"]``.

    ``epoch_min`` is ``pos // 30 // ApplyFrequency + 1``, clamped at 0.
    ``epoch_max`` is one less than the smaller of the epoch counts of the
    downsampled THO signal and of the downsampled orgBcg signal shifted by
    ``pos``.

    Returns:
        ``Result().success(...)`` whose data is
        ``{"epoch_min": ..., "epoch_max": ...}``, or ``Result().failure(...)``
        with the traceback appended if anything raises.
    """
    try:
        shift = Config["pos"]

        def whole_epochs(sample_count):
            # Number of complete 30-second epochs in that many samples.
            return sample_count // 30 // Config["ApplyFrequency"]

        first = max(0, whole_epochs(shift) + 1)
        last = min(whole_epochs(len(self.processed_downsample_Tho)) - 1,
                   whole_epochs(len(self.processed_downsample_orgBcg) + shift) - 1)

        result = {"epoch_min": first, "epoch_max": last}
    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                "Get_Epoch_Exception"] + "\n" + format_exc())

    return Result().success(info=Constants.APPROXIMATELY_EPOCH_GET_FINISHED, data=result)
|
||
|
||
def get_corr_by_epoch(self):
    """Estimate, per epoch, the residual lag between THO/ABD and the aligned orgBcg.

    For every epoch in ``[epoch_min, epoch_max)`` (from ``get_epoch``) a
    window of ``window_epoch`` epochs is cut from the processed THO and ABD
    signals, and the matching orgBcg window is cut after shifting by
    ``Config["pos"]`` (converted to the processed-signal rate) and multiplied
    by ``Config["orgBcg_reverse"]``. Each PSG window is cross-correlated with
    the orgBcg window; the lag of the correlation maximum, floor-divided by
    the processed-signal rate, is recorded.

    Returns:
        ``Result().success(...)`` whose data holds "tho_bias_list",
        "abd_bias_list", "epoch_min" and "epoch_max", or
        ``Result().failure(...)`` with the traceback appended if anything
        raises (including a failed ``get_epoch``).
    """
    try:
        # Get the epoch range.
        response = self.get_epoch()
        if not response.status:
            raise Exception(response.info)

        epoch_min = response.data["epoch_min"]
        epoch_max = response.data["epoch_max"]

        # The processed signals are resampled to Config["TempFrequency"] in
        # Standardize_2, so that is the rate they must be indexed with.
        temp_freq = Config["TempFrequency"]
        window_epoch = ConfigParams.APPROXIMATELY_ALIGN_CONFIG_NEW_CONTENT["CorrByEpoch"]["window_epoch"]
        tho_bias_list = []
        abd_bias_list = []

        # Convert pos from the apply rate to the processed-signal rate.
        pos = Config["pos"] * temp_freq // Config["ApplyFrequency"]
        reverse = Config["orgBcg_reverse"]

        for epoch in range(epoch_min, epoch_max):
            SP = epoch * 30 * temp_freq
            EP = (epoch + window_epoch) * 30 * temp_freq
            tho_seg = self.processed_Tho[SP:EP]
            abd_seg = self.processed_Abd[SP:EP]
            orgBcg_seg = self.processed_orgBcg[SP - pos:EP - pos] * reverse

            tho_relate_seg = correlate(tho_seg, orgBcg_seg, mode='full')
            abd_relate_seg = correlate(abd_seg, orgBcg_seg, mode='full')

            # In 'full' mode the zero-lag term sits at index len(in2) - 1, so
            # that (not len(in2)) is what must be subtracted to get the lag.
            zero_lag_index = len(orgBcg_seg) - 1
            tho_seg_pos = argmax(tho_relate_seg) - zero_lag_index
            abd_seg_pos = argmax(abd_relate_seg) - zero_lag_index

            tho_bias_list.append(tho_seg_pos // temp_freq)
            abd_bias_list.append(abd_seg_pos // temp_freq)

        result = {
            "tho_bias_list": tho_bias_list,
            "abd_bias_list": abd_bias_list,
            "epoch_min": epoch_min,
            "epoch_max": epoch_max
        }

    except Exception:
        return Result().failure(
            info=Constants.APPROXIMATELY_EPOCH_GET_FAILURE + Constants.APPROXIMATELY_ALIGN_FAILURE_REASON[
                "Get_Corr_By_Epoch"] + "\n" + format_exc())

    return Result().success(info=Constants.APPROXIMATELY_EPOCH_GET_FINISHED, data=result)
|