建模程序 多个定时程序
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

272 lines
10 KiB

import os, time, json, requests, shutil, oss2, psutil
from tqdm import tqdm
from PIL import Image, ImageEnhance
import config
def find_blender_bin_path(base_path='C:\\Program Files\\Blender Foundation\\'):
    """Locate the Blender executable and return its path in double quotes.

    Scans ``base_path`` for the first entry whose name starts with
    ``Blender`` and builds the executable path by plain string
    concatenation, so ``base_path`` must end with a path separator.

    Args:
        base_path: Blender installation root, including the trailing
            separator. Defaults to the standard Windows location.

    Returns:
        The path to ``blender.exe`` wrapped in double quotes.

    Raises:
        SystemExit: With exit code 1 when no Blender installation is found.
    """
    if os.path.exists(base_path):
        for entry in os.listdir(base_path):
            if entry.startswith('Blender'):
                blender_bin_path = base_path + entry + '\\blender.exe'
                return f'"{blender_bin_path}"'
    # Reached when the root is missing AND when it exists but contains no
    # Blender version directory; the latter used to fall through and
    # silently return None.
    print('未找到blender安装目录')
    raise SystemExit(1)
def resize_photos(photo_path, ratio=0.5):
    """Scale every ``.jpg`` file in ``photo_path`` by ``ratio``, in place.

    Each image is first passed through ``rotate_image`` and then resized to
    ``(int(width * ratio), int(height * ratio))``; the result overwrites
    the source file.

    Args:
        photo_path: Directory whose ``.jpg`` files are processed.
        ratio: Scale factor applied to both width and height.
    """
    jpg_names = [name for name in os.listdir(photo_path) if name.endswith('.jpg')]
    for name in jpg_names:
        target = os.path.join(photo_path, name)
        picture = rotate_image(Image.open(target))
        width, height = picture.size
        scaled_size = (int(width * ratio), int(height * ratio))
        picture = picture.resize(scaled_size)
        picture.save(target)
def rotate_image(image):
    """Rotate ``image`` according to its EXIF orientation tag, if any.

    Reading the orientation is best-effort: an image without EXIF support,
    without EXIF data, or with unreadable EXIF data is returned unchanged.

    Args:
        image: Image object; ``_getexif()`` is used when available and
            ``rotate(angle, expand=True)`` is called when a rotation applies.

    Returns:
        The rotated image for orientation values 3, 6 and 8, otherwise the
        original ``image`` object.
    """
    # Check whether the image's EXIF data carries orientation information.
    try:
        exif = image._getexif()
        orientation = exif.get(0x0112)
    except Exception:
        # Deliberately broad (missing method, ``None`` EXIF, corrupt data),
        # but unlike a bare ``except`` it lets KeyboardInterrupt and
        # SystemExit propagate.
        orientation = None
    # Rotate the image according to the orientation value.
    if orientation == 3:
        image = image.rotate(180, expand=True)
    elif orientation == 6:
        image = image.rotate(270, expand=True)
    elif orientation == 8:
        image = image.rotate(90, expand=True)
    return image
def get_ps_adjust_photo_para(psid):
    """Fetch the photo-adjustment parameters for ``psid``.

    Queries ``config.urls['get_ps_adjust_photo_para_url']`` with ``id=psid``,
    prints the decoded JSON response and reads its ``data`` entry.

    Returns:
        A ``(brightness, saturation, colorTemperature)`` tuple of floats.
    """
    response = requests.get(config.urls['get_ps_adjust_photo_para_url'], params={'id': psid})
    payload = response.json()
    print(payload)
    settings = payload['data']
    brightness = float(settings['brightness'])
    saturation = float(settings['saturation'])
    temperature = float(settings['colorTemperature'])
    return brightness, saturation, temperature
