dongchangxi 2 weeks ago
parent
commit
dcf9eb6f23
  1. 28
      script/auto_sliceing_operate/main.py
  2. 76
      script/auto_sliceing_operate/utils/click_sliceing.py
  3. 202
      script/auto_sliceing_operate/utils/exe_operate.py
  4. 179
      script/auto_sliceing_operate/utils/import_all_file.py
  5. 4
      script/auto_sliceing_operate/utils/logs.py
  6. 45
      script/auto_sliceing_operate/utils/oss_func.py
  7. 125
      script/auto_sliceing_operate/utils/oss_redis.py
  8. 76
      script/auto_sliceing_operate/utils/show_import_dialog.py
  9. 1
      script/factory_sliceing_v2/build_exe.bat
  10. 1
      script/factory_sliceing_v2/build_exe.py
  11. 158
      script/factory_sliceing_v2/download.py
  12. 1
      script/factory_sliceing_v2/requirements.txt
  13. 106
      script/factory_sliceing_v2/utils/oss_redis.py

28
script/auto_sliceing_operate/main.py

@ -0,0 +1,28 @@
from utils.show_import_dialog import clickFileIMportShow
from utils.import_all_file import modify_file_dialog_path_and_import_all
from utils.click_sliceing import begingClickSliceing
import time
from utils.exe_operate import start_exe, click_confirm, close
#"C:/test/10192_small_No4/data"
def BeginSliceing(folderPath):
# 打开3d切片软件
start_exe()
time.sleep(1)
#点击确认按钮
click_confirm()
#先打开导入文件的弹框
clickFileIMportShow()
time.sleep(1)
#在输入路径导入obj文件
isSuccess = modify_file_dialog_path_and_import_all(folderPath)
if not isSuccess:
print("导入文件失败")
exit()
#点击开始切片
time.sleep(5)
begingClickSliceing()

76
script/auto_sliceing_operate/utils/click_sliceing.py

@ -0,0 +1,76 @@
import uiautomation as auto
# 遍历所有控件 找到
def findAndClickFileImportShow(control, depth=0):
for child in control.GetChildren():
# print(child)
# print(' ' * depth + f"{child.ControlType}: {child.Name} | {child.AutomationId}")
if child.Name == "切片及打印":
# 点击和 child 同组的第二个元素(即 control 的子控件中的第二个)
siblings = control.GetChildren() # 获取所有同级元素
# print(f"\n找到目标控件: {child.Name}")
# print(f"同级元素数量: {len(siblings)}")
# 打印所有同级元素信息,方便调试
for i, sibling in enumerate(siblings):
marker = " <-- 这是找到的目标" if sibling == child else ""
print(f" 同级元素[{i}]: {sibling.ControlType} | {sibling.Name} | {sibling.AutomationId}{marker}")
# 确保有至少2个同级元素
if len(siblings) >= 2:
target = siblings[2] # 第二个元素(索引从0开始)
print(f"\n准备点击同级第二个元素:")
print(f" 控件类型: {target.ControlType}")
print(f" 控件名称: {target.Name}")
print(f" 自动化ID: {target.AutomationId}")
# 尝试多种点击方式
try:
# 方法1: 直接点击
target.Click()
print("✓ 使用 Click() 方法点击成功")
return True
except Exception as e:
print(f"✗ Click() 失败: {e}")
try:
# 方法2: 使用 Invoke(如果是可调用的控件)
target.Invoke()
print("✓ 使用 Invoke() 方法点击成功")
return True
except Exception as e2:
print(f"✗ Invoke() 失败: {e2}")
try:
# 方法3: 先设置焦点再点击
target.SetFocus()
import time
time.sleep(0.1)
target.Click()
print("✓ 使用 SetFocus() + Click() 方法点击成功")
return True
except Exception as e3:
print(f"✗ SetFocus() + Click() 失败: {e3}")
# 方法4: 使用鼠标点击坐标
try:
rect = target.BoundingRectangle
# BoundingRectangle 是一个矩形对象,计算中心点
x = rect.left + ((rect.right - rect.left) / 2)
y = rect.top + ((rect.bottom - rect.top) / 2)
auto.Click(x, y)
print(f"✓ 使用坐标点击成功: ({x}, {y})")
return True
except Exception as e4:
print(f"✗ 坐标点击失败: {e4}")
else:
print(f"✗ 同级元素数量不足,只有 {len(siblings)}")
return False
else:
findAndClickFileImportShow(child, depth + 1)
def begingClickSliceing():
control = auto.WindowControl(searchDepth=1, Name='赛纳3D打印控制系统 V1.4.3.2')
clickRes = findAndClickFileImportShow(control)
return clickRes
# clickFileIMportShow()

