# encoding:utf-8
"""Dataset wrapper and signal-processing helpers for BCG recordings."""
import warnings
from os import listdir
from os.path import isdir

from numpy import array, arange, zeros, percentile, append, full, repeat
from numpy import max as np_max
from numpy import min as np_min
from pandas import read_csv, DataFrame, concat
from scipy.signal import butter, filtfilt
from torch.utils.data import Dataset

warnings.filterwarnings("ignore")


class BCGDataset(Dataset):
    """Torch dataset that serves ``(data, label)`` row pairs from a CSV file.

    For every row, columns 0-999 are returned as the sample and columns
    1000-1999 as its label.
    """

    def __init__(self, train=True):
        """Load ``./in_data/train.txt`` (``train=True``) or ``./in_data/test.txt``.

        :param train: Selects the training file when true, the test file otherwise
        """
        path = "./in_data/train.txt" if train else "./in_data/test.txt"
        # Parse the file once and slice both halves from the same frame.
        frame = read_csv(path)
        self.data = array(frame.iloc[:, arange(1000)])
        self.label = array(frame.iloc[:, arange(1000, 2000)])

    def __getitem__(self, index):
        """Return the ``(data, label)`` pair stored at ``index``."""
        return self.data[index], self.label[index]

    def __len__(self):
        """Return the number of rows in the loaded file."""
        return len(self.label)


