You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

551 lines
21 KiB

import torch
import argparse
import os
import logging
import subprocess
import numpy as np
import cv2
import time
from collections import defaultdict
import tqdm
from multiprocessing import Pool, cpu_count
import multiprocessing
from typing import List, Tuple, Dict # 如果还未导入
def read_vertices(obj_path):
    """Read the vertex positions (``v`` records) of a Wavefront OBJ file.

    Args:
        obj_path: Path to the OBJ file.

    Returns:
        torch.Tensor: Tensor of shape ``(num_vertices, 3)`` with the first
        three coordinates of every ``v`` line, in file order. A file without
        any vertex record yields an empty ``(0, 3)`` tensor.
    """
    vertices = []
    # Iterate the file lazily: OBJ meshes can be huge and ``readlines()``
    # would keep every line of the file in memory at once.
    with open(obj_path, 'r') as file:
        for line in file:
            if line.startswith('v '):  # vertex position record
                vertices.append([float(value) for value in line.split()[1:4]])
    if not vertices:
        # torch.tensor([]) would be 1-D; keep the (N, 3) layout callers index.
        return torch.empty((0, 3))
    return torch.tensor(vertices)
def read_uvs(obj_path):
    """Read the texture coordinates (``vt`` records) of a Wavefront OBJ file.

    Args:
        obj_path: Path to the OBJ file.

    Returns:
        torch.Tensor: Tensor of shape ``(num_uvs, 2)`` with the first two
        components of every ``vt`` line, in file order. A file without any
        ``vt`` record yields an empty ``(0, 2)`` tensor.
    """
    uv_coordinates = []
    # Iterate the file lazily instead of materialising it with readlines().
    with open(obj_path, 'r') as file:
        for line in file:
            if line.startswith('vt '):  # UV coordinate record
                uv_coordinates.append([float(value) for value in line.split()[1:3]])
    if not uv_coordinates:
        # Preserve the (N, 2) layout even when there is nothing to read.
        return torch.empty((0, 2))
    return torch.tensor(uv_coordinates)
def read_faces(obj_path):
    """Read the face records (``f`` lines) of a Wavefront OBJ file.

    Every face corner must have the form ``v/vt`` or ``v/vt/vn``; the 1-based
    OBJ indices are converted to 0-based indices.

    Args:
        obj_path: Path to the OBJ file.

    Returns:
        Tuple[torch.Tensor, torch.Tensor]: ``(vertex_indices, uv_indices)``,
        one row per face and one column per face corner. Both are empty
        ``(0, 3)`` integer tensors when the file contains no faces.

    Raises:
        ValueError: If a face corner carries no UV index (e.g. ``f 1 2 3`` or
            ``f 1//1 ...``) or an index is not an integer.
    """
    vertex_indices = []
    uv_indices = []
    # Iterate the file lazily instead of materialising it with readlines().
    with open(obj_path, 'r') as file:
        for line in file:
            if not line.startswith('f '):  # only face records are relevant
                continue
            face_vertex_ids = []
            face_uv_ids = []
            for corner in line.split()[1:]:
                fields = corner.split('/')
                if len(fields) < 2 or not fields[1]:
                    # Fail with an explicit message instead of an opaque
                    # unpacking / int('') error.
                    raise ValueError(
                        f"Face corner {corner!r} in {obj_path} has no UV index; "
                        "expected 'v/vt' or 'v/vt/vn'"
                    )
                face_vertex_ids.append(int(fields[0]) - 1)
                face_uv_ids.append(int(fields[1]) - 1)
            vertex_indices.append(face_vertex_ids)
            uv_indices.append(face_uv_ids)
    if not vertex_indices:
        # torch.tensor([]) would be a 1-D float tensor, unusable as indices.
        return (torch.empty((0, 3), dtype=torch.long),
                torch.empty((0, 3), dtype=torch.long))
    return torch.tensor(vertex_indices), torch.tensor(uv_indices)
