dongchangxi 2 weeks ago
parent
commit
bac04def8e
  1. 8
      .gitignore
  2. 39
      factory_sliceing/auto_sliceing_operate/main_begin_sliceing.py
  3. BIN
      factory_sliceing/auto_sliceing_operate/utils/__pycache__/logs.cpython-311.pyc
  4. 290
      factory_sliceing/auto_sliceing_operate/utils/click_soft_button.py
  5. 2
      factory_sliceing/auto_sliceing_operate/utils/exe_operate.py
  6. 16
      factory_sliceing/auto_sliceing_operate/utils/import_all_file.py
  7. 48
      factory_sliceing/auto_sliceing_operate/utils/miniIo.py
  8. 4
      factory_sliceing/build_exe.py
  9. 6
      factory_sliceing/config.toml
  10. 18
      factory_sliceing/utils/config.py

8
.gitignore vendored

@ -16,7 +16,6 @@ timer/29.zip @@ -16,7 +16,6 @@ timer/29.zip
yj_local_build/main_step1.spec
yj_local_build/建模source_code.zip
apps/__pycache__/fix_up_color_two_a.cpython-310.pyc
.gitignore
apps/__pycache__/ps_image_shadow_up_ag_two_a.cpython-310.pyc
apps/__pycache__/white_purification_v3.cpython-310.pyc
apps/__pycache__/white_purification_v4.cpython-310.pyc
@ -65,11 +64,8 @@ script/factory_sliceing/README_BUILD.md @@ -65,11 +64,8 @@ script/factory_sliceing/README_BUILD.md
script/factory_sliceing/batchPrint/
script/factory_sliceing_v2/tempData/
.github/workflows/build-windows-exe.yml
.gitignore
script/factory_sliceing/.DS_Store
script/factory_sliceing/__pycache__/download_print_out.cpython-311.pyc
.gitignore
.gitignore
script/factory_sliceing_v2/tempData/
script/factory_sliceing_v2/utils/__pycache__/
.DS_Store
@ -84,3 +80,7 @@ script/factory_sliceing_v2/build/build_exe/ @@ -84,3 +80,7 @@ script/factory_sliceing_v2/build/build_exe/
factory_sliceing/download_batch_data/__pycache__/
factory_sliceing/build/factory_sliceing/localpycs/
factory_sliceing/build/factory_sliceing/
factory_sliceing/utils/__pycache__/__init__.cpython-311.pyc
factory_sliceing/utils/__pycache__/
factory_sliceing/auto_sliceing_operate/utils/__pycache__/
factory_sliceing/@AutomationLog.txt

39
factory_sliceing/auto_sliceing_operate/main_begin_sliceing.py

@ -1,11 +1,31 @@ @@ -1,11 +1,31 @@
from .utils.import_all_file import modify_file_dialog_path_and_import_all
from .utils.click_soft_button import clickFileIMportShow, clickBegingSlice
from .utils.click_soft_button import clickFileIMportShow, clickBegingSlice,checkIsSliceingText
import time
from .utils.oss_redis import redisClient
from .utils.exe_operate import start_exe, click_confirm, close
from .utils.logs import log
import os
from .utils.miniIo import upload_file
from .utils.request import requestApiToUpdateSliceStatusComplate
from utils.config import cfg
def upLoadSliceFileToOSS(folderPath):
exe_path = cfg('exe.small_exe', None)
if not exe_path:
print(f"错误:exe_path 未配置")
exit(1)
dataFolder = exe_path.replace('NormalTek.ThreeDPrinter.UI.exe','data')
if not os.path.exists(dataFolder):
print(f"错误:dataFolder 文件夹不存在 - {dataFolder}")
exit(1)
dataFilePath = ""
for file1 in os.listdir(dataFolder):
if os.path.isdir(file1):
dataFilePath = os.path.join(dataFolder,file1, 'data.snslc')
if os.path.exists(dataFilePath):
upload_file(f"complatte/slice/{folderPath}/data.snslc", dataFilePath)
#"C:/test/10192_small_No4/data"
def BeginSliceing(batchId,machineId, folderPath,data):
@ -26,8 +46,23 @@ def BeginSliceing(batchId,machineId, folderPath,data): @@ -26,8 +46,23 @@ def BeginSliceing(batchId,machineId, folderPath,data):
time.sleep(5)
log("开始切片")
clickBegingSlice()
#开启是循环,判断切片是否完成
while True:
time.sleep(60)
sliceTime = checkIsSliceingText()
if sliceTime is not False:
print(f"还未切完,切片时长: {sliceTime} 分钟")
time.sleep(60)
elif sliceTime is True:
break
# 开启死循环,判断文件是否全部导入成功
log("切片结束")
time.sleep(5)
res = upLoadSliceFileToOSS(folderPath)
if not res:
print("上传切片文件失败")
exit(1)
#切片完成之后,将切片文件打包成zip文件,上传到OSS,并且请求api 更新批次状态为切片完成
# requestApiToUpdateSliceStatusComplate(batchId, 0)
# sadd 插入对应的队列
@ -57,7 +92,7 @@ def main(work_dir=None): @@ -57,7 +92,7 @@ def main(work_dir=None):
log("取出的数据不是字符串")
time.sleep(10)
continue
folderPath = os.path.join(work_dir, data)
folderPath = os.path.join(work_dir,"batchPrint", data,"data")
# data 格式为 batchId_machineType_No machineId
batchId = data.split('_')[0] #批次ID
# machineType = data.split('_')[1] #机型类型

