import os, sys, time, math, csv, requests import oss2 from PIL import Image, ImageDraw from alibabacloud_facebody20191230.client import Client from alibabacloud_facebody20191230.models import DetectFaceRequest from alibabacloud_tea_openapi.models import Config from alibabacloud_tea_util.models import RuntimeOptions def detect_face_feature_points(url): try: detect_face_request = DetectFaceRequest( image_url=url, landmark=True, quality=True, max_face_number=5, pose=True ) runtime = RuntimeOptions() result = facebody_client.detect_face_with_options(detect_face_request, runtime) return result.body except Exception as e: print(e) return None def format_points(face_points, faces_count, pid): points = face_points.landmarks qualities = face_points.qualities result = {} print(f'faces_count: {faces_count}') for i in range(faces_count): face = {} index = 0 j = 0 while j < 105 * 2: if index == 105: index = 0 j = 0 if index in feature_points: face[index] = {"x": points[0], "y": points[1]} points = points[2:] index += 1 j += 2 face['综合分数'] = qualities.score_list[i] face['模糊分数'] = qualities.blur_list[i] face['人脸概率'] = qualities.fnf_list[i] face['光照分数'] = qualities.illu_list[i] face['噪音分数'] = qualities.noise_list[i] result[f'{pid}_face{i}'] = face return result def get_distance(p1, p2): # 计算两个像素点之间的距离 d = math.sqrt((p1['x'] - p2['x']) ** 2 + (p1['y'] - p2['y']) ** 2) d = round(d, 2) return d def sort_faces(photo1_feature_points, photo2_feature_points): # 对两张图片中每张脸的特征点距离进行排序,找到最近的距离为同一张脸, 超过阈值的不算 threshold = 50 # 阈值定为50个像素点 new_photo1_feature_points = {} new_photo2_feature_points = {} for face2 in photo2_feature_points: min_distance = 9999999 find = False for face1 in photo1_feature_points: distance = get_distance(photo1_feature_points[face1][29], photo2_feature_points[face2][29]) if distance < threshold and distance < min_distance: find = True min_distance = distance min_face = face1 if find: new_photo2_feature_points[face2] = photo2_feature_points[face2] new_photo1_feature_points[face2] = photo1_feature_points[min_face] return new_photo1_feature_points, new_photo2_feature_points def get_distance_points(photo1_feature_points, photo2_feature_points): # 添加特征点之间的距离到原有的csv文件中 f = open('output/distance.csv', 'a', encoding='utf-8', newline='') csv_writer = csv.writer(f) # csv_writer.writerow(['pid_face', 'left_eye_top_29', 'left_eye_bottom_36', 'right_eye_top_45', 'right_eye_bottom_52', 'nose_59', 'mouth_top_66', 'mouth_bottom_75', 'chin_98', 'min', 'avg', 'max', '综合分数', '模糊分数', '人脸概率', '光照分数', '噪音分数']) for face in photo2_feature_points: left_eye_top_29 = get_distance(photo1_feature_points[face][29], photo2_feature_points[face][29]) left_eye_bottom_36 = get_distance(photo1_feature_points[face][36], photo2_feature_points[face][36]) right_eye_top_45 = get_distance(photo1_feature_points[face][45], photo2_feature_points[face][45]) right_eye_bottom_52 = get_distance(photo1_feature_points[face][52], photo2_feature_points[face][52]) nose_59 = get_distance(photo1_feature_points[face][59], photo2_feature_points[face][59]) mouth_top_66 = get_distance(photo1_feature_points[face][66], photo2_feature_points[face][66]) mouth_bottom_75 = get_distance(photo1_feature_points[face][75], photo2_feature_points[face][75]) chin_98 = get_distance(photo1_feature_points[face][98], photo2_feature_points[face][98]) max_distance = max(left_eye_top_29, left_eye_bottom_36, right_eye_top_45, right_eye_bottom_52, nose_59, mouth_top_66, mouth_bottom_75, chin_98) min_distance = min(left_eye_top_29, left_eye_bottom_36, right_eye_top_45, right_eye_bottom_52, nose_59, mouth_top_66, mouth_bottom_75, chin_98) avg_distance = (left_eye_top_29 + left_eye_bottom_36 + right_eye_top_45 + right_eye_bottom_52 + nose_59 + mouth_top_66 + mouth_bottom_75 + chin_98) / 8 avg_distance = round(avg_distance, 2) csv_writer.writerow([face, left_eye_top_29, left_eye_bottom_36, right_eye_top_45, right_eye_bottom_52, nose_59, mouth_top_66, mouth_bottom_75, chin_98, min_distance, avg_distance, max_distance, photo2_feature_points[face]['综合分数'], photo2_feature_points[face]['模糊分数'], photo2_feature_points[face]['人脸概率'], photo2_feature_points[face]['光照分数'], photo2_feature_points[face]['噪音分数']]) f.close() def get_front_camera(pid): url = 'https://mp.api.suwa3d.com/api/takephotoOrder/photoStudioInfo' res = requests.get(url, params={'pid': pid}) if res.json()['data']['type'] == 1: return '103' elif res.json()['data']['type'] == 2: return '74' else: return '' def draw_points(photo1_feature_points, photo2_feature_points, pid): front_camera = get_front_camera(pid) localphoto1 = f'/data/datasets/photos/{pid}/photo1/{front_camera}_1.jpg' localphoto2 = f'/data/datasets/photos/{pid}/photo2/{front_camera}_8.jpg' localphoto3 = f'/data/datasets/photos/{pid}/photo1/{front_camera}_1_8_points.jpg' if os.path.exists(localphoto3): print(f'{localphoto3} exists,跳过...') return if not os.path.exists(localphoto1): if not os.path.exists(f'/data/datasets/photos/{pid}/photo1'): os.makedirs(f'/data/datasets/photos/{pid}/photo1') bucket_client.get_object_to_file(f'photos/{pid}/photo1/{front_camera}_1.jpg', localphoto1) if not os.path.exists(localphoto2): if not os.path.exists(f'/data/datasets/photos/{pid}/photo2'): os.makedirs(f'/data/datasets/photos/{pid}/photo2') bucket_client.get_object_to_file(f'photos/{pid}/photo2/{front_camera}_8.jpg', localphoto2) time.sleep(1) image1 = Image.open(localphoto1) image2 = Image.open(localphoto2) image1 = image1.resize((int(image1.width / 2), int(image1.height / 2))).rotate(180) image2 = image2.resize((int(image2.width / 2), int(image2.height / 2))).rotate(180) # 重叠融合image1,透明度60%和image2到image3 image3 = Image.blend(image1, image2, 0.6) draw1 = ImageDraw.Draw(image1) draw2 = ImageDraw.Draw(image2) draw3 = ImageDraw.Draw(image3) color1 = (255, 0, 0) color2 = (0, 255, 0) radius = 2 for face in photo1_feature_points: for j in feature_points: draw1.ellipse((photo1_feature_points[face][j]['x'] - radius, photo1_feature_points[face][j]['y'] - radius, photo1_feature_points[face][j]['x'] + radius, photo1_feature_points[face][j]['y'] + radius), fill=color1) draw2.ellipse((photo2_feature_points[face][j]['x'] - radius, photo2_feature_points[face][j]['y'] - radius, photo2_feature_points[face][j]['x'] + radius, photo2_feature_points[face][j]['y'] + radius), fill=color2) draw3.ellipse((photo1_feature_points[face][j]['x'] - radius, photo1_feature_points[face][j]['y'] - radius, photo1_feature_points[face][j]['x'] + radius, photo1_feature_points[face][j]['y'] + radius), fill=color1) draw3.ellipse((photo2_feature_points[face][j]['x'] - radius, photo2_feature_points[face][j]['y'] - radius, photo2_feature_points[face][j]['x'] + radius, photo2_feature_points[face][j]['y'] + radius), fill=color2) # i = 0 # while i < len(photo2_feature_points): # for j in range(face_feature_points_count): # if j in feature_points: # draw1.ellipse((photo1_feature_points[f'{pid}_face{i}'][j]['x'] - radius, photo1_feature_points[f'{pid}_face{i}'][j]['y'] - radius, photo1_feature_points[f'{pid}_face{i}'][j]['x'] + radius, photo1_feature_points[f'{pid}_face{i}'][j]['y'] + radius), fill=color1) # draw2.ellipse((photo2_feature_points[f'{pid}_face{i}'][j]['x'] - radius, photo2_feature_points[f'{pid}_face{i}'][j]['y'] - radius, photo2_feature_points[f'{pid}_face{i}'][j]['x'] + radius, photo2_feature_points[f'{pid}_face{i}'][j]['y'] + radius), fill=color2) # draw3.ellipse((photo1_feature_points[f'{pid}_face{i}'][j]['x'] - radius, photo1_feature_points[f'{pid}_face{i}'][j]['y'] - radius, photo1_feature_points[f'{pid}_face{i}'][j]['x'] + radius, photo1_feature_points[f'{pid}_face{i}'][j]['y'] + radius), fill=color1) # draw3.ellipse((photo2_feature_points[f'{pid}_face{i}'][j]['x'] - radius, photo2_feature_points[f'{pid}_face{i}'][j]['y'] - radius, photo2_feature_points[f'{pid}_face{i}'][j]['x'] + radius, photo2_feature_points[f'{pid}_face{i}'][j]['y'] + radius), fill=color2) # i += 1 image1.save(f'/data/datasets/photos/{pid}/photo1/{front_camera}_1_points.jpg') image2.save(f'/data/datasets/photos/{pid}/photo2/{front_camera}_8_points.jpg') image3.save(f'/data/datasets/photos/{pid}/photo1/{front_camera}_1_8_points.jpg') # image1.show() # image2.show() # image3.show() def main(pids): for pid in pids: print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理 {pid} 的照片...') front_camera = get_front_camera(pid) url1 = f'photos/{pid}/photo1/{front_camera}_1.jpg' url2 = f'photos/{pid}/photo2/{front_camera}_8.jpg' sign_url1 = bucket_client.sign_url('GET', url1, 3600, params={'x-oss-process': style}) sign_url2 = bucket_client.sign_url('GET', url2, 3600, params={'x-oss-process': style}) # print(f'正在处理 {pid} 的照片{front_camera} ...{sign_url1}') result1 = detect_face_feature_points(sign_url1) if result1 is None: print(f'照片{front_camera} 未检测到人脸') f = open('output/error.log', 'a') f.write(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} {pid} {front_camera} 未检测到人脸,跳过\n') continue photo1_face_count = result1.data.face_count photo1_feature_points = result1.data result2 = detect_face_feature_points(sign_url2) if result2 is None: print(f'照片{front_camera} 未检测到人脸') f = open('output/error.log', 'a') f.write(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} {pid} {front_camera} 未检测到人脸,跳过\n') continue photo2_face_count = result2.data.face_count photo2_feature_points = result2.data photo1_feature_points = format_points(photo1_feature_points, photo1_face_count, pid) photo2_feature_points = format_points(photo2_feature_points, photo2_face_count, pid) photo1_feature_points, photo2_feature_points = sort_faces(photo1_feature_points, photo2_feature_points) print(photo1_feature_points) print(photo2_feature_points) get_distance_points(photo1_feature_points, photo2_feature_points) draw_points(photo1_feature_points, photo2_feature_points, pid) print(f'处理完成 {pid} ...') if __name__ == "__main__": # 定义阿里云oss的配置参数 access_key_id = 'LTAI5tSReWm8hz7dSYxxth8f' access_key_secret = '8ywTDF9upPAtvgXtLKALY2iMYHIxdS' facebody_endpoint = 'facebody.cn-shanghai.aliyuncs.com' endpoint = 'oss-cn-shanghai.aliyuncs.com' bucket_name = 'suwa3d-securedata' face_feature_points_count = 105 feature_points = [29, 36, 45, 52, 59, 66, 75, 98] style = 'image/resize,p_50' bucket_client = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name) facebody_client = Client(Config( access_key_id=access_key_id, access_key_secret=access_key_secret, endpoint=facebody_endpoint, region_id='cn-shanghai' )) if len(sys.argv) == 2: if sys.argv[1] == 'all': with open('datasets/pids_all.txt', 'r') as f: pids = f.read().split(',') else: pids = sys.argv[1].split(',') main(pids) else: print('用法:python draw_points_on_faces.py ') sys.exit(0)