"""Worker that pulls one print type-setting task from Redis and processes it.

Flow: read a task from the Redis queue, look up the model information for
the task's print ids over HTTP, download the model data, then run the
type-setting step.  The script handles at most one task per invocation and
then exits.
"""

import atexit
import gc
import json
import logging
import os
import subprocess
import sys
import time
from typing import cast

import redis
import requests

from download_print import download_datas_by_pre_layout
from download_print import BatchModelInfo
from print_factory_type_setting_obj_run import print_type_setting_obj
from config import print_factory_type_dir
from config import oss_config
from config import local_data
from general import is_run_local_data

# If no DISPLAY is set, start an Xvfb virtual display automatically.
if "DISPLAY" not in os.environ:
    # Set up a temporary XDG_RUNTIME_DIR.
    runtime_dir = f"/tmp/runtime-{os.getuid()}"
    os.makedirs(runtime_dir, exist_ok=True)
    os.environ["XDG_RUNTIME_DIR"] = runtime_dir

    # Start Xvfb.
    xvfb_cmd = ["Xvfb", ":99", "-screen", "0", "1024x768x24", "-nolisten", "tcp"]
    xvfb_proc = subprocess.Popen(xvfb_cmd)
    atexit.register(lambda: xvfb_proc.terminate())  # stop Xvfb on exit

    # Point DISPLAY at the virtual display.
    os.environ["DISPLAY"] = ":99"
    time.sleep(0.5)  # give Xvfb a moment to start

# Connection settings for the task queue.  For local development, point
# "host" at 127.0.0.1 instead.
# NOTE(review): the password is hard-coded in source; consider loading it
# from the environment or a secrets store.
redis_config = {
    "host": "mp.api.suwa3d.com",
    "port": 6379,
    "db": 6,
    "password": "kcV2000",
    "socket_timeout": 10,
}

# Upper bound, in seconds, for the model-info HTTP lookup so a stalled
# server cannot hang the worker forever.
_HTTP_TIMEOUT_SECONDS = 30


def _as_text(value) -> str:
    """Return *value* as ``str``, decoding ``bytes`` as UTF-8 leniently."""
    if isinstance(value, bytes):
        # errors="replace" keeps a popped task recoverable even when it
        # contains invalid UTF-8, instead of raising and losing it.
        return value.decode("utf-8", errors="replace")
    return str(value)


def _normalize_count(quantity) -> int:
    """Convert an API ``quantity`` value to a copy count of at least 1.

    ``None`` and values that cannot be converted to ``int`` yield 1.
    """
    try:
        count = int(quantity)
    except (TypeError, ValueError):
        return 1
    return count if count > 1 else 1


class RedisTaskQueue:
    """Wrap the Redis operations used by the task queue."""

    def __init__(self, queue_key: str, redis_config: dict = redis_config):
        """Connect to Redis.

        Args:
            queue_key: Name of the Redis key holding the queue.
            redis_config: Keyword arguments passed to ``redis.Redis``.
        """
        self.queue_key = queue_key
        # Remember the config so reconnects use the same settings the
        # instance was created with, not the module-level default.
        self._redis_config = redis_config
        self.client = redis.Redis(**redis_config)

    def _connect(self) -> None:
        """(Re)create the Redis client from this instance's configuration."""
        config = getattr(self, "_redis_config", redis_config)
        self.client = redis.Redis(**config)

    def _query_length(self) -> int:
        """Return the queue size; 0 if the key is missing or not a list/set."""
        if not self.client.exists(self.queue_key):
            logging.warning(f"队列不存在: {self.queue_key}")
            return 0

        key_type = _as_text(self.client.type(self.queue_key))
        if key_type == 'list':
            return cast(int, self.client.llen(self.queue_key))
        if key_type == 'set':
            return cast(int, self.client.scard(self.queue_key))
        return 0

    def get_length(self) -> int:
        """Return the length of the queue.

        If the connection is lost, the client is recreated and the query is
        retried once.  Any remaining failure is logged and reported as 0.
        """
        try:
            return self._query_length()
        except redis.ConnectionError as e:
            logging.warning(f"与 Redis 的连接中断: {e}。正在尝试重新连接...")
            try:
                self._connect()
                return self._query_length()
            except Exception as retry_error:
                logging.warning(f"获取队列长度失败: {retry_error}")
                return 0
        except Exception as e:
            logging.warning(f"获取队列长度失败: {e}")
            return 0

    def get_info(self) -> dict:
        """Pop one entry from the queue.

        List keys are popped from the left; set keys pop a random member.

        Returns:
            dict: The decoded JSON object.  A payload that is not valid JSON,
            or that is JSON but not an object, is returned as
            ``{"data": value}``.  An empty dict is returned when the queue is
            missing, empty, of an unsupported type, or when an error occurs.
        """
        try:
            if not hasattr(self, 'client') or self.client is None:
                self._connect()

            if not self.client.exists(self.queue_key):
                logging.warning(f"队列不存在: {self.queue_key}")
                return {}

            key_type = _as_text(self.client.type(self.queue_key))
            if key_type == 'list':
                raw = self.client.lpop(self.queue_key)
            elif key_type == 'set':
                raw = self.client.spop(self.queue_key)
            else:
                logging.warning(f"不支持的队列类型: {key_type}")
                return {}

            if raw is None:
                return {}

            # The entry is expected to be a JSON-encoded string.
            text = _as_text(raw)
            try:
                payload = json.loads(text)
            except json.JSONDecodeError:
                # Not JSON: hand back the raw text.
                return {"data": text}
            # Honour the declared return type for non-object JSON values.
            return payload if isinstance(payload, dict) else {"data": payload}
        except redis.ConnectionError as e:
            logging.warning(f"Redis 连接中断: {e}")
            return {}
        except Exception as e:
            logging.warning(f"获取队列信息失败: {e}")
            return {}

    def __len__(self) -> int:
        """Support ``len(queue)`` as an alias for :meth:`get_length`."""
        return self.get_length()


