You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
273 lines
11 KiB
273 lines
11 KiB
import os, time, json, requests, shutil, oss2, psutil |
|
from tqdm import tqdm |
|
from PIL import Image, ImageEnhance |
|
import config,libs_db |
|
|
|
def find_blender_bin_path():
    r"""Locate ``blender.exe`` under the default Windows install root.

    Scans ``C:\Program Files\Blender Foundation\`` and uses the first entry
    whose name starts with ``Blender``.

    Returns:
        str: The path to ``blender.exe`` wrapped in double quotes.

    Raises:
        SystemExit: With exit code 1 when the install root does not exist or
            contains no entry starting with ``Blender``.
    """
    base_path = 'C:\\Program Files\\Blender Foundation\\'
    if os.path.exists(base_path):
        for entry in os.listdir(base_path):
            if entry.startswith('Blender'):
                blender_bin_path = base_path + entry + '\\blender.exe'
                return f'"{blender_bin_path}"'

    # Reached both when the root is missing and when it holds no Blender
    # folder; the latter previously fell through and returned None.
    print('未找到blender安装目录')
    raise SystemExit(1)
|
|
|
def resize_photos(photo_path, ratio=0.5):
    """Rotate and scale every ``.jpg`` file in a directory, in place.

    Each file whose name ends with ``.jpg`` (case-sensitive) is opened,
    passed through ``rotate_image``, scaled by ``ratio`` on both axes and
    saved back over the original file.

    Args:
        photo_path: Directory containing the photos.
        ratio: Scale factor applied to width and height.
    """
    for filename in os.listdir(photo_path):
        if not filename.endswith('.jpg'):
            continue
        full_path = os.path.join(photo_path, filename)
        # The context manager releases the source file handle before the
        # same path is overwritten below.
        with Image.open(full_path) as source:
            rotated = rotate_image(source)
            w, h = rotated.size
            # Clamp to 1 pixel so very small images or ratios cannot produce
            # a zero-sized target.
            new_size = (max(1, int(w * ratio)), max(1, int(h * ratio)))
            resized = rotated.resize(new_size)
        resized.save(full_path)
|
|
|
def rotate_image(image):
    """Rotate an image according to its EXIF orientation tag.

    Reads tag ``0x0112`` from ``image._getexif()``. Values 3, 6 and 8 rotate
    the image by 180, 270 and 90 degrees respectively (with ``expand=True``).
    Any other value, or unreadable or absent EXIF data, leaves the image
    untouched.

    Args:
        image: An object exposing ``_getexif()`` and ``rotate()``.

    Returns:
        The rotated image, or ``image`` itself when no rotation applies.
    """
    # Best effort: failing to read EXIF must never abort processing, but
    # KeyboardInterrupt/SystemExit are no longer swallowed.
    try:
        exif = image._getexif()
    except Exception:
        exif = None

    orientation = None
    if exif:
        try:
            orientation = exif.get(0x0112)
        except Exception:
            orientation = None

    if orientation == 3:
        return image.rotate(180, expand=True)
    if orientation == 6:
        return image.rotate(270, expand=True)
    if orientation == 8:
        return image.rotate(90, expand=True)
    return image
|
|
|
def get_ps_adjust_photo_para(psid):
    """Fetch the photo adjustment factors for ``psid``.

    Sends a GET request to ``config.urls['get_ps_adjust_photo_para_url']``
    with ``id=psid``, prints the decoded JSON body and reads the factors from
    its ``data`` entry.

    Returns:
        tuple[float, float, float]: ``(brightness, saturation,
        colorTemperature)`` converted to floats.
    """
    response = requests.get(
        config.urls['get_ps_adjust_photo_para_url'],
        params={'id': psid},
    )
    print(response.json())
    data = response.json()['data']
    brightness = float(data['brightness'])
    saturation = float(data['saturation'])
    temperature = float(data['colorTemperature'])
    return brightness, saturation, temperature
