| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
import json
import mimetypes
from pathlib import Path
from urllib.parse import urlsplit

import boto3
import smart_open
from botocore.config import Config
from botocore.exceptions import NoCredentialsError
from smart_open import open

from utils.config import CFG
# Shared S3 client built from application settings: custom endpoint,
# SigV4 signing, and botocore's 'standard' retry mode.
s3 = boto3.client(
    's3',
    aws_access_key_id=CFG.s3_access_key,
    aws_secret_access_key=CFG.s3_secret_key,
    endpoint_url=CFG.s3_endpoint,
    config=Config(signature_version='s3v4', retries={'mode': 'standard'}),
)
# Resource-level handle, configured identically to the client so both talk
# to the same endpoint with the same credentials. (Previously this used the
# default credential chain and default AWS endpoint, which was inconsistent
# with the `s3` client above and would target a different service when a
# custom endpoint is configured.)
resource = boto3.resource(
    's3',
    aws_access_key_id=CFG.s3_access_key,
    aws_secret_access_key=CFG.s3_secret_key,
    endpoint_url=CFG.s3_endpoint,
    config=Config(signature_version='s3v4', retries={'mode': 'standard'}),
)
def upload_to_s3(content, filename: str, **extra_args):
    """Upload *content* to the S3 location named by an ``s3://`` URI.

    Args:
        content: Payload to store. A ``str`` is encoded as UTF-8;
            any other type is passed to ``put_object`` unchanged.
        filename: Full object URI, e.g. ``s3://bucket/path/to/key``.
        **extra_args: Extra keyword arguments forwarded to
            ``put_object`` (e.g. ``Metadata={...}``). These override the
            guessed ``ContentType``.

    Returns:
        The *filename* URI that was written.
    """
    # Parse the URI structurally instead of positional split('/') indexing,
    # which silently mis-parses anything that is not exactly 's3://b/k...'.
    parts = urlsplit(filename)
    bucket_name = parts.netloc
    object_name = parts.path.lstrip('/')
    # Guess a MIME type from the key's extension; fall back to a generic
    # binary type when the extension is unknown.
    content_type, _ = mimetypes.guess_type(object_name)
    upload_args = {'ContentType': content_type or 'application/octet-stream'}
    upload_args.update(extra_args)
    if isinstance(content, str):
        content = content.encode('utf-8')
    # (Removed leftover debug print of bucket/key.)
    s3.put_object(
        Bucket=bucket_name,
        Key=object_name,
        Body=content,
        **upload_args,
    )
    return filename
def save_to_file(content, filename: 'Path | str', **extra_args):
    """Persist *content* either to S3 or to the local filesystem.

    Any destination whose string form starts with ``s3://`` is routed to
    :func:`upload_to_s3`; everything else is written as UTF-8 text.
    (The previous ``filename: Path`` annotation was wrong: the documented
    primary usage passes ``str`` S3 URIs.)

    Example::

        save_to_file(
            data,
            's3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',
            Metadata={'mykey': 'myvalue', 'mykey2': 'myvalue2'},
        )

    Args:
        content: Text to store (for S3 targets, ``bytes`` is accepted too).
        filename: Destination local path or ``s3://`` URI.
        **extra_args: Forwarded to ``upload_to_s3`` for S3 targets;
            silently ignored for local files.

    Returns:
        The destination path/URI that was written.
    """
    if str(filename).startswith('s3://'):
        return upload_to_s3(content, str(filename), **extra_args)
    # NOTE: `open` here is smart_open's open (imported at module top);
    # for plain local paths it behaves like the builtin.
    with open(filename, "w", encoding="utf-8") as file:
        file.write(content)
    return filename
def check_exists(file_uri: str):
    """Return *file_uri* if it can be opened for reading, else ``False``.

    Works for both local paths and ``s3://`` URIs via smart_open, using
    the module-level ``s3`` client for S3 transport.
    """
    try:
        # Opening the stream is the existence probe; nothing is read.
        with open(file_uri, 'r', transport_params={'client': s3}):
            return file_uri
    except OSError:
        # FileNotFoundError is a subclass of OSError, so this single
        # clause covers the original (FileNotFoundError, OSError) pair.
        return False
def main():
    """Print the name of every bucket visible to the configured client."""
    response = s3.list_buckets()
    print('Existing buckets:')
    names = [bucket['Name'] for bucket in response['Buckets']]
    for name in names:
        print(f'  {name}')
# Script entry point: list the configured account's buckets when run directly.
if __name__ == "__main__":
    main()
|