202
script/auto_sliceing_operate/utils/exe_operate.py

@ -0,0 +1,202 @@
#启动exe 文件
import os
import subprocess
import time
import uiautomation as auto
exe_path = r"C:\Users\Administrator\Desktop\排版软件\NormalTek.ThreeDPrinter.UI.exe"
def start_exe():
try:
# 检查文件是否存在
if not os.path.exists(exe_path):
print(f"错误:文件不存在 - {exe_path}")
return False
# 检查是否是文件(不是目录)
if not os.path.isfile(exe_path):
print(f"错误:路径不是文件 - {exe_path}")
return False
# 获取 exe 文件所在的目录(工作目录)
exe_dir = os.path.dirname(exe_path)
print(f"工作目录设置为: {exe_dir}")
# 检查工作目录是否存在
if not os.path.exists(exe_dir):
print(f"错误:工作目录不存在 - {exe_dir}")
return False
# 使用 subprocess 启动,并设置工作目录
# 这样可以确保程序能找到同目录下的配置文件
try:
process = subprocess.Popen(
[exe_path],
cwd=exe_dir, # 设置工作目录为 exe 所在目录
creationflags=subprocess.CREATE_NEW_CONSOLE # 在新控制台窗口运行
)
print(f"✓ 成功启动程序: {exe_path}")
print(f" 进程ID: {process.pid}")
print(f" 工作目录: {exe_dir}")
return True
except Exception as e:
print(f"subprocess 启动失败: {e}")
# 备用方案:使用 os.startfile(但无法设置工作目录)
try:
# 先切换到工作目录
original_cwd = os.getcwd()
os.chdir(exe_dir)
os.startfile(exe_path)
os.chdir(original_cwd) # 恢复原目录
print(f"✓ 使用 os.startfile 成功启动程序: {exe_path}")
return True
except Exception as e2:
print(f"os.startfile 启动也失败: {e2}")
return False
except Exception as e:
print(f"启动程序时发生异常: {e}")
import traceback
traceback.print_exc()
return False
def click_confirm():
"""
触发点击 ESC 用于关闭弹框
"""
try:
# 发送 ESC 键到当前活动窗口
auto.SendKeys('{ESC}')
print("✓ 已发送 ESC 键")
time.sleep(0.3)
return True
except Exception as e:
print(f"发送 ESC 键失败: {e}")
return False
def close():
"""
关闭打开的应用程序
通过查找窗口名称并关闭窗口使用多种方法确保成功
"""
try:
import win32gui
import win32con
import win32api
import win32process
# 应用程序的窗口名称
window_name = "赛纳3D打印控制系统 V1.4.3.2"
print(f"正在查找窗口: {window_name}")
# 方法1: 使用 win32gui 查找窗口(更可靠)
hwnd = win32gui.FindWindow(None, window_name)
if hwnd:
print(f"✓ 找到窗口句柄: {hwnd}")
# 激活窗口
win32gui.SetForegroundWindow(hwnd)
win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) # 确保窗口不是最小化
time.sleep(0.3)
# 方法1: 使用 SendMessage 发送 WM_CLOSE(比 PostMessage 更强制)
try:
win32gui.SendMessage(hwnd, win32con.WM_CLOSE, 0, 0)
print("✓ 使用 SendMessage(WM_CLOSE) 关闭窗口")
time.sleep(1)
# 检查窗口是否还存在
if not win32gui.IsWindow(hwnd):
print("✓ 窗口已成功关闭")
return True
except Exception as e:
print(f"SendMessage(WM_CLOSE) 失败: {e}")
# 方法2: 使用 win32api 发送 Alt+F4 按键事件(更可靠)
try:
win32gui.SetForegroundWindow(hwnd)
time.sleep(0.2)
# 按下 Alt
win32api.keybd_event(win32con.VK_MENU, 0, 0, 0)
time.sleep(0.05)
# 按下 F4
win32api.keybd_event(win32con.VK_F4, 0, 0, 0)
time.sleep(0.05)
# 释放 F4
win32api.keybd_event(win32con.VK_F4, 0, win32con.KEYEVENTF_KEYUP, 0)
time.sleep(0.05)
# 释放 Alt
win32api.keybd_event(win32con.VK_MENU, 0, win32con.KEYEVENTF_KEYUP, 0)
print("✓ 使用 win32api 发送 Alt+F4")
time.sleep(1)
# 检查窗口是否还存在
if not win32gui.IsWindow(hwnd):
print("✓ 窗口已成功关闭")
return True
except Exception as e:
print(f"win32api Alt+F4 失败: {e}")
# 方法3: 尝试使用 uiautomation 的 Close()
try:
window = auto.WindowControl(searchDepth=1, Name=window_name)
if window.Exists(0, 0):
window.SetFocus()
time.sleep(0.2)
window.Close()
print("✓ 使用 uiautomation Close() 关闭窗口")
time.sleep(1)
if not win32gui.IsWindow(hwnd):
print("✓ 窗口已成功关闭")
return True
except Exception as e:
print(f"uiautomation Close() 失败: {e}")
# 方法4: 如果以上都失败,尝试终止进程(最后手段)
print("尝试通过进程ID终止程序...")
try:
import psutil
# 通过窗口句柄获取进程ID
_, pid = win32process.GetWindowThreadProcessId(hwnd)
process = psutil.Process(pid)
process.terminate()
print(f"✓ 已终止进程 (PID: {pid})")
time.sleep(0.5)
return True
except ImportError:
print("psutil 未安装,无法使用进程终止方式")
except Exception as e:
print(f"终止进程失败: {e}")
print("✗ 所有关闭方法都失败了")
return False
else:
# 如果 win32gui 找不到,尝试使用 uiautomation
print("win32gui 未找到窗口,尝试使用 uiautomation...")
window = auto.WindowControl(searchDepth=1, Name=window_name)
if window.Exists(0, 0):
print(f"✓ 使用 uiautomation 找到窗口")
window.SetFocus()
time.sleep(0.2)
hwnd = window.Handle
win32gui.SendMessage(hwnd, win32con.WM_CLOSE, 0, 0)
print("✓ 使用 SendMessage(WM_CLOSE) 关闭窗口")
time.sleep(1)
return True
else:
print(f"✗ 未找到窗口: {window_name}")
return False
except Exception as e:
print(f"关闭应用程序时发生异常: {e}")
import traceback
traceback.print_exc()
return False
#if __name__ == "__main__":
# start_exe()
# #启动成功之后,有一个弹框 的窗口,点击确定
# time.sleep(5)
# click_confirm()
# time.sleep(10)
# close()

