Browse Source

目录迁移

master
dongchangxi 2 weeks ago
parent
commit
cc15c76e62
  1. 1
      .gitignore
  2. 0
      factory_sliceing/auto_sliceing_operate/main.py
  3. BIN
      factory_sliceing/auto_sliceing_operate/utils/__pycache__/logs.cpython-311.pyc
  4. 11
      factory_sliceing/auto_sliceing_operate/utils/click_soft_button.py
  5. 0
      factory_sliceing/auto_sliceing_operate/utils/exe_operate.py
  6. 0
      factory_sliceing/auto_sliceing_operate/utils/import_all_file.py
  7. 0
      factory_sliceing/auto_sliceing_operate/utils/logs.py
  8. 188
      factory_sliceing/auto_sliceing_operate/utils/miniIo.py
  9. 0
      factory_sliceing/auto_sliceing_operate/utils/oss_func.py
  10. 0
      factory_sliceing/auto_sliceing_operate/utils/oss_redis.py
  11. 69
      factory_sliceing/auto_sliceing_operate/utils/zip.py
  12. 0
      factory_sliceing/download_batch_data/build_exe.bat
  13. 0
      factory_sliceing/download_batch_data/build_exe.py
  14. 0
      factory_sliceing/download_batch_data/build_exe.spec
  15. 0
      factory_sliceing/download_batch_data/dist/factory_sliceing.exe
  16. 0
      factory_sliceing/download_batch_data/download.py
  17. 0
      factory_sliceing/download_batch_data/main.py
  18. 0
      factory_sliceing/download_batch_data/requirements.txt
  19. 0
      factory_sliceing/download_batch_data/utils/changeFiles.py
  20. 0
      factory_sliceing/download_batch_data/utils/funcs.py
  21. 0
      factory_sliceing/download_batch_data/utils/logs.py
  22. 0
      factory_sliceing/download_batch_data/utils/oss_func.py
  23. 0
      factory_sliceing/download_batch_data/utils/oss_redis.py
  24. 0
      factory_sliceing/download_batch_data/utils/small_machine_transform.py
  25. 65
      script/test.py
  26. 51
      script/test1.py

1
.gitignore vendored

@ -77,3 +77,4 @@ script/auto_sliceing_operate/__pycache__/
script/test.py script/test.py
script/test1.py script/test1.py
factory_sliceing/auto_sliceing_operate/__pycache__/ factory_sliceing/auto_sliceing_operate/__pycache__/
factory_sliceing/download_batch_data/utils/__pycache__/

0
script/auto_sliceing_operate/main.py → factory_sliceing/auto_sliceing_operate/main.py

BIN
factory_sliceing/auto_sliceing_operate/utils/__pycache__/logs.cpython-311.pyc

Binary file not shown.

11
script/auto_sliceing_operate/utils/click_soft_button.py → factory_sliceing/auto_sliceing_operate/utils/click_soft_button.py

@ -1,9 +1,10 @@
import uiautomation as auto import uiautomation as auto
softName = "赛纳3D打印控制系统 V1.4.3.2"
# 遍历所有控件 找到 # 遍历所有控件 找到
def findAndClickFileImportShow(control, depth=0,index=1): def findAndClick(control, depth=0,index=1):
for child in control.GetChildren(): for child in control.GetChildren():
# print(child) # print(child)
# print(' ' * depth + f"{child.ControlType}: {child.Name} | {child.AutomationId}") # print(' ' * depth + f"{child.ControlType}: {child.Name} | {child.AutomationId}")
@ -70,12 +71,12 @@ def findAndClickFileImportShow(control, depth=0,index=1):
# 点击导入文件按钮 # 点击导入文件按钮
def clickFileIMportShow(): def clickFileIMportShow():
control = auto.WindowControl(searchDepth=1, Name='赛纳3D打印控制系统 V1.4.3.2') control = auto.WindowControl(searchDepth=1, Name=softName)
clickRes = findAndClickFileImportShow(control,index=1) clickRes = findAndClick(control,index=1)
return clickRes return clickRes
# 点击开始切片按钮 # 点击开始切片按钮
def clickBegingSlice(): def clickBegingSlice():
control = auto.WindowControl(searchDepth=1, Name='赛纳3D打印控制系统 V1.4.3.2') control = auto.WindowControl(searchDepth=1, Name=softName)
clickRes = findAndClickFileImportShow(control,index=2) clickRes = findAndClick(control,index=2)
return clickRes return clickRes

0
script/auto_sliceing_operate/utils/exe_operate.py → factory_sliceing/auto_sliceing_operate/utils/exe_operate.py

