You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
325 lines
10 KiB
325 lines
10 KiB
import os |
|
import subprocess |
|
import redis |
|
import logging |
|
from typing import cast |
|
import requests |
|
import json |
|
from download_print import download_datas_by_pre_layout |
|
from download_print import BatchModelInfo |
|
from print_factory_type_setting_obj_run import print_type_setting_obj |
|
|
|
from config import print_factory_type_dir |
|
from config import oss_config |
|
from config import url_get_info_by_printIds |
|
from config import local_data |
|
from config import redis_config |
|
from config import print_data_dir |
|
|
|
from general import is_run_local_data |
|
|
|
# Headless hosts have no X server: when DISPLAY is unset, start a private
# Xvfb instance and point this process at it.
if "DISPLAY" not in os.environ:
    import atexit
    from time import sleep

    # Per-user scratch directory exported as XDG_RUNTIME_DIR.
    runtime_dir = "/tmp/runtime-" + str(os.getuid())
    os.makedirs(runtime_dir, exist_ok=True)
    os.environ["XDG_RUNTIME_DIR"] = runtime_dir

    # The display number is fixed; nothing here probes for a free one.
    free_display_num = 99
    display_env = ":" + str(free_display_num)

    # Launch Xvfb on that display with screen 0 set to 1024x768x24 and TCP
    # listening switched off.
    xvfb_cmd = ["Xvfb", display_env]
    xvfb_cmd += ["-screen", "0", "1024x768x24"]
    xvfb_cmd += ["-nolisten", "tcp"]
    xvfb_proc = subprocess.Popen(xvfb_cmd)

    # Make sure the Xvfb child is stopped when the interpreter exits.
    @atexit.register
    def cleanup_xvfb():
        xvfb_proc.terminate()
        xvfb_proc.wait()  # block until the process has fully exited

    # Export the display, then pause briefly to give Xvfb time to come up.
    os.environ["DISPLAY"] = display_env
    sleep(1)
    print(f"已启动 Xvfb 在显示器 {display_env}")
|
|
|
import sys |
|
class RedisTaskQueue:
    """Thin wrapper around the Redis key that holds pending tasks.

    The key may be a Redis list (consumed with ``LPOP``) or a Redis set
    (consumed with ``SPOP``). The public methods never raise on Redis
    errors: they log a warning and return an "empty" value (``0`` / ``{}``).
    """

    def __init__(self, queue_key: str, redis_config: dict = redis_config):
        """Remember the queue key and open a Redis client.

        Args:
            queue_key: Name of the Redis key used as the queue.
            redis_config: Keyword arguments forwarded to ``redis.Redis``.
        """
        self.queue_key = queue_key
        # Kept so a reconnect uses the same settings as the constructor
        # instead of silently falling back to the module-level default.
        self._redis_config = redis_config
        self.client = redis.Redis(**redis_config)

    def _connect(self) -> None:
        """(Re)create the client from the configuration given at construction."""
        self.client = redis.Redis(**self._redis_config)

    @staticmethod
    def _as_text(value, errors: str = "strict") -> str:
        """Return ``value`` as ``str``.

        The client hands back ``bytes`` by default but ``str`` when it is
        configured with ``decode_responses=True``; both are accepted.
        """
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode("utf-8", errors)
        return str(value)

    def _key_type(self) -> str:
        """Return the Redis type name of the queue key.

        Redis reports ``"none"`` for a key that does not exist, so one
        ``TYPE`` call replaces the former ``EXISTS`` + ``TYPE`` pair.
        """
        return self._as_text(self.client.type(self.queue_key))

    def _read_length(self) -> int:
        """Return the number of queued items; 0 for a missing or unsupported key."""
        key_type = self._key_type()
        if key_type == "none":
            logging.warning(f"队列不存在: {self.queue_key}")
            return 0
        if key_type == "list":
            return cast(int, self.client.llen(self.queue_key))
        if key_type == "set":
            return cast(int, self.client.scard(self.queue_key))
        return 0

    def _decode_payload(self, data) -> dict:
        """Turn a popped queue item into a dict.

        JSON objects are returned as-is. Any other JSON value, and any
        payload that is not valid JSON, is wrapped as ``{"data": ...}`` so
        the result is always a dict.
        """
        try:
            parsed = json.loads(self._as_text(data))
        except (json.JSONDecodeError, UnicodeDecodeError):
            # The item is already removed from Redis; decode leniently so
            # an invalid byte sequence cannot make the task vanish.
            return {"data": self._as_text(data, errors="replace")}
        if not isinstance(parsed, dict):
            return {"data": parsed}
        return parsed

    def get_length(self) -> int:
        """Return the queue length, reconnecting once if the connection dropped.

        Returns:
            The list/set size, or 0 when the key is missing, has another
            type, or Redis cannot be reached.
        """
        try:
            try:
                return self._read_length()
            except redis.ConnectionError as e:
                logging.warning(f"与 Redis 的连接中断: {e}。正在尝试重新连接...")
                self._connect()
                return self._read_length()
        except Exception as e:
            logging.warning(f"获取队列长度失败: {e}")
            return 0

    def get_info(self) -> dict:
        """Pop one item from the queue and return it as a dict.

        Lists are popped from the left; sets give up a random member.

        Returns:
            dict: The decoded item, or an empty dict when the queue is
            missing, empty, of an unsupported type, or an error occurred.
        """
        try:
            if getattr(self, "client", None) is None:
                self._connect()

            key_type = self._key_type()
            if key_type == "none":
                logging.warning(f"队列不存在: {self.queue_key}")
                return {}

            if key_type == "list":
                data = self.client.lpop(self.queue_key)
            elif key_type == "set":
                data = self.client.spop(self.queue_key)
            else:
                logging.warning(f"不支持的队列类型: {key_type}")
                return {}

            if data is None:
                return {}

            return self._decode_payload(data)

        except redis.ConnectionError as e:
            logging.warning(f"Redis 连接中断: {e}")
            return {}
        except Exception as e:
            logging.warning(f"获取队列信息失败: {e}")
            return {}

    def __len__(self) -> int:
        """Support ``len(queue)`` by delegating to :meth:`get_length`."""
        return self.get_length()
