You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
208 lines
8.5 KiB
208 lines
8.5 KiB
# Wrappers around common MySQL operations for the distributed-task tables
|
import pymysql, socket, time |
|
import config |
|
import logging |
|
# Route root-logger records of level ERROR and above to a local log file.
logging.basicConfig(level=logging.ERROR, filename='task_distributed_error.log')
|
# Shared connection helper
|
def pymysqlAlias():
    """Open a new MySQL connection described by ``config.mysql_local``.

    Returns:
        The object returned by ``pymysql.connect``, created with the
        ``host``, ``port``, ``user``, ``password``, ``db`` and ``charset``
        entries of the ``config.mysql_local`` mapping. Every call opens a
        fresh connection.
    """
    settings = config.mysql_local
    option_names = ('host', 'port', 'user', 'password', 'db', 'charset')
    connect_kwargs = {name: settings[name] for name in option_names}
    return pymysql.connect(**connect_kwargs)
|
|
|
# Query task_distributed
|
def db_task_distributed(where):
    """Fetch a single row from ``task_distributed``.

    Args:
        where: Raw SQL condition appended as ``and <where>`` to
            ``where 1=1``; a falsy value means no extra filter.
            NOTE(review): the fragment is interpolated verbatim into the
            statement, so it must never carry untrusted input.

    Returns:
        Whatever ``cursor.fetchone()`` yields for a ``DictCursor``, or the
        string ``'error'`` if any exception was raised (the exception is
        printed and logged, not propagated).
    """
    try:
        with pymysqlAlias() as connection:
            dict_cursor = connection.cursor(pymysql.cursors.DictCursor)
            extra_filter = f' and {where}' if where else ''
            query = 'select * from task_distributed where 1=1' + extra_filter
            dict_cursor.execute(query)
            return dict_cursor.fetchone()
    except Exception as exc:
        stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        message = f"{stamp} 执行db_task_distributed()异常: {str(exc)}"
        print(message)
        logging.error(message)
        return 'error'
|
|
|
def db_task_distributed_list(where):
    """Fetch every matching row from ``task_distributed``.

    Args:
        where: Raw SQL condition appended as ``and <where>`` to
            ``where 1=1``; a falsy value means no extra filter.
            NOTE(review): the fragment is interpolated verbatim into the
            statement, so it must never carry untrusted input.

    Returns:
        Whatever ``cursor.fetchall()`` yields for a ``DictCursor``, or the
        string ``'error'`` if any exception was raised (the exception is
        printed and logged, not propagated).
    """
    try:
        with pymysqlAlias() as connection:
            dict_cursor = connection.cursor(pymysql.cursors.DictCursor)
            extra_filter = f' and {where}' if where else ''
            query = 'select * from task_distributed where 1=1' + extra_filter
            dict_cursor.execute(query)
            return dict_cursor.fetchall()
    except Exception as exc:
        stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        message = f"{stamp} 执行db_task_distributed_list()异常: {str(exc)}"
        print(message)
        logging.error(message)
        return 'error'
|
|
|
# Query task_distributed_detail
|
def db_task_distributed_detail(where):
    """Fetch a single row from ``task_distributed_detail``.

    Args:
        where: Raw SQL condition appended as ``and <where>`` to
            ``where 1=1``; a falsy value means no extra filter.
            NOTE(review): the fragment is interpolated verbatim into the
            statement, so it must never carry untrusted input.

    Returns:
        Whatever ``cursor.fetchone()`` yields for a ``DictCursor``, or the
        string ``'error'`` if any exception was raised (the exception is
        printed and logged, not propagated).
    """
    try:
        with pymysqlAlias() as connection:
            dict_cursor = connection.cursor(pymysql.cursors.DictCursor)
            extra_filter = f' and {where}' if where else ''
            query = 'select * from task_distributed_detail where 1=1' + extra_filter
            dict_cursor.execute(query)
            return dict_cursor.fetchone()
    except Exception as exc:
        stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        message = f"{stamp} 执行db_task_distributed_detail()异常: {str(exc)}"
        print(message)
        logging.error(message)
        return 'error'
|
|
|
# Count the task_distributed_detail rows that match a condition
|
def db_task_distributed_detail_count(where):
    """Count the rows of ``task_distributed_detail`` matching a condition.

    Args:
        where: Raw SQL condition appended as ``and <where>`` to
            ``where 1=1``; a falsy value means no extra filter.
            NOTE(review): the fragment is interpolated verbatim into the
            statement, so it must never carry untrusted input.

    Returns:
        The ``nums`` column of the ``count(*)`` result row, or the string
        ``'error'`` if any exception was raised (the exception is printed
        and logged, not propagated).
    """
    try:
        with pymysqlAlias() as connection:
            dict_cursor = connection.cursor(pymysql.cursors.DictCursor)
            extra_filter = f' and {where}' if where else ''
            query = (
                'select count(*) as nums from task_distributed_detail where 1=1'
                + extra_filter
            )
            dict_cursor.execute(query)
            count_row = dict_cursor.fetchone()
            return count_row["nums"]
    except Exception as exc:
        stamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        message = f"{stamp} 执行db_task_distributed_detail_count()异常: {str(exc)}"
        print(message)
        logging.error(message)
        return 'error'