179
script/auto_sliceing_operate/utils/import_all_file.py

@ -0,0 +1,179 @@
import win32gui, win32con, time
import win32api
import ctypes
def find_window(title=None, class_name=None):
# 任一参数为 None 即忽略该条件
hwnd = win32gui.FindWindow(class_name, title)
return hwnd if hwnd else None
def send_text_to_window(hwnd, text):
"""向窗口发送文本"""
win32gui.SetForegroundWindow(hwnd)
time.sleep(0.2)
for char in text:
# 获取虚拟键码
vk_code = win32api.VkKeyScan(char)
if vk_code != -1:
# 提取低字节(虚拟键码)和高字节(修饰键)
vk = vk_code & 0xFF
shift_state = (vk_code >> 8) & 0xFF
# 如果需要 Shift
if shift_state & 1: # Shift 键
win32api.keybd_event(win32con.VK_SHIFT, 0, 0, 0)
time.sleep(0.01)
# 按下键
win32api.keybd_event(vk, 0, 0, 0)
time.sleep(0.01)
# 释放键
win32api.keybd_event(vk, 0, win32con.KEYEVENTF_KEYUP, 0)
# 释放 Shift(如果按下了)
if shift_state & 1:
time.sleep(0.01)
win32api.keybd_event(win32con.VK_SHIFT, 0, win32con.KEYEVENTF_KEYUP, 0)
time.sleep(0.02) # 字符之间的延迟
#发送 Alt+D 快捷键(聚焦地址栏)
def send_alt_d(hwnd):
"""发送 Alt+D 快捷键(聚焦地址栏)"""
win32gui.SetForegroundWindow(hwnd)
time.sleep(0.2)
win32api.keybd_event(win32con.VK_MENU, 0, 0, 0) # Alt
time.sleep(0.05)
win32api.keybd_event(ord('D'), 0, 0, 0)
time.sleep(0.05)
win32api.keybd_event(ord('D'), 0, win32con.KEYEVENTF_KEYUP, 0)
win32api.keybd_event(win32con.VK_MENU, 0, win32con.KEYEVENTF_KEYUP, 0)
time.sleep(0.3)
#发送 Ctrl+A 快捷键(全选)
def send_ctrl_a(hwnd):
"""发送 Ctrl+A 快捷键(全选)"""
win32gui.SetForegroundWindow(hwnd)
time.sleep(0.2)
win32api.keybd_event(win32con.VK_CONTROL, 0, 0, 0)
time.sleep(0.05)
win32api.keybd_event(ord('A'), 0, 0, 0)
time.sleep(0.05)
win32api.keybd_event(ord('A'), 0, win32con.KEYEVENTF_KEYUP, 0)
win32api.keybd_event(win32con.VK_CONTROL, 0, win32con.KEYEVENTF_KEYUP, 0)
time.sleep(0.3)
#发送 Enter 键
def send_enter(hwnd):
"""发送 Enter 键"""
win32gui.SetForegroundWindow(hwnd)
time.sleep(0.1)
win32api.keybd_event(win32con.VK_RETURN, 0, 0, 0)
time.sleep(0.05)
win32api.keybd_event(win32con.VK_RETURN, 0, win32con.KEYEVENTF_KEYUP, 0)
time.sleep(0.2)
#
def send_tab(hwnd, count=1):
"""发送 Tab 键(用于导航焦点)"""
win32gui.SetForegroundWindow(hwnd)
time.sleep(0.1)
for _ in range(count):
win32api.keybd_event(win32con.VK_TAB, 0, 0, 0)
time.sleep(0.05)
win32api.keybd_event(win32con.VK_TAB, 0, win32con.KEYEVENTF_KEYUP, 0)
time.sleep(0.1)
time.sleep(0.2)
#聚焦文件列表窗口
def focus_file_list(hwnd):
"""将焦点移到文件列表区域"""
win32gui.SetForegroundWindow(hwnd)
time.sleep(0.2)
# 使用鼠标点击文件列表区域(最可靠的方法)
try:
# 获取窗口位置和大小
left, top, right, bottom = win32gui.GetWindowRect(hwnd)
width = right - left
height = bottom - top
# 文件列表通常在窗口中间偏下的位置
# 计算文件列表的大概位置(窗口中心偏下,避开地址栏和按钮区域)
click_x = left + width // 2
click_y = top + height // 2 + 30 # 稍微偏下一点,避开地址栏
# 点击文件列表区域
win32api.SetCursorPos((click_x, click_y))
time.sleep(0.1)
win32api.mouse_event(win32con.MOUSEEVENTF_LEFTDOWN, 0, 0, 0, 0)
time.sleep(0.05)
win32api.mouse_event(win32con.MOUSEEVENTF_LEFTUP, 0, 0, 0, 0)
time.sleep(0.2)
except Exception as e:
print(f"点击文件列表失败,使用Tab键方式: {e}")
# 如果点击失败,使用Tab键导航(备用方案)
send_tab(hwnd, 1)
time.sleep(0.2)
def modify_file_dialog_path_and_import_all(target_path=""):
if not target_path:
return False
"""
修改文件对话框的路径并导入所有文件
参数:
target_path: 目标文件夹路径默认为 C://work/batchPrint
"""
# 1. 找到"打开"窗口
print("正在查找'打开'窗口...")
hwnd = find_window(title='打开')
if not hwnd:
print("未找到'打开'窗口,请确保文件对话框已打开")
return False
print(f"✓ 找到'打开'窗口,句柄: {hwnd}")
time.sleep(0.5)
# 2. 激活窗口并聚焦地址栏(使用 Alt+D 快捷键)
print("正在聚焦地址栏...")
send_alt_d(hwnd)
time.sleep(0.3)
# 3. 清空地址栏并输入新路径
print(f"正在设置路径为: {target_path}")
send_ctrl_a(hwnd) # 全选现有路径
time.sleep(0.2)
# 输入新路径
send_text_to_window(hwnd, target_path)
time.sleep(0.3)
# 按 Enter 确认路径
send_enter(hwnd)
print("✓ 路径已设置")
time.sleep(1.5) # 等待路径加载完成
# 4. 将焦点移到文件列表
print("正在将焦点移到文件列表...")
focus_file_list(hwnd)
time.sleep(0.3)
# 5. 选择所有文件
print("正在选择所有文件...")
send_ctrl_a(hwnd) # Ctrl+A 选择所有文件
time.sleep(0.5)
print("✓ 已选择所有文件")
# 6. 点击"打开"按钮(使用 Enter 键)
print("正在确认打开...")
send_enter(hwnd)
print("✓ 已确认打开")
return True