def read_missing_faces(missing_faces_path):
    """Read the indices of the faces that lack colour information.

    The file holds one integer face index per line; blank lines are ignored.

    Args:
        missing_faces_path: Path to the text file with the face indices.

    Returns:
        torch.Tensor: 1-D ``int64`` tensor with the indices in file order
        (empty when the file lists no faces).

    Raises:
        ValueError: If a non-blank line is not an integer.
    """
    with open(missing_faces_path, 'r') as file:
        # Skip whitespace-only lines (e.g. a trailing blank line) which would
        # otherwise make int() fail.
        indices = [int(line) for line in file if line.strip()]
    # Explicit dtype keeps the result an index tensor even when it is empty.
    return torch.tensor(indices, dtype=torch.long)
def read_uv_map(input_texture_path):
    """Load the texture image as an RGB tensor.

    Args:
        input_texture_path: Path to the texture image.

    Returns:
        torch.Tensor: The image as loaded by ``cv2.imread`` with its channels
        reordered from BGR to RGB.

    Raises:
        OSError: If OpenCV cannot load the image.
    """
    uv_map = cv2.imread(input_texture_path)
    if uv_map is None:
        # cv2.imread signals failure by returning None rather than raising;
        # without this check the error would surface inside cvtColor.
        raise OSError(f"Could not read texture image: {input_texture_path}")
    uv_map = cv2.cvtColor(uv_map, cv2.COLOR_BGR2RGB)
    return torch.from_numpy(uv_map)
def parse_obj_file_and_uv_map(obj_path, missing_faces_path, input_texture_path, device):
    """Load the mesh, the missing-face list and the texture in parallel.

    The five readers (texture, vertices, UVs, faces, missing faces) run in a
    pool of worker processes; their results are then moved to ``device``.

    Args:
        obj_path: Path to the input OBJ file.
        missing_faces_path: Path to the file listing the uncoloured faces.
        input_texture_path: Path to the texture image.
        device: Torch device the returned tensors are moved to.

    Returns:
        tuple: ``(vertices, uv_coordinates, vertex_indices, uv_indices,
        missing_color_faces, uv_map)``, all on ``device``.
    """
    print(f"Reading OBJ file: {obj_path}")
    start_time = time.time()
    # The context manager guarantees the worker processes are released even
    # when submitting or joining fails.
    with Pool(5) as pool:
        uv_map_result = pool.apply_async(read_uv_map, (input_texture_path,))
        vertices_result = pool.apply_async(read_vertices, (obj_path,))
        uv_coordinates_result = pool.apply_async(read_uvs, (obj_path,))
        faces_result = pool.apply_async(read_faces, (obj_path,))
        missing_faces_result = pool.apply_async(read_missing_faces, (missing_faces_path,))
        pool.close()
        pool.join()

    # .get() re-raises any exception that occurred inside a worker.
    vertices = vertices_result.get()
    uv_coordinates = uv_coordinates_result.get()
    vertex_indices, uv_indices = faces_result.get()
    missing_color_faces = missing_faces_result.get()
    uv_map = uv_map_result.get()

    vertices = vertices.to(device)
    uv_coordinates = uv_coordinates.to(device)
    vertex_indices = vertex_indices.to(device)
    uv_indices = uv_indices.to(device)
    missing_color_faces = missing_color_faces.to(device)
    uv_map = uv_map.to(device)

    end_time = time.time()
    print(f"using: {end_time - start_time} seconds")
    # NOTE: this message is emitted after the transfer has finished; its
    # position is kept unchanged so the log output stays the same.
    print("Converting to tensors...")
    return vertices, uv_coordinates, vertex_indices, uv_indices, missing_color_faces, uv_map