|
|
|
def adjust_photos(workdir, pid):
    """Write brightness/saturation/temperature-adjusted copies of the photos.

    Reads every ``.jpg`` in ``<workdir>/<pid>/photo2``, applies
    ``rotate_image`` plus the three adjustments, and saves the result under
    the same file name in ``<workdir>/<pid>/photo3``. The factors are
    obtained through ``getPSid`` and ``get_ps_adjust_photo_para``.

    Args:
        workdir: Root working directory.
        pid: Sub-directory name of the job inside ``workdir``.

    Returns:
        ``True`` once all photos were written; ``False`` when ``photo2`` is
        missing or all three factors equal 1; ``None`` when ``photo3``
        already exists.
    """
    def adjust_brightness(image, brightness_factor):
        # A factor of 0 is treated as "no change", exactly like 1.
        if brightness_factor == 1 or brightness_factor == 0:
            return image
        return ImageEnhance.Brightness(image).enhance(brightness_factor)

    def adjust_saturation(image, saturation_factor):
        if saturation_factor == 1:
            return image
        return ImageEnhance.Color(image).enhance(saturation_factor)

    def adjust_temperature(image, temperature_factor):
        if temperature_factor == 1:
            return image
        # split() only yields three bands for RGB; grayscale or CMYK JPEGs
        # would otherwise fail on the unpacking below.
        if image.mode != 'RGB':
            image = image.convert('RGB')
        r, g, b = image.split()
        # Only the red channel is scaled.
        r = r.point(lambda i: i * temperature_factor)
        return Image.merge("RGB", (r, g, b))

    photo2_dir = os.path.join(workdir, pid, 'photo2')
    photo3_dir = os.path.join(workdir, pid, 'photo3')

    if not os.path.exists(photo2_dir):
        print(f"Directory {photo2_dir} does not exist")
        return False

    psid = getPSid(pid)
    brightness_factor, saturation_factor, temperature_factor = get_ps_adjust_photo_para(psid)

    if (brightness_factor == 1 and saturation_factor == 1 and temperature_factor == 1):
        print("No need to adjust")
        return False
    if os.path.exists(photo3_dir):
        print(f'{photo3_dir}目录已存在,跳过')
        return
    os.makedirs(photo3_dir, exist_ok=True)

    print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 开始调整图片曝光...')
    start_time = time.time()
    for filename in os.listdir(photo2_dir):
        if not filename.endswith(".jpg"):
            continue
        # The context manager closes the source file once the adjusted copy
        # has been written.
        with Image.open(os.path.join(photo2_dir, filename)) as source:
            image = rotate_image(source)
            brightened_image = adjust_brightness(image, brightness_factor)
            saturated_image = adjust_saturation(brightened_image, saturation_factor)
            adjusted_image = adjust_temperature(saturated_image, temperature_factor)
            adjusted_image.save(os.path.join(photo3_dir, filename))
    print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 图片曝光调整完成,共费时{diff_time(start_time)}')
    return True
|
|
|
def getPSid(pid):
    """Look up the ``data`` value returned by the psid endpoint for ``pid``.

    Sends a GET request to ``config.urls['get_psid_url']`` with ``pid=pid``,
    prints the final URL and the raw response body, then decodes the body as
    JSON.

    Returns:
        str: The ``data`` field of the response, converted with ``str()``.
    """
    response = requests.get(config.urls['get_psid_url'], params={'pid': pid})
    print('get_psid_url:', response.url)
    print('res:', response.text)
    payload = json.loads(response.text)
    return str(payload['data'])
|
|
|
def getHeadCount(pid):
    """Return the ``headcount`` reported by the print-info endpoint.

    Sends a GET request to ``config.urls['get_printinfo_url']`` with
    ``id=pid`` and prints the final URL and raw body. Any status code other
    than 200 prints an error message and terminates the program with exit
    code 1.

    Returns:
        The value of ``data.headcount`` from the decoded JSON body.
    """
    response = requests.get(config.urls['get_printinfo_url'], params={'id': pid})
    print('get_printinfo_url:', response.url)
    print('res:', response.text)
    if response.status_code == 200:
        payload = json.loads(response.text)
        return payload['data']['headcount']
    print('获取人数失败,程序退出')
    exit(1)
|
|
|
def get_ps_type(pid):
    """Return the ``data.type`` value reported for ``pid``.

    Sends a GET request to ``config.urls['get_ps_type_url']`` with
    ``pid=pid`` and decodes the JSON body.
    """
    # Returned value: 1 = round studio, 2 = square studio.
    response = requests.get(config.urls['get_ps_type_url'], params={'pid': pid})
    payload = response.json()
    return payload['data']['type']
|
|
|
def find_valid_camera_on_oss(pid):
    """Return the first candidate camera id that has a ``_8.jpg`` photo on OSS.

    The candidate list depends on ``get_ps_type(pid)``: ``(103, 93, 113)``
    when it returns 1, ``(74, 64, 84)`` otherwise. Candidates are probed in
    order by checking ``photos/<pid>/photo2/<camera>_8.jpg`` in
    ``config.oss_bucket``.

    Returns:
        int: The first camera id whose photo exists.

    Raises:
        SystemExit: With exit code 1 when none of the candidates has a photo.
    """
    if get_ps_type(pid) == 1:
        print('当前拍照影棚为:圆形影棚')
        cameras = (103, 93, 113)
    else:
        print('当前拍照影棚为:方形影棚')
        cameras = (74, 64, 84)

    find_camera = 0
    for camera in cameras:
        objectkey = f'photos/{pid}/photo2/{camera}_8.jpg'
        if config.oss_bucket.object_exists(objectkey):
            find_camera = camera
            break

    print('找到有效正脸相机:', find_camera)
    if find_camera == 0:
        # The f-prefix was missing, so the literal "{cameras}" was printed
        # instead of the camera ids that were tried.
        print(f'{cameras}没有找到照片,程序退出')
        raise SystemExit(1)
    return find_camera
