"""Wrappers for common MySQL operations on the distributed-task tables.

Every public function opens its own connection through ``pymysqlAlias()``,
runs a single statement and reports failures by printing and logging the
error, then returning the string ``'error'`` instead of raising.
"""
import logging
import socket
import time

import pymysql

import config

logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR)

# Columns that update_task_distributed() may write, in the order they are
# emitted in the SET clause.
_TASK_UPDATABLE_COLUMNS = (
    'status',
    'hostname',
    'step_last',
    'priority',
    'started_at',
    'finished_at',
)

# Columns that update_task_distributed_detail() may write.
_DETAIL_UPDATABLE_COLUMNS = ('finished_at', 'step', 'hostname')


# Shared connection factory.
def pymysqlAlias():
    """Open a new PyMySQL connection using the ``config.mysql_local`` settings."""
    settings = config.mysql_local
    return pymysql.connect(
        host=settings['host'],
        port=settings['port'],
        user=settings['user'],
        password=settings['password'],
        db=settings['db'],
        charset=settings['charset'],
    )


def _now():
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())


def _report_failure(call_label, exc):
    """Print and log one failure message for the call described by *call_label*.

    The timestamp is computed once so the printed and logged lines match.
    """
    message = f"{_now()} 执行{call_label}异常: {str(exc)}"
    print(message)
    logging.error(message)


def _fetch(sql, where=None, fetch_all=False):
    """Run a SELECT and return one row (dict) or all rows.

    *where*, when truthy, is appended verbatim after ``' and '``.
    """
    if where:
        # SECURITY: `where` is a raw SQL fragment supplied by the caller and
        # cannot be parameterized here without changing the interface.
        # Callers must never build it from untrusted input.
        sql += f' and {where}'
    with pymysqlAlias() as conn:
        with conn.cursor(pymysql.cursors.DictCursor) as cursor:
            # No args are passed, so '%' inside `where` (e.g. LIKE patterns)
            # is sent to the server untouched.
            cursor.execute(sql)
            return cursor.fetchall() if fetch_all else cursor.fetchone()


def _execute_write(sql, params):
    """Execute one parameterized write statement and commit it."""
    with pymysqlAlias() as conn:
        with conn.cursor() as cursor:
            cursor.execute(sql, params)
        conn.commit()


def _set_clause(columns, data):
    """Build the SET clause and its parameters for the *columns* present in *data*.

    Raises:
        ValueError: if none of *columns* is present, because the resulting
            UPDATE would have an empty SET list.
    """
    present = [column for column in columns if column in data]
    if not present:
        raise ValueError('no updatable column present in data')
    # Column names come from a fixed tuple, so interpolating them is safe;
    # the values travel as bound parameters.
    clause = ','.join(f'{column} = %s' for column in present)
    return clause, [data[column] for column in present]


# Query task_distributed.
def db_task_distributed(where):
    """Return the first ``task_distributed`` row matching *where*.

    Args:
        where: raw SQL condition appended after ``where 1=1 and``; a falsy
            value means no extra condition.

    Returns:
        A dict for the row, ``None`` when nothing matches, or the string
        ``'error'`` when any exception occurs.
    """
    try:
        return _fetch('select * from task_distributed where 1=1', where)
    except Exception as e:
        _report_failure("db_task_distributed()", e)
        return 'error'


def db_task_distributed_list(where):
    """Return every ``task_distributed`` row matching *where*.

    Args:
        where: raw SQL condition appended after ``where 1=1 and``; a falsy
            value means no extra condition.

    Returns:
        The rows as returned by ``fetchall()`` on a ``DictCursor``, or the
        string ``'error'`` when any exception occurs.
    """
    try:
        return _fetch(
            'select * from task_distributed where 1=1', where, fetch_all=True
        )
    except Exception as e:
        _report_failure("db_task_distributed_list()", e)
        return 'error'


# Query task_distributed_detail.
def db_task_distributed_detail(where):
    """Return the first ``task_distributed_detail`` row matching *where*.

    Args:
        where: raw SQL condition appended after ``where 1=1 and``; a falsy
            value means no extra condition.

    Returns:
        A dict for the row, ``None`` when nothing matches, or the string
        ``'error'`` when any exception occurs.
    """
    try:
        return _fetch('select * from task_distributed_detail where 1=1', where)
    except Exception as e:
        _report_failure("db_task_distributed_detail()", e)
        return 'error'


