import os from minio import Minio from minio.error import S3Error from auto_sliceing_operate.utils.logs import log from utils.config import cfg # 全局变量:存储桶名称(从配置文件读取) _bucket_name = None def get_bucket_name(): """获取存储桶名称(从配置文件读取) Returns: str: 存储桶名称 """ global _bucket_name if _bucket_name is None: # 优先从配置文件读取,如果没有则从环境变量读取,最后使用默认值 _bucket_name = cfg('minio.bucket', None) log(f"MiniIO 存储桶名称: {_bucket_name}") return _bucket_name # MiniIO 客户端单例模式 class MiniIOClientSingleton: _instance = None _client = None def __new__(cls): if cls._instance is None: cls._instance = super(MiniIOClientSingleton, cls).__new__(cls) return cls._instance def get_client(self): if self._client is None: # MiniIO 连接配置 # 可以从环境变量或配置文件中读取 endpoint = cfg('minio.endpoint', 'localhost:9000') access_key = cfg('minio.access_key', 'minioadmin') secret_key = cfg('minio.secret_key', 'minioadmin') # secure 参数:True 使用 HTTPS,False 使用 HTTP # 从配置文件读取,如果没有配置则默认为 False(HTTP) secure_config = cfg('minio.secure', False) # 确保是布尔值(处理字符串 'true'/'false' 的情况) if isinstance(secure_config, str): secure = secure_config.lower() in ('true', '1', 'yes') else: secure = bool(secure_config) try: self._client = Minio( endpoint, access_key=access_key, secret_key=secret_key, secure=secure ) log(f"MiniIO 客户端初始化成功: {endpoint} (secure={secure})") except Exception as e: error_msg = str(e) # 检查是否是端口错误 if "API port" in error_msg or "InvalidArgument" in error_msg: log(f"MiniIO 连接失败: 请确保使用的是 API 端口(通常是 9000),而不是 Console 端口(通常是 9001)") log(f"当前配置的 endpoint: {endpoint}") log(f"错误详情: {error_msg}") else: log(f"MiniIO 客户端初始化失败: {error_msg}") raise return self._client def miniIOClient(): """获取 MiniIO 客户端单例""" return MiniIOClientSingleton().get_client() def upload_file(object_name, file_path, bucket_name=None): """ 上传文件到 MiniIO Args: object_name: 对象名称(在 MiniIO 中的路径) file_path: 本地文件路径 bucket_name: 存储桶名称(可选,如果不提供则使用全局配置) Returns: bool: 上传成功返回 True,失败返回 False """ try: if bucket_name is None: bucket_name = get_bucket_name() client = miniIOClient() # 检查存储桶是否存在,不存在则创建 if not client.bucket_exists(bucket_name): client.make_bucket(bucket_name) log(f"创建存储桶: {bucket_name}") # 上传文件 log(f"开始上传文件: {file_path} -> {bucket_name}/{object_name}") client.fput_object(bucket_name, object_name, file_path) log(f"文件上传成功: {bucket_name}/{object_name}") return True except S3Error as e: log(f"上传文件失败 (S3Error): {str(e)}") return False except Exception as e: log(f"上传文件失败: {str(e)}") return False def download_file(object_name, file_path, bucket_name=None): """ 从 MiniIO 下载文件到本地 Args: object_name: 对象名称(在 MiniIO 中的路径) file_path: 本地保存路径 bucket_name: 存储桶名称(可选,如果不提供则使用全局配置) Returns: bool: 下载成功返回 True,失败返回 False """ try: if bucket_name is None: bucket_name = get_bucket_name() client = miniIOClient() # 检查文件是否存在 if not check_file_exists(object_name, bucket_name): log(f"文件不存在: {bucket_name}/{object_name}") return False # 确保本地目录存在 local_dir = os.path.dirname(file_path) if local_dir and not os.path.exists(local_dir): os.makedirs(local_dir, exist_ok=True) # 下载文件 log(f"开始下载文件: {bucket_name}/{object_name} -> {file_path}") client.fget_object(bucket_name, object_name, file_path) # 验证文件是否下载成功 if os.path.exists(file_path): file_size = os.path.getsize(file_path) log(f"文件下载成功: {file_path} (大小: {file_size} 字节)") return True else: log("文件下载失败: 本地文件不存在") return False except S3Error as e: log(f"下载文件失败 (S3Error): {str(e)}") return False except Exception as e: log(f"下载文件失败: {str(e)}") return False def delete_file(object_name, bucket_name=None): """ 从 MiniIO 删除文件 Args: object_name: 对象名称(在 MiniIO 中的路径) bucket_name: 存储桶名称(可选,如果不提供则使用全局配置) Returns: bool: 删除成功返回 True,失败返回 False """ try: if bucket_name is None: bucket_name = get_bucket_name() client = miniIOClient() # 检查文件是否存在 if not check_file_exists(object_name, bucket_name): log(f"文件不存在,无需删除: {bucket_name}/{object_name}") return True # 删除文件 log(f"开始删除文件: {bucket_name}/{object_name}") client.remove_object(bucket_name, object_name) log(f"文件删除成功: {bucket_name}/{object_name}") return True except S3Error as e: log(f"删除文件失败 (S3Error): {str(e)}") return False except Exception as e: log(f"删除文件失败: {str(e)}") return False def check_file_exists(object_name, bucket_name=None): """ 检查文件是否存在于 MiniIO Args: object_name: 对象名称(在 MiniIO 中的路径) bucket_name: 存储桶名称(可选,如果不提供则使用全局配置) Returns: bool: 文件存在返回 True,不存在返回 False """ try: if bucket_name is None: bucket_name = get_bucket_name() client = miniIOClient() # 检查存储桶是否存在 if not client.bucket_exists(bucket_name): log(f"存储桶不存在: {bucket_name}") return False # 检查对象是否存在 try: client.stat_object(bucket_name, object_name) return True except S3Error as e: if e.code == 'NoSuchKey': return False else: log(f"检查文件存在性时出错: {str(e)}") return False except S3Error as e: log(f"检查文件存在性失败 (S3Error): {str(e)}") return False except Exception as e: log(f"检查文件存在性失败: {str(e)}") return False