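# Modeling program - multiple scheduled programs.
#
# NOTE: the lines below are residue of the repository web page this file was
# copied from (topic-selection hint and file statistics). They are kept as a
# comment only so that the module can be parsed.
#   "You can not select more than 25 topics. Topics must start with a letter
#   or number, can include dashes ('-') and can be up to 35 characters long."
#   354 lines, 13 KiB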

import yaml
import oss2
import os
from tqdm import tqdm
import os
from pathlib import Path
import numpy as np
import os
import argparse
import open3d as o3d
def custom_mesh_transform(vertices, transform_matrix):
    """Apply a homogeneous transform to a set of 3D vertices.

    Every vertex is lifted to homogeneous coordinates ``(x, y, z, 1)``,
    multiplied by ``transform_matrix`` and the first three components of the
    product are returned. No perspective divide is performed, so only the
    first three rows of the matrix influence the result.

    Args:
        vertices: Array-like of shape (N, 3) holding the vertices. Plain
            nested lists are accepted; an empty input yields an empty result.
        transform_matrix: Array-like 4x4 homogeneous transform matrix.

    Returns:
        numpy.ndarray: float64 array of shape (N, 3) with the transformed
        vertices.
    """
    # Accept anything array-like (lists, tuples, ndarrays) instead of
    # requiring an ndarray with a ``.shape`` attribute.
    points = np.asarray(vertices, dtype=np.float64)
    matrix = np.asarray(transform_matrix, dtype=np.float64)

    # An empty list has shape (0,), which cannot be stacked with a column of
    # ones; there is nothing to transform anyway.
    if points.size == 0:
        return np.empty((0, 3), dtype=np.float64)

    # 1. (N, 3) -> (N, 4): append the homogeneous coordinate w = 1.
    homogeneous = np.hstack((points, np.ones((points.shape[0], 1))))
    # 2. (4x4) @ (4xN) -> (4xN)
    transformed = matrix @ homogeneous.T
    # 3. Drop the w row and go back to one vertex per row: (3xN) -> (N, 3).
    return transformed[:3, :].T
class DataTransfer:
    """Download the objects under one OSS prefix into a local directory.

    Files are renamed while they are downloaded:

    * ``.mtl`` and image files (``.jpg``/``.jpeg``/``.png``) get an
      ``<order_id>_`` prefix;
    * the ``.obj`` file is saved under ``json_model_info.obj_name``;
    * every other file keeps its name.

    After the download, the ``mtllib`` reference inside the ``.obj`` file and
    the ``map_Kd`` references inside the ``.mtl`` file are rewritten so they
    point at the renamed files.
    """

    def __init__(self, local_path: str, oss_path: str, oss_client: "oss2.Bucket"):
        """Store the transfer endpoints.

        Args:
            local_path: Local output directory.
            oss_path: OSS prefix to download; leading slashes are removed
                because OSS keys never start with ``/``.
            oss_client: OSS bucket client used for listing and downloading.
        """
        self.local_path = local_path
        self.oss_path = oss_path.lstrip('/')
        self.oss_client = oss_client

    def download_data_rename_json(self, json_model_info):
        """Download everything under ``self.oss_path``, renaming files.

        The directory structure below the prefix is preserved. Keys that end
        with ``/`` (directory placeholders) and keys containing ``printId``
        are skipped.

        Args:
            json_model_info: Object exposing ``order_id`` (prefix for
                material/texture files) and ``obj_name`` (target name of the
                ``.obj`` file).
        """
        prefix = self.oss_path.lstrip('/')
        # Materialise the listing first so tqdm can show a total.
        objects = [
            obj.key
            for obj in oss2.ObjectIterator(self.oss_client, prefix=prefix)
            if obj.key != prefix  # skip the "directory" object itself
        ]

        for obj_key in tqdm(objects, desc="下载进度"):
            if obj_key.endswith('/') or "printId" in obj_key:
                continue

            # Path of the object relative to the prefix.
            rel_path = obj_key[len(prefix):].lstrip('/')
            file_dir, file_name = os.path.split(rel_path)
            # Lower-cased once so renaming and post-processing agree on the
            # file type (e.g. "MODEL.OBJ" is treated like "model.obj").
            file_ext = os.path.splitext(file_name)[1].lower()

            new_file_name = self._target_file_name(file_name, file_ext, json_model_info)
            print("new_file_name=", new_file_name)

            new_rel_path = os.path.join(file_dir, new_file_name) if file_dir else new_file_name
            local_path = os.path.join(self.local_path, new_rel_path)

            parent_dir = os.path.dirname(local_path)
            if parent_dir:  # os.makedirs('') raises FileNotFoundError
                os.makedirs(parent_dir, exist_ok=True)

            self.oss_client.get_object_to_file(obj_key, local_path)

            if file_ext == '.obj':
                self._prefix_obj_mtllib(local_path, json_model_info.order_id)
            elif file_ext == '.mtl':
                self._prefix_mtl_textures(local_path, json_model_info.order_id)

            print(f"下载文件: {obj_key} -> {local_path}")

    @staticmethod
    def _target_file_name(file_name, file_ext, json_model_info):
        """Return the local file name for a download, given its lower-cased extension."""
        if file_ext in ('.mtl', '.jpg', '.jpeg', '.png'):
            # Material and texture files are prefixed with the order id.
            return f"{json_model_info.order_id}_{file_name}"
        if file_ext == '.obj':
            # The mesh itself is renamed completely.
            return f"{json_model_info.obj_name}"
        return file_name

    @staticmethod
    def _prefix_obj_mtllib(obj_path, order_id):
        """Prefix the first ``mtllib`` reference of an OBJ file with ``order_id``.

        The file is streamed line by line into ``<obj_path>.tmp`` (OBJ files
        can be large) and swapped in with ``os.replace`` on success. On any
        read/write or decoding error the original file is left untouched and
        the temporary file is removed.
        """
        temp_path = obj_path + '.tmp'
        try:
            with open(obj_path, 'r', encoding='utf-8') as f_in, \
                    open(temp_path, 'w', encoding='utf-8') as f_out:
                mtllib_modified = False
                for line in f_in:
                    if not mtllib_modified and line.lstrip().startswith('mtllib'):
                        # Split on any whitespace so indented or tab-separated
                        # statements are parsed correctly.
                        parts = line.split(None, 1)
                        if len(parts) == 2 and parts[0] == 'mtllib':
                            old_mtl_name = parts[1].strip()
                            f_out.write(f"mtllib {order_id}_{old_mtl_name}\n")
                            mtllib_modified = True
                            continue
                    f_out.write(line)
            os.replace(temp_path, obj_path)
        except OSError as e:
            print(f"处理大文件 {obj_path} 时出错: {e}")
        except UnicodeDecodeError as e:
            # A non-UTF-8 OBJ must not abort the whole download batch.
            print(f"文件编码错误 {obj_path}: {e}")
        finally:
            # After a successful os.replace the temp file no longer exists.
            if os.path.exists(temp_path):
                os.remove(temp_path)

    @staticmethod
    def _prefix_mtl_textures(mtl_path, order_id):
        """Prefix every ``map_Kd`` texture reference of an MTL file with ``order_id``.

        All ``map_Kd`` lines are rewritten (not only the first one) because
        every downloaded image file receives the same prefix. The file is
        only written back when at least one line changed.
        """
        try:
            with open(mtl_path, 'r', encoding='utf-8') as f:
                lines = f.read().split('\n')

            changed = False
            for i, line in enumerate(lines):
                stripped_line = line.strip()
                if not stripped_line.startswith('map_Kd'):
                    continue
                parts = stripped_line.split(None, 1)
                if len(parts) == 2 and parts[0] == 'map_Kd':
                    old_name = parts[1].strip()
                    new_name = f"{order_id}_{old_name}"
                    lines[i] = f"map_Kd {new_name}"
                    changed = True
                    print(f"已更新材质库引用: {old_name} -> {new_name}")

            if changed:
                with open(mtl_path, 'w', encoding='utf-8') as f:
                    f.write('\n'.join(lines))
        except OSError as e:
            print(f"处理文件 {mtl_path} 时出错: {e}")
        except UnicodeDecodeError as e:
            print(f"文件编码错误 {mtl_path}: {e}")
import requests
import json
import shutil
def get_api(url, timeout=30):
    """Fetch ``url`` with GET and return its decoded JSON body.

    Args:
        url: Address to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely.

    Returns:
        The decoded JSON payload, returned only when its ``code`` field
        equals 1000.

    Raises:
        Exception: If the request fails or times out, the HTTP status is an
            error, the body is not valid JSON, or the payload's ``code`` is
            not 1000 (the payload's ``message`` is included in that case).
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # turn HTTP error statuses into exceptions
        payload = json.loads(response.text)
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers json.JSONDecodeError, so a non-JSON body is
        # reported the same way as a transport failure.
        raise Exception(f"Error fetching URL {url}: {e}") from e

    if payload.get("code") != 1000:
        raise Exception(f"Error fetching URL {url}: {payload.get('message')}")
    return payload
from dataclasses import dataclass
@dataclass
class JSONModelInfo:
    """Identity of one model entry, as four plain string fields."""

    # File name of the model's OBJ file.
    obj_name: str
    # Identifier of the order the model belongs to.
    order_id: str
    # Identifier stored next to the order id (named "pid" throughout the file).
    pid: str
    # Model height, kept as the raw string it was given as.
    model_height: str
def read_pids_from_json(pid_file):
    """Load the layout JSON and describe every model listed in it.

    Each entry of the top-level ``models`` list is expected to carry a
    ``file_name`` of the form ``<order_id>_<pid>_<...>_<model_height>...``;
    the order id, pid and model height are taken from the 1st, 2nd and 4th
    ``_``-separated tokens. Entries whose name has fewer than four tokens are
    reported and skipped instead of raising ``IndexError``.

    Args:
        pid_file: Path of the JSON file.

    Returns:
        tuple: ``(list_model_info, data)`` where ``list_model_info`` is a list
        of ``JSONModelInfo`` and ``data`` is the decoded JSON document. When
        the file is missing or cannot be read/parsed, ``([], {})`` is
        returned so that callers can always unpack two values.
    """
    json_path = pid_file

    if not os.path.exists(json_path):
        print(f"错误: JSON文件不存在 - {json_path}")
        return [], {}

    try:
        # "utf-8-sig" reads UTF-8 with or without a byte-order mark.
        with open(json_path, 'r', encoding='utf-8-sig') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both JSON syntax errors and decoding errors.
        print(f"读取JSON文件失败: {e}")
        return [], {}

    list_model_info = []
    for model in data.get('models', []):
        obj_name = model.get('file_name', '')
        parts = obj_name.split('_')
        if len(parts) < 4:
            print(f"警告: 文件名格式不符合预期, 已跳过 - {obj_name!r}")
            continue
        list_model_info.append(
            JSONModelInfo(
                obj_name=obj_name,
                order_id=parts[0],
                pid=parts[1],
                model_height=parts[3],
            )
        )
    return list_model_info, data
def download_data_by_json(model_info, workdir, oss_client ):
    """Download the files of one model into ``workdir`` (best effort).

    The OSS sub-path of the order is looked up through the order API, then
    everything under ``objs/download/print/<pid>/<suffix>/<model_height>/``
    is downloaded and renamed by ``DataTransfer``. Any failure is printed and
    swallowed so that one bad model does not stop a batch.

    Args:
        model_info: Object exposing ``order_id``, ``pid``, ``model_height``
            and the attributes ``DataTransfer.download_data_rename_json``
            reads.
        workdir: Local directory the files are written to.
        oss_client: OSS bucket client passed on to ``DataTransfer``.
    """
    # Resolved before the try block so the failure message can always name it.
    pid = getattr(model_info, "pid", None)
    try:
        model_height = model_info.model_height
        target_dir = f"{workdir}"

        # ``params`` lets requests URL-encode the order id; the timeout keeps a
        # stalled server from blocking the whole batch.
        res = requests.get(
            "https://mp.api.suwa3d.com/api/order/getOssSuffixByOrderId",
            params={"order_id": model_info.order_id},
            timeout=30,
        )
        res.raise_for_status()
        data = res.json()["data"]
        if not isinstance(data, str):
            raise ValueError(f"unexpected OSS suffix returned by the order API: {data!r}")
        data = data.replace("/init_obj", "")

        print("target_dir=", target_dir)
        download_textures = DataTransfer(target_dir, f"objs/download/print/{pid}/{data}/{model_height}/", oss_client)
        download_textures.download_data_rename_json(model_info)

        # Remove the target directory again when nothing was downloaded.
        # NOTE(review): target_dir is the work directory itself, so this can
        # delete ``workdir`` - confirm that is intended.
        if os.path.exists(target_dir) and not os.listdir(target_dir):
            shutil.rmtree(target_dir)
            print(f"下载后检查发现目标文件夹为空,已删除: {target_dir}")
    except Exception as e:
        # Deliberate best effort: report and continue with the next model.
        print(f"卡通图片下载失败: {pid}, 错误: {str(e)}")
def get_oss_client(cfg_path):
    """Create the ``oss2.Bucket`` client used to download model data.

    Every connection setting is resolved independently, highest priority
    first:

    1. the YAML file at ``cfg_path`` - a flat mapping with the keys
       ``AccessKeyId_down``, ``AccessKeySecret_down``, ``Endpoint_down`` and
       ``Bucket_down`` - when ``cfg_path`` is non-empty and the file exists;
    2. the environment variables ``OSS_ACCESS_KEY_ID``,
       ``OSS_ACCESS_KEY_SECRET``, ``OSS_ENDPOINT`` and ``OSS_BUCKET``;
    3. the legacy values embedded below.

    Args:
        cfg_path: Path of an optional YAML settings file; may be empty.

    Returns:
        oss2.Bucket: Bucket client built from the resolved settings.
    """
    # SECURITY(review): these credentials are committed to source control.
    # They are kept only so existing invocations without a config file keep
    # working; rotate the key pair, supply it through the config file or the
    # environment, and then delete these defaults.
    settings = {
        'AccessKeyId_down': 'LTAI5tSReWm8hz7dSYxxth8f',
        'AccessKeySecret_down': '8ywTDF9upPAtvgXtLKALY2iMYHIxdS',
        'Endpoint_down': 'oss-cn-shanghai.aliyuncs.com',
        'Bucket_down': 'suwa3d-securedata',
    }

    env_names = {
        'AccessKeyId_down': 'OSS_ACCESS_KEY_ID',
        'AccessKeySecret_down': 'OSS_ACCESS_KEY_SECRET',
        'Endpoint_down': 'OSS_ENDPOINT',
        'Bucket_down': 'OSS_BUCKET',
    }
    for key, env_name in env_names.items():
        env_value = os.environ.get(env_name)
        if env_value:
            settings[key] = env_value

    # NOTE(review): the YAML key names mirror the former hard-coded variable
    # names - confirm them against the real config file schema.
    cfg_file = os.path.expanduser(cfg_path) if cfg_path else ""
    if cfg_file and os.path.isfile(cfg_file):
        try:
            with open(cfg_file, "r", encoding="utf-8") as config:
                cfg = yaml.safe_load(config)
        except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
            # An unreadable config must not prevent the fallback settings
            # from being used.
            print(f"读取OSS配置失败: {cfg_file}: {e}")
            cfg = None
        if isinstance(cfg, dict):
            for key in settings:
                if cfg.get(key):
                    settings[key] = str(cfg[key])

    oss_client = oss2.Bucket(
        oss2.Auth(settings['AccessKeyId_down'], settings['AccessKeySecret_down']),
        settings['Endpoint_down'],
        settings['Bucket_down'],
    )
    return oss_client
def download_datas_by_json(pid_file, workdir, oss_config):
    """Download the files of every model listed in the layout JSON.

    Args:
        pid_file: Path of the layout JSON file.
        workdir: Directory the model files are downloaded into.
        oss_config: Value forwarded to ``get_oss_client``.

    Returns:
        The decoded layout JSON document, or ``{}`` when it could not be
        read.
    """
    oss_client = get_oss_client(oss_config)
    json_path = pid_file

    result = read_pids_from_json(json_path)
    # Tolerate a reader that reports failure with a bare list instead of the
    # usual (models, data) pair, so a missing JSON file does not crash here.
    if isinstance(result, tuple):
        list_model_info, data = result
    else:
        list_model_info, data = result, {}
    print(f"从文件读取了 {len(list_model_info)} 个PID")

    for model_info in list_model_info:
        print(f"开始下载PID: {model_info}")
        # Use the ``workdir`` argument; the module-level ``args`` only exists
        # when this file is run as a script.
        download_data_by_json(model_info, workdir, oss_client)
    return data
def download_transform_save_by_json(pid_file, workdir, oss_config):
    """Download all models of a layout, transform them and save the results.

    The models are downloaded into ``workdir``; the transformed meshes are
    written to ``<workdir>/arrange``.

    Args:
        pid_file: Path of the layout JSON file.
        workdir: Working directory for downloads and output.
        oss_config: Value forwarded to the OSS client factory.
    """
    layout_data = download_datas_by_json(pid_file, workdir, oss_config)
    original_obj_pid_dir = workdir
    cache_type_setting_dir = os.path.join(workdir, "arrange")
    # parents=True: ``workdir`` may not exist at this point (never created,
    # or removed by the download step because it ended up empty).
    Path(cache_type_setting_dir).mkdir(parents=True, exist_ok=True)
    print(f"original_obj_pid_dir={original_obj_pid_dir}, cache_type_setting_dir={cache_type_setting_dir}")
    transform_save(layout_data, original_obj_pid_dir, cache_type_setting_dir)
def transform_save(layout_data, original_obj_pid_dir, cache_type_setting_dir,
                   need_offset=True, offset=(-380, -345, 0)):
    """Apply each model's layout transform and write the result as OBJ.

    For every entry of ``layout_data["models"]`` the mesh
    ``<original_obj_pid_dir>/<file_name>`` is loaded, its vertices are
    transformed with the entry's ``transform.homo_matrix``, optionally
    shifted by ``offset``, and the mesh is written to
    ``<cache_type_setting_dir>/<file_name>``. Models that cannot be loaded or
    that carry no matrix are reported and skipped.

    Args:
        layout_data: Decoded layout JSON (mapping with a ``models`` list).
        original_obj_pid_dir: Directory containing the downloaded OBJ files.
        cache_type_setting_dir: Output directory; created when missing.
        need_offset: Whether to shift the transformed vertices by ``offset``.
        offset: ``(x, y, z)`` shift. The default is the offset for the small
            380*345 printer.
    """
    obj_path_arrange = cache_type_setting_dir
    # Created once up-front instead of being re-checked for every model.
    os.makedirs(obj_path_arrange, exist_ok=True)
    offset_vector = np.array(offset)

    for model in layout_data.get("models", []):
        obj_name = model.get('file_name', '')
        homo_matrix = (model.get('transform') or {}).get("homo_matrix")
        if homo_matrix is None:
            print(f"警告: 模型缺少变换矩阵, 已跳过 - {obj_name}")
            continue
        reconstructed_matrix = np.array(homo_matrix, dtype=np.float64)
        obj_path = os.path.join(original_obj_pid_dir, obj_name)

        # Load the mesh.
        try:
            mesh = o3d.io.read_triangle_mesh(obj_path, enable_post_processing=True)
            if not mesh.has_vertices():
                print(f"警告: 网格无有效顶点 - {obj_path}")
                continue
        except Exception as e:
            print(f"加载模型失败: {obj_path} - {e}")
            continue

        original_vertices = np.asarray(mesh.vertices)
        transformed_vertices = custom_mesh_transform(original_vertices, reconstructed_matrix)

        if need_offset:
            transformed_vertices = transformed_vertices + offset_vector
            print(f"已对模型 {obj_name} 应用偏移: {offset_vector}")

        mesh.vertices = o3d.utility.Vector3dVector(transformed_vertices)
        # The mesh is written right away and not retained, so memory use
        # does not grow with the number of models.
        obj_path_arrange_obj = os.path.join(obj_path_arrange, obj_name)
        print("obj_path_arrange_obj", obj_path_arrange_obj)
        mesh.compute_vertex_normals()
        written = o3d.io.write_triangle_mesh(obj_path_arrange_obj, mesh, write_triangle_uvs=True)
        if not written:
            # write_triangle_mesh signals failure through its return value.
            print(f"保存模型失败: {obj_path_arrange_obj}")
if __name__ == "__main__":
    # Script entry point: parse the command-line options and run the
    # download -> transform -> save pipeline once.
    parser = argparse.ArgumentParser()
    # Passed on as the path of the layout JSON file to read.
    parser.add_argument("--pid_file", type=str, required=True, help="批次号, 也是json文件名")
    # Directory the models are downloaded into; output goes to <workdir>/arrange.
    parser.add_argument("--workdir", type=str, required=True, help="本代码文件所在的目录")
    # Forwarded to get_oss_client; optional.
    parser.add_argument("--oss_config", type=str, default="")
    # NOTE: ``args`` is a module-level name here; other code in this file may
    # read it, so keep the name when editing.
    args = parser.parse_args()
    download_transform_save_by_json(args.pid_file, args.workdir, args.oss_config)