Browse Source

代码更新

main
hesuicong 2 weeks ago
parent
commit
09d6c3106b
  1. 7
      config.py
  2. 484
      download_print.py
  3. 296
      download_print_out.py
  4. 66
      general.py
  5. 46
      test_load_json.py

7
config.py

File diff suppressed because one or more lines are too long

484
download_print.py

@ -1,456 +1,32 @@
import yaml
import oss2 import oss2
import os import os
from tqdm import tqdm from tqdm import tqdm
# from utils.log_utils import log_execution
import os import os
from pathlib import Path from pathlib import Path
import numpy as np
import collections
import os import os
import argparse import argparse
from dataclasses import dataclass
from config import oss_config from config import oss_config
from config import print_data_dir from config import print_data_dir
from config import url_get_oss_suffix_by_orderId from config import url_get_oss_suffix_by_orderId
from config import url_get_info_by_print_batch_id
from general import is_use_debug_oss from general import is_use_debug_oss
from general import transform_save_bpy from general import transform_save_bpy
from general import read_models_from_json
from general import is_small_machine
from compute_print_net import get_oss_client from compute_print_net import get_oss_client
class DataTransfer: from download_print_out import DataTransfer
''' from download_print_out import download_data_by_json
数据传输类
'''
def __init__(self, local_path: str, oss_path: str, oss_client: oss2.Bucket):
'''
local_path: 本地输出路径
oss_path: oss路径
oss_client: oss客户端
'''
self.local_path = local_path
self.oss_path = oss_path.lstrip('/')
self.oss_client = oss_client
# self.description = description
# @log_execution(self.description)
def download_data(self):
"""
OSS 下载数据到本地保持原有目录结构
"""
# 列出所有对象
objects = []
prefix = self.oss_path.lstrip('/') # 移除开头的 '/' 以匹配 OSS 格式
for obj in oss2.ObjectIterator(self.oss_client, prefix=prefix):
if obj.key != prefix: # 跳过目录本身
objects.append(obj.key)
# 下载所有文件,添加进度条
for obj_key in tqdm(objects, desc="下载进度"):
if obj_key.endswith('/'):
continue
if "printId" in obj_key:
continue
# 计算相对路径
rel_path = obj_key[len(prefix):].lstrip('/')
# 构建本地完整路径
local_path = os.path.join(self.local_path, rel_path)
# 创建必要的目录
os.makedirs(os.path.dirname(local_path), exist_ok=True)
# 下载文件
self.oss_client.get_object_to_file(obj_key, local_path)
print("download_data local_path=" + local_path)
order_id: str
pid: str
model_height: str
def download_data_rename_json(self, json_model_info):
"""
OSS 下载数据到本地保持原有目录结构
"""
# 列出所有对象
objects = []
prefix = self.oss_path.lstrip('/') # 移除开头的 '/' 以匹配 OSS 格式
for obj in oss2.ObjectIterator(self.oss_client, prefix=prefix):
if obj.key != prefix: # 跳过目录本身
objects.append(obj.key)
# 下载所有文件,添加进度条
for obj_key in tqdm(objects, desc="下载进度"):
if obj_key.endswith('/'):
continue
if "printId" in obj_key:
continue
# 计算相对路径
rel_path = obj_key[len(prefix):].lstrip('/')
file_dir, file_name = os.path.split(rel_path)
file_base, file_ext = os.path.splitext(file_name)
# 根据文件后缀名进行重命名
if file_ext.lower() in ['.mtl', '.jpg', '.jpeg', '.png']:
# 对于.mtl和图片文件,在原名前加order_id
new_file_name = f"{json_model_info.order_id}_{file_name}"
# new_file_name = file_name
elif file_ext.lower() == '.obj':
# 对于.obj文件,完全重命名
new_file_name = f"{json_model_info.obj_name}"
else:
# 其他文件类型保持原名
new_file_name = file_name
print("new_file_name=", new_file_name)
# 构建新的相对路径
if file_dir: # 如果有子目录
new_rel_path = os.path.join(file_dir, new_file_name)
else:
new_rel_path = new_file_name
# 构建本地完整路径
local_path = os.path.join(self.local_path, new_rel_path)
# 创建必要的目录
os.makedirs(os.path.dirname(local_path), exist_ok=True)
# 下载文件
self.oss_client.get_object_to_file(obj_key, local_path)
if file_ext == '.obj': # 10MB以上
try:
# 使用临时文件避免内存问题 [8](@ref)
temp_path = local_path + '.tmp'
with open(local_path, 'r', encoding='utf-8') as f_in, \
open(temp_path, 'w', encoding='utf-8') as f_out:
mtllib_modified = False
for line in f_in:
if not mtllib_modified and line.strip().startswith('mtllib '):
parts = line.split(' ', 1)
if len(parts) > 1:
old_mtl_name = parts[1].strip()
new_mtl_name = f"{json_model_info.order_id}_{old_mtl_name}"
f_out.write(f"mtllib {new_mtl_name}\n")
mtllib_modified = True
continue
f_out.write(line)
os.replace(temp_path, local_path) # 原子性替换
except IOError as e:
print(f"处理大文件 {local_path} 时出错: {e}")
if os.path.exists(temp_path):
os.remove(temp_path)
# 优化后的.obj文件处理逻辑
if file_ext == '.mtl':
try:
# 使用更高效的文件读取方式 [6,8](@ref)
with open(local_path, 'r', encoding='utf-8') as f:
content = f.read()
# 使用字符串方法直接查找和替换,避免不必要的循环 [9](@ref)
lines = content.split('\n')
mtllib_modified = False
for i, line in enumerate(lines):
stripped_line = line.strip()
if not mtllib_modified and stripped_line.startswith('map_Kd '):
# 更高效的分割方式 [9](@ref)
parts = line.split(' ', 1)
if len(parts) > 1:
old_name = parts[1].strip()
new_name = f"{json_model_info.order_id}_{old_name}"
lines[i] = f"map_Kd {new_name}"
mtllib_modified = True
print(f"已更新材质库引用: {old_name} -> {new_name}")
break # 找到第一个后立即退出
# 批量写入,减少I/O操作 [6](@ref)
with open(local_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(lines))
except IOError as e:
print(f"处理文件 {local_path} 时出错: {e}")
except UnicodeDecodeError as e:
print(f"文件编码错误 {local_path}: {e}")
print(f"下载文件: {obj_key} -> {local_path}")
def download_data_rename_batch(self, batch_model_info):
"""
OSS 下载数据到本地保持原有目录结构
"""
# 列出所有对象
objects = []
prefix = self.oss_path.lstrip('/') # 移除开头的 '/' 以匹配 OSS 格式
prefix_exists = False
for obj in oss2.ObjectIterator(self.oss_client, prefix=prefix):
prefix_exists = True
if obj.key != prefix: # 跳过目录本身
objects.append(obj.key)
print(f"obj.key={obj.key}")
if not prefix_exists:
print(f"前缀 '{prefix}' 下没有找到任何文件或目录。")
return False
else:
print(f"前缀 '{prefix}' 存在,共找到 {len(objects)} 个对象。")
# 下载所有文件,添加进度条
for obj_key in tqdm(objects, desc="下载进度"):
if obj_key.endswith('/'):
print("下载 endswith('/'")
continue
if "printId" in obj_key:
print(f"下载 in obj_key")
continue
# 计算相对路径
rel_path = obj_key[len(prefix):].lstrip('/')
file_dir, file_name = os.path.split(rel_path)
file_base, file_ext = os.path.splitext(file_name)
# 根据文件后缀名进行重命名
if file_ext.lower() in ['.mtl', '.jpg', '.jpeg', '.png']:
# 对于.mtl和图片文件,在原名前加order_id
new_file_name = f"{batch_model_info.order_id}_{file_name}"
# new_file_name = file_name
elif file_ext.lower() == '.obj':
# 对于.obj文件,完全重命名
new_file_name = f"{batch_model_info.order_id}_{batch_model_info.pid}_P{batch_model_info.print_order_id}_{batch_model_info.model_size}{file_ext}"
else:
# 其他文件类型保持原名
new_file_name = file_name
# 构建新的相对路径
if file_dir: # 如果有子目录
new_rel_path = os.path.join(file_dir, new_file_name)
else:
new_rel_path = new_file_name
# 构建本地完整路径
local_path = os.path.join(self.local_path, new_rel_path)
# 创建必要的目录
os.makedirs(os.path.dirname(local_path), exist_ok=True)
# 下载文件
self.oss_client.get_object_to_file(obj_key, local_path)
if file_ext == '.obj': # 10MB以上
try:
# 使用临时文件避免内存问题 [8](@ref)
temp_path = local_path + '.tmp'
with open(local_path, 'r', encoding='utf-8') as f_in, \
open(temp_path, 'w', encoding='utf-8') as f_out:
mtllib_modified = False
for line in f_in:
if not mtllib_modified and line.strip().startswith('mtllib '):
parts = line.split(' ', 1)
if len(parts) > 1:
old_mtl_name = parts[1].strip()
new_mtl_name = f"{batch_model_info.order_id}_{old_mtl_name}"
f_out.write(f"mtllib {new_mtl_name}\n")
mtllib_modified = True
print("len(parts) > 1")
continue
f_out.write(line)
os.replace(temp_path, local_path) # 原子性替换
except IOError as e:
print(f"处理大文件 {local_path} 时出错: {e}")
if os.path.exists(temp_path):
os.remove(temp_path)
# 优化后的.obj文件处理逻辑
if file_ext == '.mtl':
try:
# 使用更高效的文件读取方式 [6,8](@ref)
with open(local_path, 'r', encoding='utf-8') as f:
content = f.read()
# 使用字符串方法直接查找和替换,避免不必要的循环 [9](@ref)
lines = content.split('\n')
mtllib_modified = False
for i, line in enumerate(lines):
stripped_line = line.strip()
if not mtllib_modified and stripped_line.startswith('map_Kd '):
# 更高效的分割方式 [9](@ref)
parts = line.split(' ', 1)
if len(parts) > 1:
old_name = parts[1].strip()
new_name = f"{batch_model_info.order_id}_{old_name}"
lines[i] = f"map_Kd {new_name}"
mtllib_modified = True
print(f"已更新材质库引用: {old_name} -> {new_name}")
break # 找到第一个后立即退出
# 批量写入,减少I/O操作 [6](@ref)
with open(local_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(lines))
except IOError as e:
print(f"处理文件 {local_path} 时出错: {e}")
except UnicodeDecodeError as e:
print(f"文件编码错误 {local_path}: {e}")
print(f"下载文件: {obj_key} -> {local_path}")
return True
def upload_data(self):
'''
上传数据到OSS
'''
# 检测本地路径是否存在
if not os.path.exists(self.local_path):
raise FileNotFoundError(f"本地路径不存在: {self.local_path}")
# 判断本地路径是文件还是目录
if os.path.isfile(self.local_path):
local_suffix = Path(self.local_path).suffix
oss_suffix = Path(self.oss_path).suffix
if oss_suffix and oss_suffix != local_suffix:
# 后缀名不一致,上传到指定文件夹下的同名文件
oss_dir = os.path.dirname(self.oss_path)
oss_target_path = os.path.join(oss_dir, os.path.basename(self.local_path))
else:
# 后缀名一致,上传到指定OSS路径
oss_target_path = self.oss_path
# 上传文件
self.oss_client.put_object_from_file(oss_target_path, self.local_path)
print(f"文件已上传到: {oss_target_path}")
elif os.path.isdir(self.local_path):
oss_suffix = Path(self.oss_path).suffix
if oss_suffix:
raise ValueError("不能将目录上传到具有后缀名的OSS路径。")
# 遍历本地目录并上传
for root, dirs, files in os.walk(self.local_path):
for file in files:
local_file_path = os.path.join(root, file)
relative_path = os.path.relpath(local_file_path, self.local_path)
oss_file_path = os.path.join(self.oss_path, relative_path).replace("\\", "/")
# 创建必要的目录
oss_dir = os.path.dirname(oss_file_path)
# 上传文件
self.oss_client.put_object_from_file(oss_file_path, local_file_path)
print(f"文件已上传到: {oss_file_path}")
else:
raise ValueError(f"无效的本地路径类型: {self.local_path}")
import requests import requests
import json
import shutil import shutil
from dataclasses import dataclass
@dataclass
class JSONModelInfo:
obj_name: str
order_id: str
pid: str
model_height: str
def read_pids_from_json(pid_file):
"""从文件读取所有PID"""
# with open(pid_file, 'r') as f:
# # 过滤掉空行并去除每行首尾的空白字符
# return [line.strip() for line in f if line.strip()]
json_path = pid_file
"""
加载JSON文件读取所有模型信息应用变换后返回模型列表
"""
# 检查JSON文件是否存在
if not os.path.exists(json_path):
print(f"错误: JSON文件不存在 - {json_path}")
return []
# 读取JSON文件
try:
with open(json_path, 'r') as f:
data = json.load(f)
except Exception as e:
print(f"读取JSON文件失败: {e}")
return []
list_model_info = []
# 处理每个模型
for model in data.get('models', []):
obj_name = model.get('file_name', '')
parts = obj_name.split('_')
order_id = parts[0]
pid = parts[1]
model_height = parts[3]
model_info = JSONModelInfo(
obj_name=obj_name,
order_id=order_id,
pid=pid,
model_height=model_height
)
list_model_info.append(model_info)
return list_model_info, data
def download_data_by_json(model_info, workdir, oss_client ):
try:
pid = model_info.pid
model_height = model_info.model_height
target_dir = f"{workdir}"
url = f"{url_get_oss_suffix_by_orderId}{model_info.order_id}"
res = requests.get(url)
data = res.json()["data"]
# print("datas=",data)
data = data.replace("/init_obj", "")
print("target_dir=", target_dir)
# download_textures = DataTransfer(target_dir, f"objs/download/print/{pid}/base/model/{model_height}/", oss_client)
# download_textures = DataTransfer(target_dir, f"objs/download/print/{pid}/base_cartoon/badge/101/3/{model_height}/", oss_client)
download_textures = DataTransfer(target_dir, f"objs/download/print/{pid}/{data}/{model_height}/", oss_client)
download_textures.download_data_rename_json(model_info)
# 下载后检查目标文件夹是否为空
if os.path.exists(target_dir) and not os.listdir(target_dir):
shutil.rmtree(target_dir)
print(f"下载后检查发现目标文件夹为空,已删除: {target_dir}")
except Exception as e:
print(f"卡通图片下载失败: {pid}, 错误: {str(e)}")
pass
@dataclass @dataclass
class BatchModelInfo: class BatchModelInfo:
order_id: str order_id: str
@ -461,8 +37,8 @@ class BatchModelInfo:
count: str count: str
def read_paths_from_batch(batch_id): def read_paths_from_batch(batch_id):
url = f"https://mp.api.suwa3d.com/api/printOrder/getInfoByPrintBatchId?batch_id={batch_id}"
res = requests.get(url) res = requests.get(f"{url_get_info_by_print_batch_id}{batch_id}")
datas = res.json()["data"] datas = res.json()["data"]
print("datas=",datas) print("datas=",datas)
@ -528,35 +104,19 @@ def download_datas_by_pre_layout(list_print_model_info, workdir, oss_config):
return False return False
return True return True
def download_transform_save_by_batch(batch_id, workdir, oss_config):
datas, succ = download_datas_by_batch(batch_id, workdir, oss_config)
print("datas=", datas)
layout_data = datas["layout_data"]
original_obj_pid_dir = workdir
transform_save_o3d(layout_data, original_obj_pid_dir)
def download_datas_by_json(json_name, workdir, oss_config): def download_datas_by_json(json_name, workdir, oss_config):
oss_client = get_oss_client(oss_config) oss_client = get_oss_client(oss_config)
#json_path = os.path.join(workdir, "3DPrintLayout.json")
json_path = os.path.join(workdir, f"{json_name}.json") json_path = os.path.join(workdir, f"{json_name}.json")
# 读取所有PID list_model_info, json_data = read_models_from_json(json_path)
list_model_info, data = read_pids_from_json(json_path) print(f"从文件读取了 {len(list_model_info)} 个model")
print(f"从文件读取了 {len(list_model_info)} 个PID")
# 批量下载 # 批量下载
for model_info in list_model_info: for model_info in list_model_info:
print(f"开始下载PID: {model_info}") print(f"开始下载PID: {model_info}")
download_data_by_json(model_info, workdir, oss_client) download_data_by_json(model_info, workdir, oss_client)
return data return json_data
def download_transform_save_by_json(json_name, workdir, oss_config):
layout_data = download_datas_by_json(json_name, workdir, oss_config)
original_obj_pid_dir = workdir
transform_save_bpy(layout_data, original_obj_pid_dir)
def upload_result(base_original_obj_dir, oss_config, batch_id): def upload_result(base_original_obj_dir, oss_config, batch_id):
@ -580,13 +140,10 @@ def upload_result(base_original_obj_dir, oss_config, batch_id):
print(f"失败: {batch_id}, 错误: {str(e)}") print(f"失败: {batch_id}, 错误: {str(e)}")
pass pass
import open3d as o3d
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
is_by_batch = True is_by_batch = False
is_transform_save = False
batch_id = 10118 batch_id = 10118
if is_by_batch: if is_by_batch:
# 通过batch_id下载 # 通过batch_id下载
@ -599,11 +156,11 @@ if __name__ == "__main__":
workdir = f"{print_data_dir}{batch_id}" workdir = f"{print_data_dir}{batch_id}"
is_transform_save = False
datas, succ = download_datas_by_batch(batch_id, workdir, oss_config)
if is_transform_save: if is_transform_save:
download_transform_save_by_batch(batch_id, workdir, oss_config) layout_data = datas["layout_data"]
else: transform_save_bpy(layout_data, workdir)
download_datas_by_batch(batch_id, workdir, oss_config)
else: else:
# 通过Json下载 # 通过Json下载
@ -617,7 +174,6 @@ if __name__ == "__main__":
json_name = batch_id json_name = batch_id
if is_transform_save: json_data = download_datas_by_json(json_name, workdir, oss_config)
download_transform_save_by_json(json_name, workdir, oss_config) if is_small_machine(json_data):
else: transform_save_bpy(json_data, workdir)
download_datas_by_json(json_name, workdir, oss_config)

