from ecgdetectors import Detectors
from numpy import quantile, delete, array, argmax, full, nan
from func.BCGDataset import BCG_Operation

# Maps the method names exposed to the UI onto the attribute name of the
# matching detector on an ``ecgdetectors.Detectors`` instance.  get_method()
# and Rpeak_Detection() both read this table, so the two cannot drift apart.
_DETECTOR_ATTRIBUTES = {
    "pt": "pan_tompkins_detector",
    "ta": "two_average_detector",
    "Engzee": "engzee_detector",
    "Wt": "swt_detector",
    "Christov": "christov_detector",
    "Hamilton": "hamilton_detector",
}


def refinement(data, peak):
    """Drop the first and/or last peak when its amplitude is too small.

    The reference level is the 0.2 quantile of the signal values at the
    interior peaks (all peaks except the first and the last).  A boundary
    peak is removed when its signal value is below 60% of that level.

    :param data: signal samples; must support integer-array indexing
    :param peak: peak positions (sample indices into ``data``)
    :return: array of the remaining peak positions, or ``None`` when
        ``data`` is empty or there are at most two peaks
    """
    if len(data) == 0 or len(peak) <= 2:
        return None

    first_peak = peak[0]
    last_peak = peak[-1]
    reference_level = quantile(data[peak[1:-1]], 0.2)

    if data[first_peak] < reference_level * 0.6:
        peak = delete(peak, 0)
    if data[last_peak] < reference_level * 0.6:
        peak = delete(peak, -1)
    return array(peak)


def find_TPeak(data, peaks, th=50):
    """Find the true J peaks or R peaks near a set of preliminary peaks.

    Each preliminary peak is replaced by the position of the maximum of
    ``data`` inside the window ``[peak - th, peak + th)``, clipped to the
    bounds of ``data``.

    :param data: BCG or ECG data
    :param peaks: preliminary peaks (location_R derived from the label)
    :param th: half-width of the search window, in samples
    :return: array of true peak positions; preliminary peaks lying beyond the
        end of ``data`` or whose clipped window is empty are skipped
    """
    n_samples = len(data)
    return_peak = []
    for peak in peaks:
        if peak > n_samples:
            continue
        min_win = max(0, int(peak - th))
        max_win = min(n_samples, int(peak + th))
        # A window ending at or before its start (e.g. a candidate far enough
        # before sample 0) holds no samples.  Slicing with a negative stop
        # would instead search almost the whole signal, and argmax raises on
        # an empty slice, so such candidates are skipped.
        if max_win <= min_win:
            continue
        return_peak.append(argmax(data[min_win:max_win]) + min_win)
    return array(return_peak)


def preprocess(raw_ecg, fs, low_cut, high_cut):
    """Band-pass filter an ECG signal and scale the result by 4.

    :param raw_ecg: raw ECG samples
    :param fs: sampling rate handed to ``BCG_Operation``
    :param low_cut: lower cut-off passed to the Butterworth band-pass
    :param high_cut: upper cut-off passed to the Butterworth band-pass
    :return: output of the order-3 band-pass filter multiplied by 4
    """
    preprocessing = BCG_Operation(sample_rate=fs)
    # Original note: the ECG has been downsampled.
    # NOTE(review): no downsampling is visible in this function -- confirm
    # where it happens.
    raw_ecg = preprocessing.Butterworth(
        raw_ecg, "bandpass", low_cut=low_cut, high_cut=high_cut, order=3
    ) * 4
    return raw_ecg


# The UI obtains the list of methods through this function.  Its return value
# must correspond to the method names accepted by Rpeak_Detection(), otherwise
# the program may crash outright.
def get_method():
    """Return the names of the supported R-peak detection methods."""
    return list(_DETECTOR_ATTRIBUTES)


def Rpeak_Detection(ecg, fs, th1, detector_method):
    """Detect R peaks in an ECG signal and derive interval series from them.

    :param ecg: ECG samples
    :param fs: sampling frequency
    :param th1: half-width of the peak search window; converted to samples
        as ``int(th1 * fs / 1000)``
    :param detector_method: one of the names returned by ``get_method()``
    :return: tuple ``(R_peak, Interval, RRIV)`` where ``R_peak`` holds the
        peak positions, ``Interval`` has one entry per ECG sample holding the
        length of the R-R interval that sample lies in (``nan`` outside any
        interval), and ``RRIV`` holds the differences between consecutive
        R-R intervals
    :raises ValueError: if ``detector_method`` is not a known method name
    """
    detector_attribute = _DETECTOR_ATTRIBUTES.get(detector_method)
    if detector_attribute is None:
        raise ValueError(
            "Unknown detector method: {!r}".format(detector_method)
        )

    detectors = Detectors(sampling_frequency=fs)
    detect = getattr(detectors, detector_attribute)

    # Shift every detection 100 samples earlier before searching for the
    # local maximum.
    # NOTE(review): presumably this compensates for detector delay -- confirm.
    shifted = array(detect(ecg)) - 100
    candidates = find_TPeak(ecg, shifted, th=int(th1 * fs / 1000))

    refined = refinement(ecg, candidates)
    if refined is None:
        # refinement() declines to work on two or fewer peaks; keep the
        # unrefined positions instead of failing on None below.
        R_peak = candidates.astype(int)
    else:
        R_peak = refined

    # Slicing-based differences are simply empty for fewer than two peaks
    # (or fewer than two intervals), where a negative length passed to
    # full() would raise.
    RR_Interval = (R_peak[1:] - R_peak[:-1]).astype(float)
    RRIV = RR_Interval[1:] - RR_Interval[:-1]

    Interval = full(len(ecg), nan)
    for start, stop in zip(R_peak[:-1], R_peak[1:]):
        Interval[start:stop] = stop - start

    return R_peak, Interval, RRIV