4
script/auto_sliceing_operate/utils/logs.py

@ -0,0 +1,4 @@
import time
def log(message):
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} {message}')

45
script/auto_sliceing_operate/utils/oss_func.py

@ -0,0 +1,45 @@
import os
from .oss_redis import ossClient
from .logs import log
# 从 OSS 下载文件到本地,并做完整性校验(状态码 + 文件大小)
def download_file_with_check(ossFilePath, localFilePath):
try:
log(f"开始从 OSS 下载文件: {ossFilePath} -> {localFilePath}")
if not checkFileExists(ossFilePath):
log(f"文件不存在: {ossFilePath}")
return False
# 同步阻塞下载,下载完成后才会返回
result = ossClient().get_object_to_file(ossFilePath, localFilePath)
# 1. 检查 HTTP 状态码
status = getattr(result, "status", None)
if status != 200:
log(f"下载失败,HTTP 状态码异常: status={status}")
return False
# 2. 远端 / 本地文件大小对比,作为二次校验
remote_meta = ossClient().head_object(ossFilePath)
remote_size = getattr(remote_meta, "content_length", None)
if remote_size is None:
log("无法获取远端文件大小,放弃本次下载结果")
return False
if not os.path.exists(localFilePath):
log("本地文件不存在,下载可能失败")
return False
local_size = os.path.getsize(localFilePath)
if remote_size != local_size:
log(f"文件大小不一致,下载可能不完整: remote={remote_size}, local={local_size}")
return False
log("文件下载成功且完整性校验通过")
return True
except Exception as e:
log(f"下载文件出现异常: {str(e)}")
return False
def checkFileExists(ossFilePath):
return ossClient().object_exists(ossFilePath)

