Files
EDF_to_txt/HYS/HYS_ReadEDF.py
2025-06-20 12:18:00 +08:00

149 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import pyedflib
import numpy as np
import pandas as pd
import shutil
from scipy.signal import resample
def writefile(filename, data, style='w'):
    """Write every element of ``data`` to ``filename``, one element per line.

    :param filename: path (including file name) of the file to write.
    :param data: one-dimensional array-like (ndarray, list, tuple, Series);
        each element is converted to its string form before being written.
    :param style: mode passed to ``open``; ``'w'`` (default) overwrites the
        file, ``'a'`` appends to it.
    :return: always 0.
    """
    # np.asarray lets plain sequences through as well as ndarrays, which the
    # previous ``data.astype`` call rejected.
    lines = np.asarray(data).astype('str')
    with open(filename, style) as f:
        # A single buffered writelines call instead of one write per sample.
        f.writelines(item + '\n' for item in lines)
    return 0
def event_handle(df):
    """Return the event table with English column names and event labels.

    The layout is detected from the first column header (surrounding
    whitespace ignored):

    * ``'Event type'``: the table is already in English. The first column is
      renamed to exactly ``'Event type'`` and both micro-sign spellings of
      ``-arousal`` (U+00B5 and U+03BC) become ``'u-arousal'``.
    * ``'事件类型'``: the Chinese headers are renamed, the columns are
      selected in the fixed target order and the known Chinese event names
      are translated. Unknown event names are left unchanged.

    The input frame is never modified; a new frame is returned.

    :param df: event table as a pandas DataFrame.
    :return: the normalised DataFrame.
    :raises AssertionError: if the first header is neither of the two
        supported values.
    :raises KeyError: if, after renaming, a Chinese-layout table lacks any of
        the target columns.
    """
    first_col = df.columns[0]
    header = first_col.strip()

    if header == 'Event type':
        # rename() yields a new frame, so the caller's table stays untouched,
        # and a padded header such as 'Event type ' no longer causes a
        # KeyError on the lookup below.
        df = df.rename(columns={first_col: 'Event type'})
        df['Event type'] = df['Event type'].replace({
            '\u00b5-arousal': 'u-arousal',  # MICRO SIGN
            '\u03bc-arousal': 'u-arousal',  # GREEK SMALL LETTER MU
        })
        return df

    if header != '事件类型':
        # Raised explicitly (instead of a bare assert) so the check is still
        # performed when Python runs with -O.
        raise AssertionError(
            "unsupported event table: first column is {!r}".format(first_col)
        )

    # Column name mapping; the keys are the raw headers as exported.
    column_mapping = {
        "事件类型\r\r": "Event type",
        "睡眠期\r\r": "Stage",
        "时间\r\r": "Time",
        "\r": "Epoch",
        "日期\r\r": "Date",
        "持续时间\r\r": "Duration",
        "体位记录.": "Body Position",
        "判定\r\r": "Validation",
    }
    # Columns of the output table, in output order.
    target_columns = [
        "Event type", "Stage", "Time", "Epoch", "Date", "Duration",
        "HR bef.", "HR extr.", "HR delta", "O2 bef.", "O2 min.",
        "O2 delta", "Body Position", "Validation", "Unnamed: 14",
    ]
    # Chinese event name -> English event name.
    event_names = {
        '腿动': 'Leg Movement',
        '低通气': 'Hypopnea',
        '混合性暂停': 'Mixed apnea',
        '中枢性暂停': 'Central apnea',
        '阻塞性暂停': 'Obstructive apnea',
        '氧减': 'Desaturation',
        '长RR间距': 'Long RR',
        '心率上升': 'Heart Rate Rise',
        '心率下降': 'Heart Rate Drop',
        'PTT下降': 'PTT drop',
        '鼾声': 'Snore',
        '微觉醒': 'u-arousal',
    }

    df = df.rename(columns=column_mapping)
    # .copy() makes the selection an independent frame, so the assignment
    # below cannot trigger pandas' SettingWithCopyWarning.
    df = df[target_columns].copy()
    df['Event type'] = df['Event type'].replace(event_names)
    return df
def upsample_resample(data, data_fs, target_fs):
    """Resample ``data`` to ``target_fs`` when that is above ``data_fs``.

    :param data: sampled signal (anything ``len`` and scipy's ``resample``
        accept).
    :param data_fs: sampling rate of ``data``.
    :param target_fs: requested sampling rate.
    :return: ``(signal, fs)``. When ``target_fs / data_fs`` is greater than 1
        the signal is resampled to ``int(len(data) * ratio)`` points and
        ``fs`` is ``int(target_fs)``; otherwise the input is returned as-is
        together with ``int(data_fs)``.
    """
    ratio = target_fs / data_fs
    n_out = int(len(data) * ratio)

    # Guard clause: nothing to do unless the rate actually increases.
    if not ratio > 1:
        print("上采样倍数不大于1不进行上采样操作")
        return data, int(data_fs)

    return resample(data, n_out), int(target_fs)
