建模程序 多个定时程序
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

200 lines
10 KiB

# mysql数据库常用任务函数封装
import pymysql, socket, time
import platform,sys
if platform.system() == 'Windows':
#线上正式运行
sys.path.append('e:\\libs\\')
#本地测试
else:
sys.path.append('/data/deploy/make3d/make2/libs/')
import config,common,main_service_db
import logging
hostname = socket.gethostname()
logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR)
#获取任务 {"hostname":"XXX","run_step":"xxxx","task_distributed_id":"xxxx"}
def get_task_distributed():
try:
#非R11 R12的主机 没有办法处理step1,所以不能做初始化的任务,只能做后续的任务
if hostname != "R11" and hostname != "R12" and hostname != "XJB-20220906FLC":
#查询数据库指定内容
result = main_service_db.db_task_distributed("status = 1 order by priority desc,created_at asc limit 1 for update")
if result:
#获取需要执行的步骤
next_step = need_run_stepx(result["id"])
if next_step == "no" or next_step == "error":
print("获取需要执行的步骤 next_step",next_step)
return next_step
#非R11 R12 的主机在执行step2的时候,需要判断当前模型是否需要高精模或者photo3参与建模,如果是的话,该主机不执行这一步
if next_step == "step2":
if common.task_need_high_model_or_photo3(result["task_key"]):
print(f'模型{result["task_key"]}需要高精模或者photo3参与建模,该主机{hostname}不执行step2')
return "no"
taskData = {"hostname":hostname,"run_step":next_step,"task_distributed_id":result["id"],"task_key":result["task_key"]}
flagRes = update_main_and_add_detail(taskData)
if flagRes == "error":
print(f'出现错误,有可能是多个进程获取同一个任务了')
return "error"
print(f'任务ID-{taskData["task_key"]}- "执行{taskData["run_step"]}"')
return taskData
else:
return 'no_data'
else:
#R11 R12的主机 可以执行step1 2 3 的任务
#如果R11 R12的主机目前没有正在执行step2,则优先处理step2,
# print("次数",is_run_stepx_nums("step2"))
if is_run_stepx_nums("step2") < 2:
resultData = need_run_step2()
if resultData != "no":
resultData["hostname"] = hostname
flagRes = update_main_and_add_detail(resultData)
if flagRes == "error":
print(f'出现错误,有可能是多个进程获取同一个任务了,重新获取任务去执行了')
return "error"
print(f'任务ID-{resultData["task_key"]}- "执行step2" ')
return resultData
#R11 R12的主机如果已经有在处理step2了,则不能再处理step2,只能处理step1 step3
resultData = need_run_step_no_step2()
if resultData == "no":
return "no"
resultData["hostname"] = hostname
flagRes = update_main_and_add_detail(resultData)
if flagRes == "error":
print(f'出现错误,有可能是多个进程获取同一个任务了')
return "error"
print(f'任务ID-{resultData["task_key"]}- "执行{resultData["run_step"]}" ')
return resultData
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed({hostname})异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed({hostname})异常: {str(e)}")
return 'error'
#查询当前主机有没有正在执行某个任务
def is_run_stepx(step):
try:
where = f'hostname = "{hostname}" and step = "{step}" and finished_at is null for update'
result = main_service_db.db_task_distributed(where)
if result:
return "yes"
else:
return "no"
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx({hostname},{step})异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx({hostname},{step})异常: {str(e)}")
return "error"
#查询当前主机某个步骤正在执行的数量
def is_run_stepx_nums(step):
try:
where = f'hostname = "{hostname}" and step = "{step}" and finished_at is null for update'
return main_service_db.db_task_distributed_detail_count(where)
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx_nums({hostname},{step})异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx_nums({hostname},{step})异常: {str(e)}")
return "error"
#查询指定任务需要执行哪个步骤 "error" 表示异常 "no"表示没有任务需要执行 "step1"表示需要执行step1 "step2"表示需要执行step2 "step3"表示需要执行step3
def need_run_stepx(task_distributed_id):
try:
#查询task_distributed_id 对应的子任务是否正在执行,hostname = "{hostname}" and
where = f'task_distributed_id = "{task_distributed_id}" order by id desc limit 1 for update'
result = main_service_db.db_task_distributed_detail(where)
#如果一个子任务都没有,说明该任务还没有开始执行,需要执行step1
if result is None:
return "step1"
if result and result["finished_at"] is None:
#该任务正在运行中不需要执行下一步
return "no"
#查询改任务的最后一个状态
if result and result["finished_at"]:
if result["step"] == "step1":
return "step2"
elif result["step"] == "step2":
return "step3"
elif result["step"] == "step3":
#这里要将 主任务表的状态改为2,finished_at改为当前时间
update_task_distributed({"id":task_distributed_id,"status":2,"finished_at":time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),"step_last":"step3"})
return "no"
return "no"
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_stepx({hostname},{task_distributed_id})异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_stepx({hostname},{task_distributed_id})异常: {str(e)}")
return 'error'
#查询出哪些任务需要执行step2
def need_run_step2():
try:
where = f'status = 1 and step_last = "step1" and finished_at is null order by priority desc,created_at asc for update'
result = main_service_db.db_task_distributed_list(where)
#判断result的长度
#判断是否有值
if len(result) == 0:
return "no"
#遍历循环哪笔需要执行step2
for row in result:
#判断是否有正在执行的step2
if need_run_stepx(row["id"]) == "step2":
#没有正在执行的step2,则返回该任务
return {"hostname":hostname,"run_step":"step2","task_distributed_id":row["id"],"task_key":row["task_key"]}
return "no"
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step2()异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step2()异常: {str(e)}")
return "error"
#查询出哪些任务需要执行指定的step
def need_run_appoint_step(step):
try:
where = f'finished_at is null order by priority desc,created_at asc for update'
result = main_service_db.db_task_distributed_list(where)
#判断是否有值
if len(result) == 0:
return "no"
#遍历循环哪笔需要执行step2
for row in result:
#判断是否有正在执行的step2
if need_run_stepx(row["id"]) == step:
#没有正在执行的step2,则返回该任务
return {"hostname":hostname,"run_step":step,"task_distributed_id":row["id"],"task_key":row["task_key"]}
return "no"
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_appoint_step()异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_appoint_step()异常: {str(e)}")
return 'error'
#查询出哪些任务需要执行非step2
def need_run_step_no_step2():
try:
result = main_service_db.db_task_distributed_list("finished_at is null order by priority desc,created_at asc for update")
#判断是否有值
if len(result) == 0:
return "no"
#遍历循环哪笔需要执行step2
for row in result:
#判断是否有正在执行的step2
xstep = need_run_stepx(row["id"])
# print("查询非step2的任务列表",xstep)
if xstep != "step2" and (xstep == "step1" or xstep == "step3"):
#没有正在执行的step2,则返回该任务
return {"hostname":hostname,"run_step":xstep,"task_distributed_id":row["id"],"task_key":row["task_key"]}
return "no"
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step_no_step2()异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step_no_step2()异常: {str(e)}")
return 'error'
#更新主表和插入明细表的步骤
def update_main_and_add_detail(data):
if data["run_step"] == "step1":
updateData = {"id":data['task_distributed_id'],"status":1,"step_last":data["run_step"],"started_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}
else:
updateData = {"id":data['task_distributed_id'],"step_last":data["run_step"]}
#更新主表
main_service_db.update_task_distributed(updateData)
#插入明细表数据
return main_service_db.add_task_distributed_detail({"task_distributed_id":data['task_distributed_id'],"step":data["run_step"],"hostname":hostname,"started_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})