125
script/auto_sliceing_operate/utils/oss_redis.py

@ -0,0 +1,125 @@
import oss2,redis
import time
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 连接oss - 单例模式
class OSSClientSingleton:
_instance = None
_client = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(OSSClientSingleton, cls).__new__(cls)
return cls._instance
def get_client(self):
if self._client is None:
AccessKeyId = 'LTAI5tSReWm8hz7dSYxxth8f'
AccessKeySecret = '8ywTDF9upPAtvgXtLKALY2iMYHIxdS'
Endpoint = 'oss-cn-shanghai.aliyuncs.com'
Bucket = 'suwa3d-securedata'
self._client = oss2.Bucket(oss2.Auth(AccessKeyId, AccessKeySecret), Endpoint, Bucket)
return self._client
def ossClient():
"""获取OSS客户端单例"""
return OSSClientSingleton().get_client()
#连接redis,单例模式
class RedisClientSingleton:
_instance = None
_client = None
# Redis连接配置
REDIS_CONFIG = {
'host': 'mp.api.suwa3d.com',
'password': 'kcV2000',
'port': 6379,
'db': 6,
'socket_timeout': 30, # 操作超时30秒
'socket_connect_timeout': 10, # 连接超时10秒
'socket_keepalive': True, # 启用 TCP keepalive
'socket_keepalive_options': {}, # keepalive 选项
'health_check_interval': 30 # 健康检查间隔30秒
}
# 重试配置
RETRY_INTERVAL = 5 # 重试间隔(秒)
MAX_RETRY_INTERVAL = 60 # 最大重试间隔(秒),用于指数退避
def __new__(cls):
if cls._instance is None:
cls._instance = super(RedisClientSingleton, cls).__new__(cls)
return cls._instance
def _create_redis_client(self):
"""创建Redis客户端"""
return redis.Redis(**self.REDIS_CONFIG)
def _connect_with_retry(self, max_retries=None, retry_interval=None):
"""
带重试机制的Redis连接方法
:param max_retries: 最大重试次数None表示无限重试
:param retry_interval: 重试间隔None表示使用默认值支持指数退避
:return: Redis客户端实例
"""
if retry_interval is None:
retry_interval = self.RETRY_INTERVAL
retry_count = 0
current_interval = retry_interval
while True:
try:
logger.info(f"尝试连接Redis (第 {retry_count + 1} 次)...")
client = self._create_redis_client()
# 测试连接
client.ping()
logger.info("Redis连接成功!")
return client
except (redis.ConnectionError, redis.TimeoutError, AttributeError, Exception) as e:
retry_count += 1
error_msg = str(e)
logger.warning(f"Redis连接失败 (第 {retry_count} 次): {error_msg}")
# 如果设置了最大重试次数且已达到,则抛出异常
if max_retries is not None and retry_count >= max_retries:
logger.error(f"达到最大重试次数 {max_retries},停止重试")
raise
# 指数退避:每次重试间隔逐渐增加,但不超过最大值
logger.info(f"等待 {current_interval} 秒后重试...")
time.sleep(current_interval)
current_interval = min(current_interval * 1.5, self.MAX_RETRY_INTERVAL)
def get_client(self):
"""
获取Redis客户端如果连接断开则自动重连带重试机制
:return: Redis客户端实例
"""
if self._client is None:
# 首次连接,使用无限重试直到成功
logger.info("初始化Redis连接...")
self._client = self._connect_with_retry(max_retries=None)
else:
# 检查连接是否有效,如果断开则重新连接(带重试)
try:
self._client.ping()
except (redis.ConnectionError, redis.TimeoutError, AttributeError) as e:
# 连接断开,重新创建连接(使用无限重试直到成功)
logger.warning(f"Redis连接已断开: {str(e)},开始重新连接...")
self._client = None
self._client = self._connect_with_retry(max_retries=None)
return self._client
def redisClient():
"""获取Redis客户端单例"""
return RedisClientSingleton().get_client()

