import json from pathlib import Path import mimetypes import urllib.parse import smart_open from smart_open import open from botocore.exceptions import NoCredentialsError,ClientError import boto3 import logging from botocore.config import Config from config.settings import CFG s3 = boto3.client( 's3', aws_access_key_id=CFG.s3_access_key, aws_secret_access_key=CFG.s3_secret_key, endpoint_url=CFG.s3_endpoint, config=Config(signature_version='s3v4', retries={'mode': 'standard'}), ) # resource = boto3.resource('s3') def s3_uri_to_http_url(s3_uri): """ 将 s3://bucket/key 格式的 URI 转换为 Minio 的 HTTP 访问链接。 适用于公共可读的存储桶。 """ if not s3_uri.startswith('s3://'): # raise ValueError("Invalid S3 URI. Must start with 's3://'") return s3_uri # 提取 bucket 和 key path = s3_uri[5:] # 去除 's3://' parts = path.split('/', 1) bucket = parts[0] key = parts[1] if len(parts) > 1 else '' # 对 key 进行 URL 编码(保留路径斜杠) encoded_key = urllib.parse.quote(key, safe='/') # 获取并清理 endpoint(确保无末尾斜杠) endpoint = CFG.s3_endpoint.rstrip('/') # 拼接完整 URL return f"{endpoint}/{bucket}/{encoded_key}" def create_presigned_url_expanded(client_method_name, method_parameters=None, expiration=3600, http_method=None): """Generate a presigned URL to invoke an S3.Client method Not all the client methods provided in the AWS Python SDK are supported. :param client_method_name: Name of the S3.Client method, e.g., 'list_buckets' :param method_parameters: Dictionary of parameters to send to the method :param expiration: Time in seconds for the presigned URL to remain valid :param http_method: HTTP method to use (GET, etc.) :return: Presigned URL as string. If error, returns None. """ # Generate a presigned URL for the S3 client method s3_client = boto3.client('s3') try: response = s3_client.generate_presigned_url(ClientMethod=client_method_name, Params=method_parameters, ExpiresIn=expiration, HttpMethod=http_method) except ClientError as e: logging.error(e) return None # The response contains the presigned URL return response def get_s3_uri_info(s3_uri:str): bucket_name = s3_uri.split('/')[2] object_name = '/'.join(s3_uri.split('/')[3:]) if object_name.endswith('.mhtml'): content_type = 'multipart/related' else: content_type, _ = mimetypes.guess_type(object_name) content_type = content_type or 'application/octet-stream' upload_args = { 'ContentType': content_type, } return bucket_name, object_name, upload_args def upload_to_s3(content, filename:str, **extra_args): bucket_name, object_name, upload_args = get_s3_uri_info(filename) upload_args.update(extra_args) if isinstance(content, dict): # 处理字典类型 content = json.dumps(content, ensure_ascii=False) if isinstance(content, str): content = content.encode('utf-8') print(bucket_name, object_name) s3.put_object( Bucket=bucket_name, Key=object_name, Body=content, **upload_args ) return filename def upload_file_to_s3(file_path:str,s3_uri:str, **extra_args): bucket_name, object_name, upload_args = get_s3_uri_info(s3_uri) upload_args.update(extra_args) s3.upload_file( file_path, bucket_name, object_name, ExtraArgs=upload_args ) def save_to_file(content, filename:Path, **extra_args): ''' save_to_file( data, 's3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html', Metadata={'mykey':'myvalue','mykey2':'myvalue2'} ) ''' if str(filename).startswith('s3://'): return upload_to_s3(content, str(filename), **extra_args) if isinstance(content, str): with open(filename, "w", encoding="utf-8") as file: file.write(content) else: with open(filename, "wb") as file: file.write(content) return filename def read_file(file_uri:str, mode='r'): # if str(file_uri).startswith('s3://'): # bucket_name, object_name, upload_args = get_s3_uri_info(file_uri) # response = s3.get_object(Bucket=bucket_name, Key=object_name) # return response['Body'].read() with open(file_uri, mode or 'r', transport_params={'client': s3}) as f: # 文件存在,继续操作 return f.read() def check_exists(file_uri:str): if not file_uri.startswith('s3://'): return Path(file_uri).exists() bucket_name, object_name, upload_args = get_s3_uri_info(file_uri) try: s3.head_object(Bucket=bucket_name, Key=object_name) return file_uri except (FileNotFoundError,OSError,ClientError) as e: if e.response['Error']['Code'] == '404': return False raise e def main(): response = s3.list_buckets() # Output the bucket names print('Existing buckets:') for bucket in response['Buckets']: print(f' {bucket["Name"]}') if __name__ == "__main__": main()