def write_obj_with_uv_coordinates(filename, vertices, uvs, vertex_indices, uv_indices):
    """Write a triangle mesh with UV coordinates to a Wavefront OBJ file.

    The file references the material library ``mesh.mtl`` and the material
    ``material_0``. Records are streamed to disk, so memory use does not grow
    with the size of the mesh.

    Args:
        filename (str): Path of the OBJ file to write.
        vertices: Sequence of vertex positions; the first three components of
            each entry are written.
        uvs: Sequence of UV coordinates; the first two components of each
            entry are written.
        vertex_indices: Per-face vertex indices (0-based, three per face).
        uv_indices: Per-face UV indices (0-based, three per face).
    """
    # A modest fixed buffer is enough because records are streamed; the file
    # content is never assembled in memory as a whole.
    buffer_size = 1024 * 1024
    face_format = '\nf %d/%d %d/%d %d/%d'
    with open(filename, 'w', buffering=buffer_size) as f:
        # Each record is emitted as "\n<record>" so the file ends without a
        # trailing newline and the sections are separated by one blank line.
        f.write('mtllib mesh.mtl')
        f.writelines('\nv %.6f %.6f %.6f' % (v[0], v[1], v[2]) for v in vertices)
        f.write('\n')
        f.writelines('\nvt %.6f %.6f' % (uv[0], uv[1]) for uv in uvs)
        f.write('\n\nusemtl material_0')
        # OBJ indices are 1-based, hence the +1 on every index.
        f.writelines(
            face_format % (
                v_idx[0] + 1, uv_idx[0] + 1,
                v_idx[1] + 1, uv_idx[1] + 1,
                v_idx[2] + 1, uv_idx[2] + 1,
            )
            for v_idx, uv_idx in zip(vertex_indices, uv_indices)
        )
def load_regions(filename):
    """Load region definitions from a text file.

    A usable line consists of two whitespace-separated integer lists joined
    by a single ``;``. Lines that do not split into exactly two parts on
    ``;`` are ignored.

    Args:
        filename: Path to the regions file.

    Returns:
        list: One ``(first_set, second_set)`` tuple of integer sets per
        usable line, in file order.
    """
    def _parse_ids(field):
        # Whitespace-separated integers -> set of ints.
        return set(int(token) for token in field.split())

    regions = []
    with open(filename, 'r') as file:
        for line in file:
            fields = line.split(";")
            if len(fields) == 2:
                regions.append((_parse_ids(fields[0]), _parse_ids(fields[1])))
    return regions
def build_face_adjacency(vertices, faces):
    """Build the face adjacency of a triangle mesh from shared edges.

    Two faces are adjacent when they share an edge, i.e. the same unordered
    pair of vertex indices. All faces meeting at an edge are linked to each
    other, including edges shared by more than two faces.

    Args:
        vertices: Vertex array; only ``vertices.shape[0]`` (the vertex count)
            is used, as the radix of the per-edge sort key.
        faces: Face index array of shape ``(N, 3)``.

    Returns:
        dict: Maps a face index to the list of its adjacent face indices.
        Faces without any shared edge do not appear as keys.
    """
    faces = np.asarray(faces)
    if faces.size == 0:
        # Nothing to connect; the reshape below cannot handle empty input.
        return {}
    # 64-bit indices so the composite edge key below cannot overflow when the
    # caller supplies int32 indices for a large mesh.
    faces = faces.astype(np.int64, copy=False)
    num_faces = len(faces)

    # All three edges of every face, shape (N, 3, 2).
    edges = np.stack([
        np.column_stack((faces[:, i], faces[:, (i + 1) % 3]))
        for i in range(3)
    ], axis=1)
    # Canonical edge direction: smaller vertex index first.
    edges.sort(axis=2)
    # Flatten to (N * 3, 2).
    edges = edges.reshape(-1, 2)
    # Face owning each flattened edge.
    edge_faces = np.repeat(np.arange(num_faces), 3)

    # One scalar key per edge so that equal edges become neighbours once sorted.
    edge_keys = edges[:, 0] * vertices.shape[0] + edges[:, 1]
    sort_idx = np.argsort(edge_keys)
    sorted_keys = edge_keys[sort_idx]
    edge_faces = edge_faces[sort_idx]

    # same_as_next[i] is True when sorted edges i and i + 1 are the same edge.
    same_as_next = sorted_keys[1:] == sorted_keys[:-1]

    face_adjacency = defaultdict(list)
    run_start = 0
    for idx in np.flatnonzero(same_as_next):
        if idx == 0 or not same_as_next[idx - 1]:
            run_start = idx  # first entry of a run of identical edges
        newcomer = edge_faces[idx + 1]
        # Link the newcomer with every earlier face of the run so that edges
        # shared by three or more faces produce all pairwise adjacencies.
        for k in range(run_start, idx + 1):
            earlier = edge_faces[k]
            face_adjacency[earlier].append(newcomer)
            face_adjacency[newcomer].append(earlier)
    return dict(face_adjacency)
