import os
import sys
import gc
import json
import time
import logging
import subprocess
from typing import cast

import redis
import requests

from download_print import download_datas_by_pre_layout
from download_print import BatchModelInfo
from print_factory_type_setting_obj_run import print_type_setting_obj
from config import print_factory_type_dir
from config import oss_config
from config import local_data
from config import redis_config
from general import is_run_local_data

# If there is no DISPLAY, start Xvfb automatically
if "DISPLAY" not in os.environ:
    import atexit
    from time import sleep

    # Set a temporary XDG_RUNTIME_DIR
    runtime_dir = f"/tmp/runtime-{os.getuid()}"
    os.makedirs(runtime_dir, exist_ok=True)
    os.environ["XDG_RUNTIME_DIR"] = runtime_dir

    # Start Xvfb on display :99
    xvfb_cmd = ["Xvfb", ":99", "-screen", "0", "1024x768x24", "-nolisten", "tcp"]
    xvfb_proc = subprocess.Popen(xvfb_cmd)
    atexit.register(lambda: xvfb_proc.terminate())  # shut Xvfb down on exit

    # Point DISPLAY at the virtual screen
    os.environ["DISPLAY"] = ":99"
    sleep(0.5)  # give Xvfb a moment to start


class RedisTaskQueue:
    """Wraps all operations on the Redis task queue."""

    def __init__(self, queue_key: str, redis_config: dict = redis_config):
        self.queue_key = queue_key
        self.client = redis.Redis(**redis_config)

    def get_length(self) -> int:
        """
        Return the length of the queue. A fresh connection is made first,
        so a previously lost connection is recovered.
        """
        try:
            length = 0
            # Reconnect and check whether the queue key exists
            self.client = redis.Redis(**redis_config)
            if not self.client.exists(self.queue_key):
                logging.warning(f"Queue does not exist: {self.queue_key}")
                return 0
            key_type = self.client.type(self.queue_key).decode('utf-8')  # type: ignore
            if key_type == 'list':
                length = cast(int, self.client.llen(self.queue_key))
            elif key_type == 'set':
                length = cast(int, self.client.scard(self.queue_key))
            return length
        except redis.ConnectionError as e:
            logging.warning(f"Connection to Redis lost: {e}")
            return 0
        except Exception as e:
            logging.warning(f"Failed to get queue length: {e}")
            return 0

    def get_info(self) -> dict:
        """
        Pop one message from the queue.

        Returns:
            dict: the message, or an empty dict if the queue is empty or an error occurs.
        """
        try:
            if not hasattr(self, 'client') or self.client is None:
                self.client = redis.Redis(**redis_config)
            if not self.client.exists(self.queue_key):
                logging.warning(f"Queue does not exist: {self.queue_key}")
                return {}
            key_type = self.client.type(self.queue_key).decode('utf-8')
            if key_type == 'list':
                # Pop one element from the left end of the list
                data = self.client.lpop(self.queue_key)
            elif key_type == 'set':
                # Pop a random element from the set
                data = self.client.spop(self.queue_key)
            else:
                logging.warning(f"Unsupported queue type: {key_type}")
                return {}
            if data is None:
                return {}
            # The payload is expected to be a JSON string
            try:
                return json.loads(data.decode('utf-8'))
            except (json.JSONDecodeError, UnicodeDecodeError):
                # Not JSON: return the raw string as the value
                return {"data": data.decode('utf-8')}
        except redis.ConnectionError as e:
            logging.warning(f"Redis connection lost: {e}")
            return {}
        except Exception as e:
            logging.warning(f"Failed to fetch a queue message: {e}")
            return {}

    def __len__(self) -> int:
        """Allow len() on instances, which reads as idiomatic Python."""
        return self.get_length()
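
# Usage sketch (assumes the Redis instance described by redis_config in config.py
# is reachable; the queue name below is the one used in main()):
#   queue = RedisTaskQueue("pb:print_order_type_setting")
#   pending = len(queue)      # number of queued layout tasks (0 on connection errors)
#   task = queue.get_info()   # pops one task dict; returns {} when the queue is empty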


def main():
    redis_queue_name = "pb:print_order_type_setting"
    # while True:
    try:
        redis_queue = RedisTaskQueue(redis_queue_name)
        task_num = redis_queue.get_length()
        # print("task_num=", task_num)
        if task_num <= 0:
            # Nothing to do: wait briefly, then exit
            time.sleep(10)
            sys.exit(0)
        info = redis_queue.get_info()
        print("info=", info)
        process_clound_print(info)
        gc.collect()
    except Exception as e:
        print(f"Error while processing the task: {e}")
    time.sleep(2.5)
    sys.exit(0)


def test_main():
    process_clound_print(local_data)
    gc.collect()
    time.sleep(5)
    sys.exit(0)


def clear_directory_recursive(dir_path):
    """
    Recursively empty a directory: keep the directory itself, but delete
    all of its contents, including nested subdirectories.
    """
    if not os.path.isdir(dir_path):
        print(f"Path {dir_path} is not a valid directory.")
        return
    for item_name in os.listdir(dir_path):
        item_path = os.path.join(dir_path, item_name)
        if os.path.isfile(item_path) or os.path.islink(item_path):
            # Files and symlinks are deleted directly
            try:
                os.unlink(item_path)
                print(f"Deleted: {item_path}")
            except Exception as e:
                print(f"Failed to delete {item_path}: {e}")
        elif os.path.isdir(item_path):
            # For a subdirectory, recursively empty it, then remove the now-empty directory
            try:
                clear_directory_recursive(item_path)  # empty the subdirectory
                os.rmdir(item_path)  # remove the now-empty subdirectory
                print(f"Deleted subdirectory: {item_path}")
            except Exception as e:
                print(f"Failed to delete subdirectory {item_path}: {e}")
    print(f"Contents of {dir_path} have been cleared.")


def process_clound_print(data):
    print_ids = []
    list_print_model_info = []
    results = []
    selected_machine = "大机型"
    try:
        # parsed = json.loads(data.decode('utf-8'))
        parsed = data
        pre_batch_id = parsed["pre_batch_id"]
        print(f"pre_batch_id={pre_batch_id}")
        machine_print_counts = parsed["machine_print_counts"]
        machine_type = parsed["machine_type"]
        if machine_type == "small_machine":
            selected_machine = "小机型"
        else:
            selected_machine = "大机型"
        print_orders = parsed.get("print_orders", [])
        if not print_orders:
            print("No order data found")
        for index, order in enumerate(print_orders, 1):
            order_info = {
                'index': index,
                'counts': order.get('counts'),
                'pid': order.get('pid'),
                'print_id': order.get('print_id'),
                'order_id': order.get('order_id'),
                'layout_z': order.get('layout_z')
            }
            results.append(order_info)
            # Debug output for each order:
            # print(f"Order #{index}:")
            # print(f"  counts: {order_info['counts']}")
            # print(f"  PID: {order_info['pid']}")
            # print(f"  print_id: {order_info['print_id']}")
            # print(f"  layout_z: {order_info['layout_z']}")
            print_ids.append(order_info['print_id'])
            # list_print_model_info.append(batch_model_info)
            # print("-" * 40)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # The payload was not valid JSON
        print("error!")

    # Fetch the detailed order info for all collected print_ids
    url = f"https://mp.api.suwa3d.com/api/printOrder/getInfoByPrintIds?print_ids={print_ids}"
    res = requests.get(url)
    data = res.json()["data"]
    for index, order in enumerate(data, 1):
        order_info = {
            'order_id': order.get('order_id'),
            'pid': order.get('pid'),
            'print_order_id': order.get('print_order_id'),
            'real_size': order.get('real_size'),
            'path': order.get('path'),
            'quantity': order.get('quantity')
        }
        results.append(order_info)
        model_size = f"{order_info['real_size']}_x{order_info['quantity']}"
        # Debug output for each order:
        # print(f"Order #{order_info['order_id']}:")
        # print(f"  PID: {order_info['pid']}")
        # print(f"  print_order_id: {order_info['print_order_id']}")
        # print(f"  model_size: {model_size}")
        # print(f"  quantity: {order_info['quantity']}")
        # print(f"  path: {order_info['path']}")
        batch_model_info = BatchModelInfo(
            order_id=order_info["order_id"],
            pid=order_info["pid"],
            print_order_id=order_info["print_order_id"],
            model_size=model_size,
            path=order_info["path"],
            count=order_info["quantity"] if order_info["quantity"] > 1 else 1
        )
        list_print_model_info.append(batch_model_info)
        print("-" * 40)

    # Clear old working data, then download the models for this batch
    workdir = f"{print_factory_type_dir}/data/{pre_batch_id}"
    clear_directory_recursive(f"{print_factory_type_dir}/data/")
    clear_directory_recursive(f"{print_factory_type_dir}/full/")
    start_time = time.time()
    if not download_datas_by_pre_layout(list_print_model_info, workdir, oss_config):
        print(f"Download failed, layout aborted, batch={pre_batch_id}")
        return
    print(f"Download took: {time.time() - start_time}")
    # Download finished; start the layout step
    src_dir = pre_batch_id
    selected_mode = "紧凑"  # "标准" (standard) or "紧凑" (compact)
    output_format = "JSON"  # "模型" (model) or "JSON"
    base_original_obj_dir = f"{print_factory_type_dir}/data/{src_dir}"
    print_type_setting_obj(base_original_obj_dir=base_original_obj_dir, batch_id=pre_batch_id,
                           selected_mode=selected_mode, output_format=output_format,
                           selected_machine=selected_machine)
    # Layout finished
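
# Example of the task payload process_clound_print() expects, as implied by the key
# accesses above (all field values below are hypothetical):
#   {
#       "pre_batch_id": "20240101_001",
#       "machine_type": "small_machine",   # anything else selects the large machine
#       "machine_print_counts": 1,
#       "print_orders": [
#           {"counts": 1, "pid": "p1", "print_id": 123, "order_id": "o1", "layout_z": 0}
#       ]
#   }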


if __name__ == '__main__':
    print(f"is_run_local_data={is_run_local_data()}")
    if is_run_local_data():
        test_main()
    else:
        print("run main")
        main()