import os
import shutil

import numpy as np
import pandas as pd
import pyedflib
from scipy.signal import resample

# Chinese event-export header (whitespace stripped) -> English column name.
_EVENT_COLUMN_NAMES = {
    "事件类型": "Event type",
    "睡眠期": "Stage",
    "时间": "Time",
    "屏": "Epoch",
    "日期": "Date",
    "持续时间": "Duration",
    "体位记录.": "Body Position",
    "判定": "Validation",
}

# Column layout (and order) of the event table that is written out.
_EVENT_TARGET_COLUMNS = [
    "Event type", "Stage", "Time", "Epoch", "Date", "Duration",
    "HR bef.", "HR extr.", "HR delta",
    "O2 bef.", "O2 min.", "O2 delta",
    "Body Position", "Validation", "Unnamed: 14",
]

# Chinese event-type value -> English event-type value.
_EVENT_TYPE_NAMES = {
    '腿动': 'Leg Movement',
    '低通气': 'Hypopnea',
    '混合性暂停': 'Mixed apnea',
    '中枢性暂停': 'Central apnea',
    '阻塞性暂停': 'Obstructive apnea',
    '氧减': 'Desaturation',
    '长RR间距': 'Long RR',
    '心率上升': 'Heart Rate Rise',
    '心率下降': 'Heart Rate Drop',
    'PTT下降': 'PTT drop',
    '鼾声': 'Snore',
    '微觉醒': 'u-arousal',
}

# Numeric stage code -> stage label; any other code is kept as its integer text.
_STAGE_LABELS = {11: 'W', 12: 'R', 13: 'N1', 14: 'N2', 15: 'N3'}


def writefile(filename, data, style='w'):
    """Write every element of ``data`` to ``filename``, one element per line.

    :param filename: path of the file to write
    :param data: array-like object providing ``astype`` (e.g. a NumPy array)
    :param style: mode passed to ``open`` ('w' overwrites, 'a' appends)
    :return: always 0
    """
    lines = data.astype('str')  # convert every element to its string form
    with open(filename, style) as f:
        f.writelines(line + '\n' for line in lines)
    return 0


def event_handle(df):
    """Normalise an exported event table to the English column layout.

    Two layouts are recognised by the (stripped) name of the first column:

    * ``'Event type'`` - English export; only ``'μ-arousal'`` is rewritten to
      ``'u-arousal'``. The passed frame is modified in place and returned.
    * ``'事件类型'`` - Chinese export; columns are renamed to English,
      re-ordered to ``_EVENT_TARGET_COLUMNS`` and the event types translated.
      A new frame is returned.

    :param df: event table as read by ``pandas.read_csv``
    :return: the normalised DataFrame
    :raises ValueError: if the first column matches neither layout
    """
    first_column = str(df.columns[0]).strip()

    if first_column == 'Event type':
        # Address the column by its real label so surrounding whitespace in
        # the header cannot cause a KeyError.
        event_column = df.columns[0]
        df[event_column] = df[event_column].replace('μ-arousal', 'u-arousal')
        return df

    if first_column == '事件类型':
        # Match headers on their stripped form: the export pads them with
        # trailing carriage returns ("\r\r").
        renames = {
            column: _EVENT_COLUMN_NAMES[str(column).strip()]
            for column in df.columns
            if str(column).strip() in _EVENT_COLUMN_NAMES
        }
        df = df.rename(columns=renames)
        # reindex (instead of df[target_columns]) adds the target columns the
        # Chinese export does not contain as empty columns instead of raising.
        df = df.reindex(columns=_EVENT_TARGET_COLUMNS)
        df['Event type'] = df['Event type'].replace(_EVENT_TYPE_NAMES)
        return df

    # Explicit raise: an assert would be stripped when running with -O.
    raise ValueError(
        "unrecognised event table: first column is %r, expected "
        "'Event type' or '事件类型'" % (df.columns[0],)
    )


def upsample_resample(data, data_fs, target_fs):
    """Resample ``data`` from ``data_fs`` up to ``target_fs``.

    Nothing is done when ``target_fs`` is not greater than ``data_fs``; the
    input is then returned unchanged together with the original rate.

    :param data: sequence of samples
    :param data_fs: sampling rate of ``data``
    :param target_fs: requested sampling rate
    :return: tuple ``(samples, rate)`` where ``rate`` is an ``int``
    """
    up_ratio = target_fs / data_fs
    if up_ratio > 1:
        target_num = int(len(data) * up_ratio)
        return resample(data, target_num), int(target_fs)

    print("上采样倍数不大于1,不进行上采样操作")
    return data, int(data_fs)


def _stage_codes_to_labels(stage_codes):
    """Return a string array with the stage codes 11-15 replaced by labels."""
    codes = np.asarray(stage_codes).astype(int)
    labels = [_STAGE_LABELS.get(int(code), str(int(code))) for code in codes]
    return np.array(labels, dtype=str)