def main():
    """Process at most one task from the Redis queue, then exit.

    Exits with status 0 in every case: after a 10 second pause when the
    queue is empty, otherwise after a 2.5 second pause once the task has
    been processed or has failed.
    """
    redis_queue_name = "pb:print_order_type_setting"
    try:
        redis_queue = RedisTaskQueue(redis_queue_name)
        task_num = redis_queue.get_length()
        if task_num <= 0:
            time.sleep(10)
            sys.exit(0)

        info = redis_queue.get_info()
        print("info=", info)
        if info:
            process_clound_print(info)
        else:
            # The queue can be drained between the length check and the pop,
            # or the pop can fail; there is nothing to process in that case.
            print("未从队列中取到任务数据")
        gc.collect()
    except Exception as e:
        # Top-level boundary: report the failure and fall through to exit.
        print(f"处理任务时出错: {e}")

    time.sleep(2.5)
    sys.exit(0)


def test_main():
    """Process the locally configured sample task (``local_data``) and exit."""
    process_clound_print(local_data)
    gc.collect()
    time.sleep(5)
    sys.exit(0)


def clear_directory_recursive(dir_path):
    """Empty a directory recursively while keeping the directory itself.

    Files and symbolic links are unlinked; sub-directories are emptied
    recursively and then removed.  Failures are printed and skipped.

    Args:
        dir_path: Path of the directory to empty.
    """
    if not os.path.isdir(dir_path):
        print(f"路径 {dir_path} 不是一个有效目录。")
        return

    for item_name in os.listdir(dir_path):
        item_path = os.path.join(dir_path, item_name)

        if os.path.isfile(item_path) or os.path.islink(item_path):
            # Files and symlinks are removed directly.
            try:
                os.unlink(item_path)
                print(f"已删除: {item_path}")
            except Exception as e:
                print(f"删除失败 {item_path}: {e}")
        elif os.path.isdir(item_path):
            # Empty the sub-directory first, then remove the empty directory.
            try:
                clear_directory_recursive(item_path)
                os.rmdir(item_path)
                print(f"已删除子目录: {item_path}")
            except Exception as e:
                print(f"删除子目录失败 {item_path}: {e}")

    print(f"目录 {dir_path} 下的内容已清空。")


def process_clound_print(data):
    """Download the models of one pre-layout batch and run type-setting.

    Args:
        data: Task payload (a mapping).  Reads ``pre_batch_id``,
            ``machine_type`` and the optional ``print_orders`` list, whose
            items supply ``print_id``.

    Raises:
        KeyError: If ``pre_batch_id`` or ``machine_type`` is missing.
        requests.RequestException: If the model-info lookup fails or
            times out.
    """
    parsed = data
    pre_batch_id = parsed["pre_batch_id"]
    print(f"pre_batch_id={pre_batch_id}")

    machine_type = parsed["machine_type"]
    selected_machine = "小机型" if machine_type == "small_machine" else "大机型"

    print_orders = parsed.get("print_orders", [])
    if not print_orders:
        # Nothing to lay out: stop before wiping the working directories.
        print("没有找到订单数据")
        return

    print_ids = [order.get('print_id') for order in print_orders]

    # Look up model information for the collected print ids.
    url = f"https://mp.api.suwa3d.com/api/printOrder/getInfoByPrintIds?print_ids={print_ids}"
    res = requests.get(url, timeout=_HTTP_TIMEOUT_SECONDS)
    res.raise_for_status()
    order_records = res.json()["data"]

    list_print_model_info = []
    for order in order_records:
        quantity = order.get('quantity')
        model_size = f"{order.get('real_size')}_x{quantity}"
        batch_model_info = BatchModelInfo(
            order_id=order.get('order_id'),
            pid=order.get('pid'),
            print_order_id=order.get('print_order_id'),
            model_size=model_size,
            path=order.get('path'),
            # The API may return None or a non-int quantity; print at least
            # one copy.
            count=_normalize_count(quantity),
        )
        list_print_model_info.append(batch_model_info)
        print("-" * 40)

    workdir = f"{print_factory_type_dir}/data/{pre_batch_id}"
    clear_directory_recursive(f"{print_factory_type_dir}/data/")
    clear_directory_recursive(f"{print_factory_type_dir}/full/")

    # Download step.
    start_time = time.time()
    if not download_datas_by_pre_layout(list_print_model_info, workdir, oss_config):
        print(f"下载失败,排版终止 批次={pre_batch_id}")
        return
    print(f"下载耗时:{time.time()-start_time}")

    # Type-setting step.
    selected_mode = "紧凑"  # alternatives: "标准" / "紧凑"
    output_format = "JSON"  # alternatives: "模型" / "JSON"
    print_type_setting_obj(
        base_original_obj_dir=workdir,
        batch_id=pre_batch_id,
        selected_mode=selected_mode,
        output_format=output_format,
        selected_machine=selected_machine,
    )


if __name__ == '__main__':
    print(f"is_run_local_data={is_run_local_data()}")
    if is_run_local_data():
        test_main()
    else:
        print("run main")
        main()