# --- Standard library ---
import json
from pathlib import Path
import mimetypes
import urllib.parse
# --- Third-party ---
import smart_open
# NOTE: deliberately shadows the builtin `open` so plain open() calls
# transparently support s3:// URIs via smart_open.
from smart_open import open
from botocore.exceptions import NoCredentialsError,ClientError
import boto3
import logging
from typing import List, Dict, Any, Optional
import pandas as pd
import requests
import io
from botocore.config import Config
# --- Project-local configuration (S3 credentials and endpoint) ---
from config.settings import CFG
# Module-level S3 client configured from project settings: Minio-compatible
# custom endpoint, SigV4 signing, standard retry mode. Shared by every helper
# in this module.
s3 = boto3.client(
    's3',
    aws_access_key_id=CFG.s3_access_key,
    aws_secret_access_key=CFG.s3_secret_key,
    endpoint_url=CFG.s3_endpoint,
    config=Config(signature_version='s3v4', retries={'mode': 'standard'}),
)
class S3Client:
    """Wrapper around a boto3 S3 client exposing common listing helpers."""

    def __init__(self, client=None):
        """Initialize with *client*, defaulting to the module-level `s3`."""
        self.client = client or s3
        self.logger = logging.getLogger(__name__)

    def list_s3_files(self, s3_path: str) -> Dict[str, Any]:
        """List every file (non-directory object) under *s3_path*.

        Accepts either ``s3://bucket/prefix`` or a bare ``/prefix`` path,
        the latter resolved against the default ``public`` bucket.

        Fix: iterates a ``list_objects_v2`` paginator, so listings beyond
        the first 1000 keys are no longer silently truncated.

        Returns:
            dict with keys ``path``, ``bucket``, ``prefix``, ``files``
            (each file: key/size/last_modified/etag) and ``count``.

        Raises:
            Exception: wrapping (and chaining) any underlying client error.
        """
        self.logger.info(f"开始列出S3路径下的文件: {s3_path}")

        try:
            if s3_path.startswith('s3://'):
                # Parse the s3://bucket/prefix form.
                path_parts = s3_path[5:].split('/', 1)
                bucket_name = path_parts[0]
                prefix = path_parts[1] if len(path_parts) > 1 else ''
            else:
                # Bare /prefix form: fall back to the default bucket.
                bucket_name = 'public'
                prefix = s3_path.lstrip('/')
                # Drop a duplicated leading 'public/' segment if present.
                if prefix.startswith('public/'):
                    prefix = prefix[7:]

            # Ensure the prefix ends with '/' so only directory contents match.
            if prefix and not prefix.endswith('/'):
                prefix += '/'

            self.logger.info(f"使用bucket: {bucket_name}, prefix: {prefix}")

            files = []
            # A single list_objects_v2 response caps at 1000 keys; paginate.
            paginator = self.client.get_paginator('list_objects_v2')
            for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
                for obj in page.get('Contents', []):
                    # Skip directory placeholder objects (keys ending in '/').
                    if not obj['Key'].endswith('/'):
                        files.append({
                            'key': obj['Key'],
                            'size': obj['Size'],
                            'last_modified': obj['LastModified'].isoformat(),
                            'etag': obj['ETag'].strip('"'),
                        })

            self.logger.info(f"找到 {len(files)} 个文件")
            return {
                'path': s3_path,
                'bucket': bucket_name,
                'prefix': prefix,
                'files': files,
                'count': len(files),
            }

        except Exception as e:
            self.logger.error(f"列出S3文件失败: {e}")
            # Chain the original exception for easier debugging.
            raise Exception(f"列出S3文件失败: {e}") from e

    def generate_http_urls(self, s3_files_result: Dict[str, Any], base_url: str = "http://s3.vs1.lan") -> List[str]:
        """Build a public HTTP URL for every file in *s3_files_result*.

        Fix: uses each object's full key instead of ``prefix + basename``,
        so files nested below the listed prefix map to the correct URL
        (the old join dropped intermediate sub-directories). Output is
        unchanged for direct children of the prefix.
        """
        bucket = s3_files_result['bucket']
        return [f"{base_url}/{bucket}/{file_info['key']}"
                for file_info in s3_files_result['files']]

    def list_s3_files_with_urls(self, s3_path: str, base_url: str = "http://s3.vs1.lan") -> Dict[str, Any]:
        """List files under *s3_path* and attach their HTTP URLs.

        Returns the ``list_s3_files`` result dict plus an ``http_urls`` key.
        """
        files_result = self.list_s3_files(s3_path)
        http_urls = self.generate_http_urls(files_result, base_url)

        result = files_result.copy()
        result['http_urls'] = http_urls
        return result
