"""Common MySQL task-dispatch helpers (original title: mysql数据库常用任务函数封装).

Decides which step ("step1" / "step2" / "step3") of which task_distributed
row the current host should run next, and records the claim in the main
table and the detail table through the project-local ``main_service_db``.

Return-value convention used throughout this module:
    "error"  an exception was caught (already printed and logged),
    "no"     nothing to do,
    dict     a work item {"hostname", "run_step", "task_distributed_id", "task_key"}.
"""
import logging
import platform
import socket
import sys
import time

import pymysql  # kept from the original import list; not used in this module

# Make the shared project libraries importable before importing them below.
if platform.system() == 'Windows':
    sys.path.append('e:\\libs\\')
else:
    sys.path.append('/data/deploy/make3d/make2/libs/')

import config  # noqa: E402  project-local, resolved via sys.path above
import common  # noqa: E402
import main_service_db  # noqa: E402

hostname = socket.gethostname()
logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR)

# Hosts that may run step1 (task initialisation). All other hosts only pick
# up later steps. NOTE(review): XJB-20220906FLC is presumably a developer
# machine — confirm.
_STEP1_HOSTS = ("R11", "R12", "XJB-20220906FLC")
# Hosts allowed to run step2 for tasks that need the high-precision model or photo3.
_HIGH_MODEL_HOSTS = ("R11", "R12")
# A step1 host claims new step2 work only while it has fewer than this many
# unfinished step2 detail rows.
_MAX_RUNNING_STEP2 = 200000


def _now():
    """Return the current local time formatted as YYYY-mm-dd HH:MM:SS."""
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())


def _report_error(message):
    """Print *message* with a timestamp prefix and write the same line to the error log."""
    line = f"{_now()} {message}"
    print(line)
    logging.error(line)


def _work_item(row, step):
    """Build the work-item dict for main-table *row* and *step* on this host."""
    return {"hostname": hostname, "run_step": step, "task_distributed_id": row["id"], "task_key": row["task_key"]}


# Fetch a task: {"hostname":"XXX","run_step":"xxxx","task_distributed_id":"xxxx","task_key":"xxxx"}
def get_task_distributed():
    """Pick the next step this host should run and claim it.

    Returns:
        dict: the claimed work item.
        "no": nothing runnable right now.
        "no_data": (non-step1 hosts only) no task with status = 1 exists.
        "error": an exception occurred, a helper reported an error, or the
            claim could not be recorded (possibly another process took the
            same task).
    """
    try:
        # Hosts outside _STEP1_HOSTS cannot handle step1, so they cannot
        # initialise tasks and only run later steps.
        if hostname not in _STEP1_HOSTS:
            return _get_task_for_worker_host()
        return _get_task_for_step1_host()
    except Exception as e:
        _report_error(f"执行get_task_distributed({hostname})异常: {e}")
        return 'error'


def _get_task_for_worker_host():
    """Claim a step2/step3 work item on a host that cannot run step1."""
    # Bail out early when no task has status = 1.
    if not main_service_db.db_task_distributed("status = 1 order by priority desc,created_at asc limit 1 for update"):
        return 'no_data'
    task = need_run_step_no_step1()
    if task == "no" or task == "error":
        print("获取需要执行的步骤 next_step", task)
        return task
    if update_main_and_add_detail(task) == "error":
        print('出现错误,有可能是多个进程获取同一个任务了')
        return "error"
    print(f'任务ID-{task["task_key"]}- "执行{task["run_step"]}"')
    return task


def _get_task_for_step1_host():
    """Claim work on a step1-capable host: step1 first, then step2."""
    task = need_run_step_no_step2()
    if task == "error":
        return "error"
    if task == "no":
        running = is_run_stepx_nums("step2")
        if running == "error":
            return "error"
        if running < _MAX_RUNNING_STEP2:
            task = need_run_step2()
            if task == "error":
                return "error"
            if task != "no":
                task["hostname"] = hostname
                if update_main_and_add_detail(task) == "error":
                    print('出现错误,有可能是多个进程获取同一个任务了,重新获取任务去执行了')
                    return "error"
                print(f'任务ID-{task["task_key"]}- "执行step2" ')
                return task
        # Nothing runnable.
        return "no"
    task["hostname"] = hostname
    if update_main_and_add_detail(task) == "error":
        print('出现错误,有可能是多个进程获取同一个任务了')
        return "error"
    print(f'任务ID-{task["task_key"]}- "执行{task["run_step"]}" ')
    return task


def is_run_stepx(step):
    """Return "yes" if this host has an unfinished *step*, "no" if not, "error" on exception.

    NOTE(review): this queries the main table via db_task_distributed, while
    is_run_stepx_nums applies the same filter to the detail table. The fields
    this module writes (hostname/step go to the detail table) suggest the
    detail table was intended — confirm against the schema.
    """
    try:
        where = f'hostname = "{hostname}" and step = "{step}" and finished_at is null for update'
        return "yes" if main_service_db.db_task_distributed(where) else "no"
    except Exception as e:
        _report_error(f"执行is_run_stepx({hostname},{step})异常: {e}")
        return "error"