# Count task_distributed_detail rows under a given condition.
def db_task_distributed_detail_count(where):
    """Return how many ``task_distributed_detail`` rows match *where*.

    Args:
        where: raw SQL condition appended after ``where 1=1 and``; a falsy
            value means no extra condition.

    Returns:
        The value of ``count(*)``, or the string ``'error'`` when any
        exception occurs.
    """
    try:
        row = _fetch(
            'select count(*) as nums from task_distributed_detail where 1=1',
            where,
        )
        return row["nums"]
    except Exception as e:
        _report_failure("db_task_distributed_detail_count()", e)
        return 'error'


# Insert a step record into task_distributed_detail.
def add_task_distributed_detail(data):
    """Insert one step row into ``task_distributed_detail``.

    Args:
        data: mapping with the keys ``task_distributed_id``, ``step`` and
            ``hostname``. ``started_at`` is set to the current local time.

    Returns:
        ``"ok"`` on success, ``"error"`` when any exception occurs
        (including a missing key).
    """
    try:
        sql = (
            'insert into task_distributed_detail '
            '(task_distributed_id,step,hostname,started_at) '
            'values (%s, %s, %s, %s)'
        )
        # Values are bound as parameters so quotes inside them cannot break
        # or alter the statement.
        params = (
            data["task_distributed_id"],
            data["step"],
            data["hostname"],
            _now(),
        )
        _execute_write(sql, params)
        return "ok"
    except Exception as e:
        _report_failure(f"add_task_distributed_detail({data})", e)
        return "error"


# Update the task_distributed main table.
def update_task_distributed(data):
    """Update ``task_distributed`` rows from the fields present in *data*.

    Args:
        data: mapping. Any of ``status``, ``hostname``, ``step_last``,
            ``priority``, ``started_at``, ``finished_at`` present is written.
            ``id`` filters by primary key; ``task_key`` filters by
            ``task_type`` and skips rows whose ``status`` is 2.

    Returns:
        ``None`` on success, ``"error"`` when any exception occurs or when
        *data* holds no updatable field.
    """
    try:
        set_clause, params = _set_clause(_TASK_UPDATABLE_COLUMNS, data)
        # Conditions must come last, after the SET list.
        # NOTE(review): when neither "id" nor "task_key" is supplied the
        # statement updates every row - confirm callers always pass one.
        sql = f'update task_distributed set {set_clause} where 1=1 '
        if "id" in data:
            sql += ' and id = %s'
            params.append(data["id"])
        if "task_key" in data:
            sql += ' and task_type = %s and status != 2'
            params.append(data["task_key"])
        _execute_write(sql, params)
    except Exception as e:
        _report_failure(f"update_task_distributed({data})", e)
        return "error"


# Update the task_distributed_detail table.
def update_task_distributed_detail(data):
    """Update ``task_distributed_detail`` rows from the fields present in *data*.

    Args:
        data: mapping. Any of ``finished_at``, ``step``, ``hostname`` present
            is written. ``task_distributed_id`` and ``step`` (when present)
            are also used as filters, so ``step`` is both matched and
            rewritten to the same value.

    Returns:
        ``None`` on success, ``"error"`` when any exception occurs or when
        *data* holds no updatable field.
    """
    try:
        # Joining the assignments avoids the stray leading comma produced
        # when "finished_at" is absent but "step" or "hostname" is present.
        set_clause, params = _set_clause(_DETAIL_UPDATABLE_COLUMNS, data)
        # NOTE(review): with neither filter key present this updates every
        # row - confirm callers always pass one.
        sql = f'update task_distributed_detail set {set_clause} where 1=1 '
        if "task_distributed_id" in data:
            sql += ' and task_distributed_id = %s'
            params.append(data["task_distributed_id"])
        if "step" in data:
            sql += ' and step = %s'
            params.append(data["step"])
        _execute_write(sql, params)
    except Exception as e:
        _report_failure(f"update_task_distributed_detail({data})", e)
        return "error"


# Fetch the task that should run step 1 next.
def get_task_distributed_step1():
    """Return the highest-priority ``task_distributed`` row with ``status = 0``.

    Returns:
        A dict for the row, ``None`` when no such row exists, or the string
        ``'error'`` when any exception occurs.
    """
    try:
        # NOTE(review): the row lock taken by FOR UPDATE only lasts for the
        # enclosing transaction, which ends when this function's connection
        # is released. It therefore does not stop another worker from
        # selecting the same row before the caller updates its status -
        # confirm whether claiming should happen in the same transaction.
        return _fetch(
            'select * from task_distributed where status =0 '
            'order by priority desc limit 1 for update'
        )
    except Exception as e:
        _report_failure("get_task_distributed_step1()", e)
        return 'error'