Browse Source

合并程序

master
dongchangxi 2 weeks ago
parent
commit
968189dee9
  1. 1
      .gitignore
  2. 77
      factory_sliceing/README_BUILD.md
  3. 2
      factory_sliceing/auto_sliceing_operate/__init__.py
  4. 27
      factory_sliceing/auto_sliceing_operate/main.py
  5. 67
      factory_sliceing/auto_sliceing_operate/main_begin_sliceing.py
  6. 70
      factory_sliceing/auto_sliceing_operate/main_download_zip.py
  7. 2
      factory_sliceing/auto_sliceing_operate/utils/__init__.py
  8. 27
      factory_sliceing/auto_sliceing_operate/utils/request.py
  9. 2
      factory_sliceing/auto_sliceing_operate/utils/zip.py
  10. 39
      factory_sliceing/build_exe.bat
  11. 160
      factory_sliceing/build_exe.py
  12. 2
      factory_sliceing/download_batch_data/__init__.py
  13. 59
      factory_sliceing/download_batch_data/build_exe.bat
  14. 152
      factory_sliceing/download_batch_data/build_exe.py
  15. 58
      factory_sliceing/download_batch_data/build_exe.spec
  16. BIN
      factory_sliceing/download_batch_data/dist/factory_sliceing.exe
  17. 158
      factory_sliceing/download_batch_data/download.py
  18. 45
      factory_sliceing/download_batch_data/main_download_batch_data_and_trans.py
  19. 7
      factory_sliceing/download_batch_data/requirements.txt
  20. 2
      factory_sliceing/download_batch_data/utils/__init__.py
  21. 5
      factory_sliceing/download_batch_data/utils/funcs.py
  22. 36
      factory_sliceing/main.py

1
.gitignore vendored

@ -78,3 +78,4 @@ script/test.py
script/test1.py script/test1.py
factory_sliceing/auto_sliceing_operate/__pycache__/ factory_sliceing/auto_sliceing_operate/__pycache__/
factory_sliceing/download_batch_data/utils/__pycache__/ factory_sliceing/download_batch_data/utils/__pycache__/
factory_sliceing/auto_sliceing_operate/utils/__pycache__/

77
factory_sliceing/README_BUILD.md

@ -0,0 +1,77 @@
# 打包说明
## 方法一:使用 Python 脚本打包(推荐)
### Windows
```bash
python build_exe.py
```
### 打包后清理临时文件
```bash
python build_exe.py --clean
```
### 仅清理临时文件
```bash
python build_exe.py --clean-only
```
## 方法二:使用批处理文件(Windows)
直接双击运行 `build_exe.bat` 或在命令行执行:
```bash
build_exe.bat
```
## 方法三:手动使用 PyInstaller
```bash
# 安装 PyInstaller(如果未安装)
pip install pyinstaller
# 打包
pyinstaller --name=factory_sliceing --onefile --console --clean main.py
```
## 使用打包后的 exe
打包完成后,exe 文件位于 `dist/factory_sliceing.exe`
使用方法:
```bash
factory_sliceing.exe <command> <work_dir>
```
可用命令:
- `batch_dwonload` - 批量下载
- `begin_sliceing` - 开始切片
- `download_zip` - 下载压缩包
示例:
```bash
factory_sliceing.exe begin_sliceing C:\work\data
```
## 注意事项
1. **依赖项**: 确保所有依赖已安装(redis, oss2, requests 等)
2. **隐藏导入**: 如果运行时提示模块未找到,可以在 `build_exe.py` 中添加 `--hidden-import` 参数
3. **数据文件**: 如果项目需要配置文件或其他数据文件,需要添加 `--add-data` 参数
4. **文件大小**: 打包后的 exe 文件可能较大(通常 50-200MB),这是正常的,因为包含了 Python 解释器和所有依赖
## 故障排除
### 问题:打包后 exe 无法运行
- 检查是否所有依赖都已正确安装
- 尝试在命令行运行 exe 查看错误信息
- 检查是否有隐藏导入的模块未添加
### 问题:模块未找到错误
- 在 `build_exe.py` 中添加对应的 `--hidden-import` 参数
- 或者使用 `--collect-all` 参数收集所有子模块
### 问题:文件过大
- 这是正常的,PyInstaller 会打包 Python 解释器和所有依赖
- 可以使用 `--exclude-module` 排除不需要的模块来减小体积

