From 10c2146bd0ee52ba75c18cf646f5505981f7f268 Mon Sep 17 00:00:00 2001 From: dongchangxi <458593490@qq.com> Date: Tue, 10 Oct 2023 14:43:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E8=B6=85=E6=97=B6=E6=A3=80?= =?UTF-8?q?=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- libs/libs_db.py | 3 + main_service.py | 3 +- main_step1.py | 7 +- main_step2.py | 7 +- main_step3.py | 8 +-- timer/check_task.py | 164 ++++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 178 insertions(+), 14 deletions(-) create mode 100644 timer/check_task.py diff --git a/libs/libs_db.py b/libs/libs_db.py index 1a4a2ec..3aef145 100644 --- a/libs/libs_db.py +++ b/libs/libs_db.py @@ -41,6 +41,9 @@ def add_task(data): #判断是否是昆山教学的,是的话优先级设置为默认 if data["psid"] == 85: sql = f'insert into tasks (task_type, task_key) values ("{data["task_type"]}", "{data["task_key"]}")' + elif data["psid"] == 1: + #实验室的订单走分布式处理 + sql = f'insert into task_distributed (task_type, task_key, priority,created_at) values ("{data["task_type"]}", "{data["task_key"]}", 1,"{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}")' else: sql = f'insert into tasks (task_type, task_key, priority) values ("{data["task_type"]}", "{data["task_key"]}", 1)' # print(f'sql: {sql}') diff --git a/main_service.py b/main_service.py index 49dca60..c2274a1 100644 --- a/main_service.py +++ b/main_service.py @@ -1,6 +1,5 @@ import sys,socket,time -sys.path.append('logic') -import logic_main_service +from logic import logic_main_service import logging import main_step1,main_step2,main_step3 if __name__ == '__main__': diff --git a/main_step1.py b/main_step1.py index 7917953..ec171ae 100644 --- a/main_step1.py +++ b/main_step1.py @@ -2,12 +2,11 @@ import os, sys, time, shlex, subprocess, shutil, requests, cv2, numpy as np from PIL import Image import platform if platform.system() == 'Windows': - #sys.path.append('e:\\libs\\') - sys.path.append('libs') - sys.path.append('logic') + sys.path.append('e:\\libs\\') else: sys.path.append('/data/deploy/make3d/make2/libs/') -import config, libs, libs_db,logic_main_service +from logic import logic_main_service +import config, libs, libs_db def filter_dark_texture_image(pid): start_time = time.time() diff --git a/main_step2.py b/main_step2.py index a29c860..fb7f54c 100644 --- a/main_step2.py +++ b/main_step2.py @@ -1,12 +1,11 @@ import os, sys, time, shutil, subprocess, shlex import platform if platform.system() == 'Windows': - #sys.path.append('e:\\libs\\') - sys.path.append('libs') - sys.path.append('logic') + sys.path.append('e:\\libs\\') else: sys.path.append('/data/deploy/make3d/make2/libs/') -import config, libs, libs_db,logic_main_service +from logic import logic_main_service +import config, libs, libs_db def load_model(pid): cmd = f'{config.rcbin} {config.r1["init"]} -load "{os.path.join(config.workdir, pid, f"{pid}.rcproj")}"' diff --git a/main_step3.py b/main_step3.py index 29044ed..75ab35d 100644 --- a/main_step3.py +++ b/main_step3.py @@ -2,12 +2,12 @@ import os, sys, time, bpy, math, requests, bmesh, json, shutil from PIL import Image import platform if platform.system() == 'Windows': - #sys.path.append('e:\\libs\\') - sys.path.append('libs') - sys.path.append('logic') + sys.path.append('e:\\libs\\') + #sys.path.append('libs') else: sys.path.append('/data/deploy/make3d/make2/libs/') -import config, libs, libs_db,logic_main_service +from logic import logic_main_service +import config, libs, libs_db def bmesh_copy_from_object(obj, transform=True, triangulate=True, apply_modifiers=False): """Returns a transformed, triangulated copy of the mesh""" diff --git a/timer/check_task.py b/timer/check_task.py new file mode 100644 index 0000000..8a14b50 --- /dev/null +++ b/timer/check_task.py @@ -0,0 +1,164 @@ +import requests +import json +import pymysql, socket, time +import platform,sys +import logging +import os +if platform.system() == 'Windows': + #线上正式运行 + + #本地测试 + #sys.path.append('libs') + # 判断是否存在libs目录 + if os.path.exists('e:\\libs\\'): + sys.path.append('e:\\libs\\') + else: + sys.path.append('libs') + +else: + sys.path.append('/data/deploy/make3d/make2/libs/') + +import config + +#公共连接库 +def pymysqlAlias(): + return pymysql.connect( + host=config.mysql_local['host'], + port=config.mysql_local['port'], + user=config.mysql_local['user'], + password=config.mysql_local['password'], + db=config.mysql_local['db'], + charset=config.mysql_local['charset'],) + +def notify(user_agent_id,content): + if user_agent_id == "": + return "user_agent_id 不能为空" + + if content == "": + return "content 不能为空" + + #获取token + + data = { + 'userId': user_agent_id, + 'message': content, + } + headers = {'Content-Type': 'application/json'} + message_send_url = "https://mp.api.suwa3d.com/api/qyNotify/sendMessage?userId="+user_agent_id+"&message="+content + response = requests.post(message_send_url, data=json.dumps(data), headers=headers) + + + + +#检测 task_distributed 有哪些任务是卡住时间很长没处理的 +def check_task_distributed_detail(): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + #查询出当前还有多少个待处理的任务 + sqlWait = f'select count(*) as nums from task_distributed where finished_at is null' + cursor.execute(sqlWait) + resultWait = cursor.fetchone() + waitNums = resultWait["nums"] + + sql = f'select * from task_distributed_detail where finished_at is null order by started_at asc' + # print(f'sql: {sql}') + cursor.execute(sql) + result = cursor.fetchall() + #判断是否有值 + if len(result) == 0: + return "no" + + nowTaskNums = len(result) + #遍历循环每个任务对应的步骤已经执行多长时间了 + for row in result: + + taskData = get_task_distributed_by_id(row["task_distributed_id"]) + if taskData == "error": + notify("DongZhangXi",f'task_distributed_id{row["task_distributed_id"]}的数据异常') + continue + + if row["step"] == "step1": + #判断是否超过了30=分钟 + if (time.time() - time.mktime(row["started_at"].timetuple())) > 180: + #发送消息通知 + notify("DongZhangXi",f'任务{taskData["task_key"]}的{row["step"]}步骤已经超运行太久了,当前还有{waitNums}个任务未完成') + + if row["step"] == "step2": + #判断是否超过了10分钟 + if (time.time() - time.mktime(row["started_at"].timetuple())) > 60*10: + #发送消息通知 + notify("DongZhangXi",f'任务{taskData["task_key"]}的{row["step"]}步骤已经超运行太久了,当前还有{waitNums}个任务未完成') + + if row["step"] == "step3": + #判断是否超过了10分钟 + if (time.time() - time.mktime(row["started_at"].timetuple())) > 60*8: + #发送消息通知 + notify("DongZhangXi",f'任务{taskData["task_key"]}的{row["step"]}步骤已经超运行太久了,当前还有{waitNums}个任务未完成') + return "no" + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行check_task_distributed_detail异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行check_task_distributed_detail异常: {str(e)}") + return 'error' + +#检测 task 有哪些任务是卡住时间很长没有处理的 +def check_task(): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + #查询出当前还有多少个待处理的任务 + sqlWait = f'select count(*) as nums from tasks where finished_at is null' + cursor.execute(sqlWait) + resultWait = cursor.fetchone() + waitNums = resultWait["nums"] + + sql = f'select * from tasks where status = 1 and finished_at is null order by created_at asc' + # print(f'sql: {sql}') + cursor.execute(sql) + result = cursor.fetchall() + #判断是否有值 + if len(result) == 0: + return "no" + + nowTaskNums = len(result) + #遍历循环每个任务对应的步骤已经执行多长时间了 + for row in result: + #判断是否超过了30=分钟 + if (time.time() - time.mktime(row["created_at"].timetuple())) > 60*25: + #发送消息通知 + notify("DongZhangXi",f'任务{row["task_key"]}已经超运行超过25分钟了,当前还有{waitNums}个任务未完成') + return "no" + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行check_task_distributed_detail异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行check_task_distributed_detail异常: {str(e)}") + return 'error' + + +#查询task_distributed 数据 +def get_task_distributed_by_id(id): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + sql = f'select * from task_distributed where id = {id}' + # print(f'sql: {sql}') + cursor.execute(sql) + result = cursor.fetchone() + #判断是否有值 + if result == None: + return "error" + else: + return result + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed_by_id异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed_by_id异常: {str(e)}") + return 'error' + +#程序主入口 +if __name__ == '__main__': + #开启死循环 + while True: + check_task_distributed_detail() + check_task() + #两分钟检测一次 + time.sleep(120) + \ No newline at end of file