diff --git a/libs/common.py b/libs/common.py new file mode 100644 index 0000000..e0602b4 --- /dev/null +++ b/libs/common.py @@ -0,0 +1,96 @@ +import redis,sys,os +import platform +if platform.system() == 'Windows': + #sys.path.append('e:\\libs\\') + sys.path.append('libs') +else: + sys.path.append('/data/deploy/make3d/make2/libs/') +import config,libs + +#判断模型是需要高精模 或者是 需要photo3 参与 +def task_need_high_model_or_photo3(pid): + resHigh = task_need_high_model(pid) + resPhoto3 = task_need_photo3(pid) + if resHigh or resPhoto3: + return True + else: + return False + +#判断是否需要高精模 +def task_need_high_model(pid): + redis_conn = config.redis_local_common + #判断在redis中是否有高精模和 需要photo3 参与的 task + if redis_conn.sismember("calculateHighModel",pid): + return True + #判断是否需要高精模 + if redis_conn.sismember("calculateHighModel_no",pid): + return False + + + #判断是否需要高精模 + if libs.get_ps_type(pid) == 1: + if libs.aliyun_face(pid): + #calulate_type = 'calculateHighModel' + redis_conn.sadd("calculateHighModel",pid) + return True + else: + redis_conn.sadd("calculateHighModel_no",pid) + return False + +#判断是否需要photo3参与建模 +def task_need_photo3(pid): + redis_conn = config.redis_local_common + #判断在redis中是否有高精模和 需要photo3 参与的 task + if redis_conn.sismember("photo3",pid): + return True + #判断是否需要photo3参与建模 + if redis_conn.sismember("photo3_no",pid): + return False + if os.path.exists(os.path.join(config.workdir, pid, 'photo3')): + redis_conn.sadd("photo3",pid) + return True + else: + redis_conn.sadd("photo3_no",pid) + return False + +#拷贝远程主机上的指定目录到本地指定目录 +def copy_remote_directory(remote_host, remote_path, local_path): + # 建立 SSH 连接 + ssh = paramiko.SSHClient() + ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + ssh.connect(remote_host, username='your_username', password='your_password') + + # 创建 SFTP 客户端 + sftp = ssh.open_sftp() + + # 获取远程目录下的所有文件/子目录 + file_list = sftp.listdir(remote_path) + + # 遍历远程目录中的每个文件/子目录 + for file_name in file_list: + remote_file_path = os.path.join(remote_path, file_name) + local_file_path = os.path.join(local_path, file_name) + + # 判断当前项是文件还是目录 + if sftp.stat(remote_file_path).st_isdir(): + # 如果是目录,递归调用函数进行拷贝 + os.makedirs(local_file_path, exist_ok=True) + copy_remote_directory(remote_host, remote_file_path, local_file_path) + else: + # 如果是文件,直接拷贝到指定目录 + sftp.get(remote_file_path, local_file_path) + + # 关闭 SFTP 客户端和 SSH 连接 + sftp.close() + ssh.close() + +#移除redis中的高精模和 需要photo3 参与的 task +# def remove_redis_high_model_or_photo3(pid): +# redis_conn = config.redis_local_common +# redis_conn.srem("calculateHighModel",pid) +# redis_conn.srem("photo3",pid) + + +# if __name__ == '__main__': +# redis_conn = config.redis_local_common +# print(redis_conn.sismember("photo3_no","1")) \ No newline at end of file diff --git a/libs/config.py b/libs/config.py index 999e172..ccb9dc2 100644 --- a/libs/config.py +++ b/libs/config.py @@ -127,4 +127,7 @@ task_run_timeout = { "step2":60*12, "step3":60*12, }, -} \ No newline at end of file +} + +high_host = ["R11","R12"] +low_host = ["R13","R14","R15","R16","R17"] diff --git a/libs/main_service_db.py b/libs/main_service_db.py new file mode 100644 index 0000000..107c2a2 --- /dev/null +++ b/libs/main_service_db.py @@ -0,0 +1,188 @@ +# mysql数据库常用任务函数封装 +import pymysql, socket, time +import config +import logging +logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR) +#公共连接库 +def pymysqlAlias(): + return pymysql.connect( + host=config.mysql_local['host'], + port=config.mysql_local['port'], + user=config.mysql_local['user'], + password=config.mysql_local['password'], + db=config.mysql_local['db'], + charset=config.mysql_local['charset'],) + +#查询 task_distributed +def db_task_distributed(where): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + sql = 'select * from task_distributed where 1=1' + if where: + sql += f' and {where}' + + cursor.execute(sql) + result = cursor.fetchone() + # 关闭游标和连接 + ##cursor.close() + #conn.close() + return result + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed()异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed()异常: {str(e)}") + return 'error' + +def db_task_distributed_list(where): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + sql = 'select * from task_distributed where 1=1' + if where: + sql += f' and {where}' + + cursor.execute(sql) + result = cursor.fetchall() + # 关闭游标和连接 + ##cursor.close() + #conn.close() + return result + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed_list()异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed_list()异常: {str(e)}") + return 'error' + +#查询 task_distributed_detail +def db_task_distributed_detail(where): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + sql = 'select * from task_distributed_detail where 1=1' + if where: + sql += f' and {where}' + + cursor.execute(sql) + result = cursor.fetchone() + # 关闭游标和连接 + #cursor.close() + #conn.close() + return result + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed_detail()异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed_detail()异常: {str(e)}") + return 'error' + +#查询指定条件下的数量 task_distributed_detail +def db_task_distributed_detail_count(where): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor(pymysql.cursors.DictCursor) + sql = 'select count(*) as nums from task_distributed_detail where 1=1' + if where: + sql += f' and {where}' + + cursor.execute(sql) + result = cursor.fetchone() + # 关闭游标和连接 + #cursor.close() + #conn.close() + return result["nums"] + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed_detail_count()异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行db_task_distributed_detail_count()异常: {str(e)}") + return 'error' + + +# 在task_distributed_detail插入明细步骤 +def add_task_distributed_detail(data): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor() + sql = f'insert into task_distributed_detail (task_distributed_id,step,hostname,started_at) values ("{data["task_distributed_id"]}", "{data["step"]}","{data["hostname"]}","{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}")' + cursor.execute(sql) + conn.commit() + return "ok" + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed_detail({data})异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed_detail({data})异常: {str(e)}") + return "error" + +# 更新 task_distributed 主表 +def update_task_distributed(data): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor() + + sql = f'update task_distributed set ' + #判断要更新哪些字段 + if "status" in data: + sql += f'status = "{data["status"]}",' + + if "hostname" in data: + sql += f'hostname = "{data["hostname"]}",' + + if "step_last" in data: + sql += f'step_last = "{data["step_last"]}",' + + if "priority" in data: + sql += f'priority = "{data["priority"]}",' + + if "started_at" in data: + sql += f'started_at = "{data["started_at"]}",' + + if "finished_at" in data: + sql += f'finished_at = "{data["finished_at"]}",' + + + #去掉 sql 最右边的逗号 + sql = sql.rstrip(',') + + + sql += f' where 1=1 ' + #条件要放在最后面 + if "id" in data: + sql += f' and id = "{data["id"]}"' + + if "task_key" in data: + sql += f' and task_type = "{data["task_key"]}" and status != 2' + + #sql = f'update task_distributed set status = "{data["status"]}",updated_at = "{now()}" where id = "{data["id"]}"' + # print(f'sql: {sql}') + cursor.execute(sql) + conn.commit() + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed({data})异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed({data})异常: {str(e)}") + return "error" + +# 更新 task_distributed_detail 主表 +def update_task_distributed_detail(data): + try: + with pymysqlAlias() as conn: + cursor = conn.cursor() + + sql = f'update task_distributed_detail set ' + #判断要更新哪些字段 + if "finished_at" in data: + sql += f'finished_at = "{data["finished_at"]}"' + + if "step" in data: + sql += f',step = "{data["step"]}"' + + if "hostname" in data: + sql += f',hostname = "{data["hostname"]}"' + + #where 条件 + sql += f' where 1=1 ' + if "task_distributed_id" in data: + sql += f' and task_distributed_id = "{data["task_distributed_id"]}"' + + if "step" in data: + sql += f' and step = "{data["step"]}"' + + cursor.execute(sql) + conn.commit() + except Exception as e: + print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed_detail({data})异常: {str(e)}") + logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed_detail({data})异常: {str(e)}") + return "error" \ No newline at end of file diff --git a/logic/logic_main_service.py b/logic/logic_main_service.py index dd853cd..e28c395 100644 --- a/logic/logic_main_service.py +++ b/logic/logic_main_service.py @@ -5,113 +5,81 @@ if platform.system() == 'Windows': #线上正式运行 sys.path.append('e:\\libs\\') #本地测试 - #sys.path.append('libs') else: sys.path.append('/data/deploy/make3d/make2/libs/') -import config -# import multiprocessing +import config,common,main_service_db import logging -# 创建互斥锁 -# lock = multiprocessing.Lock() hostname = socket.gethostname() - logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR) -#公共连接库 -def pymysqlAlias(): - return pymysql.connect( - host=config.mysql_local['host'], - port=config.mysql_local['port'], - user=config.mysql_local['user'], - password=config.mysql_local['password'], - db=config.mysql_local['db'], - charset=config.mysql_local['charset'],) #获取任务 {"hostname":"XXX","run_step":"xxxx","task_distributed_id":"xxxx"} def get_task_distributed(): - try: - with pymysqlAlias() as conn: - cursor = conn.cursor(pymysql.cursors.DictCursor) - sql = '' - # 获取数据前先获取锁 - # lock.acquire() - #非R11 R12的主机 没有办法处理step1,所以不能做初始化的任务,只能做后续的任务 - if hostname != "R11" and hostname != "R12" and hostname != "XJB-20220906FLC": - sql = f'select * from task_distributed where status = 1 order by priority desc,created_at asc limit 1 for update' - cursor.execute(sql) - result = cursor.fetchone() - #print("查询主任务表",result) - if result: - #获取需要执行的步骤 - next_step = need_run_stepx(result["id"]) - if next_step == "no" or next_step == "error": - print("获取需要执行的步骤 next_step",next_step) - return next_step - taskData = {"hostname":hostname,"run_step":next_step,"task_distributed_id":result["id"],"task_key":result["task_key"]} - flagRes = update_main_and_add_detail(taskData) - if flagRes == "error": - print(f'出现错误,有可能是多个进程获取同一个任务了') - return "error" - print(f'任务ID-{taskData["task_key"]}- "执行{taskData["run_step"]}" ') - return taskData - else: - return 'no_data' - else: - - #R11 R12的主机 可以执行step1 2 3 的任务 - #如果R11 R12的主机目前没有正在执行step2,则优先处理step2, - # print("次数",is_run_stepx_nums("step2")) - if is_run_stepx_nums("step2") < 2: - resultData = need_run_step2() - if resultData != "no": - resultData["hostname"] = hostname - flagRes = update_main_and_add_detail(resultData) - if flagRes == "error": - print(f'出现错误,有可能是多个进程获取同一个任务了,重新获取任务去执行了') - return "error" - print(f'任务ID-{resultData["task_key"]}- "执行step2" ') - return resultData - - #R11 R12的主机如果已经有在处理step2了,则不能再处理step2,只能处理step1 step3 - resultData = need_run_step_no_step2() - #print("resultData",resultData) - if resultData == "no": - return "no" - - resultData["hostname"] = hostname - flagRes = update_main_and_add_detail(resultData) + #非R11 R12的主机 没有办法处理step1,所以不能做初始化的任务,只能做后续的任务 + if hostname != "R11" and hostname != "R12" and hostname != "XJB-20220906FLC": + #查询数据库指定内容 + result = main_service_db.db_task_distributed("status = 1 order by priority desc,created_at asc limit 1 for update") + if result: + #获取需要执行的步骤 + next_step = need_run_stepx(result["id"]) + if next_step == "no" or next_step == "error": + print("获取需要执行的步骤 next_step",next_step) + return next_step + #非R11 R12 的主机在执行step2的时候,需要判断当前模型是否需要高精模或者photo3参与建模,如果是的话,该主机不执行这一步 + if next_step == "step2": + if common.task_need_high_model_or_photo3(result["task_key"]): + print(f'模型{result["task_key"]}需要高精模或者photo3参与建模,该主机{hostname}不执行step2') + return "no" + taskData = {"hostname":hostname,"run_step":next_step,"task_distributed_id":result["id"],"task_key":result["task_key"]} + flagRes = update_main_and_add_detail(taskData) if flagRes == "error": print(f'出现错误,有可能是多个进程获取同一个任务了') return "error" - print(f'任务ID-{resultData["task_key"]}- "执行{resultData["run_step"]}" ') - return resultData - - #sql = f'select * from task_distributed where status != 2 order by priority desc,created_at asc limit 1' - # print(f'sql: {sql}') + print(f'任务ID-{taskData["task_key"]}- "执行{taskData["run_step"]}"') + return taskData + else: + return 'no_data' + else: + #R11 R12的主机 可以执行step1 2 3 的任务 + #如果R11 R12的主机目前没有正在执行step2,则优先处理step2, + # print("次数",is_run_stepx_nums("step2")) + if is_run_stepx_nums("step2") < 2: + resultData = need_run_step2() + if resultData != "no": + resultData["hostname"] = hostname + flagRes = update_main_and_add_detail(resultData) + if flagRes == "error": + print(f'出现错误,有可能是多个进程获取同一个任务了,重新获取任务去执行了') + return "error" + print(f'任务ID-{resultData["task_key"]}- "执行step2" ') + return resultData + #R11 R12的主机如果已经有在处理step2了,则不能再处理step2,只能处理step1 step3 + resultData = need_run_step_no_step2() + if resultData == "no": + return "no" + resultData["hostname"] = hostname + flagRes = update_main_and_add_detail(resultData) + if flagRes == "error": + print(f'出现错误,有可能是多个进程获取同一个任务了') + return "error" + print(f'任务ID-{resultData["task_key"]}- "执行{resultData["run_step"]}" ') + return resultData except Exception as e: print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed({hostname})异常: {str(e)}") logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行get_task_distributed({hostname})异常: {str(e)}") return 'error' - - # finally: - # # 释放锁 - # lock.release() #查询当前主机有没有正在执行某个任务 def is_run_stepx(step): try: - with pymysqlAlias() as conn: - cursor = conn.cursor(pymysql.cursors.DictCursor) - sql = f'select * from task_distributed_detail where hostname = "{hostname}" and step = "{step}" and finished_at is null for update' - # print(f'sql: {sql}') - cursor.execute(sql) - result = cursor.fetchone() - if result: - return "yes" - else: - return "no" + where = f'hostname = "{hostname}" and step = "{step}" and finished_at is null for update' + result = main_service_db.db_task_distributed(where) + if result: + return "yes" + else: + return "no" except Exception as e: print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx({hostname},{step})异常: {str(e)}") logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx({hostname},{step})异常: {str(e)}") @@ -121,13 +89,8 @@ def is_run_stepx(step): #查询当前主机某个步骤正在执行的数量 def is_run_stepx_nums(step): try: - with pymysqlAlias() as conn: - cursor = conn.cursor(pymysql.cursors.DictCursor) - sql = f'select count(*) as nums from task_distributed_detail where hostname = "{hostname}" and step = "{step}" and finished_at is null for update' - # print(f'sql: {sql}') - cursor.execute(sql) - result = cursor.fetchone() - return result["nums"] + where = f'hostname = "{hostname}" and step = "{step}" and finished_at is null for update' + return main_service_db.db_task_distributed_detail_count(where) except Exception as e: print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx_nums({hostname},{step})异常: {str(e)}") logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行is_run_stepx_nums({hostname},{step})异常: {str(e)}") @@ -137,30 +100,27 @@ def is_run_stepx_nums(step): #查询指定任务需要执行哪个步骤 "error" 表示异常 "no"表示没有任务需要执行 "step1"表示需要执行step1 "step2"表示需要执行step2 "step3"表示需要执行step3 def need_run_stepx(task_distributed_id): try: - with pymysqlAlias() as conn: - cursor = conn.cursor(pymysql.cursors.DictCursor) - #查询task_distributed_id 对应的子任务是否正在执行, - sql = f'select * from task_distributed_detail where hostname = "{hostname}" and task_distributed_id = "{task_distributed_id}" order by id desc limit 1 for update' - # print(f'sql: {sql}') - cursor.execute(sql) - result = cursor.fetchone() - #如果一个子任务都没有,说明该任务还没有开始执行,需要执行step1 - if result is None: - return "step1" - if result and result["finished_at"] is None: - #该任务正在运行中不需要执行下一步 - return "no" - #查询改任务的最后一个状态 - if result and result["finished_at"]: - if result["step"] == "step1": - return "step2" - elif result["step"] == "step2": - return "step3" - elif result["step"] == "step3": - #这里要将 主任务表的状态改为2,finished_at改为当前时间 - update_task_distributed({"id":task_distributed_id,"status":2,"finished_at":time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),"step_last":"step3"}) - return "no" + + #查询task_distributed_id 对应的子任务是否正在执行, + where = f'hostname = "{hostname}" and task_distributed_id = "{task_distributed_id}" order by id desc limit 1 for update' + result = main_service_db.db_task_distributed_detail(where) + #如果一个子任务都没有,说明该任务还没有开始执行,需要执行step1 + if result is None: + return "step1" + if result and result["finished_at"] is None: + #该任务正在运行中不需要执行下一步 return "no" + #查询改任务的最后一个状态 + if result and result["finished_at"]: + if result["step"] == "step1": + return "step2" + elif result["step"] == "step2": + return "step3" + elif result["step"] == "step3": + #这里要将 主任务表的状态改为2,finished_at改为当前时间 + update_task_distributed({"id":task_distributed_id,"status":2,"finished_at":time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),"step_last":"step3"}) + return "no" + return "no" except Exception as e: print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_stepx({hostname},{task_distributed_id})异常: {str(e)}") logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_stepx({hostname},{task_distributed_id})异常: {str(e)}") @@ -169,25 +129,19 @@ def need_run_stepx(task_distributed_id): #查询出哪些任务需要执行step2 def need_run_step2(): try: - with pymysqlAlias() as conn: - cursor = conn.cursor(pymysql.cursors.DictCursor) - sql = f'select * from task_distributed where status = 1 and step_last = "step1" and finished_at is null order by priority desc,created_at asc for update' - # print(f'sql: {sql}') - cursor.execute(sql) - result = cursor.fetchall() - - #判断result的长度 - #判断是否有值 - if len(result) == 0: - return "no" - #遍历循环哪笔需要执行step2 - - for row in result: - #判断是否有正在执行的step2 - if need_run_stepx(row["id"]) == "step2": - #没有正在执行的step2,则返回该任务 - return {"hostname":hostname,"run_step":"step2","task_distributed_id":row["id"],"task_key":row["task_key"]} + where = f'status = 1 and step_last = "step1" and finished_at is null order by priority desc,created_at asc for update' + result = main_service_db.db_task_distributed_list(where) + #判断result的长度 + #判断是否有值 + if len(result) == 0: return "no" + #遍历循环哪笔需要执行step2 + for row in result: + #判断是否有正在执行的step2 + if need_run_stepx(row["id"]) == "step2": + #没有正在执行的step2,则返回该任务 + return {"hostname":hostname,"run_step":"step2","task_distributed_id":row["id"],"task_key":row["task_key"]} + return "no" except Exception as e: print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step2()异常: {str(e)}") logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step2()异常: {str(e)}") @@ -195,23 +149,19 @@ def need_run_step2(): #查询出哪些任务需要执行指定的step def need_run_appoint_step(step): - try: - with pymysqlAlias() as conn: - cursor = conn.cursor(pymysql.cursors.DictCursor) - sql = f'select * from task_distributed where finished_at is null order by priority desc,created_at asc for update' - # print(f'sql: {sql}') - cursor.execute(sql) - result = cursor.fetchall() - #判断是否有值 - if len(result) == 0: - return "no" - #遍历循环哪笔需要执行step2 - for row in result: - #判断是否有正在执行的step2 - if need_run_stepx(row["id"]) == step: - #没有正在执行的step2,则返回该任务 - return {"hostname":hostname,"run_step":step,"task_distributed_id":row["id"],"task_key":row["task_key"]} + try: + where = f'finished_at is null order by priority desc,created_at asc for update' + result = main_service_db.db_task_distributed_list(where) + #判断是否有值 + if len(result) == 0: return "no" + #遍历循环哪笔需要执行step2 + for row in result: + #判断是否有正在执行的step2 + if need_run_stepx(row["id"]) == step: + #没有正在执行的step2,则返回该任务 + return {"hostname":hostname,"run_step":step,"task_distributed_id":row["id"],"task_key":row["task_key"]} + return "no" except Exception as e: print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_appoint_step()异常: {str(e)}") logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_appoint_step()异常: {str(e)}") @@ -220,145 +170,23 @@ def need_run_appoint_step(step): #查询出哪些任务需要执行非step2 def need_run_step_no_step2(): try: - with pymysqlAlias() as conn: - cursor = conn.cursor(pymysql.cursors.DictCursor) - sql = f'select * from task_distributed where finished_at is null order by priority desc,created_at asc for update' - # print(f'sql: {sql}') - cursor.execute(sql) - result = cursor.fetchall() - #判断是否有值 - if len(result) == 0: - return "no" - #遍历循环哪笔需要执行step2 - - for row in result: - #判断是否有正在执行的step2 - xstep = need_run_stepx(row["id"]) - # print("查询非step2的任务列表",xstep) - if xstep != "step2" and (xstep == "step1" or xstep == "step3"): - #没有正在执行的step2,则返回该任务 - return {"hostname":hostname,"run_step":xstep,"task_distributed_id":row["id"],"task_key":row["task_key"]} + result = main_service_db.db_task_distributed_list("finished_at is null order by priority desc,created_at asc for update") + #判断是否有值 + if len(result) == 0: return "no" + #遍历循环哪笔需要执行step2 + for row in result: + #判断是否有正在执行的step2 + xstep = need_run_stepx(row["id"]) + # print("查询非step2的任务列表",xstep) + if xstep != "step2" and (xstep == "step1" or xstep == "step3"): + #没有正在执行的step2,则返回该任务 + return {"hostname":hostname,"run_step":xstep,"task_distributed_id":row["id"],"task_key":row["task_key"]} + return "no" except Exception as e: print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step_no_step2()异常: {str(e)}") logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行need_run_step_no_step2()异常: {str(e)}") return 'error' - - -# 在任务分布执行主表插入任务,所有的任务执行都从这里获取 -def add_task_distributed(data): - try: - with pymysqlAlias() as conn: - cursor = conn.cursor() - - sql = f'insert into task_distributed (task_type, task_key,created_at) values ("{data["task_type"]}", "{data["task_key"]}","{now()}")' - # print(f'sql: {sql}') - cursor.execute(sql) - - # 获取插入数据的自增ID - last_insert_id = cursor.lastrowid - conn.commit() - return last_insert_id - except Exception as e: - print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_main_task({data})异常: {str(e)}") - logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_main_task({data})异常: {str(e)}") - return "error" - -# 在task_distributed_detail插入明细步骤 -def add_task_distributed_detail(data): - try: - with pymysqlAlias() as conn: - cursor = conn.cursor() - - sql = f'insert into task_distributed_detail (task_distributed_id,step,hostname,started_at) values ("{data["task_distributed_id"]}", "{data["step"]}","{hostname}","{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}")' - # print(f'sql: {sql}') - cursor.execute(sql) - conn.commit() - return "ok" - except Exception as e: - print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed_detail({data})异常: {str(e)}") - logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行add_task_distributed_detail({data})异常: {str(e)}") - return "error" - -# 更新 task_distributed 主表 -def update_task_distributed(data): - try: - with pymysqlAlias() as conn: - cursor = conn.cursor() - - sql = f'update task_distributed set ' - #判断要更新哪些字段 - if "status" in data: - sql += f'status = "{data["status"]}",' - - if "hostname" in data: - sql += f'hostname = "{data["hostname"]}",' - - if "step_last" in data: - sql += f'step_last = "{data["step_last"]}",' - - if "priority" in data: - sql += f'priority = "{data["priority"]}",' - - if "started_at" in data: - sql += f'started_at = "{data["started_at"]}",' - - if "finished_at" in data: - sql += f'finished_at = "{data["finished_at"]}",' - - - #去掉 sql 最右边的逗号 - sql = sql.rstrip(',') - - - sql += f' where 1=1 ' - #条件要放在最后面 - if "id" in data: - sql += f' and id = "{data["id"]}"' - - if "task_key" in data: - sql += f' and task_type = "{data["task_key"]}" and status != 2' - - #sql = f'update task_distributed set status = "{data["status"]}",updated_at = "{now()}" where id = "{data["id"]}"' - # print(f'sql: {sql}') - cursor.execute(sql) - conn.commit() - except Exception as e: - print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed({data})异常: {str(e)}") - logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed({data})异常: {str(e)}") - return "error" - -# 更新 task_distributed_detail 主表 -def update_task_distributed_detail(data): - try: - with pymysqlAlias() as conn: - cursor = conn.cursor() - - sql = f'update task_distributed_detail set ' - #判断要更新哪些字段 - if "finished_at" in data: - sql += f'finished_at = "{data["finished_at"]}"' - - if "step" in data: - sql += f',step = "{data["step"]}"' - - if "hostname" in data: - sql += f',hostname = "{data["hostname"]}"' - - #where 条件 - sql += f' where 1=1 ' - if "task_distributed_id" in data: - sql += f' and task_distributed_id = "{data["task_distributed_id"]}"' - - if "step" in data: - sql += f' and step = "{data["step"]}"' - - cursor.execute(sql) - conn.commit() - except Exception as e: - print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed_detail({data})异常: {str(e)}") - logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} 执行update_task_distributed_detail({data})异常: {str(e)}") - return "error" #更新主表和插入明细表的步骤 def update_main_and_add_detail(data): @@ -367,6 +195,6 @@ def update_main_and_add_detail(data): else: updateData = {"id":data['task_distributed_id'],"step_last":data["run_step"]} #更新主表 - update_task_distributed(updateData) + main_service_db.update_task_distributed(updateData) #插入明细表数据 - return add_task_distributed_detail({"task_distributed_id":data['task_distributed_id'],"step":data["run_step"],"hostname":hostname,"started_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) \ No newline at end of file + return main_service_db.add_task_distributed_detail({"task_distributed_id":data['task_distributed_id'],"step":data["run_step"],"hostname":hostname,"started_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) \ No newline at end of file diff --git a/main_service.py b/main_service.py index 84dfb09..f3e86b3 100644 --- a/main_service.py +++ b/main_service.py @@ -1,7 +1,14 @@ -import sys,socket,time +import sys,socket,time,platform from logic import logic_main_service import logging import main_step1,main_step2,main_step3 +if platform.system() == 'Windows': + #线上正式运行 + sys.path.append('e:\\libs\\') + #本地测试 +else: + sys.path.append('/data/deploy/make3d/make2/libs/') +import main_service_db if __name__ == '__main__': #循环值守 while True: @@ -14,31 +21,31 @@ if __name__ == '__main__': else: if data["run_step"] == "step1": # 本地测试分布运行的用 - # time.sleep(5) - # print("更新step1的结束时间",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) - # logic_main_service.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step1","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + time.sleep(5) + print("更新step1的结束时间",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + main_service_db.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step1","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) #生产线上用 - main_step1.step1(data["task_key"], experience=False, makeloop=False,task_distributed_id=data['task_distributed_id']) + #main_step1.step1(data["task_key"], experience=False, makeloop=False,task_distributed_id=data['task_distributed_id']) elif data["run_step"] == "step2": # 本地测试分布运行的用 - # time.sleep(15) - # print("更新step2的结束时间",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) - # logic_main_service.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step2","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + time.sleep(15) + print("更新step2的结束时间",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + main_service_db.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step2","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) #生产线上用 - main_step2.step2(data["task_key"], data['task_distributed_id']) + #main_step2.step2(data["task_key"], data['task_distributed_id']) elif data["run_step"] == "step3": # 本地测试分布运行的用 - # time.sleep(8) + time.sleep(8) # #更新子表的finished_at - # print("更新step3的结束时间",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) - # logic_main_service.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step3","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + print("更新step3的结束时间",time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())) + main_service_db.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step3","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) #更新主表的status 和 finished_at - #logic_main_service.update_task_distributed({"id":data["task_distributed_id"],"status":2,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + main_service_db.update_task_distributed({"id":data["task_distributed_id"],"status":2,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) #生产线上用 - main_step3.step3(data["task_key"], data['task_distributed_id']) + #main_step3.step3(data["task_key"], data['task_distributed_id']) diff --git a/main_step1.py b/main_step1.py index 0fb8682..8618bff 100644 --- a/main_step1.py +++ b/main_step1.py @@ -3,10 +3,10 @@ from PIL import Image import platform if platform.system() == 'Windows': sys.path.append('e:\\libs\\') + #sys.path.append('libs') else: sys.path.append('/data/deploy/make3d/make2/libs/') -from logic import logic_main_service -import config, libs, libs_db +import config, libs, libs_db,main_service_db def filter_dark_texture_image(pid): start_time = time.time() @@ -180,7 +180,7 @@ def step1(pid, experience=False, makeloop=True,task_distributed_id=""): # print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} {pid} step1任务完成,移动到共享目录') else: #分布式服务执行完后,需要更新任务状态,更新字表的finished_at字段 - logic_main_service.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + main_service_db.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) return diff --git a/main_step2.py b/main_step2.py index fb7f54c..897fe04 100644 --- a/main_step2.py +++ b/main_step2.py @@ -2,10 +2,10 @@ import os, sys, time, shutil, subprocess, shlex import platform if platform.system() == 'Windows': sys.path.append('e:\\libs\\') + #sys.path.append('libs') else: sys.path.append('/data/deploy/make3d/make2/libs/') -from logic import logic_main_service -import config, libs, libs_db +import config, libs, libs_db,common,main_service_db def load_model(pid): cmd = f'{config.rcbin} {config.r1["init"]} -load "{os.path.join(config.workdir, pid, f"{pid}.rcproj")}"' @@ -23,7 +23,7 @@ def get_rcver(): def make3d(pid): simplify_value = 1000000 * libs.getHeadCount(pid) add_photo3 = ' ' - if os.path.exists(os.path.join(config.workdir, pid, 'photo3')): + if common.task_need_photo3(pid): add_photo3 = ' -addFolder "' + os.path.join(config.workdir, pid, 'photo3') + '" -align -align ' if get_rcver() == 1: # old version @@ -44,13 +44,14 @@ def make3d(pid): cmd = f'{config.rcbin} {config.r1["init"]} -load "{os.path.join(config.workdir, pid, f"{pid}.rcproj")}" -update \ -setReconstructionRegion "{os.path.join(config.workdir, pid, f"{pid}.rcbox")}" \ - -mvs -modelSelectMaximalConnectedComponent -modelInvertSelection -modelRemoveSelectedTriangles -closeHoles -clean -simplify {simplify_value} -smooth -unwrap -calculateTexture -renameModel {pid} -exportModel "{pid} {os.path.join(config.workdir, pid, "output", f"{pid}.obj")}" "d:\\make2\\config\\ModelExportParams102.xml" -quit' + -mvs -modelSelectMaximalConnectedComponent -modelInvertSelection -modelRemoveSelectedTriangles -closeHoles -clean -simplify {simplify_value} -smooth -unwrap -calculateTexture -renameModel {pid} -exportModel "{pid}" "{os.path.join(config.workdir, pid, "output", f"{pid}.obj")}" "d:\\make2\\config\\ModelExportParams102.xml" -quit' print(cmd) cmd = shlex.split(cmd) res = subprocess.run(cmd) else: # new version - if libs.aliyun_face(pid) and libs.get_ps_type(pid) == 1: - calulate_type = 'calculateHighModel' + #判断是否要进行高精模 + if common.task_need_high_model(pid): + calulate_type = 'calculateHighModel' else: calulate_type = 'calculateNormalModel' cmd = f'{config.rcbin} {config.r2["init"]} -setInstanceName {pid} \ @@ -65,11 +66,18 @@ def make3d(pid): def step2(pid,task_distributed_id=""): print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} 开始建模任务step2') + + #判断是否要从共享目录拷贝数据 if os.path.exists(os.path.join(config.sharedir, pid)) and not os.path.exists(os.path.join(config.workdir, pid)): shutil.move(os.path.join(config.sharedir, pid), config.workdir) else: print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} 目录{os.path.join(config.sharedir, pid)}不存在,或{os.path.join(config.workdir, pid)}已存在') - # return + # return + #最后还是要判断有没有存在目录 + if not os.path.exists(os.path.join(config.workdir, pid)): + print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} 目录{os.path.join(config.workdir, pid)}不存在') + return + start_time = time.time() make3d(pid) @@ -78,7 +86,7 @@ def step2(pid,task_distributed_id=""): if task_distributed_id == "": os.system(f'python d:\\make2\\main_step3.py {pid}') else: - logic_main_service.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + main_service_db.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) return def main(pid): diff --git a/main_step3.py b/main_step3.py index 75ab35d..033fe0a 100644 --- a/main_step3.py +++ b/main_step3.py @@ -6,8 +6,7 @@ if platform.system() == 'Windows': #sys.path.append('libs') else: sys.path.append('/data/deploy/make3d/make2/libs/') -from logic import logic_main_service -import config, libs, libs_db +import config, libs, libs_db,main_service_db def bmesh_copy_from_object(obj, transform=True, triangulate=True, apply_modifiers=False): """Returns a transformed, triangulated copy of the mesh""" @@ -211,9 +210,9 @@ def step3(pid,task_distributed_id=""): libs_db.finish_task({"task_type": "make", "task_key": pid}) if task_distributed_id: #更新子表的finished_at - logic_main_service.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + main_service_db.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) #更新主表的status 和 finished_at - logic_main_service.update_task_distributed({"id":task_distributed_id,"status":2,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) + main_service_db.update_task_distributed({"id":task_distributed_id,"status":2,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) return def main(pid): diff --git a/test.py b/test.py new file mode 100644 index 0000000..8f612d1 --- /dev/null +++ b/test.py @@ -0,0 +1,18 @@ +import os, sys, time, shutil, subprocess, shlex, json +import platform +if platform.system() == 'Windows': + #sys.path.append('e:\\libs\\') + sys.path.append('libs') +else: + sys.path.append('/data/deploy/make3d/make2/libs/') + +import config, libs, libs_db,common + + +if __name__ == '__main__': + + common.copy_remote_directory("172.31.1.11","D:\\7831","E:\\") + # config.oss_bucket.delete_object(f'test/test_delete') + # #删除oss 上的文件夹里的内容 + # object_list = oss2.ObjectIterator(config.oss_bucket, prefix='test/test_delete/') + # result = config.oss_bucket.batch_delete_objects([obj.key for obj in object_list]) diff --git a/tools/auto_distance.py b/tools/auto_distance.py index 2a0f25c..219a9a5 100644 --- a/tools/auto_distance.py +++ b/tools/auto_distance.py @@ -22,29 +22,82 @@ def find_and_maximize_window(window_title): return pid, left, top, right, bottom return '0', 0, 0, 0, 0 +# def get_defineDistances(pid, left, top, right, bottom): +# psid = libs.getPSid(pid) +# distances = config.ps_floor_sticker.get(psid, config.ps_floor_sticker['default']) +# for index, d in enumerate(distances.split(';')): +# p1, p2, distance = d.split(' ') +# if index == 0: +# ag.moveTo(left + 80, top + 290) +# else: +# ag.moveTo(left + 80, top + 290 + 15) # Create distance line height 15 +# ag.click() + +# ag.moveTo(left + 302, (bottom - 100)) # A point +# ag.click();repeat_backspace(20) +# ag.typewrite(p1) + +# ag.moveTo(left + 302, (bottom - 80)) # B point +# ag.click();repeat_backspace(20) +# ag.typewrite(p2) + +# ag.moveTo(left + 302, (bottom - 35)) # Definded distance +# ag.click();repeat_backspace(8) +# ag.typewrite(distance) +# ag.press('enter') + def get_defineDistances(pid, left, top, right, bottom): psid = libs.getPSid(pid) distances = config.ps_floor_sticker.get(psid, config.ps_floor_sticker['default']) + time.sleep(5) + y = 748 for index, d in enumerate(distances.split(';')): p1, p2, distance = d.split(' ') if index == 0: - ag.moveTo(left + 80, top + 290) + #移动到 control points + ag.moveTo(14, 191) else: - ag.moveTo(left + 80, top + 290 + 15) # Create distance line height 15 + #不是第一次就移动到 create distance + ag.moveTo(65, 300+30) # Create distance line height 15 + time.sleep(0.5) ag.click() + time.sleep(1.2) + if index == 0: + ag.moveTo(65, 285+30) + time.sleep(1) + ag.click() - ag.moveTo(left + 302, (bottom - 100)) # A point - ag.click();repeat_backspace(20) + #点击左下的A点 + time.sleep(0.5) + ag.moveTo(233, y) # A point + time.sleep(0.5) + ag.click() + time.sleep(0.5) + repeat_backspace(20) + time.sleep(1) ag.typewrite(p1) - - ag.moveTo(left + 302, (bottom - 80)) # B point - ag.click();repeat_backspace(20) + time.sleep(1) + #点击左下的B点 + ag.moveTo(233, y+15) # B point + time.sleep(0.5) + ag.click(); + time.sleep(0.5) + repeat_backspace(20) + time.sleep(1) ag.typewrite(p2) - - ag.moveTo(left + 302, (bottom - 35)) # Definded distance - ag.click();repeat_backspace(8) + time.sleep(1) + #点击左下的Definded distance + ag.moveTo(233,y+60) # Definded distance + time.sleep(0.5) + ag.click(); + time.sleep(0.5) + repeat_backspace(15) + time.sleep(1.5) ag.typewrite(distance) + time.sleep(1.5) ag.press('enter') + print(f'执行距离划定{index}') + def repeat_backspace(times): @@ -58,8 +111,8 @@ def defind_distance(pid, left, top, right, bottom): print(f'left: {left}, top: {top}, right: {right}, bottom: {bottom}') # ag.PAUSE = 1 - ag.moveTo(left + 20, top + 200) # open Control points - ag.click() + # ag.moveTo(left + 20, top + 200) # open Control points + # ag.click() get_defineDistances(pid, left, top, right, bottom)