import time
import open3d as o3d
import os
import numpy as np
from scipy.spatial.transform import Rotation
import sys
sys.path.append("/home/algo/Documents/openMVS/openMVS/libs/MVS/utils")
from colmap_loader import read_cameras_text, read_images_text, read_int_text, write_int_text, read_indices_from_file
from get_pose_matrix import get_w2c
import argparse
import matplotlib.pyplot as plt
import collections
import torch
import torch.nn.functional as F
from torch.utils.dlpack import to_dlpack, from_dlpack
from typing import Dict, List, Set
class ModelProcessor:
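    """Per-view visibility/occlusion masking for a textured mesh.

    Loads a mesh plus COLMAP camera data, renders a depth map per view with
    Open3D's offscreen renderer, classifies mesh faces as visible, edge-band,
    or deletable, and writes the per-image face lists to text files.
    """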
    def __init__(self):
        # argv = sys.argv[sys.argv.index("--") + 1:] if "--" in sys.argv else []
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "--id",
            required=True,
        )
        parser.add_argument(
            "--base_path",
            type=str,
            required=True,
        )
        parser.add_argument(
            "--mesh_path",
            type=str,
            required=True,
        )
        parser.add_argument(
            "--sparse_dir",
            type=str,
            required=True,
        )
        args = parser.parse_args()
        self.id = args.id
        # self.asset_dir = f"/home/algo/Documents/openMVS/data/{self.id}"
        self.asset_dir = args.base_path
        self.mesh_path = args.mesh_path
        # self.pose_path = f"{self.asset_dir}/sparse/"
        self.pose_path = args.sparse_dir
        if not os.path.exists(self.pose_path):
            raise FileNotFoundError(f"Camera data not found: {self.pose_path}")
        # GPU device setup
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        print(f"Using device: {self.device}")
        self.mesh = None
self.mesh = None
def load_model(self):
"""加载并初始化3D模型"""
# model_path = f"{self.asset_dir}/repair.ply"
model_path = self.mesh_path
if not os.path.exists(model_path):
raise FileNotFoundError(f"Model file not found: {model_path}")
print(model_path)
mesh_native = o3d.io.read_triangle_mesh(model_path, enable_post_processing=False)
# self.mesh = o3d.io.read_triangle_mesh(model_path, enable_post_processing=False)
self.mesh = mesh_native
#"""
print("Open3D去重前顶点数:", len(mesh_native.vertices))
# self.mesh = mesh_native.merge_close_vertices(eps=1e-6)
vertices2 = np.asarray(self.mesh.vertices)
print("Open3D去重后顶点数:", len(vertices2))
vertices2_sorted = sorted(
vertices2.tolist(),
key=lambda x: (x[0], x[1], x[2])
)
if not self.mesh.has_vertex_colors():
num_vertices = len(self.mesh.vertices)
self.mesh.vertex_colors = o3d.utility.Vector3dVector(
np.ones((num_vertices, 3))
)
self.uv_array = np.asarray(self.mesh.triangle_uvs)
# print(f"UV 坐标形状:{self.uv_array.shape}, {self.uv_array[0][1]}")
#"""
#"""
# 将数据转移到GPU
vertices = np.asarray(self.mesh.vertices, dtype=np.float32)
triangles = np.asarray(self.mesh.triangles, dtype=np.int32)
# 转换为PyTorch张量并转移到GPU
self.vertices_tensor = torch.from_numpy(vertices).to(self.device)
self.triangles_tensor = torch.from_numpy(triangles).to(self.device)
print(f"Loaded {len(vertices)} vertices and {len(triangles)} triangles and {len(self.triangles_tensor)} triangles_tensor")
self._build_face_adjacency_gpu()
#"""
# self._build_face_adjacency()
if not self.mesh.has_vertex_colors():
num_vertices = len(self.mesh.vertices)
self.mesh.vertex_colors = o3d.utility.Vector3dVector(
np.ones((num_vertices, 3))
)
    def _build_face_adjacency_gpu(self):
        """Build the face adjacency list (shared-edge neighbours).

        Despite the name, the edge map is built on the CPU; the triangle
        tensor is only copied back from the GPU.
        """
        if len(self.triangles_tensor) == 0:
            return
        triangles = self.triangles_tensor.cpu().numpy()  # process on the CPU
        num_faces = len(triangles)
        # Build an edge -> faces map.
        edge_face_map = {}
        for face_idx, tri in enumerate(triangles):
            # The three edges; sorting the vertex pair makes each edge unique.
            edges = [
                tuple(sorted([tri[0], tri[1]])),
                tuple(sorted([tri[1], tri[2]])),
                tuple(sorted([tri[2], tri[0]]))
            ]
            for edge in edges:
                if edge not in edge_face_map:
                    edge_face_map[edge] = []
                edge_face_map[edge].append(face_idx)
        # Faces sharing an edge are adjacent.
        self.face_adjacency = [[] for _ in range(num_faces)]
        adjacency_count = 0
        for edge, faces in edge_face_map.items():
            if len(faces) > 1:  # only edges shared by more than one face
                for i in faces:
                    for j in faces:
                        if i != j:
                            if j not in self.face_adjacency[i]:
                                self.face_adjacency[i].append(j)
                                adjacency_count += 1
        print("Adjacency construction finished:")
        print(f"- faces: {num_faces}")
        print(f"- edges: {len(edge_face_map)}")
        print(f"- shared edges: {len([f for f in edge_face_map.values() if len(f) > 1])}")
        print(f"- adjacency entries: {adjacency_count}")
    def _build_depth_pyramid_gpu(self, depth_map, levels=4):
        """GPU depth-pyramid construction via 2x average pooling."""
        if not isinstance(depth_map, torch.Tensor):
            depth_tensor = torch.from_numpy(depth_map).float().to(self.device)
        else:
            depth_tensor = depth_map.float()
        pyramid = [depth_tensor]
        current_level = depth_tensor
        for _ in range(levels - 1):
            # Downsample with 2x2 average pooling.
            current_level = current_level.unsqueeze(0).unsqueeze(0)  # add batch and channel dims
            current_level = F.avg_pool2d(current_level, kernel_size=2, stride=2)
            current_level = current_level.squeeze(0).squeeze(0)
            pyramid.append(current_level)
        return pyramid
    def _hierarchical_occlusion_test_gpu(self, vertices_cam, depth_pyramid, intrinsics, img_size):
        """GPU hierarchical occlusion test (direct projection, no matmul)."""
        fx, fy, cx, cy = intrinsics
        height, width = img_size
        # Discard vertices at or behind the camera plane.
        valid_mask = vertices_cam[:, 2] > 1e-6
        vertices_valid = vertices_cam[valid_mask]
        if len(vertices_valid) == 0:
            return (torch.zeros(len(vertices_cam), dtype=torch.bool, device=self.device),
                    torch.zeros(len(vertices_cam), dtype=torch.bool, device=self.device))
        visible = torch.zeros(len(vertices_valid), dtype=torch.bool, device=self.device)
        occlusion = torch.zeros(len(vertices_valid), dtype=torch.bool, device=self.device)
        # Test every pyramid level, coarsest first.
        for level in reversed(range(len(depth_pyramid))):
            scale = 2 ** level
            current_depth = depth_pyramid[level]
            h, w = current_depth.shape
            # Project directly instead of building a 3x3 intrinsic matrix.
            x = vertices_valid[:, 0]
            y = vertices_valid[:, 1]
            z = vertices_valid[:, 2]
            # Intrinsics rescaled to this pyramid level.
            fx_scaled = max(fx / (scale + 1e-6), 1e-6)
            fy_scaled = max(fy / (scale + 1e-6), 1e-6)
            cx_scaled = (cx - 0.5) / scale + 0.5
            cy_scaled = (cy - 0.5) / scale + 0.5
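            # The (c - 0.5) / scale + 0.5 form rescales the principal point in
            # a pixel-center convention: the half-pixel offset is removed
            # before dividing by `scale` and restored afterwards, keeping the
            # projection aligned with the pooled depth samples.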
            # Project onto this level's image plane.
            u = (x / z) * fx_scaled + cx_scaled
            v = (y / z) * fy_scaled + cy_scaled
            # Clamp to the image bounds.
            u = torch.clamp(u, 0.0, float(w - 1))
            v = torch.clamp(v, 0.0, float(h - 1))
            # Convert to integer pixel indices.
            u_idx = torch.clamp(torch.floor(u).long(), 0, w - 1)
            v_idx = torch.clamp(torch.floor(v).long(), 0, h - 1)
            # Sample the rendered depth for all vertices at once.
            depth_vals = current_depth[v_idx, u_idx]
            # Depth comparison with a per-level tolerance.
            level_tol = 0.0008 * (2 ** level)
            visible |= (z <= (depth_vals + level_tol))
            occlusion |= (z > (depth_vals + level_tol))
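        # Note: because the masks are OR-accumulated across levels, a vertex
        # can end up flagged both visible and occluded; the caller
        # (_compute_vertex_in_frustum_gpu) only consumes the visible mask.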
        # Map results back onto the full vertex array.
        final_visible = torch.zeros(len(vertices_cam), dtype=torch.bool, device=self.device)
        final_visible[valid_mask] = visible
        final_occlusion = torch.zeros(len(vertices_cam), dtype=torch.bool, device=self.device)
        final_occlusion[valid_mask] = occlusion
        return final_visible, final_occlusion
    def _compute_vertex_in_frustum_gpu(self, fx, fy, cx, cy, R, eye, height, width, depth_map, qvec, tvec):
        """GPU frustum culling plus occlusion test."""
        print("Starting _compute_vertex_in_frustum_gpu")
        # Use get_w2c directly to avoid recomputing the pose.
        w2c = get_w2c(qvec, tvec)
        # Ensure w2c is a 4x4 matrix.
        if w2c.shape != (4, 4):
            if w2c.shape == (3, 4):
                w2c_4x4 = np.eye(4)
                w2c_4x4[:3, :] = w2c
                w2c = w2c_4x4
            else:
                raise ValueError(f"w2c matrix has unexpected shape: {w2c.shape}")
        # Work on GPU tensors.
        vertices = self.vertices_tensor.float()
        ones = torch.ones(len(vertices), 1, device=self.device)
        vertices_homo = torch.cat([vertices, ones], dim=1)
        w2c_tensor = torch.tensor(w2c, device=self.device, dtype=torch.float32)
        # World -> camera transform.
        vertices_cam_homo = (w2c_tensor @ vertices_homo.T).T
        vertices_cam = vertices_cam_homo[:, :3]
        # Fast frustum culling.
        valid_z = vertices_cam[:, 2] > 0
        tan_fov_x = (width / 2) / fx
        tan_fov_y = (height / 2) / fy
        x_ratio = vertices_cam[:, 0] / vertices_cam[:, 2]
        y_ratio = vertices_cam[:, 1] / vertices_cam[:, 2]
        frustum_mask = valid_z & (torch.abs(x_ratio) <= tan_fov_x) & (torch.abs(y_ratio) <= tan_fov_y)
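        # Rationale: the projection u = fx * (x / z) + cx lies inside
        # [0, width] exactly when |x / z| <= (width / 2) / fx, assuming the
        # principal point sits at the image center; likewise for v.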
        # Build the depth pyramid.
        depth_pyramid = self._build_depth_pyramid_gpu(depth_map)
        # Multi-level occlusion test.
        visible_mask, occlusion_mask = self._hierarchical_occlusion_test_gpu(
            vertices_cam, depth_pyramid, (fx, fy, cx, cy), (height, width)
        )
        final_visible = torch.zeros(len(vertices), dtype=torch.bool, device=self.device)
        final_visible[frustum_mask] = visible_mask[frustum_mask]
        final_occlusion = torch.zeros(len(vertices), dtype=torch.bool, device=self.device)
        final_occlusion[frustum_mask] = occlusion_mask[frustum_mask]
        # return (final_visible.cpu().numpy().tolist(),
        #         self._occlusion_expansion_gpu(final_occlusion, vertices.cpu().numpy()))
        # Return as a plain Python list.
        return final_visible.cpu().numpy().tolist()
    def _occlusion_expansion_gpu(self, occlusion_mask, vertices, radius=0.0008):
        """Spatial-hash occlusion dilation (GPU tensors in, CPU hash)."""
        if not isinstance(occlusion_mask, torch.Tensor):
            occlusion_tensor = torch.from_numpy(occlusion_mask).to(self.device)
        else:
            occlusion_tensor = occlusion_mask
        vertices_tensor = torch.from_numpy(vertices).to(self.device)
        # Quantize vertices onto a uniform grid.
        grid_size = radius * 2
        quantized = (vertices_tensor / grid_size).long()
        from collections import defaultdict
        hash_table = defaultdict(list)
        # The hash table is built on the CPU (hard to express efficiently on GPU).
        quantized_cpu = quantized.cpu().numpy()
        for idx, (x, y, z) in enumerate(quantized_cpu):
            hash_table[(x, y, z)].append(idx)
        # Dilate the occluded region over the 27 neighbouring cells.
        dilated_mask = occlusion_tensor.cpu().numpy().copy()
        occluded_indices = np.where(occlusion_tensor.cpu().numpy())[0]
        for idx in occluded_indices:
            x, y, z = quantized_cpu[idx]
            for dx in (-1, 0, 1):
                for dy in (-1, 0, 1):
                    for dz in (-1, 0, 1):
                        neighbor_cell = (x + dx, y + dy, z + dz)
                        for neighbor_idx in hash_table.get(neighbor_cell, []):
                            dilated_mask[neighbor_idx] = True
        return dilated_mask.tolist()
    def _gen_depth_image_gpu(self, cam_data, render):
        """Generate the depth image (delegates to the CPU path; Open3D's
        offscreen renderer works on the CPU here)."""
        return self._gen_depth_image(cam_data, render)
    def _flag_model_gpu(self, camera_data, face_points=None):
        # Set up the offscreen renderer for this view.
        render = o3d.visualization.rendering.OffscreenRenderer(
            camera_data['width'], camera_data['height'])
        material = o3d.visualization.rendering.MaterialRecord()
        render.scene.add_geometry("mesh", self.mesh, material)
        # Render the depth image (same output as the CPU path).
        depth_image = self._gen_depth_image_gpu(camera_data, render)
        # Camera center, computed as in the CPU version.
        R = self.qvec2rotmat(camera_data['qvec']).T
        eye = -R @ camera_data['tvec']
        # final_visible_list, final_occlusion_list = self._compute_vertex_in_frustum_gpu(
        final_visible_list = self._compute_vertex_in_frustum_gpu(
            camera_data['fx'], camera_data['fy'],
            camera_data['cx'], camera_data['cy'],
            R, eye,
            camera_data['height'], camera_data['width'],
            depth_image, camera_data['qvec'], camera_data['tvec']
        )
        # Keep tensors on the configured device.
        final_visible_tensor = torch.tensor(final_visible_list, device=self.device)
        triangles_tensor = self.triangles_tensor  # already on the GPU
        # Vectorized face visibility: a face is visible if any of its vertices is.
        v0_indices = triangles_tensor[:, 0]
        v1_indices = triangles_tensor[:, 1]
        v2_indices = triangles_tensor[:, 2]
        v0_visible = final_visible_tensor[v0_indices]
        v1_visible = final_visible_tensor[v1_indices]
        v2_visible = final_visible_tensor[v2_indices]
        face_visible = v0_visible | v1_visible | v2_visible
        # Same post-processing as the CPU version.
        shrunk_visibility = self._shrink_face_visibility(face_visible.cpu().numpy(), 6)
        expanded_visibility = self._expand_face_visibility(face_visible.cpu().numpy(), 30)
        shrunk_visibility2 = self._shrink_face_visibility(face_visible.cpu().numpy(), 50)
        expanded_edge = expanded_visibility & ~shrunk_visibility2
        delete_edge = face_visible.cpu().numpy() & ~shrunk_visibility
        return shrunk_visibility, expanded_edge, delete_edge
"""
def _gen_depth_image_gpu(self, cam_data, render):
# 复制CPU版本的逻辑
qvec = cam_data['qvec']
tvec = cam_data['tvec']
fx = cam_data['fx']
fy = cam_data['fy']
cx = cam_data['cx']
cy = cam_data['cy']
width = cam_data['width']
height = cam_data['height']
intrinsics = o3d.camera.PinholeCameraIntrinsic(
width, height, fx=fx, fy=fy, cx=cx, cy=cy)
w2c = get_w2c(qvec, tvec)
render.setup_camera(intrinsics, w2c)
depth = render.render_to_depth_image(z_in_view_space=True)
return np.asarray(depth) # 确保返回numpy数组
"""
    def _mask_occlusion_gpu(self):
        """Multi-camera occlusion pass (GPU-assisted)."""
        cameras = read_cameras_text(os.path.join(self.pose_path, "cameras.txt"))
        images = read_images_text(os.path.join(self.pose_path, "images.txt"))
        visible_faces_dict = {}
        edge_faces_dict = {}
        delete_edge_faces_dict = {}
        total_start = time.time()
        for n, img in enumerate(images.values()):
            camera = cameras[img.camera_id]
            camera_data = {
                "qvec": img.qvec,
                "tvec": img.tvec,
                "fx": camera.params[0],
                "fy": camera.params[1],
                "cx": camera.params[2],
                "cy": camera.params[3],
                "width": camera.width,
                "height": camera.height,
                "name": img.name[:-4]
            }
            img_name = img.name[:-4]
            print(f"Processing image {img_name} ({n + 1}/{len(images)})")
            # if (img_name != "73_8" and img_name != "52_8" and img_name != "62_8"):
            # if (img_name != "52_8" and img_name != "62_8"):
            # if (img_name != "52_8"):
            #     continue
            start_time = time.time()
            face_visibility, face_edge, face_delete_edge = self._flag_model_gpu(camera_data)
            processing_time = time.time() - start_time
            visible_faces = np.where(face_visibility)[0].tolist()
            visible_faces_dict[img_name] = visible_faces
            edge_faces_dict[img_name] = np.where(face_edge)[0].tolist()
            delete_edge_faces_dict[img_name] = np.where(face_delete_edge)[0].tolist()
            print(f"Image {img_name} done in {processing_time:.2f}s, {len(visible_faces)} visible faces")
        total_time = time.time() - total_start
        print(f"All images processed, total time: {total_time:.2f}s")
        print(f"Average time per image: {total_time / len(images):.2f}s")
        self.save_occlusion_data(visible_faces_dict, edge_faces_dict, delete_edge_faces_dict, self.asset_dir)
        return {
            "result1": visible_faces_dict,
            "result2": edge_faces_dict,
            "result3": delete_edge_faces_dict
        }
    def _build_face_adjacency(self):
        """CPU variant of the adjacency construction."""
        if not self.mesh.triangles:
            return
        triangles = np.asarray(self.mesh.triangles)
        num_faces = len(triangles)
        self.face_adjacency = [[] for _ in range(num_faces)]
        # Map each edge to the faces containing it.
        edge_face_map = {}
        for idx, tri in enumerate(triangles):
            # The three edges; sorting the vertex pair makes each edge unique.
            edges = [
                tuple(sorted([tri[0], tri[1]])),
                tuple(sorted([tri[1], tri[2]])),
                tuple(sorted([tri[2], tri[0]]))
            ]
            for edge in edges:
                if edge not in edge_face_map:
                    edge_face_map[edge] = []
                edge_face_map[edge].append(idx)
        # Faces sharing an edge are adjacent.
        for edge, faces in edge_face_map.items():
            if len(faces) > 1:  # only edges shared by more than one face
                for i in faces:
                    for j in faces:
                        if i != j and j not in self.face_adjacency[i]:
                            self.face_adjacency[i].append(j)
    def _expand_face_visibility(self, face_visibility, expand_radius=1):
        """Dilate face visibility by `expand_radius` rings of neighbours (BFS)."""
        if self.face_adjacency is None:
            return face_visibility.copy()
        expanded = face_visibility.copy()
        visited = set()
        queue = collections.deque()
        # Seed the queue with every visible face.
        for face_idx, is_visible in enumerate(face_visibility):
            if is_visible:
                queue.append((face_idx, 0))  # (face index, current ring)
                visited.add(face_idx)
        self.expand_radius = expand_radius
        # Breadth-first expansion.
        while queue:
            current_idx, current_radius = queue.popleft()
            # Keep expanding while within the target radius.
            if current_radius < self.expand_radius:
                for neighbor_idx in self.face_adjacency[current_idx]:
                    # Only handle faces not yet visited.
                    if neighbor_idx not in visited:
                        expanded[neighbor_idx] = True
                        visited.add(neighbor_idx)
                        # Enqueue the neighbour one ring further out.
                        queue.append((neighbor_idx, current_radius + 1))
        return expanded
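    # _expand_face_visibility and _shrink_face_visibility act as dilation and
    # erosion on the face-adjacency graph. _flag_model combines them to build
    # a border band: expand(mask, 30) & ~shrink(mask, 50) keeps faces near the
    # visibility boundary, while mask & ~shrink(mask, 6) marks a thin rim of
    # faces to delete.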
    def _shrink_face_visibility(self, face_visibility, shrink_radius=1):
        """Erode face visibility by `shrink_radius` rings, peeling from the boundary."""
        if self.face_adjacency is None or shrink_radius == 0:
            return face_visibility.copy()
        current_visible = face_visibility.copy()
        boundary_queue = collections.deque()
        # Seed with boundary faces: visible but with at least one hidden neighbour.
        for idx, is_visible in enumerate(current_visible):
            if not is_visible:
                continue
            for neighbor_idx in self.face_adjacency[idx]:
                if not current_visible[neighbor_idx]:
                    boundary_queue.append((idx, 1))  # (face index, ring number)
                    break
        # Peel off one ring at a time.
        removed = set()
        while boundary_queue:
            idx, current_radius = boundary_queue.popleft()
            # Skip faces that were already removed.
            if idx in removed:
                continue
            # Remove faces while within the target ring count.
            if current_radius <= shrink_radius:
                current_visible[idx] = False
                removed.add(idx)
                # A visible neighbour may now itself sit on the boundary.
                for neighbor_idx in self.face_adjacency[idx]:
                    if current_visible[neighbor_idx] and neighbor_idx not in removed:
                        is_boundary = False
                        for n_neighbor_idx in self.face_adjacency[neighbor_idx]:
                            if not current_visible[n_neighbor_idx]:
                                is_boundary = True
                                break
                        if is_boundary:
                            boundary_queue.append((neighbor_idx, current_radius + 1))
        return current_visible
    @staticmethod
    def qvec2rotmat(qvec):
        """Quaternion to rotation matrix."""
        return Rotation.from_quat([qvec[1], qvec[2], qvec[3], qvec[0]]).as_matrix()
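    # COLMAP stores quaternions as (w, x, y, z) while scipy's
    # Rotation.from_quat expects (x, y, z, w); the reordering above converts
    # between the two conventions.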
    def _compute_vertex_in_frustum(self, fx, fy, cx, cy, R, eye, height, width, depth_map, qvec, tvec):
        """Hierarchical occlusion test driven by a depth pyramid (CPU)."""
        # World -> camera transform (note: the R argument is recomputed here).
        R = self.qvec2rotmat(qvec)
        w2c = get_w2c(qvec, tvec)
        vertices = np.asarray(self.mesh.vertices, dtype=np.float32)
        vertices_homo = np.hstack([vertices, np.ones((len(vertices), 1))])
        vertices_cam = (w2c @ vertices_homo.T).T[:, :3]
        # Fast frustum culling.
        valid_z = vertices_cam[:, 2] > 0
        tan_fov_x = (width / 2) / fx
        tan_fov_y = (height / 2) / fy
        x_ratio = vertices_cam[:, 0] / vertices_cam[:, 2]
        y_ratio = vertices_cam[:, 1] / vertices_cam[:, 2]
        frustum_mask = valid_z & (np.abs(x_ratio) <= tan_fov_x) & (np.abs(y_ratio) <= tan_fov_y)
        # Build the depth pyramid.
        depth_pyramid = self._build_depth_pyramid(depth_map)
        # Multi-level occlusion test.
        visible_mask, occlusion_mask = self._hierarchical_occlusion_test(
            # visible_mask, occlusion_mask, vertex_depth_difference = self._hierarchical_occlusion_test2(
            vertices_cam[frustum_mask],
            depth_pyramid,
            (fx, fy, cx, cy),
            (height, width)
        )
        final_visible = np.zeros(len(vertices), dtype=bool)
        final_visible[frustum_mask] = visible_mask
        final_occlusion = np.zeros(len(vertices), dtype=bool)
        final_occlusion[frustum_mask] = occlusion_mask
        # final_vertex_difference = np.zeros(len(vertices), dtype=bool)
        # final_vertex_difference[frustum_mask] = vertex_depth_difference
        # return final_visible.tolist(), self._occlusion_expansion(final_occlusion, vertices), final_vertex_difference.tolist()
        return final_visible.tolist(), self._occlusion_expansion(final_occlusion, vertices)
    def _build_depth_pyramid2(self, depth_map, levels=4):
        """Depth pyramid via 2x2 box averaging (assumes even dimensions)."""
        pyramid = [depth_map.copy()]
        current_level = depth_map
        for _ in range(levels - 1):
            current_level = 0.25 * (current_level[::2, ::2] +
                                    current_level[1::2, ::2] +
                                    current_level[::2, 1::2] +
                                    current_level[1::2, 1::2])
            pyramid.append(current_level)
        return pyramid
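    # The strided slices above only line up when both dimensions are even;
    # _build_depth_pyramid below is the variant actually used, cropping odd
    # dimensions to the nearest even size before averaging.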
    def _build_depth_pyramid(self, depth_map, levels=4):
        """Depth pyramid via 2x2 box averaging, tolerating odd dimensions."""
        pyramid = [depth_map.copy()]
        current_level = depth_map
        for _ in range(levels - 1):
            h, w = current_level.shape
            # Crop to the nearest even size so the 2x2 blocks line up.
            if h % 2 != 0 or w % 2 != 0:
                current_level = current_level[:h // 2 * 2, :w // 2 * 2]
            # Defensive broadcast check (redundant after the crop above).
            if current_level[::2, ::2].shape != current_level[1::2, ::2].shape:
                current_level = current_level[:h // 2 * 2, :w // 2 * 2]
            current_level = 0.25 * (
                current_level[::2, ::2] +
                current_level[1::2, ::2] +
                current_level[::2, 1::2] +
                current_level[1::2, 1::2]
            )
            pyramid.append(current_level)
        return pyramid
    def _hierarchical_occlusion_test(self, vertices_cam, depth_pyramid, intrinsics, img_size):
        """Hierarchical occlusion test (safe CPU version)."""
        fx, fy, cx, cy = intrinsics
        height, width = img_size
        # 1. Drop vertices at or behind the camera plane.
        valid_mask = vertices_cam[:, 2] > 1e-6
        vertices_valid = vertices_cam[valid_mask]
        if len(vertices_valid) == 0:
            return (np.zeros(len(vertices_cam), dtype=bool),
                    np.zeros(len(vertices_cam), dtype=bool))
        visible = np.zeros(len(vertices_valid), dtype=bool)
        occlusion = np.zeros(len(vertices_valid), dtype=bool)
        # 2. Test each pyramid level, coarsest first.
        for level in reversed(range(len(depth_pyramid))):
            scale = 2 ** level
            current_depth = depth_pyramid[level]
            h, w = current_depth.shape
            # Intrinsic matrix rescaled for this level (guarded against zero).
            K = np.array([
                [max(fx / (scale + 1e-6), 1e-6), 0, (cx - 0.5) / scale + 0.5],
                [0, max(fy / (scale + 1e-6), 1e-6), (cy - 0.5) / scale + 0.5],
                [0, 0, 1]
            ], dtype=np.float32)
            # Project onto the image plane.
            uv_homo = (K @ vertices_valid.T).T
            uv = uv_homo[:, :2] / uv_homo[:, 2:3]
            # Clamp to the image bounds.
            u = np.clip(uv[:, 0], 0.0, float(w - 1))
            v = np.clip(uv[:, 1], 0.0, float(h - 1))
            # Convert to integer pixel indices.
            u_idx = np.clip(np.floor(u).astype(np.int32), 0, w - 1)
            v_idx = np.clip(np.floor(v).astype(np.int32), 0, h - 1)
            # Sample the rendered depth.
            depth_vals = current_depth[v_idx, u_idx]
            # Depth comparison with a per-level tolerance.
            level_tol = 0.0008 * (2 ** level)  # tried 0.005 and 0.0008
            visible |= (vertices_valid[:, 2] <= (depth_vals + level_tol))
            occlusion |= (vertices_valid[:, 2] > (depth_vals + level_tol))
        # 3. Map results back onto the full vertex array.
        final_visible = np.zeros(len(vertices_cam), dtype=bool)
        final_visible[valid_mask] = visible
        final_occlusion = np.zeros(len(vertices_cam), dtype=bool)
        final_occlusion[valid_mask] = occlusion
        return final_visible, final_occlusion
    def _hierarchical_occlusion_test2(self, vertices_cam, depth_pyramid, intrinsics, img_size):
        """Variant of the occlusion test that also reports per-pixel depth spread."""
        fx, fy, cx, cy = intrinsics
        height, width = img_size
        # 1. Drop vertices at or behind the camera plane.
        valid_mask = vertices_cam[:, 2] > 1e-6
        vertices_valid = vertices_cam[valid_mask]
        if len(vertices_valid) == 0:
            return (np.zeros(len(vertices_cam), dtype=bool),
                    np.zeros(len(vertices_cam), dtype=bool),
                    np.zeros(len(vertices_cam)))  # empty depth-difference array
        visible = np.zeros(len(vertices_valid), dtype=bool)
        occlusion = np.zeros(len(vertices_valid), dtype=bool)
        # Per-pixel depth range (min and max of projected vertex depths).
        pixel_depth_min = {}
        pixel_depth_max = {}
        # 2. Test each pyramid level, coarsest first.
        for level in reversed(range(len(depth_pyramid))):
            scale = 2 ** level
            current_depth = depth_pyramid[level]
            h, w = current_depth.shape
            # Intrinsic matrix rescaled for this level (guarded against zero).
            K = np.array([
                [max(fx / (scale + 1e-6), 1e-6), 0, (cx - 0.5) / scale + 0.5],
                [0, max(fy / (scale + 1e-6), 1e-6), (cy - 0.5) / scale + 0.5],
                [0, 0, 1]
            ], dtype=np.float32)
            # Project onto the image plane.
            uv_homo = (K @ vertices_valid.T).T
            uv = uv_homo[:, :2] / uv_homo[:, 2:3]
            # Clamp to the image bounds.
            u = np.clip(uv[:, 0], 0.0, float(w - 1))
            v = np.clip(uv[:, 1], 0.0, float(h - 1))
            # Convert to integer pixel indices.
            u_idx = np.clip(np.floor(u).astype(np.int32), 0, w - 1)
            v_idx = np.clip(np.floor(v).astype(np.int32), 0, h - 1)
            # Sample the rendered depth.
            depth_vals = current_depth[v_idx, u_idx]
            # Record the per-pixel depth range at full resolution only (level 0).
            if level == 0:
                for i in range(len(u_idx)):
                    pixel_key = (u_idx[i], v_idx[i])
                    vertex_depth = vertices_valid[i, 2]
                    # Update the pixel's minimum depth.
                    if pixel_key not in pixel_depth_min or vertex_depth < pixel_depth_min[pixel_key]:
                        pixel_depth_min[pixel_key] = vertex_depth
                    # Update the pixel's maximum depth.
                    if pixel_key not in pixel_depth_max or vertex_depth > pixel_depth_max[pixel_key]:
                        pixel_depth_max[pixel_key] = vertex_depth
            # Depth comparison with a per-level tolerance.
            level_tol = 0.0008 * (2 ** level)
            visible |= (vertices_valid[:, 2] <= (depth_vals + level_tol))
            occlusion |= (vertices_valid[:, 2] > (depth_vals + level_tol))
        # Per-pixel depth spread (max - min).
        pixel_depth_difference = {}
        for pixel_key in pixel_depth_min:
            if pixel_key in pixel_depth_max:
                pixel_depth_difference[pixel_key] = pixel_depth_max[pixel_key] - pixel_depth_min[pixel_key]
        # Assign each vertex the depth spread of its pixel. Note: `level`,
        # `u_idx` and `v_idx` are loop leftovers; `level` is always 0 here
        # because the loop above ends at the full-resolution level.
        vertex_depth_difference = np.zeros(len(vertices_cam))
        if level == 0:
            for i in range(len(vertices_valid)):
                pixel_key = (u_idx[i], v_idx[i])
                if pixel_key in pixel_depth_difference:
                    # Recover the original vertex index.
                    orig_idx = np.where(valid_mask)[0][i]
                    vertex_depth_difference[orig_idx] = pixel_depth_difference[pixel_key]
        # 3. Map results back onto the full vertex array.
        final_visible = np.zeros(len(vertices_cam), dtype=bool)
        final_visible[valid_mask] = visible
        final_occlusion = np.zeros(len(vertices_cam), dtype=bool)
        final_occlusion[valid_mask] = occlusion
        return final_visible, final_occlusion, vertex_depth_difference
    def _hierarchical_occlusion_test3(self, vertices_cam, depth_pyramid, intrinsics, img_size):
        """Variant recording the depth range of every pixel at every level."""
        fx, fy, cx, cy = intrinsics
        height, width = img_size
        # 1. Drop vertices at or behind the camera plane.
        valid_mask = vertices_cam[:, 2] > 1e-6
        vertices_valid = vertices_cam[valid_mask]
        if len(vertices_valid) == 0:
            return (np.zeros(len(vertices_cam), dtype=bool),
                    np.zeros(len(vertices_cam), dtype=bool),
                    {})  # empty depth-difference mapping
        visible = np.zeros(len(vertices_valid), dtype=bool)
        occlusion = np.zeros(len(vertices_valid), dtype=bool)
        # Per-pixel depth ranges.
        pixel_depth_range = {}
        # Pixel coordinate assigned to each vertex.
        vertex_pixel_info = {}
        # 2. Test each pyramid level, coarsest first.
        for level in reversed(range(len(depth_pyramid))):
            scale = 2 ** level
            current_depth = depth_pyramid[level]
            h, w = current_depth.shape
            # Intrinsic matrix rescaled for this level (guarded against zero).
            K = np.array([
                [max(fx / (scale + 1e-6), 1e-6), 0, (cx - 0.5) / scale + 0.5],
                [0, max(fy / (scale + 1e-6), 1e-6), (cy - 0.5) / scale + 0.5],
                [0, 0, 1]
            ], dtype=np.float32)
            # Project onto the image plane.
            uv_homo = (K @ vertices_valid.T).T
            uv = uv_homo[:, :2] / uv_homo[:, 2:3]
            # Clamp to the image bounds.
            u = np.clip(uv[:, 0], 0.0, float(w - 1))
            v = np.clip(uv[:, 1], 0.0, float(h - 1))
            # Convert to integer pixel indices.
            u_idx = np.clip(np.floor(u).astype(np.int32), 0, w - 1)
            v_idx = np.clip(np.floor(v).astype(np.int32), 0, h - 1)
            # Sample the rendered depth.
            depth_vals = current_depth[v_idx, u_idx]
            # Record the depth range of every pixel
            # (originally restricted to the full-resolution level).
            # if level == 0:
            if True:
                for i in range(len(u_idx)):
                    vertex_idx = np.where(valid_mask)[0][i]  # original vertex index
                    pixel_key = (u_idx[i], v_idx[i])
                    # Pixel hit by this vertex.
                    vertex_pixel_info[vertex_idx] = pixel_key
                    # Update the pixel's depth range.
                    if pixel_key not in pixel_depth_range:
                        pixel_depth_range[pixel_key] = {
                            'min': vertices_valid[i, 2],  # vertex depth
                            'max': vertices_valid[i, 2],  # vertex depth
                            'count': 1
                        }
                    else:
                        pixel_depth_range[pixel_key]['min'] = min(
                            pixel_depth_range[pixel_key]['min'], vertices_valid[i, 2])
                        pixel_depth_range[pixel_key]['max'] = max(
                            pixel_depth_range[pixel_key]['max'], vertices_valid[i, 2])
                        pixel_depth_range[pixel_key]['count'] += 1
            # Depth comparison with a per-level tolerance.
            level_tol = 0.0008 * (2 ** level)
            visible |= (vertices_valid[:, 2] <= (depth_vals + level_tol))
            occlusion |= (vertices_valid[:, 2] > (depth_vals + level_tol))
        # Per-pixel depth spread (max - min).
        pixel_depth_difference = {}
        for pixel_key, depth_range in pixel_depth_range.items():
            pixel_depth_difference[pixel_key] = depth_range['max'] - depth_range['min']
        # Assign each vertex the depth spread of its pixel.
        vertex_depth_difference = np.zeros(len(vertices_cam))
        for vertex_idx, pixel_key in vertex_pixel_info.items():
            if pixel_key in pixel_depth_difference:
                vertex_depth_difference[vertex_idx] = pixel_depth_difference[pixel_key]
        # 3. Map results back onto the full vertex array.
        final_visible = np.zeros(len(vertices_cam), dtype=bool)
        final_visible[valid_mask] = visible
        final_occlusion = np.zeros(len(vertices_cam), dtype=bool)
        final_occlusion[valid_mask] = occlusion
        return final_visible, final_occlusion, vertex_depth_difference
    def _occlusion_expansion(self, occlusion_mask, vertices, radius=0.0008):
        """Fast occlusion dilation using a spatial hash."""
        from collections import defaultdict
        # Build the spatial hash.
        grid_size = radius * 2
        hash_table = defaultdict(list)
        # Quantize vertex coordinates onto the grid.
        quantized = (vertices / grid_size).astype(int)
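        # With radius = 0.0008, grid_size = 0.0016, so a vertex at x = 0.0030
        # maps to cell 0.0030 / 0.0016 = 1.875 -> 1. Note that astype(int)
        # truncates toward zero, so the cells straddling the origin are
        # effectively wider; np.floor would give uniform cells.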
        for idx, (x, y, z) in enumerate(quantized):
            hash_table[(x, y, z)].append(idx)
        # Dilate the occluded region.
        dilated_mask = occlusion_mask.copy()
        occluded_indices = np.where(occlusion_mask)[0]
        for idx in occluded_indices:
            x, y, z = quantized[idx]
            # Query the 27 neighbouring grid cells.
            for dx in (-1, 0, 1):
                for dy in (-1, 0, 1):
                    for dz in (-1, 0, 1):
                        neighbor_cell = (x + dx, y + dy, z + dz)
                        dilated_mask[hash_table.get(neighbor_cell, [])] = True
        return dilated_mask.tolist()
    def _gen_depth_image(self, cam_data, render):
        """Render a view-space depth image for the given camera."""
        qvec = cam_data['qvec']
        tvec = cam_data['tvec']
        fx = cam_data['fx']
        fy = cam_data['fy']
        cx = cam_data['cx']
        cy = cam_data['cy']
        width = cam_data['width']
        height = cam_data['height']
        intrinsics = o3d.camera.PinholeCameraIntrinsic(
            width, height, fx=fx, fy=fy, cx=cx, cy=cy)
        w2c = get_w2c(qvec, tvec)
        # print(np.linalg.inv(w2c))
        # Configure the renderer and read back the depth buffer.
        render.setup_camera(intrinsics, w2c)
        depth = render.render_to_depth_image(z_in_view_space=True)
        return np.asarray(depth)
    @staticmethod
    def sort_vertices(vertices_original):
        """Sort vertices by coordinate. Unused leftover: expects Blender-style
        vertices with a .co attribute, not Open3D vertex arrays."""
        return sorted(
            vertices_original,
            key=lambda v: (v.co.x, v.co.y, v.co.z)
        )
    def _flag_model(self, camera_data, face_points):
        """Flag visible faces for one camera (CPU path)."""
        render = o3d.visualization.rendering.OffscreenRenderer(camera_data['width'], camera_data['height'])
        material = o3d.visualization.rendering.MaterialRecord()
        render.scene.add_geometry("mesh", self.mesh, material)
        # Render the depth image.
        depth_image = self._gen_depth_image(camera_data, render)
        # Compute vertex visibility.
        R = self.qvec2rotmat(camera_data['qvec']).T
        eye = -R @ camera_data['tvec']
        # eye = camera_data['tvec']
        # final_visible_list, final_occlusion_list, final_vertex_difference_list = self._compute_vertex_in_frustum(
        final_visible_list, final_occlusion_list = self._compute_vertex_in_frustum(
            camera_data['fx'], camera_data['fy'],
            camera_data['cx'], camera_data['cy'],
            R, eye,
            camera_data['height'], camera_data['width'],
            depth_image, camera_data['qvec'], camera_data['tvec']
        )
        print("_flag_model", len(final_occlusion_list), len(self.mesh.vertices), len(self.mesh.vertex_colors))
        # A face counts as visible if any of its vertices is visible.
        triangles = np.asarray(self.mesh.triangles)
        face_visible_bitmap = np.zeros(len(triangles), dtype=bool)
        for face_idx, face in enumerate(triangles):
            v0, v1, v2 = face
            face_visible_bitmap[face_idx] = any([  # any vs. all: any is more permissive
                final_visible_list[v0],
                final_visible_list[v1],
                final_visible_list[v2]
            ])
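        # The shrink/expand passes below are graph-morphological erosion and
        # dilation on the face adjacency: shrink(6) trims a thin rim from the
        # raw visibility, expand(30) & ~shrink(50) isolates a wide band around
        # the visibility boundary, and the raw mask minus shrink(6) is the rim
        # marked for deletion.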
        shrunk_visibility = self._shrink_face_visibility(face_visible_bitmap, 6)  # tried 6 and 10
        expanded_visibility = self._expand_face_visibility(face_visible_bitmap, 30)
        shrunk_visibility2 = self._shrink_face_visibility(face_visible_bitmap, 50)
        expanded_edge = expanded_visibility & ~shrunk_visibility2
        delete_edge = face_visible_bitmap & ~shrunk_visibility
        return shrunk_visibility, expanded_edge, delete_edge
    def _flag_contour(self, camera_data, face_points):
        """Flag contour (silhouette) vertices for one camera."""
        render = o3d.visualization.rendering.OffscreenRenderer(camera_data['width'], camera_data['height'])
        material = o3d.visualization.rendering.MaterialRecord()
        render.scene.add_geometry("mesh", self.mesh, material)
        # Render the depth image.
        depth_image = self._gen_depth_image(camera_data, render)
        # Camera parameters.
        fx = camera_data['fx']
        fy = camera_data['fy']
        cx = camera_data['cx']
        cy = camera_data['cy']
        height = camera_data['height']
        width = camera_data['width']
        # Vertices in camera space.
        w2c = get_w2c(camera_data['qvec'], camera_data['tvec'])
        vertices = np.asarray(self.mesh.vertices)
        vertices_homo = np.hstack([vertices, np.ones((len(vertices), 1))])
        vertices_cam = (w2c @ vertices_homo.T).T[:, :3]
        # Discard vertices behind the camera.
        valid_mask = vertices_cam[:, 2] > 0
        vertices_valid = vertices_cam[valid_mask]
        # Project onto the image plane.
        u = (vertices_valid[:, 0] * fx / vertices_valid[:, 2] + cx)
        v = (vertices_valid[:, 1] * fy / vertices_valid[:, 2] + cy)
        u_idx = np.clip(np.floor(u).astype(int), 0, width - 1)
        v_idx = np.clip(np.floor(v).astype(int), 0, height - 1)
        # Initialize the per-pixel min/max depth maps.
        min_depth_map = np.full((height, width), np.inf)
        max_depth_map = np.zeros((height, width))
        # Update min_depth_map and max_depth_map.
        for i in range(len(vertices_valid)):
            x = u_idx[i]
            y = v_idx[i]
            d = vertices_valid[i, 2]
            if d < min_depth_map[y, x]:
                min_depth_map[y, x] = d
            if d > max_depth_map[y, x]:
                max_depth_map[y, x] = d
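        # Idea: pixels where near and far surfaces project together have a
        # large depth spread, which marks silhouette (contour) pixels. The
        # threshold below is in view-space depth units and likely needs
        # retuning per scene scale.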
        # For every vertex, check the depth spread at its pixel.
        edge_vertices = np.zeros(len(vertices), dtype=bool)
        threshold = 3  # depth-spread threshold; adjust as needed
        for i in range(len(vertices_valid)):
            x = u_idx[i]
            y = v_idx[i]
            if min_depth_map[y, x] < np.inf:  # pixel has data
                depth_range = max_depth_map[y, x] - min_depth_map[y, x]
                if depth_range > threshold:
                    # Recover the original vertex index. The original code
                    # assigned False here, leaving the mask all-False; marking
                    # the vertex as an edge is clearly the intent.
                    orig_idx = np.where(valid_mask)[0][i]
                    edge_vertices[orig_idx] = True
        # Color edge vertices.
        vertex_colors = np.asarray(self.mesh.vertex_colors)
        for i in range(len(vertices)):
            if edge_vertices[i]:
                vertex_colors[i] = [1.0, 0.0, 0.0]  # red marks edges
        # Save the model.
        output_path = f"{self.asset_dir}/mesh_{self.id}_edge.ply"
        o3d.io.write_triangle_mesh(output_path, self.mesh)
        print(f"Edge detection completed. Results saved to {output_path}")
        # A face is an edge face if any of its vertices is an edge vertex.
        triangles = np.asarray(self.mesh.triangles)
        face_edge = np.zeros(len(triangles), dtype=bool)
        for face_idx, face in enumerate(triangles):
            if any(edge_vertices[face]):
                face_edge[face_idx] = True
        # For compatibility with callers expecting two values, return a
        # placeholder all-visible bitmap alongside the edge flags.
        face_visible_bitmap = np.ones(len(triangles), dtype=bool)  # placeholder
        return face_visible_bitmap, face_edge
"""
def _mask_face_occlusion(self):
# 读取相机数据
cameras = read_cameras_text(os.path.join(self.pose_path, "cameras.txt"))
images = read_images_text(os.path.join(self.pose_path, "images.txt"))
# cameras = read_cameras_text(os.path.join(self.pose_path, "backup_cameras.txt"))
# images = read_images_text(os.path.join(self.pose_path, "backup_images.txt"))
face_points_sorted_path = os.path.join(self.pose_path, "face_points_sorted.txt")
print("face_points_sorted_path=", face_points_sorted_path)
#face_points = read_int_text(face_points_sorted_path)
face_points = read_indices_from_file(face_points_sorted_path)
# face_points = {}
camera_data = {}
for img in images.values():
if self.mask_image == img.name[:-4]:
camera = cameras[img.camera_id]
camera_data = {
"qvec": img.qvec,
"tvec": img.tvec,
"fx": camera.params[0],
"fy": camera.params[1],
"cx": camera.params[2],
"cy": camera.params[3],
"width": camera.width,
"height": camera.height,
"name": img.name[:-4]
}
# print(face_points)
self._flag_model(camera_data, face_points)
"""
    def _mask_occlusion(self):
        """Multi-camera occlusion pass (CPU)."""
        # Read camera data.
        cameras = read_cameras_text(os.path.join(self.pose_path, "cameras.txt"))
        images = read_images_text(os.path.join(self.pose_path, "images.txt"))
        camera_data = {}
        countour_faces_dict = {}
        visible_faces_dict = {}
        edge_faces_dict = {}
        delete_edge_faces_dict = {}
        total_start = time.time()
        n = 0
        for img in images.values():
            camera = cameras[img.camera_id]
            camera_data = {
                "qvec": img.qvec,
                "tvec": img.tvec,
                "fx": camera.params[0],
                "fy": camera.params[1],
                "cx": camera.params[2],
                "cy": camera.params[3],
                "width": camera.width,
                "height": camera.height,
                "name": img.name[:-4]
            }
            img_name = img.name[:-4]
            # if (img_name != "73_8" and img_name != "52_8" and img_name != "62_8"):
            # if (img_name != "52_8" and img_name != "62_8"):
            # if (img_name != "52_8"):
            #     continue
            start_time = time.time()
            face_visibility, face_edge, face_delete_edge = self._flag_model(camera_data, None)
            processing_time = time.time() - start_time
            visible_faces = np.where(face_visibility)[0].tolist()
            visible_faces_dict[img_name] = visible_faces
            edge_faces_dict[img_name] = np.where(face_edge)[0].tolist()
            delete_edge_faces_dict[img_name] = np.where(face_delete_edge)[0].tolist()
            n += 1
            print(f"image={img_name}, time={processing_time:.2f}s, visible faces={len(visible_faces)}")
        total_time = time.time() - total_start
        print(f"All images processed, total time: {total_time:.2f}s")
        print(f"Average time per image: {total_time / len(images):.2f}s")
        self.save_occlusion_data(visible_faces_dict, edge_faces_dict, delete_edge_faces_dict, self.asset_dir)
        return {"result1": visible_faces_dict, "result2": edge_faces_dict, "result3": delete_edge_faces_dict}
    def save_occlusion_data(self, result1: Dict[str, List[int]],
                            result2: Dict[str, List[int]],
                            result3: Dict[str, List[int]],
                            base_path: str) -> None:
        """Save occlusion data to text files.

        Args:
            result1: visible-face dict, image name -> list of visible face ids
            result2: edge-face dict, image name -> list of edge-band face ids
            result3: delete-edge dict, image name -> list of face ids to delete
            base_path: output file prefix
        """
        print(f"save_occlusion_data {base_path}, {len(result1)}, {len(result2)}, {len(result3)}")
        # Convert the visible-face dict to image name -> set of face ids.
        visible_faces_map: Dict[str, Set[int]] = {}
        for image_name, face_list in result1.items():
            visible_faces_map[image_name] = set(face_list)
        # Union of all visible faces across images.
        face_visible_relative: Set[int] = set()
        for face_set in visible_faces_map.values():
            face_visible_relative.update(face_set)
        # Edge-face dict.
        edge_faces_map: Dict[str, Set[int]] = {}
        for image_name, face_list in result2.items():
            edge_faces_map[image_name] = set(face_list)
        # Delete-edge dict.
        delete_edge_faces_map: Dict[str, Set[int]] = {}
        for image_name, face_list in result3.items():
            delete_edge_faces_map[image_name] = set(face_list)
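        # Output format: each *_map.txt line is the image name followed by
        # space-separated face ids (e.g. "52_8 12 57 203"), while
        # _face_visible_relative.txt holds one face id per line. Iterating a
        # set writes ids in arbitrary order; sort first if stable output
        # matters.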
        # Save visible_faces_map.
        try:
            with open(base_path + "_visible_faces_map.txt", "w", encoding='utf-8') as map_file:
                for image_name, face_set in visible_faces_map.items():
                    # Image name plus all face ids, space separated.
                    line = image_name + " " + " ".join(str(face) for face in face_set) + "\n"
                    map_file.write(line)
        except IOError as e:
            print(f"Error writing visible_faces_map file: {e}")
        # Save face_visible_relative.
        try:
            with open(base_path + "_face_visible_relative.txt", "w", encoding='utf-8') as relative_file:
                for face in face_visible_relative:
                    relative_file.write(str(face) + "\n")
        except IOError as e:
            print(f"Error writing face_visible_relative file: {e}")
        # Save edge_faces_map.
        try:
            with open(base_path + "_edge_faces_map.txt", "w", encoding='utf-8') as map_file2:
                for image_name, face_set in edge_faces_map.items():
                    line = image_name + " " + " ".join(str(face) for face in face_set) + "\n"
                    map_file2.write(line)
        except IOError as e:
            print(f"Error writing edge_faces_map file: {e}")
        # Save delete_edge_faces_map.
        try:
            with open(base_path + "_delete_edge_faces_map.txt", "w", encoding='utf-8') as map_file3:
                for image_name, face_set in delete_edge_faces_map.items():
                    line = image_name + " " + " ".join(str(face) for face in face_set) + "\n"
                    map_file3.write(line)
        except IOError as e:
            print(f"Error writing delete_edge_faces_map file: {e}")
    def process(self):
        print("process")
        self.load_model()
        try:
            # Run the occlusion pass over the physical cameras.
            # return self._mask_occlusion()
            return self._mask_occlusion_gpu()
        except Exception as e:
            print(f"Error during processing: {str(e)}")
            raise


if __name__ == "__main__":
    ModelProcessor().process()