BIN
factory_sliceing/auto_sliceing_operate/utils/__pycache__/logs.cpython-311.pyc

Binary file not shown.

290
factory_sliceing/auto_sliceing_operate/utils/click_soft_button.py

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
import uiautomation as auto
from utils.config import cfg
import time
import math
softName = cfg('exe.soft_name', None)
# 遍历所有控件 找到
@ -67,7 +68,7 @@ def findAndClick(control, depth=0,index=1): @@ -67,7 +68,7 @@ def findAndClick(control, depth=0,index=1):
print(f"✗ 同级元素数量不足,只有 {len(siblings)}")
return False
else:
findAndClickFileImportShow(child, depth + 1)
findAndClick(child, depth + 1,index=index)
# 点击导入文件按钮
def clickFileIMportShow():
@ -75,7 +76,7 @@ def clickFileIMportShow(): @@ -75,7 +76,7 @@ def clickFileIMportShow():
print(f"错误:软件名称未配置")
exit(1)
control = auto.WindowControl(searchDepth=1, Name=softName)
clickRes = findAndClick(control,index=1)
clickRes = findAndClick(control,0,index=1)
return clickRes
# 点击开始切片按钮
@ -84,5 +85,286 @@ def clickBegingSlice(): @@ -84,5 +85,286 @@ def clickBegingSlice():
print(f"错误:软件名称未配置")
exit(1)
control = auto.WindowControl(searchDepth=1, Name=softName)
clickRes = findAndClick(control,index=2)
clickRes = findAndClick(control,0,index=2)
return clickRes
# 全局变量:存储已找到的.obj文件名
arrObjName = []
scroll_direction = "up"
scroll_amount = 360
def findObjFiles(control, depth=0):
"""递归查找所有包含.obj的控件,并添加到全局数组中(去重)"""
for child in control.GetChildren():
if ".obj" in child.Name and child.Name not in arrObjName:
arrObjName.append(child.Name)
print(f"找到文件: {child.Name}")
findObjFiles(child, depth + 1)
def findListControl(item_control, max_depth=5):
"""向上查找列表控件(ListView等)"""
current = item_control
for _ in range(max_depth):
try:
parent = current.GetParentControl()
if not parent:
break
# 检查是否是列表控件
is_list_control = (
parent.ControlType in [auto.ControlType.ListControl, auto.ControlType.DataGridControl] or
"list" in str(parent.ControlType).lower() or
"50028" in str(parent.AutomationId)
)
if is_list_control:
return parent
current = parent
except:
break
return None
def scrollListView(list_control):
"""滚动列表控件
Args:
list_control: 列表控件对象
scroll_direction: "up" 向上滚动, "down" 向下滚动
Returns:
bool: 是否滚动成功
"""
try:
import win32api
import win32con
# 获取列表控件中心位置
rect = list_control.BoundingRectangle
center_x = int(rect.left + (rect.right - rect.left) / 2)
center_y = int(rect.top + (rect.bottom - rect.top) / 2)
# 移动鼠标到列表中心
win32api.SetCursorPos((center_x, center_y))
time.sleep(0.1)
# 根据方向决定滚动量(负数向上,正数向下)
# scroll_amount = -360 if scroll_direction == "up" else 360
# if scroll_direction == "up":
# scroll_amount = -scroll_amount
scroll_count = 3 # 滚动次数
scroll_interval = 0.1 # 每次滚动间隔
# 向上取整
scroll_count = abs(math.ceil(scroll_amount / 360))
# 执行滚动
for _ in range(scroll_count):
win32api.mouse_event(win32con.MOUSEEVENTF_WHEEL, 0, 0, scroll_amount, 0)
time.sleep(scroll_interval)
time.sleep(0.2)
direction_text = "向上" if scroll_amount < 0 else "向下"
print(f"✓ 使用鼠标滚轮滚动成功,方向: {direction_text}")
return True
except Exception as e:
print(f"✗ 滚动列表失败: {e}")
return False
def clickAndScrollFirstObj(control, depth=0):
"""找到第一个.obj文件,点击并滚动列表
Args:
control: 控件对象
depth: 递归深度
Returns:
bool: 是否成功找到并点击
"""
for child in control.GetChildren():
if ".obj" in child.Name:
# 获取第一个子控件作为点击目标
siblings = control.GetChildren()
if not siblings:
continue
target = siblings[0]
# 尝试点击
try:
target.Click()
print("✓ 使用 Click() 方法点击成功")
# 等待点击生效
time.sleep(0.2)
# 查找列表控件并滚动
list_control = findListControl(child)
if list_control:
print(f"找到列表控件: {list_control.ControlType} - {list_control.AutomationId}")
print("开始滚动列表...")
scrollListView(list_control)
else:
print("未找到列表控件,跳过滚动")
return True
except Exception as e:
print(f"✗ Click() 失败: {e}")
return True
# 递归查找
if clickAndScrollFirstObj(child, depth + 1):
return True
return False
# 获取所有窗口信息
def getAllWindowsInfo():
windows_info = []
try:
import win32gui
def enum_windows_callback(hwnd, windows):
try:
if win32gui.IsWindowVisible(hwnd):
window_title = win32gui.GetWindowText(hwnd)
class_name = win32gui.GetClassName(hwnd)
# 获取最顶层窗口
top_hwnd = win32gui.GetForegroundWindow()
is_top = (hwnd == top_hwnd)
windows.append({
'hwnd': hwnd,
'title': window_title,
'class_name': class_name,
'is_visible': True,
'is_top': is_top
})
except:
pass
return True
win32gui.EnumWindows(enum_windows_callback, windows_info)
# 按是否最顶层排序(最顶层在前)
windows_info.sort(key=lambda x: (not x['is_top'], x['title']))
return windows_info
except ImportError:
print("错误: win32gui 未安装,无法获取窗口信息")
return []
except Exception as e:
print(f"获取窗口信息时发生异常: {e}")
import traceback
traceback.print_exc()
return []
# 检测是否存在标题为"生成代码信息"的弹框,并获取其文本信息
def checkIsSliceingText():
try:
import win32gui
# 获取所有窗口信息
all_windows = getAllWindowsInfo()
# 打印所有窗口信息
print(f"当前所有可见窗口(共 {len(all_windows)} 个):")
for win_info in all_windows:
if "SlicingInfoDialog" in win_info['title']:
#获取控件对象,获取到时间信息,并返回
control = auto.WindowControl(searchDepth=1, hwnd=win_info['hwnd'])
a = getWindowText(control)
return a[0]['text']
return True
except ImportError:
print("错误: win32gui 未安装,无法查找窗口")
return False
except Exception as e:
print(f"检查弹框时发生异常: {e}")
import traceback
traceback.print_exc()
return False
# 获取窗口内所有文本信息
def getWindowText(control, depth=0, texts=None):
"""递归获取控件及其子控件的所有文本信息
Args:
control: 控件对象
depth: 递归深度
texts: 文本列表
Returns:
list: 所有文本信息的列表
"""
if texts is None:
texts = []
try:
# 获取控件名称
if hasattr(control, 'Name') and control.Name:
if getattr(control, 'AutomationId', '') == "labelDuration":
texts.append({
'depth': depth,
'type': str(control.ControlType),
'name': control.Name,
'automation_id': getattr(control, 'AutomationId', ''),
'text': control.Name
})
# 递归获取子控件文本
try:
for child in control.GetChildren():
getWindowText(child, depth + 1, texts)
except:
pass
except Exception as e:
pass
return texts
def checkImportFileIsSuccess(modelCounts = 0,find_interval=2, scroll_interval=1):
if modelCounts == 0:
print(f"错误:模型数量未配置")
return False
if softName is None:
print("错误:软件名称未配置")
return
control = auto.WindowControl(searchDepth=1, Name=softName)
if not control.Exists(0, 0):
print(f"错误:未找到窗口 {softName}")
return
global arrObjName
global scroll_direction
global scroll_amount
arrObjName = []
scroll_direction = "up"
scroll_amount = 120*50*3
# 每次滚动 移动到最顶部
clickAndScrollFirstObj(control, 0)
time.sleep(10)
rection = "up"
scroll_amount = -120*10
temptoTalCounts = 0
compareCounts = 0
#死循环
while True:
if len(arrObjName) >= modelCounts:
break
else:
findObjFiles(control, 0)
time.sleep(find_interval)
clickAndScrollFirstObj(control, 0)
time.sleep(scroll_interval)
#判断 arrObjName 长度,连续3次没有变化,则退出循环
if temptoTalCounts < len(arrObjName):
temptoTalCounts = len(arrObjName)
else:
compareCounts += 1
if compareCounts >= 3:
break
print(f"当前已找到文件数量: {len(arrObjName)}")
if len(arrObjName) == modelCounts:
print(f"文件导入完成")
return True
print(f"文件导入未完成")
return False