class BCG_Operation():
    """Signal-processing operations that share one ``sample_rate`` (in Hz)."""

    def __init__(self, sample_rate=1000):
        """
        :param sample_rate: Sampling rate of the signals handed to the methods
        """
        self.sample_rate = sample_rate

    def down_sample(self, data=None, down_radio=10):
        """
        Keep every ``down_radio``-th sample of ``data``.

        Trailing samples that do not fill a complete group of ``down_radio``
        are dropped. As a side effect ``self.sample_rate`` is divided by
        ``down_radio`` on EVERY call, so down-sampling several parallel signals
        with the same instance compounds the division.

        :param data:       1-D numpy array to decimate
        :param down_radio: Decimation factor
        :return:           Decimated array
        :raises ValueError: If ``data`` is None
        """
        if data is None:
            raise ValueError("data is None, please given an real value!")
        length_after = len(data) // down_radio
        data = data[:length_after * down_radio]
        # First element of each group of `down_radio` samples.
        data = data.reshape(-1, down_radio)[:, 0]
        self.sample_rate = self.sample_rate / down_radio
        return data

    def Splitwin(self, data=None, len_win=None, coverage=1.0, calculate_to_end=False):
        """
        Split a signal into fixed-length windows.

        :param data:             Signal to split
        :param len_win:          Window length in seconds
                                 (``len_win * self.sample_rate`` samples)
        :param coverage:         Step between window starts as a fraction of
                                 the window length (1.0 means no overlap)
        :param calculate_to_end: When true, a remainder longer than 2000
                                 samples gets one extra full-length window that
                                 overlaps the previous one, and a second value
                                 is returned
        :return:                 ``array(windows)`` when ``calculate_to_end``
                                 is false, otherwise ``(array(windows), n)``
                                 where ``n`` is the number of whole
                                 2000-sample chunks in the remainder
                                 (0 when no extra window was added)
        :raises ValueError:      If ``data`` or ``len_win`` is None, or the
                                 resulting step is not positive
        """
        if (len_win is None) or (data is None):
            raise ValueError("length of window or data is None, please given an real value!")
        length = len_win * self.sample_rate  # number of points in one window
        step = length * coverage             # distance between window starts
        if step <= 0:
            # A zero or negative step would never advance `start` below.
            raise ValueError("len_win and coverage must be positive")

        start = 0
        Splitdata = []
        while len(data) - start >= length:
            Splitdata.append(data[int(start):int(start + length)])
            start += step

        # NOTE: 2000 is hard-coded; it equals one 2-second chunk only at 1000 Hz.
        if calculate_to_end and (len(data) - start > 2000):
            remain = len(data) - start
            start = start - step
            step = int(remain / 2000)
            # Shift the last window forward by the whole chunks of the tail.
            start = start + step * 2000
            Splitdata.append(data[int(start):int(start + length)])
            return array(Splitdata), step
        elif calculate_to_end:
            return array(Splitdata), 0
        else:
            return array(Splitdata)

    def Butterworth(self, data, type, low_cut=0.0, high_cut=0.0, order=10):
        """
        Apply a zero-phase (``filtfilt``) Butterworth filter.

        :param data:     Signal to filter
        :param type:     "lowpass", "bandpass" or "highpass"
        :param low_cut:  Cutoff in Hz for "lowpass"; lower edge for "bandpass"
        :param high_cut: Cutoff in Hz for "highpass"; upper edge for "bandpass"
        :param order:    Order of the filter
        :return:         Signal after filtering
        :raises ValueError: If ``type`` is not one of the supported names
        """
        nyquist = self.sample_rate * 0.5
        if type == "lowpass":
            b, a = butter(order, low_cut / nyquist, btype='lowpass')
        elif type == "bandpass":
            b, a = butter(order, [low_cut / nyquist, high_cut / nyquist], btype='bandpass')
        elif type == "highpass":
            b, a = butter(order, high_cut / nyquist, btype='highpass')
        else:
            # A filter type is mandatory.
            raise ValueError("Please choose a type of fliter")
        return filtfilt(b, a, array(data))

    def AmpMovement(self, data, win_size, threshold=20, get_judge_line=False):
        """
        Amplitude-based body-movement detection.

        1. Split the input into segments of ``win_size`` seconds.
        2. Split every segment into 2-second windows (step 2 s).
        3. Compute the peak-to-valley amplitude (max - min) of each window and
           take the 20th percentile of those amplitudes as the reference.
        4. Label each 2-second window: "Movement" when its amplitude exceeds
           2.1 x the reference, "Nobody" when it is below ``threshold``,
           "Sleep" otherwise.

        NOTE(review): the original description mentioned a median/mean
        reference, a 2.2 factor and merging of closely spaced movements; the
        code uses the 20th percentile, 2.1 for labelling and 2.3 for the
        returned judge line, and performs no merging. Behaviour is kept as-is.

        :param data:           Input signal
        :param win_size:       Segment length in seconds (must be a multiple of 2)
        :param threshold:      Amplitude below which a window is "Nobody"
        :param get_judge_line: When true, also return the per-sample judge line
        :return:               Array of state labels, or
                               ``(states, judge_line)`` when ``get_judge_line``
        """
        segments, cover_num = self.Splitwin(data, len_win=win_size, coverage=1.0,
                                            calculate_to_end=True)
        last = segments.shape[0] - 1
        all_states = []
        Amp_list = array([])
        for win, segment in enumerate(segments):
            # Two-second windows inside the current segment.
            data_win = self.Splitwin(segment, len_win=2, coverage=1.0)
            Amp = array([np_max(w) - np_min(w) for w in data_win], dtype=float)  # max - min
            reference = percentile(Amp, 20)  # 20th percentile
            if get_judge_line:
                # sample_rate may be a float after down_sample(); full() needs an int.
                Amp_list = append(Amp_list,
                                  full(int(win_size * self.sample_rate), 2.3 * reference))

            state = []
            for amplitude in Amp:
                if amplitude > 2.1 * reference:
                    state.append("Movement")
                elif amplitude < threshold:
                    state.append("Nobody")
                else:
                    state.append("Sleep")

            # The extra tail segment overlaps the previous one: keep only the
            # labels of the chunks that were not covered yet.
            if win == last and cover_num > 0:
                state = state[-int(cover_num):]
            all_states.extend(state)

        state_all = array(all_states)
        if get_judge_line:
            return state_all, Amp_list
        else:
            return state_all

    def preprocess1(self):
        """
        Build one training table per recording folder under ``../in_data/``.

        For every sub-directory, ``orgData.txt`` and ``label.txt`` are read,
        movement is detected on the signal in 60-second segments, everything is
        down-sampled by 10 and cut into rows of 1000 samples, and the result
        (1000 signal columns, 1000 label columns, one ``state`` column) is
        written to ``../in_data//data<N>.txt``.
        """
        data_dir = "../in_data/"
        # Only folders are recordings; the data<N>.txt files written below live
        # in the same directory and must not be picked up on a re-run.
        dir_list = [name for name in listdir(data_dir) if isdir(data_dir + name)]
        data_list = [data_dir + name + "/orgData.txt" for name in dir_list]
        label_list = [data_dir + name + "/label.txt" for name in dir_list]
        print(data_list)
        print(label_list)
        for i, (data_path, label_path) in enumerate(zip(data_list, label_list)):
            orgBCG = array(read_csv(data_path, header=None)).reshape(-1)
            # NOTE(review): read with the default header handling, so the first
            # line of label.txt is consumed as a header while orgData.txt is
            # read with header=None - confirm label.txt really has a header.
            orgLabel = array(read_csv(label_path)).reshape(-1)

            # --------------------- Movement detection ---------------------
            operation = BCG_Operation()
            # NOTE(review): a 2.5-10 Hz band-passed copy used to be computed
            # here but was never used; detection runs on the raw signal.
            state_win60 = operation.AmpMovement(orgBCG, win_size=60)
            flags = []
            total = state_win60.shape[0]
            for num, state in enumerate(state_win60):
                print("state_num/all_state: ", num, '/', total)
                flags.append(1.0 if state == "Movement" else 0.0)
            # Expand every 2-second state to 2000 samples (1000 Hz input).
            visual_state = repeat(array(flags, dtype=float), 2000)

            # ------------------ Down-sample and cut rows ------------------
            downBCG = operation.down_sample(data=orgBCG, down_radio=10)
            downLabel = operation.down_sample(data=orgLabel, down_radio=10)
            downState = operation.down_sample(data=visual_state, down_radio=10)
            usable = (len(downState) // 1000) * 1000
            downBCG = downBCG[:usable].reshape(-1, 1000)
            downLabel = downLabel[:usable].reshape(-1, 1000)
            # A row is flagged when any of its samples is flagged.
            downState = np_max(downState[:usable].reshape(-1, 1000), axis=1)

            df_all = concat(
                [DataFrame(downBCG), DataFrame(downLabel),
                 DataFrame(downState, columns=["state"])],
                axis=1,
            )
            df_all.to_csv(f"{data_dir}/data{i + 1}.txt", index=False)


def read_all_data(data_dir):
    """
    Load a preprocessed table and split its rows by the ``state`` column.

    :param data_dir: Path of the CSV file to read
    :return:         ``(data_clean, label_clean, data_artifact, label_artifact)``
                     as arrays; "clean" rows have ``state == 0.0``, "artifact"
                     rows have ``state == 1.0``. Data are columns 0-999 and
                     labels columns 1000-1999.
    """
    df_all = read_csv(data_dir)
    state = df_all["state"]
    df_clean = df_all[state == 0.0]
    df_artifact = df_all[state == 1.0]
    data_cols = arange(1000)
    label_cols = arange(1000, 2000)
    return (
        array(df_clean.iloc[:, data_cols]),
        array(df_clean.iloc[:, label_cols]),
        array(df_artifact.iloc[:, data_cols]),
        array(df_artifact.iloc[:, label_cols]),
    )