s3.py 880 B

123456789101112131415161718192021222324252627
  1. import os
  2. from minio import Minio
  3. from .files import FileStore
  4. AWS_S3_ENDPOINT = 's3.amazonaws.com'
  5. class S3FileStore(FileStore):
  6. def __init__(self, endpoint: str = AWS_S3_ENDPOINT) -> None:
  7. access_key = os.getenv('AWS_ACCESS_KEY_ID')
  8. secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
  9. self.bucket = os.getenv('AWS_S3_BUCKET')
  10. self.client = Minio(endpoint, access_key, secret_key)
  11. def write(self, path: str, contents: str) -> None:
  12. self.client.put_object(self.bucket, path, contents)
  13. def read(self, path: str) -> str:
  14. return self.client.get_object(self.bucket, path).data.decode('utf-8')
  15. def list(self, path: str) -> list[str]:
  16. return [obj.object_name for obj in self.client.list_objects(self.bucket, path)]
  17. def delete(self, path: str) -> None:
  18. self.client.remove_object(self.bucket, path)