You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
372 lines
17 KiB
372 lines
17 KiB
# mysql数据库常用任务函数封装 |
|
import pymysql, socket, time |
|
import platform,sys |
|
if platform.system() == 'Windows': |
|
#线上正式运行 |
|
sys.path.append('e:\\libs\\') |
|
#本地测试 |
|
#sys.path.append('libs') |
|
else: |
|
sys.path.append('/data/deploy/make3d/make2/libs/') |
|
import config |
|
# import multiprocessing |
|
import logging |
|
# 创建互斥锁 |
|
# lock = multiprocessing.Lock() |
|
hostname = socket.gethostname() |
|
|
|
logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR) |
|
#公共连接库 |
|
def pymysqlAlias(): |
|
return pymysql.connect( |
|
host=config.mysql_local['host'], |
|
port=config.mysql_local['port'], |
|
user=config.mysql_local['user'], |
|
password=config.mysql_local['password'], |
|
db=config.mysql_local['db'], |
|
charset=config.mysql_local['charset'],) |
|
|
|
|
|
#获取任务 {"hostname":"XXX","run_step":"xxxx","task_distributed_id":"xxxx"} |
|
def get_task_distributed(): |
|
|
|
try: |
|
with pymysqlAlias() as conn: |
|
cursor = conn.cursor(pymysql.cursors.DictCursor) |
|
sql = '' |
|
# 获取数据前先获取锁 |
|
# lock.acquire() |
|
#非R11 R12的主机 没有办法处理step1,所以不能做初始化的任务,只能做后续的任务 |
|
if hostname != "R11" and hostname != "R12" and hostname != "XJB-20220906FLC": |
|
sql = f'select * from task_distributed where status = 1 order by priority desc,created_at asc limit 1 for update' |
|
cursor.execute(sql) |
|
result = cursor.fetchone() |
|
#print("查询主任务表",result) |
|
if result: |
|
#获取需要执行的步骤 |
|
next_step = need_run_stepx(result["id"]) |
|
if next_step == "no" or next_step == "error": |
|
print("获取需要执行的步骤 next_step",next_step) |
|
return next_step |
|
taskData = {"hostname":hostname,"run_step":next_step,"task_distributed_id":result["id"],"task_key":result["task_key"]} |
|
flagRes = update_main_and_add_detail(taskData) |
|
if flagRes == "error": |
|
print(f'出现错误,有可能是多个进程获取同一个任务了') |
|
return "error" |
|
print(f'任务ID-{taskData["task_key"]}- "执行{taskData["run_step"]}" ') |
|
return taskData |
|
else: |
|
return 'no_data' |
|
else: |
|
|
|
#R11 R12的主机 可以执行step1 2 3 的任务 |
|
#如果R11 R12的主机目前没有正在执行step2,则优先处理step2, |
|
# print("次数",is_run_stepx_nums("step2")) |
|
if is_run_stepx_nums("step2") < 2: |
|
resultData = need_run_step2() |
|
if resultData != "no": |
|
resultData["hostname"] = hostname |
|
flagRes = update_main_and_add_detail(resultData) |
|
if flagRes == "error": |
|
print(f'出现错误,有可能是多个进程获取同一个任务了,重新获取任务去执行了') |
|
return "error" |
|
print(f'任务ID-{resultData["task_key"]}- "执行step2" ') |
|
return resultData |
|
|
|
#R11 R12的主机如果已经有在处理step2了,则不能再处理step2,只能处理step1 step3 |
|
resultData = need_run_step_no_step2() |
|
#print("resultData",resultData) |
|
if resultData == "no": |
|
return "no" |
|
|
|
resultData["hostname"] = hostname |
|
flagRes = update_main_and_add_detail(resultData) |
|
if flagRes == "error": |
|
print(f'出现错误,有可能是多个进程获取同一个任务了') |
|
return "error" |
|
print(f'任务ID-{resultData["task_key"]}- "执行{resultData["run_step"]}" ') |
|
return resultData |
|
|
|
#sql = f'select * from task_distributed where status != 2 order by priority desc,created_at asc limit 1' |
|
# print(f'sql: {sql}') |
|
|
|
except Exception as e: |
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed({hostname})异常: {str(e)}") |
|
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed({hostname})异常: {str(e)}") |
|
return 'error' |
|
|
|
# finally: |
|
# # 释放锁 |
|
# lock.release() |
|
|
|
#查询当前主机有没有正在执行某个任务 |
|
def is_run_stepx(step): |
|
try: |
|
with pymysqlAlias() as conn: |
|
cursor = conn.cursor(pymysql.cursors.DictCursor) |
|
sql = f'select * from task_distributed_detail where hostname = "{hostname}" and step = "{step}" and finished_at is null for update' |
|
# print(f'sql: {sql}') |
|
cursor.execute(sql) |
|
result = cursor.fetchone() |
|
if result: |
|
return "yes" |
|
else: |
|
return "no" |
|
except Exception as e: |
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx({hostname},{step})异常: {str(e)}") |
|
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx({hostname},{step})异常: {str(e)}") |
|
return "error" |
|
|
|
|
|
#查询当前主机某个步骤正在执行的数量 |
|
def is_run_stepx_nums(step): |
|
try: |
|
with pymysqlAlias() as conn: |
|
cursor = conn.cursor(pymysql.cursors.DictCursor) |
|
sql = f'select count(*) as nums from task_distributed_detail where hostname = "{hostname}" and step = "{step}" and finished_at is null for update' |
|
# print(f'sql: {sql}') |
|
cursor.execute(sql) |
|
result = cursor.fetchone() |
|
return result["nums"] |
|
except Exception as e: |
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx_nums({hostname},{step})异常: {str(e)}") |
|
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx_nums({hostname},{step})异常: {str(e)}") |
|
return "error" |
|
|
|
|
|
#查询指定任务需要执行哪个步骤 "error" 表示异常 "no"表示没有任务需要执行 "step1"表示需要执行step1 "step2"表示需要执行step2 "step3"表示需要执行step3 |
|
def need_run_stepx(task_distributed_id): |
|
try: |
|
with pymysqlAlias() as conn: |
|
cursor = conn.cursor(pymysql.cursors.DictCursor) |
|
#查询task_distributed_id 对应的子任务是否正在执行, |
|
sql = f'select * from task_distributed_detail where hostname = "{hostname}" and task_distributed_id = "{task_distributed_id}" order by id desc limit 1 for update' |
|
# print(f'sql: {sql}') |
|
cursor.execute(sql) |
|
result = cursor.fetchone() |
|
#如果一个子任务都没有,说明该任务还没有开始执行,需要执行step1 |
|
if result is None: |
|
return "step1" |
|
if result and result["finished_at"] is None: |
|
#该任务正在运行中不需要执行下一步 |
|
return "no" |
|
#查询改任务的最后一个状态 |
|
if result and result["finished_at"]: |
|
if result["step"] == "step1": |
|
return "step2" |
|
elif result["step"] == "step2": |
|
return "step3" |
|
elif result["step"] == "step3": |
|
#这里要将 主任务表的状态改为2,finished_at改为当前时间 |
|
update_task_distributed({"id":task_distributed_id,"status":2,"finished_at":time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),"step_last":"step3"}) |
|
return "no" |
|
return "no" |
|
except Exception as e: |
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_stepx({hostname},{task_distributed_id})异常: {str(e)}") |
|
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_stepx({hostname},{task_distributed_id})异常: {str(e)}") |
|
return 'error' |
|
|
|
#查询出哪些任务需要执行step2 |
|
def need_run_step2(): |
|
try: |
|
with pymysqlAlias() as conn: |
|
cursor = conn.cursor(pymysql.cursors.DictCursor) |
|
sql = f'select * from task_distributed where status = 1 and step_last = "step1" and finished_at is null order by priority desc,created_at asc for update' |
|
# print(f'sql: {sql}') |
|
cursor.execute(sql) |
|
result = cursor.fetchall() |
|
|
|
#判断result的长度 |
|
#判断是否有值 |
|
if len(result) == 0: |
|
return "no" |
|
#遍历循环哪笔需要执行step2 |
|
|
|
for row in result: |
|
#判断是否有正在执行的step2 |
|
if need_run_stepx(row["id"]) == "step2": |
|
#没有正在执行的step2,则返回该任务 |
|
return {"hostname":hostname,"run_step":"step2","task_distributed_id":row["id"],"task_key":row["task_key"]} |
|
return "no" |
|
except Exception as e: |
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step2()异常: {str(e)}") |
|
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step2()异常: {str(e)}") |
|
return "error" |
|
|
|
#查询出哪些任务需要执行指定的step |
|
def need_run_appoint_step(step): |
|
try: |
|
with pymysqlAlias() as conn: |
|
cursor = conn.cursor(pymysql.cursors.DictCursor) |
|
sql = f'select * from task_distributed where finished_at is null order by priority desc,created_at asc for update' |
|
# print(f'sql: {sql}') |
|
cursor.execute(sql) |
|
result = cursor.fetchall() |
|
#判断是否有值 |
|
if len(result) == 0: |
|
return "no" |
|
#遍历循环哪笔需要执行step2 |
|
for row in result: |
|
#判断是否有正在执行的step2 |
|
if need_run_stepx(row["id"]) == step: |
|
#没有正在执行的step2,则返回该任务 |
|
return {"hostname":hostname,"run_step":step,"task_distributed_id":row["id"],"task_key":row["task_key"]} |
|
return "no" |
|
except Exception as e: |
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_appoint_step()异常: {str(e)}") |
|
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_appoint_step()异常: {str(e)}") |
|
return 'error' |
|
|
|
#查询出哪些任务需要执行非step2 |
|
def need_run_step_no_step2(): |
|
try: |
|
with pymysqlAlias() as conn: |
|
cursor = conn.cursor(pymysql.cursors.DictCursor) |
|
sql = f'select * from task_distributed where finished_at is null order by priority desc,created_at asc for update' |
|
# print(f'sql: {sql}') |
|
cursor.execute(sql) |
|
result = cursor.fetchall() |
|
#判断是否有值 |
|
if len(result) == 0: |
|
return "no" |
|
#遍历循环哪笔需要执行step2 |
|
|
|
for row in result: |
|
#判断是否有正在执行的step2 |
|
xstep = need_run_stepx(row["id"]) |
|
# print("查询非step2的任务列表",xstep) |
|
if xstep != "step2" and (xstep == "step1" or xstep == "step3"): |
|
#没有正在执行的step2,则返回该任务 |
|
return {"hostname":hostname,"run_step":xstep,"task_distributed_id":row["id"],"task_key":row["task_key"]} |
|
return "no" |
|
except Exception as e: |
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step_no_step2()异常: {str(e)}") |
|
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step_no_step2()异常: {str(e)}") |
|
return 'error' |
|
|
|
|
|
# 在任务分布执行主表插入任务,所有的任务执行都从这里获取 |
|
def add_task_distributed(data): |
|
try: |
|
with pymysqlAlias() as conn: |
|
cursor = conn.cursor() |
|
|
|
sql = f'insert into task_distributed (task_type, task_key,created_at) values ("{data["task_type"]}", "{data["task_key"]}","{now()}")' |
|
# print(f'sql: {sql}') |
|
cursor.execute(sql) |
|
|
|
# 获取插入数据的自增ID |
|
last_insert_id = cursor.lastrowid |
|
conn.commit() |
|
return last_insert_id |
|
except Exception as e: |
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_main_task({data})异常: {str(e)}") |
|
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_main_task({data})异常: {str(e)}") |
|
return "error" |
|
|
|
# 在task_distributed_detail插入明细步骤 |
|
def add_task_distributed_detail(data): |
|
try: |
|
with pymysqlAlias() as conn: |
|
cursor = conn.cursor() |
|
|
|
sql = f'insert into task_distributed_detail (task_distributed_id,step,hostname,started_at) values ("{data["task_distributed_id"]}", "{data["step"]}","{hostname}","{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}")' |
|
# print(f'sql: {sql}') |
|
cursor.execute(sql) |
|
conn.commit() |
|
return "ok" |
|
except Exception as e: |
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed_detail({data})异常: {str(e)}") |
|
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed_detail({data})异常: {str(e)}") |
|
return "error" |
|
|
|
# 更新 task_distributed 主表 |
|
def update_task_distributed(data): |
|
try: |
|
with pymysqlAlias() as conn: |
|
cursor = conn.cursor() |
|
|
|
sql = f'update task_distributed set ' |
|
#判断要更新哪些字段 |
|
if "status" in data: |
|
sql += f'status = "{data["status"]}",' |
|
|
|
if "hostname" in data: |
|
sql += f'hostname = "{data["hostname"]}",' |
|
|
|
if "step_last" in data: |
|
sql += f'step_last = "{data["step_last"]}",' |
|
|
|
if "priority" in data: |
|
sql += f'priority = "{data["priority"]}",' |
|
|
|
if "started_at" in data: |
|
sql += f'started_at = "{data["started_at"]}",' |
|
|
|
if "finished_at" in data: |
|
sql += f'finished_at = "{data["finished_at"]}",' |
|
|
|
|
|
#去掉 sql 最右边的逗号 |
|
sql = sql.rstrip(',') |
|
|
|
|
|
sql += f' where 1=1 ' |
|
#条件要放在最后面 |
|
if "id" in data: |
|
sql += f' and id = "{data["id"]}"' |
|
|
|
if "task_key" in data: |
|
sql += f' and task_type = "{data["task_key"]}" and status != 2' |
|
|
|
#sql = f'update task_distributed set status = "{data["status"]}",updated_at = "{now()}" where id = "{data["id"]}"' |
|
# print(f'sql: {sql}') |
|
cursor.execute(sql) |
|
conn.commit() |
|
except Exception as e: |
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed({data})异常: {str(e)}") |
|
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed({data})异常: {str(e)}") |
|
return "error" |
|
|
|
# 更新 task_distributed_detail 主表 |
|
def update_task_distributed_detail(data): |
|
try: |
|
with pymysqlAlias() as conn: |
|
cursor = conn.cursor() |
|
|
|
sql = f'update task_distributed_detail set ' |
|
#判断要更新哪些字段 |
|
if "finished_at" in data: |
|
sql += f'finished_at = "{data["finished_at"]}"' |
|
|
|
if "step" in data: |
|
sql += f',step = "{data["step"]}"' |
|
|
|
if "hostname" in data: |
|
sql += f',hostname = "{data["hostname"]}"' |
|
|
|
#where 条件 |
|
sql += f' where 1=1 ' |
|
if "task_distributed_id" in data: |
|
sql += f' and task_distributed_id = "{data["task_distributed_id"]}"' |
|
|
|
if "step" in data: |
|
sql += f' and step = "{data["step"]}"' |
|
|
|
cursor.execute(sql) |
|
conn.commit() |
|
except Exception as e: |
|
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed_detail({data})异常: {str(e)}") |
|
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed_detail({data})异常: {str(e)}") |
|
return "error" |
|
|
|
#更新主表和插入明细表的步骤 |
|
def update_main_and_add_detail(data): |
|
if data["run_step"] == "step1": |
|
updateData = {"id":data['task_distributed_id'],"status":1,"step_last":data["run_step"],"started_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} |
|
else: |
|
updateData = {"id":data['task_distributed_id'],"step_last":data["run_step"]} |
|
#更新主表 |
|
update_task_distributed(updateData) |
|
#插入明细表数据 |
|
return add_task_distributed_detail({"task_distributed_id":data['task_distributed_id'],"step":data["run_step"],"hostname":hostname,"started_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) |