"""API helpers: config lookup, request signing, token caching and OSS client.

Settings are read from ``config.toml`` in the current working directory and
the access token is cached in ``token.txt`` next to it.
"""
import hashlib
import hmac
import os
import time

import oss2
import requests
import toml

# File used to cache the access token as "<token>|<expiry unix timestamp>".
_TOKEN_CACHE_FILE = "token.txt"
# How long (in seconds) a freshly issued token is treated as valid locally.
_TOKEN_TTL_SECONDS = 3600
# Timeout (in seconds) for HTTP calls; without one, requests can block forever.
_REQUEST_TIMEOUT = 30
# Value of the "code" field that the API uses to signal success.
_SUCCESS_CODE = 1000


def cfg(keyName):
    """Read one setting from ``config.toml``.

    Args:
        keyName: Either a top-level key (``"name"``) or a dotted
            ``"section.key"`` path. Only the first two dotted segments are
            used; any further segments are ignored.

    Returns:
        The configured value, or ``None`` when the file cannot be read or
        parsed, or the key does not exist.
    """
    try:
        # TOML documents are UTF-8; do not depend on the platform's locale
        # default encoding when reading the file.
        with open('config.toml', 'r', encoding='utf-8') as f:
            config = toml.load(f)
        if "." in keyName:
            section, key = keyName.split(".")[:2]
            return config[section][key]
        return config[keyName]
    except (OSError, ValueError, LookupError, TypeError):
        # Best-effort lookup: callers treat None as "not configured".
        return None


def sign(params):
    """Compute the HMAC-SHA256 request signature.

    The signed message is ``appId=..&appSecret=..&shopId=..&timestamp=..``
    and the HMAC key is the ``account.appSecret`` setting.

    Args:
        params: Mapping with the keys ``appId``, ``shop_id`` and
            ``timestamp``.

    Returns:
        The signature as a lowercase hexadecimal string.

    Raises:
        ValueError: If ``account.appSecret`` is not configured.
        KeyError: If ``params`` lacks one of the required keys.
    """
    # Read the secret once; every cfg() call re-parses the config file.
    secret = cfg('account.appSecret')
    if secret is None:
        raise ValueError("account.appSecret is missing from config.toml")

    fields = [
        f"appId={params['appId']}",
        f"appSecret={secret}",
        f"shopId={params['shop_id']}",
        f"timestamp={params['timestamp']}",
    ]
    # Join the fields into the string that gets signed.
    joined_str = "&".join(fields)
    digest = hmac.new(secret.encode(), joined_str.encode(), hashlib.sha256)
    return digest.hexdigest()


def _read_cached_token():
    """Return the unexpired token stored in the cache file, or ``None``."""
    if not os.path.exists(_TOKEN_CACHE_FILE):
        return None
    try:
        with open(_TOKEN_CACHE_FILE, "r", encoding="utf-8") as f:
            cached = f.read()
    except OSError:
        return None

    # Split on the LAST "|" so the expiry is always the trailing field.
    value, separator, expires_at = cached.rpartition("|")
    if not separator or not value:
        return None
    try:
        expiry = int(expires_at)
    except ValueError:
        # Corrupt cache file: ignore it and let the caller fetch a new token.
        return None
    if expiry > int(time.time()):
        return value
    return None


def _is_success(data):
    """Tell whether a decoded API response carries the success code."""
    return isinstance(data, dict) and data.get("code") == _SUCCESS_CODE


def getToken():
    """Return an access token, using the on-disk cache when still valid.

    When there is no usable cached token, a new one is requested from
    ``api.domain + api.getToken`` and stored in the cache file together with
    its local expiry time.

    Returns:
        The token string, or ``""`` when the API reports a failure (the
        response is printed in that case).
    """
    cached = _read_cached_token()
    if cached is not None:
        return cached

    # Request a new token.
    url = cfg("api.domain") + cfg("api.getToken")
    params = {
        "appId": cfg("account.appId"),
        "shop_id": cfg("account.shopId"),
        # 10-digit Unix timestamp in seconds.
        "timestamp": str(int(time.time())),
    }
    params["sign"] = sign(params)

    response = requests.post(url, json=params, timeout=_REQUEST_TIMEOUT)
    data = response.json()
    if not _is_success(data):
        print("获取token失败", data)
        return ""

    token = data["data"]["token"]
    # Remember the token together with the time at which it stops being used.
    with open(_TOKEN_CACHE_FILE, "w", encoding="utf-8") as f:
        f.write(f"{token}|{int(time.time()) + _TOKEN_TTL_SECONDS}")
    return token


def requestUrl(url, method, params):
    """Send an authenticated JSON request and unwrap the response.

    Args:
        url: Full URL to call.
        method: HTTP method name passed to ``requests.request``.
        params: Object sent as the JSON request body.

    Returns:
        ``(data, "success")`` with the response's ``data`` field on success,
        or ``("", "error")`` when no token is available or the API reports a
        failure.
    """
    token = getToken()
    if not token:
        # getToken() has already printed the failure; do not send a request
        # with an empty bearer token.
        return "", "error"

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    response = requests.request(
        method, url, headers=headers, json=params, timeout=_REQUEST_TIMEOUT
    )
    data = response.json()
    if not _is_success(data):
        print("请求失败", data)
        return "", "error"
    return data["data"], "success"


def oss():
    """Build an ``oss2.Bucket`` client from the ``oss.*`` settings."""
    access_key_id = cfg("oss.AccessKeyId")
    access_key_secret = cfg("oss.AccessKeySecret")
    endpoint = cfg("oss.EndPoint")
    bucket_name = cfg("oss.BucketSuwa3dDataSecret")
    auth = oss2.Auth(access_key_id, access_key_secret)
    oss_client = oss2.Bucket(auth, endpoint, bucket_name)
    return oss_client