You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 

380 lines
14 KiB

import open3d as o3d
import numpy as np
import copy
import time
import argparse
import os
from general import *
from compute_print_net_out import get_volume_centroid
from compute_print_net_out import get_lowest_position_of_z_ext
from compute_print_net_out import get_lowest_position_of_z_ext
from compute_print_net_out import compute_bbox_ext
from compute_print_net_out import voxel_size
# -------------------------- BEGIN: bbox --------------------------
def get_models_bbox(dict_pcd_fix, extend_dist=2):
    """Parse bounding-box sizes out of the PLY names used as keys.

    Every name is expected to look like ``"<model id>=<d0>+<d1>+<d2>.ply"``.
    Only the text after the last ``=`` is parsed; the values stored under the
    keys are never read here, so any iterable of names works.

    :param dict_pcd_fix: mapping (or iterable) whose keys are PLY file names.
    :param extend_dist: padding added to each axis after the size has been
        scaled by 100. Defaults to 2, the value that used to be hard-coded.
    :return: list of ``{'name': ply_file, 'dimensions': (x, z, y)}`` dicts,
        where x comes from ``d2``, y from ``d0`` and z from ``d1``; all three
        are integers.
    :raises IndexError: if a name carries fewer than three ``+``-separated sizes.
    :raises ValueError: if one of the sizes is not a number.
    """
    all_models = []
    for ply_file in dict_pcd_fix:
        # Keep only the "<d0>+<d1>+<d2>" part of the name.
        size_text = ply_file.split("=")[-1].replace(".ply", "")
        split_text = size_text.split("+")

        # Scale by 100 (presumably metres -> centimetres, per the original
        # note - TODO confirm), truncate, then add the padding.
        # NOTE: int() truncates instead of rounding, so a product such as
        # 28.999999999999996 becomes 28; kept as-is to preserve existing layouts.
        x_length = int(float(split_text[2]) * 100) + extend_dist  # third size -> x
        y_length = int(float(split_text[0]) * 100) + extend_dist  # first size -> y
        z_length = int(float(split_text[1]) * 100) + extend_dist  # second size -> z

        all_models.append({
            'name': ply_file,
            # Scaled back down by 100 and truncated to whole units again.
            'dimensions': (int(x_length / 100), int(z_length / 100), int(y_length / 100)),
        })
    return all_models
def arrange_models_on_platform(models, machine_size):
    """Lay the given models out on a freshly created print platform.

    :param models: model list as returned by get_models_bbox (dicts with
        'name' and 'dimensions').
    :param machine_size: printer size, indexed as (width, depth, height);
        each entry is converted with int().
    :return: whatever Platform.get_result() returns, i.e. the
        (placed_models, unplaced_models) pair.
    """
    width, depth, height = (int(machine_size[axis]) for axis in range(3))
    layout = Platform(width, depth, height)

    print("开始计算排序...")
    layout.arrange_models(models)
    layout.print_results()
    return layout.get_result()
import os
def compute_bbox_all_ext(base_original_obj_dir,compact_min_dis=True):
    """Load every ``.obj`` file of a directory and compute its bounding-box data.

    :param base_original_obj_dir: directory that is scanned (non-recursively)
        for files whose name ends with ``.obj``.
    :param compact_min_dis: forwarded unchanged to compute_bbox_all as its
        ``is_downsample`` argument.
    :return: the result of compute_bbox_all for the meshes that could be read.
    """
    dict_mesh_obj = {}
    for obj_name in os.listdir(base_original_obj_dir):
        if not obj_name.endswith(".obj"):
            continue
        # Use the listed file name as-is. The previous split(".o") round-trip
        # truncated names containing ".o" before the extension
        # (e.g. "scan.old.obj" -> "scan.obj") and then looked up a wrong path.
        mesh_obj = read_mesh(os.path.join(base_original_obj_dir, obj_name))
        if mesh_obj is None:
            # Unreadable meshes are skipped, not treated as fatal.
            continue
        dict_mesh_obj[obj_name] = mesh_obj
    return compute_bbox_all(dict_mesh_obj, compact_min_dis)
