import torch import argparse import os import logging import subprocess import numpy as np import cv2 import time from collections import defaultdict import tqdm from multiprocessing import Pool, cpu_count import multiprocessing from typing import List, Tuple, Dict # 如果还未导入 def read_vertices(obj_path): vertices = [] with open(obj_path, 'r') as file: lines = file.readlines() for line in lines: if line.startswith('v '): # 顶点坐标 vertices.append(list(map(float, line.split()[1:4]))) vertices = torch.tensor(vertices) return vertices def read_uvs(obj_path): uv_coordinates = [] with open(obj_path, 'r') as file: lines = file.readlines() for line in lines: if line.startswith('vt '): # UV 坐标 uv_coordinates.append(list(map(float, line.split()[1:3]))) uv_coordinates = torch.tensor(uv_coordinates) return uv_coordinates def read_faces(obj_path): vertex_indices = [] uv_indices = [] with open(obj_path, 'r') as file: lines = file.readlines() for line in lines: if line.startswith('f '): # 面 parts = line.split()[1:] v_indices = [] uv_indices_temp = [] for face in parts: v, vt = map(int, face.split('/')[:2]) v_indices.append(v - 1) uv_indices_temp.append(vt - 1) vertex_indices.append(v_indices) uv_indices.append(uv_indices_temp) vertex_indices = torch.tensor(vertex_indices) uv_indices = torch.tensor(uv_indices) return vertex_indices, uv_indices def read_missing_faces(missing_faces_path): with open(missing_faces_path, 'r') as file: lines = file.readlines() missing_color_faces = torch.tensor( [int(line.strip()) for line in lines] ) return missing_color_faces def read_uv_map(input_texture_path): uv_map = cv2.imread(input_texture_path) uv_map = cv2.cvtColor(uv_map, cv2.COLOR_BGR2RGB) uv_map = torch.from_numpy(uv_map) return uv_map def parse_obj_file_and_uv_map(obj_path, missing_faces_path, input_texture_path, device): print(f"Reading OBJ file: {obj_path}") # vertices = [] # uv_coordinates = [] # vertex_indices = [] # uv_indices = [] # multiprocessing.set_start_method('spawn', force=True) # multiprocessing.freeze_support() start_time = time.time() p = Pool(5) uv_map_result = p.apply_async(read_uv_map, (input_texture_path,)) vertices_result = p.apply_async(read_vertices, (obj_path,)) uv_coordinates_result = p.apply_async(read_uvs, (obj_path,)) faces_result = p.apply_async(read_faces, (obj_path,)) missing_faces_result = p.apply_async(read_missing_faces, (missing_faces_path,)) p.close() p.join() vertices = vertices_result.get() uv_coordinates = uv_coordinates_result.get() vertex_indices, uv_indices = faces_result.get() missing_color_faces = missing_faces_result.get() uv_map = uv_map_result.get() vertices = vertices.to(device) uv_coordinates = uv_coordinates.to(device) vertex_indices = vertex_indices.to(device) uv_indices = uv_indices.to(device) missing_color_faces = missing_color_faces.to(device) uv_map = uv_map.to(device) end_time = time.time() print(f"using: {end_time - start_time} seconds") # exit() print("Converting to tensors...") return vertices, uv_coordinates, vertex_indices, uv_indices, missing_color_faces, uv_map def write_obj_with_uv_coordinates(filename, vertices, uvs, vertex_indices, uv_indices): """ 高性能OBJ文件写入函数 Parameters: filename (str): 输出OBJ文件路径 vertices (np.ndarray): 顶点数组 uvs (np.ndarray): UV坐标数组 vertex_indices (np.ndarray): 面的顶点索引 uv_indices (np.ndarray): 面的UV索引 """ # 估算数据大小(以字节为单位) estimated_size = ( len(vertices) * 40 + # 每个顶点约40字节 (v x.xxxxxx y.xxxxxx z.xxxxxx\n) len(uvs) * 30 + # 每个UV坐标约30字节 (vt x.xxxxxx y.xxxxxx\n) len(vertex_indices) * 40 # 每个面约40字节 (f v1/vt1 v2/vt2 v3/vt3\n) ) # 设置缓冲区大小为估算大小的1.2倍,最小256MB,最大1GB buffer_size = min(max(int(estimated_size * 1.2), 256 * 1024 * 1024), 1024 * 1024 * 1024) # 使用格式化字符串和列表推导式优化字符串生成 vertex_lines = ['v %.6f %.6f %.6f' % (v[0], v[1], v[2]) for v in vertices] uv_lines = ['vt %.6f %.6f' % (uv[0], uv[1]) for uv in uvs] # 优化face数据处理 face_lines = [] face_format = 'f %d/%d %d/%d %d/%d' for v_idx, uv_idx in zip(vertex_indices, uv_indices): face_lines.append(face_format % ( v_idx[0] + 1, uv_idx[0] + 1, v_idx[1] + 1, uv_idx[1] + 1, v_idx[2] + 1, uv_idx[2] + 1 )) # 使用join一次性构建完整内容 content = ['mtllib mesh.mtl'] + vertex_lines + [''] + uv_lines + [''] + ['usemtl material_0'] + face_lines # 一次性写入所有数据 with open(filename, 'w', buffering=buffer_size) as f: f.write('\n'.join(content)) def load_regions(filename): regions = [] with open(filename, 'r') as file: for line in file: parts = line.split(";") if len(parts) != 2: continue # Skip any lines that don't have exactly two parts first_set = set(int(x) for x in parts[0].strip().split()) second_set = set(int(x) for x in parts[1].strip().split()) regions.append((first_set, second_set)) return regions def build_face_adjacency(vertices, faces): """ 构建面的邻接关系,基于共享边 Args: vertices: 顶点数组 faces: 面片索引数组 (N x 3) Returns: dict: 面片邻接关系字典,key为面片索引,value为邻接面片索引列表 """ # 将faces转换为numpy数组以加快处理速度 faces = np.asarray(faces) num_faces = len(faces) # 为每个面创建所有边 (Nx3x2) edges = np.stack([ np.column_stack((faces[:, i], faces[:, (i + 1) % 3])) for i in range(3) ], axis=1) # 确保边的方向一致 (较小的顶点索引在前) edges.sort(axis=2) # 将边展平为 (Nx3, 2) 的形状 edges = edges.reshape(-1, 2) # 创建边到面的映射 edge_faces = np.repeat(np.arange(num_faces), 3) # 使用复合键对边进行排序 edge_keys = edges[:, 0] * vertices.shape[0] + edges[:, 1] sort_idx = np.argsort(edge_keys) edges = edges[sort_idx] edge_faces = edge_faces[sort_idx] # 找到重复的边(共享边) same_edges = (edge_keys[sort_idx][1:] == edge_keys[sort_idx][:-1]) edge_start_idx = np.where(same_edges)[0] # 构建邻接字典 face_adjacency = defaultdict(list) for idx in edge_start_idx: face1, face2 = edge_faces[idx], edge_faces[idx + 1] face_adjacency[face1].append(face2) face_adjacency[face2].append(face1) return dict(face_adjacency) def find_groups_and_subgroups(face_adjacency, missing_faces): """ 找到相连的面组和它们的邻接面 返回: regions: 列表,每个元素是一个元组 (missing_faces_set, adjacent_faces_set), 与 load_regions() 函数返回格式保持一致 """ missing_faces_set = set(missing_faces.cpu().numpy()) unused_faces = set(missing_faces.cpu().numpy()) regions = [] total_faces = len(unused_faces) with tqdm.tqdm(total=total_faces, desc="Processing faces") as pbar: while unused_faces: start_face = unused_faces.pop() current_group = {start_face} current_subgroup = set() stack = [start_face] while stack: face_idx = stack.pop() for neighbor in face_adjacency.get(face_idx, []): if neighbor in unused_faces: current_group.add(neighbor) unused_faces.remove(neighbor) stack.append(neighbor) elif neighbor not in missing_faces_set: current_subgroup.add(neighbor) regions.append((current_group, current_subgroup)) pbar.update(total_faces - len(unused_faces) - pbar.n) # 输出统计信息 print(f"\nTotal regions: {len(regions)}") print(f"Average missing faces group size: {sum(len(g[0]) for g in regions)/len(regions):.2f}") print(f"Largest missing faces group size: {max(len(g[0]) for g in regions)}") print(f"Smallest missing faces group size: {min(len(g[0]) for g in regions)}") # 检查每个组是否都有邻接面 for i, (group, subgroup) in enumerate(regions): if not subgroup: print(f"Warning: Region {i} with {len(group)} missing faces has no adjacent faces!") return regions def compute_regions_face_colors( regions: List[Tuple[set, set]], uv_map: torch.Tensor, uvs: torch.Tensor, face_uv_indices: torch.Tensor, device: str ) -> Dict[int, torch.Tensor]: """ 根据每个区域的边缘面UV坐标计算加权平均的颜色, 当无有效采样时更新对应face_uv_indices。 参数: regions (List[Tuple[set, set]]): 每个区域为 (缺失面集合, 邻接面集合) uv_map (torch.Tensor): 原始纹理贴图,RGB格式 uvs (torch.Tensor): 原始UV坐标 face_uv_indices (torch.Tensor): 每个面对应的UV索引 device (str): 使用的设备("cuda"或"cpu") 返回: Dict[int, torch.Tensor]: 键为区域索引,值为该区域加权平均计算得到的颜色(uint8) """ regions_face_color: Dict[int, torch.Tensor] = {} for r_index, region in enumerate(tqdm.tqdm(regions, desc="Processing regions")): region_faces_indexes = torch.tensor(list(region[0]), device=device) region_edge_faces_indexes = torch.tensor(list(region[1]), device=device) if len(region_edge_faces_indexes) == 0: continue # 获取边缘面的UV索引 edge_face_uv_indices = face_uv_indices[region_edge_faces_indexes] # 使用三角形的质心UV坐标来采样颜色 triangle_uvs = uvs[edge_face_uv_indices] # shape: [num_faces, 3, 2] centroid_uvs = triangle_uvs.mean(dim=1) # shape: [num_faces, 2] # 将UV坐标转换为像素坐标 scale_tensor = torch.tensor([uv_map.shape[1] - 1, uv_map.shape[0] - 1], device=device) pixel_coords = torch.round(centroid_uvs * scale_tensor) pixel_coords[:, 1] = uv_map.shape[0] - 1 - pixel_coords[:, 1] pixel_coords = pixel_coords.long().clamp(0, uv_map.shape[0] - 1) # 直接采样质心位置的颜色 colors = uv_map[pixel_coords[:, 1], pixel_coords[:, 0]] # shape: [num_faces, 3] # 使用面积加权平均来计算最终颜色 areas = torch.abs( (triangle_uvs[:, 1, 0] - triangle_uvs[:, 0, 0]) * (triangle_uvs[:, 2, 1] - triangle_uvs[:, 0, 1]) - (triangle_uvs[:, 2, 0] - triangle_uvs[:, 0, 0]) * (triangle_uvs[:, 1, 1] - triangle_uvs[:, 0, 1]) ) * 0.5 if len(colors) > 0: weighted_color = (colors.float() * areas.unsqueeze(1)).sum(dim=0) / areas.sum() regions_face_color[r_index] = weighted_color.round().clamp(0, 255).to(torch.uint8) else: # 如果没有有效的采样点,使用第一个相邻面的UV坐标更新face_uv_indices face_uv_indices[region_faces_indexes] = face_uv_indices[region_edge_faces_indexes[0]].unsqueeze(dim=0).clone() return regions_face_color def update_uv_map_and_indices( uv_map: torch.Tensor, uvs: torch.Tensor, face_uv_indices: torch.Tensor, regions: List[Tuple[set, set]], regions_face_color: Dict[int, torch.Tensor], device: str ) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: """ 根据计算得到的区域颜色,更新UV贴图及对应的UV坐标,并批量更新face_uv_indices。 参数: uv_map (torch.Tensor): 原始纹理贴图,RGB格式 uvs (torch.Tensor): 原始UV坐标 face_uv_indices (torch.Tensor): 原始面的UV索引 regions (List[Tuple[set, set]]): 每个区域为 (缺失面集合, 邻接面集合) regions_face_color (Dict[int, torch.Tensor]): 每个区域计算得到的颜色 device (str): 使用的设备("cuda"或"cpu") 返回: Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: new_uv_map: 更新后的UV贴图 uvs_updated: 更新后的UV坐标(拼接上新计算的UV) face_uv_indices: 更新后的face UV索引 """ total_regions = len(regions_face_color) grid_size = uv_map.shape[1] // 3 all_c = torch.div(torch.arange(total_regions, device=device), grid_size, rounding_mode='floor') all_r = torch.remainder(torch.arange(total_regions, device=device), grid_size) # 创建新的颜色UV贴图 color_uv_map = torch.full((int(uv_map.shape[0] / 2), uv_map.shape[1], 3), 255, dtype=torch.uint8, device=device) # 调整原始uvs的纵坐标 uvs[:, 1] = uvs[:, 1] * (2 / 3) + 1 / 3 # 批量创建所有颜色块的坐标 c_indices = all_c.unsqueeze(1).repeat(1, 9) * 3 + torch.tensor([0, 1, 2, 0, 1, 2, 0, 1, 2], device=device).unsqueeze(0) r_indices = all_r.unsqueeze(1).repeat(1, 9) * 3 + torch.tensor([0, 0, 0, 1, 1, 1, 2, 2, 2], device=device).unsqueeze(0) # 批量设置颜色 colors = torch.stack([color for _, color in sorted(regions_face_color.items(), key=lambda x: x[0])]) colors_repeated = colors.unsqueeze(1).repeat(1, 9, 1) color_uv_map[c_indices.flatten(), r_indices.flatten()] = colors_repeated.reshape(-1, 3) # 批量计算新的UV坐标 pixels = torch.stack([ all_r * 3 + 1, uv_map.shape[0] + all_c * 3 + 1 ], dim=1).to(device) u_new = pixels[:, 0].float() / (uv_map.shape[1] - 1) new_height = int(uv_map.shape[0] + uv_map.shape[0] / 2) v_new = (new_height - 1 - pixels[:, 1].float()) / (new_height - 1) new_uvs = torch.stack([u_new, v_new], dim=1) # 更新UV坐标:拼接新计算的UV uvs_updated = torch.cat([uvs, new_uvs], dim=0) uv_coordinates_start = uvs_updated.shape[0] - total_regions # 批量更新face_uv_indices for i, (region_index, _) in enumerate(sorted(regions_face_color.items(), key=lambda x: x[0])): region_faces_indexes = torch.tensor(list(regions[region_index][0]), device=device) face_uv_indices[region_faces_indexes] = torch.full((1, 3), uv_coordinates_start + i, device=device) # 合并原始UV贴图和新的颜色UV贴图 new_uv_map = torch.cat((uv_map, color_uv_map), dim=0) return new_uv_map, uvs_updated, face_uv_indices def group_regions_by_y_axis( regions: List[Tuple[set, set]], vertices: torch.Tensor, triangle_vertex_indices: torch.Tensor, device: str, interval_size: float = 0.1 ) -> Dict[int, List[int]]: """ 将区域按照y轴高度分组 Args: regions: 区域列表,每个区域为(缺失面集合, 邻接面集合)的元组 vertices: 顶点坐标张量 triangle_vertex_indices: 三角形顶点索引张量 device: 计算设备 ('cuda' 或 'cpu') interval_size: y轴分组的间隔大小,默认为0.1 Returns: Dict[int, List[int]]: 以y轴区间为键,区域索引列表为值的字典 """ y_intervals = defaultdict(list) for r_index, region in enumerate(regions): region_faces_indexes = torch.tensor(list(region[0]), device=device) # 计算面组的平均y轴位置 face_vertices = vertices[triangle_vertex_indices[region_faces_indexes]] avg_y = face_vertices[:, :, 1].mean(dim=(0, 1)) # 根据y轴位置分配到对应区间 interval_key = int(avg_y // interval_size) y_intervals[interval_key].append(r_index) return dict(y_intervals) def align_regions_colors( regions_face_color: Dict[int, torch.Tensor], y_intervals: Dict[int, List[int]], regions: List[Tuple[set, set]] ) -> Dict[int, torch.Tensor]: """ 对齐区间内的颜色 Args: regions_face_color: 每个区域的颜色 y_intervals: 每个y轴区间的区域索引列表 Returns: Dict[int, torch.Tensor]: 以y轴区间为键,颜色为值的字典 """ # aligned_regions_face_color = {} large_group_threshold_min = 5000 large_group_threshold_max = 100000 for interval_key, region_indices in y_intervals.items(): large_groups = [] # normal_groups = [] for r_index in region_indices: region = regions[r_index] if len(region[0]) >= large_group_threshold_min and len(region[0]) <= large_group_threshold_max: large_groups.append((r_index, len(region[0]), regions_face_color[r_index])) # 查找 large_groups 中 len(region[0]) 最大的组,并获取其颜色 if large_groups: largest_group = max(large_groups, key=lambda x: x[1]) color: torch.Tensor = largest_group[2] for large_group in large_groups: regions_face_color[large_group[0]] = color return regions_face_color def process(input_obj_path, input_texture_path, missing_faces_path, output_obj_path, output_texture_path): start_time = time.time() device = 'cuda' if torch.cuda.is_available() else 'cpu' vertices, uvs, triangle_vertex_indices, face_uv_indices, missing_color_faces, uv_map = parse_obj_file_and_uv_map( input_obj_path, missing_faces_path, input_texture_path, device=device) # 构建面的邻接关系和找到区域 start_face_adjacency_time = time.time() face_adjacency = build_face_adjacency(vertices.cpu().numpy(), triangle_vertex_indices.cpu().numpy()) end_face_adjacency_time = time.time() print(f"face_adjacency using: {end_face_adjacency_time - start_face_adjacency_time} seconds") start_find_groups_time = time.time() regions = find_groups_and_subgroups(face_adjacency, missing_color_faces) end_find_groups_time = time.time() print(f"find_groups_and_subgroups using: {end_find_groups_time - start_find_groups_time} seconds") start_texture_map_time = time.time() # 使用新封装的函数计算每个区域的加权平均颜色 regions_face_color = compute_regions_face_colors(regions, uv_map, uvs, face_uv_indices, device) end_texture_map_time = time.time() print(f"texture_mapping_to_triangle using: {end_texture_map_time - start_texture_map_time} seconds") # 按y轴区间分组 y_intervals = group_regions_by_y_axis( regions, vertices, triangle_vertex_indices, device ) # 对齐区间内的颜色 regions_face_color = align_regions_colors(regions_face_color, y_intervals, regions) # 更新UV贴图和面索引 start_color_map_time = time.time() new_uv_map, uvs, face_uv_indices = update_uv_map_and_indices(uv_map, uvs, face_uv_indices, regions, regions_face_color, device) end_color_map_time = time.time() print(f"color_mapping_to_triangle using: {end_color_map_time - start_color_map_time} seconds") end_time = time.time() print(f"using: {end_time - start_time} seconds") # 写入OBJ和纹理贴图 start_write_time = time.time() vertices_cpu = vertices.cpu().numpy() uvs_cpu = uvs.cpu().numpy() triangle_vertex_indices_cpu = triangle_vertex_indices.cpu().numpy() face_uv_indices_cpu = face_uv_indices.cpu().numpy() new_uv_map_cpu = new_uv_map.cpu().numpy() new_uv_map_bgr = cv2.cvtColor(new_uv_map_cpu, cv2.COLOR_RGB2BGR) with Pool(2) as p: # 异步执行OBJ和纹理图写入操作 obj_future = p.apply_async(write_obj_with_uv_coordinates, (output_obj_path, vertices_cpu, uvs_cpu, triangle_vertex_indices_cpu, face_uv_indices_cpu)) img_future = p.apply_async(cv2.imwrite, (output_texture_path, new_uv_map_bgr, [cv2.IMWRITE_PNG_COMPRESSION, 3])) obj_future.get() img_future.get() end_write_time = time.time() end_time = time.time() print(f"Total file writing time: {end_write_time - start_write_time:.2f} seconds") print(f"using: {end_time - start_time} seconds") def main(): parser = argparse.ArgumentParser(description='Process OBJ files to fix missing color faces.') parser.add_argument('--input_obj', type=str, required = True, help='Path to the input OBJ file') parser.add_argument('--input_texture', type=str, required = True, help='Path to the texture file') parser.add_argument('--missing_faces', type=str, required = True, help='Path to the file with indices of missing color faces') parser.add_argument('--output_obj', type=str, required = True, help='Path to the output OBJ file') parser.add_argument('--output_texture', type=str, required = True, help='Path to the texture file') args = parser.parse_args() process(args.input_obj, args.input_texture, args.missing_faces, args.output_obj, args.output_texture) if __name__ == '__main__': main()