# file.py — helpers for reading and writing files either locally or on S3
# (via smart_open + boto3), addressed uniformly by path or s3:// URI.
  1. import json
  2. from pathlib import Path
  3. import smart_open
  4. from smart_open import open
  5. from botocore.exceptions import NoCredentialsError
  6. import boto3
  7. from botocore.config import Config
  8. from utils.config import CFG
  9. import mimetypes
# Low-level S3 client configured from project settings (utils.config.CFG).
# Shared by all helpers below and handed to smart_open as the transport client.
s3 = boto3.client(
    's3',
    aws_access_key_id=CFG.s3_access_key,
    aws_secret_access_key=CFG.s3_secret_key,
    endpoint_url=CFG.s3_endpoint,
    config=Config(signature_version='s3v4', retries={'mode': 'standard'}),
)
# NOTE(review): unlike the client above, this resource is built with default
# credentials and endpoint (CFG is not applied) and is not referenced in this
# file — confirm whether other modules import it, otherwise remove or configure it.
resource = boto3.resource('s3')
  18. def upload_to_s3(content, filename:str, **extra_args):
  19. bucket_name = filename.split('/')[2]
  20. object_name = '/'.join(filename.split('/')[3:])
  21. content_type, _ = mimetypes.guess_type(object_name)
  22. content_type = content_type or 'application/octet-stream'
  23. upload_args = {
  24. 'ContentType': content_type,
  25. }
  26. upload_args.update(extra_args)
  27. if isinstance(content, str):
  28. content = content.encode('utf-8')
  29. print(bucket_name, object_name)
  30. s3.put_object(
  31. Bucket=bucket_name,
  32. Key=object_name,
  33. Body=content,
  34. **upload_args
  35. )
  36. return filename
  37. def save_to_file(content, filename:Path, **extra_args):
  38. '''
  39. save_to_file(
  40. data,
  41. 's3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',
  42. Metadata={'mykey':'myvalue','mykey2':'myvalue2'}
  43. )
  44. '''
  45. if str(filename).startswith('s3://'):
  46. return upload_to_s3(content, str(filename), **extra_args)
  47. with open(filename, "w", encoding="utf-8") as file:
  48. file.write(content)
  49. return filename
def read_file(file_uri:str):
    """Return the entire text content of *file_uri* (local path or s3:// URI)."""
    with open(file_uri, 'r', transport_params={'client': s3}) as f:
        # File exists — read and return its full contents.
        return f.read()
  54. def check_exists(file_uri:str):
  55. try:
  56. with open(file_uri, 'r', transport_params={'client': s3}) as f:
  57. # 文件存在,继续操作
  58. return file_uri
  59. except (FileNotFoundError,OSError):
  60. # 文件不存在,执行相应的操作
  61. return False
  62. def main():
  63. response = s3.list_buckets()
  64. # Output the bucket names
  65. print('Existing buckets:')
  66. for bucket in response['Buckets']:
  67. print(f' {bucket["Name"]}')
  68. if __name__ == "__main__":
  69. main()