def read_edf(edfpath, stagepath, eventpath, outpath):
    """Export every ``.edf`` recording in ``edfpath`` to plain-text files.

    For each recording ``<name>.edf`` the function

    * creates ``outpath/<last four characters of name>/`` and copies the EDF
      file into it,
    * reads ``stagepath/<name>_Stage.csv`` and checks that header fields 3 and
      4 equal the EDF start date (``Y/M/D``) and start time (``HH:MM:SS``),
    * reads ``eventpath/<name>_Event.csv`` (GBK), normalises it with
      ``event_handle`` and writes ``SA Label_Raw.csv``,
    * writes every signal to ``<label>_Raw_<fs>.txt``, truncated to
      ``min(EDF duration in seconds, number of stage rows)`` seconds; the
      first ``'Flow Patient'`` channel is stored as ``Flow T``, later ones as
      ``Flow P``,
    * writes ``StartTime_Raw.txt`` (start time and duration) and
      ``5_class_Raw_1.txt`` (stage labels).

    :param edfpath: directory containing the ``.edf`` files
    :param stagepath: directory containing the ``*_Stage.csv`` files
    :param eventpath: directory containing the ``*_Event.csv`` files
    :param outpath: directory below which the per-recording folders are created
    :raises ValueError: if the stage file start time differs from the EDF's
    """
    edf_names = [name for name in os.listdir(edfpath) if name.endswith('.edf')]

    for sample_num in edf_names:
        print("样本编号:", sample_num)
        stem = sample_num.split('.')[0]
        output_sample = os.path.join(outpath, stem[-4:])
        os.makedirs(output_sample, exist_ok=True)

        # Keep a copy of the raw EDF next to the exported text files.
        fpath = os.path.join(edfpath, sample_num)
        shutil.copy(fpath, output_sample)

        with pyedflib.EdfReader(fpath) as file:
            # Sleep-stage labels exported as CSV; first column holds the codes.
            df = pd.read_csv(os.path.join(stagepath, stem + '_Stage.csv'))
            stage_values = df.values[:, 0].reshape(-1)

            # Recording start and duration taken from the EDF header.
            start_time = file.getStartdatetime()
            sec = file.getFileDuration()
            day_start = "{}/{}/{}".format(
                start_time.strftime("%Y"), start_time.month, start_time.day
            )
            sec_start = start_time.strftime("%H:%M:%S")
            # Explicit raise: an assert would be stripped when running with -O.
            if df.columns[3] != day_start or df.columns[4] != sec_start:
                raise ValueError(
                    "start time mismatch for %s: EDF starts at %s %s, "
                    "stage file header says %s %s"
                    % (sample_num, day_start, sec_start,
                       df.columns[3], df.columns[4])
                )
            print("edf文件与睡眠分期标签开始时间一致!!!!!!!")

            # Event list.
            df1 = pd.read_csv(
                os.path.join(eventpath, stem + '_Event.csv'),
                header=0,
                encoding='GBK',
            )
            df1 = event_handle(df1)
            df1.to_csv(
                os.path.join(output_sample, 'SA Label_Raw.csv'),
                index=False,
                encoding='GBK',
            )

            signal_label = file.getSignalLabels()
            # Common length of signals and stage labels.
            # NOTE(review): this compares seconds with stage rows, so it
            # presumably expects one stage row per second - confirm.
            min_length = int(np.min([sec, df.shape[0]]))
            print("裁剪为最小长度:", min_length)

            first_flow_written = False
            for i, label in enumerate(signal_label):
                signal = file.readSignal(i)
                sample_frequency = int(file.getSampleFrequency(i))
                data_org = signal[0:min_length * sample_frequency]

                if label == 'Flow Patient':
                    # The label can occur more than once: the first occurrence
                    # is stored as "Flow T", every later one as "Flow P".
                    if first_flow_written:
                        out_name = 'Flow P'
                    else:
                        out_name = 'Flow T'
                        first_flow_written = True
                else:
                    out_name = label

                writefile(
                    os.path.join(
                        output_sample,
                        out_name + '_Raw_' + str(sample_frequency) + '.txt',
                    ),
                    data_org,
                )

        # 'w' instead of 'a': every other output is overwritten on a rerun, so
        # appending here would leave duplicated lines in this file only.
        start_file = os.path.join(output_sample, "StartTime_Raw.txt")
        with open(start_file, "w") as text_file:
            text_file.write(start_time.strftime("%Y-%m-%d %H:%M:%S\n"))
            # Plain integer text; "{:g}" would switch to scientific notation
            # for durations of 1e6 seconds and more.
            text_file.write("{}\n".format(int(sec)))

        # Stage labels, truncated to the common length.
        stage = _stage_codes_to_labels(stage_values)
        writefile(
            os.path.join(output_sample, '5_class_Raw_1.txt'),
            stage[0:min_length],
        )


if __name__ == '__main__':
    # Raw strings: the Windows paths contain backslashes such as "\D" and "\H"
    # that are invalid escape sequences in a normal string literal.
    edf_dir = r'E:\DB\testdata\HYS\PSG_Origin/Edf/'
    stage_dir = r'E:\DB\testdata\HYS\PSG_Origin/Stage/'
    event_dir = r'E:\DB\testdata\HYS\PSG_Origin/Event/'
    output_dir = r'E:\DB\testdata\HYS\PSG_Text/'
    read_edf(edf_dir, stage_dir, event_dir, output_dir)
    print("finished")