import os,shutil import redis import oss2,time,sys import requests import argparse,json # 将当前脚本所在目录添加到 Python 路径,以便导入 utils 模块 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) # from download_print_out import download_transform_save_by_json from utils.oss_redis import redisClient from utils.funcs import downloadJsonAndJpgFileAndMoveToCorrectDir, downloadDataByOssAndTransformSave # 默认使用脚本所在目录 currentDir = os.path.dirname(os.path.abspath(__file__)) ENV = 'prod' url = 'https://mp.api.suwa3d.com' if ENV == 'dev': url = 'http://mp.api.dev.com' elif ENV == 'prod': url = 'https://mp.api.suwa3d.com' #判断是否上传了 JSON 文件 def step1(versionId): # 下载json 文件 和 图片 dirName,machineInfo = downloadJsonAndJpgFileAndMoveToCorrectDir(versionId,currentDir) if not dirName: return False #判断是否是小机台 isSmallMachine = False if str(machineInfo["machine_type"]) == '1': isSmallMachine = True #下载数据,转换数据 res = downloadDataByOssAndTransformSave(dirName,isSmallMachine) if not res: return False #判断下载的obj文件数量和json里的是否一致,排除arrange文件夹 objFilePath = os.path.join(dirName, 'data') objCounts = 0 for file in os.listdir(objFilePath): if file == 'arrange': continue if file.endswith('.obj'): objCounts += 1 print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 下载处理完成的obj文件数量: {objCounts}') # requestApiToUpdateSliceStatus(versionId,objCounts) # 读取 队列中一个数据出来 def main(work_dir=None): # 如果指定了工作目录,使用指定的目录 if work_dir: work_dir = os.path.abspath(work_dir) if not os.path.exists(work_dir): print(f'指定的工作目录不存在: {work_dir},将创建该目录') os.makedirs(work_dir, exist_ok=True) currentDir = work_dir print(f'使用指定的工作目录: {currentDir}') else: print(f'没有指定工作目录,退出') exit(0) # 循环处理,直到队列为空 while True: r = redisClient() #检测队列是否有值 if r.scard('pb:sliceing') == 0: print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 队列为空,等待10秒') time.sleep(10) continue #获取队列中的值 data = r.spop('pb:sliceing') if data is None: print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据为空,等待10秒') time.sleep(10) continue data = data.decode('utf-8') #判断是否是数字 if not data.isdigit(): print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据不是数字,等待10秒') time.sleep(10) continue versionId = str(data) print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}') res = step1(versionId) if res == False: print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} JSON文件下载数据失败,等待10秒') time.sleep(10) continue # 在长时间操作后,确保 Redis 连接仍然有效 # 通过重新获取客户端来触发连接检查 try: r = redisClient() r.ping() # 测试连接 except Exception as e: print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} Redis连接检查失败: {str(e)},将在下次循环时自动重连') #time.sleep(10) def testMain(): global currentDir currentDir = "/Users/dcx/code/make2/script/factory_sliceing_v2/tempData" versionId = '10158' #'10153 10158' print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}') res = step1(versionId) print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 处理完成,res={res}') if __name__ == '__main__': testMain() # parser = argparse.ArgumentParser(description='排版打印订单处理程序') # parser.add_argument( # '--work-dir', # type=str, # default=None, # help='指定工作目录(磁盘路径),例如: D:/work 或 /Users/username/work。如果不指定,则使用脚本所在目录' # ) # args = parser.parse_args() # main(work_dir=args.work_dir)