You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
164 lines
6.6 KiB
164 lines
6.6 KiB
from ast import Not |
|
import base64 |
|
from tkinter.ttk import Style |
|
from PIL import Image, ImageDraw |
|
import config |
|
import requests, pickledb, json, datetime, oss2, os, sys, cv2 |
|
import numpy as np |
|
from imutils.object_detection import non_max_suppression |
|
|
|
def re_get_access_token(): |
|
baidu_token_host = 'https://aip.baidubce.com/oauth/2.0/token' |
|
res = requests.get(baidu_token_host, params={'grant_type': 'client_credentials', 'client_id': config.baidu_api['face']['api_key'], 'client_secret': config.baidu_api['face']['secret_key']}) |
|
access_token = json.loads(res.text)['access_token'] |
|
expires_time = int(json.loads(res.text)['expires_in']) + int(datetime.datetime.now().timestamp()) |
|
print(access_token, expires_time) |
|
return access_token, expires_time |
|
|
|
def get_access_token(): |
|
config = pickledb.load('config.json', auto_dump=True) |
|
baidu = config.get('baidu_api') |
|
if not baidu: |
|
access_token, expires_time = re_get_access_token() |
|
config.set('baidu_api', {'access_token': access_token, 'expires_time': expires_time}) |
|
else: |
|
access_token = baidu['access_token'] |
|
expires_time = baidu['expires_time'] |
|
if int(datetime.datetime.now().timestamp()) > expires_time: |
|
access_token, expires_time = re_get_access_token() |
|
config.set('baidu_api', {'access_token': access_token, 'expires_time': expires_time}) |
|
|
|
config.dump() |
|
print('access_token:', access_token) |
|
return access_token |
|
|
|
def down_image(pid): |
|
if not os.path.exists(workdir): |
|
os.mkdir(workdir) |
|
input_path = os.path.join(workdir, pid) |
|
if not os.path.exists(input_path): |
|
os.mkdir(input_path) |
|
os.mkdir(os.path.join(input_path, 'photo2')) |
|
output = os.path.join(input_path, 'photo2', '103_8.jpg') |
|
|
|
oss_client.get_object_to_file(f'photos/{pid}/photo2/103_8.jpg', output) |
|
return output |
|
|
|
def baidu_face(pid): |
|
access_token = get_access_token() |
|
request_url = 'https://aip.baidubce.com/rest/2.0/face/v3/detect' + '?access_token=' + access_token |
|
img_url = f'https://3dview.suwa3d.com/face/{pid}/103_8.jpg' |
|
params = { |
|
'access_token': access_token, |
|
'image': img_url, |
|
'image_type': 'URL', |
|
'face_field': 'age,beauty,expression,gender,glasses,landmark,eye_status', |
|
'max_face_num': '6', |
|
} |
|
res = requests.post(request_url, data=params) |
|
print(res.text) |
|
|
|
def aliyun_face(pid): |
|
high = False |
|
style = 'imm/detectface' |
|
keys = [f'photos/{pid}/photo1/103_1.jpg', f'photos/{pid}/photo2/103_8.jpg'] |
|
for objectkey in keys: |
|
res = oss_client.get_object(objectkey, process=style) |
|
res = json.loads(res.read()) |
|
if res['success']: |
|
if res['Faces'] is None: |
|
print('no face') |
|
return None |
|
else: |
|
print('faces num:', len(res['Faces'], ' in ', objectkey)) |
|
for face in res['Faces']: |
|
print('-' * 20) |
|
print('face_id:', face['FaceId']) |
|
print('gender:', face['Gender']) |
|
print('age:', face['Age']) |
|
print('glasses:', face['FaceAttributes']['Glasses']) |
|
print('Boundary:', face['FaceAttributes']['FaceBoundary']) |
|
start_point = (face['FaceAttributes']['FaceBoundary']['Left'], face['FaceAttributes']['FaceBoundary']['Top']) |
|
end_point = (face['FaceAttributes']['FaceBoundary']['Left'] + face['FaceAttributes']['FaceBoundary']['Width'], face['FaceAttributes']['FaceBoundary']['Top'] + face['FaceAttributes']['FaceBoundary']['Height']) |
|
# img = cv2.imread(down_image(pid)) |
|
# cv2.rectangle(img, start_point, end_point, (0, 0, 255), 2) |
|
# cv2.imshow('img', img) |
|
# cv2.waitKey(0) |
|
|
|
if face['Gender'] == 'FEMALE' and face['Age'] < 22: high = True |
|
if face['Gender'] == 'MALE' and face['Age'] < 15: high = True |
|
else: |
|
print('face detect failed...') |
|
return high |
|
|
|
def get_head_rect(photo): |
|
style = 'imm/detectface' |
|
objectkey = f'photos/{pid}/photo2/{photo}' |
|
res = oss_client.get_object(objectkey, process=style) |
|
res = json.loads(res.read()) |
|
if res['success']: |
|
if res['Faces'] is None: |
|
print('no face') |
|
return None |
|
print('faces num:', len(res['Faces'])) |
|
rects = [] |
|
for face in res['Faces']: |
|
print('-' * 20) |
|
print('face_id:', face['FaceId']) |
|
start_point = (face['FaceAttributes']['FaceBoundary']['Left'], face['FaceAttributes']['FaceBoundary']['Top']) |
|
end_point = (face['FaceAttributes']['FaceBoundary']['Left'] + face['FaceAttributes']['FaceBoundary']['Width'], face['FaceAttributes']['FaceBoundary']['Top'] + face['FaceAttributes']['FaceBoundary']['Height']) |
|
rect = (start_point, end_point) |
|
rects.append(rect) |
|
return rects |
|
return None |
|
|
|
def gen_mask(pid): |
|
workdir = f'z:\\{pid}\\' |
|
if not os.path.exists(workdir): |
|
print('not found pid:', pid) |
|
return |
|
photo1 = os.path.join(workdir, 'photo1') |
|
photo2 = os.path.join(workdir, 'photo2') |
|
for photo in os.listdir(photo2): |
|
if photo.endswith('.jpg'): |
|
print('photo:', photo) |
|
rects = get_head_rect(photo) |
|
print(rects) |
|
if rects is None: continue |
|
for rect in rects: |
|
mask = np.zeros((3040, 4056, 3), np.uint8) |
|
cv2.rectangle(mask, rect[0], rect[1], (255, 255, 255), -1) |
|
# cv2.imwrite(os.path.join(photo2, photo + '.mask.png'), mask) |
|
mask = cv2.cvtColor(mask, cv2.COLOR_BGR2GRAY) |
|
cv2.imwrite(os.path.join(photo1, photo[:-5] + '1.jpg.mask.png'), mask) |
|
|
|
def aliyun_tagimage(pid): |
|
high = False |
|
style = 'imm/tagimage' |
|
objectkey = f'photos/{pid}/photo2/103_8.jpg' |
|
res = oss_client.get_object(objectkey, process=style) |
|
res = json.loads(res.read()) |
|
if res['success']: |
|
print('tags num:', len(res['Tags'])) |
|
for tag in res['Tags']: |
|
print(tag) |
|
else: |
|
print('tag detect failed...') |
|
return high |
|
|
|
if __name__ == '__main__': |
|
workdir = 'Z:\\face\\' |
|
AccessKeyId = 'LTAI5tSReWm8hz7dSYxxth8f' |
|
AccessKeySecret = '8ywTDF9upPAtvgXtLKALY2iMYHIxdS' |
|
Endpoint = 'oss-cn-shanghai.aliyuncs.com' |
|
Bucket = 'suwa3d-securedata' |
|
oss_client = oss2.Bucket(oss2.Auth(AccessKeyId, AccessKeySecret), Endpoint, Bucket) |
|
|
|
if len(sys.argv) == 2: |
|
pid = sys.argv[1] |
|
else: |
|
print('Usage: python test.py pid') |
|
exit(1) |
|
# aliyun_tagimage(pid) |
|
aliyun_face(pid) |
|
# gen_mask(pid) |