def compute_bbox_all(dict_mesh_obj,is_downsample):
    """Run compute_bbox on every mesh and collect the transforms and box sizes.

    :param dict_mesh_obj: mapping of obj file name -> loaded mesh.
    :param is_downsample: forwarded to compute_bbox for every mesh.
    :return: ``(dict_total_matrix, all_models)`` where ``dict_total_matrix``
        maps each obj name to the matrix returned by compute_bbox and
        ``all_models`` is get_models_bbox applied to the produced PLY names.
    """
    dict_total_matrix = {}
    # Only the PLY names are needed afterwards (get_models_bbox parses the
    # keys and never reads the values), so the point clouds are not retained.
    # A dict is still used to keep first-seen order and collapse duplicates.
    ply_names = {}
    for obj_name, mesh_obj in dict_mesh_obj.items():
        total_matrix, _pcd_fix, ply_name = compute_bbox(mesh_obj, obj_name, is_downsample)
        dict_total_matrix[obj_name] = total_matrix
        ply_names[ply_name] = None

    all_models = get_models_bbox(ply_names)
    return dict_total_matrix, all_models
def compute_bbox(mesh_obj, obj_name, is_downsample=True):
    """Compute the layout transform of one mesh and its transformed point cloud.

    When the module-level flag ``is_compute_bbox_net`` is set, the transform is
    first requested via compute_bbox_net using the digits that follow "P" in
    ``obj_name``; if that reports failure, compute_bbox_ext is used instead.
    Otherwise compute_bbox_ext is used directly.

    :param mesh_obj: mesh object exposing ``vertices``.
    :param obj_name: file name of the mesh.
    :param is_downsample: when true, the resulting point cloud is passed
        through down_sample with ``voxel_size``.
    :return: ``(total_matrix, pcd_fix, ply_name)``.
    """
    # Snapshot taken before the mesh is handed to compute_bbox_ext; the final
    # transform is applied to these vertices (presumably because the helper
    # may modify mesh_obj - TODO confirm).
    untouched_mesh = copy.deepcopy(mesh_obj)

    if not is_compute_bbox_net:
        total_matrix, _, _, _, ply_name = compute_bbox_ext(mesh_obj, obj_name, is_downsample)
        print(f"compute_bbox_ext ply_name = {ply_name}")
    else:
        # The print id is the run of digits after "P"; empty when absent.
        id_match = re.search(r"P(\d+)", obj_name)
        printId = id_match.group(1) if id_match else ""

        succ, total_matrix, _, _, _, ply_name = compute_bbox_net(printId)
        if succ:
            # Keep the size suffix from the service but key it by this obj's base name.
            base_name = os.path.splitext(obj_name)[0]
            ply_name = f"{base_name}={ply_name.split('=')[1]}"
            print(f"compute_bbox_net succ {printId}")
        else:
            total_matrix, _, _, _, ply_name = compute_bbox_ext(mesh_obj, obj_name, is_downsample)
            print(f"compute_bbox_net fail {printId}")

    moved_vertices = mesh_transform_by_matrix(np.asarray(untouched_mesh.vertices), total_matrix)
    cloud = o3d.geometry.PointCloud()
    cloud.points = o3d.utility.Vector3dVector(moved_vertices)

    pcd_fix = down_sample(cloud, voxel_size, False) if is_downsample else cloud
    return total_matrix, pcd_fix, ply_name