|
|
|
import time |
|
import gc |
|
def main():
    """Process at most one task from the Redis type-setting queue, then exit.

    The function always ends the process with ``sys.exit(0)``: after a
    10 second pause when the queue is empty, otherwise after handling one
    task (or failing to) and a 2.5 second pause.
    NOTE(review): presumably an external supervisor restarts the script;
    confirm against the deployment.
    """
    redis_queue_name = "pb:print_order_type_setting"
    try:
        redis_queue = RedisTaskQueue(redis_queue_name)

        if redis_queue.get_length() <= 0:
            # Nothing queued: back off before exiting. SystemExit is not an
            # Exception subclass, so the handler below does not swallow it.
            time.sleep(10)
            sys.exit(0)

        info = redis_queue.get_info()
        print("info=", info)

        if info:
            process_clound_print(info)
            gc.collect()
        else:
            # The queue can be drained between the length check and the
            # pop; an empty payload has nothing to process.
            print("未取到任务数据,跳过本次处理")

    except Exception as e:
        print(f"处理任务时出错: {e}")

    time.sleep(2.5)
    sys.exit(0)
|
|
|
def test_main():
    """Run the pipeline once on the ``local_data`` payload from config, then exit.

    After processing, a garbage collection is forced, the function sleeps
    for 5 seconds and the process terminates with exit status 0.
    """
    process_clound_print(local_data)
    gc.collect()
    time.sleep(5)
    # Same effect as sys.exit(0), which raises SystemExit(0).
    raise SystemExit(0)
