You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
232 lines
12 KiB
232 lines
12 KiB
import os, sys, time, math, csv, requests |
|
import oss2 |
|
from PIL import Image, ImageDraw |
|
from alibabacloud_facebody20191230.client import Client |
|
from alibabacloud_facebody20191230.models import DetectFaceRequest |
|
from alibabacloud_tea_openapi.models import Config |
|
from alibabacloud_tea_util.models import RuntimeOptions |
|
|
|
def detect_face_feature_points(url): |
|
try: |
|
detect_face_request = DetectFaceRequest( |
|
image_url=url, |
|
landmark=True, |
|
quality=True, |
|
max_face_number=5, |
|
pose=True |
|
) |
|
runtime = RuntimeOptions() |
|
result = facebody_client.detect_face_with_options(detect_face_request, runtime) |
|
return result.body |
|
except Exception as e: |
|
print(e) |
|
return None |
|
|
|
def format_points(face_points, faces_count, pid): |
|
points = face_points.landmarks |
|
qualities = face_points.qualities |
|
result = {} |
|
print(f'faces_count: {faces_count}') |
|
for i in range(faces_count): |
|
face = {} |
|
index = 0 |
|
j = 0 |
|
while j < 105 * 2: |
|
if index == 105: |
|
index = 0 |
|
j = 0 |
|
if index in feature_points: face[index] = {"x": points[0], "y": points[1]} |
|
points = points[2:] |
|
index += 1 |
|
|
|
j += 2 |
|
face['综合分数'] = qualities.score_list[i] |
|
face['模糊分数'] = qualities.blur_list[i] |
|
face['人脸概率'] = qualities.fnf_list[i] |
|
face['光照分数'] = qualities.illu_list[i] |
|
face['噪音分数'] = qualities.noise_list[i] |
|
result[f'{pid}_face{i}'] = face |
|
return result |
|
|
|
def get_distance(p1, p2): |
|
# 计算两个像素点之间的距离 |
|
d = math.sqrt((p1['x'] - p2['x']) ** 2 + (p1['y'] - p2['y']) ** 2) |
|
d = round(d, 2) |
|
return d |
|
|
|
def sort_faces(photo1_feature_points, photo2_feature_points): |
|
# 对两张图片中每张脸的特征点距离进行排序,找到最近的距离为同一张脸, 超过阈值的不算 |
|
threshold = 50 # 阈值定为50个像素点 |
|
new_photo1_feature_points = {} |
|
new_photo2_feature_points = {} |
|
for face2 in photo2_feature_points: |
|
min_distance = 9999999 |
|
find = False |
|
for face1 in photo1_feature_points: |
|
distance = get_distance(photo1_feature_points[face1][29], photo2_feature_points[face2][29]) |
|
if distance < threshold and distance < min_distance: |
|
find = True |
|
min_distance = distance |
|
min_face = face1 |
|
if find: |
|
new_photo2_feature_points[face2] = photo2_feature_points[face2] |
|
new_photo1_feature_points[face2] = photo1_feature_points[min_face] |
|
|
|
return new_photo1_feature_points, new_photo2_feature_points |
|
|
|
|
|
def get_distance_points(photo1_feature_points, photo2_feature_points): |
|
# 添加特征点之间的距离到原有的csv文件中 |
|
f = open('output/distance.csv', 'a', encoding='utf-8', newline='') |
|
csv_writer = csv.writer(f) |
|
# csv_writer.writerow(['pid_face', 'left_eye_top_29', 'left_eye_bottom_36', 'right_eye_top_45', 'right_eye_bottom_52', 'nose_59', 'mouth_top_66', 'mouth_bottom_75', 'chin_98', 'min', 'avg', 'max', '综合分数', '模糊分数', '人脸概率', '光照分数', '噪音分数']) |
|
for face in photo2_feature_points: |
|
left_eye_top_29 = get_distance(photo1_feature_points[face][29], photo2_feature_points[face][29]) |
|
left_eye_bottom_36 = get_distance(photo1_feature_points[face][36], photo2_feature_points[face][36]) |
|
right_eye_top_45 = get_distance(photo1_feature_points[face][45], photo2_feature_points[face][45]) |
|
right_eye_bottom_52 = get_distance(photo1_feature_points[face][52], photo2_feature_points[face][52]) |
|
nose_59 = get_distance(photo1_feature_points[face][59], photo2_feature_points[face][59]) |
|
mouth_top_66 = get_distance(photo1_feature_points[face][66], photo2_feature_points[face][66]) |
|
mouth_bottom_75 = get_distance(photo1_feature_points[face][75], photo2_feature_points[face][75]) |
|
chin_98 = get_distance(photo1_feature_points[face][98], photo2_feature_points[face][98]) |
|
max_distance = max(left_eye_top_29, left_eye_bottom_36, right_eye_top_45, right_eye_bottom_52, nose_59, mouth_top_66, mouth_bottom_75, chin_98) |
|
min_distance = min(left_eye_top_29, left_eye_bottom_36, right_eye_top_45, right_eye_bottom_52, nose_59, mouth_top_66, mouth_bottom_75, chin_98) |
|
avg_distance = (left_eye_top_29 + left_eye_bottom_36 + right_eye_top_45 + right_eye_bottom_52 + nose_59 + mouth_top_66 + mouth_bottom_75 + chin_98) / 8 |
|
avg_distance = round(avg_distance, 2) |
|
csv_writer.writerow([face, left_eye_top_29, left_eye_bottom_36, right_eye_top_45, right_eye_bottom_52, nose_59, mouth_top_66, mouth_bottom_75, chin_98, min_distance, avg_distance, max_distance, photo2_feature_points[face]['综合分数'], photo2_feature_points[face]['模糊分数'], photo2_feature_points[face]['人脸概率'], photo2_feature_points[face]['光照分数'], photo2_feature_points[face]['噪音分数']]) |
|
f.close() |
|
|
|
def get_front_camera(pid): |
|
url = 'https://mp.api.suwa3d.com/api/takephotoOrder/photoStudioInfo' |
|
res = requests.get(url, params={'pid': pid}) |
|
if res.json()['data']['type'] == 1: |
|
return '103' |
|
elif res.json()['data']['type'] == 2: |
|
return '74' |
|
else: |
|
return '' |
|
|
|
def draw_points(photo1_feature_points, photo2_feature_points, pid): |
|
front_camera = get_front_camera(pid) |
|
localphoto1 = f'/data/datasets/photos/{pid}/photo1/{front_camera}_1.jpg' |
|
localphoto2 = f'/data/datasets/photos/{pid}/photo2/{front_camera}_8.jpg' |
|
localphoto3 = f'/data/datasets/photos/{pid}/photo1/{front_camera}_1_8_points.jpg' |
|
if os.path.exists(localphoto3): |
|
print(f'{localphoto3} exists,跳过...') |
|
return |
|
if not os.path.exists(localphoto1): |
|
if not os.path.exists(f'/data/datasets/photos/{pid}/photo1'): |
|
os.makedirs(f'/data/datasets/photos/{pid}/photo1') |
|
bucket_client.get_object_to_file(f'photos/{pid}/photo1/{front_camera}_1.jpg', localphoto1) |
|
if not os.path.exists(localphoto2): |
|
if not os.path.exists(f'/data/datasets/photos/{pid}/photo2'): |
|
os.makedirs(f'/data/datasets/photos/{pid}/photo2') |
|
bucket_client.get_object_to_file(f'photos/{pid}/photo2/{front_camera}_8.jpg', localphoto2) |
|
time.sleep(1) |
|
image1 = Image.open(localphoto1) |
|
image2 = Image.open(localphoto2) |
|
image1 = image1.resize((int(image1.width / 2), int(image1.height / 2))).rotate(180) |
|
image2 = image2.resize((int(image2.width / 2), int(image2.height / 2))).rotate(180) |
|
|
|
# 重叠融合image1,透明度60%和image2到image3 |
|
image3 = Image.blend(image1, image2, 0.6) |
|
|
|
draw1 = ImageDraw.Draw(image1) |
|
draw2 = ImageDraw.Draw(image2) |
|
draw3 = ImageDraw.Draw(image3) |
|
color1 = (255, 0, 0) |
|
color2 = (0, 255, 0) |
|
radius = 2 |
|
for face in photo1_feature_points: |
|
for j in feature_points: |
|
draw1.ellipse((photo1_feature_points[face][j]['x'] - radius, photo1_feature_points[face][j]['y'] - radius, photo1_feature_points[face][j]['x'] + radius, photo1_feature_points[face][j]['y'] + radius), fill=color1) |
|
draw2.ellipse((photo2_feature_points[face][j]['x'] - radius, photo2_feature_points[face][j]['y'] - radius, photo2_feature_points[face][j]['x'] + radius, photo2_feature_points[face][j]['y'] + radius), fill=color2) |
|
draw3.ellipse((photo1_feature_points[face][j]['x'] - radius, photo1_feature_points[face][j]['y'] - radius, photo1_feature_points[face][j]['x'] + radius, photo1_feature_points[face][j]['y'] + radius), fill=color1) |
|
draw3.ellipse((photo2_feature_points[face][j]['x'] - radius, photo2_feature_points[face][j]['y'] - radius, photo2_feature_points[face][j]['x'] + radius, photo2_feature_points[face][j]['y'] + radius), fill=color2) |
|
# i = 0 |
|
# while i < len(photo2_feature_points): |
|
# for j in range(face_feature_points_count): |
|
# if j in feature_points: |
|
# draw1.ellipse((photo1_feature_points[f'{pid}_face{i}'][j]['x'] - radius, photo1_feature_points[f'{pid}_face{i}'][j]['y'] - radius, photo1_feature_points[f'{pid}_face{i}'][j]['x'] + radius, photo1_feature_points[f'{pid}_face{i}'][j]['y'] + radius), fill=color1) |
|
# draw2.ellipse((photo2_feature_points[f'{pid}_face{i}'][j]['x'] - radius, photo2_feature_points[f'{pid}_face{i}'][j]['y'] - radius, photo2_feature_points[f'{pid}_face{i}'][j]['x'] + radius, photo2_feature_points[f'{pid}_face{i}'][j]['y'] + radius), fill=color2) |
|
# draw3.ellipse((photo1_feature_points[f'{pid}_face{i}'][j]['x'] - radius, photo1_feature_points[f'{pid}_face{i}'][j]['y'] - radius, photo1_feature_points[f'{pid}_face{i}'][j]['x'] + radius, photo1_feature_points[f'{pid}_face{i}'][j]['y'] + radius), fill=color1) |
|
# draw3.ellipse((photo2_feature_points[f'{pid}_face{i}'][j]['x'] - radius, photo2_feature_points[f'{pid}_face{i}'][j]['y'] - radius, photo2_feature_points[f'{pid}_face{i}'][j]['x'] + radius, photo2_feature_points[f'{pid}_face{i}'][j]['y'] + radius), fill=color2) |
|
# i += 1 |
|
|
|
image1.save(f'/data/datasets/photos/{pid}/photo1/{front_camera}_1_points.jpg') |
|
image2.save(f'/data/datasets/photos/{pid}/photo2/{front_camera}_8_points.jpg') |
|
image3.save(f'/data/datasets/photos/{pid}/photo1/{front_camera}_1_8_points.jpg') |
|
# image1.show() |
|
# image2.show() |
|
# image3.show() |
|
|
|
def main(pids): |
|
for pid in pids: |
|
print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} 正在处理 {pid} 的照片...') |
|
front_camera = get_front_camera(pid) |
|
url1 = f'photos/{pid}/photo1/{front_camera}_1.jpg' |
|
url2 = f'photos/{pid}/photo2/{front_camera}_8.jpg' |
|
|
|
sign_url1 = bucket_client.sign_url('GET', url1, 3600, params={'x-oss-process': style}) |
|
sign_url2 = bucket_client.sign_url('GET', url2, 3600, params={'x-oss-process': style}) |
|
# print(f'正在处理 {pid} 的照片{front_camera} ...{sign_url1}') |
|
result1 = detect_face_feature_points(sign_url1) |
|
if result1 is None: |
|
print(f'照片{front_camera} 未检测到人脸') |
|
f = open('output/error.log', 'a') |
|
f.write(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} {pid} {front_camera} 未检测到人脸,跳过\n') |
|
continue |
|
photo1_face_count = result1.data.face_count |
|
photo1_feature_points = result1.data |
|
result2 = detect_face_feature_points(sign_url2) |
|
if result2 is None: |
|
print(f'照片{front_camera} 未检测到人脸') |
|
f = open('output/error.log', 'a') |
|
f.write(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())} {pid} {front_camera} 未检测到人脸,跳过\n') |
|
continue |
|
|
|
photo2_face_count = result2.data.face_count |
|
photo2_feature_points = result2.data |
|
|
|
photo1_feature_points = format_points(photo1_feature_points, photo1_face_count, pid) |
|
photo2_feature_points = format_points(photo2_feature_points, photo2_face_count, pid) |
|
|
|
photo1_feature_points, photo2_feature_points = sort_faces(photo1_feature_points, photo2_feature_points) |
|
|
|
print(photo1_feature_points) |
|
print(photo2_feature_points) |
|
|
|
get_distance_points(photo1_feature_points, photo2_feature_points) |
|
draw_points(photo1_feature_points, photo2_feature_points, pid) |
|
|
|
print(f'处理完成 {pid} ...') |
|
|
|
if __name__ == "__main__": |
|
# 定义阿里云oss的配置参数 |
|
access_key_id = 'LTAI5tSReWm8hz7dSYxxth8f' |
|
access_key_secret = '8ywTDF9upPAtvgXtLKALY2iMYHIxdS' |
|
facebody_endpoint = 'facebody.cn-shanghai.aliyuncs.com' |
|
endpoint = 'oss-cn-shanghai.aliyuncs.com' |
|
bucket_name = 'suwa3d-securedata' |
|
face_feature_points_count = 105 |
|
feature_points = [29, 36, 45, 52, 59, 66, 75, 98] |
|
style = 'image/resize,p_50' |
|
|
|
bucket_client = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name) |
|
facebody_client = Client(Config( |
|
access_key_id=access_key_id, |
|
access_key_secret=access_key_secret, |
|
endpoint=facebody_endpoint, |
|
region_id='cn-shanghai' |
|
)) |
|
|
|
if len(sys.argv) == 2: |
|
if sys.argv[1] == 'all': |
|
with open('datasets/pids_all.txt', 'r') as f: |
|
pids = f.read().split(',') |
|
else: |
|
pids = sys.argv[1].split(',') |
|
main(pids) |
|
else: |
|
print('用法:python draw_points_on_faces.py <pids>') |
|
sys.exit(0) |