def adjust_photos(workdir, pid):
    """Write brightness/saturation/temperature-adjusted copies of the photos.

    Reads the ``.jpg`` files in ``<workdir>/<pid>/photo2``, applies the
    factors returned by ``get_ps_adjust_photo_para`` and saves the results
    under the same file names in ``<workdir>/<pid>/photo3``.

    Returns:
        ``True`` after the photos were adjusted; ``False`` when ``photo2``
        does not exist or all three factors equal 1; ``None`` when
        ``photo3`` already exists.
    """
    def adjust_brightness(image, brightness_factor):
        # Factors 0 and 1 both leave the image untouched.
        if brightness_factor in (0, 1):
            return image
        return ImageEnhance.Brightness(image).enhance(brightness_factor)

    def adjust_saturation(image, saturation_factor):
        if saturation_factor == 1:
            return image
        return ImageEnhance.Color(image).enhance(saturation_factor)

    def adjust_temperature(image, temperature_factor):
        if temperature_factor == 1:
            return image
        # Only the red band is scaled; green and blue are merged back as-is.
        red, green, blue = image.split()
        red = red.point(lambda value: value * temperature_factor)
        return Image.merge("RGB", (red, green, blue))

    source_dir = os.path.join(workdir, pid, 'photo2')
    output_dir = os.path.join(workdir, pid, 'photo3')

    if not os.path.exists(source_dir):
        print(f"Directory {source_dir} does not exist")
        return False

    psid = getPSid(pid)
    brightness_factor, saturation_factor, temperature_factor = get_ps_adjust_photo_para(psid)
    factors = (brightness_factor, saturation_factor, temperature_factor)
    if all(factor == 1 for factor in factors):
        print("No need to adjust")
        return False

    if os.path.exists(output_dir):
        print(f'{output_dir}目录已存在,跳过')
        return
    os.makedirs(output_dir, exist_ok=True)

    print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 开始调整图片曝光...')
    start_time = time.time()
    for filename in os.listdir(source_dir):
        if not filename.endswith(".jpg"):
            continue
        picture = rotate_image(Image.open(os.path.join(source_dir, filename)))
        picture = adjust_brightness(picture, brightness_factor)
        picture = adjust_saturation(picture, saturation_factor)
        picture = adjust_temperature(picture, temperature_factor)
        picture.save(os.path.join(output_dir, filename))
    print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 图片曝光调整完成,共费时{diff_time(start_time)}')
    return True
def getPSid(pid):
    """Look up the ``data`` value of the psid endpoint for ``pid``.

    Queries ``config.urls['get_psid_url']`` with ``pid=pid`` and prints the
    request URL and raw response text.

    Returns:
        The ``data`` field of the JSON response, converted to ``str``.
    """
    response = requests.get(config.urls['get_psid_url'], params={'pid': pid})
    print('get_psid_url:', response.url)
    body = response.text
    print('res:', body)
    payload = json.loads(body)
    return str(payload['data'])
def getHeadCount(pid):
    """Return ``data.headcount`` from the print-info endpoint for ``pid``.

    Queries ``config.urls['get_printinfo_url']`` with ``id=pid`` and prints
    the request URL and raw response text. Any status code other than 200
    prints an error and terminates the program with exit code 1.
    """
    response = requests.get(config.urls['get_printinfo_url'], params={'id': pid})
    print('get_printinfo_url:', response.url)
    print('res:', response.text)
    request_failed = response.status_code != 200
    if request_failed:
        print('获取人数失败,程序退出')
        exit(1)
    payload = json.loads(response.text)
    data = payload['data']
    return data['headcount']
def get_ps_type(pid):
    """Return ``data.type`` from the studio-type endpoint for ``pid``.

    Per the original author's note the value is 1 for a round photo studio
    and 2 for a square photo studio.
    """
    response = requests.get(config.urls['get_ps_type_url'], params={'pid': pid})
    payload = response.json()
    studio_info = payload['data']
    return studio_info['type']
def find_valid_camera_on_oss(pid):
    """Return the first candidate camera that has a ``_8.jpg`` photo on OSS.

    The candidate list depends on ``get_ps_type(pid)``: ``(103, 93, 113)``
    when it returns 1 (reported as the round studio), otherwise
    ``(74, 64, 84)`` (reported as the square studio). Candidates are probed
    in order via ``config.oss_bucket.object_exists`` on
    ``photos/<pid>/photo2/<camera>_8.jpg``.

    Returns:
        The id of the first camera whose photo exists.

    Raises:
        SystemExit: With exit code 1 when none of the candidates has a photo.
    """
    if get_ps_type(pid) == 1:
        print('当前拍照影棚为:圆形影棚')
        cameras = (103, 93, 113)
    else:
        print('当前拍照影棚为:方形影棚')
        cameras = (74, 64, 84)
    for camera in cameras:
        objectkey = f'photos/{pid}/photo2/{camera}_8.jpg'
        if config.oss_bucket.object_exists(objectkey):
            print('找到有效正脸相机:', camera)
            return camera
    # The message used to lack the ``f`` prefix and printed the literal
    # text "{cameras}" instead of the ids that were tried.
    print(f'{cameras}没有找到照片,程序退出')
    exit(1)
