79 lines
2.7 KiB
Python
79 lines
2.7 KiB
Python
from ecgdetectors import Detectors
|
||
from numpy import quantile, delete, array, argmax, full, nan
|
||
|
||
from func.BCGDataset import BCG_Operation
|
||
|
||
def refinement( data, peak):
|
||
if len(data) == 0 or len(peak) <=2 : return None
|
||
firstPeak = peak[0]
|
||
lastPeak = peak[-1]
|
||
meanPeak = quantile( data[peak[1:-1]], 0.2 )
|
||
if data[firstPeak] < meanPeak * 0.6 :
|
||
peak = delete(peak, 0)
|
||
if data[lastPeak] < meanPeak * 0.6 :
|
||
peak = delete(peak, -1)
|
||
return array(peak)
|
||
|
||
def find_TPeak(data,peaks,th=50):
|
||
"""
|
||
找出真实的J峰或R峰
|
||
:param data: BCG或ECG数据
|
||
:param peaks: 初步峰值(从label中导出的location_R)
|
||
:param th: 范围阈值
|
||
:return: 真实峰值
|
||
"""
|
||
return_peak = []
|
||
for peak in peaks:
|
||
if peak>len(data):continue
|
||
min_win,max_win = max(0,int(peak-th)),min(len(data),int(peak+th))
|
||
return_peak.append(argmax(data[min_win:max_win])+min_win)
|
||
return array(return_peak)
|
||
|
||
def preprocess(raw_ecg, fs, low_cut, high_cut):
|
||
|
||
preprocessing = BCG_Operation(sample_rate=fs) # 对ECG做了降采样处理
|
||
raw_ecg = preprocessing.Butterworth(raw_ecg, "bandpass", low_cut=low_cut, high_cut=high_cut, order=3) * 4
|
||
return raw_ecg
|
||
|
||
# 界面会通过这个函数获取方法列表,此函数的返回值务必和Rpeak_Detection()中的方法名称对应否则程序或许直接崩溃
|
||
def get_method():
|
||
return ["pt", "ta", "Engzee", "Wt", "Christov", "Hamilton"]
|
||
|
||
def Rpeak_Detection(ecg,fs,th1,detector_method):
|
||
detectors = Detectors(sampling_frequency=fs)
|
||
|
||
if detector_method == "pt":
|
||
detectormethods = detectors.pan_tompkins_detector
|
||
elif detector_method == "ta":
|
||
detectormethods = detectors.two_average_detector
|
||
elif detector_method == "Engzee":
|
||
detectormethods = detectors.engzee_detector
|
||
elif detector_method == "Wt":
|
||
detectormethods = detectors.swt_detector
|
||
elif detector_method == "Christov":
|
||
detectormethods = detectors.christov_detector
|
||
elif detector_method == "Hamilton":
|
||
detectormethods = detectors.hamilton_detector
|
||
else:
|
||
raise Exception
|
||
|
||
R_peak = array(detectormethods(ecg)) - 100
|
||
|
||
R_peak = find_TPeak(ecg, R_peak, th=int(th1 * fs / 1000))
|
||
R_peak = refinement(ecg, R_peak)
|
||
|
||
RR_Interval = full(len(R_peak) - 1, nan)
|
||
|
||
for i in range(len(R_peak) - 1):
|
||
RR_Interval[i] = R_peak[i + 1] - R_peak[i]
|
||
|
||
RRIV = full(len(RR_Interval) - 1, nan)
|
||
for i in range(len(RR_Interval) - 1):
|
||
RRIV[i] = RR_Interval[i + 1] - RR_Interval[i]
|
||
|
||
Interval = full(len(ecg), nan)
|
||
for i in range(len(R_peak) - 1):
|
||
Interval[R_peak[i]: R_peak[i + 1]] = R_peak[i + 1] - R_peak[i]
|
||
|
||
return R_peak, Interval, RRIV
|