You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
76 lines
2.6 KiB
76 lines
2.6 KiB
import toml,time,hmac,hashlib,requests,json |
|
import oss2,os |
|
import redis |
|
#读取配置文件 |
|
def cfg(keyName): |
|
try: |
|
# 获取当前文件所在目录 |
|
current_dir = os.path.dirname(os.path.abspath(__file__)) |
|
config_path = os.path.join(current_dir, 'config.toml') |
|
|
|
with open(config_path, 'r') as f: |
|
config = toml.load(f) |
|
|
|
print(config) |
|
if "." in keyName: |
|
keyName = keyName.split(".") |
|
if keyName[0] in config and keyName[1] in config[keyName[0]]: |
|
return config[keyName[0]][keyName[1]] |
|
else: |
|
print(f"配置项 {keyName} 不存在") |
|
return None |
|
else: |
|
if keyName in config: |
|
return config[keyName] |
|
else: |
|
print(f"配置项 {keyName} 不存在") |
|
return None |
|
except FileNotFoundError: |
|
print(f"config.toml 文件不存在,尝试路径: {config_path}") |
|
return None |
|
except Exception as e: |
|
print(f"读取配置文件时发生错误: {e}") |
|
return None |
|
|
|
|
|
def oss(): |
|
AccessKeyId = cfg("oss.AccessKeyId") |
|
AccessKeySecret = cfg("oss.AccessKeySecret") |
|
Endpoint = cfg("oss.EndPoint") |
|
Bucket = cfg("oss.BucketSuwa3dDataSecret") |
|
|
|
# 检查配置是否完整 |
|
if not all([AccessKeyId, AccessKeySecret, Endpoint, Bucket]): |
|
raise ValueError("OSS配置不完整,请检查config.toml文件中的oss配置项") |
|
|
|
oss_client = oss2.Bucket(oss2.Auth(AccessKeyId, AccessKeySecret), Endpoint, Bucket) |
|
return oss_client |
|
|
|
|
|
def create_redis_connection(): |
|
"""创建 Redis 连接,若连接失败则重试""" |
|
while True: |
|
try: |
|
r = redis.Redis(host="106.14.158.208",password="kcV2000",port=6379,db=6) |
|
# 尝试进行一次操作,检查连接是否有效 |
|
r.ping() # ping 操作是一个简单的连接测试 |
|
print("Redis连接成功!") |
|
return r |
|
except ConnectionError: |
|
print("Redis连接失败,正在重试...") |
|
time.sleep(5) |
|
|
|
def notify(content): |
|
|
|
if content == "": |
|
return "content 不能为空" |
|
|
|
notify_user_Ids = ["18950403426"] |
|
for user_agent_id in notify_user_Ids: |
|
data = { |
|
'userId': user_agent_id, |
|
'message': content, |
|
} |
|
headers = {'Content-Type': 'application/json'} |
|
message_send_url = "https://mp.api.suwa3d.com/api/qyNotify/sendMessage?userId="+user_agent_id+"&message="+content |
|
response = requests.post(message_send_url, data=json.dumps(data), headers=headers) |