# s3.py
import os
import sys

sys.path.append(os.path.dirname(os.path.dirname(__file__)))

import datetime
import json
import mimetypes
import pickle
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import s3fs
from minio import Minio
from sqlmodel import (Column, Field, PickleType, Relationship, Session,
                      SQLModel, UniqueConstraint, func, select, text)

from conf.config import logger
from database.config import minio_block, minio_client
  16. def get_client():
  17. s3 = s3fs.S3FileSystem(
  18. key='miniokey...',
  19. secret='asecretkey...',
  20. endpoint_url='https://...'
  21. )
  22. class S3Object(SQLModel,table=False):
  23. type:Optional[str]= Field(default=None)
  24. path:Optional[str] = Field(default=None)
  25. len:Optional[int] = Field(default=0)
  26. def put(self, obj:Any, path=None):
  27. self.type = type(obj).__name__
  28. data = pickle.dumps(obj)
  29. self.len = len(data)
  30. if path is None:
  31. path = self.path
  32. self.path = minio_block.write_path(path, data) # 假设这个函数返回数据的存储路径
  33. return self
  34. def get(self):
  35. path = self.path
  36. if path.startswith(minio_block.basepath):
  37. path = path[len(minio_block.basepath):]
  38. data = minio_block.read_path(path)
  39. return pickle.loads(data)
  40. def info(self):
  41. return vars(self)
  42. def put_object(self, obj:Any, path=None):
  43. self.client.put_object(bucket_name='swl', data=obj, object_name=path)
  44. try:
  45. obj_bytes = json.dumps(obj)
  46. except TypeError as e:
  47. print(e)
  48. res = pickle.dumps(('123', '234'))
  49. print("pickle res", res)
  50. return self.path
  51. def get_object(self, path=None):
  52. if path is None:
  53. path = self.path
  54. return minio_block.read_path(path)
  55. class S3:
  56. # s3client is minio python SDK client
  57. def __init__(self, bucket='swl', client=None) -> None:
  58. self.bucket = bucket
  59. self.minio_client:Minio = client
  60. # 根据本地时间自动获取对象存储前缀,如 log/2024-04-25/xxx
  61. def get_object_prefix(self, dir='log'):
  62. # 获取时分秒,并且符合路径的格式
  63. return dir + '/' + str(datetime.date.today())
  64. def get_object_name_by_time(self):
  65. # 获取时分秒毫秒,并且符合路径的格式,如 023213_123.json
  66. now = datetime.datetime.now()
  67. # 格式化时间:小时、分钟、秒、毫秒
  68. formatted_time = now.strftime("%y%m%d-%H%M%S_%f") # %f 提供了微秒,所以我们取前三个数字作为毫秒
  69. return formatted_time
  70. # def put_dict(self, obj, name_by_time=True):
  71. def put_json(self, obj, object_name='',name_by_time=True):
  72. # object_name 如果为空,则获取 时分秒毫秒,并且符合路径的格式
  73. if name_by_time:
  74. object_name = self.get_object_name_by_time() + object_name
  75. object_name = self.get_object_prefix() + '/' + object_name
  76. if isinstance(obj, dict):
  77. obj = json.dumps(obj).encode()
  78. else:
  79. obj = obj.encode()
  80. data_stream = BytesIO(obj)
  81. # 使用 put_object 上传数据
  82. self.minio_client.put_object(
  83. bucket_name=self.bucket,
  84. data=data_stream,
  85. object_name=object_name,
  86. content_type='application/json',
  87. length=data_stream.getvalue().__len__() # 指定上传文件大小,否则会报错:i:\code\ai-yunying
  88. )
  89. data_stream.close()
  90. logger.info(f'put object {object_name} to s3 success')
  91. return object_name
  92. def put(self, obj:Any, object_name='', content_type='', name_by_time=True):
  93. file_path = object_name.split('/')[-1]
  94. if content_type == '':
  95. content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
  96. if isinstance(obj, str):
  97. obj=obj.encode()
  98. # content_type = 'application/txt'
  99. else :
  100. try:
  101. obj = json.dumps(obj)
  102. # content_type = 'application/json'
  103. except TypeError as e:
  104. obj = pickle.dumps(obj)
  105. # content_type = 'application/pickle'
  106. object_name = self.get_object_prefix() + '/' + object_name
  107. data_stream = BytesIO(obj)
  108. return self.minio_client.put_object(
  109. bucket_name=self.bucket,
  110. data=data_stream,
  111. length=data_stream.getvalue().__len__(),
  112. object_name=object_name,
  113. content_type=content_type
  114. )
  115. def fput(self, file_path, object_name='', content_type=''):
  116. # 根据文件名自动获取 object_name
  117. if object_name == '':
  118. object_name = file_path.split('/')[-1]
  119. # 根据文件后缀自动识别 content_type
  120. if content_type == '':
  121. content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
  122. # 使用 put_object 上传数据
  123. return self.minio_client.fput_object(
  124. bucket_name=self.bucket,
  125. file_path=file_path,
  126. object_name=object_name,
  127. content_type=content_type
  128. )
  129. def get_object(self, object_name):
  130. res = self.minio_client.get_object(bucket_name=self.bucket, object_name=object_name)
  131. return res.json()
  132. def main():
  133. test = {"dat": "test", 'ok':[1,2,3,3]}
  134. # s3.fput(r'I:\code\ai-yunying\live-online-people\output\data.json', 'log/20240425/test2.json')
  135. s3 = S3Object(path="save_enter_im_user_detail_test").put(test)
  136. logger.info(f"{s3.info()}")
  137. res = s3.get()
  138. logger.info(f"{res}")
  139. logger.info(f"{s3.model_dump()}")
  140. if __name__ == "__main__":
  141. main()