import requests
import ast
def compute_bbox_net(printId, timeout=10):
    """Fetch a stored layout record for ``printId`` from the layout service.

    The record is read from ``response["data"]["layout"]`` of a GET request to
    ``url_infoByPrintId + printId``.

    :param printId: id appended to the service URL.
    :param timeout: seconds passed to ``requests.get`` so a stalled service
        cannot block the caller forever.
    :return: ``(succ, matrix, layout_z, max_bound, min_bound, ply_name)``.
        On any failure (request error, unexpected payload, missing
        ``ply_name``, unparsable ``homo_matrix``) ``succ`` is False and the
        remaining entries are ``np.eye(4), 0.0, np.zeros(3), np.zeros(3), ""``.
    """
    failure = (False, np.eye(4), 0.0, np.zeros(3), np.zeros(3), "")

    url = f"{url_infoByPrintId}{printId}"
    try:
        res = requests.get(url, timeout=timeout)
        datas = res.json()["data"]["layout"]
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # Report failure so the caller can fall back to computing locally
        # instead of crashing on a network or schema problem.
        print(f"compute_bbox_net request error: {e}")
        return failure

    if not isinstance(datas, dict) or "ply_name" not in datas:
        return failure

    homo_matrix_str = datas.get("homo_matrix")
    try:
        # The matrix arrives as the text of a nested list literal.
        matrix_list = ast.literal_eval(homo_matrix_str)
        reconstructed_matrix = np.array(matrix_list, dtype=float)
    except (ValueError, SyntaxError, TypeError) as e:
        print(f"解析错误: {e}")
        # Previously execution continued and hit an unbound
        # reconstructed_matrix at the return below.
        return failure

    layout_z = datas.get("layout_z", 0)
    max_bound = datas.get("max_bound", 0)
    min_bound = datas.get("min_bound", 0)
    ply_name = datas.get("ply_name", 0)
    # NOTE(review): max_bound is returned before min_bound, while compute_bbox
    # unpacks these two slots as (min_bound, max_bound). Order left unchanged
    # for other callers - confirm which one is intended.
    return True, reconstructed_matrix, layout_z, max_bound, min_bound, ply_name