2
factory_sliceing/auto_sliceing_operate/__init__.py

@ -0,0 +1,2 @@
# auto_sliceing_operate package

27
factory_sliceing/auto_sliceing_operate/main.py

@ -1,27 +0,0 @@
from utils.import_all_file import modify_file_dialog_path_and_import_all
from utils.click_soft_button import clickFileIMportShow, clickBegingSlice
import time
from utils.exe_operate import start_exe, click_confirm, close
#"C:/test/10192_small_No4/data"
def BeginSliceing(folderPath):
# 打开3d切片软件
start_exe()
time.sleep(1)
#点击确认按钮
click_confirm()
#先打开导入文件的弹框
clickFileIMportShow()
time.sleep(1)
#在输入路径导入obj文件
isSuccess = modify_file_dialog_path_and_import_all(folderPath)
if not isSuccess:
print("导入文件失败")
exit()
#点击开始切片
time.sleep(5)
clickBegingSlice()

67
factory_sliceing/auto_sliceing_operate/main_begin_sliceing.py

@ -0,0 +1,67 @@
from .utils.import_all_file import modify_file_dialog_path_and_import_all
from .utils.click_soft_button import clickFileIMportShow, clickBegingSlice
import time
from .utils.oss_redis import redisClient
from .utils.exe_operate import start_exe, click_confirm, close
from .utils.logs import log
import os
from .utils.request import requestApiToUpdateSliceStatusComplate
#"C:/test/10192_small_No4/data"
def BeginSliceing(batchId,machineId, folderPath):
# 打开3d切片软件
start_exe()
time.sleep(1)
#点击确认按钮
click_confirm()
#先打开导入文件的弹框
clickFileIMportShow()
time.sleep(1)
#在输入路径导入obj文件
isSuccess = modify_file_dialog_path_and_import_all(folderPath)
if not isSuccess:
print("导入文件失败")
exit()
#点击开始切片
time.sleep(5)
log("开始切片")
clickBegingSlice()
log("切片结束")
time.sleep(5)
#切片完成之后,将切片文件打包成zip文件,上传到OSS,并且请求api 更新批次状态为切片完成
requestApiToUpdateSliceStatusComplate(batchId, 0)
# sadd 插入对应的队列
r = redisClient()
r.sadd('pb:begin_print_machine_'+machineId, batchId)
def main(work_dir=None):
r = redisClient()
while True:
#判断队列连接是否正常,进行重连
if not r.ping():
log("队列连接异常,进行重连")
r = redisClient()
time.sleep(10)
continue
data = r.spop('pb:to_sliceing')
if data is None:
log("队列为空")
time.sleep(10)
continue
data = data.decode('utf-8')
# 判断是否是字符串
if not isinstance(data, str):
log("取出的数据不是字符串")
time.sleep(10)
continue
folderPath = os.path.join(work_dir, data)
# data 格式为 batchId_machineType_No machineId
batchId = data.split('_')[0] #批次ID
# machineType = data.split('_')[1] #机型类型
machineId = data.split('_')[2].replace('No', '') #机器ID
BeginSliceing(batchId,machineId,folderPath)
time.sleep(10)

70
factory_sliceing/auto_sliceing_operate/main_download_zip.py

