import toml,time,hmac,hashlib,requests import oss2,os import redis #读取配置文件 def cfg(keyName): try: # 获取当前文件所在目录 current_dir = os.path.dirname(os.path.abspath(__file__)) config_path = os.path.join(current_dir, 'config.toml') with open(config_path, 'r') as f: config = toml.load(f) print(config) if "." in keyName: keyName = keyName.split(".") if keyName[0] in config and keyName[1] in config[keyName[0]]: return config[keyName[0]][keyName[1]] else: print(f"配置项 {keyName} 不存在") return None else: if keyName in config: return config[keyName] else: print(f"配置项 {keyName} 不存在") return None except FileNotFoundError: print(f"config.toml 文件不存在,尝试路径: {config_path}") return None except Exception as e: print(f"读取配置文件时发生错误: {e}") return None def oss(): AccessKeyId = cfg("oss.AccessKeyId") AccessKeySecret = cfg("oss.AccessKeySecret") Endpoint = cfg("oss.EndPoint") Bucket = cfg("oss.BucketSuwa3dDataSecret") # 检查配置是否完整 if not all([AccessKeyId, AccessKeySecret, Endpoint, Bucket]): raise ValueError("OSS配置不完整,请检查config.toml文件中的oss配置项") oss_client = oss2.Bucket(oss2.Auth(AccessKeyId, AccessKeySecret), Endpoint, Bucket) return oss_client def create_redis_connection(): """创建 Redis 连接,若连接失败则重试""" while True: try: r = redis.Redis(host="106.14.158.208",password="kcV2000",port=6379,db=6) # 尝试进行一次操作,检查连接是否有效 r.ping() # ping 操作是一个简单的连接测试 print("Redis连接成功!") return r except ConnectionError: print("Redis连接失败,正在重试...") time.sleep(5) def notify(content): if content == "": return "content 不能为空" notify_user_Ids = ["18950403426"] for user_agent_id in notify_user_Ids: data = { 'userId': user_agent_id, 'message': content, } headers = {'Content-Type': 'application/json'} message_send_url = "https://mp.api.suwa3d.com/api/qyNotify/sendMessage?userId="+user_agent_id+"&message="+content response = requests.post(message_send_url, data=json.dumps(data), headers=headers)