建模程序 多个定时程序
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

134 lines
4.6 KiB

import os,shutil
import redis
import oss2,time,sys
import requests
import argparse,json
# 将当前脚本所在目录添加到 Python 路径,以便导入 utils 模块
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# from download_print_out import download_transform_save_by_json
from utils.oss_redis import redisClient
from utils.funcs import downloadJsonAndJpgFileAndMoveToCorrectDir, downloadDataByOssAndTransformSave
# 默认使用脚本所在目录
currentDir = os.path.dirname(os.path.abspath(__file__))
ENV = 'prod'
url = 'https://mp.api.suwa3d.com'
if ENV == 'dev':
url = 'http://mp.api.dev.com'
elif ENV == 'prod':
url = 'https://mp.api.suwa3d.com'
#判断是否上传了 JSON 文件
def step1(versionId):
# 下载json 文件 和 图片
dirName,machineInfo = downloadJsonAndJpgFileAndMoveToCorrectDir(versionId,currentDir)
if not dirName:
return False
#判断是否是小机台
isSmallMachine = False
if str(machineInfo["machine_type"]) == '1':
isSmallMachine = True
#下载数据,转换数据
res = downloadDataByOssAndTransformSave(dirName,isSmallMachine)
if not res:
return False
#判断下载的obj文件数量和json里的是否一致,排除arrange文件夹
objFilePath = os.path.join(dirName, 'data')
objCounts = 0
for file in os.listdir(objFilePath):
if file == 'arrange':
continue
if file.endswith('.obj'):
objCounts += 1
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 下载处理完成的obj文件数量: {objCounts}')
# requestApiToUpdateSliceStatus(versionId,objCounts)
# 读取 队列中一个数据出来
def main(work_dir=None):
# 如果指定了工作目录,使用指定的目录
if work_dir:
work_dir = os.path.abspath(work_dir)
if not os.path.exists(work_dir):
print(f'指定的工作目录不存在: {work_dir},将创建该目录')
os.makedirs(work_dir, exist_ok=True)
currentDir = work_dir
print(f'使用指定的工作目录: {currentDir}')
else:
print(f'没有指定工作目录,退出')
exit(0)
# 循环处理,直到队列为空
while True:
r = redisClient()
#检测队列是否有值
if r.scard('pb:sliceing') == 0:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 队列为空,等待10秒')
time.sleep(10)
continue
#获取队列中的值
data = r.spop('pb:sliceing')
if data is None:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据为空,等待10秒')
time.sleep(10)
continue
data = data.decode('utf-8')
#判断是否是数字
if not data.isdigit():
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 取出的数据不是数字,等待10秒')
time.sleep(10)
continue
versionId = str(data)
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}')
res = step1(versionId)
if res == False:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} JSON文件下载数据失败,等待10秒')
time.sleep(10)
continue
# 在长时间操作后,确保 Redis 连接仍然有效
# 通过重新获取客户端来触发连接检查
try:
r = redisClient()
r.ping() # 测试连接
except Exception as e:
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} Redis连接检查失败: {str(e)},将在下次循环时自动重连')
#time.sleep(10)
def testMain():
global currentDir
currentDir = "/Users/dcx/code/make2/script/factory_sliceing_v2/tempData"
versionId = '10153' #'10153 10158'
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理版次ID={versionId}')
res = step1(versionId)
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 处理完成,res={res}')
if __name__ == '__main__':
testMain()
# parser = argparse.ArgumentParser(description='排版打印订单处理程序')
# parser.add_argument(
# '--work-dir',
# type=str,
# default=None,
# help='指定工作目录(磁盘路径),例如: D:/work 或 /Users/username/work。如果不指定,则使用脚本所在目录'
# )
# args = parser.parse_args()
# main(work_dir=args.work_dir)