@ -0,0 +1,70 @@
from .utils import miniIo as mio
import zipfile,time
import os
from .utils.oss_redis import redisClient
from .utils.logs import log
# 打印机旁边的电脑下载切片文件到本地
# 解压压缩包,将数据移动到指定的目录
#
#
#
# 从 minio 服务上下载 zip 文件到指定目录和解压
def downloadZip(ossZipPath, localZipPath):
# 下载 zip 文件
mio.download_file(ossZipPath, localZipPath)
def unzip(localZipPath, localUnzipPath):
# 解压 zip 文件
with zipfile.ZipFile(localZipPath, 'r') as zip_ref:
zip_ref.extractall(localUnzipPath)
# 解压完成后删除 zip 文件
os.remove(localZipPath)
def getCurrentMachineId():
# 获取当前电脑的id
return "1"
def main(work_dir=None):
if work_dir is None:
log("工作目录不能为空")
exit(0)
# redis 获取队列中的数据
r = redisClient()
machineId = getCurrentMachineId()
if machineId is None:
log("获取当前电脑id失败")
exit(0)
while True:
data = r.lpop('pb:begin_print_machine_'+machineId)
if data is None:
log("队列为空")
time.sleep(10)
continue
data = data.decode('utf-8')
# 判断是否是数字
if not data.isdigit():
log("取出的数据不是数字")
time.sleep(10)
continue
batchId = str(data)
ossZipPath = f'data/slice/{batchId}.zip'
localZipPath = os.path.join(work_dir,batchId, f'{batchId}.zip')
# 判断目录是否存在,不存在就创建
if not os.path.exists(os.path.join(work_dir,batchId)):
os.makedirs(os.path.join(work_dir,batchId))
else:
# 删除目录下的所有文件
for file in os.listdir(os.path.join(work_dir,batchId)):
os.remove(os.path.join(work_dir,batchId, file))
# 下载 zip 文件
downloadZip(ossZipPath, localZipPath)
# 解压 zip 文件
localUnzipPath = os.path.join(work_dir,batchId)
unzip(localZipPath, localUnzipPath)
# 触发打印处理

2
factory_sliceing/auto_sliceing_operate/utils/__init__.py

@ -0,0 +1,2 @@
# utils package

27
factory_sliceing/auto_sliceing_operate/utils/request.py

@ -0,0 +1,27 @@
import requests
from .logs import log
url = 'https://mp.api.suwa3d.com'
def requestApiToUpdateSliceStatusComplate(versionId, downloadCounts):
api_url = f"{url}/api/printTypeSettingOrder/updateBatchSliceing?batch_id={versionId}&is_slice_complete=1&download_counts=" + str(downloadCounts)
log(f'发起状态变更请求url={api_url}, versionId={versionId}')
try:
# 添加超时参数,防止请求时间过长
res = requests.post(api_url, timeout=60) # 60秒超时
if res.status_code != 200:
log(f'状态变更请求失败, res={res.text}')
return False
log(f'状态变更请求成功, res={res.text}')
#判断返回的code是否是1000
if res.json()["code"] != 1000:
log(f'状态变更请求失败, res={res.text}')
return False
log(f'状态变更请求成功, res={res.text}')
return True
except requests.exceptions.Timeout:
log(f'状态变更请求超时, url={api_url}')
return False
except requests.exceptions.RequestException as e:
log(f'状态变更请求异常, error={str(e)}')
return False

2
factory_sliceing/auto_sliceing_operate/utils/zip.py

@ -66,4 +66,4 @@ def zip_file(folderPath, outputZipPath=None):
# zip_file(folderPath) # zip_file(folderPath)
zip_file("/Users/dcx/code/make2/script/batchPrint/1","1024.zip") # zip_file("/Users/dcx/code/make2/script/batchPrint/1","1024.zip")

39
factory_sliceing/build_exe.bat

@ -0,0 +1,39 @@
@echo off
chcp 65001 >nul
echo ========================================
echo 开始打包 factory_sliceing 项目为 exe
echo ========================================
echo.
REM 检查 Python 是否安装
python --version >nul 2>&1
if errorlevel 1 (
echo 错误: 未找到 Python,请先安装 Python
pause
exit /b 1
)
REM 检查 PyInstaller 是否安装
python -c "import PyInstaller" >nul 2>&1
if errorlevel 1 (
echo PyInstaller 未安装,正在安装...
pip install pyinstaller
if errorlevel 1 (
echo 安装 PyInstaller 失败
pause
exit /b 1
)
)
REM 执行打包脚本
python build_exe.py --clean
echo.
echo ========================================
echo 打包完成!
echo ========================================
echo.
echo 生成的 exe 文件位于: dist\factory_sliceing.exe
echo.
pause

160
factory_sliceing/build_exe.py

