file.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
import json
import logging
import mimetypes
from pathlib import Path

import boto3
import smart_open
from botocore.config import Config
from botocore.exceptions import NoCredentialsError
from smart_open import open

from utils.config import CFG
  10. s3 = boto3.client(
  11. 's3',
  12. aws_access_key_id=CFG.s3_access_key,
  13. aws_secret_access_key=CFG.s3_secret_key,
  14. endpoint_url=CFG.s3_endpoint,
  15. config=Config(signature_version='s3v4', retries={'mode': 'standard'}),
  16. )
  17. resource = boto3.resource('s3')
  18. def upload_to_s3(content, filename:str, **extra_args):
  19. bucket_name = filename.split('/')[2]
  20. object_name = '/'.join(filename.split('/')[3:])
  21. content_type, _ = mimetypes.guess_type(object_name)
  22. content_type = content_type or 'application/octet-stream'
  23. upload_args = {
  24. 'ContentType': content_type,
  25. }
  26. upload_args.update(extra_args)
  27. if isinstance(content, str):
  28. content = content.encode('utf-8')
  29. print(bucket_name, object_name)
  30. s3.put_object(
  31. Bucket=bucket_name,
  32. Key=object_name,
  33. Body=content,
  34. **upload_args
  35. )
  36. return filename
  37. def save_to_file(content, filename:Path, **extra_args):
  38. '''
  39. save_to_file(
  40. data,
  41. 's3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',
  42. Metadata={'mykey':'myvalue','mykey2':'myvalue2'}
  43. )
  44. '''
  45. if str(filename).startswith('s3://'):
  46. return upload_to_s3(content, str(filename), **extra_args)
  47. with open(filename, "w", encoding="utf-8") as file:
  48. file.write(content)
  49. return filename
  50. def check_exists(file_uri:str):
  51. try:
  52. with open(file_uri, 'r', transport_params={'client': s3}) as f:
  53. # 文件存在,继续操作
  54. return file_uri
  55. except (FileNotFoundError,OSError):
  56. # 文件不存在,执行相应的操作
  57. return False
  58. def main():
  59. response = s3.list_buckets()
  60. # Output the bucket names
  61. print('Existing buckets:')
  62. for bucket in response['Buckets']:
  63. print(f' {bucket["Name"]}')
  64. if __name__ == "__main__":
  65. main()