329 lines
13 KiB
Python
329 lines
13 KiB
Python
import traceback
|
||
from datetime import datetime
|
||
from logging import error, info
|
||
from pathlib import Path
|
||
import json
|
||
from PySide6.QtWidgets import QMessageBox, QWidget, QPushButton, QProgressBar, QApplication, QRadioButton, QCheckBox
|
||
|
||
from func.utils.Constants import Constants
|
||
from func.utils.CustomException import TipsTypeValueNotExistError, MsgBoxTypeValueNotExistError
|
||
from func.utils.Result import Result
|
||
import numpy as np
|
||
|
||
class PublicFunc:
|
||
|
||
@staticmethod
|
||
def get_current_localtime() -> str:
|
||
"""
|
||
获取当前本地时间
|
||
|
||
Parameters:
|
||
|
||
Returns:
|
||
str,格式化为"%H:%M:%S"的当前本地时间
|
||
|
||
Raises:
|
||
"""
|
||
return str(datetime.now().strftime("%H:%M:%S"))
|
||
|
||
@staticmethod
|
||
def format_status_msg(msg) -> str:
|
||
"""
|
||
格式化状态栏信息
|
||
|
||
Parameters:
|
||
msg - str,需要被格式化的字符串
|
||
|
||
Returns:
|
||
str,格式化为"%H:%M:%S"的当前本地时间 + 信息
|
||
|
||
Raises:
|
||
"""
|
||
return str(datetime.now().strftime("%H:%M:%S")) + " " + msg
|
||
|
||
@staticmethod
|
||
def text_output(main_window: object, content: str, tips_type: str) -> None:
|
||
"""
|
||
更新textBrowser中的内容,同时输出日志
|
||
|
||
Parameters:
|
||
main_window - object,Qt的含有textBrowser属性的对象,这里一般指的是mainWindow,里面有唯一的textBrowser属性
|
||
content - str,需要输出的内容
|
||
tips_type - 日志输出的类型
|
||
|
||
Returns:
|
||
|
||
Raises:
|
||
TipsTypeValueNotExistError
|
||
"""
|
||
if tips_type is Constants.TIPS_TYPE_INFO:
|
||
info(f"{tips_type}: {content}")
|
||
main_window.textBrowser_info.append(
|
||
f"<font color='black'>{PublicFunc.get_current_localtime()} {tips_type}: {content}</font>")
|
||
elif tips_type is Constants.TIPS_TYPE_ERROR:
|
||
error(f"{tips_type}: {content}")
|
||
main_window.textBrowser_info.append(
|
||
f"<font color='red'>{PublicFunc.get_current_localtime()} {tips_type}: {content}</font>")
|
||
else:
|
||
raise TipsTypeValueNotExistError()
|
||
|
||
main_window.textBrowser_info.verticalScrollBar().setValue(
|
||
main_window.textBrowser_info.verticalScrollBar().maximum())
|
||
|
||
@staticmethod
|
||
def msgbox_output(main_window: object, content: str, msg_box_type: str) -> None:
|
||
"""
|
||
更新messageBox中的内容,并弹出
|
||
|
||
Parameters:
|
||
main_window - object,Qt的含有messageBox属性的对象,这里一般指的是mainWindow,里面有唯一的messageBox属性
|
||
content - str,需要输出的内容
|
||
msg_box_type - str,messageBox弹窗的类型
|
||
|
||
Returns:
|
||
|
||
Raises:
|
||
MsgBoxTypeValueNotExistError
|
||
"""
|
||
main_window.msgBox.setText(f"{msg_box_type}: {content}")
|
||
if msg_box_type is Constants.MSGBOX_TYPE_INFO:
|
||
main_window.msgBox.setIcon(QMessageBox.Information)
|
||
elif msg_box_type is Constants.MSGBOX_TYPE_WARNING:
|
||
main_window.msgBox.setIcon(QMessageBox.Warning)
|
||
elif msg_box_type is Constants.MSGBOX_TYPE_ERROR:
|
||
main_window.msgBox.setIcon(QMessageBox.Critical)
|
||
elif msg_box_type is Constants.MSGBOX_TYPE_QUESTION:
|
||
main_window.msgBox.setIcon(QMessageBox.Question)
|
||
else:
|
||
raise MsgBoxTypeValueNotExistError()
|
||
main_window.msgBox.exec()
|
||
|
||
@staticmethod
|
||
def __disableAllButton__(mainWindow, buttonState):
|
||
# 禁用所有按钮
|
||
all_widgets = mainWindow.centralWidget().findChildren(QWidget)
|
||
|
||
# 迭代所有部件,查找按钮并禁用它们
|
||
for widget in all_widgets:
|
||
if isinstance(widget, QPushButton):
|
||
if widget.objectName() in buttonState["Current"].keys():
|
||
widget.setEnabled(False)
|
||
|
||
if isinstance(widget, QRadioButton):
|
||
if widget.objectName() in buttonState["Current"].keys():
|
||
widget.setEnabled(False)
|
||
|
||
@staticmethod
|
||
def __enableAllButton__(mainWindow, buttonState):
|
||
# 启用按钮
|
||
all_widgets = mainWindow.centralWidget().findChildren(QWidget)
|
||
|
||
# 迭代所有部件,查找按钮并启用它们
|
||
for widget in all_widgets:
|
||
if isinstance(widget, QPushButton):
|
||
if widget.objectName() in buttonState["Current"].keys():
|
||
widget.setEnabled(buttonState["Current"][widget.objectName()])
|
||
|
||
if isinstance(widget, QRadioButton):
|
||
if widget.objectName() in buttonState["Current"].keys():
|
||
widget.setEnabled(buttonState["Current"][widget.objectName()])
|
||
|
||
@staticmethod
|
||
def __resetAllButton__(mainWindow, buttonState):
|
||
# 启用按钮
|
||
all_widgets = mainWindow.centralWidget().findChildren(QWidget)
|
||
|
||
# 迭代所有部件,查找按钮并启用它们
|
||
for widget in all_widgets:
|
||
if isinstance(widget, QPushButton):
|
||
if widget.objectName() in buttonState["Default"].keys():
|
||
widget.setEnabled(buttonState["Default"][widget.objectName()])
|
||
|
||
if isinstance(widget, QRadioButton):
|
||
if widget.objectName() in buttonState["Default"].keys():
|
||
widget.setEnabled(buttonState["Default"][widget.objectName()])
|
||
|
||
@staticmethod
|
||
def __styleAllButton__(mainWindow, buttonState):
|
||
# 启用按钮
|
||
all_widgets = mainWindow.centralWidget().findChildren(QWidget)
|
||
|
||
# 迭代所有部件,查找按钮并启用它们
|
||
for widget in all_widgets:
|
||
if isinstance(widget, QPushButton):
|
||
if widget.objectName() in buttonState["Default"].keys():
|
||
widget.setStyleSheet(Constants.LABELBTN_STYLE_NORMAL)
|
||
if isinstance(widget, QCheckBox):
|
||
widget.setStyleSheet(Constants.CHECKBOX_STYLE_NORMAL)
|
||
|
||
@staticmethod
|
||
def add_progressbar(mainWindow):
|
||
mainWindow.progressbar = QProgressBar()
|
||
mainWindow.progressbar.setRange(0, 100)
|
||
mainWindow.progressbar.setValue(0)
|
||
mainWindow.progressbar.setStyleSheet(Constants.PROGRESSBAR_STYLE)
|
||
mainWindow.ui.statusbar.addPermanentWidget(mainWindow.progressbar)
|
||
|
||
@staticmethod
|
||
def statusbar_show_msg(mainWindow, msg):
|
||
mainWindow.ui.statusbar.showMessage(msg)
|
||
|
||
@staticmethod
|
||
def statusbar_clear_msg(mainWindow):
|
||
mainWindow.ui.statusbar.clearMessage()
|
||
|
||
@staticmethod
|
||
def finish_operation(mainWindow, buttonState):
|
||
PublicFunc.statusbar_show_msg(mainWindow, PublicFunc.format_status_msg(Constants.OPERATION_FINISHED))
|
||
mainWindow.progressbar.setValue(100)
|
||
QApplication.processEvents()
|
||
|
||
PublicFunc.__enableAllButton__(mainWindow, buttonState)
|
||
|
||
@staticmethod
|
||
def progressbar_update(mainWindow, current: int, total: int, msg: str, progressbarState: int):
|
||
if current > total:
|
||
raise ValueError("当前进度值大于总进度值")
|
||
if progressbarState < 0 or progressbarState > 100:
|
||
raise ValueError("进度条值的范围应该在[0, 100]")
|
||
PublicFunc.statusbar_show_msg(mainWindow, PublicFunc.format_status_msg(f"({str(current)}/{str(total)}){msg}"))
|
||
mainWindow.progressbar.setValue(progressbarState)
|
||
QApplication.processEvents()
|
||
|
||
@staticmethod
|
||
def examine_file(path_str, filename, suffix):
|
||
if not isinstance(path_str, str):
|
||
path_str = str(path_str)
|
||
temp_path = [p for p in Path(path_str).glob(f"{filename}*") if p.stem.startswith(filename) and p.suffix == suffix]
|
||
|
||
if len(temp_path) == 0:
|
||
return Result().failure(
|
||
info=Constants.INPUT_FAILURE + "\n" +
|
||
filename + ":" +
|
||
path_str +
|
||
Constants.FAILURE_REASON["File_Not_Exist"])
|
||
elif len(temp_path) > 1:
|
||
return Result().failure(
|
||
info=Constants.INPUT_FAILURE + "\n" +
|
||
filename + ":" +
|
||
path_str +
|
||
Constants.FAILURE_REASON["File_More_Than_One"])
|
||
else:
|
||
path = temp_path[0]
|
||
freq = path.stem.split("_")[-1]
|
||
# 验证是否为整数或是否存在
|
||
if not freq.isdigit():
|
||
return Result().failure(
|
||
info=Constants.INPUT_FAILURE + "\n" +
|
||
filename + ":" +
|
||
Constants.FAILURE_REASON["Data_Frequency_Not_In_Filename"])
|
||
if path.suffix != suffix:
|
||
return Result().failure(
|
||
info=Constants.INPUT_FAILURE + "\n" +
|
||
filename + ":" +
|
||
Constants.FAILURE_REASON["Suffix_Not_Correct"])
|
||
data = {
|
||
"path": path,
|
||
"freq": int(freq)
|
||
}
|
||
return Result().success(data=data)
|
||
|
||
|
||
@staticmethod
|
||
def get_machine_start_time_bias(psg_dir_path, orgBcg_dir_path):
|
||
# 查看orgBcg文件夹下是否有zd5y2_jf_info.json文件
|
||
orgBcg_info_path = [p for p in Path(orgBcg_dir_path).glob("*info*") if
|
||
p.suffix == ".json"]
|
||
if len(orgBcg_info_path) == 0:
|
||
return Result().failure(
|
||
info=Constants.INPUT_FAILURE + "\n" +
|
||
"*info.json:" +
|
||
str(orgBcg_dir_path) +
|
||
Constants.FAILURE_REASON["File_Not_Exist"])
|
||
elif len(orgBcg_info_path) > 1:
|
||
return Result().failure(
|
||
info=Constants.INPUT_FAILURE + "\n" +
|
||
"*info.json:" +
|
||
str(orgBcg_dir_path) +
|
||
Constants.FAILURE_REASON["File_More_Than_One"])
|
||
else:
|
||
orgBcg_info_path = orgBcg_info_path[0]
|
||
with open(orgBcg_info_path, 'r', encoding='utf-8') as f:
|
||
orgBcg_info = json.load(f)
|
||
|
||
machine_start_time_str = orgBcg_info.get("StartDate", None)
|
||
|
||
if machine_start_time_str is None:
|
||
return Result().failure(
|
||
info=Constants.INPUT_FAILURE + "\n" +
|
||
"*info.json:" +
|
||
str(orgBcg_dir_path) +
|
||
Constants.FAILURE_REASON["orgBcg_Machine_Start_Time_Not_Exist"])
|
||
|
||
|
||
# 查看psg文件夹下是否有StartTime_Raw.txt文件
|
||
psg_start_time_path = [p for p in Path(psg_dir_path).glob("StartTime_Raw*") if
|
||
p.suffix == ".txt"]
|
||
if len(psg_start_time_path) == 0:
|
||
return Result().failure(
|
||
info=Constants.INPUT_FAILURE + "\n" +
|
||
"StartTime_Raw.txt:" +
|
||
str(psg_dir_path) +
|
||
Constants.FAILURE_REASON["File_Not_Exist"])
|
||
elif len(psg_start_time_path) > 1:
|
||
return Result().failure(
|
||
info=Constants.INPUT_FAILURE + "\n" +
|
||
"StartTime_Raw.txt:" +
|
||
str(psg_dir_path) +
|
||
Constants.FAILURE_REASON["File_More_Than_One"])
|
||
else:
|
||
psg_start_time_path = psg_start_time_path[0]
|
||
with open(psg_start_time_path, 'r', encoding='utf-8') as f:
|
||
# 读取第一行
|
||
psg_start_time_str = f.readline().strip()
|
||
print(psg_start_time_str)
|
||
|
||
try:
|
||
# 计算时间差,单位为秒
|
||
fmt = "%Y-%m-%d %H:%M:%S"
|
||
machine_start_time = datetime.strptime(machine_start_time_str, fmt)
|
||
psg_start_time = datetime.strptime(psg_start_time_str, fmt)
|
||
time_bias = (psg_start_time - machine_start_time).total_seconds()
|
||
except Exception as e:
|
||
traceback.print_exc()
|
||
return Result().failure(
|
||
info=Constants.INPUT_FAILURE + "\n" +
|
||
"时间格式错误,无法计算时间差:" +
|
||
str(e))
|
||
|
||
return Result().success(data={"time_bias":time_bias})
|
||
|
||
@staticmethod
|
||
def examine_artifact(artifact):
|
||
# 检查体动标签正确性,长度
|
||
if len(artifact) % 4 != 0:
|
||
return Result().failure(info=Constants.INPUT_FAILURE +
|
||
Constants.FAILURE_REASON["Artifact_Format_Not_Correct"])
|
||
for i in range(0, len(artifact), 4):
|
||
unit_data = artifact[i:i + 4]
|
||
if len(unit_data) < 4:
|
||
break
|
||
|
||
@staticmethod
|
||
def sanitize_data(obj):
|
||
"""
|
||
递归将对象中的 NumPy 类型转换为 Python 原生类型
|
||
"""
|
||
if isinstance(obj, dict):
|
||
return {k: PublicFunc._sanitize_data(v) for k, v in obj.items()}
|
||
elif isinstance(obj, (list, tuple)):
|
||
return [PublicFunc._sanitize_data(i) for i in obj]
|
||
elif isinstance(obj, (np.integer, np.int64)):
|
||
return int(obj)
|
||
elif isinstance(obj, (np.floating, np.float64)):
|
||
return float(obj)
|
||
elif isinstance(obj, np.ndarray):
|
||
return obj.tolist()
|
||
else:
|
||
return obj
|