2
factory_sliceing/auto_sliceing_operate/utils/exe_operate.py

@ -21,7 +21,7 @@ def start_exe(data): @@ -21,7 +21,7 @@ def start_exe(data):
try:
# 检查文件是否存在
if not os.path.exists(exe_path):
print(f"错误:文件不存在 - {exe_path}")
print(f"错误:切片软件路径文件不存在 - {exe_path}")
return False
# 检查是否是文件(不是目录)

16
factory_sliceing/auto_sliceing_operate/utils/import_all_file.py

@ -1,6 +1,8 @@ @@ -1,6 +1,8 @@
import win32gui, win32con, time
import win32api
import ctypes
from .click_soft_button import checkImportFileIsSuccess
import os
def find_window(title=None, class_name=None):
# 任一参数为 None 即忽略该条件
@ -175,5 +177,19 @@ def modify_file_dialog_path_and_import_all(target_path=""): @@ -175,5 +177,19 @@ def modify_file_dialog_path_and_import_all(target_path=""):
print("正在确认打开...")
send_enter(hwnd)
print("✓ 已确认打开")
# 获取 target_path 路径下 有多少个obj
objsCounts = 0
for file in os.listdir(target_path):
if ".obj" in file:
objsCounts +=1
# 开启死循环,判断文件是否全部导入成功
while True:
res = checkImportFileIsSuccess(objsCounts)
if res:
break
else:
time.sleep(30)
return True