76
script/auto_sliceing_operate/utils/show_import_dialog.py

@ -0,0 +1,76 @@
import uiautomation as auto
# 遍历所有控件 找到
def findAndClickFileImportShow(control, depth=0):
for child in control.GetChildren():
# print(child)
# print(' ' * depth + f"{child.ControlType}: {child.Name} | {child.AutomationId}")
if child.Name == "切片及打印":
# 点击和 child 同组的第二个元素(即 control 的子控件中的第二个)
siblings = control.GetChildren() # 获取所有同级元素
# print(f"\n找到目标控件: {child.Name}")
# print(f"同级元素数量: {len(siblings)}")
# 打印所有同级元素信息,方便调试
for i, sibling in enumerate(siblings):
marker = " <-- 这是找到的目标" if sibling == child else ""
print(f" 同级元素[{i}]: {sibling.ControlType} | {sibling.Name} | {sibling.AutomationId}{marker}")
# 确保有至少2个同级元素
if len(siblings) >= 2:
target = siblings[1] # 第二个元素(索引从0开始)
print(f"\n准备点击同级第二个元素:")
print(f" 控件类型: {target.ControlType}")
print(f" 控件名称: {target.Name}")
print(f" 自动化ID: {target.AutomationId}")
# 尝试多种点击方式
try:
# 方法1: 直接点击
target.Click()
print("✓ 使用 Click() 方法点击成功")
return True
except Exception as e:
print(f"✗ Click() 失败: {e}")
try:
# 方法2: 使用 Invoke(如果是可调用的控件)
target.Invoke()
print("✓ 使用 Invoke() 方法点击成功")
return True
except Exception as e2:
print(f"✗ Invoke() 失败: {e2}")
try:
# 方法3: 先设置焦点再点击
target.SetFocus()
import time
time.sleep(0.1)
target.Click()
print("✓ 使用 SetFocus() + Click() 方法点击成功")
return True
except Exception as e3:
print(f"✗ SetFocus() + Click() 失败: {e3}")
# 方法4: 使用鼠标点击坐标
try:
rect = target.BoundingRectangle
# BoundingRectangle 是一个矩形对象,计算中心点
x = rect.left + ((rect.right - rect.left) / 2)
y = rect.top + ((rect.bottom - rect.top) / 2)
auto.Click(x, y)
print(f"✓ 使用坐标点击成功: ({x}, {y})")
return True
except Exception as e4:
print(f"✗ 坐标点击失败: {e4}")
else:
print(f"✗ 同级元素数量不足,只有 {len(siblings)}")
return False
else:
findAndClickFileImportShow(child, depth + 1)
def clickFileIMportShow():
control = auto.WindowControl(searchDepth=1, Name='赛纳3D打印控制系统 V1.4.3.2')
clickRes = findAndClickFileImportShow(control)
return clickRes
# clickFileIMportShow()

1
script/factory_sliceing_v2/build_exe.bat

@ -56,3 +56,4 @@ echo ========================================
pause pause

1
script/factory_sliceing_v2/build_exe.py

@ -149,3 +149,4 @@ if __name__ == '__main__':
sys.exit(1) sys.exit(1)

158
script/factory_sliceing_v2/download.py

