You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
237 lines
13 KiB
237 lines
13 KiB
# Wrappers for common MySQL database task-distribution functions.

import pymysql, socket, time

import platform,sys

# Make the shared project library directory importable; the path depends on the OS.
if platform.system() == 'Windows':

    # Online production run (per original comment).
    # NOTE(review): the original places a "local test" comment right after this
    # line, just before `else:`; which environment each branch really serves
    # should be confirmed.
    sys.path.append('e:\\libs\\')

    # Local testing (per original comment).

else:

    sys.path.append('/data/deploy/make3d/make2/libs/')

# These project modules are located through the sys.path entry added above,
# so this import must stay after the path setup.
# NOTE(review): pymysql and config are not referenced in the code shown here,
# and common only appears in commented-out code; confirm before removing.
import config,common,main_service_db

import logging

# This machine's hostname. It is used to decide which task steps this host may
# run, and it is written into the detail rows this host creates.
hostname = socket.gethostname()

# Importing this module configures root logging: ERROR and above are written to
# task_distributed_error.log (a relative path, so it depends on the working directory).
logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR)
|
|
|
|
|
# Hosts that can run step1 (task initialisation). Every other host only picks up later steps.
_STEP1_HOSTS = ("R11", "R12", "XJB-20220906FLC")


def _get_task_for_worker_host():
    """Claim a step2 job for a host that cannot run step1 (see get_task_distributed)."""
    # Only look for work while at least one main task has status = 1.
    pending = main_service_db.db_task_distributed("status = 1 order by priority desc,created_at asc limit 1 for update")
    if not pending:
        return 'no_data'

    task = need_run_step_no_step1()
    if task == "no" or task == "error":
        print("获取需要执行的步骤 next_step", task)
        return task

    if update_main_and_add_detail(task) == "error":
        # Most likely several processes tried to take the same task.
        print('出现错误,有可能是多个进程获取同一个任务了')
        return "error"

    print(f'任务ID-{task["task_key"]}- "执行{task["run_step"]}"')
    return task


def _get_task_for_step1_host(max_running_step2):
    """Claim step1 work first, otherwise step2 work, for a host in _STEP1_HOSTS."""
    step1_row = main_service_db.get_task_distributed_step1()
    if step1_row is not None:
        task = {"hostname": hostname, "run_step": "step1",
                "task_distributed_id": step1_row["id"], "task_key": step1_row["task_key"]}
        if update_main_and_add_detail(task) == "error":
            print('出现错误,有可能是多个进程获取同一个任务了')
            return "error"
        print(f'任务ID-{task["task_key"]}- "执行{task["run_step"]}" ')
        return task

    # No step1 work is pending: fall back to step2 while this host is under its step2 limit.
    running_step2 = is_run_stepx_nums("step2")
    if running_step2 == "error":
        # is_run_stepx_nums has already logged the failure. Comparing the string
        # "error" with an int would raise TypeError and log a misleading message.
        return "error"
    if running_step2 >= max_running_step2:
        return "no"

    task = need_run_step2()
    if task == "no":
        return "no"
    if task == "error":
        # need_run_step2 has already logged the failure. Assigning a key on the
        # string "error" would raise TypeError.
        return "error"

    task["hostname"] = hostname
    if update_main_and_add_detail(task) == "error":
        print('出现错误,有可能是多个进程获取同一个任务了,重新获取任务去执行了')
        return "error"
    print(f'任务ID-{task["task_key"]}- "执行step2" ')
    return task


def get_task_distributed(max_running_step2=200000):
    """Pick the next task step this host should run and record it as started.

    Hosts in _STEP1_HOSTS take step1 work first (from
    main_service_db.get_task_distributed_step1). If there is none, they take
    step2 work (need_run_step2), but only while this host has fewer than
    `max_running_step2` unfinished step2 detail rows. Every other host only
    takes step2 work (need_run_step_no_step1), and only when at least one main
    task has status = 1.

    Args:
        max_running_step2: limit on this host's unfinished step2 runs before a
            step1-capable host stops taking step2 work. Defaults to 200000,
            the value that used to be hard-coded.

    Returns:
        On success, the dict {"hostname", "run_step", "task_distributed_id",
        "task_key"}, already recorded through update_main_and_add_detail.
        Otherwise one of these strings:
        - "no": nothing is runnable.
        - "no_data": no main task has status = 1 (hosts outside _STEP1_HOSTS only).
        - "error": a lookup, the claim, or an unexpected exception failed.
    """
    try:
        if hostname not in _STEP1_HOSTS:
            return _get_task_for_worker_host()
        return _get_task_for_step1_host(max_running_step2)
    except Exception as e:
        message = f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed({hostname})异常: {str(e)}"
        print(message)
        logging.error(message)
        return 'error'