def is_run_stepx_nums(step):
    """Return how many unfinished *step* detail rows this host has, or "error" on exception."""
    try:
        where = f'hostname = "{hostname}" and step = "{step}" and finished_at is null for update'
        return main_service_db.db_task_distributed_detail_count(where)
    except Exception as e:
        _report_error(f"执行is_run_stepx_nums({hostname},{step})异常: {e}")
        return "error"


def need_run_stepx(task_distributed_id):
    """Work out which step task *task_distributed_id* needs next.

    Based on the task's most recent detail row (highest id):
      * no detail row                 -> "step1" (task never started)
      * latest row not finished       -> "no" (a step is still running)
      * latest finished row is step1  -> "step2"
      * latest finished row is step2  -> "step3"
      * latest finished row is step3  -> "no", after marking the main row
                                         status 2 / finished / step_last step3
    Anything else returns "no"; an exception returns "error".
    """
    try:
        where = f'task_distributed_id = "{task_distributed_id}" order by id desc limit 1 for update'
        latest = main_service_db.db_task_distributed_detail(where)
        if latest is None:
            return "step1"
        if not latest or not latest["finished_at"]:
            return "no"
        step = latest["step"]
        if step == "step1":
            return "step2"
        if step == "step2":
            return "step3"
        if step == "step3":
            # The whole pipeline is done: close out the main task row.
            main_service_db.update_task_distributed({"id": task_distributed_id, "status": 2, "finished_at": _now(), "step_last": "step3"})
        return "no"
    except Exception as e:
        _report_error(f"执行need_run_stepx({hostname},{task_distributed_id})异常: {e}")
        return 'error'


def need_run_step2():
    """Return the first status-1 task whose last step is step1 and that now needs step2.

    Candidates are ordered by priority desc, created_at asc. Returns a work
    item, "no" if none qualifies, or "error" on exception.
    """
    try:
        where = 'status = 1 and step_last = "step1" and finished_at is null order by priority desc,created_at asc for update'
        for row in main_service_db.db_task_distributed_list(where) or []:
            # need_run_stepx returns "no" while a step is still running, so
            # this skips tasks whose step1 has not finished yet.
            if need_run_stepx(row["id"]) == "step2":
                return _work_item(row, "step2")
        return "no"
    except Exception as e:
        _report_error(f"执行need_run_step2()异常: {e}")
        return "error"


def need_run_appoint_step(step):
    """Return the first unfinished task whose next step is *step*, as a work item.

    Returns "no" if none qualifies, or "error" on exception.
    """
    try:
        where = 'finished_at is null order by priority desc,created_at asc for update'
        for row in main_service_db.db_task_distributed_list(where) or []:
            if need_run_stepx(row["id"]) == step:
                return _work_item(row, step)
        return "no"
    except Exception as e:
        _report_error(f"执行need_run_appoint_step()异常: {e}")
        return 'error'


def need_run_step_no_step2():
    """Return the first unfinished task that needs step1 (used by step1-capable hosts).

    Returns a work item, "no" if none qualifies, or "error" on exception.
    """
    try:
        rows = main_service_db.db_task_distributed_list("finished_at is null order by priority desc,created_at asc for update")
        for row in rows or []:
            if need_run_stepx(row["id"]) == "step1":
                return _work_item(row, "step1")
        return "no"
    except Exception as e:
        _report_error(f"执行need_run_step_no_step2()异常: {e}")
        return 'error'


def need_run_step_no_step1():
    """Return the first unfinished task that needs step2 or step3 (never step1).

    Tasks needing step2 are skipped on hosts outside _HIGH_MODEL_HOSTS when
    common.task_need_high_model_or_photo3 reports they need the
    high-precision model or photo3. Returns a work item, "no" if none
    qualifies, or "error" on exception.
    """
    try:
        rows = main_service_db.db_task_distributed_list("finished_at is null order by priority desc,created_at asc for update")
        for row in rows or []:
            step = need_run_stepx(row["id"])
            # Only real non-step1 steps are valid work here. The statuses
            # "no"/"error" and step1 must not be handed back as a run_step.
            if step not in ("step2", "step3"):
                continue
            if (step == "step2" and hostname not in _HIGH_MODEL_HOSTS
                    and common.task_need_high_model_or_photo3(row["task_key"])):
                continue
            return _work_item(row, step)
        return "no"
    except Exception as e:
        _report_error(f"执行need_run_step_no_step1()异常: {e}")
        return 'error'


def update_main_and_add_detail(data):
    """Record that this host is starting data["run_step"] for data["task_distributed_id"].

    Updates the main row's step_last (plus status = 1 and started_at for
    step1), then inserts a detail row for this host. Returns whatever
    main_service_db.add_task_distributed_detail returns; callers treat
    "error" as a failed claim. Exceptions propagate to the caller.

    NOTE(review): if the detail insert fails, the main-row update has already
    been issued; whether it is rolled back depends on main_service_db — confirm.
    """
    started_at = _now()
    if data["run_step"] == "step1":
        update_data = {"id": data['task_distributed_id'], "status": 1, "step_last": data["run_step"], "started_at": started_at}
    else:
        update_data = {"id": data['task_distributed_id'], "step_last": data["run_step"]}
    main_service_db.update_task_distributed(update_data)
    return main_service_db.add_task_distributed_detail({"task_distributed_id": data['task_distributed_id'], "step": data["run_step"], "hostname": hostname, "started_at": started_at})