# file.py — S3/Minio storage helpers
import json
import logging
import mimetypes
import urllib.parse
from pathlib import Path

import boto3
import smart_open
from botocore.config import Config
from botocore.exceptions import NoCredentialsError, ClientError
from smart_open import open

from config.settings import CFG
  12. s3 = boto3.client(
  13. 's3',
  14. aws_access_key_id=CFG.s3_access_key,
  15. aws_secret_access_key=CFG.s3_secret_key,
  16. endpoint_url=CFG.s3_endpoint,
  17. config=Config(signature_version='s3v4', retries={'mode': 'standard'}),
  18. )
  19. # resource = boto3.resource('s3')
  20. def s3_uri_to_http_url(s3_uri):
  21. """
  22. 将 s3://bucket/key 格式的 URI 转换为 Minio 的 HTTP 访问链接。
  23. 适用于公共可读的存储桶。
  24. """
  25. if not s3_uri.startswith('s3://'):
  26. # raise ValueError("Invalid S3 URI. Must start with 's3://'")
  27. return s3_uri
  28. # 提取 bucket 和 key
  29. path = s3_uri[5:] # 去除 's3://'
  30. parts = path.split('/', 1)
  31. bucket = parts[0]
  32. key = parts[1] if len(parts) > 1 else ''
  33. # 对 key 进行 URL 编码(保留路径斜杠)
  34. encoded_key = urllib.parse.quote(key, safe='/')
  35. # 获取并清理 endpoint(确保无末尾斜杠)
  36. endpoint = CFG.s3_endpoint.rstrip('/')
  37. # 拼接完整 URL
  38. return f"{endpoint}/{bucket}/{encoded_key}"
  39. def create_presigned_url_expanded(client_method_name, method_parameters=None,
  40. expiration=3600, http_method=None):
  41. """Generate a presigned URL to invoke an S3.Client method
  42. Not all the client methods provided in the AWS Python SDK are supported.
  43. :param client_method_name: Name of the S3.Client method, e.g., 'list_buckets'
  44. :param method_parameters: Dictionary of parameters to send to the method
  45. :param expiration: Time in seconds for the presigned URL to remain valid
  46. :param http_method: HTTP method to use (GET, etc.)
  47. :return: Presigned URL as string. If error, returns None.
  48. """
  49. # Generate a presigned URL for the S3 client method
  50. s3_client = boto3.client('s3')
  51. try:
  52. response = s3_client.generate_presigned_url(ClientMethod=client_method_name,
  53. Params=method_parameters,
  54. ExpiresIn=expiration,
  55. HttpMethod=http_method)
  56. except ClientError as e:
  57. logging.error(e)
  58. return None
  59. # The response contains the presigned URL
  60. return response
  61. def get_s3_uri_info(s3_uri:str):
  62. bucket_name = s3_uri.split('/')[2]
  63. object_name = '/'.join(s3_uri.split('/')[3:])
  64. if object_name.endswith('.mhtml'):
  65. content_type = 'multipart/related'
  66. else:
  67. content_type, _ = mimetypes.guess_type(object_name)
  68. content_type = content_type or 'application/octet-stream'
  69. upload_args = {
  70. 'ContentType': content_type,
  71. }
  72. return bucket_name, object_name, upload_args
  73. def upload_to_s3(content, filename:str, **extra_args):
  74. bucket_name, object_name, upload_args = get_s3_uri_info(filename)
  75. upload_args.update(extra_args)
  76. if isinstance(content, dict): # 处理字典类型
  77. content = json.dumps(content, ensure_ascii=False)
  78. if isinstance(content, str):
  79. content = content.encode('utf-8')
  80. print(bucket_name, object_name)
  81. s3.put_object(
  82. Bucket=bucket_name,
  83. Key=object_name,
  84. Body=content,
  85. **upload_args
  86. )
  87. return filename
  88. def upload_file_to_s3(file_path:str,s3_uri:str, **extra_args):
  89. bucket_name, object_name, upload_args = get_s3_uri_info(s3_uri)
  90. upload_args.update(extra_args)
  91. s3.upload_file(
  92. file_path,
  93. bucket_name,
  94. object_name,
  95. ExtraArgs=upload_args
  96. )
  97. def save_to_file(content, filename:Path, **extra_args):
  98. '''
  99. save_to_file(
  100. data,
  101. 's3://public/amazone/copywriting_production/output/B0CQ1SHD8V.html',
  102. Metadata={'mykey':'myvalue','mykey2':'myvalue2'}
  103. )
  104. '''
  105. if str(filename).startswith('s3://'):
  106. return upload_to_s3(content, str(filename), **extra_args)
  107. if isinstance(content, str):
  108. with open(filename, "w", encoding="utf-8") as file:
  109. file.write(content)
  110. else:
  111. with open(filename, "wb") as file:
  112. file.write(content)
  113. return filename
  114. def read_file(file_uri:str, mode='r'):
  115. # if str(file_uri).startswith('s3://'):
  116. # bucket_name, object_name, upload_args = get_s3_uri_info(file_uri)
  117. # response = s3.get_object(Bucket=bucket_name, Key=object_name)
  118. # return response['Body'].read()
  119. with open(file_uri, mode or 'r', transport_params={'client': s3}) as f:
  120. # 文件存在,继续操作
  121. return f.read()
  122. def check_exists(file_uri:str):
  123. if not file_uri.startswith('s3://'):
  124. return Path(file_uri).exists()
  125. bucket_name, object_name, upload_args = get_s3_uri_info(file_uri)
  126. try:
  127. s3.head_object(Bucket=bucket_name, Key=object_name)
  128. return file_uri
  129. except (FileNotFoundError,OSError,ClientError) as e:
  130. if e.response['Error']['Code'] == '404':
  131. return False
  132. raise e
  133. def main():
  134. response = s3.list_buckets()
  135. # Output the bucket names
  136. print('Existing buckets:')
  137. for bucket in response['Buckets']:
  138. print(f' {bucket["Name"]}')
  139. if __name__ == "__main__":
  140. main()