# Wrappers for common MySQL task operations.
import socket
import time

import pymysql

import config


def _timestamp():
    """Return the current local time formatted as 'YYYY-MM-DD HH:MM:SS' for log lines."""
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())


# Shared connection factory.
def pymysqlAlias():
    """Open a new PyMySQL connection using the settings in ``config.mysql_dong``.

    Returns:
        The connection object returned by ``pymysql.connect``. A new
        connection is opened on every call.
    """
    settings = config.mysql_dong
    return pymysql.connect(
        host=settings['host'],
        port=settings['port'],
        user=settings['user'],
        password=settings['password'],
        db=settings['db'],
        charset=settings['charset'],
    )


# Fetch a new task.
def get_task_by_level():
    """Return the ``pid`` of the next pending row in ``pid_base_fix``.

    Rows with ``status = 0`` are ordered by ``level`` descending, then ``id``
    ascending, and the first one is selected.

    Returns:
        The ``pid`` value of the selected row, or ``''`` when no row matches
        or when any exception occurs (the exception is printed, not raised).
    """
    sql = 'select pid from pid_base_fix where status = 0 order by level desc, id asc limit 1'
    try:
        # NOTE(review): relies on the connection context manager yielding the
        # connection itself (PyMySQL >= 1.0 behaviour) -- confirm the installed version.
        with pymysqlAlias() as conn:
            # The cursor context manager closes the cursor when the block exits.
            with conn.cursor() as cursor:
                cursor.execute(sql)
                row = cursor.fetchone()
        return row[0] if row else ''
    except Exception as e:
        # Best-effort by design: report the failure and signal "no task".
        print(f"{_timestamp()} get_task_by_level()异常: {str(e)}")
        return ''


# Start a task.
def update_fix_status(pid, status):
    """Set the status of a pending ``pid_base_fix`` row and stamp its times.

    Only a row that still has ``status = 0`` and the given ``pid`` is updated;
    ``createTime`` and ``updateTime`` are both set to ``now()``.

    Args:
        pid: Value matched against the ``pid`` column.
        status: New value for the ``status`` column.

    Returns:
        None. Any exception is printed rather than raised.
    """
    # Values are bound by the driver instead of being interpolated into the
    # statement, so a crafted or non-numeric pid/status cannot alter the SQL.
    sql = (
        'update pid_base_fix set status = %s,createTime = now(), updateTime = now() '
        'where status = 0 and pid = %s'
    )
    params = (status, pid)
    try:
        # NOTE(review): relies on the connection context manager yielding the
        # connection itself (PyMySQL >= 1.0 behaviour) -- confirm the installed version.
        with pymysqlAlias() as conn:
            with conn.cursor() as cursor:
                # mogrify renders the statement exactly as execute() will send it,
                # keeping the log line as informative as before.
                print(f'开始任务sql: {cursor.mogrify(sql, params)}')
                cursor.execute(sql, params)
            conn.commit()
    except Exception as e:
        # Best-effort by design: report the failure and carry on.
        print(f"{_timestamp()} 执行update_fix_status()异常: {str(e)}")


# ---------------------------------------------------------------------------
# Disabled legacy helpers, kept commented out for reference.
# ---------------------------------------------------------------------------

# # Fetch a new task
# def get_task(task_type):
#     try:
#         with pymysqlAlias() as conn:
#             cursor = conn.cursor()
#             sql = f'select task_key from tasks where status = 0 order by priority desc, id asc limit 1 for update'
#             # print(f'sql: {sql}')
#             cursor.execute(sql)
#             data = cursor.fetchone()
#             if data:
#                 return data[0]
#             else:
#                 return ''
#     except Exception as e:
#         print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task({task_type})异常: {str(e)}")
#         return ''

# # Add a new task
# def add_task(data):
#     try:
#         with pymysqlAlias() as conn:
#             cursor = conn.cursor()
#             # If this is the Kunshan teaching studio, leave the priority at its default
#             if data["psid"] == '85':
#                 sql = f'insert into tasks (task_type, task_key,studio_id) values ("{data["task_type"]}", "{data["task_key"]}","{data["psid"]}")'
#             else:
#                 sql = f'insert into tasks (task_type, task_key, priority,studio_id) values ("{data["task_type"]}", "{data["task_key"]}", {data["priority"]},"{data["psid"]}")'
#             # print(f'sql: {sql}')
#             cursor.execute(sql)
#             conn.commit()
#     except Exception as e:
#         print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task({data})异常: {str(e)}")