@ -0,0 +1,160 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
打包脚本 factory_sliceing 项目打包成 exe 文件
使用方法: python build_exe.py
"""
import os
import sys
import subprocess
import shutil
def build_exe():
"""使用 PyInstaller 打包项目"""
# 获取当前脚本所在目录(factory_sliceing 目录)
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(current_dir) # make2 目录
# 切换到 factory_sliceing 目录
os.chdir(current_dir)
# PyInstaller 命令参数
main_script = os.path.join(current_dir, 'main.py')
# 构建 PyInstaller 命令
cmd = [
'pyinstaller',
'--name=factory_sliceing', # 生成的 exe 名称
'--onefile', # 打包成单个 exe 文件
'--console', # 显示控制台窗口(因为需要命令行参数)
'--clean', # 清理临时文件
'--noconfirm', # 不询问确认
# 添加隐藏导入(如果 PyInstaller 无法自动检测)
'--hidden-import=auto_sliceing_operate',
'--hidden-import=auto_sliceing_operate.main_begin_sliceing',
'--hidden-import=auto_sliceing_operate.main_download_zip',
'--hidden-import=download_batch_data',
'--hidden-import=download_batch_data.main_download_batch_data_and_trans',
'--hidden-import=auto_sliceing_operate.utils',
'--hidden-import=download_batch_data.utils',
# 添加数据文件(如果需要)
# '--add-data=config;config', # Windows 格式
# '--add-data=config:config', # Linux/Mac 格式
# 工作目录设置
f'--workpath={os.path.join(current_dir, "build")}',
f'--distpath={os.path.join(current_dir, "dist")}',
f'--specpath={os.path.join(current_dir, "build")}',
main_script
]
print("=" * 60)
print("开始打包 factory_sliceing 项目...")
print("=" * 60)
print(f"工作目录: {current_dir}")
print(f"主脚本: {main_script}")
print()
try:
# 检查 PyInstaller 是否安装
result = subprocess.run(['pyinstaller', '--version'],
capture_output=True, text=True)
if result.returncode != 0:
print("错误: PyInstaller 未安装")
print("请运行: pip install pyinstaller")
return False
print(f"PyInstaller 版本: {result.stdout.strip()}")
print()
# 执行打包命令
print("执行打包命令...")
print(" ".join(cmd))
print()
result = subprocess.run(cmd, check=True)
if result.returncode == 0:
print()
print("=" * 60)
print("打包成功!")
print("=" * 60)
exe_path = os.path.join(current_dir, 'dist', 'factory_sliceing.exe')
if os.path.exists(exe_path):
file_size = os.path.getsize(exe_path) / (1024 * 1024) # MB
print(f"生成的 exe 文件: {exe_path}")
print(f"文件大小: {file_size:.2f} MB")
print()
print("使用方法:")
print(" factory_sliceing.exe <command> <work_dir>")
print(" 可用命令: batch_dwonload | begin_sliceing | download_zip")
return True
else:
print("打包失败!")
return False
except subprocess.CalledProcessError as e:
print(f"打包过程中出现错误: {e}")
return False
except FileNotFoundError:
print("错误: 找不到 PyInstaller")
print("请先安装: pip install pyinstaller")
return False
except Exception as e:
print(f"发生未知错误: {e}")
return False
def clean_build_files():
"""清理构建产生的临时文件"""
current_dir = os.path.dirname(os.path.abspath(__file__))
dirs_to_remove = [
os.path.join(current_dir, 'build'),
os.path.join(current_dir, '__pycache__'),
]
files_to_remove = [
os.path.join(current_dir, 'factory_sliceing.spec'),
]
print("清理临时文件...")
for dir_path in dirs_to_remove:
if os.path.exists(dir_path):
try:
shutil.rmtree(dir_path)
print(f"已删除: {dir_path}")
except Exception as e:
print(f"删除 {dir_path} 失败: {e}")
for file_path in files_to_remove:
if os.path.exists(file_path):
try:
os.remove(file_path)
print(f"已删除: {file_path}")
except Exception as e:
print(f"删除 {file_path} 失败: {e}")
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='打包 factory_sliceing 项目为 exe')
parser.add_argument('--clean', action='store_true',
help='打包后清理临时文件')
parser.add_argument('--clean-only', action='store_true',
help='仅清理临时文件,不打包')
args = parser.parse_args()
if args.clean_only:
clean_build_files()
else:
success = build_exe()
if success and args.clean:
print()
clean_build_files()

2
factory_sliceing/download_batch_data/__init__.py

@ -0,0 +1,2 @@
# download_batch_data package

59
factory_sliceing/download_batch_data/build_exe.bat

@ -1,59 +0,0 @@
@echo off
chcp 65001 >nul
echo ========================================
echo 开始打包 factory_sliceing 为 Windows EXE
echo ========================================
echo.
REM 检查 Python 是否安装
python --version >nul 2>&1
if errorlevel 1 (
echo [错误] 未找到 Python,请先安装 Python
pause
exit /b 1
)
echo [1/4] 检查并安装依赖包...
python -m pip install --upgrade pip
python -m pip install redis>=4.0.0 oss2>=2.17.0 requests>=2.28.0 pyinstaller>=5.0.0
if errorlevel 1 (
echo [错误] 依赖包安装失败
pause
exit /b 1
)
echo.
echo [2/4] 清理之前的构建文件...
if exist build rmdir /s /q build
if exist dist rmdir /s /q dist
if exist __pycache__ rmdir /s /q __pycache__
for /d /r . %%d in (__pycache__) do @if exist "%%d" rmdir /s /q "%%d"
echo.
echo [3/4] 使用 PyInstaller 打包...
pyinstaller build_exe.spec --clean --noconfirm
if errorlevel 1 (
echo [错误] 打包失败
pause
exit /b 1
)
echo.
echo [4/4] 打包完成!
echo.
echo ========================================
echo 打包结果:
echo ========================================
echo EXE 文件位置: dist\factory_sliceing.exe
echo.
echo 使用说明:
echo 1. 将 dist\factory_sliceing.exe 复制到目标 Windows 机器
echo 2. 在命令行中运行: factory_sliceing.exe --work-dir D:\work
echo.
echo ========================================
pause

152
factory_sliceing/download_batch_data/build_exe.py

@ -1,152 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
打包脚本 - main.py 打包成 Windows EXE 文件
使用方法: python build_exe.py
"""
import os
import sys
import subprocess
import shutil
def check_python_version():
"""检查 Python 版本"""
if sys.version_info < (3, 7):
print("[错误] 需要 Python 3.7 或更高版本")
return False
print(f"[信息] Python 版本: {sys.version}")
return True
def install_dependencies():
"""安装依赖包"""
print("\n[1/4] 检查并安装依赖包...")
dependencies = [
'redis>=4.0.0',
'oss2>=2.17.0',
'requests>=2.28.0',
'pyinstaller>=5.0.0',
]
for dep in dependencies:
print(f" 安装 {dep}...")
result = subprocess.run(
[sys.executable, '-m', 'pip', 'install', '--upgrade', dep],
capture_output=True,
text=True
)
if result.returncode != 0:
print(f"[错误] 安装 {dep} 失败: {result.stderr}")
return False
print("[成功] 所有依赖包已安装")
return True
def clean_build_files():
"""清理之前的构建文件"""
print("\n[2/4] 清理之前的构建文件...")
dirs_to_remove = ['build', 'dist']
for dir_name in dirs_to_remove:
if os.path.exists(dir_name):
shutil.rmtree(dir_name)
print(f" 已删除: {dir_name}")
# 清理 __pycache__ 目录
for root, dirs, files in os.walk('.'):
if '__pycache__' in dirs:
pycache_path = os.path.join(root, '__pycache__')
shutil.rmtree(pycache_path)
print(f" 已删除: {pycache_path}")
print("[成功] 清理完成")
return True
def build_exe():
"""使用 PyInstaller 打包"""
print("\n[3/4] 使用 PyInstaller 打包...")
spec_file = 'build_exe.spec'
if not os.path.exists(spec_file):
print(f"[错误] 未找到配置文件: {spec_file}")
return False
result = subprocess.run(
[sys.executable, '-m', 'PyInstaller', spec_file, '--clean', '--noconfirm'],
capture_output=True,
text=True
)
if result.returncode != 0:
print(f"[错误] 打包失败:")
print(result.stderr)
return False
print("[成功] 打包完成")
return True
def show_result():
"""显示打包结果"""
print("\n" + "="*50)
print("打包结果:")
print("="*50)
exe_path = os.path.join('dist', 'factory_sliceing.exe')
if os.path.exists(exe_path):
file_size = os.path.getsize(exe_path) / (1024 * 1024) # MB
print(f"✓ EXE 文件位置: {exe_path}")
print(f"✓ 文件大小: {file_size:.2f} MB")
else:
print(f"✗ 未找到 EXE 文件: {exe_path}")
print("\n使用说明:")
print(" 1. 将 dist/factory_sliceing.exe 复制到目标 Windows 机器")
print(" 2. 在命令行中运行: factory_sliceing.exe --work-dir D:\\work")
print("="*50)
def main():
"""主函数"""
print("="*50)
print("开始打包 factory_sliceing 为 Windows EXE")
print("="*50)
# 切换到脚本所在目录
script_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(script_dir)
print(f"[信息] 工作目录: {script_dir}")
# 检查 Python 版本
if not check_python_version():
return 1
# 安装依赖
if not install_dependencies():
return 1
# 清理构建文件
if not clean_build_files():
return 1
# 打包
if not build_exe():
return 1
# 显示结果
show_result()
return 0
if __name__ == '__main__':
try:
sys.exit(main())
except KeyboardInterrupt:
print("\n\n[中断] 用户取消操作")
sys.exit(1)
except Exception as e:
print(f"\n[错误] 发生异常: {str(e)}")
import traceback
traceback.print_exc()
sys.exit(1)