296
download_print_out.py

@ -5,10 +5,12 @@ import os
import numpy as np import numpy as np
import os import os
import argparse import argparse
import bpy # import bpy
import sys import sys
import open3d as o3d import open3d as o3d
from pathlib import Path
import requests import requests
import json import json
import shutil import shutil
@ -16,6 +18,7 @@ import shutil
from config import url_get_oss_suffix_by_orderId from config import url_get_oss_suffix_by_orderId
from general import transform_save_bpy from general import transform_save_bpy
from general import read_models_from_json
from compute_print_net import get_oss_client from compute_print_net import get_oss_client
@ -37,6 +40,46 @@ class DataTransfer:
self.local_path = local_path self.local_path = local_path
self.oss_path = oss_path.lstrip('/') self.oss_path = oss_path.lstrip('/')
self.oss_client = oss_client self.oss_client = oss_client
# self.description = description
# @log_execution(self.description)
def download_data(self):
"""
OSS 下载数据到本地保持原有目录结构
"""
# 列出所有对象
objects = []
prefix = self.oss_path.lstrip('/') # 移除开头的 '/' 以匹配 OSS 格式
for obj in oss2.ObjectIterator(self.oss_client, prefix=prefix):
if obj.key != prefix: # 跳过目录本身
objects.append(obj.key)
# 下载所有文件,添加进度条
for obj_key in tqdm(objects, desc="下载进度"):
if obj_key.endswith('/'):
continue
if "printId" in obj_key:
continue
# 计算相对路径
rel_path = obj_key[len(prefix):].lstrip('/')
# 构建本地完整路径
local_path = os.path.join(self.local_path, rel_path)
# 创建必要的目录
os.makedirs(os.path.dirname(local_path), exist_ok=True)
# 下载文件
self.oss_client.get_object_to_file(obj_key, local_path)
print("download_data local_path=" + local_path)
order_id: str
pid: str
model_height: str
def download_data_rename_json(self, json_model_info): def download_data_rename_json(self, json_model_info):
""" """
@ -154,6 +197,197 @@ class DataTransfer:
print(f"下载文件: {obj_key} -> {local_path}") print(f"下载文件: {obj_key} -> {local_path}")
def download_data_rename_batch(self, batch_model_info):
"""
OSS 下载数据到本地保持原有目录结构
"""
# 列出所有对象
objects = []
prefix = self.oss_path.lstrip('/') # 移除开头的 '/' 以匹配 OSS 格式
prefix_exists = False
for obj in oss2.ObjectIterator(self.oss_client, prefix=prefix):
prefix_exists = True
if obj.key != prefix: # 跳过目录本身
objects.append(obj.key)
print(f"obj.key={obj.key}")
if not prefix_exists:
print(f"前缀 '{prefix}' 下没有找到任何文件或目录。")
return False
else:
print(f"前缀 '{prefix}' 存在,共找到 {len(objects)} 个对象。")
# 下载所有文件,添加进度条
for obj_key in tqdm(objects, desc="下载进度"):
if obj_key.endswith('/'):
print("下载 endswith('/'")
continue
if "printId" in obj_key:
print(f"下载 in obj_key")
continue
# 计算相对路径
rel_path = obj_key[len(prefix):].lstrip('/')
file_dir, file_name = os.path.split(rel_path)
file_base, file_ext = os.path.splitext(file_name)
# 根据文件后缀名进行重命名
if file_ext.lower() in ['.mtl', '.jpg', '.jpeg', '.png']:
# 对于.mtl和图片文件,在原名前加order_id
new_file_name = f"{batch_model_info.order_id}_{file_name}"
# new_file_name = file_name
elif file_ext.lower() == '.obj':
# 对于.obj文件,完全重命名
new_file_name = f"{batch_model_info.order_id}_{batch_model_info.pid}_P{batch_model_info.print_order_id}_{batch_model_info.model_size}{file_ext}"
else:
# 其他文件类型保持原名
new_file_name = file_name
# 构建新的相对路径
if file_dir: # 如果有子目录
new_rel_path = os.path.join(file_dir, new_file_name)
else:
new_rel_path = new_file_name
# 构建本地完整路径
local_path = os.path.join(self.local_path, new_rel_path)
# 创建必要的目录
os.makedirs(os.path.dirname(local_path), exist_ok=True)
# 下载文件
self.oss_client.get_object_to_file(obj_key, local_path)
if file_ext == '.obj': # 10MB以上
try:
# 使用临时文件避免内存问题 [8](@ref)
temp_path = local_path + '.tmp'
with open(local_path, 'r', encoding='utf-8') as f_in, \
open(temp_path, 'w', encoding='utf-8') as f_out:
mtllib_modified = False
for line in f_in:
if not mtllib_modified and line.strip().startswith('mtllib '):
parts = line.split(' ', 1)
if len(parts) > 1:
old_mtl_name = parts[1].strip()
new_mtl_name = f"{batch_model_info.order_id}_{old_mtl_name}"
f_out.write(f"mtllib {new_mtl_name}\n")
mtllib_modified = True
print("len(parts) > 1")
continue
f_out.write(line)
os.replace(temp_path, local_path) # 原子性替换
except IOError as e:
print(f"处理大文件 {local_path} 时出错: {e}")
if os.path.exists(temp_path):
os.remove(temp_path)
# 优化后的.obj文件处理逻辑
if file_ext == '.mtl':
try:
# 使用更高效的文件读取方式 [6,8](@ref)
with open(local_path, 'r', encoding='utf-8') as f:
content = f.read()
# 使用字符串方法直接查找和替换,避免不必要的循环 [9](@ref)
lines = content.split('\n')
mtllib_modified = False
for i, line in enumerate(lines):
stripped_line = line.strip()
if not mtllib_modified and stripped_line.startswith('map_Kd '):
# 更高效的分割方式 [9](@ref)
parts = line.split(' ', 1)
if len(parts) > 1:
old_name = parts[1].strip()
new_name = f"{batch_model_info.order_id}_{old_name}"
lines[i] = f"map_Kd {new_name}"
mtllib_modified = True
print(f"已更新材质库引用: {old_name} -> {new_name}")
break # 找到第一个后立即退出
# 批量写入,减少I/O操作 [6](@ref)
with open(local_path, 'w', encoding='utf-8') as f:
f.write('\n'.join(lines))
except IOError as e:
print(f"处理文件 {local_path} 时出错: {e}")
except UnicodeDecodeError as e:
print(f"文件编码错误 {local_path}: {e}")
print(f"下载文件: {obj_key} -> {local_path}")
return True
def download_single_file(self):
"""
下载单个文件从OSS到本地
"""
# 确保本地目录存在
os.makedirs(os.path.dirname(self.local_path), exist_ok=True)
# 直接下载文件
try:
self.oss_client.get_object_to_file(self.oss_path, self.local_path)
print(f"文件已下载到: {self.local_path}")
except oss2.exceptions.NoSuchKey:
print(f"OSS文件不存在: {self.oss_path}")
def upload_data(self):
'''
上传数据到OSS
'''
# 检测本地路径是否存在
if not os.path.exists(self.local_path):
raise FileNotFoundError(f"本地路径不存在: {self.local_path}")
# 判断本地路径是文件还是目录
if os.path.isfile(self.local_path):
local_suffix = Path(self.local_path).suffix
oss_suffix = Path(self.oss_path).suffix
if oss_suffix and oss_suffix != local_suffix:
# 后缀名不一致,上传到指定文件夹下的同名文件
oss_dir = os.path.dirname(self.oss_path)
oss_target_path = os.path.join(oss_dir, os.path.basename(self.local_path))
else:
# 后缀名一致,上传到指定OSS路径
oss_target_path = self.oss_path
# 上传文件
self.oss_client.put_object_from_file(oss_target_path, self.local_path)
print(f"文件已上传到: {oss_target_path}")
elif os.path.isdir(self.local_path):
oss_suffix = Path(self.oss_path).suffix
if oss_suffix:
raise ValueError("不能将目录上传到具有后缀名的OSS路径。")
# 遍历本地目录并上传
for root, dirs, files in os.walk(self.local_path):
for file in files:
local_file_path = os.path.join(root, file)
relative_path = os.path.relpath(local_file_path, self.local_path)
oss_file_path = os.path.join(self.oss_path, relative_path).replace("\\", "/")
# 创建必要的目录
oss_dir = os.path.dirname(oss_file_path)
# 上传文件
self.oss_client.put_object_from_file(oss_file_path, local_file_path)
print(f"文件已上传到: {oss_file_path}")
else:
raise ValueError(f"无效的本地路径类型: {self.local_path}")
def get_api(url): def get_api(url):
try: try:
response = requests.get(url) response = requests.get(url)
@ -166,61 +400,6 @@ def get_api(url):
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
raise Exception(f"Error fetching URL {url}: {e}") raise Exception(f"Error fetching URL {url}: {e}")
from dataclasses import dataclass
@dataclass
class JSONModelInfo:
obj_name: str
order_id: str
pid: str
print_order_id: str
model_height: str
def read_pids_from_json(pid_file):
"""从文件读取所有PID"""
json_path = pid_file
"""
加载JSON文件读取所有模型信息应用变换后返回模型列表
"""
# 检查JSON文件是否存在
if not os.path.exists(json_path):
print(f"错误: JSON文件不存在 - {json_path}")
return []
# 读取JSON文件
try:
with open(json_path, 'r') as f:
data = json.load(f)
except Exception as e:
print(f"读取JSON文件失败: {e}")
return []
list_model_info = []
# 处理每个模型
for model in data.get('models', []):
obj_name = model.get('file_name', '')
parts = obj_name.split('_')
order_id = parts[0]
pid = parts[1]
print_order_id = parts[2]
print_order_id = print_order_id.replace("P", "")
model_height = parts[3]
model_info = JSONModelInfo(
obj_name=obj_name,
order_id=order_id,
pid=pid,
print_order_id=print_order_id,
model_height=model_height
)
list_model_info.append(model_info)
print(f"model_info={model_info}")
return list_model_info, data
def download_data_by_json(model_info, workdir, oss_client ): def download_data_by_json(model_info, workdir, oss_client ):
try: try:
pid = model_info.pid pid = model_info.pid
@ -249,18 +428,17 @@ def download_data_by_json(model_info, workdir, oss_client ):
def download_datas_by_json(pid_file, workdir, oss_config): def download_datas_by_json(pid_file, workdir, oss_config):
oss_client = get_oss_client(oss_config) oss_client = get_oss_client(oss_config)
# json_path = os.path.join(workdir, "3DPrintLayout.json")
json_path = os.path.join(workdir, f"{pid_file}.json") json_path = os.path.join(workdir, f"{pid_file}.json")
# 读取所有PID # 读取所有PID
list_model_info, data = read_pids_from_json(json_path) list_model_info, json_data = read_models_from_json(json_path)
print(f"从文件读取了 {len(list_model_info)} 个PID") print(f"从文件读取了 {len(list_model_info)} 个PID")
# 批量下载 # 批量下载
for model_info in list_model_info: for model_info in list_model_info:
print(f"开始下载PID: {model_info}") print(f"开始下载PID: {model_info}")
download_data_by_json(model_info, args.workdir, oss_client) download_data_by_json(model_info, args.workdir, oss_client)
return data return json_data
if __name__ == "__main__": if __name__ == "__main__":