|
|
|
def aliyun_face(pid):
    """Run OSS face detection on the front photo and derive a boolean flag.

    The photo ``photos/<pid>/photo2/<camera>_8.jpg`` (camera chosen by
    ``find_valid_camera_on_oss``) is fetched from ``config.oss_bucket`` with
    the ``imm/detectface`` process, and the JSON result is inspected.

    Returns:
        ``True`` when any detected face is ``FEMALE`` with age below 22 or
        ``MALE`` with age below 15; ``False`` when no face matches, the
        object does not exist, or detection reports failure; ``None`` when
        the result's ``Faces`` entry is ``None``.
    """
    camera = find_valid_camera_on_oss(pid)
    objectkey = f'photos/{pid}/photo2/{camera}_8.jpg'
    try:
        raw = config.oss_bucket.get_object(objectkey, process='imm/detectface')
    except oss2.exceptions.NoSuchKey:
        print('没有找到文件:', objectkey)
        return False

    result = json.loads(raw.read())
    if not result['success']:
        print('face detect failed...')
        return False

    faces = result['Faces']
    if faces is None:
        print('no face')
        return None

    flagged = False
    print('faces num:', len(faces))
    for face in faces:
        print('-' * 20)
        print('face_id:', face['FaceId'])
        print('gender:', face['Gender'])
        print('age:', face['Age'])

        is_young_female = face['Gender'] == 'FEMALE' and face['Age'] < 22
        if is_young_female:
            flagged = True
        is_young_male = face['Gender'] == 'MALE' and face['Age'] < 15
        if is_young_male:
            flagged = True
    return flagged
|
|
|
def down_obj_from_oss(workdir, pid, action):
    """Download every object under ``objs/<action>/<pid>/`` from OSS.

    Files are stored flat in ``<workdir>/<action>/<pid>``. Nothing is
    downloaded when that directory already exists.

    Args:
        workdir: Root working directory.
        pid: Job identifier used in both the OSS prefix and the local path.
        action: Sub-folder name used in both the OSS prefix and the local path.

    Returns:
        str | None: The file name of the last listed object ending in
        ``.obj``; ``None`` when the target directory already existed or no
        ``.obj`` object was listed.
    """
    target_dir = os.path.join(workdir, action, pid)
    if os.path.exists(target_dir):
        print(f'目录{target_dir}已存在,跳过')
        return
    os.makedirs(target_dir)

    # List the objects by prefix.
    prefix = f'objs/{action}/{pid}/'
    filelist = oss2.ObjectIteratorV2(config.oss_bucket, prefix=prefix)
    print('正在下载:', prefix)

    # Pre-initialised so a listing without any .obj returns None rather than
    # raising UnboundLocalError.
    obj_filename = None
    for file in filelist:
        filename = file.key.split('/')[-1]
        if not filename:
            # Keys ending in "/" have no file name; downloading them would
            # target the directory itself.
            continue
        if filename.endswith('.obj'):
            obj_filename = filename
        localfile = os.path.join(target_dir, filename)
        config.oss_bucket.get_object_to_file(file.key, localfile)

    return obj_filename
|
|
|
def _apply_join_flags(xmp_path, mesh, texture):
    """Rewrite the InMeshing/InTexturing flags of one XMP file in place."""
    with open(xmp_path, 'r') as f:
        content = f.read()
    # Replace both possible current values ("0" and "1") of each attribute
    # with the requested one.
    for attribute, value in (('InMeshing', mesh), ('InTexturing', texture)):
        for current in ('0', '1'):
            content = content.replace(
                f'xcr:{attribute}="{current}"',
                f'xcr:{attribute}="{value}"',
            )
    with open(xmp_path, 'w') as f:
        f.write(content)


def set_photos_join_type(workdir, pid, photoN, mesh = '0', texture='0'):
    """Set the meshing/texturing flags in every ``.xmp`` file of a photo set.

    Args:
        workdir: Root working directory.
        pid: Job sub-directory inside ``workdir``.
        photoN: Photo folder name inside the job directory.
        mesh: Value written into every ``xcr:InMeshing`` attribute.
        texture: Value written into every ``xcr:InTexturing`` attribute.
    """
    photoN_path = os.path.join(workdir, pid, photoN)
    for xmp in os.listdir(photoN_path):
        if xmp.endswith('.xmp'):
            _apply_join_flags(os.path.join(photoN_path, xmp), mesh, texture)


