"""Common MySQL helpers for distributing task steps across hosts.

Tables touched (columns as used in this module):

* ``task_distributed``: id, task_type, task_key, status, priority,
  step_last, hostname, created_at, started_at, finished_at.
* ``task_distributed_detail``: id, task_distributed_id, step, hostname,
  started_at, finished_at.

Public helpers report failures by printing/logging them and returning the
sentinel string ``"error"`` instead of raising.
"""
import logging
import platform
import socket
import sys
import time

import pymysql

if platform.system() == 'Windows':
    # Path used by the production deployment on Windows.
    sys.path.append('e:\\libs\\')
    # For local testing use this instead:
    # sys.path.append('libs')
else:
    sys.path.append('/data/deploy/make3d/make2/libs/')
import config  # noqa: E402 -- resolved through the sys.path entry added above

hostname = socket.gethostname()
logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR)

# Hosts allowed to take step1 work (the original comment names R11 and R12).
_STEP1_HOSTS = ("R11", "R12", "XJB-20220906FLC")

_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# Columns of task_distributed that update_task_distributed() may set, in SET-clause order.
_TASK_UPDATABLE_COLUMNS = ("status", "hostname", "step_last", "priority", "started_at", "finished_at")

# Columns of task_distributed_detail that update_task_distributed_detail() may set.
_DETAIL_UPDATABLE_COLUMNS = ("finished_at", "step", "hostname")


def _now():
    """Return the current local time formatted as 'YYYY-mm-dd HH:MM:SS'."""
    return time.strftime(_TIME_FORMAT, time.localtime())


def _report_error(message):
    """Print *message* prefixed with the current time and log it at ERROR level."""
    line = f"{_now()} {message}"
    print(line)
    logging.error(line)


def pymysqlAlias():
    """Open a new connection using the settings in ``config.mysql_local``.

    Callers use it as ``with pymysqlAlias() as conn`` and then call
    ``conn.cursor()``, i.e. they assume the context manager yields the
    connection itself and closes it on exit (PyMySQL >= 1.0 behaviour).
    No ``autocommit`` argument is passed, so writes need ``conn.commit()``.
    """
    return pymysql.connect(
        host=config.mysql_local['host'],
        port=config.mysql_local['port'],
        user=config.mysql_local['user'],
        password=config.mysql_local['password'],
        db=config.mysql_local['db'],
        charset=config.mysql_local['charset'],
    )


def _claim_for_regular_host():
    """Claim the next step of the top task with status = 1 (hosts that cannot run step1)."""
    with pymysqlAlias() as conn:
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        # NOTE(review): this connection stays open (with any row lock taken by
        # `for update`) while need_run_stepx()/update_main_and_add_detail()
        # update the same task row through *new* connections. On a
        # transactional engine those updates would wait for this lock until the
        # server's lock-wait timeout -- confirm table engine / intended locking.
        cursor.execute(
            'select * from task_distributed where status = 1 '
            'order by priority desc,created_at asc limit 1 for update'
        )
        task = cursor.fetchone()
        if not task:
            return 'no_data'

        # NOTE(review): need_run_stepx() only looks at *this* host's detail rows,
        # so it returns "step1" here when this host never touched the task, even
        # though these hosts are not meant to run step1 -- confirm intent.
        next_step = need_run_stepx(task["id"])
        if next_step in ("no", "error"):
            print("获取需要执行的步骤 next_step", next_step)
            return next_step

        task_data = {
            "hostname": hostname,
            "run_step": next_step,
            "task_distributed_id": task["id"],
            "task_key": task["task_key"],
        }
        if update_main_and_add_detail(task_data) == "error":
            print('出现错误,有可能是多个进程获取同一个任务了')
            return "error"
        print(f'任务ID-{task_data["task_key"]}- "执行{task_data["run_step"]}" ')
        return task_data


def _claim_for_step1_host():
    """Claim a step on a step1-capable host.

    step2 is preferred while fewer than two step2 runs are active on this
    host; otherwise (or when no step2 work is pending) a step1/step3 is taken.
    """
    running_step2 = is_run_stepx_nums("step2")
    if running_step2 == "error":
        return "error"

    if running_step2 < 2:
        task_data = need_run_step2()
        if task_data == "error":
            return "error"
        if task_data != "no":
            task_data["hostname"] = hostname
            if update_main_and_add_detail(task_data) == "error":
                print('出现错误,有可能是多个进程获取同一个任务了,重新获取任务去执行了')
                return "error"
            print(f'任务ID-{task_data["task_key"]}- "执行step2" ')
            return task_data

    task_data = need_run_step_no_step2()
    if task_data in ("no", "error"):
        return task_data
    task_data["hostname"] = hostname
    if update_main_and_add_detail(task_data) == "error":
        print('出现错误,有可能是多个进程获取同一个任务了')
        return "error"
    print(f'任务ID-{task_data["task_key"]}- "执行{task_data["run_step"]}" ')
    return task_data