66
general.py

@ -218,3 +218,69 @@ def is_cross_border_c(x, y, z, mx, my, mz, max_x, max_y, max_z):
return False return False
# -------------------------- 结束:碰撞检测和越界 -------------------------- # -------------------------- 结束:碰撞检测和越界 --------------------------
# -------------------------- 开始:JSON --------------------------
import json
from dataclasses import dataclass
@dataclass
class JSONModelInfo:
obj_name: str
order_id: str
pid: str
print_order_id: str
model_height: str
def read_from_json(json_path):
if not os.path.exists(json_path):
print(f"错误: JSON文件不存在 - {json_path}")
return []
try:
with open(json_path, 'r') as f:
json_data = json.load(f)
except Exception as e:
print(f"读取JSON文件失败: {e}")
return []
return json_data
def read_models_from_json(json_path):
json_data = read_from_json(json_path)
list_model_info = []
# 处理每个模型
for model in json_data.get('models', []):
obj_name = model.get('file_name', '')
parts = obj_name.split('_')
order_id = parts[0]
pid = parts[1]
model_height = parts[3]
model_info = JSONModelInfo(
obj_name=obj_name,
order_id=order_id,
pid=pid,
model_height=model_height
)
list_model_info.append(model_info)
return list_model_info, json_data
def get_summary(json_data):
return json_data.get('summary')
def get_selected_machine(json_data):
return get_summary(json_data)['selected_machine']
def is_small_machine(json_data):
selected_machine = get_selected_machine(json_data)
is_small_machine = True if selected_machine=="小机型" else False
return is_small_machine
# -------------------------- 结束:JSON --------------------------

