You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
275 lines
9.5 KiB
275 lines
9.5 KiB
import open3d as o3d |
|
import os |
|
import numpy as np |
|
from scipy.spatial.transform import Rotation |
|
import sys |
|
|
|
import argparse |
|
# import cv2 |
|
import matplotlib.pyplot as plt |
|
import numpy as np |
|
from numba import njit, prange |
|
import time |
|
|
|
|
|
# 核心计算函数(支持Numba加速) |
|
@njit(fastmath=True, cache=True) |
|
def calculate_rotation_z(angle_x, angle_y, angle_z, points, cos_cache, sin_cache, angle_step): |
|
"""计算单个旋转组合后的重心Z坐标(无显式平移)""" |
|
# 获取预计算的三角函数值 |
|
idx_x = angle_x // angle_step |
|
idx_y = angle_y // angle_step |
|
idx_z = angle_z // angle_step |
|
|
|
cos_x = cos_cache[idx_x] |
|
sin_x = sin_cache[idx_x] |
|
cos_y = cos_cache[idx_y] |
|
sin_y = sin_cache[idx_y] |
|
cos_z = cos_cache[idx_z] |
|
sin_z = sin_cache[idx_z] |
|
|
|
# 构造旋转矩阵(展开矩阵乘法优化) |
|
# R = Rz @ Ry @ Rx |
|
# 计算矩阵元素(手动展开矩阵乘法) |
|
m00 = cos_z * cos_y |
|
m01 = cos_z * sin_y * sin_x - sin_z * cos_x |
|
m02 = cos_z * sin_y * cos_x + sin_z * sin_x |
|
|
|
m10 = sin_z * cos_y |
|
m11 = sin_z * sin_y * sin_x + cos_z * cos_x |
|
m12 = sin_z * sin_y * cos_x - cos_z * sin_x |
|
|
|
m20 = -sin_y |
|
m21 = cos_y * sin_x |
|
m22 = cos_y * cos_x |
|
|
|
# 计算所有点的Z坐标 |
|
z_values = np.empty(points.shape[0], dtype=np.float64) |
|
for i in prange(points.shape[0]): |
|
x, y, z = points[i, 0], points[i, 1], points[i, 2] |
|
# 应用旋转矩阵 |
|
rotated_z = m20 * x + m21 * y + m22 * z |
|
z_values[i] = rotated_z |
|
|
|
# 计算重心Z(等效于平移后的重心) |
|
min_z = np.min(z_values) |
|
avg_z = np.mean(z_values) |
|
return avg_z - min_z # 等效于平移后的重心Z坐标 |
|
|
|
|
|
# 并行优化主函数 |
|
def parallel_rotation2(points, angle_step=3): |
|
""" |
|
参数: |
|
points : numpy.ndarray (N,3) - 三维点云 |
|
angle_step : int - 角度搜索步长(度数) |
|
|
|
返回: |
|
(best_angle_x, best_angle_y, best_angle_z, min_z) |
|
""" |
|
points = np.ascontiguousarray(points.astype(np.float64)) |
|
|
|
# 生成所有可能角度 |
|
angles = np.arange(0, 360, angle_step) |
|
n_angles = len(angles) |
|
|
|
# 预计算三角函数值(大幅减少重复计算) |
|
rads = np.radians(angles) |
|
cos_cache = np.cos(rads).astype(np.float64) |
|
sin_cache = np.sin(rads).astype(np.float64) |
|
|
|
# 生成所有角度组合(内存优化版) |
|
total_combinations = n_angles ** 3 |
|
print(f"Total combinations: {total_combinations:,}") |
|
|
|
# 分块处理以避免内存溢出 |
|
best_z = np.inf |
|
best_angles = (0, 0, 0) |
|
batch_size = 10 ** 6 # 根据可用内存调整 |
|
|
|
for x_chunk in range(0, n_angles, max(1, n_angles // 4)): |
|
angles_x = angles[x_chunk:x_chunk + max(1, n_angles // 4)] |
|
for y_chunk in range(0, n_angles, max(1, n_angles // 4)): |
|
angles_y = angles[y_chunk:y_chunk + max(1, n_angles // 4)] |
|
|
|
# 生成当前分块的所有组合 |
|
xx, yy, zz = np.meshgrid(angles_x, angles_y, angles) |
|
current_batch = np.stack([xx.ravel(), yy.ravel(), zz.ravel()], axis=1) |
|
|
|
# 处理子批次 |
|
for i in range(0, len(current_batch), batch_size): |
|
batch = current_batch[i:i + batch_size] |
|
results = np.zeros(len(batch), dtype=np.float64) |
|
_process_batch(batch, points, cos_cache, sin_cache, angle_step, results) |
|
|
|
# 更新最佳结果 |
|
min_idx = np.argmin(results) |
|
if results[min_idx] < best_z: |
|
best_z = results[min_idx] |
|
best_angles = tuple(batch[min_idx]) |
|
print(f"New best: {best_angles} -> Z={best_z:.4f}") |
|
|
|
return (*best_angles, best_z) |
|
|
|
|
|
@njit(parallel=True, fastmath=True) |
|
def _process_batch(batch, points, cos_cache, sin_cache, angle_step, results): |
|
for i in prange(len(batch)): |
|
ax, ay, az = batch[i] |
|
results[i] = calculate_rotation_z( |
|
ax, ay, az, points, |
|
cos_cache, sin_cache, angle_step |
|
) |
|
|
|
|
|
class ModelProcessor: |
|
def __init__(self): |
|
|
|
# argv = sys.argv[sys.argv.index("--") + 1:] if "--" in sys.argv else [] |
|
parser = argparse.ArgumentParser() |
|
|
|
parser.add_argument( |
|
"--id", |
|
required=False, |
|
) |
|
|
|
args = parser.parse_args() |
|
|
|
self.id = args.id |
|
|
|
self.mesh = None |
|
self.asset_dir = f"/home/algo/Documents/datasets/{self.id}" |
|
|
|
def load_model(self): |
|
"""加载并初始化3D模型""" |
|
# model_path = f"{self.asset_dir}/baked/{self.id}.obj" |
|
# model_path = f"{self.asset_dir}/repair_{self.id}_mesh.ply" |
|
model_path = "/data/datasets_20t/8/88884_253283_P65951_6cm_x1.obj" |
|
if not os.path.exists(model_path): |
|
raise FileNotFoundError(f"Model file not found: {model_path}") |
|
|
|
print(model_path) |
|
|
|
mesh_native = o3d.io.read_triangle_mesh(model_path, enable_post_processing=False) |
|
# self.mesh = o3d.io.read_triangle_mesh(model_path, enable_post_processing=False) |
|
|
|
print("Open3D去重前顶点数:", len(mesh_native.vertices)) |
|
self.mesh = mesh_native.merge_close_vertices(eps=1e-6) |
|
|
|
vertices2 = np.asarray(self.mesh.vertices) |
|
print("Open3D去重后顶点数:", len(vertices2)) |
|
vertices2_sorted = sorted( |
|
vertices2.tolist(), |
|
key=lambda x: (x[0], x[1], x[2]) |
|
) |
|
|
|
if not self.mesh.has_vertex_colors(): |
|
num_vertices = len(self.mesh.vertices) |
|
self.mesh.vertex_colors = o3d.utility.Vector3dVector( |
|
np.ones((num_vertices, 3)) |
|
) |
|
|
|
self.uv_array = np.asarray(self.mesh.triangle_uvs) |
|
# print(f"UV 坐标形状:{self.uv_array.shape}, {self.uv_array[0][1]}") |
|
|
|
def calculate_rotation_and_center_of_mass(self, angle_x, angle_y, angle_z, points): |
|
"""计算某一组旋转角度后的重心""" |
|
# 计算绕X轴、Y轴和Z轴的旋转矩阵 |
|
R_x = np.array([ |
|
[1, 0, 0], |
|
[0, np.cos(np.radians(angle_x)), -np.sin(np.radians(angle_x))], |
|
[0, np.sin(np.radians(angle_x)), np.cos(np.radians(angle_x))] |
|
]) |
|
|
|
R_y = np.array([ |
|
[np.cos(np.radians(angle_y)), 0, np.sin(np.radians(angle_y))], |
|
[0, 1, 0], |
|
[-np.sin(np.radians(angle_y)), 0, np.cos(np.radians(angle_y))] |
|
]) |
|
|
|
R_z = np.array([ |
|
[np.cos(np.radians(angle_z)), -np.sin(np.radians(angle_z)), 0], |
|
[np.sin(np.radians(angle_z)), np.cos(np.radians(angle_z)), 0], |
|
[0, 0, 1] |
|
]) |
|
|
|
# 综合旋转矩阵 |
|
R = R_z @ R_y @ R_x |
|
|
|
# 执行旋转 |
|
rotated_points = points @ R.T |
|
|
|
# 计算最小z值 |
|
min_z = np.min(rotated_points[:, 2]) |
|
|
|
# 计算平移向量,将最小Z值平移到0 |
|
translation_vector = np.array([0, 0, -min_z]) |
|
rotated_points += translation_vector |
|
|
|
# 计算重心 |
|
center_of_mass = np.mean(rotated_points, axis=0) |
|
|
|
return center_of_mass[2], angle_x, angle_y, angle_z |
|
|
|
def parallel_rotation(self, angle_step=3): |
|
"""顺序计算最优旋转角度(单线程)""" |
|
min_center_of_mass_y = float('inf') |
|
best_angle_x, best_angle_y, best_angle_z = 0, 0, 0 |
|
|
|
# 遍历所有角度组合 |
|
for angle_x in range(0, 360, angle_step): |
|
for angle_y in range(0, 360, angle_step): |
|
for angle_z in range(0, 360, angle_step): |
|
center_of_mass_z, ax, ay, az = self.calculate_rotation_and_center_of_mass( |
|
angle_x, angle_y, angle_z, self.mesh.vertices |
|
) |
|
if center_of_mass_z < min_center_of_mass_y: |
|
min_center_of_mass_y = center_of_mass_z |
|
best_angle_x, best_angle_y, best_angle_z = ax, ay, az |
|
|
|
return best_angle_x, best_angle_y, best_angle_z, min_center_of_mass_y |
|
|
|
def process(self): |
|
"""执行完整处理流程""" |
|
self.load_model() |
|
|
|
try: |
|
start = time.time() |
|
|
|
# mesh = o3d.geometry.TriangleMesh() |
|
# mesh.vertices = o3d.utility.Vector3dVector(np.random.rand(100, 3)) |
|
# points = np.asarray(mesh.vertices) |
|
|
|
pcd = o3d.geometry.PointCloud() |
|
pcd.points = o3d.utility.Vector3dVector(self.mesh.vertices) |
|
|
|
# 自动计算合理体素大小 |
|
raw_points = np.asarray(pcd.points) |
|
bounds = np.ptp(raw_points, axis=0) |
|
voxel_size = np.max(bounds) / 50 # 默认取最大边长的2% |
|
|
|
# 执行下采样并验证 |
|
pcd_downsampled = pcd.voxel_down_sample(voxel_size) |
|
if len(pcd_downsampled.points) < 10: # 最少保留10个点 |
|
raise RuntimeError(f"下采样失败:voxel_size={voxel_size:.3f}过大") |
|
|
|
print(f"下采样后点数: {len(pcd_downsampled.points)} (voxel_size={voxel_size:.3f})") |
|
|
|
# pcd.paint_uniform_color([1,0,0]) # 原始红色 |
|
# pcd_downsampled.paint_uniform_color([0,0,1]) # 采样后蓝色 |
|
# o3d.visualization.draw_geometries([pcd, pcd_downsampled]) |
|
|
|
# 继续后续处理 |
|
points = np.asarray(pcd_downsampled.points) |
|
|
|
best_angle_x, best_angle_y, best_angle_z, min_z = parallel_rotation2(points, angle_step=5) |
|
print("best=", best_angle_x, best_angle_y, best_angle_z, min_z) |
|
print(time.time() - start) |
|
|
|
except Exception as e: |
|
print(f"Error during processing: {str(e)}") |
|
raise |
|
|
|
|
|
if __name__ == "__main__": |
|
ModelProcessor().process() |