class Platform:
    """Greedy single-layer placer that arranges box-shaped models on a platform.

    Models are dicts with at least 'name' and 'dimensions' (unpacked as
    ``mx, my, mz``). A placed model additionally receives 'position' and
    'first_line'; arrange_models also stores a 'pre_model' link on it.

    NOTE(review): this copy of the file lost its indentation, so the nesting
    shown here is reconstructed from statement order - confirm the spots
    marked below against the original.
    NOTE(review): is_cross_border_c, is_same_obj, is_multi_obj and the
    extend_dist_* spacing constants are not defined in this block; presumably
    they come from ``from general import *`` - confirm.
    """
    def __init__(self, width, depth, height):
        """Store the platform size and reset all placement state."""
        self.width = width
        self.depth = depth
        self.height = height
        self.placed_models = [] # models that have been placed
        self.unplaced_models = [] # models that could not be placed
        # While True, each new model's x is derived by adding to the previous
        # model's x; once a model hits the x limit this becomes False and x is
        # derived by subtracting instead (see place_model).
        self.first_line = True
        # Name recorded when a multi-part object is rejected; place_model
        # rejects every later model that is_same_obj() matches against it.
        self.remove_multiobj_name = ""
    def is_cross_border(self, x, y, z, model):
        """Return is_cross_border_c() for the model at (x, y, z) on this platform."""
        mx, my, mz = model['dimensions']
        return is_cross_border_c(x, y, z, mx, my, mz, self.width, self.depth, self.height)
    def check_multiobj_cross_pre(self, name, pre_model):
        """Walk back along 'pre_model' links, un-placing models of the same object.

        Stops at None or at the first predecessor for which
        is_same_obj(name, ...) is false. Every visited match is appended to
        unplaced_models and removed from placed_models if present.
        """
        if (pre_model==None):
            return
        if not is_same_obj(name, pre_model['name']):
            return
        self.unplaced_models.append(pre_model)
        if pre_model in self.placed_models:
            self.placed_models.remove(pre_model)
        # NOTE(review): nesting reconstructed - the recursive call is assumed
        # to belong inside this `if`; confirm.
        if "pre_model" in pre_model:
            self.pre_model = pre_model["pre_model"]
            self.check_multiobj_cross_pre(name, self.pre_model)
    def check_multiobj_cross(self, model):
        """Handle a rejected model that may be part of a multi-part object.

        Returns False without side effects when is_multi_obj() is false for
        the model's name; otherwise records the model as unplaced and
        returns True.
        """
        if not is_multi_obj(model['name']):
            return False
        self.unplaced_models.append(model)
        # NOTE(review): nesting reconstructed - it is unclear whether the
        # check_multiobj_cross_pre call belongs inside this `if`; confirm.
        if "pre_model" in model:
            self.pre_model = model["pre_model"]
            self.check_multiobj_cross_pre(model['name'], self.pre_model)
        return True
    def can_place(self, x, y, z, model, is_print=False):
        """Return True if the model fits at (x, y, z) without crossing the border or overlapping.

        ``is_print`` is accepted but not used in this method.
        """
        mx, my, mz = model['dimensions']
        if self.is_cross_border(x, y, z, model):
            # print(f"can_place False 1 cross_border {x}, {y}, {z}, {model}, {self.width}, {self.depth}, {self.height}")
            return False
        # Collision check against every placed model, including the spacing margins.
        for placed in self.placed_models:
            px, py, pz = placed['position']
            pdx, pdy, pdz = placed['dimensions']
            # Axis-aligned box overlap test. In x and y the position is used
            # as the upper end of the extent (extent = position - size -
            # margin .. position); in z it is used as the lower end.
            if (x > px - pdx - extend_dist_model_x and
                x - mx - extend_dist_model_x < px and
                y > py - pdy - extend_dist_model_y and
                y - my - extend_dist_model_y < py and
                z < pz + pdz and
                z + mz > pz):
                # print("can_place False 2",False,model,x,y,z,px,pdx,extend_dist_model_x,py,pdy,extend_dist_model_y,my,pz,pdz,pz)
                return False
        return True
    def place_model(self, model, pre_model):
        """Try to place ``model`` next to ``pre_model``; return True on success.

        On success the model gets 'position' and 'first_line' and is appended
        to placed_models. Models taller than the platform, or matching
        remove_multiobj_name, are appended to unplaced_models straight away.
        """
        mx, my, mz = model['dimensions']
        if mz > self.height:
            self.unplaced_models.append(model)
            return False
        if is_same_obj(model['name'], self.remove_multiobj_name):
            self.unplaced_models.append(model)
            return False
        z = 0
        # No predecessor: the model takes a fixed starting position.
        if pre_model is None:
            if self.first_line:
                model['position'] = (mx + extend_dist_border_x_min, self.depth - extend_dist_border_y_max, 0)
                print(f"First Model {model['name']}")
                model['first_line'] = True
            else:
                model['position'] = (self.width - extend_dist_border_x_max, self.depth - extend_dist_border_y_max, 0)
                model['first_line'] = False
            # print("model position1", model['name'], model['position'])
            self.placed_models.append(model)
            return True
        pre_px, pre_py, pre_pz = pre_model['position']
        pre_mx, pre_my, pre_mz = pre_model['dimensions']
        # Candidate x: step right of the predecessor while first_line is set,
        # otherwise step left of it.
        if self.first_line:
            px = pre_px + mx + extend_dist_model_x
            model['first_line'] = True
        else:
            px = pre_px - pre_mx - extend_dist_model_x
            model['first_line'] = False
        # print(model['name'], "px", px, pre_px, pre_mx)
        reach_limit_x = False
        if self.first_line:
            if px > self.width:
                reach_limit_x = True
        else:
            if px - mx < 0:
                reach_limit_x = True
        if reach_limit_x:
            # The x limit was hit: leave first_line mode and restart from the
            # platform's upper x edge.
            self.first_line = False
            px = self.width - extend_dist_border_x_max
            start_y = my + extend_dist_border_y_min
            final_y = self.depth
            print("reach_limit_x final_y1", model['name'], my, final_y, my, extend_dist_border_x_max, px)
            # Raise y until the spot stops being placeable, then step back by one.
            for y in range(start_y, final_y, +1):
                # print("y",y)
                if self.can_place(px, y, z, model, True)==False:
                    y -= 1
                    if self.is_cross_border(px, y, z, model):
                        print(f"cross border : {model['name']}")
                        if self.check_multiobj_cross(model):
                            self.remove_multiobj_name = model['name']
                        # NOTE(review): nesting reconstructed - this return is
                        # assumed to apply to every cross-border case, not only
                        # to multi-part objects; confirm.
                        return False
                    model['position'] = (px, y, z)
                    # print("model position2", model['name'], model['position'])
                    self.placed_models.append(model)
                    return True
        else:
            start_y = my + extend_dist_border_y_min
            final_y = self.depth
            # print("final_y2", model['name'], start_y, final_y, my, extend_dist_border_y_max, px)
            # Same y scan as above, at the x derived from the predecessor.
            for y in range(start_y, final_y, +1):
                if self.can_place(px, y, z, model)==False:
                    y -= 1
                    if self.is_cross_border(px, y, z, model):
                        print(f"cross border : {model['name']}")
                        if self.check_multiobj_cross(model):
                            self.remove_multiobj_name = model['name']
                        # NOTE(review): nesting reconstructed, see the matching
                        # note in the branch above; confirm.
                        return False
                    model['position'] = (px, y, z)
                    # print("model position2", model['name'], model['position'])
                    self.placed_models.append(model)
                    return True
        # Reached when the y scan finished without can_place ever returning False.
        if 'position' in model:
            print("model position3", model['name'], model['position'])
        else:
            print("model position3 no exist position", model['name'])
        self.unplaced_models.append(model)
        return False
    def arrange_models(self, models):
        """Arrange all models on a single layer."""
        print(" 单层放置模式:所有模型只能放在平台底面(Z=0)")
        # Sort descending by dimensions[2], then by dimensions[0] * dimensions[1],
        # so that large models are placed first.
        models = sorted(models, key=lambda m: (-m['dimensions'][2], -m['dimensions'][0] * m['dimensions'][1]))
        self.pre_model = None
        for model in models:
            print(f"arrange_models {model['name']}")
            # Remember the predecessor before place_model runs, because
            # place_model may change self.pre_model through check_multiobj_cross.
            pre_model_temp = self.pre_model
            # NOTE(review): nesting reconstructed - both assignments are assumed
            # to happen only when placement succeeded; confirm.
            if self.place_model(model, self.pre_model):
                self.pre_model = model
                model["pre_model"] = pre_model_temp
    def print_results(self):
        """Print the placed and unplaced models."""
        print("Placed Models:")
        for model in self.placed_models:
            print(f" - {model['name']} at {model['position']} with dimensions {model['dimensions']}")
        print("Unplaced Models:")
        for model in self.unplaced_models:
            print(f" - {model['name']} with dimensions {model['dimensions']}")
    def get_result(self):
        """Return the (placed_models, unplaced_models) lists."""
        return self.placed_models, self.unplaced_models
# -------------------------- END: bbox --------------------------
# -------------------------- BEGIN: oss --------------------------
import yaml
import oss2
def get_oss_client(cfg_path):
    """Create an oss2.Bucket client from the ``run.down`` section of a YAML config.

    :param cfg_path: path to the YAML file; ``~`` is expanded.
    :return: an ``oss2.Bucket`` built from the AccessKeyId, AccessKeySecret,
        Endpoint and Bucket entries under ``run`` -> ``down``.
    """
    config_file = os.path.expanduser(cfg_path)
    with open(config_file, "r") as stream:
        settings = yaml.safe_load(stream)

    down = settings["run"]["down"]
    key_id = down["AccessKeyId"]
    key_secret = down["AccessKeySecret"]
    endpoint = down["Endpoint"]
    bucket_name = down["Bucket"]

    credentials = oss2.Auth(key_id, key_secret)
    return oss2.Bucket(credentials, endpoint, bucket_name)
# -------------------------- END: oss --------------------------