diff --git a/script/auto_sliceing_operate/main.py b/script/auto_sliceing_operate/main.py new file mode 100644 index 0000000..ecd14f9 --- /dev/null +++ b/script/auto_sliceing_operate/main.py @@ -0,0 +1,28 @@ +from utils.show_import_dialog import clickFileIMportShow +from utils.import_all_file import modify_file_dialog_path_and_import_all +from utils.click_sliceing import begingClickSliceing +import time + +from utils.exe_operate import start_exe, click_confirm, close + + + +#"C:/test/10192_small_No4/data" +def BeginSliceing(folderPath): + # 打开3d切片软件 + start_exe() + time.sleep(1) + #点击确认按钮 + click_confirm() + #先打开导入文件的弹框 + clickFileIMportShow() + time.sleep(1) + #在输入路径导入obj文件 + isSuccess = modify_file_dialog_path_and_import_all(folderPath) + if not isSuccess: + print("导入文件失败") + exit() + #点击开始切片 + time.sleep(5) + begingClickSliceing() + \ No newline at end of file diff --git a/script/auto_sliceing_operate/utils/click_sliceing.py b/script/auto_sliceing_operate/utils/click_sliceing.py new file mode 100644 index 0000000..6eb6f3f --- /dev/null +++ b/script/auto_sliceing_operate/utils/click_sliceing.py @@ -0,0 +1,76 @@ +import uiautomation as auto + + + +# 遍历所有控件 找到 +def findAndClickFileImportShow(control, depth=0): + for child in control.GetChildren(): + # print(child) + # print(' ' * depth + f"{child.ControlType}: {child.Name} | {child.AutomationId}") + if child.Name == "切片及打印": + # 点击和 child 同组的第二个元素(即 control 的子控件中的第二个) + siblings = control.GetChildren() # 获取所有同级元素 + # print(f"\n找到目标控件: {child.Name}") + # print(f"同级元素数量: {len(siblings)}") + + # 打印所有同级元素信息,方便调试 + for i, sibling in enumerate(siblings): + marker = " <-- 这是找到的目标" if sibling == child else "" + print(f" 同级元素[{i}]: {sibling.ControlType} | {sibling.Name} | {sibling.AutomationId}{marker}") + + # 确保有至少2个同级元素 + if len(siblings) >= 2: + target = siblings[2] # 第二个元素(索引从0开始) + print(f"\n准备点击同级第二个元素:") + print(f" 控件类型: {target.ControlType}") + print(f" 控件名称: {target.Name}") + print(f" 自动化ID: {target.AutomationId}") + + # 尝试多种点击方式 + try: + # 方法1: 直接点击 + target.Click() + print("✓ 使用 Click() 方法点击成功") + return True + except Exception as e: + print(f"✗ Click() 失败: {e}") + try: + # 方法2: 使用 Invoke(如果是可调用的控件) + target.Invoke() + print("✓ 使用 Invoke() 方法点击成功") + return True + except Exception as e2: + print(f"✗ Invoke() 失败: {e2}") + try: + # 方法3: 先设置焦点再点击 + target.SetFocus() + import time + time.sleep(0.1) + target.Click() + print("✓ 使用 SetFocus() + Click() 方法点击成功") + return True + except Exception as e3: + print(f"✗ SetFocus() + Click() 失败: {e3}") + # 方法4: 使用鼠标点击坐标 + try: + rect = target.BoundingRectangle + # BoundingRectangle 是一个矩形对象,计算中心点 + x = rect.left + ((rect.right - rect.left) / 2) + y = rect.top + ((rect.bottom - rect.top) / 2) + auto.Click(x, y) + print(f"✓ 使用坐标点击成功: ({x}, {y})") + return True + except Exception as e4: + print(f"✗ 坐标点击失败: {e4}") + else: + print(f"✗ 同级元素数量不足,只有 {len(siblings)} 个") + return False + else: + findAndClickFileImportShow(child, depth + 1) + +def begingClickSliceing(): + control = auto.WindowControl(searchDepth=1, Name='赛纳3D打印控制系统 V1.4.3.2') + clickRes = findAndClickFileImportShow(control) + return clickRes + +# clickFileIMportShow() \ No newline at end of file diff --git a/script/auto_sliceing_operate/utils/exe_operate.py b/script/auto_sliceing_operate/utils/exe_operate.py new file mode 100644 index 0000000..5dcb44a --- /dev/null +++ b/script/auto_sliceing_operate/utils/exe_operate.py @@ -0,0 +1,202 @@ +#启动exe 文件 +import os +import subprocess +import time +import uiautomation as auto +exe_path = r"C:\Users\Administrator\Desktop\排版软件\NormalTek.ThreeDPrinter.UI.exe" + +def start_exe(): + try: + # 检查文件是否存在 + if not os.path.exists(exe_path): + print(f"错误:文件不存在 - {exe_path}") + return False + + # 检查是否是文件(不是目录) + if not os.path.isfile(exe_path): + print(f"错误:路径不是文件 - {exe_path}") + return False + + # 获取 exe 文件所在的目录(工作目录) + exe_dir = os.path.dirname(exe_path) + print(f"工作目录设置为: {exe_dir}") + + # 检查工作目录是否存在 + if not os.path.exists(exe_dir): + print(f"错误:工作目录不存在 - {exe_dir}") + return False + + # 使用 subprocess 启动,并设置工作目录 + # 这样可以确保程序能找到同目录下的配置文件 + try: + process = subprocess.Popen( + [exe_path], + cwd=exe_dir, # 设置工作目录为 exe 所在目录 + creationflags=subprocess.CREATE_NEW_CONSOLE # 在新控制台窗口运行 + ) + print(f"✓ 成功启动程序: {exe_path}") + print(f" 进程ID: {process.pid}") + print(f" 工作目录: {exe_dir}") + return True + except Exception as e: + print(f"subprocess 启动失败: {e}") + # 备用方案:使用 os.startfile(但无法设置工作目录) + try: + # 先切换到工作目录 + original_cwd = os.getcwd() + os.chdir(exe_dir) + os.startfile(exe_path) + os.chdir(original_cwd) # 恢复原目录 + print(f"✓ 使用 os.startfile 成功启动程序: {exe_path}") + return True + except Exception as e2: + print(f"os.startfile 启动也失败: {e2}") + return False + + except Exception as e: + print(f"启动程序时发生异常: {e}") + import traceback + traceback.print_exc() + return False + + +def click_confirm(): + """ + 触发点击 ESC 键,用于关闭弹框 + """ + try: + # 发送 ESC 键到当前活动窗口 + auto.SendKeys('{ESC}') + print("✓ 已发送 ESC 键") + time.sleep(0.3) + return True + except Exception as e: + print(f"发送 ESC 键失败: {e}") + return False + + +def close(): + """ + 关闭打开的应用程序 + 通过查找窗口名称并关闭窗口,使用多种方法确保成功 + """ + try: + import win32gui + import win32con + import win32api + import win32process + + # 应用程序的窗口名称 + window_name = "赛纳3D打印控制系统 V1.4.3.2" + + print(f"正在查找窗口: {window_name}") + + # 方法1: 使用 win32gui 查找窗口(更可靠) + hwnd = win32gui.FindWindow(None, window_name) + + if hwnd: + print(f"✓ 找到窗口句柄: {hwnd}") + # 激活窗口 + win32gui.SetForegroundWindow(hwnd) + win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) # 确保窗口不是最小化 + time.sleep(0.3) + + # 方法1: 使用 SendMessage 发送 WM_CLOSE(比 PostMessage 更强制) + try: + win32gui.SendMessage(hwnd, win32con.WM_CLOSE, 0, 0) + print("✓ 使用 SendMessage(WM_CLOSE) 关闭窗口") + time.sleep(1) + # 检查窗口是否还存在 + if not win32gui.IsWindow(hwnd): + print("✓ 窗口已成功关闭") + return True + except Exception as e: + print(f"SendMessage(WM_CLOSE) 失败: {e}") + + # 方法2: 使用 win32api 发送 Alt+F4 按键事件(更可靠) + try: + win32gui.SetForegroundWindow(hwnd) + time.sleep(0.2) + # 按下 Alt + win32api.keybd_event(win32con.VK_MENU, 0, 0, 0) + time.sleep(0.05) + # 按下 F4 + win32api.keybd_event(win32con.VK_F4, 0, 0, 0) + time.sleep(0.05) + # 释放 F4 + win32api.keybd_event(win32con.VK_F4, 0, win32con.KEYEVENTF_KEYUP, 0) + time.sleep(0.05) + # 释放 Alt + win32api.keybd_event(win32con.VK_MENU, 0, win32con.KEYEVENTF_KEYUP, 0) + print("✓ 使用 win32api 发送 Alt+F4") + time.sleep(1) + # 检查窗口是否还存在 + if not win32gui.IsWindow(hwnd): + print("✓ 窗口已成功关闭") + return True + except Exception as e: + print(f"win32api Alt+F4 失败: {e}") + + # 方法3: 尝试使用 uiautomation 的 Close() + try: + window = auto.WindowControl(searchDepth=1, Name=window_name) + if window.Exists(0, 0): + window.SetFocus() + time.sleep(0.2) + window.Close() + print("✓ 使用 uiautomation Close() 关闭窗口") + time.sleep(1) + if not win32gui.IsWindow(hwnd): + print("✓ 窗口已成功关闭") + return True + except Exception as e: + print(f"uiautomation Close() 失败: {e}") + + # 方法4: 如果以上都失败,尝试终止进程(最后手段) + print("尝试通过进程ID终止程序...") + try: + import psutil + # 通过窗口句柄获取进程ID + _, pid = win32process.GetWindowThreadProcessId(hwnd) + process = psutil.Process(pid) + process.terminate() + print(f"✓ 已终止进程 (PID: {pid})") + time.sleep(0.5) + return True + except ImportError: + print("psutil 未安装,无法使用进程终止方式") + except Exception as e: + print(f"终止进程失败: {e}") + + print("✗ 所有关闭方法都失败了") + return False + else: + # 如果 win32gui 找不到,尝试使用 uiautomation + print("win32gui 未找到窗口,尝试使用 uiautomation...") + window = auto.WindowControl(searchDepth=1, Name=window_name) + if window.Exists(0, 0): + print(f"✓ 使用 uiautomation 找到窗口") + window.SetFocus() + time.sleep(0.2) + hwnd = window.Handle + win32gui.SendMessage(hwnd, win32con.WM_CLOSE, 0, 0) + print("✓ 使用 SendMessage(WM_CLOSE) 关闭窗口") + time.sleep(1) + return True + else: + print(f"✗ 未找到窗口: {window_name}") + return False + + except Exception as e: + print(f"关闭应用程序时发生异常: {e}") + import traceback + traceback.print_exc() + return False + +#if __name__ == "__main__": + # start_exe() + # #启动成功之后,有一个弹框 的窗口,点击确定 + # time.sleep(5) + # click_confirm() + # time.sleep(10) + # close() \ No newline at end of file diff --git a/script/auto_sliceing_operate/utils/import_all_file.py b/script/auto_sliceing_operate/utils/import_all_file.py new file mode 100644 index 0000000..15c91b4 --- /dev/null +++ b/script/auto_sliceing_operate/utils/import_all_file.py @@ -0,0 +1,179 @@ +import win32gui, win32con, time +import win32api +import ctypes + +def find_window(title=None, class_name=None): + # 任一参数为 None 即忽略该条件 + hwnd = win32gui.FindWindow(class_name, title) + return hwnd if hwnd else None + + +def send_text_to_window(hwnd, text): + """向窗口发送文本""" + win32gui.SetForegroundWindow(hwnd) + time.sleep(0.2) + + for char in text: + # 获取虚拟键码 + vk_code = win32api.VkKeyScan(char) + + if vk_code != -1: + # 提取低字节(虚拟键码)和高字节(修饰键) + vk = vk_code & 0xFF + shift_state = (vk_code >> 8) & 0xFF + + # 如果需要 Shift + if shift_state & 1: # Shift 键 + win32api.keybd_event(win32con.VK_SHIFT, 0, 0, 0) + time.sleep(0.01) + + # 按下键 + win32api.keybd_event(vk, 0, 0, 0) + time.sleep(0.01) + # 释放键 + win32api.keybd_event(vk, 0, win32con.KEYEVENTF_KEYUP, 0) + + # 释放 Shift(如果按下了) + if shift_state & 1: + time.sleep(0.01) + win32api.keybd_event(win32con.VK_SHIFT, 0, win32con.KEYEVENTF_KEYUP, 0) + + time.sleep(0.02) # 字符之间的延迟 + +#发送 Alt+D 快捷键(聚焦地址栏) +def send_alt_d(hwnd): + """发送 Alt+D 快捷键(聚焦地址栏)""" + win32gui.SetForegroundWindow(hwnd) + time.sleep(0.2) + win32api.keybd_event(win32con.VK_MENU, 0, 0, 0) # Alt + time.sleep(0.05) + win32api.keybd_event(ord('D'), 0, 0, 0) + time.sleep(0.05) + win32api.keybd_event(ord('D'), 0, win32con.KEYEVENTF_KEYUP, 0) + win32api.keybd_event(win32con.VK_MENU, 0, win32con.KEYEVENTF_KEYUP, 0) + time.sleep(0.3) + + +#发送 Ctrl+A 快捷键(全选) +def send_ctrl_a(hwnd): + """发送 Ctrl+A 快捷键(全选)""" + win32gui.SetForegroundWindow(hwnd) + time.sleep(0.2) + win32api.keybd_event(win32con.VK_CONTROL, 0, 0, 0) + time.sleep(0.05) + win32api.keybd_event(ord('A'), 0, 0, 0) + time.sleep(0.05) + win32api.keybd_event(ord('A'), 0, win32con.KEYEVENTF_KEYUP, 0) + win32api.keybd_event(win32con.VK_CONTROL, 0, win32con.KEYEVENTF_KEYUP, 0) + time.sleep(0.3) + + +#发送 Enter 键 +def send_enter(hwnd): + """发送 Enter 键""" + win32gui.SetForegroundWindow(hwnd) + time.sleep(0.1) + win32api.keybd_event(win32con.VK_RETURN, 0, 0, 0) + time.sleep(0.05) + win32api.keybd_event(win32con.VK_RETURN, 0, win32con.KEYEVENTF_KEYUP, 0) + time.sleep(0.2) + +# +def send_tab(hwnd, count=1): + """发送 Tab 键(用于导航焦点)""" + win32gui.SetForegroundWindow(hwnd) + time.sleep(0.1) + for _ in range(count): + win32api.keybd_event(win32con.VK_TAB, 0, 0, 0) + time.sleep(0.05) + win32api.keybd_event(win32con.VK_TAB, 0, win32con.KEYEVENTF_KEYUP, 0) + time.sleep(0.1) + time.sleep(0.2) + +#聚焦文件列表窗口 +def focus_file_list(hwnd): + """将焦点移到文件列表区域""" + win32gui.SetForegroundWindow(hwnd) + time.sleep(0.2) + + # 使用鼠标点击文件列表区域(最可靠的方法) + try: + # 获取窗口位置和大小 + left, top, right, bottom = win32gui.GetWindowRect(hwnd) + width = right - left + height = bottom - top + + # 文件列表通常在窗口中间偏下的位置 + # 计算文件列表的大概位置(窗口中心偏下,避开地址栏和按钮区域) + click_x = left + width // 2 + click_y = top + height // 2 + 30 # 稍微偏下一点,避开地址栏 + + # 点击文件列表区域 + win32api.SetCursorPos((click_x, click_y)) + time.sleep(0.1) + win32api.mouse_event(win32con.MOUSEEVENTF_LEFTDOWN, 0, 0, 0, 0) + time.sleep(0.05) + win32api.mouse_event(win32con.MOUSEEVENTF_LEFTUP, 0, 0, 0, 0) + time.sleep(0.2) + except Exception as e: + print(f"点击文件列表失败,使用Tab键方式: {e}") + # 如果点击失败,使用Tab键导航(备用方案) + send_tab(hwnd, 1) + time.sleep(0.2) + + +def modify_file_dialog_path_and_import_all(target_path=""): + if not target_path: + return False + """ + 修改文件对话框的路径并导入所有文件 + + 参数: + target_path: 目标文件夹路径,默认为 C://work/batchPrint + """ + # 1. 找到"打开"窗口 + print("正在查找'打开'窗口...") + hwnd = find_window(title='打开') + if not hwnd: + print("未找到'打开'窗口,请确保文件对话框已打开") + return False + + print(f"✓ 找到'打开'窗口,句柄: {hwnd}") + time.sleep(0.5) + + # 2. 激活窗口并聚焦地址栏(使用 Alt+D 快捷键) + print("正在聚焦地址栏...") + send_alt_d(hwnd) + time.sleep(0.3) + + # 3. 清空地址栏并输入新路径 + print(f"正在设置路径为: {target_path}") + send_ctrl_a(hwnd) # 全选现有路径 + time.sleep(0.2) + + # 输入新路径 + send_text_to_window(hwnd, target_path) + time.sleep(0.3) + + # 按 Enter 确认路径 + send_enter(hwnd) + print("✓ 路径已设置") + time.sleep(1.5) # 等待路径加载完成 + + # 4. 将焦点移到文件列表 + print("正在将焦点移到文件列表...") + focus_file_list(hwnd) + time.sleep(0.3) + + # 5. 选择所有文件 + print("正在选择所有文件...") + send_ctrl_a(hwnd) # Ctrl+A 选择所有文件 + time.sleep(0.5) + print("✓ 已选择所有文件") + + # 6. 点击"打开"按钮(使用 Enter 键) + print("正在确认打开...") + send_enter(hwnd) + print("✓ 已确认打开") + + return True diff --git a/script/auto_sliceing_operate/utils/logs.py b/script/auto_sliceing_operate/utils/logs.py new file mode 100644 index 0000000..2fd7823 --- /dev/null +++ b/script/auto_sliceing_operate/utils/logs.py @@ -0,0 +1,4 @@ +import time + +def log(message): + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} {message}') \ No newline at end of file diff --git a/script/auto_sliceing_operate/utils/oss_func.py b/script/auto_sliceing_operate/utils/oss_func.py new file mode 100644 index 0000000..1e335f9 --- /dev/null +++ b/script/auto_sliceing_operate/utils/oss_func.py @@ -0,0 +1,45 @@ +import os +from .oss_redis import ossClient +from .logs import log + +# 从 OSS 下载文件到本地,并做完整性校验(状态码 + 文件大小) +def download_file_with_check(ossFilePath, localFilePath): + try: + log(f"开始从 OSS 下载文件: {ossFilePath} -> {localFilePath}") + if not checkFileExists(ossFilePath): + log(f"文件不存在: {ossFilePath}") + return False + # 同步阻塞下载,下载完成后才会返回 + result = ossClient().get_object_to_file(ossFilePath, localFilePath) + + # 1. 检查 HTTP 状态码 + status = getattr(result, "status", None) + if status != 200: + log(f"下载失败,HTTP 状态码异常: status={status}") + return False + + # 2. 远端 / 本地文件大小对比,作为二次校验 + remote_meta = ossClient().head_object(ossFilePath) + remote_size = getattr(remote_meta, "content_length", None) + if remote_size is None: + log("无法获取远端文件大小,放弃本次下载结果") + return False + + if not os.path.exists(localFilePath): + log("本地文件不存在,下载可能失败") + return False + + local_size = os.path.getsize(localFilePath) + if remote_size != local_size: + log(f"文件大小不一致,下载可能不完整: remote={remote_size}, local={local_size}") + return False + + log("文件下载成功且完整性校验通过") + return True + except Exception as e: + log(f"下载文件出现异常: {str(e)}") + return False + + +def checkFileExists(ossFilePath): + return ossClient().object_exists(ossFilePath) \ No newline at end of file diff --git a/script/auto_sliceing_operate/utils/oss_redis.py b/script/auto_sliceing_operate/utils/oss_redis.py new file mode 100644 index 0000000..72df86f --- /dev/null +++ b/script/auto_sliceing_operate/utils/oss_redis.py @@ -0,0 +1,125 @@ +import oss2,redis +import time +import logging + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# 连接oss - 单例模式 +class OSSClientSingleton: + _instance = None + _client = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super(OSSClientSingleton, cls).__new__(cls) + return cls._instance + + def get_client(self): + if self._client is None: + AccessKeyId = 'LTAI5tSReWm8hz7dSYxxth8f' + AccessKeySecret = '8ywTDF9upPAtvgXtLKALY2iMYHIxdS' + Endpoint = 'oss-cn-shanghai.aliyuncs.com' + Bucket = 'suwa3d-securedata' + self._client = oss2.Bucket(oss2.Auth(AccessKeyId, AccessKeySecret), Endpoint, Bucket) + return self._client + +def ossClient(): + """获取OSS客户端单例""" + return OSSClientSingleton().get_client() + + +#连接redis,单例模式 +class RedisClientSingleton: + _instance = None + _client = None + + # Redis连接配置 + REDIS_CONFIG = { + 'host': 'mp.api.suwa3d.com', + 'password': 'kcV2000', + 'port': 6379, + 'db': 6, + 'socket_timeout': 30, # 操作超时30秒 + 'socket_connect_timeout': 10, # 连接超时10秒 + 'socket_keepalive': True, # 启用 TCP keepalive + 'socket_keepalive_options': {}, # keepalive 选项 + 'health_check_interval': 30 # 健康检查间隔30秒 + } + + # 重试配置 + RETRY_INTERVAL = 5 # 重试间隔(秒) + MAX_RETRY_INTERVAL = 60 # 最大重试间隔(秒),用于指数退避 + + def __new__(cls): + if cls._instance is None: + cls._instance = super(RedisClientSingleton, cls).__new__(cls) + return cls._instance + + def _create_redis_client(self): + """创建Redis客户端""" + return redis.Redis(**self.REDIS_CONFIG) + + def _connect_with_retry(self, max_retries=None, retry_interval=None): + """ + 带重试机制的Redis连接方法 + :param max_retries: 最大重试次数,None表示无限重试 + :param retry_interval: 重试间隔(秒),None表示使用默认值,支持指数退避 + :return: Redis客户端实例 + """ + if retry_interval is None: + retry_interval = self.RETRY_INTERVAL + + retry_count = 0 + current_interval = retry_interval + + while True: + try: + logger.info(f"尝试连接Redis (第 {retry_count + 1} 次)...") + client = self._create_redis_client() + # 测试连接 + client.ping() + logger.info("Redis连接成功!") + return client + except (redis.ConnectionError, redis.TimeoutError, AttributeError, Exception) as e: + retry_count += 1 + error_msg = str(e) + logger.warning(f"Redis连接失败 (第 {retry_count} 次): {error_msg}") + + # 如果设置了最大重试次数且已达到,则抛出异常 + if max_retries is not None and retry_count >= max_retries: + logger.error(f"达到最大重试次数 {max_retries},停止重试") + raise + + # 指数退避:每次重试间隔逐渐增加,但不超过最大值 + logger.info(f"等待 {current_interval} 秒后重试...") + time.sleep(current_interval) + current_interval = min(current_interval * 1.5, self.MAX_RETRY_INTERVAL) + + def get_client(self): + """ + 获取Redis客户端,如果连接断开则自动重连(带重试机制) + :return: Redis客户端实例 + """ + if self._client is None: + # 首次连接,使用无限重试直到成功 + logger.info("初始化Redis连接...") + self._client = self._connect_with_retry(max_retries=None) + else: + # 检查连接是否有效,如果断开则重新连接(带重试) + try: + self._client.ping() + except (redis.ConnectionError, redis.TimeoutError, AttributeError) as e: + # 连接断开,重新创建连接(使用无限重试直到成功) + logger.warning(f"Redis连接已断开: {str(e)},开始重新连接...") + self._client = None + self._client = self._connect_with_retry(max_retries=None) + + return self._client + + + +def redisClient(): + """获取Redis客户端单例""" + return RedisClientSingleton().get_client() diff --git a/script/auto_sliceing_operate/utils/show_import_dialog.py b/script/auto_sliceing_operate/utils/show_import_dialog.py new file mode 100644 index 0000000..97dc7e3 --- /dev/null +++ b/script/auto_sliceing_operate/utils/show_import_dialog.py @@ -0,0 +1,76 @@ +import uiautomation as auto + + + +# 遍历所有控件 找到 +def findAndClickFileImportShow(control, depth=0): + for child in control.GetChildren(): + # print(child) + # print(' ' * depth + f"{child.ControlType}: {child.Name} | {child.AutomationId}") + if child.Name == "切片及打印": + # 点击和 child 同组的第二个元素(即 control 的子控件中的第二个) + siblings = control.GetChildren() # 获取所有同级元素 + # print(f"\n找到目标控件: {child.Name}") + # print(f"同级元素数量: {len(siblings)}") + + # 打印所有同级元素信息,方便调试 + for i, sibling in enumerate(siblings): + marker = " <-- 这是找到的目标" if sibling == child else "" + print(f" 同级元素[{i}]: {sibling.ControlType} | {sibling.Name} | {sibling.AutomationId}{marker}") + + # 确保有至少2个同级元素 + if len(siblings) >= 2: + target = siblings[1] # 第二个元素(索引从0开始) + print(f"\n准备点击同级第二个元素:") + print(f" 控件类型: {target.ControlType}") + print(f" 控件名称: {target.Name}") + print(f" 自动化ID: {target.AutomationId}") + + # 尝试多种点击方式 + try: + # 方法1: 直接点击 + target.Click() + print("✓ 使用 Click() 方法点击成功") + return True + except Exception as e: + print(f"✗ Click() 失败: {e}") + try: + # 方法2: 使用 Invoke(如果是可调用的控件) + target.Invoke() + print("✓ 使用 Invoke() 方法点击成功") + return True + except Exception as e2: + print(f"✗ Invoke() 失败: {e2}") + try: + # 方法3: 先设置焦点再点击 + target.SetFocus() + import time + time.sleep(0.1) + target.Click() + print("✓ 使用 SetFocus() + Click() 方法点击成功") + return True + except Exception as e3: + print(f"✗ SetFocus() + Click() 失败: {e3}") + # 方法4: 使用鼠标点击坐标 + try: + rect = target.BoundingRectangle + # BoundingRectangle 是一个矩形对象,计算中心点 + x = rect.left + ((rect.right - rect.left) / 2) + y = rect.top + ((rect.bottom - rect.top) / 2) + auto.Click(x, y) + print(f"✓ 使用坐标点击成功: ({x}, {y})") + return True + except Exception as e4: + print(f"✗ 坐标点击失败: {e4}") + else: + print(f"✗ 同级元素数量不足,只有 {len(siblings)} 个") + return False + else: + findAndClickFileImportShow(child, depth + 1) + +def clickFileIMportShow(): + control = auto.WindowControl(searchDepth=1, Name='赛纳3D打印控制系统 V1.4.3.2') + clickRes = findAndClickFileImportShow(control) + return clickRes + +# clickFileIMportShow() \ No newline at end of file diff --git a/script/factory_sliceing_v2/build_exe.bat b/script/factory_sliceing_v2/build_exe.bat index 4543b84..273ec95 100644 --- a/script/factory_sliceing_v2/build_exe.bat +++ b/script/factory_sliceing_v2/build_exe.bat @@ -56,3 +56,4 @@ echo ======================================== pause + diff --git a/script/factory_sliceing_v2/build_exe.py b/script/factory_sliceing_v2/build_exe.py index 213f884..5a5ecc3 100644 --- a/script/factory_sliceing_v2/build_exe.py +++ b/script/factory_sliceing_v2/build_exe.py @@ -149,3 +149,4 @@ if __name__ == '__main__': sys.exit(1) + diff --git a/script/factory_sliceing_v2/download.py b/script/factory_sliceing_v2/download.py new file mode 100644 index 0000000..75ebae2 --- /dev/null +++ b/script/factory_sliceing_v2/download.py @@ -0,0 +1,158 @@ +import os,shutil +import redis +import oss2,time,sys +import requests +import argparse,json +from utils.funcs import requestApiToUpdateSliceStatus + +# 将当前脚本所在目录添加到 Python 路径,以便导入 utils 模块 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +# from download_print_out import download_transform_save_by_json +from utils.oss_redis import redisClient +from utils.funcs import downloadJsonAndJpgFileAndMoveToCorrectDir, downloadDataByOssAndTransformSave +# 默认使用脚本所在目录 +currentDir = os.path.dirname(os.path.abspath(__file__)) + +ENV = 'prod' +url = 'https://mp.api.suwa3d.com' +if ENV == 'dev': + url = 'http://mp.api.dev.com' +elif ENV == 'prod': + url = 'https://mp.api.suwa3d.com' + + +#判断是否上传了 JSON 文件 +def step1(versionId): + + # 下载json 文件 和 图片 + dirName,machineInfo = downloadJsonAndJpgFileAndMoveToCorrectDir(versionId,currentDir) + if not dirName: + return False + + #判断是否是小机台 + isSmallMachine = False + if str(machineInfo["machine_type"]) == '1': + isSmallMachine = True + + + #下载数据,转换数据 + res = downloadDataByOssAndTransformSave(dirName,isSmallMachine) + if not res: + return False + + #判断下载的obj文件数量和json里的是否一致,排除arrange文件夹 + objFilePath = os.path.join(dirName, 'data') + objCounts = 0 + for file in os.listdir(objFilePath): + if file == 'arrange': + continue + if file.endswith('.obj'): + objCounts += 1 + + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 下载处理完成的obj文件数量: {objCounts}') + # requestApiToUpdateSliceStatus(versionId,objCounts) + + + +# 读取 队列中一个数据出来 +def main(work_dir=None, batch_id=None): + + global currentDir + # 如果指定了工作目录,使用指定的目录 + if work_dir: + work_dir = os.path.abspath(work_dir) + if not os.path.exists(work_dir): + print(f'指定的工作目录不存在: {work_dir},将创建该目录') + os.makedirs(work_dir, exist_ok=True) + currentDir = work_dir + print(f'使用指定的工作目录: {currentDir}') + else: + print(f'没有指定工作目录,退出') + exit(0) + + + versionId = str(batch_id) + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}') + res = step1(versionId) + if res == False: + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} JSON文件下载数据失败,等待10秒') + # time.sleep(10) + # continue + # 循环处理,直到队列为空 + # try: + # while True: + # try: + # r = redisClient() + # #检测队列是否有值 + # if r.scard('pb:sliceing') == 0: + # print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 队列为空,等待10秒') + # time.sleep(10) + # continue + # #获取队列中的值 + # data = r.spop('pb:sliceing') + # if data is None: + # print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据为空,等待10秒') + # time.sleep(10) + # continue + # data = data.decode('utf-8') + # #判断是否是数字 + # if not data.isdigit(): + # print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据不是数字,等待10秒') + # time.sleep(10) + # continue + + # versionId = str(data) + # print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}') + # res = step1(versionId) + # if res == False: + # print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} JSON文件下载数据失败,等待10秒') + # time.sleep(10) + # continue + + + + + # # 在长时间操作后,确保 Redis 连接仍然有效 + # # 通过重新获取客户端来触发连接检查 + # try: + # r = redisClient() + # r.ping() # 测试连接 + # except Exception as e: + # print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} Redis连接检查失败: {str(e)},将在下次循环时自动重连') + + # #time.sleep(10) + # except KeyboardInterrupt: + # # 在循环内部捕获 KeyboardInterrupt,允许在 sleep 或操作中被中断 + # raise # 重新抛出,让外层捕获 + # except KeyboardInterrupt: + # print(f'\n{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 收到中断信号,正在优雅退出...') + # print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 程序已停止') + # sys.exit(0) + + +def testMain(): + global currentDir + currentDir = "/Users/dcx/code/make2/script/factory_sliceing_v2/tempData" + versionId = '10153' #'10153 10158' + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}') + res = step1(versionId) + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 处理完成,res={res}') + +if __name__ == '__main__': + #testMain() 新增参数 batchId + parser = argparse.ArgumentParser(description='排版打印订单处理程序') + parser.add_argument( + '--work-dir', + type=str, + default=None, + help='指定工作目录(磁盘路径),例如: D:/work 或 /Users/username/work。如果不指定,则使用脚本所在目录' + ) + parser.add_argument( + '--batch—id', + type=str, + default=None, + help='指定批次ID' + ) + args = parser.parse_args() + main(work_dir=args.work_dir, batch_id=args.batch_id) diff --git a/script/factory_sliceing_v2/requirements.txt b/script/factory_sliceing_v2/requirements.txt index 8371edc..72f4fa3 100644 --- a/script/factory_sliceing_v2/requirements.txt +++ b/script/factory_sliceing_v2/requirements.txt @@ -4,3 +4,4 @@ requests>=2.28.0 pyinstaller>=5.0.0 + diff --git a/script/factory_sliceing_v2/utils/oss_redis.py b/script/factory_sliceing_v2/utils/oss_redis.py index d0c604a..72df86f 100644 --- a/script/factory_sliceing_v2/utils/oss_redis.py +++ b/script/factory_sliceing_v2/utils/oss_redis.py @@ -1,4 +1,10 @@ import oss2,redis +import time +import logging + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) # 连接oss - 单例模式 class OSSClientSingleton: @@ -29,47 +35,87 @@ class RedisClientSingleton: _instance = None _client = None + # Redis连接配置 + REDIS_CONFIG = { + 'host': 'mp.api.suwa3d.com', + 'password': 'kcV2000', + 'port': 6379, + 'db': 6, + 'socket_timeout': 30, # 操作超时30秒 + 'socket_connect_timeout': 10, # 连接超时10秒 + 'socket_keepalive': True, # 启用 TCP keepalive + 'socket_keepalive_options': {}, # keepalive 选项 + 'health_check_interval': 30 # 健康检查间隔30秒 + } + + # 重试配置 + RETRY_INTERVAL = 5 # 重试间隔(秒) + MAX_RETRY_INTERVAL = 60 # 最大重试间隔(秒),用于指数退避 + def __new__(cls): if cls._instance is None: cls._instance = super(RedisClientSingleton, cls).__new__(cls) return cls._instance + def _create_redis_client(self): + """创建Redis客户端""" + return redis.Redis(**self.REDIS_CONFIG) + + def _connect_with_retry(self, max_retries=None, retry_interval=None): + """ + 带重试机制的Redis连接方法 + :param max_retries: 最大重试次数,None表示无限重试 + :param retry_interval: 重试间隔(秒),None表示使用默认值,支持指数退避 + :return: Redis客户端实例 + """ + if retry_interval is None: + retry_interval = self.RETRY_INTERVAL + + retry_count = 0 + current_interval = retry_interval + + while True: + try: + logger.info(f"尝试连接Redis (第 {retry_count + 1} 次)...") + client = self._create_redis_client() + # 测试连接 + client.ping() + logger.info("Redis连接成功!") + return client + except (redis.ConnectionError, redis.TimeoutError, AttributeError, Exception) as e: + retry_count += 1 + error_msg = str(e) + logger.warning(f"Redis连接失败 (第 {retry_count} 次): {error_msg}") + + # 如果设置了最大重试次数且已达到,则抛出异常 + if max_retries is not None and retry_count >= max_retries: + logger.error(f"达到最大重试次数 {max_retries},停止重试") + raise + + # 指数退避:每次重试间隔逐渐增加,但不超过最大值 + logger.info(f"等待 {current_interval} 秒后重试...") + time.sleep(current_interval) + current_interval = min(current_interval * 1.5, self.MAX_RETRY_INTERVAL) + def get_client(self): + """ + 获取Redis客户端,如果连接断开则自动重连(带重试机制) + :return: Redis客户端实例 + """ if self._client is None: - # 添加超时参数,防止连接超时 - # socket_timeout: 每次操作的超时时间(秒) - # socket_connect_timeout: 连接超时时间(秒) - # socket_keepalive: 启用 TCP keepalive - # socket_keepalive_options: keepalive 选项 - self._client = redis.Redis( - host='mp.api.suwa3d.com', - password='kcV2000', - port=6379, - db=6, - socket_timeout=30, # 操作超时30秒 - socket_connect_timeout=10, # 连接超时10秒 - socket_keepalive=True, # 启用 TCP keepalive - socket_keepalive_options={}, # keepalive 选项 - health_check_interval=30 # 健康检查间隔30秒 - ) + # 首次连接,使用无限重试直到成功 + logger.info("初始化Redis连接...") + self._client = self._connect_with_retry(max_retries=None) else: - # 检查连接是否有效,如果断开则重新连接 + # 检查连接是否有效,如果断开则重新连接(带重试) try: self._client.ping() - except (redis.ConnectionError, redis.TimeoutError, AttributeError): - # 连接断开,重新创建连接 + except (redis.ConnectionError, redis.TimeoutError, AttributeError) as e: + # 连接断开,重新创建连接(使用无限重试直到成功) + logger.warning(f"Redis连接已断开: {str(e)},开始重新连接...") self._client = None - self._client = redis.Redis( - host='mp.api.suwa3d.com', - password='kcV2000', - port=6379, - db=6, - socket_timeout=30, - socket_connect_timeout=10, - socket_keepalive=True, - socket_keepalive_options={}, - health_check_interval=30 - ) + self._client = self._connect_with_retry(max_retries=None) + return self._client