|
|
|
import os |
|
def clear_directory_recursive(dir_path):
    """Empty a directory recursively while keeping the directory itself.

    Real sub-directories are emptied and removed; every other entry
    (regular file, symlink, FIFO, socket, ...) is unlinked. Symlinks are
    never followed, so the target of a link is left untouched. Failures on
    individual entries are printed and do not stop the sweep.

    Args:
        dir_path: Path of the directory to empty.

    Returns:
        None. If ``dir_path`` is not a directory a message is printed and
        nothing is removed.
    """
    if not os.path.isdir(dir_path):
        print(f"路径 {dir_path} 不是一个有效目录。")
        return

    for item_name in os.listdir(dir_path):
        item_path = os.path.join(dir_path, item_name)

        # isdir() follows symlinks, so exclude links explicitly: a link to a
        # directory must be unlinked, not descended into.
        if os.path.isdir(item_path) and not os.path.islink(item_path):
            try:
                clear_directory_recursive(item_path)
                os.rmdir(item_path)  # the sub-directory is empty by now
                print(f"已删除子目录: {item_path}")
            except Exception as e:
                print(f"删除子目录失败 {item_path}: {e}")
        else:
            # Anything that is not a real directory is removed with unlink;
            # this also covers special files the old isfile/islink test skipped.
            try:
                os.unlink(item_path)
            except Exception as e:
                print(f"删除失败 {item_path}: {e}")

    print(f"目录 {dir_path} 下的内容已清空。")
|
|
|
def process_clound_print(data):
    """Download the models of one pre-layout batch and run type-setting on them.

    Steps: collect the print ids from the task, fetch the model details for
    them over HTTP, wipe the working directories, download the models and
    finally call ``print_type_setting_obj``.

    Args:
        data: Task payload as a dict. ``"pre_batch_id"`` and
            ``"machine_type"`` are required; ``"print_orders"`` is an
            optional list of dicts whose ``"print_id"`` values are looked up.

    Returns:
        None. Returns early when the task has no orders or when the
        download step reports failure.

    Raises:
        KeyError: If a required key is missing from ``data`` or the HTTP
            response body has no ``"data"`` entry.
        requests.RequestException: If the lookup request times out or the
            server answers with an error status.
    """
    pre_batch_id = data["pre_batch_id"]
    print(f"pre_batch_id={pre_batch_id}")

    # Anything other than "small_machine" is treated as the large machine.
    selected_machine = "小机型" if data["machine_type"] == "small_machine" else "大机型"

    print_orders = data.get("print_orders", [])
    if not print_orders:
        # Nothing to look up, download or lay out.
        print("没有找到订单数据")
        return

    print_ids = [order.get('print_id') for order in print_orders]

    # NOTE(review): the id list is interpolated with its Python repr
    # (e.g. "[1, 2]"); confirm that is the format the endpoint expects.
    # The timeout keeps a stalled server from blocking the worker forever.
    res = requests.get(f"{url_get_info_by_printIds}{print_ids}", timeout=30)
    res.raise_for_status()

    list_print_model_info = []
    for order in res.json()["data"]:
        quantity = order.get('quantity')
        list_print_model_info.append(
            BatchModelInfo(
                order_id=order.get('order_id'),
                pid=order.get('pid'),
                print_order_id=order.get('print_order_id'),
                model_size=f"{order.get('real_size')}_x{quantity}",
                path=order.get('path'),
                # At least one copy, also when the quantity is missing.
                count=quantity if quantity is not None and quantity > 1 else 1,
            )
        )

    # Batch directory: used as download target and as type-setting input.
    workdir = f"{print_data_dir}{pre_batch_id}"

    # Start from a clean slate: drop everything left by earlier batches.
    clear_directory_recursive(f"{print_data_dir}")
    clear_directory_recursive(f"{print_factory_type_dir}/full/")

    # Download phase.
    start_time = time.time()
    if not download_datas_by_pre_layout(list_print_model_info, workdir, oss_config):
        print(f"下载失败,排版终止 批次={pre_batch_id}")
        return
    print(f"下载耗时:{time.time()-start_time}")

    # Type-setting phase.
    print_type_setting_obj(
        base_original_obj_dir=workdir,
        batch_id=pre_batch_id,
        selected_mode="紧凑",   # the original lists two options: 标准 / 紧凑
        output_format="JSON",  # the original lists two options: 模型 / JSON
        selected_machine=selected_machine,
    )
|
|
|
if __name__ == '__main__':
    # Evaluate the mode switch once so the value that is logged and the
    # branch that is taken cannot disagree.
    _run_local = is_run_local_data()
    print(f"is_run_local_data={_run_local}")
    if _run_local:
        # Local mode: run once against the local payload.
        test_main()
    else:
        # Queue mode: take one task from Redis.
        print("run main")
        main()
|
|
|
|