48
factory_sliceing/auto_sliceing_operate/utils/miniIo.py

@ -2,6 +2,22 @@ import os @@ -2,6 +2,22 @@ import os
from minio import Minio
from minio.error import S3Error
from .logs import log
from utils.config import cfg
# 全局变量:存储桶名称(从配置文件读取)
_bucket_name = None
def get_bucket_name():
"""获取存储桶名称(从配置文件读取)
Returns:
str: 存储桶名称
"""
global _bucket_name
if _bucket_name is None:
# 优先从配置文件读取,如果没有则从环境变量读取,最后使用默认值
_bucket_name = cfg('minio.bucket', None) or os.getenv('MINIO_BUCKET', 'default-bucket')
log(f"MiniIO 存储桶名称: {_bucket_name}")
return _bucket_name
# MiniIO 客户端单例模式
class MiniIOClientSingleton:
@ -41,19 +57,22 @@ def miniIOClient(): @@ -41,19 +57,22 @@ def miniIOClient():
return MiniIOClientSingleton().get_client()
def upload_file(bucket_name, object_name, file_path):
def upload_file(object_name, file_path, bucket_name=None):
"""
上传文件到 MiniIO
Args:
bucket_name: 存储桶名称
object_name: 对象名称 MiniIO 中的路径
file_path: 本地文件路径
bucket_name: 存储桶名称可选如果不提供则使用全局配置
Returns:
bool: 上传成功返回 True失败返回 False
"""
try:
if bucket_name is None:
bucket_name = get_bucket_name()
client = miniIOClient()
# 检查存储桶是否存在,不存在则创建
@ -74,23 +93,26 @@ def upload_file(bucket_name, object_name, file_path): @@ -74,23 +93,26 @@ def upload_file(bucket_name, object_name, file_path):
return False
def download_file(bucket_name, object_name, file_path):
def download_file(object_name, file_path, bucket_name=None):
"""
MiniIO 下载文件到本地
Args:
bucket_name: 存储桶名称
object_name: 对象名称 MiniIO 中的路径
file_path: 本地保存路径
bucket_name: 存储桶名称可选如果不提供则使用全局配置
Returns:
bool: 下载成功返回 True失败返回 False
"""
try:
if bucket_name is None:
bucket_name = get_bucket_name()
client = miniIOClient()
# 检查文件是否存在
if not check_file_exists(bucket_name, object_name):
if not check_file_exists(object_name, bucket_name):
log(f"文件不存在: {bucket_name}/{object_name}")
return False
@ -119,22 +141,25 @@ def download_file(bucket_name, object_name, file_path): @@ -119,22 +141,25 @@ def download_file(bucket_name, object_name, file_path):
return False
def delete_file(bucket_name, object_name):
def delete_file(object_name, bucket_name=None):
"""
MiniIO 删除文件
Args:
bucket_name: 存储桶名称
object_name: 对象名称 MiniIO 中的路径
bucket_name: 存储桶名称可选如果不提供则使用全局配置
Returns:
bool: 删除成功返回 True失败返回 False
"""
try:
if bucket_name is None:
bucket_name = get_bucket_name()
client = miniIOClient()
# 检查文件是否存在
if not check_file_exists(bucket_name, object_name):
if not check_file_exists(object_name, bucket_name):
log(f"文件不存在,无需删除: {bucket_name}/{object_name}")
return True
@ -151,18 +176,21 @@ def delete_file(bucket_name, object_name): @@ -151,18 +176,21 @@ def delete_file(bucket_name, object_name):
return False
def check_file_exists(bucket_name, object_name):
def check_file_exists(object_name, bucket_name=None):
"""
检查文件是否存在于 MiniIO
Args:
bucket_name: 存储桶名称
object_name: 对象名称 MiniIO 中的路径
bucket_name: 存储桶名称可选如果不提供则使用全局配置
Returns:
bool: 文件存在返回 True不存在返回 False
"""
try:
if bucket_name is None:
bucket_name = get_bucket_name()
client = miniIOClient()
# 检查存储桶是否存在