0
script/auto_sliceing_operate/utils/import_all_file.py → factory_sliceing/auto_sliceing_operate/utils/import_all_file.py

0
script/auto_sliceing_operate/utils/logs.py → factory_sliceing/auto_sliceing_operate/utils/logs.py

188
factory_sliceing/auto_sliceing_operate/utils/miniIo.py

@ -0,0 +1,188 @@
import os
from minio import Minio
from minio.error import S3Error
from .logs import log
# MiniIO 客户端单例模式
class MiniIOClientSingleton:
_instance = None
_client = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(MiniIOClientSingleton, cls).__new__(cls)
return cls._instance
def get_client(self):
if self._client is None:
# MiniIO 连接配置
# 可以从环境变量或配置文件中读取
endpoint = os.getenv('MINIO_ENDPOINT', 'localhost:9000')
access_key = os.getenv('MINIO_ACCESS_KEY', 'minioadmin')
secret_key = os.getenv('MINIO_SECRET_KEY', 'minioadmin')
secure = os.getenv('MINIO_SECURE', 'False').lower() == 'true'
try:
self._client = Minio(
endpoint,
access_key=access_key,
secret_key=secret_key,
secure=secure
)
log(f"MiniIO 客户端初始化成功: {endpoint}")
except Exception as e:
log(f"MiniIO 客户端初始化失败: {str(e)}")
raise
return self._client
def miniIOClient():
"""获取 MiniIO 客户端单例"""
return MiniIOClientSingleton().get_client()
def upload_file(bucket_name, object_name, file_path):
"""
上传文件到 MiniIO
Args:
bucket_name: 存储桶名称
object_name: 对象名称 MiniIO 中的路径
file_path: 本地文件路径
Returns:
bool: 上传成功返回 True失败返回 False
"""
try:
client = miniIOClient()
# 检查存储桶是否存在,不存在则创建
if not client.bucket_exists(bucket_name):
client.make_bucket(bucket_name)
log(f"创建存储桶: {bucket_name}")
# 上传文件
log(f"开始上传文件: {file_path} -> {bucket_name}/{object_name}")
client.fput_object(bucket_name, object_name, file_path)
log(f"文件上传成功: {bucket_name}/{object_name}")
return True
except S3Error as e:
log(f"上传文件失败 (S3Error): {str(e)}")
return False
except Exception as e:
log(f"上传文件失败: {str(e)}")
return False
def download_file(bucket_name, object_name, file_path):
"""
MiniIO 下载文件到本地
Args:
bucket_name: 存储桶名称
object_name: 对象名称 MiniIO 中的路径
file_path: 本地保存路径
Returns:
bool: 下载成功返回 True失败返回 False
"""
try:
client = miniIOClient()
# 检查文件是否存在
if not check_file_exists(bucket_name, object_name):
log(f"文件不存在: {bucket_name}/{object_name}")
return False
# 确保本地目录存在
local_dir = os.path.dirname(file_path)
if local_dir and not os.path.exists(local_dir):
os.makedirs(local_dir, exist_ok=True)
# 下载文件
log(f"开始下载文件: {bucket_name}/{object_name} -> {file_path}")
client.fget_object(bucket_name, object_name, file_path)
# 验证文件是否下载成功
if os.path.exists(file_path):
file_size = os.path.getsize(file_path)
log(f"文件下载成功: {file_path} (大小: {file_size} 字节)")
return True
else:
log("文件下载失败: 本地文件不存在")
return False
except S3Error as e:
log(f"下载文件失败 (S3Error): {str(e)}")
return False
except Exception as e:
log(f"下载文件失败: {str(e)}")
return False
def delete_file(bucket_name, object_name):
"""
MiniIO 删除文件
Args:
bucket_name: 存储桶名称
object_name: 对象名称 MiniIO 中的路径
Returns:
bool: 删除成功返回 True失败返回 False
"""
try:
client = miniIOClient()
# 检查文件是否存在
if not check_file_exists(bucket_name, object_name):
log(f"文件不存在,无需删除: {bucket_name}/{object_name}")
return True
# 删除文件
log(f"开始删除文件: {bucket_name}/{object_name}")
client.remove_object(bucket_name, object_name)
log(f"文件删除成功: {bucket_name}/{object_name}")
return True
except S3Error as e:
log(f"删除文件失败 (S3Error): {str(e)}")
return False
except Exception as e:
log(f"删除文件失败: {str(e)}")
return False
def check_file_exists(bucket_name, object_name):
"""
检查文件是否存在于 MiniIO
Args:
bucket_name: 存储桶名称
object_name: 对象名称 MiniIO 中的路径
Returns:
bool: 文件存在返回 True不存在返回 False
"""
try:
client = miniIOClient()
# 检查存储桶是否存在
if not client.bucket_exists(bucket_name):
log(f"存储桶不存在: {bucket_name}")
return False
# 检查对象是否存在
try:
client.stat_object(bucket_name, object_name)
return True
except S3Error as e:
if e.code == 'NoSuchKey':
return False
else:
log(f"检查文件存在性时出错: {str(e)}")
return False
except S3Error as e:
log(f"检查文件存在性失败 (S3Error): {str(e)}")
return False
except Exception as e:
log(f"检查文件存在性失败: {str(e)}")
return False

