|
|
@@ -7,6 +7,7 @@ from smart_open import open
|
|
|
from botocore.exceptions import NoCredentialsError,ClientError
|
|
|
import boto3
|
|
|
import logging
|
|
|
+from typing import List, Dict, Any, Optional
|
|
|
|
|
|
from botocore.config import Config
|
|
|
from config.settings import CFG
|
|
|
@@ -20,6 +21,99 @@ s3 = boto3.client(
|
|
|
)
|
|
|
# resource = boto3.resource('s3')
|
|
|
|
|
|
+class S3Client:
|
|
|
+ """S3客户端类,封装常用的S3操作"""
|
|
|
+
|
|
|
+ def __init__(self, client=None):
|
|
|
+ """初始化S3客户端"""
|
|
|
+ self.client = client or s3
|
|
|
+ self.logger = logging.getLogger(__name__)
|
|
|
+
|
|
|
+ def list_s3_files(self, s3_path: str) -> Dict[str, Any]:
|
|
|
+ """列出S3指定路径下的所有文件"""
|
|
|
+ self.logger.info(f"开始列出S3路径下的文件: {s3_path}")
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 解析S3路径
|
|
|
+ if s3_path.startswith('s3://'):
|
|
|
+ # 从s3://bucket/path格式解析
|
|
|
+ path_parts = s3_path[5:].split('/', 1)
|
|
|
+ bucket_name = path_parts[0]
|
|
|
+ prefix = path_parts[1] if len(path_parts) > 1 else ''
|
|
|
+ else:
|
|
|
+ # 从/path格式解析,使用默认bucket
|
|
|
+ bucket_name = 'public'
|
|
|
+ prefix = s3_path.lstrip('/')
|
|
|
+ # 移除路径开头的'public/'(如果存在),避免重复
|
|
|
+ if prefix.startswith('public/'):
|
|
|
+ prefix = prefix[7:]
|
|
|
+
|
|
|
+ # 确保prefix以/结尾,以便正确列出目录内容
|
|
|
+ if prefix and not prefix.endswith('/'):
|
|
|
+ prefix += '/'
|
|
|
+
|
|
|
+ self.logger.info(f"使用bucket: {bucket_name}, prefix: {prefix}")
|
|
|
+
|
|
|
+ # 使用boto3列出文件
|
|
|
+ response = self.client.list_objects_v2(
|
|
|
+ Bucket=bucket_name,
|
|
|
+ Prefix=prefix
|
|
|
+ )
|
|
|
+
|
|
|
+ files = []
|
|
|
+ if 'Contents' in response:
|
|
|
+ for obj in response['Contents']:
|
|
|
+ # 跳过目录本身(以/结尾的对象)
|
|
|
+ if not obj['Key'].endswith('/'):
|
|
|
+ files.append({
|
|
|
+ 'key': obj['Key'],
|
|
|
+ 'size': obj['Size'],
|
|
|
+ 'last_modified': obj['LastModified'].isoformat(),
|
|
|
+ 'etag': obj['ETag'].strip('"')
|
|
|
+ })
|
|
|
+
|
|
|
+ self.logger.info(f"找到 {len(files)} 个文件")
|
|
|
+ return {
|
|
|
+ 'path': s3_path,
|
|
|
+ 'bucket': bucket_name,
|
|
|
+ 'prefix': prefix,
|
|
|
+ 'files': files,
|
|
|
+ 'count': len(files)
|
|
|
+ }
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ self.logger.error(f"列出S3文件失败: {e}")
|
|
|
+ raise Exception(f"列出S3文件失败: {e}")
|
|
|
+
|
|
|
+ def generate_http_urls(self, s3_files_result: Dict[str, Any], base_url: str = "http://s3.vs1.lan") -> List[str]:
|
|
|
+ """根据S3文件列表结果生成HTTP URL列表"""
|
|
|
+ http_urls = []
|
|
|
+
|
|
|
+ for file_info in s3_files_result['files']:
|
|
|
+ # 从完整路径中提取文件名
|
|
|
+ file_name = file_info['key'].split('/')[-1]
|
|
|
+ # 构造完整的 HTTP URL
|
|
|
+ http_url = f"{base_url}/{s3_files_result['bucket']}/{s3_files_result['prefix']}{file_name}"
|
|
|
+ http_urls.append(http_url)
|
|
|
+
|
|
|
+ return http_urls
|
|
|
+
|
|
|
+ def list_s3_files_with_urls(self, s3_path: str, base_url: str = "http://s3.vs1.lan") -> Dict[str, Any]:
|
|
|
+ """列出S3指定路径下的所有文件并生成HTTP URL列表"""
|
|
|
+ # 先获取文件列表
|
|
|
+ files_result = self.list_s3_files(s3_path)
|
|
|
+ # 生成HTTP URL列表
|
|
|
+ http_urls = self.generate_http_urls(files_result, base_url)
|
|
|
+
|
|
|
+ # 将HTTP URL添加到结果中
|
|
|
+ result = files_result.copy()
|
|
|
+ result['http_urls'] = http_urls
|
|
|
+
|
|
|
+ return result
|
|
|
+
|
|
|
+# 创建全局S3客户端实例
|
|
|
+s3_client = S3Client()
|
|
|
+
|
|
|
def s3_uri_to_http_url(s3_uri):
|
|
|
"""
|
|
|
将 s3://bucket/key 格式的 URI 转换为 Minio 的 HTTP 访问链接。
|
|
|
@@ -128,12 +222,13 @@ def save_to_file(content, filename:Path, **extra_args):
|
|
|
return filename
|
|
|
|
|
|
def read_file(file_uri:str, mode='r'):
|
|
|
- if not str(file_uri).startswith('http'):
|
|
|
+ if str(file_uri).startswith('s3://'):
|
|
|
+ with open(file_uri, mode or 'r', transport_params={'client': s3}) as f:
|
|
|
+ # 文件存在,继续操作
|
|
|
+ return f.read()
|
|
|
+ else:
|
|
|
with open(file_uri, mode) as f:
|
|
|
return f.read()
|
|
|
- with open(file_uri, mode or 'r', transport_params={'client': s3}) as f:
|
|
|
- # 文件存在,继续操作
|
|
|
- return f.read()
|
|
|
|
|
|
def check_exists(file_uri:str):
|
|
|
if not file_uri.startswith('s3://'):
|