diff --git a/__pycache__/config.cpython-310.pyc b/__pycache__/config.cpython-310.pyc deleted file mode 100644 index 7791e56..0000000 Binary files a/__pycache__/config.cpython-310.pyc and /dev/null differ diff --git a/__pycache__/libs.cpython-310.pyc b/__pycache__/libs.cpython-310.pyc deleted file mode 100644 index ab0da4b..0000000 Binary files a/__pycache__/libs.cpython-310.pyc and /dev/null differ diff --git a/__pycache__/libs_db.cpython-310.pyc b/__pycache__/libs_db.cpython-310.pyc deleted file mode 100644 index 5ba57b3..0000000 Binary files a/__pycache__/libs_db.cpython-310.pyc and /dev/null differ diff --git a/libs/__pycache__/config.cpython-310.pyc b/libs/__pycache__/config.cpython-310.pyc deleted file mode 100644 index 8ed4d7f..0000000 Binary files a/libs/__pycache__/config.cpython-310.pyc and /dev/null differ diff --git a/libs/__pycache__/libs.cpython-310.pyc b/libs/__pycache__/libs.cpython-310.pyc deleted file mode 100644 index 7fe98ed..0000000 Binary files a/libs/__pycache__/libs.cpython-310.pyc and /dev/null differ diff --git a/libs/__pycache__/libs_db.cpython-310.pyc b/libs/__pycache__/libs_db.cpython-310.pyc deleted file mode 100644 index cab6709..0000000 Binary files a/libs/__pycache__/libs_db.cpython-310.pyc and /dev/null differ diff --git a/libs/libs_db.py b/libs/libs_db.py index 7f2c289..1a4a2ec 100644 --- a/libs/libs_db.py +++ b/libs/libs_db.py @@ -14,7 +14,7 @@ def get_task(task_type): charset=config.mysql_local['charset'],) as conn: cursor = conn.cursor() - sql = f'select task_key from tasks where task_type = "{task_type}" and status = 0 order by id asc limit 1' + sql = f'select task_key from tasks where task_type = "{task_type}" and status = 0 order by priority desc, id asc limit 1' # print(f'sql: {sql}') cursor.execute(sql) data = cursor.fetchone() @@ -36,9 +36,13 @@ def add_task(data): password=config.mysql_local['password'], db=config.mysql_local['db'], charset=config.mysql_local['charset'],) as conn: - cursor = conn.cursor() + cursor = conn.cursor() - sql = f'insert into tasks (task_type, task_key) values ("{data["task_type"]}", "{data["task_key"]}")' + #判断是否是昆山教学的,是的话优先级设置为默认 + if data["psid"] == 85: + sql = f'insert into tasks (task_type, task_key) values ("{data["task_type"]}", "{data["task_key"]}")' + else: + sql = f'insert into tasks (task_type, task_key, priority) values ("{data["task_type"]}", "{data["task_key"]}", 1)' # print(f'sql: {sql}') cursor.execute(sql) conn.commit() diff --git a/logic/logic_main_service.py b/logic/logic_main_service.py new file mode 100644 index 0000000..a993009 --- /dev/null +++ b/logic/logic_main_service.py @@ -0,0 +1,370 @@ +# mysql数据库常用任务函数封装 +import pymysql, socket, time +import platform,sys +if platform.system() == 'Windows': + #sys.path.append('e:\\libs\\') + sys.path.append('libs') +else: + sys.path.append('/data/deploy/make3d/make2/libs/') +import config +# import multiprocessing +import logging +# 创建互斥锁 +# lock = multiprocessing.Lock() +hostname = socket.gethostname() + +logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR) +#公共连接库 +def pymysqlAlias(): + return pymysql.connect( + host=config.mysql_local['host'], + port=config.mysql_local['port'], + user=config.mysql_local['user'], + password=config.mysql_local['password'], + db=config.mysql_local['db'], + charset=config.mysql_local['charset'],) + + +#获取任务 {"hostname":"XXX","run_step":"xxxx","task_distributed_id":"xxxx"} +def get_task_distributed(): + + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + sql = '' + # 获取数据前先获取锁 + # lock.acquire() + #非R11 R12的主机 没有办法处理step1,所以不能做初始化的任务,只能做后续的任务 + if hostname != "R11" and hostname != "R12" and hostname != "XJB-20220906FLC": + sql = f'select * from task_distributed where status = 1 order by priority desc,created_at asc limit 1 for update' + cursor.execute(sql) + result = cursor.fetchone() + #print("查询主任务表",result) + if result: + #获取需要执行的步骤 + next_step = need_run_stepx(result["id"]) + if next_step == "no" or next_step == "error": + print("获取需要执行的步骤 next_step",next_step) + return next_step + taskData = {"hostname":hostname,"run_step":next_step,"task_distributed_id":result["id"],"task_key":result["task_key"]} + flagRes = update_main_and_add_detail(taskData) + if flagRes == "error": + print(f'出现错误,有可能是多个进程获取同一个任务了') + return "error" + print(f'任务ID-{taskData["task_key"]}- "执行{taskData["run_step"]}" ') + return taskData + else: + return 'no_data' + else: + + #R11 R12的主机 可以执行step1 2 3 的任务 + #如果R11 R12的主机目前没有正在执行step2,则优先处理step2, + # print("次数",is_run_stepx_nums("step2")) + if is_run_stepx_nums("step2") < 2: + resultData = need_run_step2() + if resultData != "no": + resultData["hostname"] = hostname + flagRes = update_main_and_add_detail(resultData) + if flagRes == "error": + print(f'出现错误,有可能是多个进程获取同一个任务了,重新获取任务去执行了') + return "error" + print(f'任务ID-{resultData["task_key"]}- "执行step2" ') + return resultData + + #R11 R12的主机如果已经有在处理step2了,则不能再处理step2,只能处理step1 step3 + resultData = need_run_step_no_step2() + #print("resultData",resultData) + if resultData == "no": + return "no" + + resultData["hostname"] = hostname + flagRes = update_main_and_add_detail(resultData) + if flagRes == "error": + print(f'出现错误,有可能是多个进程获取同一个任务了') + return "error" + print(f'任务ID-{resultData["task_key"]}- "执行{resultData["run_step"]}" ') + return resultData + + #sql = f'select * from task_distributed where status != 2 order by priority desc,created_at asc limit 1' + # print(f'sql: {sql}') + + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed({hostname})异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed({hostname})异常: {str(e)}") + return 'error' + + # finally: + # # 释放锁 + # lock.release() + +#查询当前主机有没有正在执行某个任务 +def is_run_stepx(step): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + sql = f'select * from task_distributed_detail where hostname = "{hostname}" and step = "{step}" and finished_at is null for update' + # print(f'sql: {sql}') + cursor.execute(sql) + result = cursor.fetchone() + if result: + return "yes" + else: + return "no" + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx({hostname},{step})异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx({hostname},{step})异常: {str(e)}") + return "error" + + +#查询当前主机某个步骤正在执行的数量 +def is_run_stepx_nums(step): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + sql = f'select count(*) as nums from task_distributed_detail where hostname = "{hostname}" and step = "{step}" and finished_at is null for update' + # print(f'sql: {sql}') + cursor.execute(sql) + result = cursor.fetchone() + return result["nums"] + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx_nums({hostname},{step})异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx_nums({hostname},{step})异常: {str(e)}") + return "error" + + +#查询指定任务需要执行哪个步骤 "error" 表示异常 "no"表示没有任务需要执行 "step1"表示需要执行step1 "step2"表示需要执行step2 "step3"表示需要执行step3 +def need_run_stepx(task_distributed_id): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + #查询task_distributed_id 对应的子任务是否正在执行, + sql = f'select * from task_distributed_detail where hostname = "{hostname}" and task_distributed_id = "{task_distributed_id}" order by id desc limit 1 for update' + # print(f'sql: {sql}') + cursor.execute(sql) + result = cursor.fetchone() + #如果一个子任务都没有,说明该任务还没有开始执行,需要执行step1 + if result is None: + return "step1" + if result and result["finished_at"] is None: + #该任务正在运行中不需要执行下一步 + return "no" + #查询改任务的最后一个状态 + if result and result["finished_at"]: + if result["step"] == "step1": + return "step2" + elif result["step"] == "step2": + return "step3" + elif result["step"] == "step3": + #这里要将 主任务表的状态改为2,finished_at改为当前时间 + update_task_distributed({"id":task_distributed_id,"status":2,"finished_at":time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),"step_last":"step3"}) + return "no" + return "no" + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_stepx({hostname},{task_distributed_id})异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_stepx({hostname},{task_distributed_id})异常: {str(e)}") + return 'error' + +#查询出哪些任务需要执行step2 +def need_run_step2(): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + sql = f'select * from task_distributed where status = 1 and step_last = "step1" and finished_at is null order by priority desc,created_at asc for update' + # print(f'sql: {sql}') + cursor.execute(sql) + result = cursor.fetchall() + + #判断result的长度 + #判断是否有值 + if len(result) == 0: + return "no" + #遍历循环哪笔需要执行step2 + + for row in result: + #判断是否有正在执行的step2 + if need_run_stepx(row["id"]) == "step2": + #没有正在执行的step2,则返回该任务 + return {"hostname":hostname,"run_step":"step2","task_distributed_id":row["id"],"task_key":row["task_key"]} + return "no" + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step2()异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step2()异常: {str(e)}") + return "error" + +#查询出哪些任务需要执行指定的step +def need_run_appoint_step(step): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + sql = f'select * from task_distributed where finished_at is null order by priority desc,created_at asc for update' + # print(f'sql: {sql}') + cursor.execute(sql) + result = cursor.fetchall() + #判断是否有值 + if len(result) == 0: + return "no" + #遍历循环哪笔需要执行step2 + for row in result: + #判断是否有正在执行的step2 + if need_run_stepx(row["id"]) == step: + #没有正在执行的step2,则返回该任务 + return {"hostname":hostname,"run_step":step,"task_distributed_id":row["id"],"task_key":row["task_key"]} + return "no" + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_appoint_step()异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_appoint_step()异常: {str(e)}") + return 'error' + +#查询出哪些任务需要执行非step2 +def need_run_step_no_step2(): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + sql = f'select * from task_distributed where finished_at is null order by priority desc,created_at asc for update' + # print(f'sql: {sql}') + cursor.execute(sql) + result = cursor.fetchall() + #判断是否有值 + if len(result) == 0: + return "no" + #遍历循环哪笔需要执行step2 + + for row in result: + #判断是否有正在执行的step2 + xstep = need_run_stepx(row["id"]) + # print("查询非step2的任务列表",xstep) + if xstep != "step2" and (xstep == "step1" or xstep == "step3"): + #没有正在执行的step2,则返回该任务 + return {"hostname":hostname,"run_step":xstep,"task_distributed_id":row["id"],"task_key":row["task_key"]} + return "no" + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step_no_step2()异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step_no_step2()异常: {str(e)}") + return 'error' + + +# 在任务分布执行主表插入任务,所有的任务执行都从这里获取 +def add_task_distributed(data): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor() + + sql = f'insert into task_distributed (task_type, task_key,created_at) values ("{data["task_type"]}", "{data["task_key"]}","{now()}")' + # print(f'sql: {sql}') + cursor.execute(sql) + + # 获取插入数据的自增ID + last_insert_id = cursor.lastrowid + conn.commit() + return last_insert_id + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_main_task({data})异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_main_task({data})异常: {str(e)}") + return "error" + +# 在task_distributed_detail插入明细步骤 +def add_task_distributed_detail(data): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor() + + sql = f'insert into task_distributed_detail (task_distributed_id,step,hostname,started_at) values ("{data["task_distributed_id"]}", "{data["step"]}","{hostname}","{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}")' + # print(f'sql: {sql}') + cursor.execute(sql) + conn.commit() + return "ok" + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed_detail({data})异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed_detail({data})异常: {str(e)}") + return "error" + +# 更新 task_distributed 主表 +def update_task_distributed(data): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor() + + sql = f'update task_distributed set ' + #判断要更新哪些字段 + if "status" in data: + sql += f'status = "{data["status"]}",' + + if "hostname" in data: + sql += f'hostname = "{data["hostname"]}",' + + if "step_last" in data: + sql += f'step_last = "{data["step_last"]}",' + + if "priority" in data: + sql += f'priority = "{data["priority"]}",' + + if "started_at" in data: + sql += f'started_at = "{data["started_at"]}",' + + if "finished_at" in data: + sql += f'finished_at = "{data["finished_at"]}",' + + + #去掉 sql 最右边的逗号 + sql = sql.rstrip(',') + + + sql += f' where 1=1 ' + #条件要放在最后面 + if "id" in data: + sql += f' and id = "{data["id"]}"' + + if "task_key" in data: + sql += f' and task_type = "{data["task_key"]}" and status != 2' + + #sql = f'update task_distributed set status = "{data["status"]}",updated_at = "{now()}" where id = "{data["id"]}"' + # print(f'sql: {sql}') + cursor.execute(sql) + conn.commit() + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed({data})异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed({data})异常: {str(e)}") + return "error" + +# 更新 task_distributed_detail 主表 +def update_task_distributed_detail(data): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor() + + sql = f'update task_distributed_detail set ' + #判断要更新哪些字段 + if "finished_at" in data: + sql += f'finished_at = "{data["finished_at"]}"' + + if "step" in data: + sql += f',step = "{data["step"]}"' + + if "hostname" in data: + sql += f',hostname = "{data["hostname"]}"' + + #where 条件 + sql += f' where 1=1 ' + if "task_distributed_id" in data: + sql += f' and task_distributed_id = "{data["task_distributed_id"]}"' + + if "step" in data: + sql += f' and step = "{data["step"]}"' + + cursor.execute(sql) + conn.commit() + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed_detail({data})异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed_detail({data})异常: {str(e)}") + return "error" + +#更新主表和插入明细表的步骤 +def update_main_and_add_detail(data): + if data["run_step"] == "step1": + updateData = {"id":data['task_distributed_id'],"status":1,"step_last":data["run_step"],"started_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} + else: + updateData = {"id":data['task_distributed_id'],"step_last":data["run_step"]} + #更新主表 + update_task_distributed(updateData) + #插入明细表数据 + return add_task_distributed_detail({"task_distributed_id":data['task_distributed_id'],"step":data["run_step"],"hostname":hostname,"started_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) \ No newline at end of file diff --git a/main_service.py b/main_service.py new file mode 100644 index 0000000..49dca60 --- /dev/null +++ b/main_service.py @@ -0,0 +1,46 @@ +import sys,socket,time +sys.path.append('logic') +import logic_main_service +import logging +import main_step1,main_step2,main_step3 +if __name__ == '__main__': + #循环值守 + while True: + data = logic_main_service.get_task_distributed() + #判断data数据类型 + if isinstance(data, str): + print("没有可执行的任务 sleep 3s") + time.sleep(3) + continue + else: + if data["run_step"] == "step1": + # 本地测试分布运行的用 + # time.sleep(5) + # print("更新step1的结束时间",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + # logic_main_service.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step1","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + + #生产线上用 + main_step1.step1(data["task_key"], experience, makeloop=False,data['task_distributed_id']) + elif data["run_step"] == "step2": + # 本地测试分布运行的用 + # time.sleep(15) + # print("更新step2的结束时间",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + # logic_main_service.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step2","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + + #生产线上用 + main_step2.step2(data["task_key"], data['task_distributed_id']) + elif data["run_step"] == "step3": + # 本地测试分布运行的用 + # time.sleep(8) + # #更新子表的finished_at + # print("更新step3的结束时间",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + # logic_main_service.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step3","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + #更新主表的status 和 finished_at + #logic_main_service.update_task_distributed({"id":data["task_distributed_id"],"status":2,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + + #生产线上用 + main_step3.step3(data["task_key"], data['task_distributed_id']) + + + + \ No newline at end of file diff --git a/main_step1.py b/main_step1.py index c105048..5545c70 100644 --- a/main_step1.py +++ b/main_step1.py @@ -2,10 +2,12 @@ import os, sys, time, shlex, subprocess, shutil, requests, cv2, numpy as np from PIL import Image import platform if platform.system() == 'Windows': - sys.path.append('e:\\libs\\') + #sys.path.append('e:\\libs\\') + sys.path.append('libs') + sys.path.append('logic') else: sys.path.append('/data/deploy/make3d/make2/libs/') -import config, libs, libs_db +import config, libs, libs_db,logic_main_service def filter_dark_texture_image(pid): start_time = time.time() @@ -107,7 +109,7 @@ def cal_reconstruction_region(psid, pid): fix_region() print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} 重建区域计算完成') -def step1(pid, experience=False, makeloop=True): +def step1(pid, experience=False, makeloop=True,task_distributed_id=""): libs_db.start_task({"task_type": "make", "task_key": pid}) print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 开始处理{pid}建模任务') @@ -158,15 +160,21 @@ def step1(pid, experience=False, makeloop=True): print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} {pid} step1图片预处理完成,共费时{libs.diff_time(start_time)}') # TODO: 更新本地step1任务状态,加入step2任务队列 - if makeloop: - os.system(f'python main_step2.py {pid}') + if task_distributed_id == "":#不是分布式任务的时候就自动往下个步骤走,是分布式任务的时候就就执行当前任务 + if makeloop: + os.system(f'python main_step2.py {pid}') + else: + os.system(f'python main_step2.py {pid}') + # if os.path.exists(os.path.join(config.sharedir, pid)): + # shutil.rmtree(os.path.join(config.sharedir, pid), ignore_errors=True) + # shutil.move(os.path.join(config.workdir, pid), config.sharedir) + + # print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} {pid} step1任务完成,移动到共享目录') else: - os.system(f'python main_step2.py {pid}') - # if os.path.exists(os.path.join(config.sharedir, pid)): - # shutil.rmtree(os.path.join(config.sharedir, pid), ignore_errors=True) - # shutil.move(os.path.join(config.workdir, pid), config.sharedir) + #分布式服务执行完后,需要更新任务状态,更新字表的finished_at字段 + logic_main_service.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + return - # print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} {pid} step1任务完成,移动到共享目录') def main(pid, experience=False, makeloop=True): if pid == '0': diff --git a/main_step2.py b/main_step2.py index 52a002d..02d03ef 100644 --- a/main_step2.py +++ b/main_step2.py @@ -1,10 +1,12 @@ import os, sys, time, shutil, subprocess, shlex import platform if platform.system() == 'Windows': - sys.path.append('e:\\libs\\') + #sys.path.append('e:\\libs\\') + sys.path.append('libs') + sys.path.append('logic') else: sys.path.append('/data/deploy/make3d/make2/libs/') -import config, libs, libs_db +import config, libs, libs_db,logic_main_service def load_model(pid): cmd = f'{config.rcbin} {config.r1["init"]} -load "{os.path.join(config.workdir, pid, f"{pid}.rcproj")}"' @@ -60,7 +62,7 @@ def make3d(pid): cmd = shlex.split(cmd) res = subprocess.run(cmd) -def step2(pid): +def step2(pid,task_distributed_id=""): print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} 开始建模任务step2') if os.path.exists(os.path.join(config.sharedir, pid)) and not os.path.exists(os.path.join(config.workdir, pid)): shutil.move(os.path.join(config.sharedir, pid), config.workdir) @@ -72,8 +74,11 @@ def step2(pid): print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} 建模任务step2完成,共费时{libs.diff_time(start_time)},任务已提交到step3') # 更新本地任务状态,加入step3任务队列 - - os.system(f'python d:\\make2\\main_step3.py {pid}') + if task_distributed_id == "": + os.system(f'python d:\\make2\\main_step3.py {pid}') + else: + logic_main_service.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + return def main(pid): if pid == '0': diff --git a/main_step3.py b/main_step3.py index 2f0f202..29044ed 100644 --- a/main_step3.py +++ b/main_step3.py @@ -2,10 +2,12 @@ import os, sys, time, bpy, math, requests, bmesh, json, shutil from PIL import Image import platform if platform.system() == 'Windows': - sys.path.append('e:\\libs\\') + #sys.path.append('e:\\libs\\') + sys.path.append('libs') + sys.path.append('logic') else: sys.path.append('/data/deploy/make3d/make2/libs/') -import config, libs, libs_db +import config, libs, libs_db,logic_main_service def bmesh_copy_from_object(obj, transform=True, triangulate=True, apply_modifiers=False): """Returns a transformed, triangulated copy of the mesh""" @@ -183,7 +185,7 @@ def export_and_update_glbs(pid): print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} glb文件导出并上传完成,共费时{libs.diff_time(start_time)}') -def step3(pid): +def step3(pid,task_distributed_id=""): print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} 开始模型后道处理') start_time = time.time() # 方向、大小、位置等基础校正 @@ -207,6 +209,12 @@ def step3(pid): print('上传完成更新建模成功状态:', res.text) shutil.rmtree(os.path.join(config.workdir, pid), ignore_errors=True) libs_db.finish_task({"task_type": "make", "task_key": pid}) + if task_distributed_id: + #更新子表的finished_at + logic_main_service.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + #更新主表的status 和 finished_at + logic_main_service.update_task_distributed({"id":task_distributed_id,"status":2,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + return def main(pid): if pid == '0': diff --git a/test/test.py b/test/test.py deleted file mode 100644 index 301a255..0000000 --- a/test/test.py +++ /dev/null @@ -1,30 +0,0 @@ -import sys -from PyQt5.QtCore import QUrl -from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget -from PyQt5.QtWebEngineWidgets import QWebEngineView - -class WebBrowserWindow(QMainWindow): - def __init__(self): - super().__init__() - - self.browser = QWebEngineView() - self.browser.setUrl(QUrl("https://www.qq.com")) # 设置要打开的网页 - - layout = QVBoxLayout() - layout.addWidget(self.browser) - - central_widget = QWidget() - central_widget.setLayout(layout) - - self.setCentralWidget(central_widget) - self.setWindowTitle("Web Browser") - self.setGeometry(100, 100, 800, 600) - -def main(): - app = QApplication(sys.argv) - window = WebBrowserWindow() - window.show() - sys.exit(app.exec_()) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test/use.sql b/test/use.sql deleted file mode 100644 index f81dba3..0000000 --- a/test/use.sql +++ /dev/null @@ -1,2 +0,0 @@ -SELECT *, ABS(TIMESTAMPDIFF(MINUTE , started_at, finished_at)) AS durning from tasks -