commit 195a3406e038df57c4326b71c4dfe3791e17821a Author: Ke Han Date: Wed May 13 14:39:16 2026 +0800 Initial commit: add single_tools scripts diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6bfc347 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +__pycache__/ +*.pyc +*.pyo +*.pyd +*.swp +.DS_Store diff --git a/create_depth_fast.py b/create_depth_fast.py new file mode 100644 index 0000000..657ee1a --- /dev/null +++ b/create_depth_fast.py @@ -0,0 +1,1208 @@ +import os +os.environ["EGL_PLATFORM"] = "surfaceless" +import open3d as o3d +import numpy as np +import json +import argparse +from concurrent.futures import ThreadPoolExecutor +from typing import List, Tuple, Optional +# from scripts.colmap_loader import read_images_text, read_cameras_text +# +# Copyright (C) 2023, Inria +# GRAPHDECO research group, https://team.inria.fr/graphdeco +# All rights reserved. +# +# This software is free for non-commercial, research and evaluation use +# under the terms of the LICENSE.md file. 
def qvec2rotmat(qvec):
    """Convert a quaternion into a 3x3 rotation matrix.

    The quaternion is indexed as (w, x, y, z), i.e. the scalar part comes
    first. No normalization is performed, so the result is a proper rotation
    only when the input has unit length.

    :param qvec: indexable with at least four numeric entries.
    :return: ``np.ndarray`` of shape (3, 3).
    """
    w, x, y, z = qvec[0], qvec[1], qvec[2], qvec[3]

    first_row = [
        1 - 2 * y ** 2 - 2 * z ** 2,
        2 * x * y - 2 * w * z,
        2 * z * x + 2 * w * y,
    ]
    second_row = [
        2 * x * y + 2 * w * z,
        1 - 2 * x ** 2 - 2 * z ** 2,
        2 * y * z - 2 * w * x,
    ]
    third_row = [
        2 * z * x - 2 * w * y,
        2 * y * z + 2 * w * x,
        1 - 2 * x ** 2 - 2 * y ** 2,
    ]
    return np.array([first_row, second_row, third_row])
def read_extrinsics_binary(path_to_model_file):
    """Read per-image extrinsics from a COLMAP binary images file.

    see: src/base/reconstruction.cc
        void Reconstruction::ReadImagesBinary(const std::string& path)
        void Reconstruction::WriteImagesBinary(const std::string& path)

    :param path_to_model_file: path of the binary file to read.
    :return: dict mapping image id to an ``Image`` record holding the
        quaternion, translation, camera id, name, 2D points and the
        3D point id attached to each 2D point.
    """
    images = {}
    with open(path_to_model_file, "rb") as fid:
        num_reg_images = read_next_bytes(fid, 8, "Q")[0]
        for _ in range(num_reg_images):
            # Fixed 64-byte record: image id, 4 quaternion doubles,
            # 3 translation doubles, camera id.
            binary_image_properties = read_next_bytes(
                fid, num_bytes=64, format_char_sequence="idddddddi"
            )
            image_id = binary_image_properties[0]
            qvec = np.array(binary_image_properties[1:5])
            tvec = np.array(binary_image_properties[5:8])
            camera_id = binary_image_properties[8]

            # The name is NUL-terminated. Collect the raw bytes and decode
            # once: decoding byte by byte raises UnicodeDecodeError as soon
            # as the name contains a multi-byte UTF-8 character.
            binary_image_name = b""
            current_char = read_next_bytes(fid, 1, "c")[0]
            while current_char != b"\x00":
                binary_image_name += current_char
                current_char = read_next_bytes(fid, 1, "c")[0]
            image_name = binary_image_name.decode("utf-8")

            num_points2D = read_next_bytes(
                fid, num_bytes=8, format_char_sequence="Q"
            )[0]
            # Each 2D point is stored as (x, y, point3D_id) = 24 bytes.
            x_y_id_s = read_next_bytes(
                fid,
                num_bytes=24 * num_points2D,
                format_char_sequence="ddq" * num_points2D,
            )
            xys = np.column_stack(
                [tuple(map(float, x_y_id_s[0::3])), tuple(map(float, x_y_id_s[1::3]))]
            )
            point3D_ids = np.array(tuple(map(int, x_y_id_s[2::3])))
            images[image_id] = Image(
                id=image_id,
                qvec=qvec,
                tvec=tvec,
                camera_id=camera_id,
                name=image_name,
                xys=xys,
                point3D_ids=point3D_ids,
            )
    return images
def focal2fov(focal, pixels):
    """Return the full field of view, in radians, for a focal length.

    :param focal: focal length, in the same unit as ``pixels``.
    :param pixels: image extent along the axis of interest.
    :return: ``2 * atan(pixels / (2 * focal))``.
    """
    half_extent_ratio = pixels / (2 * focal)
    return 2 * math.atan(half_extent_ratio)
def write_next_bytes(fid, data, format_char_sequence, endian_character="<"):
    """Pack values with ``struct`` and write them to a binary file.

    :param fid: writable binary file object.
    :param data: value to write; several values sent at once must be wrapped
        in a list or a tuple.
    :param format_char_sequence: List of {c, e, f, d, h, H, i, I, l, L, q, Q}.
        Must describe as many fields as there are values in ``data``.
    :param endian_character: Any of {@, =, <, >, !}
    """
    layout = endian_character + format_char_sequence
    # Only list/tuple payloads are unpacked into several fields; any other
    # object (including bytes and numpy scalars) is packed as one field.
    if isinstance(data, (list, tuple)):
        packed = struct.pack(layout, *data)
    else:
        packed = struct.pack(layout, data)
    # Named ``packed`` rather than ``bytes`` so the builtin is not shadowed.
    fid.write(packed)
def write_cameras_binary(cameras, path_to_model_file):
    """Serialize cameras into a COLMAP binary cameras file.

    see: src/colmap/scene/reconstruction.cc
        void Reconstruction::WriteCamerasBinary(const std::string& path)
        void Reconstruction::ReadCamerasBinary(const std::string& path)

    :param cameras: dict whose values expose ``id``, ``model`` (a model
        name), ``width``, ``height`` and ``params``.
    :param path_to_model_file: destination path, overwritten if it exists.
    :return: the ``cameras`` mapping that was passed in.
    """
    with open(path_to_model_file, "wb") as fid:
        write_next_bytes(fid, len(cameras), "Q")
        for cam in cameras.values():
            # The file stores the numeric model id, not the model name.
            header = [
                cam.id,
                CAMERA_MODEL_NAMES[cam.model].model_id,
                cam.width,
                cam.height,
            ]
            write_next_bytes(fid, header, "iiQQ")
            # All parameters are written as consecutive doubles.
            values = [float(p) for p in cam.params]
            write_next_bytes(fid, values, "d" * len(values))
    return cameras