def aliyun_face(pid):
    """Run OSS face detection on the front photo of ``pid`` and flag it.

    The photo is ``photos/<pid>/photo2/<camera>_8.jpg`` where ``camera``
    comes from ``find_valid_camera_on_oss``; it is fetched with the
    ``imm/detectface`` process style and the response is parsed as JSON.

    Returns:
        ``True`` when any detected face is ``FEMALE`` with age below 22 or
        ``MALE`` with age below 15; ``False`` when no face matches, the
        object does not exist or detection reports failure; ``None`` when
        the response contains no faces.
    """
    camera = find_valid_camera_on_oss(pid)
    objectkey = f'photos/{pid}/photo2/{camera}_8.jpg'
    try:
        response = config.oss_bucket.get_object(objectkey, process='imm/detectface')
    except oss2.exceptions.NoSuchKey:
        print('没有找到文件:', objectkey)
        return False

    result = json.loads(response.read())
    if not result['success']:
        print('face detect failed...')
        return False

    if result['Faces'] is None:
        print('no face')
        return None

    faces = result['Faces']
    print('faces num:', len(faces))
    flagged = False
    for face in faces:
        print('-' * 20)
        print('face_id:', face['FaceId'])
        print('gender:', face['Gender'])
        print('age:', face['Age'])
        young_female = face['Gender'] == 'FEMALE' and face['Age'] < 22
        if young_female:
            flagged = True
        young_male = face['Gender'] == 'MALE' and face['Age'] < 15
        if young_male:
            flagged = True
    return flagged
def down_obj_from_oss(workdir, pid, action):
    """Download every object under ``objs/<action>/<pid>/`` from OSS.

    Files are stored flat in ``<workdir>/<action>/<pid>``. When that
    directory already exists nothing is downloaded.

    Args:
        workdir: Local root directory.
        pid: Id used in both the OSS prefix and the local path.
        action: Sub-folder name used in both the OSS prefix and the local path.

    Returns:
        The file name of the last ``.obj`` object that was downloaded, or
        ``None`` when the target directory already existed or the listing
        contained no ``.obj`` file.
    """
    target_dir = os.path.join(workdir, action, pid)
    if os.path.exists(target_dir):
        print(f'目录{target_dir}已存在,跳过')
        return None
    os.makedirs(target_dir)

    # List the objects that share the prefix.
    prefix = f'objs/{action}/{pid}/'
    filelist = oss2.ObjectIteratorV2(config.oss_bucket, prefix=prefix)
    print('正在下载:', prefix)

    # Initialised up front: the function used to raise UnboundLocalError at
    # the return statement when no ``.obj`` object was listed.
    obj_filename = None
    for file in filelist:
        filename = file.key.split('/')[-1]
        if not filename:
            # Key ends with '/', so there is no file name to write to; the
            # local path would be the target directory itself.
            continue
        if filename.endswith('.obj'):
            obj_filename = filename
        localfile = os.path.join(target_dir, filename)
        config.oss_bucket.get_object_to_file(file.key, localfile)
    return obj_filename
def set_photos_join_type(workdir, pid, photoN, mesh = '0', texture='0'):
    """Set the meshing/texturing flags in every ``.xmp`` file of a folder.

    For each ``.xmp`` file in ``<workdir>/<pid>/<photoN>`` the attributes
    ``xcr:InMeshing`` and ``xcr:InTexturing`` whose current value is ``"0"``
    or ``"1"`` are rewritten to ``mesh`` and ``texture`` respectively; the
    file is overwritten in place.

    Args:
        workdir: Local root directory.
        pid: Sub-directory of ``workdir``.
        photoN: Photo folder name inside ``<workdir>/<pid>``.
        mesh: New value for ``xcr:InMeshing``.
        texture: New value for ``xcr:InTexturing``.
    """
    folder = os.path.join(workdir, pid, photoN)
    # Applied in this exact order, matching the original four passes.
    substitutions = (
        ('xcr:InMeshing="0"', f'xcr:InMeshing="{mesh}"'),
        ('xcr:InMeshing="1"', f'xcr:InMeshing="{mesh}"'),
        ('xcr:InTexturing="0"', f'xcr:InTexturing="{texture}"'),
        ('xcr:InTexturing="1"', f'xcr:InTexturing="{texture}"'),
    )
    for entry in os.listdir(folder):
        if not entry.endswith('.xmp'):
            continue
        target = os.path.join(folder, entry)
        with open(target, 'r') as reader:
            content = reader.read()
        for old, new in substitutions:
            content = content.replace(old, new)
        with open(target, 'w') as writer:
            writer.write(content)
def set_photo_join_type(workdir, pid, photoN, camera_id, mesh = '0', texture='0'):
    """Set the meshing/texturing flags in one camera's ``.xmp`` file.

    The file is ``<workdir>/<pid>/<photoN>/<camera_id>_1.xmp`` when
    ``photoN`` is ``'photo1'`` and ``<camera_id>_8.xmp`` otherwise.
    Attributes ``xcr:InMeshing`` and ``xcr:InTexturing`` whose current value
    is ``"0"`` or ``"1"`` are rewritten to ``mesh`` and ``texture``
    respectively; the file is overwritten in place.
    """
    suffix = '1' if photoN == 'photo1' else '8'
    filename = os.path.join(workdir, pid, photoN, f'{camera_id}_{suffix}.xmp')
    # Applied in this exact order, matching the original four passes.
    substitutions = (
        ('xcr:InMeshing="0"', f'xcr:InMeshing="{mesh}"'),
        ('xcr:InMeshing="1"', f'xcr:InMeshing="{mesh}"'),
        ('xcr:InTexturing="0"', f'xcr:InTexturing="{texture}"'),
        ('xcr:InTexturing="1"', f'xcr:InTexturing="{texture}"'),
    )
    with open(filename, 'r') as reader:
        content = reader.read()
    for old, new in substitutions:
        content = content.replace(old, new)
    with open(filename, 'w') as writer:
        writer.write(content)
