"""Watchdog that alerts when tasks or task steps have been running too long.

Every CHECK_INTERVAL_SECONDS it scans ``task_distributed_detail`` and
``tasks`` for unfinished rows and sends a notification through the
qyNotify HTTP API for each one that has exceeded its time limit.
"""
import json
import logging
import os
import platform
import socket
import sys
import time

import pymysql
import requests

# Make the shared ``libs`` directory (which provides ``config``) importable.
if platform.system() == 'Windows':
    # On Windows, use e:\libs\ when it exists, otherwise a relative ``libs``.
    if os.path.exists('e:\\libs\\'):
        sys.path.append('e:\\libs\\')
    else:
        sys.path.append('libs')
else:
    sys.path.append('/data/deploy/make3d/make2/libs/')

import config  # noqa: E402  -- must come after the sys.path setup above

# Recipient of every alert sent by this script.
NOTIFY_USER_ID = "DongZhangXi"
NOTIFY_URL = "https://mp.api.suwa3d.com/api/qyNotify/sendMessage"
# Upper bound on a single notification request so a stalled HTTP call
# cannot block the polling loop indefinitely.
NOTIFY_TIMEOUT_SECONDS = 10

# Maximum allowed running time per task_distributed_detail step, in seconds.
# Steps not listed here are never alerted on.
STEP_TIMEOUT_SECONDS = {
    # 3 minutes. NOTE(review): an older comment said 30 minutes -- confirm.
    "step1": 180,
    "step2": 60 * 10,  # 10 minutes
    "step3": 60 * 8,   # 8 minutes (an older comment said 10 -- confirm)
}
# Maximum allowed age of a status=1 unfinished row in ``tasks``, in seconds.
TASK_TIMEOUT_SECONDS = 60 * 25
# Delay between two consecutive scans in the main loop.
CHECK_INTERVAL_SECONDS = 120


def pymysqlAlias():
    """Open and return a new PyMySQL connection built from ``config.mysql_local``."""
    cfg = config.mysql_local
    return pymysql.connect(
        host=cfg['host'],
        port=cfg['port'],
        user=cfg['user'],
        password=cfg['password'],
        db=cfg['db'],
        charset=cfg['charset'],
    )


def _log_error(func_name, exc):
    """Print and log *exc* as a failure of *func_name*, prefixed with local time."""
    message = (
        f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} "
        f"执行{func_name}异常: {str(exc)}"
    )
    print(message)
    logging.error(message)


def _elapsed_seconds(started):
    """Return seconds elapsed since datetime *started*, or None if it is None.

    *started* is interpreted as local time (via ``time.mktime``).
    """
    if started is None:
        return None
    return time.time() - time.mktime(started.timetuple())


def notify(user_agent_id, content):
    """Send *content* to *user_agent_id* through the qyNotify sendMessage API.

    Args:
        user_agent_id: recipient identifier; must be non-empty.
        content: message text; must be non-empty.

    Returns:
        None when the request succeeded. Otherwise an error string, either
        because an argument is empty or because the HTTP request failed.
        Failures are logged rather than raised, so one broken alert does not
        abort the caller's loop.
    """
    if not user_agent_id:
        return "user_agent_id 不能为空"
    if not content:
        return "content 不能为空"
    data = {
        'userId': user_agent_id,
        'message': content,
    }
    headers = {'Content-Type': 'application/json'}
    try:
        # ``params`` lets requests percent-encode the query string. Hand-built
        # concatenation broke on '&', '#', '+' etc. inside the message.
        response = requests.post(
            NOTIFY_URL,
            params=data,
            data=json.dumps(data),
            headers=headers,
            timeout=NOTIFY_TIMEOUT_SECONDS,
        )
        response.raise_for_status()
    except requests.RequestException as e:
        _log_error("notify", e)
        return f"notify 发送失败: {e}"
    return None


def check_task_distributed_detail():
    """Alert on unfinished task_distributed_detail steps that exceed their limit.

    For every unfinished detail row, the parent task_distributed record is
    looked up. A missing parent triggers a data-anomaly alert. Otherwise,
    the step's running time is compared with STEP_TIMEOUT_SECONDS.

    Returns:
        "no" after a completed scan (including when nothing is pending);
        "error" if an exception occurred (it is printed and logged).
    """
    try:
        with pymysqlAlias() as conn:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                # Number of unfinished distributed tasks, quoted in alert text.
                cursor.execute(
                    'select count(*) as nums from task_distributed '
                    'where finished_at is null'
                )
                waitNums = cursor.fetchone()["nums"]

                cursor.execute(
                    'select * from task_distributed_detail '
                    'where finished_at is null order by started_at asc'
                )
                result = cursor.fetchall()

        for row in result:
            taskData = get_task_distributed_by_id(row["task_distributed_id"])
            if taskData == "error":
                notify(NOTIFY_USER_ID, f'task_distributed_id{row["task_distributed_id"]}的数据异常')
                continue
            limit = STEP_TIMEOUT_SECONDS.get(row["step"])
            if limit is None:
                continue
            elapsed = _elapsed_seconds(row["started_at"])
            # A step that has not recorded a start time cannot be timed.
            if elapsed is not None and elapsed > limit:
                notify(NOTIFY_USER_ID, f'任务{taskData["task_key"]}的{row["step"]}步骤已经超运行太久了,当前还有{waitNums}个任务未完成')
        return "no"
    except Exception as e:
        _log_error("check_task_distributed_detail", e)
        return 'error'


def check_task():
    """Alert on status=1 unfinished ``tasks`` rows older than TASK_TIMEOUT_SECONDS.

    Returns:
        "no" after a completed scan (including when nothing is pending);
        "error" if an exception occurred (it is printed and logged).
    """
    try:
        with pymysqlAlias() as conn:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                # Number of unfinished tasks, quoted in the alert text.
                cursor.execute(
                    'select count(*) as nums from tasks where finished_at is null'
                )
                waitNums = cursor.fetchone()["nums"]

                cursor.execute(
                    'select * from tasks where status = 1 and finished_at is null '
                    'order by created_at asc'
                )
                result = cursor.fetchall()

        for row in result:
            elapsed = _elapsed_seconds(row["created_at"])
            if elapsed is not None and elapsed > TASK_TIMEOUT_SECONDS:
                notify(NOTIFY_USER_ID, f'任务{row["task_key"]}已经超运行超过{TASK_TIMEOUT_SECONDS // 60}分钟了,当前还有{waitNums}个任务未完成')
        return "no"
    except Exception as e:
        # Previously mislabelled as check_task_distributed_detail.
        _log_error("check_task", e)
        return 'error'


def get_task_distributed_by_id(id):
    """Fetch one task_distributed row by primary key.

    Args:
        id: value matched against ``task_distributed.id``.

    Returns:
        The row as a dict, or "error" if no row matches or the query failed.
    """
    try:
        with pymysqlAlias() as conn:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                # Bound parameter instead of f-string interpolation.
                cursor.execute('select * from task_distributed where id = %s', (id,))
                result = cursor.fetchone()
    except Exception as e:
        _log_error("get_task_distributed_by_id", e)
        return 'error'
    if result is None:
        return "error"
    return result


if __name__ == '__main__':
    # Poll forever; each check catches and logs its own failures.
    while True:
        check_task_distributed_detail()
        check_task()
        time.sleep(CHECK_INTERVAL_SECONDS)