np.array(tuple(map(int, elems[2::3]))) + images[image_id] = Image( + id=image_id, + qvec=qvec, + tvec=tvec, + camera_id=camera_id, + name=image_name, + xys=xys, + point3D_ids=point3D_ids, + ) + return images + + +def read_images_binary(path_to_model_file): + """ + see: src/colmap/scene/reconstruction.cc + void Reconstruction::ReadImagesBinary(const std::string& path) + void Reconstruction::WriteImagesBinary(const std::string& path) + """ + images = {} + with open(path_to_model_file, "rb") as fid: + num_reg_images = read_next_bytes(fid, 8, "Q")[0] + for _ in range(num_reg_images): + binary_image_properties = read_next_bytes( + fid, num_bytes=64, format_char_sequence="idddddddi" + ) + image_id = binary_image_properties[0] + qvec = np.array(binary_image_properties[1:5]) + tvec = np.array(binary_image_properties[5:8]) + camera_id = binary_image_properties[8] + binary_image_name = b"" + current_char = read_next_bytes(fid, 1, "c")[0] + while current_char != b"\x00": # look for the ASCII 0 entry + binary_image_name += current_char + current_char = read_next_bytes(fid, 1, "c")[0] + image_name = binary_image_name.decode("utf-8") + num_points2D = read_next_bytes(fid, num_bytes=8, format_char_sequence="Q")[ + 0 + ] + x_y_id_s = read_next_bytes( + fid, + num_bytes=24 * num_points2D, + format_char_sequence="ddq" * num_points2D, + ) + xys = np.column_stack( + [ + tuple(map(float, x_y_id_s[0::3])), + tuple(map(float, x_y_id_s[1::3])), + ] + ) + point3D_ids = np.array(tuple(map(int, x_y_id_s[2::3]))) + images[image_id] = Image( + id=image_id, + qvec=qvec, + tvec=tvec, + camera_id=camera_id, + name=image_name, + xys=xys, + point3D_ids=point3D_ids, + ) + return images + + +def write_images_text(images, path): + """ + see: src/colmap/scene/reconstruction.cc + void Reconstruction::ReadImagesText(const std::string& path) + void Reconstruction::WriteImagesText(const std::string& path) + """ + if len(images) == 0: + mean_observations = 0 + else: + mean_observations = sum( + 
def write_images_binary(images, path_to_model_file):
    """Serialize images into a COLMAP binary images file.

    see: src/colmap/scene/reconstruction.cc
        void Reconstruction::ReadImagesBinary(const std::string& path)
        void Reconstruction::WriteImagesBinary(const std::string& path)

    :param images: dict whose values expose ``id``, ``qvec``, ``tvec``,
        ``camera_id``, ``name``, ``xys`` and ``point3D_ids``.
    :param path_to_model_file: destination path, overwritten if it exists.
    """
    with open(path_to_model_file, "wb") as fid:
        write_next_bytes(fid, len(images), "Q")
        for img in images.values():
            write_next_bytes(fid, img.id, "i")
            write_next_bytes(fid, img.qvec.tolist(), "dddd")
            write_next_bytes(fid, img.tvec.tolist(), "ddd")
            write_next_bytes(fid, img.camera_id, "i")
            # Write the whole UTF-8 encoded name followed by the NUL
            # terminator. Packing character by character with "c" fails for
            # any non-ASCII character, because its encoding is longer than
            # the single byte that "c" accepts.
            fid.write(img.name.encode("utf-8") + b"\x00")
            write_next_bytes(fid, len(img.point3D_ids), "Q")
            # One (x, y, point3D_id) record per 2D observation.
            for xy, p3d_id in zip(img.xys, img.point3D_ids):
                write_next_bytes(fid, [*xy, p3d_id], "ddq")
line.split() + point3D_id = int(elems[0]) + xyz = np.array(tuple(map(float, elems[1:4]))) + rgb = np.array(tuple(map(int, elems[4:7]))) + error = float(elems[7]) + image_ids = np.array(tuple(map(int, elems[8::2]))) + point2D_idxs = np.array(tuple(map(int, elems[9::2]))) + points3D[point3D_id] = Point3D( + id=point3D_id, + xyz=xyz, + rgb=rgb, + error=error, + image_ids=image_ids, + point2D_idxs=point2D_idxs, + ) + return points3D + + +def read_points3D_binary(path_to_model_file): + """ + see: src/colmap/scene/reconstruction.cc + void Reconstruction::ReadPoints3DBinary(const std::string& path) + void Reconstruction::WritePoints3DBinary(const std::string& path) + """ + points3D = {} + with open(path_to_model_file, "rb") as fid: + num_points = read_next_bytes(fid, 8, "Q")[0] + for _ in range(num_points): + binary_point_line_properties = read_next_bytes( + fid, num_bytes=43, format_char_sequence="QdddBBBd" + ) + point3D_id = binary_point_line_properties[0] + xyz = np.array(binary_point_line_properties[1:4]) + rgb = np.array(binary_point_line_properties[4:7]) + error = np.array(binary_point_line_properties[7]) + track_length = read_next_bytes(fid, num_bytes=8, format_char_sequence="Q")[ + 0 + ] + track_elems = read_next_bytes( + fid, + num_bytes=8 * track_length, + format_char_sequence="ii" * track_length, + ) + image_ids = np.array(tuple(map(int, track_elems[0::2]))) + point2D_idxs = np.array(tuple(map(int, track_elems[1::2]))) + points3D[point3D_id] = Point3D( + id=point3D_id, + xyz=xyz, + rgb=rgb, + error=error, + image_ids=image_ids, + point2D_idxs=point2D_idxs, + ) + return points3D + + +def write_points3D_text(points3D, path): + """ + see: src/colmap/scene/reconstruction.cc + void Reconstruction::ReadPoints3DText(const std::string& path) + void Reconstruction::WritePoints3DText(const std::string& path) + """ + if len(points3D) == 0: + mean_track_length = 0 + else: + mean_track_length = sum( + (len(pt.image_ids) for _, pt in points3D.items()) + ) / len(points3D) + 
def detect_model_format(path, ext):
    """Tell whether ``path`` contains a complete model with extension ``ext``.

    A model is complete when ``cameras``, ``images`` and ``points3D`` files
    with the given extension all exist. A message is printed on success.

    :param path: directory to inspect.
    :param ext: file extension including the dot, e.g. ``".bin"``.
    :return: ``True`` when all three files exist, ``False`` otherwise.
    """
    required_stems = ("cameras", "images", "points3D")
    complete = all(
        os.path.isfile(os.path.join(path, stem + ext)) for stem in required_stems
    )
    if not complete:
        return False

    print("Detected model format: '" + ext + "'")
    return True