# Global, ready-to-use S3Client instance backed by the configured client.
s3_client = S3Client()
def s3_uri_to_http_url(s3_uri):
    """
    Convert an ``s3://bucket/key`` URI into a direct HTTP link on the
    configured (Minio) endpoint. Intended for publicly readable buckets.
    Inputs that are not s3:// URIs are returned unchanged.
    """
    if not s3_uri.startswith('s3://'):
        return s3_uri

    # Split off the bucket; everything after the first '/' is the key.
    bucket, _, key = s3_uri[5:].partition('/')

    # URL-encode the key but keep path separators readable.
    encoded_key = urllib.parse.quote(key, safe='/')

    # Normalize the endpoint so we never emit a double slash.
    endpoint = CFG.s3_endpoint.rstrip('/')

    return f"{endpoint}/{bucket}/{encoded_key}"
def create_presigned_url_expanded(client_method_name, method_parameters=None,
                                  expiration=3600, http_method=None):
    """Generate a presigned URL to invoke an S3.Client method.

    Not all the client methods provided in the AWS Python SDK are supported.

    Fix: reuses the module-level configured client instead of constructing a
    fresh ``boto3.client('s3')`` with default credentials/endpoint, which
    signed URLs against the wrong (non-Minio) endpoint.

    :param client_method_name: Name of the S3.Client method, e.g., 'list_buckets'
    :param method_parameters: Dictionary of parameters to send to the method
    :param expiration: Time in seconds for the presigned URL to remain valid
    :param http_method: HTTP method to use (GET, etc.)
    :return: Presigned URL as string. If error, returns None.
    """
    try:
        response = s3.generate_presigned_url(
            ClientMethod=client_method_name,
            Params=method_parameters,
            ExpiresIn=expiration,
            HttpMethod=http_method,
        )
    except ClientError as e:
        logging.error(e)
        return None
    # The response contains the presigned URL.
    return response
def get_s3_uri_info(s3_uri: str):
    """Split an ``s3://bucket/key`` URI into its upload components.

    Returns:
        tuple: ``(bucket_name, object_name, upload_args)`` where
        ``upload_args`` carries the guessed ``ContentType``. ``.mhtml``
        files are forced to ``multipart/related``; unknown extensions fall
        back to ``application/octet-stream``.
    """
    segments = s3_uri.split('/')
    bucket_name = segments[2]
    object_name = '/'.join(segments[3:])
    if object_name.endswith('.mhtml'):
        # mimetypes has no reliable mapping for MHTML archives.
        content_type = 'multipart/related'
    else:
        guessed, _ = mimetypes.guess_type(object_name)
        content_type = guessed if guessed else 'application/octet-stream'
    return bucket_name, object_name, {'ContentType': content_type}
def upload_to_s3(content, filename: str, **extra_args):
    """Upload *content* to the ``s3://`` URI given by *filename*.

    dicts are serialized as UTF-8 JSON; str is UTF-8 encoded; bytes are
    sent as-is. Extra keyword arguments are merged into the put_object
    arguments (e.g. ``Metadata=...``).

    Fix: replaced a stray debug ``print`` with a debug-level log record.

    :return: the *filename* URI that was written.
    """
    bucket_name, object_name, upload_args = get_s3_uri_info(filename)
    upload_args.update(extra_args)
    if isinstance(content, dict):
        # Serialize dicts as JSON, preserving non-ASCII characters.
        content = json.dumps(content, ensure_ascii=False)
    if isinstance(content, str):
        content = content.encode('utf-8')
    logging.debug("upload_to_s3 -> bucket=%s key=%s", bucket_name, object_name)
    s3.put_object(
        Bucket=bucket_name,
        Key=object_name,
        Body=content,
        **upload_args
    )
    return filename
def upload_file_to_s3(file_path: str, s3_uri: str, **extra_args):
    """Upload the local file *file_path* to *s3_uri* via managed transfer.

    Extra keyword arguments are merged into the transfer's ExtraArgs
    (content type is filled in automatically from the URI's extension).
    """
    bucket, key, transfer_args = get_s3_uri_info(s3_uri)
    transfer_args.update(extra_args)
    s3.upload_file(file_path, bucket, key, ExtraArgs=transfer_args)
def save_to_file(content, filename: Path, **extra_args):
    '''
    Persist *content* to a local path or, when the destination starts with
    ``s3://``, upload it to S3 instead.

    Example:
        save_to_file(
            data,
            's3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',
            Metadata={'mykey':'myvalue','mykey2':'myvalue2'}
        )

    Text content is written UTF-8 encoded; anything else is written as
    bytes. Returns the destination (s3 URI or local filename).
    '''
    target = str(filename)
    if target.startswith('s3://'):
        return upload_to_s3(content, target, **extra_args)
    if isinstance(content, str):
        with open(filename, "w", encoding="utf-8") as fh:
            fh.write(content)
        return filename
    with open(filename, "wb") as fh:
        fh.write(content)
    return filename
