import oss2,redis # 连接oss - 单例模式 class OSSClientSingleton: _instance = None _client = None def __new__(cls): if cls._instance is None: cls._instance = super(OSSClientSingleton, cls).__new__(cls) return cls._instance def get_client(self): if self._client is None: AccessKeyId = 'LTAI5tSReWm8hz7dSYxxth8f' AccessKeySecret = '8ywTDF9upPAtvgXtLKALY2iMYHIxdS' Endpoint = 'oss-cn-shanghai.aliyuncs.com' Bucket = 'suwa3d-securedata' self._client = oss2.Bucket(oss2.Auth(AccessKeyId, AccessKeySecret), Endpoint, Bucket) return self._client def ossClient(): """获取OSS客户端单例""" return OSSClientSingleton().get_client() #连接redis,单例模式 class RedisClientSingleton: _instance = None _client = None def __new__(cls): if cls._instance is None: cls._instance = super(RedisClientSingleton, cls).__new__(cls) return cls._instance def get_client(self): if self._client is None: # 添加超时参数,防止连接超时 # socket_timeout: 每次操作的超时时间(秒) # socket_connect_timeout: 连接超时时间(秒) # socket_keepalive: 启用 TCP keepalive # socket_keepalive_options: keepalive 选项 self._client = redis.Redis( host='mp.api.suwa3d.com', password='kcV2000', port=6379, db=6, socket_timeout=30, # 操作超时30秒 socket_connect_timeout=10, # 连接超时10秒 socket_keepalive=True, # 启用 TCP keepalive socket_keepalive_options={}, # keepalive 选项 health_check_interval=30 # 健康检查间隔30秒 ) else: # 检查连接是否有效,如果断开则重新连接 try: self._client.ping() except (redis.ConnectionError, redis.TimeoutError, AttributeError): # 连接断开,重新创建连接 self._client = None self._client = redis.Redis( host='mp.api.suwa3d.com', password='kcV2000', port=6379, db=6, socket_timeout=30, socket_connect_timeout=10, socket_keepalive=True, socket_keepalive_options={}, health_check_interval=30 ) return self._client def redisClient(): """获取Redis客户端单例""" return RedisClientSingleton().get_client()