|
|
|
def is_run_stepx(step):
    """Report whether this host has an unfinished run of `step`.

    Passes a hostname/step/"finished_at is null" filter to
    main_service_db.db_task_distributed.

    Returns:
        "yes" if the query returns a truthy result, "no" otherwise, and
        "error" if the query raises.
    """
    # NOTE(review): is_run_stepx_nums sends this same hostname/step filter to the
    # *detail* count helper, but this function queries the main-table helper,
    # whose other callers here filter on status/priority. Confirm the main table
    # really has hostname/step columns; db_task_distributed_detail may have been
    # intended.
    try:
        condition = f'hostname = "{hostname}" and step = "{step}" and finished_at is null for update'
        found = main_service_db.db_task_distributed(condition)
        return "yes" if found else "no"
    except Exception as err:
        for emit in (print, logging.error):
            emit(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx({hostname},{step})异常: {str(err)}")
        return "error"
|
|
|
|
|
def is_run_stepx_nums(step):
    """Count this host's unfinished detail rows for `step`.

    Returns whatever main_service_db.db_task_distributed_detail_count returns
    for the filter. If the query raises, it returns the string "error", so
    callers that compare the result numerically must check for "error" first.
    """
    try:
        condition = f'hostname = "{hostname}" and step = "{step}" and finished_at is null for update'
        count = main_service_db.db_task_distributed_detail_count(condition)
        return count
    except Exception as err:
        for emit in (print, logging.error):
            emit(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx_nums({hostname},{step})异常: {str(err)}")
        return "error"
|
|
|
|
|
def need_run_stepx(task_distributed_id):
    """Decide which step, if any, should run next for one main task.

    Looks at the newest detail row (highest id) for `task_distributed_id`.
    The query is not filtered by host.

    - No detail row (None): "step1", because the task has not started yet.
    - Falsy row, or falsy finished_at: "no", because a step is still running.
    - Finished step1: "step2". If this host is R11 or R12, it gets "step2"
      only when it also ran that step1; otherwise "no".
    - Finished step2: "step3".
    - Finished step3: mark the main task done (status 2, finished_at = now,
      step_last "step3") and return "no".
    - Any other step value: "no".

    Returns "error" if a lookup or update raises.
    """
    try:
        query = f'task_distributed_id = "{task_distributed_id}" order by id desc limit 1 for update'
        last = main_service_db.db_task_distributed_detail(query)

        if last is None:
            return "step1"
        if not last or not last["finished_at"]:
            return "no"

        finished_step = last["step"]
        if finished_step == "step1":
            # On R11/R12, step2 must run on the same host that ran step1.
            same_host_only = hostname in ("R11", "R12")
            if same_host_only and hostname != last["hostname"]:
                return "no"
            return "step2"
        if finished_step == "step2":
            return "step3"
        if finished_step == "step3":
            # The whole task is done: close the main task row.
            main_service_db.update_task_distributed({"id": task_distributed_id, "status": 2, "finished_at": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()), "step_last": "step3"})
        return "no"
    except Exception as e:
        for emit in (print, logging.error):
            emit(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_stepx({hostname},{task_distributed_id})异常: {str(e)}")
        return 'error'
|
|
|
def need_run_step2():
    """Find the first step1-complete task that this host may move on to step2.

    Scans main tasks with status = 1, step_last = "step1" and no finished_at,
    highest priority first, then oldest. It picks the first one for which
    need_run_stepx returns "step2".

    Returns:
        {"hostname", "run_step": "step2", "task_distributed_id", "task_key"}
        for the match, "no" when there is none, or "error" on exception.
    """
    try:
        rows = main_service_db.db_task_distributed_list('status = 1 and step_last = "step1" and finished_at is null order by priority desc,created_at asc for update')
        if not len(rows):
            return "no"

        for row in rows:
            if need_run_stepx(row["id"]) == "step2":
                break
        else:
            return "no"

        return {"hostname": hostname, "run_step": "step2", "task_distributed_id": row["id"], "task_key": row["task_key"]}
    except Exception as e:
        for emit in (print, logging.error):
            emit(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step2()异常: {str(e)}")
        return "error"
|
|
|
def need_run_appoint_step(step):
    """Find the first unfinished task whose next step (per need_run_stepx) is `step`.

    Scans main tasks with no finished_at, highest priority first, then oldest.
    need_run_stepx may itself close a main task whose step3 has finished while
    this scan runs.

    Returns:
        {"hostname", "run_step": step, "task_distributed_id", "task_key"} for
        the first match, "no" when there is none, or "error" on exception.
    """
    try:
        rows = main_service_db.db_task_distributed_list('finished_at is null order by priority desc,created_at asc for update')
        if not len(rows):
            return "no"

        for row in rows:
            if need_run_stepx(row["id"]) != step:
                continue
            return {"hostname": hostname, "run_step": step, "task_distributed_id": row["id"], "task_key": row["task_key"]}

        return "no"
    except Exception as e:
        for emit in (print, logging.error):
            emit(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_appoint_step()异常: {str(e)}")
        return 'error'
|
|
|
def need_run_step_no_step2():
    """Find the first unfinished task whose next step is step1.

    Despite the name, only tasks for which need_run_stepx returns "step1" are
    selected. Scans main tasks with no finished_at, highest priority first,
    then oldest.

    Returns:
        {"hostname", "run_step": "step1", "task_distributed_id", "task_key"}
        for the first match, "no" when there is none, or "error" on exception.
    """
    try:
        rows = main_service_db.db_task_distributed_list("finished_at is null order by priority desc,created_at asc for update")
        if not len(rows):
            return "no"

        for row in rows:
            upcoming = need_run_stepx(row["id"])
            if upcoming == "step1":
                return {"hostname": hostname, "run_step": upcoming, "task_distributed_id": row["id"], "task_key": row["task_key"]}

        return "no"
    except Exception as e:
        for emit in (print, logging.error):
            emit(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step_no_step2()异常: {str(e)}")
        return 'error'
|
|
|
def need_run_step_no_step1():
    """Find the first unfinished task whose next step is step2.

    Used by hosts that cannot run step1. Scans main tasks with no finished_at,
    highest priority first, then oldest.

    Returns:
        {"hostname", "run_step": "step2", "task_distributed_id", "task_key"}
        for the first match, "no" when there is none, or "error" on exception.
    """
    try:
        rows = main_service_db.db_task_distributed_list("finished_at is null order by priority desc,created_at asc for update")
        if not len(rows):
            return "no"

        # Lazily stop at the first task whose next step is step2.
        picked = next((row for row in rows if need_run_stepx(row["id"]) == "step2"), None)
        if picked is None:
            return "no"
        return {"hostname": hostname, "run_step": "step2", "task_distributed_id": picked["id"], "task_key": picked["task_key"]}
    except Exception as e:
        for emit in (print, logging.error):
            emit(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step_no_step1()异常: {str(e)}")
        return 'error'
|
|
|
def update_main_and_add_detail(data):
    """Record that this host has started `data["run_step"]` for a task.

    Updates the main task row: step_last always, and for step1 also status = 1
    and started_at. Then inserts a detail row for this host with the same
    started_at.

    Args:
        data: dict containing at least "task_distributed_id" and "run_step".

    Returns:
        Whatever main_service_db.add_task_distributed_detail returns. Callers
        in this module treat the string "error" as a failed claim.

    Exceptions from either database call propagate to the caller.
    """
    # Take one timestamp for both rows. Calling strftime twice could give the
    # main row and the detail row different started_at values across a second boundary.
    started_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    task_id = data['task_distributed_id']
    step = data["run_step"]

    if step == "step1":
        main_update = {"id": task_id, "status": 1, "step_last": step, "started_at": started_at}
    else:
        main_update = {"id": task_id, "step_last": step}

    # NOTE(review): the update and the insert below are separate calls. If the
    # insert fails, step_last has already been changed on the main row.
    main_service_db.update_task_distributed(main_update)

    return main_service_db.add_task_distributed_detail({"task_distributed_id": task_id, "step": step, "hostname": hostname, "started_at": started_at})