def read_file(file_uri: str, mode='r'):
    """Read and return the full contents of *file_uri*.

    ``s3://`` URIs are streamed through smart_open using the module's
    configured client; anything else is treated as a local path.
    """
    uri = str(file_uri)
    if uri.startswith('s3://'):
        with open(file_uri, mode or 'r', transport_params={'client': s3}) as fh:
            return fh.read()
    with open(file_uri, mode) as fh:
        return fh.read()
def check_exists(file_uri: str):
    """Return a truthy value if *file_uri* exists.

    Local paths: returns the bool from ``Path.exists()``.
    ``s3://`` URIs: returns the URI itself when the object exists, ``False``
    when the service reports it missing, and re-raises any other client
    error.

    Fix: the old handler also caught ``FileNotFoundError``/``OSError`` and
    then read ``e.response`` — an attribute only ``ClientError`` carries —
    so any such error crashed with AttributeError instead of being handled.
    """
    if not file_uri.startswith('s3://'):
        return Path(file_uri).exists()
    bucket_name, object_name, _ = get_s3_uri_info(file_uri)
    try:
        s3.head_object(Bucket=bucket_name, Key=object_name)
        return file_uri
    except ClientError as e:
        # head_object reports a missing key as HTTP 404 (some backends
        # spell it 'NotFound'/'NoSuchKey').
        if e.response.get('Error', {}).get('Code') in ('404', 'NotFound', 'NoSuchKey'):
            return False
        raise
def delete_s3_file(s3_uri: str):
    """Delete a single S3 object.

    Returns True on success; False for empty/non-s3 inputs or when the
    delete call fails (the failure is logged, never raised).
    """
    # Guard: anything that is not an s3:// URI is rejected up front.
    if not (s3_uri and s3_uri.startswith('s3://')):
        return False

    try:
        bucket, key, _ = get_s3_uri_info(s3_uri)
        s3.delete_object(Bucket=bucket, Key=key)
    except (FileNotFoundError, OSError, ClientError) as e:
        logging.error(f"删除S3文件失败: {s3_uri}, 错误: {e}")
        return False
    return True
def read_excel_from_url(url, timeout=30):
    """Read an Excel workbook from a URL with pandas.

    Fix: the HTTP request now carries a timeout (new backward-compatible
    parameter, default 30s) so an unresponsive server can no longer hang
    the caller forever.

    Args:
        url (str): URL of the Excel file.
        timeout (int): Seconds to wait for the HTTP response.

    Returns:
        dict | None: Mapping of sheet name -> DataFrame (first row kept as
        data, not headers), or None on any failure (best-effort contract).
    """
    try:
        # raise_for_status turns HTTP error codes into exceptions.
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()

        # Wrap the payload so pandas can treat it as a file object.
        file_content = io.BytesIO(response.content)

        # sheet_name=None loads every sheet; header=None keeps row 0 as data.
        excel_data = pd.read_excel(file_content, sheet_name=None, header=None)

        return excel_data

    except Exception as e:
        # Deliberate best-effort: report and return None rather than raise.
        print(f"读取 Excel 文件时发生错误: {e}")
        return None
def get_all_cells_text(excel_data):
    """Extract every cell of every sheet as tab/newline-joined text.

    Empty cells become empty strings so the original grid structure is
    preserved; sheets are emitted in sorted-name order.

    Args:
        excel_data (dict): sheet name -> DataFrame, as produced by
            ``pd.read_excel(..., sheet_name=None)``.

    Returns:
        dict: sheet name -> text, columns separated by tabs and rows by
        newlines. Empty dict for falsy input.
    """
    if not excel_data:
        return {}

    text_by_sheet = {}
    for sheet_name in sorted(excel_data):
        frame = excel_data[sheet_name]
        # One line per row; NaN cells render as "" to keep column alignment.
        lines = [
            "\t".join("" if pd.isna(cell) else str(cell) for cell in row)
            for _, row in frame.iterrows()
        ]
        text_by_sheet[sheet_name] = "\n".join(lines)
    return text_by_sheet
def extract_excel_text_from_url(url):
    """Fetch an Excel workbook from *url* and return all cell text.

    Args:
        url (str): Excel file URL.

    Returns:
        dict: sheet name -> tab/newline-joined cell text, sheets in sorted
        name order; empty dict when the workbook could not be read.
    """
    workbook = read_excel_from_url(url)
    return get_all_cells_text(workbook) if workbook else {}
def main():
    """Smoke-test the configured connection by printing every bucket name."""
    listing = s3.list_buckets()
    print('Existing buckets:')
    for bucket in listing['Buckets']:
        print(f' {bucket["Name"]}')


if __name__ == "__main__":
    main()
|