# # Add a task to the distributed task table {"task_type":key,"task_key":pid,"psid":psid}
# def add_task_distributed(data):
#     flag = isInTaskDistributed(data["task_key"])
#     if flag:
#         return "already"
#     try:
#         with pymysqlAlias() as conn:
#             cursor = conn.cursor()
#             sql = f'insert into task_distributed (task_type, task_key, priority,created_at,studio_id) values ("{data["task_type"]}", "{data["task_key"]}", "{data["priority"]}","{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}","{data["psid"]}")'
#             cursor.execute(sql)
#             conn.commit()
#     except Exception as e:
#         print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed({data})异常: {str(e)}")

# # Start a task
# def start_task(data):
#     try:
#         with pymysqlAlias() as conn:
#             cursor = conn.cursor()
#             hostname = socket.gethostname()
#             sql = f'update tasks set status = 1, hostname = "{hostname}", started_at = now(), updated_at = now() where status = 0 and task_key = {data["task_key"]}'
#             print(f'开始任务sql: {sql}')
#             cursor.execute(sql)
#             conn.commit()
#     except Exception as e:
#         print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行start_task({data})异常: {str(e)}")

# # Finish a task
# def finish_task(data):
#     try:
#         with pymysqlAlias() as conn:
#             cursor = conn.cursor()
#             hostname = socket.gethostname()
#             sql = f'update tasks set status = 2, hostname = "{hostname}", finished_at = now(), updated_at = now() where status = 1 and task_key = "{data["task_key"]}" '
#             # print(f'sql: {sql}')
#             cursor.execute(sql)
#             conn.commit()
#     except Exception as e:
#         print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行finish_task({data})异常: {str(e)}")

# # Look up a studio's configuration by studio ID; fall back to the default configuration if none is set
# def get_floor_sticker_distances(psid):
#     try:
#         with pymysqlAlias() as conn:
#             cursor = conn.cursor()
#             sql = f'select a.studio_id,value from floor_sticker_link_studio as a left join floor_sticker_distances as b on a.floor_name = b.name where a.studio_id in (0,{psid})'
#             cursor.execute(sql)
#             result = cursor.fetchall()
#             if len(result) == 1:
#                 return result[0][1]
#             if len(result) == 2:
#                 for item in result:
#                     if str(item[0]) == str(psid):
#                         return item[1]
#             return "error-配置异常,该影棚有多个地贴配置"
#     except Exception as e:
#         print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_floor_sticker_distances()异常: {str(e)}")
#         return "error"

# # Check whether the studio should use the new modeling system
# def is_new_make_psid(psid):
#     try:
#         with pymysqlAlias() as conn:
#             cursor = conn.cursor()
#             sql = f'select count(*) from new_make_psid where studio_id = {psid}'
#             print(f'sql: {sql}')
#             cursor.execute(sql)
#             result = cursor.fetchone()
#             if result[0] > 0:
#                 return True
#             else:
#                 return False
#     except Exception as e:
#         print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_new_make_psid()异常: {str(e)}")
#         return "error"

# # Check whether the studio uses the new modeling system; if not, switch it over
# def change_to_new_make_psid(psid):
#     try:
#         with pymysqlAlias() as conn:
#             cursor = conn.cursor()
#             sql = f'select count(*) from new_make_psid where studio_id = {psid}'
#             cursor.execute(sql)
#             result = cursor.fetchone()
#             if result[0] == 0:
#                 sql = f'insert into new_make_psid (studio_id,createTime) values ({psid},now())'
#                 cursor.execute(sql)
#                 conn.commit()
#                 print(f'修改影棚{psid}为走新的建模系统')
#     except Exception as e:
#         print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行change_to_new_make_psid()异常: {str(e)}")
#         return "error"

# # Check whether the studio uses the distributed modeling system or the original one
# def isStudioConfigDistribute(psid):
#     try:
#         with pymysqlAlias() as conn:
#             cursor = conn.cursor()
#             sql = f'select count(*) from studio_config_distribute where studio_id = {psid} and status = 1'
#             cursor.execute(sql)
#             result = cursor.fetchone()
#             if result[0] == 0:
#                 return False
#             else:
#                 return True
#     except Exception as e:
#         print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行change_to_new_make_psid()异常: {str(e)}")
#         return "error"

# # Check whether the task already exists
# def isInTaskDistributed(task_key):
#     try:
#         with pymysqlAlias() as conn:
#             cursor = conn.cursor()
#             sql = f'select count(*) from task_distributed where status = 0 and task_key = "{task_key}"'
#             cursor.execute(sql)
#             result = cursor.fetchone()
#             if result[0] == 0:
#                 return False
#             else:
#                 return True
#     except Exception as e:
#         print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行isInTaskDistributed()异常: {str(e)}")
#         return "error"