46
test_load_json.py

@ -8,6 +8,7 @@ import gc
from config import print_data_dir from config import print_data_dir
from config import test_print_max from config import test_print_max
from config import version
from general import mesh_transform_by_matrix from general import mesh_transform_by_matrix
from general import get_blank_path from general import get_blank_path
@ -425,6 +426,8 @@ def set_orthographic(meshes, width=1920, height=1080,
return vis return vis
from PIL import Image, ImageDraw, ImageFont
def render_to_texture(meshes, output_image_path): def render_to_texture(meshes, output_image_path):
vis = set_orthographic(meshes) vis = set_orthographic(meshes)
@ -432,6 +435,49 @@ def render_to_texture(meshes, output_image_path):
# 渲染并保存 # 渲染并保存
vis.capture_screen_image(output_image_path, do_render=True) vis.capture_screen_image(output_image_path, do_render=True)
text = f"v {version}" # 使用PIL添加文字
try:
image = Image.open(output_image_path)
draw = ImageDraw.Draw(image)
font = ImageFont.load_default()
font_families = [
'DejaVu Sans', # 许多 Linux 发行版的标准配置[2,5](@ref)
'Liberation Sans', # 作为 Arial 等字体的开源替代,预装可能性高[1](@ref)
'Ubuntu', # Ubuntu 系统默认字体之一[3,5](@ref)
'FreeSans', # 另一个开源无衬线字体
'Source Sans Pro' # 一款清晰易读的开源字体
]
for font_family in font_families:
try:
font = ImageFont.truetype(font_family, 30)
break
except IOError:
continue
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
# 设置位置:左下角,左边距50像素,下边距10像素[1,3](@ref)
width, height = image.size
x = 50 # 左边距
y = height - text_height - 50 # 图片高度减去文字高度再减去10像素边距
fill_color = (255, 0, 0)
# 添加文字
draw.text((x, y), text, fill=fill_color, font=font)
# 保存带文字的图片
image.save(output_image_path)
print(f"带文字的渲染图片已保存到: {output_image_path}")
except Exception as e:
print(f"添加文字时出错: {e}")
print(f"高级渲染图片已保存到: {output_image_path}") print(f"高级渲染图片已保存到: {output_image_path}")
return vis return vis

Loading…
Cancel
Save