4
factory_sliceing/build_exe.py

@ -40,10 +40,6 @@ def build_exe(): @@ -40,10 +40,6 @@ def build_exe():
'--hidden-import=download_batch_data.main_download_batch_data_and_trans',
'--hidden-import=auto_sliceing_operate.utils',
'--hidden-import=download_batch_data.utils',
'--hidden-import=utils.config',
# 配置文件解析模块
'--hidden-import=toml',
# UI 自动化相关模块
'--hidden-import=uiautomation',

6
factory_sliceing/config.toml

@ -38,7 +38,7 @@ level = "INFO" # DEBUG, INFO, WARNING, ERROR @@ -38,7 +38,7 @@ level = "INFO" # DEBUG, INFO, WARNING, ERROR
#切片软件的执行路径
[exe]
soft_name = ""
small_exe = ""
big_exe = "C:\Users\Administrator\print_factory_type_setting\software\小机型 切片软件\切片软件V1.4.3.6_2\切片软件V1.4.3.6_2\NormalTek.ThreeDPrinter.UI.exe"
soft_name = "赛纳3D打印控制系统 V1.4.3.6"
big_exe = ""
small_exe = "C:/Users/Administrator/print_factory_type_setting/software/小机型 切片软件/切片软件V1.4.3.6_2/切片软件V1.4.3.6_2/NormalTek.ThreeDPrinter.UI.exe"

18
factory_sliceing/utils/config.py

@ -2,10 +2,15 @@ @@ -2,10 +2,15 @@
配置文件读取工具
支持从 config.toml 文件中读取配置信息
"""
import toml
import os
import sys
# 优先使用 Python 3.11+ 内置的 tomllib,否则回退到 toml 包
try:
import tomllib # Python 3.11+
except ImportError:
import toml as tomllib # 回退到 toml 包
def get_config_path():
"""
@ -64,8 +69,15 @@ def cfg(key_name, default=None): @@ -64,8 +69,15 @@ def cfg(key_name, default=None):
print(f"警告: 配置文件不存在,路径: {config_path}")
return default
with open(config_path, 'r', encoding='utf-8') as f:
config = toml.load(f)
# tomllib 需要二进制模式,toml 包需要文本模式
if hasattr(tomllib, 'load'):
# Python 3.11+ 的 tomllib
with open(config_path, 'rb') as f:
config = tomllib.load(f)
else:
# toml 包
with open(config_path, 'r', encoding='utf-8') as f:
config = tomllib.load(f)
# 处理嵌套键名,如 "redis.host"
if "." in key_name:

Loading…
Cancel
Save