def get_task_distributed():
    """Claim the next step this host should run and record it in the database.

    Returns:
        dict with keys "hostname", "run_step", "task_distributed_id",
        "task_key" for the claimed step; or
        "no_data": (hosts outside _STEP1_HOSTS) no task has status = 1;
        "no": nothing runnable right now;
        "error": a query failed or recording the claim failed.
    """
    try:
        if hostname in _STEP1_HOSTS:
            return _claim_for_step1_host()
        return _claim_for_regular_host()
    except Exception as e:
        _report_error(f"执行get_task_distributed({hostname})异常: {str(e)}")
        return 'error'


def is_run_stepx(step):
    """Return "yes" if this host has an unfinished detail row for *step*, else "no" ("error" on failure)."""
    try:
        with pymysqlAlias() as conn:
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            cursor.execute(
                'select * from task_distributed_detail where hostname = %s '
                'and step = %s and finished_at is null for update',
                (hostname, step),
            )
            return "yes" if cursor.fetchone() else "no"
    except Exception as e:
        _report_error(f"执行is_run_stepx({hostname},{step})异常: {str(e)}")
        return "error"


def is_run_stepx_nums(step):
    """Return how many unfinished detail rows this host has for *step* ("error" on failure)."""
    try:
        with pymysqlAlias() as conn:
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            cursor.execute(
                'select count(*) as nums from task_distributed_detail where hostname = %s '
                'and step = %s and finished_at is null for update',
                (hostname, step),
            )
            return cursor.fetchone()["nums"]
    except Exception as e:
        _report_error(f"执行is_run_stepx_nums({hostname},{step})异常: {str(e)}")
        return "error"


def need_run_stepx(task_distributed_id):
    """Decide which step this host should run next for a task.

    Only this host's most recent detail row for the task is considered.

    Returns:
        "step1" if this host has no detail row for the task;
        "no" if the latest row is unfinished, or after step3 finished (in
        which case the main task is also marked status 2 / finished);
        "step2" / "step3" after step1 / step2 finished;
        "error" on failure.
    """
    try:
        with pymysqlAlias() as conn:
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            cursor.execute(
                'select * from task_distributed_detail where hostname = %s '
                'and task_distributed_id = %s order by id desc limit 1 for update',
                (hostname, task_distributed_id),
            )
            latest = cursor.fetchone()
            if latest is None:
                return "step1"
            if not latest["finished_at"]:
                # The latest step is still running; nothing new to start.
                return "no"
            last_step = latest["step"]
            if last_step == "step1":
                return "step2"
            if last_step == "step2":
                return "step3"
            if last_step == "step3":
                # All steps done: close out the main task.
                update_task_distributed({
                    "id": task_distributed_id,
                    "status": 2,
                    "finished_at": _now(),
                    "step_last": "step3",
                })
            return "no"
    except Exception as e:
        _report_error(f"执行need_run_stepx({hostname},{task_distributed_id})异常: {str(e)}")
        return 'error'


def _first_task_needing(sql, params, wanted_steps):
    """Run a locking select on task_distributed and return a claim for the first
    row whose next step (per need_run_stepx) is in *wanted_steps*, else "no".

    Exceptions propagate to the caller.
    """
    with pymysqlAlias() as conn:
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        cursor.execute(sql, params)
        for row in cursor.fetchall():
            step = need_run_stepx(row["id"])
            if step in wanted_steps:
                return {
                    "hostname": hostname,
                    "run_step": step,
                    "task_distributed_id": row["id"],
                    "task_key": row["task_key"],
                }
    return "no"


def need_run_step2():
    """Return a claim dict for the first status-1 task whose next step here is step2, "no", or "error"."""
    try:
        return _first_task_needing(
            'select * from task_distributed where status = 1 and step_last = %s '
            'and finished_at is null order by priority desc,created_at asc for update',
            ("step1",),
            ("step2",),
        )
    except Exception as e:
        _report_error(f"执行need_run_step2()异常: {str(e)}")
        return "error"


def need_run_appoint_step(step):
    """Return a claim dict for the first unfinished task whose next step here is *step*, "no", or "error"."""
    try:
        return _first_task_needing(
            'select * from task_distributed where finished_at is null '
            'order by priority desc,created_at asc for update',
            None,
            (step,),
        )
    except Exception as e:
        _report_error(f"执行need_run_appoint_step()异常: {str(e)}")
        return 'error'


