建模程序 多个定时程序
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

213 lines
8.6 KiB

# mysql数据库常用任务函数封装
import pymysql, socket, time
import config
#公共连接库
def pymysqlAlias():
    """Open and return a new pymysql connection.

    Every connection option (host, port, user, password, db, charset) is read
    from the ``config.mysql_dong`` mapping.

    Returns:
        The object returned by ``pymysql.connect``.
    """
    settings = config.mysql_dong
    option_names = ('host', 'port', 'user', 'password', 'db', 'charset')
    return pymysql.connect(**{name: settings[name] for name in option_names})
# 获取新的任务
def get_task_by_level():
    """Fetch the pid of the next pending row in ``pid_base_fix``.

    Selects a single row with ``status = 0``, ordered by ``level`` descending
    and then ``id`` ascending.

    Returns:
        The ``pid`` column of that row, or ``''`` when no row matches or when
        any exception occurs (the exception is printed with a timestamp and
        not re-raised).
    """
    sql = 'select pid from pid_base_fix where status = 0 order by level desc, id asc limit 1'
    try:
        # The cursor is closed on leaving the ``with`` block instead of being
        # left for garbage collection.
        with pymysqlAlias() as conn, conn.cursor() as cursor:
            cursor.execute(sql)
            row = cursor.fetchone()
            if row:
                return row[0]
            return ''
    except Exception as e:
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} get_task_by_level()异常: {str(e)}")
        return ''