|
|
|
|
|
# Insert a step record into task_distributed_detail
|
def add_task_distributed_detail(data):
    """Insert one step record into ``task_distributed_detail``.

    The row's ``started_at`` column is set to the current local time.

    Args:
        data: Mapping that must provide ``task_distributed_id``, ``step``
            and ``hostname``.

    Returns:
        ``"ok"`` once the insert has been committed, or ``"error"`` if any
        exception was raised (including a missing key in ``data``); the
        exception is printed and logged, not propagated.
    """
    try:
        # Read the inputs before opening a connection so that malformed
        # ``data`` fails without a database round trip.
        params = (
            data["task_distributed_id"],
            data["step"],
            data["hostname"],
            time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        )
        # Placeholders let the driver escape the values; interpolating them
        # into the statement broke on embedded quotes and allowed injection.
        sql = (
            'insert into task_distributed_detail '
            '(task_distributed_id,step,hostname,started_at) '
            'values (%s, %s, %s, %s)'
        )
        with pymysqlAlias() as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
            conn.commit()
        return "ok"
    except Exception as e:
        message = (
            f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} "
            f"执行add_task_distributed_detail({data})异常: {str(e)}"
        )
        print(message)
        logging.error(message)
        return "error"
|
|
|
# Update the task_distributed main table
|
def update_task_distributed(data):
    """Update columns of ``task_distributed`` rows.

    Args:
        data: Mapping describing the update.
            Columns to set (each optional, at least one required):
            ``status``, ``hostname``, ``step_last``, ``priority``,
            ``started_at``, ``finished_at``.
            Row filters (each optional): ``id`` matches the ``id`` column;
            ``task_key`` matches the ``task_type`` column and additionally
            restricts the update to rows with ``status != 2``.

    Returns:
        ``None`` on success, or ``"error"`` if any exception was raised
        (including the case where ``data`` names no column to set); the
        exception is printed and logged, not propagated.
    """
    try:
        assignments = []
        params = []
        # Column names come from this fixed tuple, never from ``data``, so
        # formatting them into the statement is safe; values go through
        # driver placeholders instead of string interpolation.
        for column in ("status", "hostname", "step_last",
                       "priority", "started_at", "finished_at"):
            if column in data:
                assignments.append(f"{column} = %s")
                params.append(data[column])

        if not assignments:
            # Previously this produced "update ... set  where 1=1", which
            # only failed once the server rejected it.
            raise ValueError("no updatable column supplied in data")

        # NOTE(review): when neither ``id`` nor ``task_key`` is supplied the
        # statement updates every row, as the original code did - confirm
        # that callers rely on this before tightening it.
        conditions = ["1=1"]
        if "id" in data:
            conditions.append("id = %s")
            params.append(data["id"])
        if "task_key" in data:
            conditions.append("task_type = %s and status != 2")
            params.append(data["task_key"])

        sql = (
            "update task_distributed set "
            + ", ".join(assignments)
            + " where "
            + " and ".join(conditions)
        )

        with pymysqlAlias() as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
            conn.commit()
    except Exception as e:
        message = (
            f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} "
            f"执行update_task_distributed({data})异常: {str(e)}"
        )
        print(message)
        logging.error(message)
        return "error"
|
|
|
# Update rows in the task_distributed_detail table
|
def update_task_distributed_detail(data):
    """Update columns of ``task_distributed_detail`` rows.

    Args:
        data: Mapping describing the update.
            Columns to set (each optional, at least one required):
            ``finished_at``, ``step``, ``hostname``.
            Row filters (each optional): ``task_distributed_id`` and
            ``step``. When ``step`` is present it is used both as a filter
            and as the value written, as in the original implementation.

    Returns:
        ``None`` on success, or ``"error"`` if any exception was raised
        (including the case where ``data`` names no column to set); the
        exception is printed and logged, not propagated.
    """
    try:
        assignments = []
        params = []
        # Joining the assignments fixes the old leading-comma bug: without
        # ``finished_at`` the statement used to start with "set ,step = ...".
        # Column names come from fixed tuples; values use placeholders.
        for column in ("finished_at", "step", "hostname"):
            if column in data:
                assignments.append(f"{column} = %s")
                params.append(data[column])

        if not assignments:
            raise ValueError("no updatable column supplied in data")

        # NOTE(review): with no filter key in ``data`` every row is updated,
        # as in the original code - confirm callers rely on this.
        conditions = ["1=1"]
        for column in ("task_distributed_id", "step"):
            if column in data:
                conditions.append(f"{column} = %s")
                params.append(data[column])

        sql = (
            "update task_distributed_detail set "
            + ", ".join(assignments)
            + " where "
            + " and ".join(conditions)
        )

        with pymysqlAlias() as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
            conn.commit()
    except Exception as e:
        message = (
            f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} "
            f"执行update_task_distributed_detail({data})异常: {str(e)}"
        )
        print(message)
        logging.error(message)
        return "error"
|
|
|
# Fetch the task that needs to run step 1
|
def get_task_distributed_step1():
    """Fetch the next task that needs to run step 1.

    Selects the ``task_distributed`` row with ``status = 0`` that has the
    highest ``priority``.

    Returns:
        Whatever ``cursor.fetchone()`` yields for a ``DictCursor``, or the
        string ``'error'`` if any exception was raised (the exception is
        printed and logged, not propagated).
    """
    # The original body also tested an undefined name ``where`` (this
    # function takes no arguments), which raised NameError on every call so
    # the function always returned 'error'. Appending a condition after
    # "limit 1" would have been invalid SQL anyway, so the branch is removed.
    sql = 'select * from task_distributed where status =0 order by priority desc limit 1'
    try:
        with pymysqlAlias() as conn:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                cursor.execute(sql)
                return cursor.fetchone()
    except Exception as e:
        message = (
            f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} "
            f"执行get_task_distributed_step1()异常: {str(e)}"
        )
        print(message)
        logging.error(message)
        return 'error'