file.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. import json
  2. from pathlib import Path
  3. import mimetypes
  4. import urllib.parse
  5. import smart_open
  6. from smart_open import open
  7. from botocore.exceptions import NoCredentialsError
  8. import boto3
  9. import logging
  10. from botocore.exceptions import ClientError
  11. from botocore.config import Config
  12. from config.settings import CFG
# Shared S3 client used by the helpers in this module. Credentials and the
# endpoint URL come from the project settings object (CFG); requests are signed
# with Signature Version 4 and use botocore's "standard" retry mode.
s3 = boto3.client(
    's3',
    aws_access_key_id=CFG.s3_access_key,
    aws_secret_access_key=CFG.s3_secret_key,
    endpoint_url=CFG.s3_endpoint,
    config=Config(signature_version='s3v4', retries={'mode': 'standard'}),
)
# NOTE(review): this resource is created from boto3's default configuration,
# not from the CFG credentials/endpoint passed to the client above — confirm
# that this is intended before relying on it.
resource = boto3.resource('s3')
  21. def s3_uri_to_http_url(s3_uri):
  22. """
  23. 将 s3://bucket/key 格式的 URI 转换为 Minio 的 HTTP 访问链接。
  24. 适用于公共可读的存储桶。
  25. """
  26. if not s3_uri.startswith('s3://'):
  27. raise ValueError("Invalid S3 URI. Must start with 's3://'")
  28. # 提取 bucket 和 key
  29. path = s3_uri[5:] # 去除 's3://'
  30. parts = path.split('/', 1)
  31. bucket = parts[0]
  32. key = parts[1] if len(parts) > 1 else ''
  33. # 对 key 进行 URL 编码(保留路径斜杠)
  34. encoded_key = urllib.parse.quote(key, safe='/')
  35. # 获取并清理 endpoint(确保无末尾斜杠)
  36. endpoint = CFG.s3_endpoint.rstrip('/')
  37. # 拼接完整 URL
  38. return f"{endpoint}/{bucket}/{encoded_key}"
  39. def create_presigned_url_expanded(client_method_name, method_parameters=None,
  40. expiration=3600, http_method=None):
  41. """Generate a presigned URL to invoke an S3.Client method
  42. Not all the client methods provided in the AWS Python SDK are supported.
  43. :param client_method_name: Name of the S3.Client method, e.g., 'list_buckets'
  44. :param method_parameters: Dictionary of parameters to send to the method
  45. :param expiration: Time in seconds for the presigned URL to remain valid
  46. :param http_method: HTTP method to use (GET, etc.)
  47. :return: Presigned URL as string. If error, returns None.
  48. """
  49. # Generate a presigned URL for the S3 client method
  50. s3_client = boto3.client('s3')
  51. try:
  52. response = s3_client.generate_presigned_url(ClientMethod=client_method_name,
  53. Params=method_parameters,
  54. ExpiresIn=expiration,
  55. HttpMethod=http_method)
  56. except ClientError as e:
  57. logging.error(e)
  58. return None
  59. # The response contains the presigned URL
  60. return response
  61. def upload_to_s3(content, filename:str, **extra_args):
  62. bucket_name = filename.split('/')[2]
  63. object_name = '/'.join(filename.split('/')[3:])
  64. if object_name.endswith('.mhtml'):
  65. content_type = 'multipart/related'
  66. else:
  67. content_type, _ = mimetypes.guess_type(object_name)
  68. content_type = content_type or 'application/octet-stream'
  69. upload_args = {
  70. 'ContentType': content_type,
  71. }
  72. upload_args.update(extra_args)
  73. if isinstance(content, str):
  74. content = content.encode('utf-8')
  75. print(bucket_name, object_name)
  76. s3.put_object(
  77. Bucket=bucket_name,
  78. Key=object_name,
  79. Body=content,
  80. **upload_args
  81. )
  82. return filename
  83. def save_to_file(content, filename:Path, **extra_args):
  84. '''
  85. save_to_file(
  86. data,
  87. 's3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',
  88. Metadata={'mykey':'myvalue','mykey2':'myvalue2'}
  89. )
  90. '''
  91. if str(filename).startswith('s3://'):
  92. return upload_to_s3(content, str(filename), **extra_args)
  93. with open(filename, "w", encoding="utf-8") as file:
  94. file.write(content)
  95. return filename
  96. def read_file(file_uri:str):
  97. with open(file_uri, 'r', transport_params={'client': s3}) as f:
  98. # 文件存在,继续操作
  99. return f.read()
  100. def check_exists(file_uri:str):
  101. try:
  102. with open(file_uri, 'r', transport_params={'client': s3}) as f:
  103. # 文件存在,继续操作
  104. return file_uri
  105. except (FileNotFoundError,OSError):
  106. # 文件不存在,执行相应的操作
  107. return False
  108. def main():
  109. response = s3.list_buckets()
  110. # Output the bucket names
  111. print('Existing buckets:')
  112. for bucket in response['Buckets']:
  113. print(f' {bucket["Name"]}')
  114. if __name__ == "__main__":
  115. main()