58
factory_sliceing/download_batch_data/build_exe.spec

@ -1,58 +0,0 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['main.py'],
pathex=[],
binaries=[],
datas=[
# 将 small_machine_transform.py 作为数据文件包含,以便 Blender 可以调用
('utils/small_machine_transform.py', 'utils'),
],
hiddenimports=[
'redis',
'oss2',
'requests',
'utils.oss_redis',
'utils.funcs',
'utils.logs',
'utils.oss_func',
'utils.changeFiles',
'utils.small_machine_transform',
],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[],
name='factory_sliceing',
debug=False,
bootloader_ignore_signals=False, # 允许信号传播,使 Ctrl+C 可以中断
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True, # 显示控制台窗口
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=None, # 可以指定图标文件路径,例如: 'icon.ico'
)

BIN
factory_sliceing/download_batch_data/dist/factory_sliceing.exe vendored

Binary file not shown.

158
factory_sliceing/download_batch_data/download.py

@ -1,158 +0,0 @@
import os,shutil
import redis
import oss2,time,sys
import requests
import argparse,json
from utils.funcs import requestApiToUpdateSliceStatus
# 将当前脚本所在目录添加到 Python 路径,以便导入 utils 模块
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# from download_print_out import download_transform_save_by_json
from utils.oss_redis import redisClient
from utils.funcs import downloadJsonAndJpgFileAndMoveToCorrectDir, downloadDataByOssAndTransformSave
# 默认使用脚本所在目录
currentDir = os.path.dirname(os.path.abspath(__file__))
ENV = 'prod'
url = 'https://mp.api.suwa3d.com'
if ENV == 'dev':
url = 'http://mp.api.dev.com'
elif ENV == 'prod':
url = 'https://mp.api.suwa3d.com'
#判断是否上传了 JSON 文件
def step1(versionId):
# 下载json 文件 和 图片
dirName,machineInfo = downloadJsonAndJpgFileAndMoveToCorrectDir(versionId,currentDir)
if not dirName:
return False
#判断是否是小机台
isSmallMachine = False
if str(machineInfo["machine_type"]) == '1':
isSmallMachine = True
#下载数据,转换数据
res = downloadDataByOssAndTransformSave(dirName,isSmallMachine)
if not res:
return False
#判断下载的obj文件数量和json里的是否一致,排除arrange文件夹
objFilePath = os.path.join(dirName, 'data')
objCounts = 0
for file in os.listdir(objFilePath):
if file == 'arrange':
continue
if file.endswith('.obj'):
objCounts += 1
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 下载处理完成的obj文件数量: {objCounts}')
# requestApiToUpdateSliceStatus(versionId,objCounts)
# 读取 队列中一个数据出来
def main(work_dir=None, batch_id=None):
global currentDir
# 如果指定了工作目录,使用指定的目录
if work_dir:
work_dir = os.path.abspath(work_dir)
if not os.path.exists(work_dir):
print(f'指定的工作目录不存在: {work_dir},将创建该目录')
os.makedirs(work_dir, exist_ok=True)
currentDir = work_dir
print(f'使用指定的工作目录: {currentDir}')
else:
print(f'没有指定工作目录,退出')
exit(0)
versionId = str(batch_id)
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}')
res = step1(versionId)
if res == False:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} JSON文件下载数据失败,等待10秒')
# time.sleep(10)
# continue
# 循环处理,直到队列为空
# try:
# while True:
# try:
# r = redisClient()
# #检测队列是否有值
# if r.scard('pb:sliceing') == 0:
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 队列为空,等待10秒')
# time.sleep(10)
# continue
# #获取队列中的值
# data = r.spop('pb:sliceing')
# if data is None:
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据为空,等待10秒')
# time.sleep(10)
# continue
# data = data.decode('utf-8')
# #判断是否是数字
# if not data.isdigit():
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据不是数字,等待10秒')
# time.sleep(10)
# continue
# versionId = str(data)
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}')
# res = step1(versionId)
# if res == False:
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} JSON文件下载数据失败,等待10秒')
# time.sleep(10)
# continue
# # 在长时间操作后,确保 Redis 连接仍然有效
# # 通过重新获取客户端来触发连接检查
# try:
# r = redisClient()
# r.ping() # 测试连接
# except Exception as e:
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} Redis连接检查失败: {str(e)},将在下次循环时自动重连')
# #time.sleep(10)
# except KeyboardInterrupt:
# # 在循环内部捕获 KeyboardInterrupt,允许在 sleep 或操作中被中断
# raise # 重新抛出,让外层捕获
# except KeyboardInterrupt:
# print(f'\n{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 收到中断信号,正在优雅退出...')
# print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 程序已停止')
# sys.exit(0)
def testMain():
global currentDir
currentDir = "/Users/dcx/code/make2/script/factory_sliceing_v2/tempData"
versionId = '10153' #'10153 10158'
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}')
res = step1(versionId)
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 处理完成,res={res}')
if __name__ == '__main__':
#testMain() 新增参数 batchId
parser = argparse.ArgumentParser(description='排版打印订单处理程序')
parser.add_argument(
'--work-dir',
type=str,
default=None,
help='指定工作目录(磁盘路径),例如: D:/work 或 /Users/username/work。如果不指定,则使用脚本所在目录'
)
parser.add_argument(
'--batch—id',
type=str,
default=None,
help='指定批次ID'
)
args = parser.parse_args()
main(work_dir=args.work_dir, batch_id=args.batch_id)