0
script/auto_sliceing_operate/utils/oss_func.py → factory_sliceing/auto_sliceing_operate/utils/oss_func.py

0
script/auto_sliceing_operate/utils/oss_redis.py → factory_sliceing/auto_sliceing_operate/utils/oss_redis.py

69
factory_sliceing/auto_sliceing_operate/utils/zip.py

@ -0,0 +1,69 @@
import os,shutil
import zipfile
from logs import log
def zip_file(folderPath, outputZipPath=None):
"""
压缩指定目录为zip文件
Args:
folderPath: 要压缩的目录路径
outputZipPath: 输出的zip文件路径如果为None则使用目录名+'.zip'
Returns:
str: 生成的zip文件路径如果失败返回None
"""
try:
if not os.path.exists(folderPath):
log(f"目录不存在: {folderPath}")
return None
if not os.path.isdir(folderPath):
log(f"路径不是目录: {folderPath}")
return None
# 如果没有指定输出路径,使用目录名+'.zip'
if outputZipPath is None:
folderName = os.path.basename(folderPath.rstrip(os.sep))
parentDir = os.path.dirname(folderPath)
outputZipPath = os.path.join(parentDir, f"{folderName}.zip")
log(f"开始压缩目录: {folderPath} -> {outputZipPath}")
# 创建zip文件
with zipfile.ZipFile(outputZipPath, 'w', zipfile.ZIP_DEFLATED) as zipf:
# 遍历目录中的所有文件
for root, dirs, files in os.walk(folderPath):
# 计算相对路径
# 如果root就是folderPath,相对路径为空
# 否则计算相对于folderPath的路径
if root == folderPath:
arcname_prefix = ''
else:
# 获取相对于folderPath的路径
arcname_prefix = os.path.relpath(root, folderPath) + os.sep
for filename in files:
file_path = os.path.join(root, filename)
# 在zip中的路径(相对路径)
arcname = arcname_prefix + filename
zipf.write(file_path, arcname)
log(f"已添加文件到压缩包: {arcname}")
log(f"目录压缩完成: {outputZipPath}")
return outputZipPath
except Exception as e:
log(f"压缩目录时出现错误: {str(e)}")
return None
# def uploadAndZip(folderPath):
# # 上传文件到oss
# # upload_file(folderPath) # 暂时注释,等待实现
# # 压缩文件
# zip_file(folderPath)
zip_file("/Users/dcx/code/make2/script/batchPrint/1","1024.zip")

0
script/factory_sliceing_v2/build_exe.bat → factory_sliceing/download_batch_data/build_exe.bat

0
script/factory_sliceing_v2/build_exe.py → factory_sliceing/download_batch_data/build_exe.py

0
script/factory_sliceing_v2/build_exe.spec → factory_sliceing/download_batch_data/build_exe.spec

0
script/factory_sliceing_v2/dist/factory_sliceing.exe → factory_sliceing/download_batch_data/dist/factory_sliceing.exe vendored

0
script/factory_sliceing_v2/download.py → factory_sliceing/download_batch_data/download.py

0
script/factory_sliceing_v2/main.py → factory_sliceing/download_batch_data/main.py

0
script/factory_sliceing_v2/requirements.txt → factory_sliceing/download_batch_data/requirements.txt

0
script/factory_sliceing_v2/utils/changeFiles.py → factory_sliceing/download_batch_data/utils/changeFiles.py

0
script/factory_sliceing_v2/utils/funcs.py → factory_sliceing/download_batch_data/utils/funcs.py

0
script/factory_sliceing_v2/utils/logs.py → factory_sliceing/download_batch_data/utils/logs.py

0
script/factory_sliceing_v2/utils/oss_func.py → factory_sliceing/download_batch_data/utils/oss_func.py

