建模程序 多个定时程序
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

188 lines
7.6 KiB

# mysql数据库常用任务函数封装
import pymysql, socket, time
import config
import logging
logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR)
#公共连接库
def pymysqlAlias():
return pymysql.connect(
host=config.mysql_local['host'],
port=config.mysql_local['port'],
user=config.mysql_local['user'],
password=config.mysql_local['password'],
db=config.mysql_local['db'],
charset=config.mysql_local['charset'],)
#查询 task_distributed
def db_task_distributed(where):
try:
with pymysqlAlias() as conn:
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'select * from task_distributed where 1=1'
if where:
sql += f' and {where}'
cursor.execute(sql)
result = cursor.fetchone()
# 关闭游标和连接
##cursor.close()
#conn.close()
return result
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed()异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed()异常: {str(e)}")
return 'error'
def db_task_distributed_list(where):
try:
with pymysqlAlias() as conn:
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'select * from task_distributed where 1=1'
if where:
sql += f' and {where}'
cursor.execute(sql)
result = cursor.fetchall()
# 关闭游标和连接
##cursor.close()
#conn.close()
return result
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed_list()异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed_list()异常: {str(e)}")
return 'error'
#查询 task_distributed_detail
def db_task_distributed_detail(where):
try:
with pymysqlAlias() as conn:
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'select * from task_distributed_detail where 1=1'
if where:
sql += f' and {where}'
cursor.execute(sql)
result = cursor.fetchone()
# 关闭游标和连接
#cursor.close()
#conn.close()
return result
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed_detail()异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed_detail()异常: {str(e)}")
return 'error'
#查询指定条件下的数量 task_distributed_detail
def db_task_distributed_detail_count(where):
try:
with pymysqlAlias() as conn:
cursor = conn.cursor(pymysql.cursors.DictCursor)
sql = 'select count(*) as nums from task_distributed_detail where 1=1'
if where:
sql += f' and {where}'
cursor.execute(sql)
result = cursor.fetchone()
# 关闭游标和连接
#cursor.close()
#conn.close()
return result["nums"]
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed_detail_count()异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed_detail_count()异常: {str(e)}")
return 'error'
# 在task_distributed_detail插入明细步骤
def add_task_distributed_detail(data):
try:
with pymysqlAlias() as conn:
cursor = conn.cursor()
sql = f'insert into task_distributed_detail (task_distributed_id,step,hostname,started_at) values ("{data["task_distributed_id"]}", "{data["step"]}","{data["hostname"]}","{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}")'
cursor.execute(sql)
conn.commit()
return "ok"
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed_detail({data})异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed_detail({data})异常: {str(e)}")
return "error"
# 更新 task_distributed 主表
def update_task_distributed(data):
try:
with pymysqlAlias() as conn:
cursor = conn.cursor()
sql = f'update task_distributed set '
#判断要更新哪些字段
if "status" in data:
sql += f'status = "{data["status"]}",'
if "hostname" in data:
sql += f'hostname = "{data["hostname"]}",'
if "step_last" in data:
sql += f'step_last = "{data["step_last"]}",'
if "priority" in data:
sql += f'priority = "{data["priority"]}",'
if "started_at" in data:
sql += f'started_at = "{data["started_at"]}",'
if "finished_at" in data:
sql += f'finished_at = "{data["finished_at"]}",'
#去掉 sql 最右边的逗号
sql = sql.rstrip(',')
sql += f' where 1=1 '
#条件要放在最后面
if "id" in data:
sql += f' and id = "{data["id"]}"'
if "task_key" in data:
sql += f' and task_type = "{data["task_key"]}" and status != 2'
#sql = f'update task_distributed set status = "{data["status"]}",updated_at = "{now()}" where id = "{data["id"]}"'
# print(f'sql: {sql}')
cursor.execute(sql)
conn.commit()
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed({data})异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed({data})异常: {str(e)}")
return "error"
# 更新 task_distributed_detail 主表
def update_task_distributed_detail(data):
try:
with pymysqlAlias() as conn:
cursor = conn.cursor()
sql = f'update task_distributed_detail set '
#判断要更新哪些字段
if "finished_at" in data:
sql += f'finished_at = "{data["finished_at"]}"'
if "step" in data:
sql += f',step = "{data["step"]}"'
if "hostname" in data:
sql += f',hostname = "{data["hostname"]}"'
#where 条件
sql += f' where 1=1 '
if "task_distributed_id" in data:
sql += f' and task_distributed_id = "{data["task_distributed_id"]}"'
if "step" in data:
sql += f' and step = "{data["step"]}"'
cursor.execute(sql)
conn.commit()
except Exception as e:
print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed_detail({data})异常: {str(e)}")
logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed_detail({data})异常: {str(e)}")
return "error"