def find_groups_and_subgroups(face_adjacency, missing_faces):
    """Split the missing faces into connected regions and collect their borders.

    A flood fill over ``face_adjacency`` groups missing faces that are
    connected through other missing faces. For every group the adjacent faces
    that are *not* missing are collected as well.

    Args:
        face_adjacency: Dict mapping a face index to its adjacent face indices.
        missing_faces: Tensor with the indices of the missing faces.

    Returns:
        list: One ``(missing_faces_set, adjacent_faces_set)`` tuple per
        region, the same format that ``load_regions()`` returns. Empty when
        there are no missing faces.
    """
    # Convert once; the original built the same host array twice.
    missing_ids = missing_faces.cpu().numpy()
    missing_faces_set = set(missing_ids)
    unused_faces = set(missing_ids)
    regions = []
    total_faces = len(unused_faces)

    with tqdm.tqdm(total=total_faces, desc="Processing faces") as pbar:
        while unused_faces:
            start_face = unused_faces.pop()
            current_group = {start_face}
            current_subgroup = set()
            stack = [start_face]
            # Depth-first traversal restricted to missing faces.
            while stack:
                face_idx = stack.pop()
                for neighbor in face_adjacency.get(face_idx, []):
                    if neighbor in unused_faces:
                        current_group.add(neighbor)
                        unused_faces.remove(neighbor)
                        stack.append(neighbor)
                    elif neighbor not in missing_faces_set:
                        # A coloured face bordering the group.
                        current_subgroup.add(neighbor)
            regions.append((current_group, current_subgroup))
            # Advance the bar by the number of faces consumed by this region.
            pbar.update(total_faces - len(unused_faces) - pbar.n)

    # Summary statistics.
    print(f"\nTotal regions: {len(regions)}")
    if regions:
        # Guarded: with no missing faces the average would divide by zero and
        # max()/min() would raise on an empty sequence.
        group_sizes = [len(group) for group, _ in regions]
        print(f"Average missing faces group size: {sum(group_sizes)/len(regions):.2f}")
        print(f"Largest missing faces group size: {max(group_sizes)}")
        print(f"Smallest missing faces group size: {min(group_sizes)}")

    # Report groups that have no coloured neighbour to borrow a colour from.
    for i, (group, subgroup) in enumerate(regions):
        if not subgroup:
            print(f"Warning: Region {i} with {len(group)} missing faces has no adjacent faces!")
    return regions
