import oss2,redis import time import logging # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # 连接oss - 单例模式 class OSSClientSingleton: _instance = None _client = None def __new__(cls): if cls._instance is None: cls._instance = super(OSSClientSingleton, cls).__new__(cls) return cls._instance def get_client(self): if self._client is None: AccessKeyId = 'LTAI5tSReWm8hz7dSYxxth8f' AccessKeySecret = '8ywTDF9upPAtvgXtLKALY2iMYHIxdS' Endpoint = 'oss-cn-shanghai.aliyuncs.com' Bucket = 'suwa3d-securedata' self._client = oss2.Bucket(oss2.Auth(AccessKeyId, AccessKeySecret), Endpoint, Bucket) return self._client def ossClient(): """获取OSS客户端单例""" return OSSClientSingleton().get_client() #连接redis,单例模式 class RedisClientSingleton: _instance = None _client = None # Redis连接配置 REDIS_CONFIG = { 'host': 'mp.api.suwa3d.com', 'password': 'kcV2000', 'port': 6379, 'db': 6, 'socket_timeout': 30, # 操作超时30秒 'socket_connect_timeout': 10, # 连接超时10秒 'socket_keepalive': True, # 启用 TCP keepalive 'socket_keepalive_options': {}, # keepalive 选项 'health_check_interval': 30 # 健康检查间隔30秒 } # 重试配置 RETRY_INTERVAL = 5 # 重试间隔(秒) MAX_RETRY_INTERVAL = 60 # 最大重试间隔(秒),用于指数退避 def __new__(cls): if cls._instance is None: cls._instance = super(RedisClientSingleton, cls).__new__(cls) return cls._instance def _create_redis_client(self): """创建Redis客户端""" return redis.Redis(**self.REDIS_CONFIG) def _connect_with_retry(self, max_retries=None, retry_interval=None): """ 带重试机制的Redis连接方法 :param max_retries: 最大重试次数,None表示无限重试 :param retry_interval: 重试间隔(秒),None表示使用默认值,支持指数退避 :return: Redis客户端实例 """ if retry_interval is None: retry_interval = self.RETRY_INTERVAL retry_count = 0 current_interval = retry_interval while True: try: logger.info(f"尝试连接Redis (第 {retry_count + 1} 次)...") client = self._create_redis_client() # 测试连接 client.ping() logger.info("Redis连接成功!") return client except (redis.ConnectionError, redis.TimeoutError, AttributeError, Exception) as e: retry_count += 1 error_msg = str(e) logger.warning(f"Redis连接失败 (第 {retry_count} 次): {error_msg}") # 如果设置了最大重试次数且已达到,则抛出异常 if max_retries is not None and retry_count >= max_retries: logger.error(f"达到最大重试次数 {max_retries},停止重试") raise # 指数退避:每次重试间隔逐渐增加,但不超过最大值 logger.info(f"等待 {current_interval} 秒后重试...") time.sleep(current_interval) current_interval = min(current_interval * 1.5, self.MAX_RETRY_INTERVAL) def get_client(self): """ 获取Redis客户端,如果连接断开则自动重连(带重试机制) :return: Redis客户端实例 """ if self._client is None: # 首次连接,使用无限重试直到成功 logger.info("初始化Redis连接...") self._client = self._connect_with_retry(max_retries=None) else: # 检查连接是否有效,如果断开则重新连接(带重试) try: self._client.ping() except (redis.ConnectionError, redis.TimeoutError, AttributeError) as e: # 连接断开,重新创建连接(使用无限重试直到成功) logger.warning(f"Redis连接已断开: {str(e)},开始重新连接...") self._client = None self._client = self._connect_with_retry(max_retries=None) return self._client def redisClient(): """获取Redis客户端单例""" return RedisClientSingleton().get_client()