@ -0,0 +1,158 @@
import os,shutil
import redis
import oss2,time,sys
import requests
import argparse,json
from utils.funcs import requestApiToUpdateSliceStatus
# 将当前脚本所在目录添加到 Python 路径,以便导入 utils 模块
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# from download_print_out import download_transform_save_by_json
from utils.oss_redis import redisClient
from utils.funcs import downloadJsonAndJpgFileAndMoveToCorrectDir, downloadDataByOssAndTransformSave
# 默认使用脚本所在目录
currentDir = os.path.dirname(os.path.abspath(__file__))
ENV = 'prod'
url = 'https://mp.api.suwa3d.com'
if ENV == 'dev':
url = 'http://mp.api.dev.com'
elif ENV == 'prod':
url = 'https://mp.api.suwa3d.com'
#判断是否上传了 JSON 文件
def step1(versionId):
# 下载json 文件 和 图片
dirName,machineInfo = downloadJsonAndJpgFileAndMoveToCorrectDir(versionId,currentDir)
if not dirName:
return False
#判断是否是小机台
isSmallMachine = False
if str(machineInfo["machine_type"]) == '1':
isSmallMachine = True
#下载数据,转换数据
res = downloadDataByOssAndTransformSave(dirName,isSmallMachine)
if not res:
return False
#判断下载的obj文件数量和json里的是否一致,排除arrange文件夹
objFilePath = os.path.join(dirName, 'data')
objCounts = 0
for file in os.listdir(objFilePath):
if file == 'arrange':
continue
if file.endswith('.obj'):
objCounts += 1
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 下载处理完成的obj文件数量: {objCounts}')
# requestApiToUpdateSliceStatus(versionId,objCounts)
# 读取 队列中一个数据出来
def main(work_dir=None, batch_id=None):
global currentDir
# 如果指定了工作目录,使用指定的目录
if work_dir:
work_dir = os.path.abspath(work_dir)
if not os.path.exists(work_dir):
print(f'指定的工作目录不存在: {work_dir},将创建该目录')
os.makedirs(work_dir, exist_ok=True)
currentDir = work_dir
print(f'使用指定的工作目录: {currentDir}')
else:
print(f'没有指定工作目录,退出')
exit(0)
versionId = str(batch_id)
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}')
res = step1(versionId)
if res == False:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} JSON文件下载数据失败,等待10秒')
# time.sleep(10)
# continue
# 循环处理,直到队列为空
# try:
# while True:
# try:
# r = redisClient()
# #检测队列是否有值
# if r.scard('pb:sliceing') == 0:
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 队列为空,等待10秒')
# time.sleep(10)
# continue
# #获取队列中的值
# data = r.spop('pb:sliceing')
# if data is None:
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据为空,等待10秒')
# time.sleep(10)
# continue
# data = data.decode('utf-8')
# #判断是否是数字
# if not data.isdigit():
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据不是数字,等待10秒')
# time.sleep(10)
# continue
# versionId = str(data)
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}')
# res = step1(versionId)
# if res == False:
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} JSON文件下载数据失败,等待10秒')
# time.sleep(10)
# continue
# # 在长时间操作后,确保 Redis 连接仍然有效
# # 通过重新获取客户端来触发连接检查
# try:
# r = redisClient()
# r.ping() # 测试连接
# except Exception as e:
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} Redis连接检查失败: {str(e)},将在下次循环时自动重连')
# #time.sleep(10)
# except KeyboardInterrupt:
# # 在循环内部捕获 KeyboardInterrupt,允许在 sleep 或操作中被中断
# raise # 重新抛出,让外层捕获
# except KeyboardInterrupt:
# print(f'\n{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 收到中断信号,正在优雅退出...')
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 程序已停止')
# sys.exit(0)
def testMain():
global currentDir
currentDir = "/Users/dcx/code/make2/script/factory_sliceing_v2/tempData"
versionId = '10153' #'10153 10158'
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}')
res = step1(versionId)
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 处理完成,res={res}')
if __name__ == '__main__':
#testMain() 新增参数 batchId
parser = argparse.ArgumentParser(description='排版打印订单处理程序')
parser.add_argument(
'--work-dir',
type=str,
default=None,
help='指定工作目录(磁盘路径),例如: D:/work 或 /Users/username/work。如果不指定,则使用脚本所在目录'
)
parser.add_argument(
'--batch—id',
type=str,
default=None,
help='指定批次ID'
)
args = parser.parse_args()
main(work_dir=args.work_dir, batch_id=args.batch_id)

1
script/factory_sliceing_v2/requirements.txt

@ -4,3 +4,4 @@ requests>=2.28.0
pyinstaller>=5.0.0 pyinstaller>=5.0.0

106
script/factory_sliceing_v2/utils/oss_redis.py