def compute_regions_face_colors(
        regions: List[Tuple[set, set]],
        uv_map: torch.Tensor,
        uvs: torch.Tensor,
        face_uv_indices: torch.Tensor,
        device: str
) -> Dict[int, torch.Tensor]:
    """Compute one fill colour per region from its bordering faces.

    For every region the texture is sampled at the UV centroid of each
    adjacent (coloured) face, and the samples are averaged weighted by the
    faces' areas in UV space. Regions without adjacent faces get no entry.

    Args:
        regions: Regions as ``(missing_faces_set, adjacent_faces_set)`` tuples.
        uv_map: Texture image in RGB order, indexed as ``[row, column]``.
        uvs: UV coordinates.
        face_uv_indices: UV indices of every face; only read here.
        device: Device used for the helper tensors (e.g. "cuda" or "cpu").

    Returns:
        Dict[int, torch.Tensor]: Region index -> averaged colour (``uint8``).
    """
    regions_face_color: Dict[int, torch.Tensor] = {}
    tex_height, tex_width = uv_map.shape[0], uv_map.shape[1]
    # Loop-invariant UV -> pixel scale (u maps to columns, v to rows).
    scale_tensor = torch.tensor([tex_width - 1, tex_height - 1], device=device)

    for r_index, region in enumerate(tqdm.tqdm(regions, desc="Processing regions")):
        region_edge_faces_indexes = torch.tensor(list(region[1]), device=device)
        if len(region_edge_faces_indexes) == 0:
            continue  # nothing to sample from

        # UV triangles of the bordering faces and their centroids.
        edge_face_uv_indices = face_uv_indices[region_edge_faces_indexes]
        triangle_uvs = uvs[edge_face_uv_indices]  # shape: [num_faces, 3, 2]
        centroid_uvs = triangle_uvs.mean(dim=1)   # shape: [num_faces, 2]

        # UV -> pixel coordinates. v is flipped because image row 0 is the
        # top of the texture. Each axis is clamped to its own extent so that
        # non-square textures are sampled correctly.
        pixel_coords = torch.round(centroid_uvs * scale_tensor)
        pixel_x = pixel_coords[:, 0].long().clamp(0, tex_width - 1)
        pixel_y = (tex_height - 1 - pixel_coords[:, 1]).long().clamp(0, tex_height - 1)
        colors = uv_map[pixel_y, pixel_x]  # shape: [num_faces, 3]

        # Triangle areas in UV space (half the absolute 2-D cross product).
        areas = torch.abs(
            (triangle_uvs[:, 1, 0] - triangle_uvs[:, 0, 0]) * (triangle_uvs[:, 2, 1] - triangle_uvs[:, 0, 1]) -
            (triangle_uvs[:, 2, 0] - triangle_uvs[:, 0, 0]) * (triangle_uvs[:, 1, 1] - triangle_uvs[:, 0, 1])
        ) * 0.5

        total_area = areas.sum()
        if total_area > 0:
            weighted_color = (colors.float() * areas.unsqueeze(1)).sum(dim=0) / total_area
        else:
            # All bordering triangles are degenerate in UV space; dividing by
            # the zero total area would yield NaN, so use the plain mean.
            weighted_color = colors.float().mean(dim=0)
        regions_face_color[r_index] = weighted_color.round().clamp(0, 255).to(torch.uint8)
    return regions_face_color
