
Extract the database access code into a separate module; add studio ID

Branch: master | dongchangxi committed 2 years ago | commit abca8a3cfc
Changed files (lines changed):
  1. libs/common.py (96)
  2. libs/config.py (3)
  3. libs/main_service_db.py (188)
  4. logic/logic_main_service.py (216)
  5. main_service.py (35)
  6. main_step1.py (6)
  7. main_step2.py (20)
  8. main_step3.py (7)
  9. test.py (18)
  10. tools/auto_distance.py (77)

libs/common.py (new file, 96 lines)

@@ -0,0 +1,96 @@
import redis, sys, os
import stat
import paramiko
import platform
if platform.system() == 'Windows':
    #sys.path.append('e:\\libs\\')
    sys.path.append('libs')
else:
    sys.path.append('/data/deploy/make3d/make2/libs/')
import config, libs

# Does this model need the high-precision mesh, or does the photo3 set take part?
def task_need_high_model_or_photo3(pid):
    resHigh = task_need_high_model(pid)
    resPhoto3 = task_need_photo3(pid)
    if resHigh or resPhoto3:
        return True
    else:
        return False

# Does this task need the high-precision model?
def task_need_high_model(pid):
    redis_conn = config.redis_local_common
    # The decision is cached in Redis as two sets
    if redis_conn.sismember("calculateHighModel", pid):
        return True
    if redis_conn.sismember("calculateHighModel_no", pid):
        return False
    # Not cached yet: only studio type 1 with an Aliyun face match needs the high-precision model
    if libs.get_ps_type(pid) == 1 and libs.aliyun_face(pid):
        redis_conn.sadd("calculateHighModel", pid)
        return True
    redis_conn.sadd("calculateHighModel_no", pid)
    return False

# Does the photo3 set take part in reconstruction?
def task_need_photo3(pid):
    redis_conn = config.redis_local_common
    # The decision is cached in Redis as two sets
    if redis_conn.sismember("photo3", pid):
        return True
    if redis_conn.sismember("photo3_no", pid):
        return False
    if os.path.exists(os.path.join(config.workdir, pid, 'photo3')):
        redis_conn.sadd("photo3", pid)
        return True
    else:
        redis_conn.sadd("photo3_no", pid)
        return False

# Copy a directory on a remote host to a local directory
def copy_remote_directory(remote_host, remote_path, local_path):
    # Open the SSH connection
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(remote_host, username='your_username', password='your_password')
    # Open an SFTP session
    sftp = ssh.open_sftp()
    # List everything in the remote directory
    file_list = sftp.listdir(remote_path)
    # Walk each file/subdirectory in the remote directory
    for file_name in file_list:
        remote_file_path = os.path.join(remote_path, file_name)
        local_file_path = os.path.join(local_path, file_name)
        # Directory or plain file?
        if stat.S_ISDIR(sftp.stat(remote_file_path).st_mode):
            # Directory: create it locally and recurse
            os.makedirs(local_file_path, exist_ok=True)
            copy_remote_directory(remote_host, remote_file_path, local_file_path)
        else:
            # File: download it straight into the target directory
            sftp.get(remote_file_path, local_file_path)
    # Close the SFTP session and the SSH connection
    sftp.close()
    ssh.close()

# Drop the cached high-model / photo3 decisions for a task
# def remove_redis_high_model_or_photo3(pid):
#     redis_conn = config.redis_local_common
#     redis_conn.srem("calculateHighModel", pid)
#     redis_conn.srem("photo3", pid)

# if __name__ == '__main__':
#     redis_conn = config.redis_local_common
#     print(redis_conn.sismember("photo3_no", "1"))
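
For context, a minimal usage sketch of the new helpers in libs/common.py (assuming config.redis_local_common is a connected Redis client; the pid below is a hypothetical task key, not one taken from this commit):

    import common

    pid = "7831"  # hypothetical task key
    # A worker outside R11/R12 deciding whether it may pick up step2 for this model:
    if common.task_need_high_model_or_photo3(pid):
        print(f"{pid} needs the high-precision mesh or the photo3 set; leave step2 to R11/R12")
    else:
        print(f"{pid} can be reconstructed on this host")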

libs/config.py (3 lines changed)

@@ -128,3 +128,6 @@ task_run_timeout = {
        "step3": 60*12,
    },
}
+high_host = ["R11","R12"]
+low_host = ["R13","R14","R15","R16","R17"]
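
The new high_host / low_host lists are not yet read anywhere else in this commit; logic_main_service.py still hard-codes the hostnames. A small sketch of how the lists could be consulted instead (an assumption about intended use, not code from this commit):

    import socket
    import config

    hostname = socket.gethostname()
    # R11/R12-class machines may run step1/step2/step3; the rest only take follow-up steps
    is_high_host = hostname in config.high_host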

libs/main_service_db.py (new file, 188 lines)

@@ -0,0 +1,188 @@
# Wrappers around the MySQL task tables used by the main service
import pymysql, socket, time
import config
import logging
logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR)

# Shared connection helper
def pymysqlAlias():
    return pymysql.connect(
        host=config.mysql_local['host'],
        port=config.mysql_local['port'],
        user=config.mysql_local['user'],
        password=config.mysql_local['password'],
        db=config.mysql_local['db'],
        charset=config.mysql_local['charset'],)

# Fetch one row from task_distributed.
# Note: `where` is appended to the SQL verbatim, so callers must pass trusted strings only.
def db_task_distributed(where):
    try:
        with pymysqlAlias() as conn:
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            sql = 'select * from task_distributed where 1=1'
            if where:
                sql += f' and {where}'
            cursor.execute(sql)
            result = cursor.fetchone()
            # cursor and connection are released by the with-block
            return result
    except Exception as e:
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in db_task_distributed(): {str(e)}")
        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in db_task_distributed(): {str(e)}")
        return 'error'

# Fetch all matching rows from task_distributed
def db_task_distributed_list(where):
    try:
        with pymysqlAlias() as conn:
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            sql = 'select * from task_distributed where 1=1'
            if where:
                sql += f' and {where}'
            cursor.execute(sql)
            result = cursor.fetchall()
            return result
    except Exception as e:
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in db_task_distributed_list(): {str(e)}")
        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in db_task_distributed_list(): {str(e)}")
        return 'error'

# Fetch one row from task_distributed_detail
def db_task_distributed_detail(where):
    try:
        with pymysqlAlias() as conn:
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            sql = 'select * from task_distributed_detail where 1=1'
            if where:
                sql += f' and {where}'
            cursor.execute(sql)
            result = cursor.fetchone()
            return result
    except Exception as e:
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in db_task_distributed_detail(): {str(e)}")
        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in db_task_distributed_detail(): {str(e)}")
        return 'error'

# Count rows in task_distributed_detail matching the given condition
def db_task_distributed_detail_count(where):
    try:
        with pymysqlAlias() as conn:
            cursor = conn.cursor(pymysql.cursors.DictCursor)
            sql = 'select count(*) as nums from task_distributed_detail where 1=1'
            if where:
                sql += f' and {where}'
            cursor.execute(sql)
            result = cursor.fetchone()
            return result["nums"]
    except Exception as e:
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in db_task_distributed_detail_count(): {str(e)}")
        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in db_task_distributed_detail_count(): {str(e)}")
        return 'error'

# Insert a step record into task_distributed_detail
def add_task_distributed_detail(data):
    try:
        with pymysqlAlias() as conn:
            cursor = conn.cursor()
            sql = f'insert into task_distributed_detail (task_distributed_id,step,hostname,started_at) values ("{data["task_distributed_id"]}", "{data["step"]}","{data["hostname"]}","{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}")'
            cursor.execute(sql)
            conn.commit()
            return "ok"
    except Exception as e:
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in add_task_distributed_detail({data}): {str(e)}")
        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in add_task_distributed_detail({data}): {str(e)}")
        return "error"

# Update the task_distributed master table
def update_task_distributed(data):
    try:
        with pymysqlAlias() as conn:
            cursor = conn.cursor()
            sql = f'update task_distributed set '
            # Decide which columns to update
            if "status" in data:
                sql += f'status = "{data["status"]}",'
            if "hostname" in data:
                sql += f'hostname = "{data["hostname"]}",'
            if "step_last" in data:
                sql += f'step_last = "{data["step_last"]}",'
            if "priority" in data:
                sql += f'priority = "{data["priority"]}",'
            if "started_at" in data:
                sql += f'started_at = "{data["started_at"]}",'
            if "finished_at" in data:
                sql += f'finished_at = "{data["finished_at"]}",'
            # Strip the trailing comma
            sql = sql.rstrip(',')
            sql += f' where 1=1 '
            # The WHERE conditions go last
            if "id" in data:
                sql += f' and id = "{data["id"]}"'
            if "task_key" in data:
                sql += f' and task_type = "{data["task_key"]}" and status != 2'
            cursor.execute(sql)
            conn.commit()
            return "ok"
    except Exception as e:
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in update_task_distributed({data}): {str(e)}")
        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in update_task_distributed({data}): {str(e)}")
        return "error"

# Update the task_distributed_detail table
def update_task_distributed_detail(data):
    try:
        with pymysqlAlias() as conn:
            cursor = conn.cursor()
            sql = f'update task_distributed_detail set '
            # Decide which columns to update
            if "finished_at" in data:
                sql += f'finished_at = "{data["finished_at"]}"'
            if "step" in data:
                sql += f',step = "{data["step"]}"'
            if "hostname" in data:
                sql += f',hostname = "{data["hostname"]}"'
            # WHERE conditions
            sql += f' where 1=1 '
            if "task_distributed_id" in data:
                sql += f' and task_distributed_id = "{data["task_distributed_id"]}"'
            if "step" in data:
                sql += f' and step = "{data["step"]}"'
            cursor.execute(sql)
            conn.commit()
            return "ok"
    except Exception as e:
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in update_task_distributed_detail({data}): {str(e)}")
        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in update_task_distributed_detail({data}): {str(e)}")
        return "error"
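
Every function above splices its values into the SQL string with f-strings, so the where fragments and field values must come from trusted code only. For comparison, a hedged sketch of the same insert written with pymysql parameter binding (add_task_distributed_detail_safe is a hypothetical name, not part of this commit):

    import time
    import pymysql

    def add_task_distributed_detail_safe(conn, data):
        # placeholders are filled in by the driver instead of by string interpolation
        sql = ('insert into task_distributed_detail '
               '(task_distributed_id, step, hostname, started_at) values (%s, %s, %s, %s)')
        with conn.cursor() as cursor:
            cursor.execute(sql, (data["task_distributed_id"], data["step"], data["hostname"],
                                 time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())))
        conn.commit()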

logic/logic_main_service.py (216 lines changed)

@@ -5,49 +5,32 @@ if platform.system() == 'Windows':
    #production
    sys.path.append('e:\\libs\\')
    #local test
-    #sys.path.append('libs')
else:
    sys.path.append('/data/deploy/make3d/make2/libs/')
-import config
+import config,common,main_service_db
-# import multiprocessing
import logging
-# create a mutex
-# lock = multiprocessing.Lock()
hostname = socket.gethostname()
logging.basicConfig(filename='task_distributed_error.log', level=logging.ERROR)
-#shared connection helper
-def pymysqlAlias():
-    return pymysql.connect(
-        host=config.mysql_local['host'],
-        port=config.mysql_local['port'],
-        user=config.mysql_local['user'],
-        password=config.mysql_local['password'],
-        db=config.mysql_local['db'],
-        charset=config.mysql_local['charset'],)
#Fetch a task: {"hostname":"XXX","run_step":"xxxx","task_distributed_id":"xxxx"}
def get_task_distributed():
    try:
-        with pymysqlAlias() as conn:
-            cursor = conn.cursor(pymysql.cursors.DictCursor)
-            sql = ''
-            # acquire the lock before reading
-            # lock.acquire()
        #Hosts other than R11/R12 cannot run step1, so they only take the follow-up steps
        if hostname != "R11" and hostname != "R12" and hostname != "XJB-20220906FLC":
-            sql = f'select * from task_distributed where status = 1 order by priority desc,created_at asc limit 1 for update'
-            cursor.execute(sql)
-            result = cursor.fetchone()
-            #print("task_distributed row", result)
+            #query the database through the new DB module
+            result = main_service_db.db_task_distributed("status = 1 order by priority desc,created_at asc limit 1 for update")
            if result:
                #work out which step has to run next
                next_step = need_run_stepx(result["id"])
                if next_step == "no" or next_step == "error":
                    print("next step to run:", next_step)
                    return next_step
+                #when a non-R11/R12 host is about to run step2, skip the task if the model needs the high-precision mesh or the photo3 set
+                if next_step == "step2":
+                    if common.task_need_high_model_or_photo3(result["task_key"]):
+                        print(f'model {result["task_key"]} needs the high-precision mesh or photo3; host {hostname} skips step2')
+                        return "no"
                taskData = {"hostname":hostname,"run_step":next_step,"task_distributed_id":result["id"],"task_key":result["task_key"]}
                flagRes = update_main_and_add_detail(taskData)
                if flagRes == "error":
@@ -58,7 +41,6 @@ def get_task_distributed():
            else:
                return 'no_data'
        else:
            #R11/R12 hosts may run step1, step2 and step3
            #if no step2 is currently running on this host, step2 is handled first
            # print("count", is_run_stepx_nums("step2"))
@@ -75,10 +57,8 @@ def get_task_distributed():
            #if this host is already running a step2 it must not take another one; it only handles step1/step3
            resultData = need_run_step_no_step2()
-            #print("resultData", resultData)
            if resultData == "no":
                return "no"
            resultData["hostname"] = hostname
            flagRes = update_main_and_add_detail(resultData)
            if flagRes == "error":
@@ -86,28 +66,16 @@ def get_task_distributed():
                return "error"
            print(f'task {resultData["task_key"]}: running {resultData["run_step"]}')
            return resultData
-        #sql = f'select * from task_distributed where status != 2 order by priority desc,created_at asc limit 1'
-        # print(f'sql: {sql}')
    except Exception as e:
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in get_task_distributed({hostname}): {str(e)}")
        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in get_task_distributed({hostname}): {str(e)}")
        return 'error'
-    # finally:
-    #     # release the lock
-    #     lock.release()
#Is this host currently running the given step?
def is_run_stepx(step):
    try:
-        with pymysqlAlias() as conn:
-            cursor = conn.cursor(pymysql.cursors.DictCursor)
-            sql = f'select * from task_distributed_detail where hostname = "{hostname}" and step = "{step}" and finished_at is null for update'
-            # print(f'sql: {sql}')
-            cursor.execute(sql)
-            result = cursor.fetchone()
+        where = f'hostname = "{hostname}" and step = "{step}" and finished_at is null for update'
+        result = main_service_db.db_task_distributed_detail(where)
        if result:
            return "yes"
        else:
@@ -121,13 +89,8 @@ is_run_stepx(step):
#How many instances of the given step is this host currently running?
def is_run_stepx_nums(step):
    try:
-        with pymysqlAlias() as conn:
-            cursor = conn.cursor(pymysql.cursors.DictCursor)
-            sql = f'select count(*) as nums from task_distributed_detail where hostname = "{hostname}" and step = "{step}" and finished_at is null for update'
-            # print(f'sql: {sql}')
-            cursor.execute(sql)
-            result = cursor.fetchone()
-            return result["nums"]
+        where = f'hostname = "{hostname}" and step = "{step}" and finished_at is null for update'
+        return main_service_db.db_task_distributed_detail_count(where)
    except Exception as e:
        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in is_run_stepx_nums({hostname},{step}): {str(e)}")
        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in is_run_stepx_nums({hostname},{step}): {str(e)}")
@@ -137,13 +100,10 @@ is_run_stepx_nums(step):
#Which step does the given task need next: "error" = exception, "no" = nothing to run, otherwise "step1"/"step2"/"step3"
def need_run_stepx(task_distributed_id):
    try:
-        with pymysqlAlias() as conn:
-            cursor = conn.cursor(pymysql.cursors.DictCursor)
        #check whether a sub-task for this task_distributed_id is already running
-            sql = f'select * from task_distributed_detail where hostname = "{hostname}" and task_distributed_id = "{task_distributed_id}" order by id desc limit 1 for update'
-            # print(f'sql: {sql}')
-            cursor.execute(sql)
-            result = cursor.fetchone()
+        where = f'hostname = "{hostname}" and task_distributed_id = "{task_distributed_id}" order by id desc limit 1 for update'
+        result = main_service_db.db_task_distributed_detail(where)
        #no sub-task at all means the task has not started yet and needs step1
        if result is None:
            return "step1"
@@ -169,19 +129,13 @@ need_run_stepx(task_distributed_id):
#Which tasks need step2?
def need_run_step2():
    try:
-        with pymysqlAlias() as conn:
-            cursor = conn.cursor(pymysql.cursors.DictCursor)
-            sql = f'select * from task_distributed where status = 1 and step_last = "step1" and finished_at is null order by priority desc,created_at asc for update'
-            # print(f'sql: {sql}')
-            cursor.execute(sql)
-            result = cursor.fetchall()
+        where = f'status = 1 and step_last = "step1" and finished_at is null order by priority desc,created_at asc for update'
+        result = main_service_db.db_task_distributed_list(where)
        #check the length of result
        #check whether anything matched
        if len(result) == 0:
            return "no"
        #walk the rows to find which one needs step2
        for row in result:
            #is a step2 already pending for it?
            if need_run_stepx(row["id"]) == "step2":
@@ -196,12 +150,8 @@ need_run_step2():
#Which tasks need the given step?
def need_run_appoint_step(step):
    try:
-        with pymysqlAlias() as conn:
-            cursor = conn.cursor(pymysql.cursors.DictCursor)
-            sql = f'select * from task_distributed where finished_at is null order by priority desc,created_at asc for update'
-            # print(f'sql: {sql}')
-            cursor.execute(sql)
-            result = cursor.fetchall()
+        where = f'finished_at is null order by priority desc,created_at asc for update'
+        result = main_service_db.db_task_distributed_list(where)
        #check whether anything matched
        if len(result) == 0:
            return "no"
@@ -220,17 +170,11 @@ need_run_appoint_step(step):
#Which tasks need a step other than step2?
def need_run_step_no_step2():
    try:
-        with pymysqlAlias() as conn:
-            cursor = conn.cursor(pymysql.cursors.DictCursor)
-            sql = f'select * from task_distributed where finished_at is null order by priority desc,created_at asc for update'
-            # print(f'sql: {sql}')
-            cursor.execute(sql)
-            result = cursor.fetchall()
+        result = main_service_db.db_task_distributed_list("finished_at is null order by priority desc,created_at asc for update")
        #check whether anything matched
        if len(result) == 0:
            return "no"
        #walk the rows to find which step each one needs
        for row in result:
            #which step is pending for it?
            xstep = need_run_stepx(row["id"])
@@ -244,122 +188,6 @@ need_run_step_no_step2():
        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in need_run_step_no_step2(): {str(e)}")
        return 'error'
-# Insert a task into the task_distributed master table; every task run starts from a row here
-def add_task_distributed(data):
-    try:
-        with pymysqlAlias() as conn:
-            cursor = conn.cursor()
-            sql = f'insert into task_distributed (task_type, task_key,created_at) values ("{data["task_type"]}", "{data["task_key"]}","{now()}")'
-            # print(f'sql: {sql}')
-            cursor.execute(sql)
-            # fetch the auto-increment id of the new row
-            last_insert_id = cursor.lastrowid
-            conn.commit()
-            return last_insert_id
-    except Exception as e:
-        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in add_main_task({data}): {str(e)}")
-        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in add_main_task({data}): {str(e)}")
-        return "error"
-# Insert a step record into task_distributed_detail
-def add_task_distributed_detail(data):
-    try:
-        with pymysqlAlias() as conn:
-            cursor = conn.cursor()
-            sql = f'insert into task_distributed_detail (task_distributed_id,step,hostname,started_at) values ("{data["task_distributed_id"]}", "{data["step"]}","{hostname}","{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}")'
-            # print(f'sql: {sql}')
-            cursor.execute(sql)
-            conn.commit()
-            return "ok"
-    except Exception as e:
-        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in add_task_distributed_detail({data}): {str(e)}")
-        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in add_task_distributed_detail({data}): {str(e)}")
-        return "error"
-# Update the task_distributed master table
-def update_task_distributed(data):
-    try:
-        with pymysqlAlias() as conn:
-            cursor = conn.cursor()
-            sql = f'update task_distributed set '
-            # decide which columns to update
-            if "status" in data:
-                sql += f'status = "{data["status"]}",'
-            if "hostname" in data:
-                sql += f'hostname = "{data["hostname"]}",'
-            if "step_last" in data:
-                sql += f'step_last = "{data["step_last"]}",'
-            if "priority" in data:
-                sql += f'priority = "{data["priority"]}",'
-            if "started_at" in data:
-                sql += f'started_at = "{data["started_at"]}",'
-            if "finished_at" in data:
-                sql += f'finished_at = "{data["finished_at"]}",'
-            # strip the trailing comma
-            sql = sql.rstrip(',')
-            sql += f' where 1=1 '
-            # the WHERE conditions go last
-            if "id" in data:
-                sql += f' and id = "{data["id"]}"'
-            if "task_key" in data:
-                sql += f' and task_type = "{data["task_key"]}" and status != 2'
-            #sql = f'update task_distributed set status = "{data["status"]}",updated_at = "{now()}" where id = "{data["id"]}"'
-            # print(f'sql: {sql}')
-            cursor.execute(sql)
-            conn.commit()
-    except Exception as e:
-        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in update_task_distributed({data}): {str(e)}")
-        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in update_task_distributed({data}): {str(e)}")
-        return "error"
-# Update the task_distributed_detail table
-def update_task_distributed_detail(data):
-    try:
-        with pymysqlAlias() as conn:
-            cursor = conn.cursor()
-            sql = f'update task_distributed_detail set '
-            # decide which columns to update
-            if "finished_at" in data:
-                sql += f'finished_at = "{data["finished_at"]}"'
-            if "step" in data:
-                sql += f',step = "{data["step"]}"'
-            if "hostname" in data:
-                sql += f',hostname = "{data["hostname"]}"'
-            # WHERE conditions
-            sql += f' where 1=1 '
-            if "task_distributed_id" in data:
-                sql += f' and task_distributed_id = "{data["task_distributed_id"]}"'
-            if "step" in data:
-                sql += f' and step = "{data["step"]}"'
-            cursor.execute(sql)
-            conn.commit()
-    except Exception as e:
-        print(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in update_task_distributed_detail({data}): {str(e)}")
-        logging.error(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())} error in update_task_distributed_detail({data}): {str(e)}")
-        return "error"
#update the master table and insert the detail row for a step
def update_main_and_add_detail(data):
    if data["run_step"] == "step1":
@@ -367,6 +195,6 @@ update_main_and_add_detail(data):
    else:
        updateData = {"id":data['task_distributed_id'],"step_last":data["run_step"]}
    #update the master table
-    update_task_distributed(updateData)
+    main_service_db.update_task_distributed(updateData)
    #insert the detail row
-    return add_task_distributed_detail({"task_distributed_id":data['task_distributed_id'],"step":data["run_step"],"hostname":hostname,"started_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
+    return main_service_db.add_task_distributed_detail({"task_distributed_id":data['task_distributed_id'],"step":data["run_step"],"hostname":hostname,"started_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})

main_service.py (35 lines changed)

@@ -1,7 +1,14 @@
-import sys,socket,time
+import sys,socket,time,platform
from logic import logic_main_service
import logging
import main_step1,main_step2,main_step3
+if platform.system() == 'Windows':
+    #production
+    sys.path.append('e:\\libs\\')
+    #local test
+else:
+    sys.path.append('/data/deploy/make3d/make2/libs/')
+import main_service_db
if __name__ == '__main__':
    #run as a daemon loop
    while True:
@@ -14,31 +21,31 @@ if __name__ == '__main__':
            else:
                if data["run_step"] == "step1":
                    # for local testing of the distributed run
-                    # time.sleep(5)
-                    # print("updating step1 finish time", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
-                    # logic_main_service.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step1","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
+                    time.sleep(5)
+                    print("updating step1 finish time", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
+                    main_service_db.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step1","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
                    # for production
-                    main_step1.step1(data["task_key"], experience=False, makeloop=False,task_distributed_id=data['task_distributed_id'])
+                    #main_step1.step1(data["task_key"], experience=False, makeloop=False,task_distributed_id=data['task_distributed_id'])
                elif data["run_step"] == "step2":
                    # for local testing of the distributed run
-                    # time.sleep(15)
-                    # print("updating step2 finish time", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
-                    # logic_main_service.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step2","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
+                    time.sleep(15)
+                    print("updating step2 finish time", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
+                    main_service_db.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step2","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
                    # for production
-                    main_step2.step2(data["task_key"], data['task_distributed_id'])
+                    #main_step2.step2(data["task_key"], data['task_distributed_id'])
                elif data["run_step"] == "step3":
                    # for local testing of the distributed run
-                    # time.sleep(8)
+                    time.sleep(8)
                    # update finished_at in the detail table
-                    # print("updating step3 finish time", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
-                    # logic_main_service.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step3","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
+                    print("updating step3 finish time", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
+                    main_service_db.update_task_distributed_detail({"task_distributed_id":data["task_distributed_id"],"step":"step3","finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
                    # update status and finished_at in the master table
-                    #logic_main_service.update_task_distributed({"id":data["task_distributed_id"],"status":2,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
+                    main_service_db.update_task_distributed({"id":data["task_distributed_id"],"status":2,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
                    # for production
-                    main_step3.step3(data["task_key"], data['task_distributed_id'])
+                    #main_step3.step3(data["task_key"], data['task_distributed_id'])
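
In this revision the real main_stepX calls are commented out and the local-test path (a sleep plus a direct DB update) is active, so switching modes means editing the file. A hedged sketch of gating the two modes behind a flag instead (LOCAL_TEST is a hypothetical switch, not part of the codebase):

    LOCAL_TEST = True  # hypothetical: True = simulate the step locally, False = run the real pipeline

    if data["run_step"] == "step1":
        if LOCAL_TEST:
            time.sleep(5)
            main_service_db.update_task_distributed_detail({
                "task_distributed_id": data["task_distributed_id"],
                "step": "step1",
                "finished_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
        else:
            main_step1.step1(data["task_key"], experience=False, makeloop=False,
                             task_distributed_id=data["task_distributed_id"])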

main_step1.py (6 lines changed)

@@ -3,10 +3,10 @@ from PIL import Image
import platform
if platform.system() == 'Windows':
    sys.path.append('e:\\libs\\')
-    #sys.path.append('libs')
else:
    sys.path.append('/data/deploy/make3d/make2/libs/')
-from logic import logic_main_service
-import config, libs, libs_db
+import config, libs, libs_db,main_service_db

def filter_dark_texture_image(pid):
    start_time = time.time()
@@ -180,7 +180,7 @@ def step1(pid, experience=False, makeloop=True,task_distributed_id=""):
        # print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} {pid} step1 finished, moving to the share directory')
    else:
        # after the distributed run finishes, update the task status: set finished_at in the detail table
-        logic_main_service.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
+        main_service_db.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
    return

main_step2.py (20 lines changed)

@@ -2,10 +2,10 @@ import os, sys, time, shutil, subprocess, shlex
import platform
if platform.system() == 'Windows':
    sys.path.append('e:\\libs\\')
-    #sys.path.append('libs')
else:
    sys.path.append('/data/deploy/make3d/make2/libs/')
-from logic import logic_main_service
-import config, libs, libs_db
+import config, libs, libs_db,common,main_service_db

def load_model(pid):
    cmd = f'{config.rcbin} {config.r1["init"]} -load "{os.path.join(config.workdir, pid, f"{pid}.rcproj")}"'
@@ -23,7 +23,7 @@ def get_rcver():
def make3d(pid):
    simplify_value = 1000000 * libs.getHeadCount(pid)
    add_photo3 = ' '
-    if os.path.exists(os.path.join(config.workdir, pid, 'photo3')):
+    if common.task_need_photo3(pid):
        add_photo3 = ' -addFolder "' + os.path.join(config.workdir, pid, 'photo3') + '" -align -align '
    if get_rcver() == 1: # old version
@@ -44,12 +44,13 @@ def make3d(pid):
        cmd = f'{config.rcbin} {config.r1["init"]} -load "{os.path.join(config.workdir, pid, f"{pid}.rcproj")}" -update \
            -setReconstructionRegion "{os.path.join(config.workdir, pid, f"{pid}.rcbox")}" \
-            -mvs -modelSelectMaximalConnectedComponent -modelInvertSelection -modelRemoveSelectedTriangles -closeHoles -clean -simplify {simplify_value} -smooth -unwrap -calculateTexture -renameModel {pid} -exportModel "{pid} {os.path.join(config.workdir, pid, "output", f"{pid}.obj")}" "d:\\make2\\config\\ModelExportParams102.xml" -quit'
+            -mvs -modelSelectMaximalConnectedComponent -modelInvertSelection -modelRemoveSelectedTriangles -closeHoles -clean -simplify {simplify_value} -smooth -unwrap -calculateTexture -renameModel {pid} -exportModel "{pid}" "{os.path.join(config.workdir, pid, "output", f"{pid}.obj")}" "d:\\make2\\config\\ModelExportParams102.xml" -quit'
        print(cmd)
        cmd = shlex.split(cmd)
        res = subprocess.run(cmd)
    else: # new version
-        if libs.aliyun_face(pid) and libs.get_ps_type(pid) == 1:
+        #does the model need the high-precision mesh?
+        if common.task_need_high_model(pid):
            calulate_type = 'calculateHighModel'
        else:
            calulate_type = 'calculateNormalModel'
@@ -65,11 +66,18 @@ def make3d(pid):
def step2(pid,task_distributed_id=""):
    print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} starting reconstruction step2')
+    #should the data be copied in from the share directory?
    if os.path.exists(os.path.join(config.sharedir, pid)) and not os.path.exists(os.path.join(config.workdir, pid)):
        shutil.move(os.path.join(config.sharedir, pid), config.workdir)
    else:
        print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} directory {os.path.join(config.sharedir, pid)} does not exist, or {os.path.join(config.workdir, pid)} already exists')
        # return
+    #the working directory must exist before continuing
+    if not os.path.exists(os.path.join(config.workdir, pid)):
+        print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} directory {os.path.join(config.workdir, pid)} does not exist')
+        return
    start_time = time.time()
    make3d(pid)
@@ -78,7 +86,7 @@ def step2(pid,task_distributed_id=""):
    if task_distributed_id == "":
        os.system(f'python d:\\make2\\main_step3.py {pid}')
    else:
-        logic_main_service.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
+        main_service_db.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
    return

def main(pid):

main_step3.py (7 lines changed)

@@ -6,8 +6,7 @@ if platform.system() == 'Windows':
    #sys.path.append('libs')
else:
    sys.path.append('/data/deploy/make3d/make2/libs/')
-from logic import logic_main_service
-import config, libs, libs_db
+import config, libs, libs_db,main_service_db

def bmesh_copy_from_object(obj, transform=True, triangulate=True, apply_modifiers=False):
    """Returns a transformed, triangulated copy of the mesh"""
@@ -211,9 +210,9 @@ def step3(pid,task_distributed_id=""):
    libs_db.finish_task({"task_type": "make", "task_key": pid})
    if task_distributed_id:
        #update finished_at in the detail table
-        logic_main_service.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
+        main_service_db.update_task_distributed_detail({"task_distributed_id":task_distributed_id,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
        #update status and finished_at in the master table
-        logic_main_service.update_task_distributed({"id":task_distributed_id,"status":2,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
+        main_service_db.update_task_distributed({"id":task_distributed_id,"status":2,"finished_at":time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
    return

def main(pid):

test.py (new file, 18 lines)

@@ -0,0 +1,18 @@
import os, sys, time, shutil, subprocess, shlex, json
import platform
if platform.system() == 'Windows':
    #sys.path.append('e:\\libs\\')
    sys.path.append('libs')
else:
    sys.path.append('/data/deploy/make3d/make2/libs/')
import config, libs, libs_db,common

if __name__ == '__main__':
    common.copy_remote_directory("172.31.1.11","D:\\7831","E:\\")
    # config.oss_bucket.delete_object(f'test/test_delete')
    # # delete the contents of a folder on OSS
    # object_list = oss2.ObjectIterator(config.oss_bucket, prefix='test/test_delete/')
    # result = config.oss_bucket.batch_delete_objects([obj.key for obj in object_list])

tools/auto_distance.py (77 lines changed)

@@ -22,29 +22,82 @@ def find_and_maximize_window(window_title):
            return pid, left, top, right, bottom
    return '0', 0, 0, 0, 0

+# previous version, kept for reference:
+# def get_defineDistances(pid, left, top, right, bottom):
+#     psid = libs.getPSid(pid)
+#     distances = config.ps_floor_sticker.get(psid, config.ps_floor_sticker['default'])
+#     for index, d in enumerate(distances.split(';')):
+#         p1, p2, distance = d.split(' ')
+#         if index == 0:
+#             ag.moveTo(left + 80, top + 290)
+#         else:
+#             ag.moveTo(left + 80, top + 290 + 15) # Create distance line height 15
+#         ag.click()
+#         ag.moveTo(left + 302, (bottom - 100)) # A point
+#         ag.click();repeat_backspace(20)
+#         ag.typewrite(p1)
+#         ag.moveTo(left + 302, (bottom - 80)) # B point
+#         ag.click();repeat_backspace(20)
+#         ag.typewrite(p2)
+#         ag.moveTo(left + 302, (bottom - 35)) # Defined distance
+#         ag.click();repeat_backspace(8)
+#         ag.typewrite(distance)
+#         ag.press('enter')

def get_defineDistances(pid, left, top, right, bottom):
    psid = libs.getPSid(pid)
    distances = config.ps_floor_sticker.get(psid, config.ps_floor_sticker['default'])
+    time.sleep(5)
+    y = 748
    for index, d in enumerate(distances.split(';')):
        p1, p2, distance = d.split(' ')
        if index == 0:
-            ag.moveTo(left + 80, top + 290)
+            #move to Control points
+            ag.moveTo(14, 191)
        else:
-            ag.moveTo(left + 80, top + 290 + 15) # Create distance line height 15
+            #after the first pass, move to Create distance
+            ag.moveTo(65, 300+30) # Create distance line height 15
+        time.sleep(0.5)
+        ag.click()
+        time.sleep(1.2)
+        if index == 0:
+            ag.moveTo(65, 285+30)
+            time.sleep(1)
            ag.click()
-        ag.moveTo(left + 302, (bottom - 100)) # A point
-        ag.click();repeat_backspace(20)
+        #click the A point field at the lower left
+        time.sleep(0.5)
+        ag.moveTo(233, y) # A point
+        time.sleep(0.5)
+        ag.click()
+        time.sleep(0.5)
+        repeat_backspace(20)
+        time.sleep(1)
        ag.typewrite(p1)
+        time.sleep(1)
-        ag.moveTo(left + 302, (bottom - 80)) # B point
-        ag.click();repeat_backspace(20)
+        #click the B point field at the lower left
+        ag.moveTo(233, y+15) # B point
+        time.sleep(0.5)
+        ag.click()
+        time.sleep(0.5)
+        repeat_backspace(20)
+        time.sleep(1)
        ag.typewrite(p2)
+        time.sleep(1)
-        ag.moveTo(left + 302, (bottom - 35)) # Defined distance
-        ag.click();repeat_backspace(8)
+        #click the Defined distance field at the lower left
+        ag.moveTo(233, y+60) # Defined distance
+        time.sleep(0.5)
+        ag.click()
+        time.sleep(0.5)
+        repeat_backspace(15)
+        time.sleep(1.5)
        ag.typewrite(distance)
+        time.sleep(1.5)
        ag.press('enter')
+        print(f'distance definition {index} done')

def repeat_backspace(times):
@@ -58,8 +111,8 @@ def defind_distance(pid, left, top, right, bottom):
    print(f'left: {left}, top: {top}, right: {right}, bottom: {bottom}')
    # ag.PAUSE = 1
-    ag.moveTo(left + 20, top + 200) # open Control points
-    ag.click()
+    # ag.moveTo(left + 20, top + 200) # open Control points
+    # ag.click()
    get_defineDistances(pid, left, top, right, bottom)
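
The new get_defineDistances repeats the same move, sleep, click, clear and type sequence for the A point, the B point and the defined distance, each with hard-coded screen coordinates. A hedged refactoring sketch (click_and_type is a hypothetical helper, not part of this commit):

    import time
    import pyautogui as ag

    def click_and_type(x, y, text, backspaces=20, pause=0.5):
        # focus the field, clear it, then type the new value
        ag.moveTo(x, y)
        time.sleep(pause)
        ag.click()
        time.sleep(pause)
        for _ in range(backspaces):
            ag.press('backspace')
        time.sleep(1)
        ag.typewrite(text)
        time.sleep(1)

    # usage inside the loop, with the coordinates used above:
    # click_and_type(233, y, p1)                            # A point
    # click_and_type(233, y + 15, p2)                       # B point
    # click_and_type(233, y + 60, distance, backspaces=15)  # Defined distance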