def need_run_step_no_step2():
    """Return a claim dict for the first unfinished task whose next step here is step1 or step3, "no", or "error"."""
    try:
        return _first_task_needing(
            'select * from task_distributed where finished_at is null '
            'order by priority desc,created_at asc for update',
            None,
            ("step1", "step3"),
        )
    except Exception as e:
        _report_error(f"执行need_run_step_no_step2()异常: {str(e)}")
        return 'error'


def add_task_distributed(data):
    """Insert a main task row from data["task_type"] / data["task_key"].

    Returns the new auto-increment id, or "error" on failure.
    """
    try:
        with pymysqlAlias() as conn:
            cursor = conn.cursor()
            cursor.execute(
                'insert into task_distributed (task_type, task_key, created_at) values (%s, %s, %s)',
                (data["task_type"], data["task_key"], _now()),
            )
            last_insert_id = cursor.lastrowid
            conn.commit()
            return last_insert_id
    except Exception as e:
        _report_error(f"执行add_task_distributed({data})异常: {str(e)}")
        return "error"


def add_task_distributed_detail(data):
    """Insert a detail (step) row for data["task_distributed_id"] / data["step"].

    hostname and started_at always come from this host and the current time;
    any "hostname"/"started_at" keys in *data* are ignored.
    Returns "ok", or "error" on failure.
    """
    try:
        with pymysqlAlias() as conn:
            cursor = conn.cursor()
            cursor.execute(
                'insert into task_distributed_detail (task_distributed_id,step,hostname,started_at) '
                'values (%s, %s, %s, %s)',
                (data["task_distributed_id"], data["step"], hostname, _now()),
            )
            conn.commit()
            return "ok"
    except Exception as e:
        _report_error(f"执行add_task_distributed_detail({data})异常: {str(e)}")
        return "error"


def update_task_distributed(data):
    """Update task_distributed columns present in *data*.

    Settable keys: see _TASK_UPDATABLE_COLUMNS. Filter keys: "id" and
    "task_key". Returns None on success, "error" on failure (including when
    no settable key is given).
    """
    try:
        assignments = []
        params = []
        for column in _TASK_UPDATABLE_COLUMNS:
            if column in data:
                assignments.append(f'{column} = %s')
                params.append(data[column])
        if not assignments:
            raise ValueError("no columns to update")

        # NOTE(review): with neither "id" nor "task_key" this updates every row.
        sql = f'update task_distributed set {", ".join(assignments)} where 1=1'
        if "id" in data:
            sql += ' and id = %s'
            params.append(data["id"])
        if "task_key" in data:
            # NOTE(review): compares the task_type column with data["task_key"];
            # possibly meant task_key -- confirm with callers before changing.
            sql += ' and task_type = %s and status != 2'
            params.append(data["task_key"])

        with pymysqlAlias() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, tuple(params))
            conn.commit()
    except Exception as e:
        _report_error(f"执行update_task_distributed({data})异常: {str(e)}")
        return "error"


def update_task_distributed_detail(data):
    """Update task_distributed_detail columns present in *data*.

    Settable keys: see _DETAIL_UPDATABLE_COLUMNS. Filter keys:
    "task_distributed_id" and "step". Returns None on success, "error" on
    failure (including when no settable key is given).
    """
    try:
        assignments = []
        params = []
        for column in _DETAIL_UPDATABLE_COLUMNS:
            if column in data:
                assignments.append(f'{column} = %s')
                params.append(data[column])
        if not assignments:
            raise ValueError("no columns to update")

        sql = f'update task_distributed_detail set {", ".join(assignments)} where 1=1'
        if "task_distributed_id" in data:
            sql += ' and task_distributed_id = %s'
            params.append(data["task_distributed_id"])
        if "step" in data:
            sql += ' and step = %s'
            params.append(data["step"])

        with pymysqlAlias() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, tuple(params))
            conn.commit()
    except Exception as e:
        _report_error(f"执行update_task_distributed_detail({data})异常: {str(e)}")
        return "error"


def update_main_and_add_detail(data):
    """Record that this host starts data["run_step"] for data["task_distributed_id"].

    Sets step_last on the main row (plus status 1 and started_at for step1),
    then inserts a detail row. Returns add_task_distributed_detail()'s result.
    """
    if data["run_step"] == "step1":
        update_data = {
            "id": data['task_distributed_id'],
            "status": 1,
            "step_last": data["run_step"],
            "started_at": _now(),
        }
    else:
        update_data = {"id": data['task_distributed_id'], "step_last": data["run_step"]}

    # NOTE(review): a failed main-table update is ignored and the detail row is
    # still inserted -- confirm whether the claim should be aborted instead.
    update_task_distributed(update_data)
    return add_task_distributed_detail({
        "task_distributed_id": data['task_distributed_id'],
        "step": data["run_step"],
        "hostname": hostname,
        "started_at": _now(),
    })