def update_uv_map_and_indices(
        uv_map: torch.Tensor,
        uvs: torch.Tensor,
        face_uv_indices: torch.Tensor,
        regions: List[Tuple[set, set]],
        regions_face_color: Dict[int, torch.Tensor],
        device: str
) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
    """Append a colour-swatch strip to the texture and retarget the missing faces.

    A strip of half the texture height is appended below the texture. Every
    coloured region gets a 3x3 pixel swatch in that strip and one new UV
    coordinate at the swatch centre; all three corners of the region's
    missing faces are pointed at that coordinate. The v component of the
    existing UVs is rescaled (``v * 2/3 + 1/3``) to account for the taller
    texture.

    Args:
        uv_map: Original texture in RGB order, indexed as ``[row, column]``.
        uvs: Original UV coordinates; not modified.
        face_uv_indices: Per-face UV indices; updated in place and returned.
        regions: Regions as ``(missing_faces_set, adjacent_faces_set)`` tuples.
        regions_face_color: Region index -> colour, as computed beforehand.
        device: Device used for the helper tensors (e.g. "cuda" or "cpu").

    Returns:
        Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
            new_uv_map: Texture with the swatch strip appended.
            uvs_updated: Rescaled UVs followed by the new swatch UVs.
            face_uv_indices: The updated per-face UV indices.
        When ``regions_face_color`` is empty the inputs are returned
        unchanged.

    Raises:
        ValueError: If the strip cannot hold one swatch per coloured region.
    """
    total_regions = len(regions_face_color)
    if total_regions == 0:
        # Nothing to paint; torch.stack() below would fail on an empty list.
        return uv_map, uvs, face_uv_indices

    tex_height, tex_width = uv_map.shape[0], uv_map.shape[1]
    strip_height = tex_height // 2
    grid_size = tex_width // 3  # swatches per strip row
    capacity = grid_size * (strip_height // 3)
    if total_regions > capacity:
        # Fail early instead of indexing outside the strip.
        raise ValueError(
            f"Texture of size {tex_width}x{tex_height} can hold at most {capacity} "
            f"colour swatches, but {total_regions} regions need one"
        )

    # Regions in ascending index order; swatch i belongs to ordered_colors[i].
    ordered_colors = sorted(regions_face_color.items(), key=lambda x: x[0])

    swatch_ids = torch.arange(total_regions, device=device)
    all_c = torch.div(swatch_ids, grid_size, rounding_mode='floor')  # swatch row
    all_r = torch.remainder(swatch_ids, grid_size)                   # swatch column

    # White strip that receives the swatches.
    color_uv_map = torch.full((strip_height, tex_width, 3),
                              255, dtype=torch.uint8, device=device)

    # Pixel coordinates of the 9 pixels of every 3x3 swatch.
    row_offsets = torch.tensor([0, 1, 2, 0, 1, 2, 0, 1, 2], device=device)
    col_offsets = torch.tensor([0, 0, 0, 1, 1, 1, 2, 2, 2], device=device)
    c_indices = all_c.unsqueeze(1) * 3 + row_offsets.unsqueeze(0)
    r_indices = all_r.unsqueeze(1) * 3 + col_offsets.unsqueeze(0)

    # Paint all swatches at once.
    colors = torch.stack([color for _, color in ordered_colors])
    colors_repeated = colors.unsqueeze(1).repeat(1, 9, 1)
    color_uv_map[c_indices.flatten(), r_indices.flatten()] = colors_repeated.reshape(-1, 3)

    # UV of every swatch centre, expressed in the enlarged texture.
    new_height = tex_height + strip_height
    pixel_cols = all_r * 3 + 1
    pixel_rows = tex_height + all_c * 3 + 1
    u_new = pixel_cols.float() / (tex_width - 1)
    v_new = (new_height - 1 - pixel_rows.float()) / (new_height - 1)
    new_uvs = torch.stack([u_new, v_new], dim=1)

    # Concatenate first and rescale v on the copy, so the caller's `uvs`
    # tensor is left untouched.
    uv_coordinates_start = uvs.shape[0]
    uvs_updated = torch.cat([uvs, new_uvs], dim=0)
    uvs_updated[:uv_coordinates_start, 1] = (
        uvs_updated[:uv_coordinates_start, 1] * (2 / 3) + 1 / 3
    )

    # Point all three corners of every missing face at its region's swatch.
    for i, (region_index, _) in enumerate(ordered_colors):
        region_faces_indexes = torch.tensor(list(regions[region_index][0]), device=device)
        face_uv_indices[region_faces_indexes] = uv_coordinates_start + i

    # Original texture on top, swatch strip below.
    new_uv_map = torch.cat((uv_map, color_uv_map), dim=0)
    return new_uv_map, uvs_updated, face_uv_indices
def group_regions_by_y_axis(
        regions: List[Tuple[set, set]],
        vertices: torch.Tensor,
        triangle_vertex_indices: torch.Tensor,
        device: str,
        interval_size: float = 0.1
) -> Dict[int, List[int]]:
    """Bucket regions by the mean height (y coordinate) of their missing faces.

    Args:
        regions: Regions as ``(missing_faces_set, adjacent_faces_set)`` tuples.
        vertices: Vertex position tensor.
        triangle_vertex_indices: Per-face vertex index tensor.
        device: Device on which the per-region index tensors are created.
        interval_size: Height of one bucket along the y axis (default 0.1).

    Returns:
        Dict[int, List[int]]: Bucket key (mean y floor-divided by
        ``interval_size``) -> indices of the regions falling in that bucket.
    """
    buckets: Dict[int, List[int]] = {}
    for region_id, region in enumerate(regions):
        face_ids = torch.tensor(list(region[0]), device=device)
        # Positions of all corners of the region's missing faces.
        corner_positions = vertices[triangle_vertex_indices[face_ids]]
        mean_height = corner_positions[:, :, 1].mean(dim=(0, 1))
        # Floor division assigns the region to its height bucket.
        bucket_key = int(mean_height // interval_size)
        buckets.setdefault(bucket_key, []).append(region_id)
    return buckets
def align_regions_colors(
        regions_face_color: Dict[int, torch.Tensor],
        y_intervals: Dict[int, List[int]],
        regions: List[Tuple[set, set]],
        large_group_threshold_min: int = 5000,
        large_group_threshold_max: int = 100000
) -> Dict[int, torch.Tensor]:
    """Give all large regions of one y interval the same colour.

    Within every interval, the regions whose number of missing faces lies in
    ``[large_group_threshold_min, large_group_threshold_max]`` all receive
    the colour of the largest of them. Regions that have no entry in
    ``regions_face_color`` are ignored.

    Args:
        regions_face_color: Region index -> colour; updated in place.
        y_intervals: Interval key -> indices of the regions in that interval.
        regions: Regions as ``(missing_faces_set, adjacent_faces_set)`` tuples.
        large_group_threshold_min: Smallest face count of a "large" region.
        large_group_threshold_max: Largest face count of a "large" region.

    Returns:
        Dict[int, torch.Tensor]: The same ``regions_face_color`` dict, keyed
        by region index, with the aligned colours.
    """
    for region_indices in y_intervals.values():
        # (region index, face count) of every large region that has a colour.
        # Regions without a colour are skipped instead of raising KeyError.
        large_groups = [
            (r_index, len(regions[r_index][0]))
            for r_index in region_indices
            if r_index in regions_face_color
            and large_group_threshold_min <= len(regions[r_index][0]) <= large_group_threshold_max
        ]
        if not large_groups:
            continue
        # The largest region dictates the colour (first one wins on ties).
        dominant_index, _ = max(large_groups, key=lambda item: item[1])
        dominant_color = regions_face_color[dominant_index]
        for r_index, _ in large_groups:
            regions_face_color[r_index] = dominant_color
    return regions_face_color
def process(input_obj_path, input_texture_path, missing_faces_path, output_obj_path, output_texture_path):
    """Fill the uncoloured faces of a textured mesh and write the result.

    Pipeline: load mesh and texture, build the face adjacency, group the
    missing faces into regions, derive one colour per region from its
    bordering faces, align the colours of large regions per height interval,
    extend the texture with colour swatches, and write the new OBJ and
    texture files.

    Args:
        input_obj_path: Path to the input OBJ file.
        input_texture_path: Path to the input texture image.
        missing_faces_path: Path to the file listing the uncoloured faces.
        output_obj_path: Path of the OBJ file to write.
        output_texture_path: Path of the texture image to write.

    Raises:
        OSError: If the output texture cannot be written.
    """
    start_time = time.time()
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    vertices, uvs, triangle_vertex_indices, face_uv_indices, missing_color_faces, uv_map = parse_obj_file_and_uv_map(
        input_obj_path, missing_faces_path, input_texture_path, device=device)

    # Host copies; neither tensor is modified below, so they are reused when
    # the OBJ file is written.
    vertices_cpu = vertices.cpu().numpy()
    triangle_vertex_indices_cpu = triangle_vertex_indices.cpu().numpy()

    # Build the face adjacency and find the regions of missing faces.
    start_face_adjacency_time = time.time()
    face_adjacency = build_face_adjacency(vertices_cpu, triangle_vertex_indices_cpu)
    end_face_adjacency_time = time.time()
    print(f"face_adjacency using: {end_face_adjacency_time - start_face_adjacency_time} seconds")

    start_find_groups_time = time.time()
    regions = find_groups_and_subgroups(face_adjacency, missing_color_faces)
    end_find_groups_time = time.time()
    print(f"find_groups_and_subgroups using: {end_find_groups_time - start_find_groups_time} seconds")

    # Area-weighted average colour of every region.
    start_texture_map_time = time.time()
    regions_face_color = compute_regions_face_colors(regions, uv_map, uvs, face_uv_indices, device)
    end_texture_map_time = time.time()
    print(f"texture_mapping_to_triangle using: {end_texture_map_time - start_texture_map_time} seconds")

    # Group the regions by height interval and align colours per interval.
    y_intervals = group_regions_by_y_axis(
        regions,
        vertices,
        triangle_vertex_indices,
        device
    )
    regions_face_color = align_regions_colors(regions_face_color, y_intervals, regions)

    # Extend the texture and retarget the UV indices of the missing faces.
    start_color_map_time = time.time()
    new_uv_map, uvs, face_uv_indices = update_uv_map_and_indices(uv_map, uvs, face_uv_indices, regions,
                                                                 regions_face_color, device)
    end_color_map_time = time.time()
    print(f"color_mapping_to_triangle using: {end_color_map_time - start_color_map_time} seconds")

    end_time = time.time()
    print(f"using: {end_time - start_time} seconds")

    # Write the OBJ file and the texture image.
    start_write_time = time.time()
    uvs_cpu = uvs.cpu().numpy()
    face_uv_indices_cpu = face_uv_indices.cpu().numpy()
    new_uv_map_cpu = new_uv_map.cpu().numpy()
    new_uv_map_bgr = cv2.cvtColor(new_uv_map_cpu, cv2.COLOR_RGB2BGR)

    with Pool(2) as p:
        # Write both files concurrently in worker processes.
        obj_future = p.apply_async(write_obj_with_uv_coordinates,
                                   (output_obj_path, vertices_cpu, uvs_cpu,
                                    triangle_vertex_indices_cpu, face_uv_indices_cpu))
        img_future = p.apply_async(cv2.imwrite,
                                   (output_texture_path, new_uv_map_bgr,
                                    [cv2.IMWRITE_PNG_COMPRESSION, 3]))
        obj_future.get()
        texture_written = img_future.get()
    if not texture_written:
        # cv2.imwrite reports failure through its return value, not an
        # exception; surface it instead of finishing "successfully".
        raise OSError(f"Failed to write texture image: {output_texture_path}")

    end_write_time = time.time()
    end_time = time.time()
    print(f"Total file writing time: {end_write_time - start_write_time:.2f} seconds")
    print(f"using: {end_time - start_time} seconds")
def main():
    """Parse the command-line arguments and run the colour-filling pipeline."""
    parser = argparse.ArgumentParser(description='Process OBJ files to fix missing color faces.')
    parser.add_argument('--input_obj', type=str, required=True, help='Path to the input OBJ file')
    parser.add_argument('--input_texture', type=str, required=True, help='Path to the texture file')
    parser.add_argument('--missing_faces', type=str, required=True,
                        help='Path to the file with indices of missing color faces')
    parser.add_argument('--output_obj', type=str, required=True, help='Path to the output OBJ file')
    # Help text previously duplicated the input texture's description.
    parser.add_argument('--output_texture', type=str, required=True, help='Path to the output texture file')
    args = parser.parse_args()
    process(args.input_obj, args.input_texture, args.missing_faces, args.output_obj, args.output_texture)


if __name__ == '__main__':
    main()