def down_from_oss(oss_client, workdir, pid, per=100):
    """Download the photos stored under ``photos/<pid>/`` from OSS.

    Objects whose file name ends with ``_1.jpg`` are written to
    ``<workdir>/<pid>/photo1``; every other object goes to
    ``<workdir>/<pid>/photo2``. When ``<workdir>/<pid>`` already exists
    nothing is downloaded.

    Args:
        oss_client: Object providing ``get_object_to_file``; also handed to
            ``oss2.ObjectIteratorV2`` to list the objects.
        workdir: Local root directory.
        pid: Id used in both the OSS prefix and the local path.
        per: Resize percentage. ``100`` downloads the objects unmodified;
            any other value requests them with the ``image/resize,p_<per>``
            process style.
    """
    start_time = time.time()
    path = os.path.join(workdir, pid)
    if os.path.exists(path):
        print(f"Directory {path} already exists, skip")
        return
    os.makedirs(os.path.join(path, 'photo1'))
    os.makedirs(os.path.join(path, 'photo2'))
    # The former ``psid = getPSid(pid)`` call was dropped: its result was
    # never used, so it only added an HTTP request that could fail the
    # download.

    # Loop-invariant, so built once instead of per object.
    style = f'image/resize,p_{per}'
    # List the objects that share the prefix.
    prefix = f'photos/{pid}/'
    filelist = oss2.ObjectIteratorV2(oss_client, prefix=prefix)
    for file in tqdm(filelist):
        filename = file.key.split('/')[-1]
        if not filename:
            # Key ends with '/', so there is no file name to write to; the
            # local path would be the photo2 directory itself.
            continue
        subdir = 'photo1' if filename.endswith('_1.jpg') else 'photo2'
        localfile = os.path.join(path, subdir, filename)
        if per == 100:
            oss_client.get_object_to_file(file.key, localfile)
        else:
            oss_client.get_object_to_file(file.key, localfile, process=style)
    print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} 图片下载完成, 共费时{diff_time(start_time)}')
def get_defineDistances(psid):
    """Build the ``-defineDistance`` argument string for ``psid``.

    ``libs_db.get_floor_sticker_distances(psid)`` is expected to return
    ``;``-separated entries of the form ``"<p1> <p2> <distance>"``. Empty
    entries (for example after a trailing ``;``) are skipped.

    Returns:
        One ``-defineDistance p1 p2 distance`` group per entry, joined by
        single spaces; an empty string when there are no entries.

    Raises:
        ValueError: When a non-empty entry does not have exactly three
            whitespace-separated fields.
    """
    # NOTE(review): ``libs_db`` is not imported in the visible part of this
    # file; confirm it is in scope at module level.
    raw = libs_db.get_floor_sticker_distances(psid)
    arguments = []
    for entry in raw.split(';'):
        entry = entry.strip()
        if not entry:
            # Previously an empty entry crashed the three-way unpack below.
            continue
        p1, p2, distance = entry.split()
        arguments.append(f'-defineDistance {p1} {p2} {distance}')
    return ' '.join(arguments)
def is_running(psname):
    """Report whether any running process name contains ``psname``.

    Args:
        psname: Process name fragment; surrounding whitespace is ignored.

    Returns:
        ``True`` when the stripped ``psname`` is a substring of at least one
        process name reported by ``psutil.process_iter``, else ``False``.
    """
    wanted = psname.strip()
    for proc in psutil.process_iter(['name']):
        name = proc.info['name']
        # psutil stores None when the name cannot be read (e.g. access
        # denied); ``wanted in None`` would raise TypeError.
        if name is not None and wanted in name:
            return True
    return False
def diff_time(start_time):
    """Return the time elapsed since ``start_time`` as ``minutes:seconds``.

    Args:
        start_time: Start timestamp as returned by ``time.time()``.

    Returns:
        A string such as ``"1:05"``; seconds are zero-padded to two digits
        and minutes are not capped at 59.
    """
    # Return the time difference in minutes:seconds form. The separator was
    # missing before, so 1 min 5 s and 15 s both rendered as "15"-like text.
    elapsed = time.time() - start_time
    minutes, seconds = divmod(elapsed, 60)
    return f'{int(minutes)}:{int(seconds):02d}'