def read_edf(edfpath, stagepath, eventpath, outpath):
    """Export every ``.edf`` recording found in ``edfpath`` to text files.

    For each file ``<name>.edf`` (``<name>`` is the part before the first
    dot) the function:

    * creates the directory ``outpath + <last 4 characters of name> + '/'``
      and copies the EDF file into it;
    * reads ``stagepath + <name>_Stage.csv`` and checks that its 4th and 5th
      header cells equal the EDF start date (``Y/M/D``, month and day not
      zero-padded) and start time (``HH:MM:SS``);
    * reads ``eventpath + <name>_Event.csv`` (GBK), normalises it with
      ``event_handle`` and writes it as ``SA Label_Raw.csv``;
    * writes every signal, cut to ``min_length * sample_frequency`` samples,
      to ``<label>_Raw_<fs>.txt``; the first ``'Flow Patient'`` channel is
      stored as ``Flow T`` and any later one as ``Flow P``;
    * appends the start time and the duration to ``StartTime_Raw.txt``;
    * writes the first ``min_length`` stage labels to ``5_class_Raw_1.txt``
      (11->W, 12->R, 13->N1, 14->N2, 15->N3, other codes are written as
      their number).

    ``min_length`` is the smaller of the EDF duration and the number of rows
    in the stage table. All directory arguments are joined to file names by
    plain string concatenation, so each must end with a path separator.

    :param edfpath: directory holding the ``.edf`` files.
    :param stagepath: directory holding the ``*_Stage.csv`` files.
    :param eventpath: directory holding the ``*_Event.csv`` files.
    :param outpath: directory under which the per-recording folders are made.
    :raises AssertionError: if the stage table header does not match the EDF
        start date/time.
    """
    stage_names = {11: 'W', 12: 'R', 13: 'N1', 14: 'N2', 15: 'N3'}

    edf_names = [name for name in os.listdir(edfpath) if name.endswith('.edf')]
    for sample_num in edf_names:
        print("样本编号:", sample_num)
        base_name = sample_num.split('.')[0]
        output_sample = outpath + base_name[-4:] + '/'
        os.makedirs(output_sample, exist_ok=True)

        # Keep a copy of the original EDF next to the exported text files.
        fpath = edfpath + sample_num
        shutil.copy(fpath, output_sample)

        with pyedflib.EdfReader(fpath) as file:
            # Sleep-stage labels exported as CSV; the codes are in column 0.
            df = pd.read_csv(stagepath + base_name + '_Stage.csv')
            stage_values = df.values[:, 0].reshape(-1)

            # EDF header information.
            start_time = file.getStartdatetime()
            sec = file.getFileDuration()
            day_start = "{}/{}/{}".format(
                start_time.strftime("%Y"), start_time.month, start_time.day
            )
            sec_start = start_time.strftime("%H:%M:%S")

            # Explicit raise (not assert) so the check also runs under -O.
            if df.columns[3] != day_start or df.columns[4] != sec_start:
                raise AssertionError(
                    "start time mismatch for {}: EDF {} {} vs stage file {} {}".format(
                        sample_num, day_start, sec_start,
                        df.columns[3], df.columns[4],
                    )
                )
            print("edf文件与睡眠分期标签开始时间一致")

            # Event list.
            df1 = pd.read_csv(eventpath + base_name + '_Event.csv',
                              header=0, encoding='GBK')
            df1 = event_handle(df1)
            df1.to_csv(output_sample + 'SA Label_Raw.csv',
                       index=False, encoding='GBK')

            signal_label = file.getSignalLabels()
            min_length = int(np.min([sec, df.shape[0]]))
            print("裁剪为最小长度:", min_length)

            flow_seen = False
            for i, label in enumerate(signal_label):
                signal = file.readSignal(i)
                sample_frequency = int(file.getSampleFrequency(i))
                data_org = signal[0:min_length * sample_frequency]
                if label == 'Flow Patient':
                    # Two channels can share this label: the first is saved
                    # as "Flow T", any later one as "Flow P".
                    flow_name = 'Flow P' if flow_seen else 'Flow T'
                    flow_seen = True
                    writefile(output_sample + flow_name + '_Raw_'
                              + str(sample_frequency) + '.txt', data_org)
                else:
                    writefile(output_sample + label + '_Raw_'
                              + str(sample_frequency) + '.txt', data_org)

            # NOTE(review): opened in append mode, so re-running the export
            # adds the two lines again - confirm this is intended.
            with open(os.path.join(output_sample, "StartTime_Raw.txt"), "a") as text_file:
                text_file.write(start_time.strftime("%Y-%m-%d %H:%M:%S\n"))
                # Plain integer formatting; "{:g}" would switch to scientific
                # notation for durations of 1,000,000 s and above.
                text_file.write("{}\n".format(int(sec)))

            # Translate stage codes and cut them to the common length.
            stage = np.array(
                [stage_names.get(code, str(code))
                 for code in stage_values.astype(int).tolist()],
                dtype=str,
            )
            writefile(output_sample + '5_class_Raw_1.txt', stage[0:min_length])
if __name__ == '__main__':
    # Raw strings: sequences such as "\D", "\H" and "\P" are invalid escapes
    # in ordinary literals (SyntaxWarning on recent Python versions). The
    # resulting path values are unchanged. Every directory ends with '/'
    # because read_edf joins paths by string concatenation.
    edf_dir = r'E:\DB\testdata\HYS\PSG_Origin/Edf/'
    stage_dir = r'E:\DB\testdata\HYS\PSG_Origin/Stage/'
    event_dir = r'E:\DB\testdata\HYS\PSG_Origin/Event/'
    output_dir = r'E:\DB\testdata\HYS\PSG_Text/'
    read_edf(edf_dir, stage_dir, event_dir, output_dir)
    print("finished")