def rotmat2qvec(R):
    """Convert a 3x3 rotation matrix into a quaternion.

    The quaternion is taken from the eigenvector that belongs to the largest
    eigenvalue of a symmetric 4x4 matrix built from the entries of ``R``. It
    is returned scalar-first, with the sign chosen so that the first
    component is non-negative.

    :param R: ``np.ndarray`` with nine entries, read in row-major order.
    :return: ``np.ndarray`` of shape (4,).
    """
    r00, r01, r02, r10, r11, r12, r20, r21, r22 = R.flat

    # Only the lower triangle is filled in: ``np.linalg.eigh`` reads the
    # lower triangle by default and ignores the rest.
    lower = np.array(
        [
            [r00 - r11 - r22, 0, 0, 0],
            [r01 + r10, r11 - r00 - r22, 0, 0],
            [r02 + r20, r12 + r21, r22 - r00 - r11, 0],
            [r21 - r12, r02 - r20, r10 - r01, r00 + r11 + r22],
        ]
    )
    K = lower / 3.0

    eigvals, eigvecs = np.linalg.eigh(K)
    dominant = np.argmax(eigvals)
    # Reorder the eigenvector so that its last component comes first.
    qvec = eigvecs[[3, 0, 1, 2], dominant]
    if qvec[0] < 0:
        qvec *= -1
    return qvec
def save_depth_map_async(depth_data: Tuple[str, np.ndarray, float], output_path: str) -> None:
    """Save one rendered depth map as a compressed ``.npz`` archive.

    Intended to be submitted to a worker pool. The archive contains two
    entries: ``depth`` (the depth map) and ``max_depth``.

    Args:
        depth_data: tuple of (image name, depth map, maximum depth value).
        output_path: directory that receives the archive.
    """
    name, depth_map, max_depth = depth_data

    # Strip only the final extension. Cutting at the first dot would map
    # "frame.0001.png" and "frame.0002.png" to the same "frame.npz", so the
    # later write would silently overwrite the earlier one.
    stem = os.path.splitext(name)[0]
    depth_filename = os.path.join(output_path, f"{stem}.npz")

    # Image names may carry a sub-directory (e.g. "cam0/0001.jpg"); create it
    # so the write does not fail. exist_ok keeps concurrent workers safe.
    parent_dir = os.path.dirname(depth_filename)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    np.savez_compressed(depth_filename, depth=depth_map, max_depth=max_depth)
np.ndarray, np.ndarray]], + width: int, + height: int, + output_path: str, + max_workers: Optional[int] = None +) -> None: + """ + 优化版本:生成深度图并异步保存 + + 参数: + mesh (o3d.geometry.TriangleMesh): 3D网格 + camera_params (list): 相机参数列表 + width (int): 图像宽度 + height (int): 图像高度 + output_path (str): 输出路径 + max_workers (int, optional): 异步保存的最大工作线程数 + + 返回: + None + """ + # 确保输出目录存在 + os.makedirs(output_path, exist_ok=True) + + # 创建复用的深度渲染器 + depth_renderer = DepthRenderer(mesh, width, height) + + # 存储待保存的深度图数据 + depth_data_list = [] + + print(f"开始渲染 {len(camera_params)} 张深度图...") + + # 批量渲染深度图 + for i, (name, K, R, t) in enumerate(camera_params): + depth_map, max_depth = depth_renderer.render_depth_map(K, R, t) + depth_data_list.append((name, depth_map.copy(), max_depth)) + + if (i + 1) % 10 == 0: + print(f"已渲染 {i + 1}/{len(camera_params)} 张图片") + + print("渲染完成,开始异步保存...") + + # 使用线程池异步保存所有深度图 + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [] + for depth_data in depth_data_list: + future = executor.submit(save_depth_map_async, depth_data, output_path) + futures.append(future) + + # 等待所有保存任务完成,并显示进度 + for i, future in enumerate(futures): + future.result() # 等待完成并获取结果(如果有异常会抛出) + if (i + 1) % 10 == 0: + print(f"已保存 {i + 1}/{len(futures)} 个文件") + + print(f"所有深度图已保存完成到: {output_path}") + + +def render_depth_map(mesh, K, R, t, width, height): + """ + 渲染深度图,不进行归一化。 + + 注意:此函数保留用于向后兼容,建议使用 DepthRenderer 类以获得更好的性能 + + 参数: + mesh (o3d.geometry.TriangleMesh): 要渲染的3D网格 + K (np.array): 相机内参矩阵 + R (np.array): 相机旋转矩阵 + t (np.array): 相机平移向量 + width (int): 渲染图像的宽度 + height (int): 渲染图像的高度 + + 返回: + tuple: 包含深度图(未归一化)和最大深度值 + """ + # 创建渲染器 + render = o3d.visualization.rendering.OffscreenRenderer(width, height) + + # 设置相机投影矩阵 + fx, fy, cx, cy = K[0, 0], K[1, 1], K[0, 2], K[1, 2] + intrinsic = o3d.camera.PinholeCameraIntrinsic(width, height, fx, fy, cx, cy) + + # 将外参转换为 4x4 变换矩阵 (从 R 和 t) + extrinsic = np.eye(4) + extrinsic[:3, :3] = R + extrinsic[:3, 3] = t + + # 
def load_camera_parameters(colmap_params_path):
    """Load per-image camera parameters from a COLMAP text model.

    Reads ``images.txt`` and ``cameras.txt`` from ``colmap_params_path`` and
    builds, for every image, a pinhole intrinsic matrix together with the
    rotation and translation stored for that image.

    Args:
        colmap_params_path (str): directory holding the COLMAP text files.

    Returns:
        tuple: ``(camera_params, width, height)`` where ``camera_params`` is
        a list of ``(image_name, K, R, t)`` tuples, and ``width`` / ``height``
        are taken from the camera of the last image in the file.

    Raises:
        ValueError: if ``images.txt`` contains no image.
    """
    images = read_images_text(os.path.join(colmap_params_path, "images.txt"))
    cameras = read_cameras_text(os.path.join(colmap_params_path, "cameras.txt"))

    # Without this guard the final ``camera.width`` would fail with an
    # unrelated UnboundLocalError.
    if not images:
        raise ValueError(f"No images found in COLMAP model: {colmap_params_path}")

    # COLMAP models whose parameters start with (f, cx, cy): one shared focal
    # length. All other models start with (fx, fy, cx, cy).
    single_focal_models = {
        "SIMPLE_PINHOLE",
        "SIMPLE_RADIAL",
        "RADIAL",
        "SIMPLE_RADIAL_FISHEYE",
        "RADIAL_FISHEYE",
    }

    camera_params = []
    camera = None
    for image in images.values():
        camera = cameras[image.camera_id]

        if camera.model in single_focal_models:
            fx = fy = camera.params[0]
            cx, cy = camera.params[1], camera.params[2]
        else:
            fx, fy, cx, cy = camera.params[:4]

        # Only focal length and principal point are used; any distortion
        # parameters of the model are ignored.
        K = np.array(
            [
                [fx, 0, cx],
                [0, fy, cy],
                [0, 0, 1],
            ]
        )

        # Extrinsics as stored by COLMAP for this image.
        R = image.qvec2rotmat()
        t = np.array(image.tvec)

        camera_params.append((image.name, K, R, t))

    # NOTE(review): a single resolution is returned for the whole model, so
    # this presumably expects every camera to share one image size. Confirm
    # for multi-camera captures.
    return camera_params, camera.width, camera.height