# 开始任务
def update_fix_status(pid, status):
    """Set the status of a pending ``pid_base_fix`` row and stamp its times.

    Updates rows where ``status = 0`` and ``pid`` matches, setting ``status``
    to the given value and ``createTime``/``updateTime`` to ``now()``, then
    commits.

    Args:
        pid: Value matched against the ``pid`` column.
        status: New value written to the ``status`` column.

    Returns:
        None. Any exception is printed with a timestamp and not re-raised.
    """
    # Values are bound by the driver rather than interpolated into the SQL
    # text, so an empty or non-numeric pid can no longer break or alter the
    # statement.
    sql = (
        'update pid_base_fix set status = %s,createTime = now(), updateTime = now() '
        'where status = 0 and pid = %s'
    )
    params = (status, pid)
    try:
        with pymysqlAlias() as conn, conn.cursor() as cursor:
            # mogrify renders the statement with its escaped parameters so the
            # log line still shows the SQL that is about to be executed.
            print(f'开始任务sql: {cursor.mogrify(sql, params)}')
            cursor.execute(sql, params)
            conn.commit()
    except Exception as e:
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_fix_status()异常: {str(e)}")
# # 获取新的任务
# def get_task(task_type):
# try:
# with pymysqlAlias() as conn:
# cursor = conn.cursor()
# sql = f'select task_key from tasks where status = 0 order by priority desc, id asc limit 1 for update'
# # print(f'sql: {sql}')
# cursor.execute(sql)
# data = cursor.fetchone()
# if data:
# return data[0]
# else:
# return ''
# except Exception as e:
# print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task({task_type})异常: {str(e)}")
# return ''
# # 新增新的任务
# def add_task(data):
# try:
# with pymysqlAlias() as conn:
# cursor = conn.cursor()
# #判断是否是昆山教学的,是的话优先级设置为默认
# if data["psid"] == '85':
# sql = f'insert into tasks (task_type, task_key,studio_id) values ("{data["task_type"]}", "{data["task_key"]}","{data["psid"]}")'
# else:
# sql = f'insert into tasks (task_type, task_key, priority,studio_id) values ("{data["task_type"]}", "{data["task_key"]}", {data["priority"]},"{data["psid"]}")'
# # print(f'sql: {sql}')
# cursor.execute(sql)
# conn.commit()
# except Exception as e:
# print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task({data})异常: {str(e)}")
# #新增任务到分布式处理任务表中 {"task_type":key,"task_key":pid,"psid":psid}
# def add_task_distributed(data):
# flag = isInTaskDistributed(data["task_key"])
# if flag:
# return "already"
# try:
# with pymysqlAlias() as conn:
# cursor = conn.cursor()
# sql = f'insert into task_distributed (task_type, task_key, priority,created_at,studio_id) values ("{data["task_type"]}", "{data["task_key"]}", "{data["priority"]}","{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}","{data["psid"]}")'
# cursor.execute(sql)
# conn.commit()
# except Exception as e:
# print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed({data})异常: {str(e)}")
# # 开始任务
# def start_task(data):
# try:
# with pymysqlAlias() as conn:
# cursor = conn.cursor()
# hostname = socket.gethostname()
# sql = f'update tasks set status = 1, hostname = "{hostname}", started_at = now(), updated_at = now() where status = 0 and task_key = {data["task_key"]}'
# print(f'开始任务sql: {sql}')
# cursor.execute(sql)
# conn.commit()
# except Exception as e:
# print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行start_task({data})异常: {str(e)}")
# # 完成任务
# def finish_task(data):
# try:
# with pymysqlAlias() as conn:
# cursor = conn.cursor()
# hostname = socket.gethostname()
# sql = f'update tasks set status = 2, hostname = "{hostname}", finished_at = now(), updated_at = now() where status = 1 and task_key = "{data["task_key"]}" '
# # print(f'sql: {sql}')
# cursor.execute(sql)
# conn.commit()
# except Exception as e:
# print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行finish_task({data})异常: {str(e)}")
# # 根据影棚ID 获取影棚的配置,如果没有指定的配置,则返回默认配置
# def get_floor_sticker_distances(psid):
# try:
# with pymysqlAlias() as conn:
# cursor = conn.cursor()
# sql = f'select a.studio_id,value from floor_sticker_link_studio as a left join floor_sticker_distances as b on a.floor_name = b.name where a.studio_id in (0,{psid})'
# cursor.execute(sql)
# result = cursor.fetchall()
# if len(result) == 1:
# return result[0][1]
# if len(result) == 2:
# for item in result:
# if str(item[0]) == str(psid):
# return item[1]
# return "error-配置异常,该影棚有多个地贴配置"
# except Exception as e:
# print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_floor_sticker_distances()异常: {str(e)}")
# return "error"
# #判断是否要走新的建模系统
# def is_new_make_psid(psid):
# try:
# with pymysqlAlias() as conn:
# cursor = conn.cursor()
# sql = f'select count(*) from new_make_psid where studio_id = {psid}'
# print(f'sql: {sql}')
# cursor.execute(sql)
# result = cursor.fetchone()
# if result[0] > 0:
# return True
# else:
# return False
# except Exception as e:
# print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_new_make_psid()异常: {str(e)}")
# return "error"
# #判断影棚是否走的新的建模系统,不是的话修改成走新的建模系统
# def change_to_new_make_psid(psid):
# try:
# with pymysqlAlias() as conn:
# cursor = conn.cursor()
# sql = f'select count(*) from new_make_psid where studio_id = {psid}'
# cursor.execute(sql)
# result = cursor.fetchone()
# if result[0] == 0:
# sql = f'insert into new_make_psid (studio_id,createTime) values ({psid},now())'
# cursor.execute(sql)
# conn.commit()
# print(f'修改影棚{psid}为走新的建模系统')
# except Exception as e:
# print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行change_to_new_make_psid()异常: {str(e)}")
# return "error"
# #判断影棚走分布式的建模系统 还是走原来的建模系统
# def isStudioConfigDistribute(psid):
# try:
# with pymysqlAlias() as conn:
# cursor = conn.cursor()
# sql = f'select count(*) from studio_config_distribute where studio_id = {psid} and status = 1'
# cursor.execute(sql)
# result = cursor.fetchone()
# if result[0] == 0:
# return False
# else:
# return True
# except Exception as e:
# print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行change_to_new_make_psid()异常: {str(e)}")
# return "error"
# #判断是否已经存在了
# def isInTaskDistributed(task_key):
# try:
# with pymysqlAlias() as conn:
# cursor = conn.cursor()
# sql = f'select count(*) from task_distributed where status = 0 and task_key = "{task_key}"'
# cursor.execute(sql)
# result = cursor.fetchone()
# if result[0] == 0:
# return False
# else:
# return True
# except Exception as e:
# print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行isInTaskDistributed()异常: {str(e)}")
# return "error"