import json
import logging
import mimetypes
import urllib.parse
from pathlib import Path

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from smart_open import open  # smart_open's open() handles both local paths and s3:// URIs

from config.settings import CFG

# Module-level client for the configured S3-compatible endpoint (e.g. MinIO).
s3 = boto3.client(
    's3',
    aws_access_key_id=CFG.s3_access_key,
    aws_secret_access_key=CFG.s3_secret_key,
    endpoint_url=CFG.s3_endpoint,
    config=Config(signature_version='s3v4', retries={'mode': 'standard'}),
)

def s3_uri_to_http_url(s3_uri):
    """
    Convert an s3://bucket/key URI into an HTTP URL served by MinIO.
    Intended for publicly readable buckets.
    """
    if not s3_uri.startswith('s3://'):
        # Not an S3 URI; return it unchanged.
        return s3_uri

    # Extract bucket and key.
    path = s3_uri[5:]  # strip 's3://'
    parts = path.split('/', 1)
    bucket = parts[0]
    key = parts[1] if len(parts) > 1 else ''

    # URL-encode the key, keeping path slashes intact.
    encoded_key = urllib.parse.quote(key, safe='/')

    # Normalize the endpoint (no trailing slash).
    endpoint = CFG.s3_endpoint.rstrip('/')

    return f"{endpoint}/{bucket}/{encoded_key}"

def create_presigned_url_expanded(client_method_name, method_parameters=None,
                                  expiration=3600, http_method=None):
    """Generate a presigned URL to invoke an S3.Client method.

    Not all of the client methods provided in the AWS Python SDK are supported.

    :param client_method_name: Name of the S3.Client method, e.g., 'list_buckets'
    :param method_parameters: Dictionary of parameters to send to the method
    :param expiration: Time in seconds for the presigned URL to remain valid
    :param http_method: HTTP method to use (GET, etc.)
    :return: Presigned URL as string. If error, returns None.
    """
    # Reuse the module-level client so the URL points at the configured endpoint.
    try:
        response = s3.generate_presigned_url(ClientMethod=client_method_name,
                                             Params=method_parameters,
                                             ExpiresIn=expiration,
                                             HttpMethod=http_method)
    except ClientError as e:
        logging.error(e)
        return None
    # The response contains the presigned URL.
    return response
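
# Example sketch: presign a temporary download link for a single object
# (hypothetical bucket/key, 10-minute expiry):
#   url = create_presigned_url_expanded(
#       'get_object',
#       method_parameters={'Bucket': 'public', 'Key': 'reports/example.html'},
#       expiration=600,
#   )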

def get_s3_uri_info(s3_uri: str):
    """Split an s3://bucket/key URI into (bucket, key, default upload args)."""
    bucket_name = s3_uri.split('/')[2]
    object_name = '/'.join(s3_uri.split('/')[3:])
    if object_name.endswith('.mhtml'):
        # mimetypes does not know .mhtml, so label it explicitly.
        content_type = 'multipart/related'
    else:
        content_type, _ = mimetypes.guess_type(object_name)
        content_type = content_type or 'application/octet-stream'
    upload_args = {
        'ContentType': content_type,
    }
    return bucket_name, object_name, upload_args
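
# Expected shape of the return value (hypothetical URI):
#   get_s3_uri_info('s3://public/pages/item.mhtml')
#   -> ('public', 'pages/item.mhtml', {'ContentType': 'multipart/related'})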

def upload_to_s3(content, filename: str, **extra_args):
    """Upload in-memory content (dict, str, or bytes) to an s3:// URI."""
    bucket_name, object_name, upload_args = get_s3_uri_info(filename)
    upload_args.update(extra_args)
    if isinstance(content, dict):  # serialize dicts as JSON
        content = json.dumps(content, ensure_ascii=False)
    if isinstance(content, str):
        content = content.encode('utf-8')
    logging.debug('Uploading to bucket=%s key=%s', bucket_name, object_name)
    s3.put_object(
        Bucket=bucket_name,
        Key=object_name,
        Body=content,
        **upload_args
    )
    return filename
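
# Example sketch (hypothetical key): a dict is serialized to JSON before upload.
#   upload_to_s3({'status': 'ok'}, 's3://public/results/example.json')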

def upload_file_to_s3(file_path: str, s3_uri: str, **extra_args):
    """Upload a local file to an s3:// URI via boto3's managed transfer."""
    bucket_name, object_name, upload_args = get_s3_uri_info(s3_uri)
    upload_args.update(extra_args)
    s3.upload_file(
        file_path,
        bucket_name,
        object_name,
        ExtraArgs=upload_args
    )
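
# Example sketch (hypothetical paths):
#   upload_file_to_s3('/tmp/report.pdf', 's3://public/reports/report.pdf')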

def save_to_file(content, filename: Path, **extra_args):
    '''
    Write content to a local path, or upload it when the target is an s3:// URI.

    Example:
        save_to_file(
            data,
            's3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',
            Metadata={'mykey': 'myvalue', 'mykey2': 'myvalue2'}
        )
    '''
    if str(filename).startswith('s3://'):
        return upload_to_s3(content, str(filename), **extra_args)
    if isinstance(content, str):
        with open(filename, "w", encoding="utf-8") as file:
            file.write(content)
    else:
        with open(filename, "wb") as file:
            file.write(content)
    return filename

def read_file(file_uri: str, mode='r'):
    """Read a local path or an s3:// URI; smart_open routes S3 reads through the shared client."""
    with open(file_uri, mode or 'r', transport_params={'client': s3}) as f:
        return f.read()
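
# Example sketch (hypothetical key): read raw bytes from S3.
#   html = read_file('s3://public/pages/example.html', mode='rb')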

def check_exists(file_uri: str):
    """Return a truthy value if the target exists; False if an S3 object is missing."""
    if not file_uri.startswith('s3://'):
        return Path(file_uri).exists()
    bucket_name, object_name, _ = get_s3_uri_info(file_uri)
    try:
        s3.head_object(Bucket=bucket_name, Key=object_name)
        return file_uri
    except ClientError as e:
        # head_object reports a missing key as a 404 error code.
        if e.response['Error']['Code'] == '404':
            return False
        raise
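
# Example sketch (hypothetical key): skip regeneration when the object already exists.
#   if not check_exists('s3://public/pages/example.html'):
#       save_to_file(page_content, 's3://public/pages/example.html')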

def main():
    response = s3.list_buckets()
    # Output the bucket names
    print('Existing buckets:')
    for bucket in response['Buckets']:
        print(f' {bucket["Name"]}')


if __name__ == "__main__":
    main()