Browse Source

下单接口处理

master
dongchangxi 7 months ago
parent
commit
1f6f16aff6
  1. BIN
      openapi/__pycache__/utils.cpython-311.pyc
  2. 40
      openapi/config.toml
  3. 208
      openapi/main.py
  4. 1
      openapi/token.txt
  5. 88
      openapi/utils.py

BIN
openapi/__pycache__/utils.cpython-311.pyc

Binary file not shown.

40
openapi/config.toml

@ -0,0 +1,40 @@
[account]
appId="1nt7j0w17ecd9duruc48m7050012r5a8"
appSecret="1nt7j0w17ecd9duruc48m70600m2a32r"
shopId="1"
[oss]
AccessKeyId = "LTAI5tSReWm8hz7dSYxxth8f"
AccessKeySecret = "8ywTDF9upPAtvgXtLKALY2iMYHIxdS"
EndPoint = "oss-cn-shanghai.aliyuncs.com"
BucketSuwa3dDataSecret = "suwa3d-securedata"
[api]
#请求的域名
#test_domain="http://127.0.0.1:8188/"
domain = "https://open.api.suwa3d.com/"
#Token地址
getToken="token/getToken"
#获取pid
getPid="link/pid"
#记录拍照订单信息
recordPose="link/recordBaseInfo"
#脸部图片的上传地址
faceUploadUrl = "upload/getUploadUrlForBadge"
#下单接口
order = "order/confirmCartAndOrder"
#提交建模
submitModeling = "modeling/submitOrderModeling"
#模拟图片上传回调
callbackImageUpload = "callback/uploadBadgeImage"
[customer]
id=1
peoples=1
props=0
pets=0
people_pose=["站姿"]
platform=99
customer_name="董长希",
customer_mobile="18950403426"

208
openapi/main.py

@ -0,0 +1,208 @@
import utils,time,os,sys
from utils import cfg
#获取PID,记录基础信息
def getPidRecordPose():
params = {
"photo_group_id":int(time.time()),
"customer_id":cfg("customer.id"),
"platform":cfg("customer.platform"),
}
url = cfg("api.domain")+cfg("api.getPid")
data,status = utils.requestUrl(url,"GET",params)
if status != "success":
return 0,"获取PID失败"
pid = data["pid"]
#记录基础信息
params = {
"pid":pid,
"shop_id":cfg("account.shopId"),
"peoples":cfg("customer.peoples"),
"props":cfg("customer.props"),
"pets":cfg("customer.pets"),
"people_pose":cfg("customer.people_pose"),
}
url = cfg("api.domain")+cfg("api.recordPose")
data,status = utils.requestUrl(url,"POST",params)
if status != "success":
return 0,"记录基础信息失败"
print(f"记录基础信息成功,PID:{pid}")
return pid,"success"
#获取照片上传的地址, 上传到脸部位置,上传到卡通图片的位置,
def uploadImages(pid,imagePath):
#检测文件图片是否存在
if not os.path.exists(imagePath):
return "文件不存在"
ossFacePath = f"photos/{pid}/cartoon/origin/face.jpg"
ossPath = f"photos/{pid}/cartoon/cartoon_image/1.png"
utils.oss().put_object_from_file(ossFacePath,imagePath)
utils.oss().put_object_from_file(ossPath,imagePath)
callbackImageUpload(pid)
return "success"
#下单
def makeOrder(pid):
if pid == 0:
return "PID不能为0"
params = {
"pid":pid,
"shop_id":cfg("account.shopId"),
"address":{
"country_id":350206,
"province_id":350000,
"county_id":"0",
"city_id":350200,
"address":"汇金湖里大厦B栋312",
"receive_address":"汇金湖里大厦B栋312",
"send_model_type": 2,
"memo": "测试",
"customer_name": cfg("customer.name"),
"customer_mobile": cfg("customer.mobile")
},
"order_params": [
{
"id": 0,
"discount_name": "",
"discount_amount": 0,
}
],
"carts_info": [
{
"prod_id": 7,
"skus": [
{
"quantity": 1,
"attrs": [
{
"attr_id": 5,
"attr_val_id": 13,
},
{
"attr_id": 1,
"attr_val_id": 1,
}
],
}
]
}
],
"cartoon_pic_no": 1,
"product_name": "badge",
"platform": 99
}
url = cfg("api.domain")+cfg("api.order")
data,status = utils.requestUrl(url,"POST",params)
print(f"下订单结果:{data}")
if status != "success":
return 0,"下单失败"
status = submitModeling(data["order_ids"][0])
if status != "success":
return 0,"提交建模失败"
return data["order_ids"][0],"success"
#图片上传回调
def callbackImageUpload(pid):
params = {
"pid":pid,
"token":utils.getToken(),
}
url = cfg("api.domain")+cfg("api.callbackImageUpload")
data,status = utils.requestUrl(url,"GET",params)
print(f"图片上传回调结果:{data}-{status}")
return "success"
#提交建模
def submitModeling(orderId):
if orderId == 0:
return "订单ID不能为0"
params = {
"order_id":orderId,
"is_cartoon":1,
}
url = cfg("api.domain")+cfg("api.submitModeling")
data,status = utils.requestUrl(url,"POST",params)
print(f"提交建模结果:{status}")
if status != "success":
return "提交建模失败"
return "success"
def imageFolder():
#判断目录下是否有images文件夹 和 文件
if not os.path.exists("images"):
print("images目录不存在")
return
arrFiles = os.listdir("images")
if len(arrFiles) == 0:
print("images目录下没有文件")
return
#遍历 images 目录下的所有图片
for image in arrFiles:
#获取完整的图片地址
imagePath = os.path.join("images",image)
main(imagePath)
def main(imagePath = ""):
#判断文件是否存在
if not os.path.exists(imagePath):
print(f"图片不存在:{imagePath}")
return None,False
print(f"开始处理图片:{imagePath}")
pid,status = getPidRecordPose()
if status != "success":
print(f"获取PID失败:{imagePath}")
return None,False
status = uploadImages(pid,imagePath)
if status != "success":
print(f"上传图片失败:{imagePath}")
return None,False
order_id,status = makeOrder(pid)
if status != "success":
print(f"下单失败:{imagePath}")
return None,False
data = {"pid":pid,"imagePath":imagePath,"orderId":order_id}
print(f"下单成功:{data},移除图片:{imagePath}")
os.remove(imagePath)
return data,True
if __name__ == "__main__":
#获取请求参数
args = sys.argv[1:]
if len(args) == 0:
#需要二次确认是否要处理images目录下的所有图片
confirmV = input("是否要处理images目录下的所有图片? (y/n)")
if confirmV == "y":
imageFolder()
else:
print("请输入图片路径")
sys.exit(1)
else:
for imagePath in args:
main(imagePath)
print(f"{args}")

