s3.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. import os
  2. import sys
  3. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  4. from io import BytesIO
  5. import json
  6. import mimetypes
  7. import config
  8. import datetime
  9. from conf.config import logger
  10. import pickle
  11. class S3:
  12. # s3client is minio python SDK client
  13. def __init__(self, bucket='swl', client=config.s3client) -> None:
  14. self.bucket = bucket
  15. self.minio_client = client
  16. # 根据本地时间自动获取对象存储前缀,如 log/2024-04-25/xxx
  17. def get_object_prefix(self, dir='log'):
  18. # 获取时分秒,并且符合路径的格式
  19. return dir + '/' + str(datetime.date.today())
  20. def get_object_name_by_time(self):
  21. # 获取时分秒毫秒,并且符合路径的格式,如 023213_123.json
  22. now = datetime.datetime.now()
  23. # 格式化时间:小时、分钟、秒、毫秒
  24. formatted_time = now.strftime("%H%M%S_%f") # %f 提供了微秒,所以我们取前三个数字作为毫秒
  25. return formatted_time
  26. # def put_dict(self, obj, name_by_time=True):
  27. def put_json(self, obj, object_name='',name_by_time=True):
  28. # object_name 如果为空,则获取 时分秒毫秒,并且符合路径的格式
  29. if name_by_time:
  30. object_name = self.get_object_name_by_time() + object_name
  31. object_name = self.get_object_prefix() + '/' + object_name
  32. if isinstance(obj, dict):
  33. obj = json.dumps(obj).encode()
  34. else:
  35. obj = obj.encode()
  36. data_stream = BytesIO(obj)
  37. # 使用 put_object 上传数据
  38. self.minio_client.put_object(
  39. bucket_name=self.bucket,
  40. data=data_stream,
  41. object_name=object_name,
  42. content_type='application/json',
  43. length=data_stream.getvalue().__len__() # 指定上传文件大小,否则会报错:i:\code\ai-yunying
  44. )
  45. data_stream.close()
  46. logger.info(f'put object {object_name} to s3 success')
  47. return object_name
  48. def fput(self, file_path, object_name='', content_type=''):
  49. # 根据文件名自动获取 object_name
  50. if object_name == '':
  51. object_name = file_path.split('/')[-1]
  52. # 根据文件后缀自动识别 content_type
  53. if content_type == '':
  54. content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
  55. object_name = self.get_object_prefix() + '/' + object_name
  56. # 使用 put_object 上传数据
  57. self.minio_client.fput_object(
  58. bucket_name=self.bucket,
  59. file_path=file_path,
  60. object_name=object_name,
  61. content_type=content_type
  62. )
  63. def get_object(self, object_name):
  64. res = self.minio_client.get_object(bucket_name=self.bucket, object_name=object_name)
  65. return res.json()
  66. def main():
  67. s3 = S3()
  68. test = {"dat": "test", 'ok':[1,2,3,3]}
  69. # s3.fput(r'I:\code\ai-yunying\live-online-people\output\data.json', 'log/20240425/test2.json')
  70. path = s3.put_json(obj=test, object_name='test.json', name_by_time=False)
  71. logger.info(f"{path}")
  72. res = s3.get_object(object_name=path)
  73. logger.info(f"{res}")
  74. if __name__ == "__main__":
  75. main()