@ -1,4 +1,10 @@
import oss2,redis import oss2,redis
import time
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 连接oss - 单例模式 # 连接oss - 单例模式
class OSSClientSingleton: class OSSClientSingleton:
@ -29,47 +35,87 @@ class RedisClientSingleton:
_instance = None _instance = None
_client = None _client = None
# Redis连接配置
REDIS_CONFIG = {
'host': 'mp.api.suwa3d.com',
'password': 'kcV2000',
'port': 6379,
'db': 6,
'socket_timeout': 30, # 操作超时30秒
'socket_connect_timeout': 10, # 连接超时10秒
'socket_keepalive': True, # 启用 TCP keepalive
'socket_keepalive_options': {}, # keepalive 选项
'health_check_interval': 30 # 健康检查间隔30秒
}
# 重试配置
RETRY_INTERVAL = 5 # 重试间隔(秒)
MAX_RETRY_INTERVAL = 60 # 最大重试间隔(秒),用于指数退避
def __new__(cls): def __new__(cls):
if cls._instance is None: if cls._instance is None:
cls._instance = super(RedisClientSingleton, cls).__new__(cls) cls._instance = super(RedisClientSingleton, cls).__new__(cls)
return cls._instance return cls._instance
def _create_redis_client(self):
"""创建Redis客户端"""
return redis.Redis(**self.REDIS_CONFIG)
def _connect_with_retry(self, max_retries=None, retry_interval=None):
"""
带重试机制的Redis连接方法
:param max_retries: 最大重试次数None表示无限重试
:param retry_interval: 重试间隔None表示使用默认值支持指数退避
:return: Redis客户端实例
"""
if retry_interval is None:
retry_interval = self.RETRY_INTERVAL
retry_count = 0
current_interval = retry_interval
while True:
try:
logger.info(f"尝试连接Redis (第 {retry_count + 1} 次)...")
client = self._create_redis_client()
# 测试连接
client.ping()
logger.info("Redis连接成功!")
return client
except (redis.ConnectionError, redis.TimeoutError, AttributeError, Exception) as e:
retry_count += 1
error_msg = str(e)
logger.warning(f"Redis连接失败 (第 {retry_count} 次): {error_msg}")
# 如果设置了最大重试次数且已达到,则抛出异常
if max_retries is not None and retry_count >= max_retries:
logger.error(f"达到最大重试次数 {max_retries},停止重试")
raise
# 指数退避:每次重试间隔逐渐增加,但不超过最大值
logger.info(f"等待 {current_interval} 秒后重试...")
time.sleep(current_interval)
current_interval = min(current_interval * 1.5, self.MAX_RETRY_INTERVAL)
def get_client(self): def get_client(self):
"""
获取Redis客户端如果连接断开则自动重连带重试机制
:return: Redis客户端实例
"""
if self._client is None: if self._client is None:
# 添加超时参数,防止连接超时 # 首次连接,使用无限重试直到成功
# socket_timeout: 每次操作的超时时间(秒) logger.info("初始化Redis连接...")
# socket_connect_timeout: 连接超时时间(秒) self._client = self._connect_with_retry(max_retries=None)
# socket_keepalive: 启用 TCP keepalive
# socket_keepalive_options: keepalive 选项
self._client = redis.Redis(
host='mp.api.suwa3d.com',
password='kcV2000',
port=6379,
db=6,
socket_timeout=30, # 操作超时30秒
socket_connect_timeout=10, # 连接超时10秒
socket_keepalive=True, # 启用 TCP keepalive
socket_keepalive_options={}, # keepalive 选项
health_check_interval=30 # 健康检查间隔30秒
)
else: else:
# 检查连接是否有效,如果断开则重新连接 # 检查连接是否有效,如果断开则重新连接(带重试)
try: try:
self._client.ping() self._client.ping()
except (redis.ConnectionError, redis.TimeoutError, AttributeError): except (redis.ConnectionError, redis.TimeoutError, AttributeError) as e:
# 连接断开,重新创建连接 # 连接断开,重新创建连接(使用无限重试直到成功)
logger.warning(f"Redis连接已断开: {str(e)},开始重新连接...")
self._client = None self._client = None
self._client = redis.Redis( self._client = self._connect_with_retry(max_retries=None)
host='mp.api.suwa3d.com',
password='kcV2000',
port=6379,
db=6,
socket_timeout=30,
socket_connect_timeout=10,
socket_keepalive=True,
socket_keepalive_options={},
health_check_interval=30
)
return self._client return self._client

Loading…
Cancel
Save