45
factory_sliceing/download_batch_data/main.py → factory_sliceing/download_batch_data/main_download_batch_data_and_trans.py

@ -3,14 +3,11 @@ import redis
import oss2,time,sys import oss2,time,sys
import requests import requests
import argparse,json import argparse,json
from utils.funcs import requestApiToUpdateSliceStatus from .utils.funcs import requestApiToUpdateSliceStatus
# 将当前脚本所在目录添加到 Python 路径,以便导入 utils 模块
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# from download_print_out import download_transform_save_by_json # from download_print_out import download_transform_save_by_json
from utils.oss_redis import redisClient from .utils.oss_redis import redisClient
from utils.funcs import downloadJsonAndJpgFileAndMoveToCorrectDir, downloadDataByOssAndTransformSave from .utils.funcs import downloadJsonAndJpgFileAndMoveToCorrectDir, downloadDataByOssAndTransformSave
# 默认使用脚本所在目录 # 默认使用脚本所在目录
currentDir = os.path.dirname(os.path.abspath(__file__)) currentDir = os.path.dirname(os.path.abspath(__file__))
@ -51,7 +48,19 @@ def step1(versionId):
objCounts += 1 objCounts += 1
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 下载处理完成的obj文件数量: {objCounts}') print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 下载处理完成的obj文件数量: {objCounts}')
requestApiToUpdateSliceStatus(versionId,objCounts) isSuccess = requestApiToUpdateSliceStatus(versionId,objCounts)
if not isSuccess:
return False
#塞入切片的队列 ,SADD 插入 , {batchId}_{machineType}_No{machineId}
machineType = ""
if machineInfo["machine_type"] == '1':
machineType = 'small'
elif machineInfo["machine_type"] == '2':
machineType = 'big'
keyValue = f'{versionId}_{machineType}_No{machineInfo["id"]}'
r = redisClient()
r.sadd('pb:to_sliceing', keyValue)
return True
@ -131,14 +140,14 @@ def testMain():
res = step1(versionId) res = step1(versionId)
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 处理完成,res={res}') print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 处理完成,res={res}')
if __name__ == '__main__': # if __name__ == '__main__':
#testMain() # #testMain()
parser = argparse.ArgumentParser(description='排版打印订单处理程序') # parser = argparse.ArgumentParser(description='排版打印订单处理程序')
parser.add_argument( # parser.add_argument(
'--work-dir', # '--work-dir',
type=str, # type=str,
default=None, # default=None,
help='指定工作目录(磁盘路径),例如: D:/work 或 /Users/username/work。如果不指定,则使用脚本所在目录' # help='指定工作目录(磁盘路径),例如: D:/work 或 /Users/username/work。如果不指定,则使用脚本所在目录'
) # )
args = parser.parse_args() # args = parser.parse_args()
main(work_dir=args.work_dir) # main(work_dir=args.work_dir)