def set_photo_join_type(workdir, pid, photoN, camera_id, mesh = '0', texture='0'):
    """Set the meshing/texturing flags in the ``.xmp`` file of one camera.

    The file is ``<camera_id>_1.xmp`` when ``photoN`` is ``'photo1'`` and
    ``<camera_id>_8.xmp`` for any other folder name.

    Args:
        workdir: Root working directory.
        pid: Job sub-directory inside ``workdir``.
        photoN: Photo folder name inside the job directory.
        camera_id: Camera identifier used as the file name prefix.
        mesh: Value written into every ``xcr:InMeshing`` attribute.
        texture: Value written into every ``xcr:InTexturing`` attribute.
    """
    suffix = '1' if photoN == 'photo1' else '8'
    filename = os.path.join(workdir, pid, photoN, f'{camera_id}_{suffix}.xmp')
    _apply_join_flags(filename, mesh, texture)
|
|
|
def down_from_oss(oss_client, workdir, pid, per=100):
    """Download all objects under ``photos/<pid>/`` into the working directory.

    Objects whose file name ends in ``_1.jpg`` are stored in
    ``<workdir>/<pid>/photo1``; every other object goes to
    ``<workdir>/<pid>/photo2``. Nothing is downloaded when
    ``<workdir>/<pid>`` already exists.

    Args:
        oss_client: Object passed to ``oss2.ObjectIteratorV2`` and used for
            ``get_object_to_file``.
        workdir: Root working directory.
        pid: Job identifier used in both the OSS prefix and the local path.
        per: When not 100, each download is requested with the process
            string ``image/resize,p_<per>``.
    """
    start_time = time.time()
    path = os.path.join(workdir, pid)
    if os.path.exists(path):
        print(f"Directory {path} already exists, skip")
        return
    os.makedirs(os.path.join(path, 'photo1'))
    os.makedirs(os.path.join(path, 'photo2'))

    # The result is not used here; the call is kept for the request and log
    # output it produces. NOTE(review): confirm whether it can be dropped.
    getPSid(pid)

    # List the objects by prefix.
    prefix = f'photos/{pid}/'
    filelist = oss2.ObjectIteratorV2(oss_client, prefix=prefix)
    # Loop-invariant, so built once instead of on every iteration.
    style = f'image/resize,p_{per}'
    for file in tqdm(filelist):
        filename = file.key.split('/')[-1]
        if not filename:
            # Keys ending in "/" have no file name; downloading them would
            # target the directory itself.
            continue
        subdir = 'photo1' if filename.endswith('_1.jpg') else 'photo2'
        localfile = os.path.join(path, subdir, filename)

        if per == 100:
            oss_client.get_object_to_file(file.key, localfile)
        else:
            oss_client.get_object_to_file(file.key, localfile, process=style)

    print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} pid: {pid} 图片下载完成, 共费时{diff_time(start_time)}')
|
|
|
def get_defineDistances(psid):
    """Build the ``-defineDistance`` argument string for ``psid``.

    ``libs_db.get_floor_sticker_distances(psid)`` is expected to return
    ``;``-separated entries of the form ``"<p1> <p2> <distance>"``. Each
    entry becomes ``-defineDistance <p1> <p2> <distance>``.

    Returns:
        str: All arguments joined by single spaces; empty when there are no
        entries.

    Raises:
        ValueError: When a non-empty entry does not have exactly three
            whitespace-separated fields.
    """
    distances = libs_db.get_floor_sticker_distances(psid).split(';')
    print("distances",distances)
    arguments = []
    for d in distances:
        if not d.strip():
            # A trailing ';' or an empty source string yields empty entries.
            continue
        # split() without a separator tolerates leading or repeated spaces.
        p1, p2, distance = d.split()
        arguments.append(f'-defineDistance {p1} {p2} {distance}')
    return ' '.join(arguments)
|
|
|
def is_running(psname):
    """Report whether any process name contains ``psname``.

    Args:
        psname: Substring to search for; surrounding whitespace is stripped
            before matching.

    Returns:
        bool: ``True`` as soon as a matching process name is found,
        ``False`` otherwise.
    """
    target = psname.strip()
    for p in psutil.process_iter(['name']):
        name = p.info['name']
        # The name can be None when it could not be retrieved; a substring
        # test against None would raise TypeError.
        if name is not None and target in name:
            return True
    return False
|
|
|
def diff_time(start_time):
    """Format the time elapsed since ``start_time`` as minutes and seconds.

    Args:
        start_time: A timestamp on the ``time.time()`` clock.

    Returns:
        str: ``'<minutes>分<seconds>秒'`` with both parts truncated to
        integers; minutes are not folded into hours.
    """
    elapsed = time.time() - start_time
    minutes, seconds = int(elapsed // 60), int(elapsed % 60)
    return f'{minutes}分{seconds}秒'
|
|
|
|