0
script/factory_sliceing_v2/utils/oss_redis.py → factory_sliceing/download_batch_data/utils/oss_redis.py

0
script/factory_sliceing_v2/utils/small_machine_transform.py → factory_sliceing/download_batch_data/utils/small_machine_transform.py

65
script/test.py

@ -1,54 +1,13 @@
from pathlib import Path from pathlib import Path
import os import requests,json,os
import shutil
# 指定要遍历的文件夹路径 objFilePath = "/Users/dcx/code/make2/script/factory_sliceing/batchPrint/45/data"
folder_path = Path('G://obj_fix/print') objCounts = 0
for file in os.listdir(objFilePath):
# 定义函数来递归遍历文件夹下的所有文件 print(file)
def list_files(directory): # if file == 'arrange':
for root, dirs, files in os.walk(directory): # continue
for file in files: if file.endswith('.obj'):
yield os.path.join(root, file) objCounts += 1
# 指定要遍历的文件夹路径 print(objCounts)
# folder_path = folder_path
# 使用list_files函数来列出文件夹下的所有文件
nums = 0
for file_path in list_files(folder_path):
if "around" in file_path:
continue
if "_new.obj" not in file_path:
continue
#提取pid
arrTemp = file_path.split('\\')
pid = arrTemp[len(arrTemp)-2]
#判段是否存在 {pid}.obj 文件,存在的话就进行重命名
initObjPath = f'G://obj_fix/print/{pid}/{pid}.obj'
if not os.path.exists(initObjPath):
continue
#存在的话就重命名文件
old_file_path = f'G://obj_fix/print/{pid}/{pid}_old.obj'
os.rename(initObjPath, old_file_path)
newObjfilePath = f'G://obj_fix/print/{pid}/{pid}_new.obj'
os.rename(newObjfilePath, initObjPath)
# # 将 new.obj 文件复制到指定的文件夹下 G:\obj_fix\print\{pid}\
# source_obj = file_path
# target_obj = f'G://obj_fix/print/{pid}/{pid}_new.obj'
# print(f"当前正在处理{pid}")
# shutil.copy(source_obj, target_obj)
nums += 1
print(pid,file_path,nums)

51
script/test1.py

@ -1,33 +1,24 @@
import os import os
import shutil import requests
folder_path = 'G:\\obj_fix\print' #向 https://lenovodata.blob.core.windows.net/secretdata/photos/1/photo2/92_2.jpg?se=2025-10-30T10%3A01%3A13Z&sig=bv2KAPuNkYT5%2B4tNVMo53kWKIoniQaDkog27AYAXm4s%3D&sp=rcw&spr=https&sr=b&st=2025-10-30T09%3A01%3A13Z&sv=2025-05-05
# 上传图片
def upload_image(image_path):
url = 'https://lenovodata.blob.core.windows.net/secretdata/photos/1/photo2/92_2.jpg?se=2025-10-30T10%3A01%3A13Z&sig=bv2KAPuNkYT5%2B4tNVMo53kWKIoniQaDkog27AYAXm4s%3D&sp=rcw&spr=https&sr=b&st=2025-10-30T09%3A01%3A13Z&sv=2025-05-05'
with open(image_path, 'rb') as f:
image_data = f.read()
headers = {
'x-ms-blob-type': 'BlockBlob',
'Content-Type': 'image/jpeg',
}
response = requests.put(url, data=image_data, headers=headers)
try:
response.raise_for_status()
except requests.HTTPError as e:
raise RuntimeError(f'上传失败 {response.status_code}: {response.text}') from e
return {
'status_code': response.status_code,
'etag': response.headers.get('ETag'),
}
# 遍历文件夹下的所有文件 upload_image('273628Tex1.jpg')
# 定义函数来递归遍历文件夹下的所有文件
def list_files(directory):
for root, dirs, files in os.walk(directory):
for file in files:
yield os.path.join(root, file)
nums = 0
for file_path in list_files(folder_path):
if "_preview.png" not in file_path:
continue
print(file_path)
nums += 1
# #提取pid
# arrTemp = file_path.split('\\')
# pid = arrTemp[len(arrTemp)-2]
# #判断 G:\obj_fix\print\wrong\ 文件夹里 是否存在 {pid}_preview.png 文件
# filePath = f'G://obj_fix\print\wrong\{pid}_preview.png'
# #不存在跳过
# if not os.path.exists(filePath):
# continue
# #存在的话就重命名文件
# new_file_path = f'G://obj_fix\print\wrong\{pid}_preview_already_right.png'
# os.rename(filePath, new_file_path)
print(nums)
Loading…
Cancel
Save