1
openapi/token.txt

@ -0,0 +1 @@
xdezmm0291gd9x7z89yj3um2709r55mu|1747365162

88
openapi/utils.py

@ -0,0 +1,88 @@
import toml,time,hmac,hashlib,requests
import oss2,os
#读取配置文件
def cfg(keyName):
try:
with open('config.toml', 'r') as f:
config = toml.load(f)
if "." in keyName:
keyName = keyName.split(".")
return config[keyName[0]][keyName[1]]
else:
return config[keyName]
except Exception as e:
return None
#生成签名
def sign(params):
params = [
f"appId={params['appId']}",
f"appSecret={cfg('account.appSecret')}",
f"shopId={params['shop_id']}",
f"timestamp={params['timestamp']}"
]
# 拼接参数字符串
joined_str = "&".join(params)
#进行签名 签名方式采用 HMAC-SHA256 算法
hmac_sha256 = hmac.new(cfg("account.appSecret").encode(), joined_str.encode(), hashlib.sha256)
sign = hmac_sha256.hexdigest()
return sign
#获取token
def getToken():
#判断是否有token.txt文件
if os.path.exists("token.txt"):
#读取token.txt文件
with open("token.txt", "r") as f:
token = f.read()
#判断token是否过期
if int(token.split("|")[1]) > int(time.time()):
return token.split("|")[0]
#获取token
url = cfg("api.domain")+cfg("api.getToken")
params = {
"appId":cfg("account.appId"),
"shop_id":cfg("account.shopId"),
"timestamp":str(int(time.time())), #时间戳,秒级 Unix 时间戳(自 1970-01-01 00:00:00 UTC 以来的秒数), 10位数
}
params["sign"] = sign(params)
#发送请求
response = requests.post(url, json=params)
data = response.json()
if data["code"] != 1000:
print("获取token失败",data)
return ""
else:
#记录token
with open("token.txt", "w") as f:
f.write(data["data"]["token"]+"|"+str(int(time.time())+3600))
return data["data"]["token"]
def requestUrl(url,method,params):
#获取token
token = getToken()
#设置headers
headers = {
"Authorization":f"Bearer {token}",
"Content-Type":"application/json"
}
#发送请求
response = requests.request(method,url,headers=headers,json=params)
data = response.json()
if data["code"] != 1000:
print("请求失败",data)
return "","error"
else:
return data["data"],"success"
def oss():
AccessKeyId = cfg("oss.AccessKeyId")
AccessKeySecret = cfg("oss.AccessKeySecret")
Endpoint = cfg("oss.EndPoint")
Bucket = cfg("oss.BucketSuwa3dDataSecret")
oss_client = oss2.Bucket(oss2.Auth(AccessKeyId, AccessKeySecret), Endpoint, Bucket)
return oss_client
Loading…
Cancel
Save