7
factory_sliceing/download_batch_data/requirements.txt

@ -1,7 +0,0 @@
redis>=4.0.0
oss2>=2.17.0
requests>=2.28.0
pyinstaller>=5.0.0

2
factory_sliceing/download_batch_data/utils/__init__.py

@ -0,0 +1,2 @@
# utils package

5
factory_sliceing/download_batch_data/utils/funcs.py

@ -116,6 +116,11 @@ def requestApiToUpdateSliceStatus(versionId, downloadCounts):
log(f'状态变更请求失败, res={res.text}') log(f'状态变更请求失败, res={res.text}')
return False return False
log(f'状态变更请求成功, res={res.text}') log(f'状态变更请求成功, res={res.text}')
#判断返回的code是否是1000
if res.json()["code"] != 1000:
log(f'状态变更请求失败, res={res.text}')
return False
log(f'状态变更请求成功, res={res.text}')
return True return True
except requests.exceptions.Timeout: except requests.exceptions.Timeout:
log(f'状态变更请求超时, url={api_url}') log(f'状态变更请求超时, url={api_url}')

36
factory_sliceing/main.py

@ -0,0 +1,36 @@
import os,sys
from auto_sliceing_operate import main_begin_sliceing
from download_batch_data import main_download_batch_data_and_trans
from auto_sliceing_operate import main_download_zip
if __name__ == '__main__':
# 根据参数决定执行哪个模块
# 命令映射表:命令名 -> 处理函数
command_handlers = {
'batch_dwonload': main_download_batch_data_and_trans.main,
'begin_sliceing': main_begin_sliceing.main,
'download_zip': main_download_zip.main,
}
# 检查参数数量
if len(sys.argv) < 2:
print('Usage: python main.py <command> <work_dir>')
print('可用命令:', ' | '.join(command_handlers.keys()))
sys.exit(1)
command = sys.argv[1]
# 检查命令是否存在
if command not in command_handlers:
print(f'错误: 未知命令 "{command}"')
print('可用命令:', ' | '.join(command_handlers.keys()))
sys.exit(1)
# 检查是否提供了 work_dir 参数
if len(sys.argv) < 3:
print(f'Usage: python main.py {command} <work_dir>')
sys.exit(1)
work_dir = sys.argv[2]
# 执行对应的处理函数
command_handlers[command](work_dir=work_dir)
Loading…
Cancel
Save