3D网格文件路径 + colmap_params_path (str): COLMAP参数文件路径 + output_path (str): 输出路径 + use_optimized (bool): 是否使用优化版本(默认True) + max_workers (int, optional): 异步保存的最大工作线程数 + + 返回: + None + """ + print(f"加载3D网格: {mesh_path}") + # 加载 3D 网格 + mesh = o3d.io.read_triangle_mesh(mesh_path) + if len(mesh.vertices) == 0: + raise ValueError(f"无法加载网格文件或网格为空: {mesh_path}") + + print(f"加载相机参数: {colmap_params_path}") + # 加载相机参数 + camera_params, width, height = load_camera_parameters(colmap_params_path) + + print(f"找到 {len(camera_params)} 个相机参数") + print(f"图像尺寸: {width} x {height}") + + # 使用优化版本生成并保存深度图 + if use_optimized: + generate_depth_maps_and_save_optimized(mesh, camera_params, width, height, output_path, max_workers) + else: + # 使用原始版本 + for i, (name, K, R, t) in enumerate(camera_params): + depth_map, max_depth = render_depth_map(mesh, K, R, t, width, height) + + # 保存深度图为 .npz 文件 + depth_filename = f"{output_path}/{name.split('.')[0]}.npz" + np.savez_compressed(depth_filename, depth=depth_map, max_depth=max_depth) + + if (i + 1) % 10 == 0: + print(f"已处理 {i + 1}/{len(camera_params)} 张图片") + +# 示例用法 +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Generate depth maps from a mesh and colmap parameters" + ) + parser.add_argument("--mesh_path", type=str, required=True, help="Path to the mesh file") + parser.add_argument( + "--colmap_params_path", type=str, required=True, help="Path to the colmap parameters" + ) + parser.add_argument("--output_path", type=str, required=True, help="Path to the output directory") + parser.add_argument( + "--use_optimized", + action="store_true", + default=True, + help="Use optimized version with renderer reuse and async I/O (default: True)" + ) + parser.add_argument( + "--no_optimized", + action="store_true", + help="Use original version (slower)" + ) + parser.add_argument( + "--max_workers", + type=int, + default=None, + help="Maximum number of worker threads for async I/O (default: None, uses system default)" + ) + + args = 
parser.parse_args() + + # 如果指定了 --no_optimized,则使用原始版本 + use_optimized = not args.no_optimized + + import time + start_time = time.time() + print("=" * 50) + print("深度图生成工具") + print("=" * 50) + print(f"网格文件: {args.mesh_path}") + print(f"COLMAP参数路径: {args.colmap_params_path}") + print(f"输出路径: {args.output_path}") + print(f"使用优化版本: {use_optimized}") + if use_optimized and args.max_workers: + print(f"最大工作线程数: {args.max_workers}") + print("=" * 50) + + + + try: + create_depth_maps( + args.mesh_path, + args.colmap_params_path, + args.output_path, + use_optimized=use_optimized, + max_workers=args.max_workers + ) + print(" 深度图生成完成!") + except Exception as e: + print(f" 生成深度图时出错: {e}") + raise e + + elapsed_time = time.time() - start_time + minutes = int(elapsed_time // 60) + seconds = int(elapsed_time % 60) + print(f" 深度图生成完成,用时: {minutes}分{seconds}秒") diff --git a/fill_all_empty_faces_v1.2.py b/fill_all_empty_faces_v1.2.py new file mode 100644 index 0000000..1df0cd7 --- /dev/null +++ b/fill_all_empty_faces_v1.2.py @@ -0,0 +1,547 @@ +import torch +import argparse +import numpy as np +import cv2 +import time +from collections import defaultdict +import tqdm +from multiprocessing import Pool, cpu_count +from typing import List, Tuple, Dict # 如果还未导入 + + +def read_vertices(obj_path): + vertices = [] + with open(obj_path, 'r') as file: + lines = file.readlines() + for line in lines: + if line.startswith('v '): # 顶点坐标 + vertices.append(list(map(float, line.split()[1:4]))) + vertices = torch.tensor(vertices) + return vertices + +def read_uvs(obj_path): + uv_coordinates = [] + with open(obj_path, 'r') as file: + lines = file.readlines() + for line in lines: + if line.startswith('vt '): # UV 坐标 + uv_coordinates.append(list(map(float, line.split()[1:3]))) + uv_coordinates = torch.tensor(uv_coordinates) + return uv_coordinates + +def read_faces(obj_path): + vertex_indices = [] + uv_indices = [] + with open(obj_path, 'r') as file: + lines = file.readlines() + + for line in lines: + 
if line.startswith('f '): # 面 + parts = line.split()[1:] + v_indices = [] + uv_indices_temp = [] + for face in parts: + v, vt = map(int, face.split('/')[:2]) + v_indices.append(v - 1) + uv_indices_temp.append(vt - 1) + vertex_indices.append(v_indices) + uv_indices.append(uv_indices_temp) + vertex_indices = torch.tensor(vertex_indices) + uv_indices = torch.tensor(uv_indices) + return vertex_indices, uv_indices + +def read_missing_faces(missing_faces_path): + with open(missing_faces_path, 'r') as file: + lines = file.readlines() + missing_color_faces = torch.tensor( + [int(line.strip()) for line in lines] + ) + return missing_color_faces + +def read_uv_map(input_texture_path): + uv_map = cv2.imread(input_texture_path) + uv_map = cv2.cvtColor(uv_map, cv2.COLOR_BGR2RGB) + uv_map = torch.from_numpy(uv_map) + return uv_map + +def parse_obj_file_and_uv_map(obj_path, missing_faces_path, input_texture_path, device): + print(f"Reading OBJ file: {obj_path}") + + # vertices = [] + # uv_coordinates = [] + # vertex_indices = [] + # uv_indices = [] + # multiprocessing.set_start_method('spawn', force=True) + # multiprocessing.freeze_support() + start_time = time.time() + + + p = Pool(5) + uv_map_result = p.apply_async(read_uv_map, (input_texture_path,)) + vertices_result = p.apply_async(read_vertices, (obj_path,)) + uv_coordinates_result = p.apply_async(read_uvs, (obj_path,)) + faces_result = p.apply_async(read_faces, (obj_path,)) + missing_faces_result = p.apply_async(read_missing_faces, (missing_faces_path,)) + + p.close() + p.join() + + vertices = vertices_result.get() + uv_coordinates = uv_coordinates_result.get() + vertex_indices, uv_indices = faces_result.get() + missing_color_faces = missing_faces_result.get() + uv_map = uv_map_result.get() + + vertices = vertices.to(device) + uv_coordinates = uv_coordinates.to(device) + vertex_indices = vertex_indices.to(device) + uv_indices = uv_indices.to(device) + missing_color_faces = missing_color_faces.to(device) + uv_map = 
uv_map.to(device) + + end_time = time.time() + print(f"using: {end_time - start_time} seconds") + + # exit() + print("Converting to tensors...") + + return vertices, uv_coordinates, vertex_indices, uv_indices, missing_color_faces, uv_map + +def write_obj_with_uv_coordinates(filename, vertices, uvs, vertex_indices, uv_indices): + """ + 高性能OBJ文件写入函数 + + Parameters: + filename (str): 输出OBJ文件路径 + vertices (np.ndarray): 顶点数组 + uvs (np.ndarray): UV坐标数组 + vertex_indices (np.ndarray): 面的顶点索引 + uv_indices (np.ndarray): 面的UV索引 + """ + # 估算数据大小(以字节为单位) + estimated_size = ( + len(vertices) * 40 + # 每个顶点约40字节 (v x.xxxxxx y.xxxxxx z.xxxxxx\n) + len(uvs) * 30 + # 每个UV坐标约30字节 (vt x.xxxxxx y.xxxxxx\n) + len(vertex_indices) * 40 # 每个面约40字节 (f v1/vt1 v2/vt2 v3/vt3\n) + ) + + # 设置缓冲区大小为估算大小的1.2倍,最小256MB,最大1GB + buffer_size = min(max(int(estimated_size * 1.2), 256 * 1024 * 1024), 1024 * 1024 * 1024) + + # 使用格式化字符串和列表推导式优化字符串生成 + vertex_lines = ['v %.6f %.6f %.6f' % (v[0], v[1], v[2]) for v in vertices] + uv_lines = ['vt %.6f %.6f' % (uv[0], uv[1]) for uv in uvs] + + # 优化face数据处理 + face_lines = [] + face_format = 'f %d/%d %d/%d %d/%d' + for v_idx, uv_idx in zip(vertex_indices, uv_indices): + face_lines.append(face_format % ( + v_idx[0] + 1, uv_idx[0] + 1, + v_idx[1] + 1, uv_idx[1] + 1, + v_idx[2] + 1, uv_idx[2] + 1 + )) + + # 使用join一次性构建完整内容 + content = ['mtllib mesh.mtl'] + vertex_lines + [''] + uv_lines + [''] + ['usemtl material_0'] + face_lines + + # 一次性写入所有数据 + with open(filename, 'w', buffering=buffer_size) as f: + f.write('\n'.join(content)) + +def load_regions(filename): + regions = [] + with open(filename, 'r') as file: + for line in file: + parts = line.split(";") + if len(parts) != 2: + continue # Skip any lines that don't have exactly two parts + + first_set = set(int(x) for x in parts[0].strip().split()) + second_set = set(int(x) for x in parts[1].strip().split()) + regions.append((first_set, second_set)) + + return regions + + +def build_face_adjacency(vertices, faces): + 
""" + 构建面的邻接关系,基于共享边 + + Args: + vertices: 顶点数组 + faces: 面片索引数组 (N x 3) + + Returns: + dict: 面片邻接关系字典,key为面片索引,value为邻接面片索引列表 + """ + # 将faces转换为numpy数组以加快处理速度 + faces = np.asarray(faces) + num_faces = len(faces) + + # 为每个面创建所有边 (Nx3x2) + edges = np.stack([ + np.column_stack((faces[:, i], faces[:, (i + 1) % 3])) + for i in range(3) + ], axis=1) + + # 确保边的方向一致 (较小的顶点索引在前) + edges.sort(axis=2) + + # 将边展平为 (Nx3, 2) 的形状 + edges = edges.reshape(-1, 2) + + # 创建边到面的映射 + edge_faces = np.repeat(np.arange(num_faces), 3) + + # 使用复合键对边进行排序 + edge_keys = edges[:, 0] * vertices.shape[0] + edges[:, 1] + sort_idx = np.argsort(edge_keys) + edges = edges[sort_idx] + edge_faces = edge_faces[sort_idx] + + # 找到重复的边(共享边) + same_edges = (edge_keys[sort_idx][1:] == edge_keys[sort_idx][:-1]) + edge_start_idx = np.where(same_edges)[0] + + # 构建邻接字典 + face_adjacency = defaultdict(list) + for idx in edge_start_idx: + face1, face2 = edge_faces[idx], edge_faces[idx + 1] + face_adjacency[face1].append(face2) + face_adjacency[face2].append(face1) + + return dict(face_adjacency) + +def find_groups_and_subgroups(face_adjacency, missing_faces): + """ + 找到相连的面组和它们的邻接面 + 返回: + regions: 列表,每个元素是一个元组 (missing_faces_set, adjacent_faces_set), + 与 load_regions() 函数返回格式保持一致 + """ + missing_faces_set = set(missing_faces.cpu().numpy()) + unused_faces = set(missing_faces.cpu().numpy()) + regions = [] + + total_faces = len(unused_faces) + with tqdm.tqdm(total=total_faces, desc="Processing faces") as pbar: + while unused_faces: + start_face = unused_faces.pop() + current_group = {start_face} + current_subgroup = set() + + stack = [start_face] + while stack: + face_idx = stack.pop() + + for neighbor in face_adjacency.get(face_idx, []): + if neighbor in unused_faces: + current_group.add(neighbor) + unused_faces.remove(neighbor) + stack.append(neighbor) + elif neighbor not in missing_faces_set: + current_subgroup.add(neighbor) + + regions.append((current_group, current_subgroup)) + pbar.update(total_faces - 
len(unused_faces) - pbar.n) + + # 输出统计信息 + print(f"\nTotal regions: {len(regions)}") + print(f"Average missing faces group size: {sum(len(g[0]) for g in regions)/len(regions):.2f}") + print(f"Largest missing faces group size: {max(len(g[0]) for g in regions)}") + print(f"Smallest missing faces group size: {min(len(g[0]) for g in regions)}") + + # 检查每个组是否都有邻接面 + for i, (group, subgroup) in enumerate(regions): + if not subgroup: + print(f"Warning: Region {i} with {len(group)} missing faces has no adjacent faces!") + + return regions + +def compute_regions_face_colors( + regions: List[Tuple[set, set]], + uv_map: torch.Tensor, + uvs: torch.Tensor, + face_uv_indices: torch.Tensor, + device: str +) -> Dict[int, torch.Tensor]: + """ + 根据每个区域的边缘面UV坐标计算加权平均的颜色, + 当无有效采样时更新对应face_uv_indices。 + + 参数: + regions (List[Tuple[set, set]]): 每个区域为 (缺失面集合, 邻接面集合) + uv_map (torch.Tensor): 原始纹理贴图,RGB格式 + uvs (torch.Tensor): 原始UV坐标 + face_uv_indices (torch.Tensor): 每个面对应的UV索引 + device (str): 使用的设备("cuda"或"cpu") + + 返回: + Dict[int, torch.Tensor]: 键为区域索引,值为该区域加权平均计算得到的颜色(uint8) + """ + regions_face_color: Dict[int, torch.Tensor] = {} + for r_index, region in enumerate(tqdm.tqdm(regions, desc="Processing regions")): + region_faces_indexes = torch.tensor(list(region[0]), device=device) + region_edge_faces_indexes = torch.tensor(list(region[1]), device=device) + + if len(region_edge_faces_indexes) == 0: + continue + + # 获取边缘面的UV索引 + edge_face_uv_indices = face_uv_indices[region_edge_faces_indexes] + # 使用三角形的质心UV坐标来采样颜色 + triangle_uvs = uvs[edge_face_uv_indices] # shape: [num_faces, 3, 2] + centroid_uvs = triangle_uvs.mean(dim=1) # shape: [num_faces, 2] + + # 将UV坐标转换为像素坐标 + scale_tensor = torch.tensor([uv_map.shape[1] - 1, uv_map.shape[0] - 1], device=device) + pixel_coords = torch.round(centroid_uvs * scale_tensor) + pixel_coords[:, 1] = uv_map.shape[0] - 1 - pixel_coords[:, 1] + pixel_coords = pixel_coords.long().clamp(0, uv_map.shape[0] - 1) + + # 直接采样质心位置的颜色 + colors = 
uv_map[pixel_coords[:, 1], pixel_coords[:, 0]] # shape: [num_faces, 3] + + # 使用面积加权平均来计算最终颜色 + areas = torch.abs( + (triangle_uvs[:, 1, 0] - triangle_uvs[:, 0, 0]) * (triangle_uvs[:, 2, 1] - triangle_uvs[:, 0, 1]) - + (triangle_uvs[:, 2, 0] - triangle_uvs[:, 0, 0]) * (triangle_uvs[:, 1, 1] - triangle_uvs[:, 0, 1]) + ) * 0.5 + + if len(colors) > 0: + weighted_color = (colors.float() * areas.unsqueeze(1)).sum(dim=0) / areas.sum() + regions_face_color[r_index] = weighted_color.round().clamp(0, 255).to(torch.uint8) + else: + # 如果没有有效的采样点,使用第一个相邻面的UV坐标更新face_uv_indices + face_uv_indices[region_faces_indexes] = face_uv_indices[region_edge_faces_indexes[0]].unsqueeze(dim=0).clone() + + return regions_face_color + + +def update_uv_map_and_indices( + uv_map: torch.Tensor, + uvs: torch.Tensor, + face_uv_indices: torch.Tensor, + regions: List[Tuple[set, set]], + regions_face_color: Dict[int, torch.Tensor], + device: str +) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + """ + 根据计算得到的区域颜色,更新UV贴图及对应的UV坐标,并批量更新face_uv_indices。 + + 参数: + uv_map (torch.Tensor): 原始纹理贴图,RGB格式 + uvs (torch.Tensor): 原始UV坐标 + face_uv_indices (torch.Tensor): 原始面的UV索引 + regions (List[Tuple[set, set]]): 每个区域为 (缺失面集合, 邻接面集合) + regions_face_color (Dict[int, torch.Tensor]): 每个区域计算得到的颜色 + device (str): 使用的设备("cuda"或"cpu") + + 返回: + Tuple[torch.Tensor, torch.Tensor, torch.Tensor]: + new_uv_map: 更新后的UV贴图 + uvs_updated: 更新后的UV坐标(拼接上新计算的UV) + face_uv_indices: 更新后的face UV索引 + """ + total_regions = len(regions_face_color) + grid_size = uv_map.shape[1] // 3 + all_c = torch.div(torch.arange(total_regions, device=device), grid_size, rounding_mode='floor') + all_r = torch.remainder(torch.arange(total_regions, device=device), grid_size) + + # 创建新的颜色UV贴图 + color_uv_map = torch.full((int(uv_map.shape[0] / 2), uv_map.shape[1], 3), + 255, dtype=torch.uint8, device=device) + # 调整原始uvs的纵坐标 + uvs[:, 1] = uvs[:, 1] * (2 / 3) + 1 / 3 + + # 批量创建所有颜色块的坐标 + c_indices = all_c.unsqueeze(1).repeat(1, 9) * 3 + torch.tensor([0, 1, 
2, 0, 1, 2, 0, 1, 2], + device=device).unsqueeze(0) + r_indices = all_r.unsqueeze(1).repeat(1, 9) * 3 + torch.tensor([0, 0, 0, 1, 1, 1, 2, 2, 2], + device=device).unsqueeze(0) + + # 批量设置颜色 + colors = torch.stack([color for _, color in sorted(regions_face_color.items(), key=lambda x: x[0])]) + colors_repeated = colors.unsqueeze(1).repeat(1, 9, 1) + color_uv_map[c_indices.flatten(), r_indices.flatten()] = colors_repeated.reshape(-1, 3) + + # 批量计算新的UV坐标 + pixels = torch.stack([ + all_r * 3 + 1, + uv_map.shape[0] + all_c * 3 + 1 + ], dim=1).to(device) + u_new = pixels[:, 0].float() / (uv_map.shape[1] - 1) + new_height = int(uv_map.shape[0] + uv_map.shape[0] / 2) + v_new = (new_height - 1 - pixels[:, 1].float()) / (new_height - 1) + new_uvs = torch.stack([u_new, v_new], dim=1) + + # 更新UV坐标:拼接新计算的UV + uvs_updated = torch.cat([uvs, new_uvs], dim=0) + uv_coordinates_start = uvs_updated.shape[0] - total_regions + + # 批量更新face_uv_indices + for i, (region_index, _) in enumerate(sorted(regions_face_color.items(), key=lambda x: x[0])): + region_faces_indexes = torch.tensor(list(regions[region_index][0]), device=device) + face_uv_indices[region_faces_indexes] = torch.full((1, 3), uv_coordinates_start + i, device=device) + + # 合并原始UV贴图和新的颜色UV贴图 + new_uv_map = torch.cat((uv_map, color_uv_map), dim=0) + + return new_uv_map, uvs_updated, face_uv_indices + +def group_regions_by_y_axis( + regions: List[Tuple[set, set]], + vertices: torch.Tensor, + triangle_vertex_indices: torch.Tensor, + device: str, + interval_size: float = 0.1 +) -> Dict[int, List[int]]: + """ + 将区域按照y轴高度分组 + + Args: + regions: 区域列表,每个区域为(缺失面集合, 邻接面集合)的元组 + vertices: 顶点坐标张量 + triangle_vertex_indices: 三角形顶点索引张量 + device: 计算设备 ('cuda' 或 'cpu') + interval_size: y轴分组的间隔大小,默认为0.1 + + Returns: + Dict[int, List[int]]: 以y轴区间为键,区域索引列表为值的字典 + """ + y_intervals = defaultdict(list) + for r_index, region in enumerate(regions): + region_faces_indexes = torch.tensor(list(region[0]), device=device) + # 计算面组的平均y轴位置 + face_vertices = 
vertices[triangle_vertex_indices[region_faces_indexes]] + avg_y = face_vertices[:, :, 1].mean(dim=(0, 1)) + + # 根据y轴位置分配到对应区间 + interval_key = int(avg_y // interval_size) + y_intervals[interval_key].append(r_index) + + return dict(y_intervals) + +def align_regions_colors( + regions_face_color: Dict[int, torch.Tensor], + y_intervals: Dict[int, List[int]], + regions: List[Tuple[set, set]] +) -> Dict[int, torch.Tensor]: + """ + 对齐区间内的颜色 + + Args: + regions_face_color: 每个区域的颜色 + y_intervals: 每个y轴区间的区域索引列表 + + Returns: + Dict[int, torch.Tensor]: 以y轴区间为键,颜色为值的字典 + """ + # aligned_regions_face_color = {} + large_group_threshold_min = 5000 + large_group_threshold_max = 100000 + for interval_key, region_indices in y_intervals.items(): + large_groups = [] + # normal_groups = [] + for r_index in region_indices: + region = regions[r_index] + if len(region[0]) >= large_group_threshold_min and len(region[0]) <= large_group_threshold_max: + large_groups.append((r_index, len(region[0]), regions_face_color[r_index])) + + # 查找 large_groups 中 len(region[0]) 最大的组,并获取其颜色 + if large_groups: + largest_group = max(large_groups, key=lambda x: x[1]) + color: torch.Tensor = largest_group[2] + for large_group in large_groups: + regions_face_color[large_group[0]] = color + + return regions_face_color + + +def process(input_obj_path, input_texture_path, missing_faces_path, output_obj_path, output_texture_path): + start_time = time.time() + + device = 'cuda' if torch.cuda.is_available() else 'cpu' + vertices, uvs, triangle_vertex_indices, face_uv_indices, missing_color_faces, uv_map = parse_obj_file_and_uv_map( + input_obj_path, missing_faces_path, input_texture_path, device=device) + + # 构建面的邻接关系和找到区域 + start_face_adjacency_time = time.time() + face_adjacency = build_face_adjacency(vertices.cpu().numpy(), triangle_vertex_indices.cpu().numpy()) + end_face_adjacency_time = time.time() + print(f"face_adjacency using: {end_face_adjacency_time - start_face_adjacency_time} seconds") + + 
start_find_groups_time = time.time() + regions = find_groups_and_subgroups(face_adjacency, missing_color_faces) + end_find_groups_time = time.time() + print(f"find_groups_and_subgroups using: {end_find_groups_time - start_find_groups_time} seconds") + + start_texture_map_time = time.time() + # 使用新封装的函数计算每个区域的加权平均颜色 + regions_face_color = compute_regions_face_colors(regions, uv_map, uvs, face_uv_indices, device) + end_texture_map_time = time.time() + print(f"texture_mapping_to_triangle using: {end_texture_map_time - start_texture_map_time} seconds") + + # 按y轴区间分组 + y_intervals = group_regions_by_y_axis( + regions, + vertices, + triangle_vertex_indices, + device + ) + + # 对齐区间内的颜色 + regions_face_color = align_regions_colors(regions_face_color, y_intervals, regions) + + # 更新UV贴图和面索引 + start_color_map_time = time.time() + new_uv_map, uvs, face_uv_indices = update_uv_map_and_indices(uv_map, uvs, face_uv_indices, regions, + regions_face_color, device) + end_color_map_time = time.time() + print(f"color_mapping_to_triangle using: {end_color_map_time - start_color_map_time} seconds") + + end_time = time.time() + print(f"using: {end_time - start_time} seconds") + + # 写入OBJ和纹理贴图 + start_write_time = time.time() + + vertices_cpu = vertices.cpu().numpy() + uvs_cpu = uvs.cpu().numpy() + triangle_vertex_indices_cpu = triangle_vertex_indices.cpu().numpy() + face_uv_indices_cpu = face_uv_indices.cpu().numpy() + new_uv_map_cpu = new_uv_map.cpu().numpy() + new_uv_map_bgr = cv2.cvtColor(new_uv_map_cpu, cv2.COLOR_RGB2BGR) + + with Pool(2) as p: + # 异步执行OBJ和纹理图写入操作 + obj_future = p.apply_async(write_obj_with_uv_coordinates, + (output_obj_path, vertices_cpu, uvs_cpu, + triangle_vertex_indices_cpu, face_uv_indices_cpu)) + + img_future = p.apply_async(cv2.imwrite, + (output_texture_path, new_uv_map_bgr, + [cv2.IMWRITE_PNG_COMPRESSION, 3])) + + obj_future.get() + img_future.get() + + end_write_time = time.time() + end_time = time.time() + print(f"Total file writing time: {end_write_time - 
start_write_time:.2f} seconds") + print(f"using: {end_time - start_time} seconds") + +def main(): + parser = argparse.ArgumentParser(description='Process OBJ files to fix missing color faces.') + parser.add_argument('--input_obj', type=str, required = True, help='Path to the input OBJ file') + parser.add_argument('--input_texture', type=str, required = True, help='Path to the texture file') + parser.add_argument('--missing_faces', type=str, required = True, help='Path to the file with indices of missing color faces') + parser.add_argument('--output_obj', type=str, required = True, help='Path to the output OBJ file') + parser.add_argument('--output_texture', type=str, required = True, help='Path to the texture file') + + args = parser.parse_args() + process(args.input_obj, args.input_texture, args.missing_faces, args.output_obj, args.output_texture) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/generate_fbx.py b/generate_fbx.py new file mode 100644 index 0000000..b019724 --- /dev/null +++ b/generate_fbx.py @@ -0,0 +1,192 @@ +import bpy +import os +import sys +import argparse + +class ArgumentParserForBlender(argparse.ArgumentParser): + """ + This class is identical to its superclass, except for the parse_args + method (see docstring). It resolves the ambiguity generated when calling + Blender from the CLI with a python script, and both Blender and the script + have arguments. E.g., the following call will make Blender crash because + it will try to process the script's -a and -b flags: + >>> blender --python my_script.py -a 1 -b 2 + + To bypass this issue this class uses the fact that Blender will ignore all + arguments given after a double-dash ('--'). The approach is that all + arguments before '--' go to Blender, arguments after go to the script. 
+ The following calls work fine: + >>> blender --python my_script.py -- -a 1 -b 2 + >>> blender --python my_script.py -- + """ + + def _get_argv_after_doubledash(self): + """ + Given the sys.argv as a list of strings, this method returns the + sublist right after the '--' element (if present, otherwise returns + an empty list). + """ + try: + idx = sys.argv.index("--") + return sys.argv[idx+1:] # the list after '--' + except ValueError as e: # '--' not in the list: + return [] + + # overrides superclass + def parse_args(self): + """ + This method is expected to behave identically as in the superclass, + except that the sys.argv list will be pre-processed using + _get_argv_after_doubledash before. See the docstring of the class for + usage examples and details. + """ + return super().parse_args(args=self._get_argv_after_doubledash()) + + + +def find_pid_objname(pid): + for obj in bpy.data.objects: + if obj.type == 'MESH': + return obj.name + + + +def obj2glb(input_path: str, output_path: str, human_num, face_num, is_reduce_face) -> None: + """ + 将OBJ文件转换为GLB格式并保存。 + + 参数: + input_path (str): 输入OBJ文件的路径 + output_path (str): 输出GLB文件的路径 + human_num (int): 人物数量 + face_num (int): 目标面数 + is_reduce_face (str): 是否减面 + img_quality (int): 图片质量 + + + + 返回: + None + """ + if is_reduce_face == "True": + is_reduce_face = True + else: + is_reduce_face = False + if not os.path.exists(input_path): + raise FileNotFoundError(f"输入文件 {input_path} 不存在") + + bpy.ops.object.delete(use_global=False, confirm=False) + bpy.ops.object.select_all(action="DESELECT") + bpy.ops.object.select_by_type(type="MESH") + bpy.ops.object.delete(use_global=False, confirm=False) + + bpy.ops.wm.obj_import(filepath=input_path) + + + pid = os.path.splitext(os.path.basename(input_path))[0] + + print(f'人物数量:{human_num},目标面数:{face_num}') + faces_dest = face_num * human_num + + total_faces: int = 0 + for obj in bpy.data.objects: + if obj.type == 'MESH': + # 获取对象的面数并累加 + total_faces += len(obj.data.polygons) + 
print(f"所有对象的面数之和: {total_faces}") + + if not is_reduce_face: + print("不减面") + faces_dest = total_faces + + print(f"目标面数:{faces_dest} 当前面数:{total_faces}") + + for obj in bpy.data.objects: + if obj.type == 'MESH': + # 取消选中所有对象 + bpy.ops.object.select_all(action='DESELECT') + # 选中当前对象 + obj.select_set(True) + bpy.context.view_layer.objects.active = obj + + # 添加和应用修改器 + modifier = obj.modifiers.new(name="Decimate", type='DECIMATE') + modifier.ratio = faces_dest / total_faces + # 应用修改器 + bpy.ops.object.modifier_apply(modifier="Decimate") + + + pid_objname = find_pid_objname(pid) + obj = bpy.data.objects[pid_objname] + + # scale = 90 / bpy.data.objects[pid_objname].dimensions.y + print(bpy.data.objects[pid_objname].dimensions) + # bpy.data.objects[pid_objname].scale = (scale, scale, scale) + # bpy.ops.object.transform_apply(scale=True) + + bpy.ops.outliner.orphans_purge(do_recursive=True) + + # 导出GLB文件 + bpy.ops.export_scene.fbx(filepath=output_path, + use_selection=False, # 导出所有对象,若只导出选中对象可设为 True + object_types={'MESH'}, # 只导出网格对象 + path_mode='RELATIVE', # 使用相对贴图路径 + # embed_textures=False, # 不嵌入贴图(路径生效) + # apply_unit_scale=True, # 应用单位缩放 + # apply_scale_options='FBX_SCALE_ALL', # 应用缩放 + bake_space_transform=False) # 保持原始空间变换 + + # 清理所有资源的优化版本 + def clean_data_blocks(): + """清理所有数据块""" + # 首先删除所有对象 + bpy.ops.object.select_all(action='SELECT') + bpy.ops.object.delete() + + # 按特定顺序清理数据块以避免依赖问题 + data_blocks = [ + (bpy.data.actions, "动作"), + (bpy.data.armatures, "骨骼"), + (bpy.data.cameras, "相机"), + (bpy.data.lights, "灯光"), + (bpy.data.meshes, "网格"), + (bpy.data.materials, "材质"), + (bpy.data.textures, "纹理"), + (bpy.data.images, "图片"), + (bpy.data.curves, "曲线"), + (bpy.data.lights, "灯光"), + (bpy.data.worlds, "世界"), + (bpy.data.collections, "集合") + ] + + # 循环清理每种类型的数据 + for data_block, block_name in data_blocks: + try: + for item in data_block: + if item.users == 0: + data_block.remove(item) + else: + item.user_clear() + data_block.remove(item) + except Exception as e: + 
print(f"清理{block_name}时出错: {str(e)}") + + # 强制执行垃圾回收 + bpy.ops.outliner.orphans_purge(do_recursive=True) + import gc + gc.collect() + + # 执行清理 + clean_data_blocks() + + +if __name__ == "__main__": + parser = ArgumentParserForBlender() + parser.add_argument("--input_path", type=str, required=True) + parser.add_argument("--output_path", type=str, required=True) + parser.add_argument("--human_num", type=int, required=True) + parser.add_argument("--face_num", type=int, required=True) + parser.add_argument("--is_reduce_face", type=str, required=True) + args = parser.parse_args() + + obj2glb(args.input_path, args.output_path, args.human_num, args.face_num, args.is_reduce_face) \ No newline at end of file