| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- from io import BytesIO
- import json
- import mimetypes
- import config
- import datetime
- from conf.config import logger
- import pickle
class S3:
    """Small convenience wrapper around a MinIO Python SDK client.

    Uploaded objects are stored under a date-based prefix built by
    :meth:`get_object_prefix` (for example ``log/2024-04-25/<name>``).
    """

    def __init__(self, bucket='swl', client=config.s3client) -> None:
        """Remember the target bucket and the MinIO client to use.

        Args:
            bucket: Name of the bucket every operation targets.
            client: MinIO SDK client; defaults to the project-wide client
                exposed by ``config``.
        """
        self.bucket = bucket
        self.minio_client = client

    def get_object_prefix(self, dir='log'):
        """Return the key prefix for today's local date.

        Args:
            dir: Leading path component. The name shadows the builtin but is
                kept because callers may pass it by keyword.

        Returns:
            ``"<dir>/<YYYY-MM-DD>"``, e.g. ``log/2024-04-25``.
        """
        return dir + '/' + str(datetime.date.today())

    def get_object_name_by_time(self):
        """Return a path-safe name fragment derived from the local time.

        Returns:
            ``HHMMSS_ffffff`` (hours, minutes, seconds, microseconds), e.g.
            ``023213_123456``. The full microsecond field is kept so that
            uploads issued in quick succession do not share a name.
        """
        return datetime.datetime.now().strftime("%H%M%S_%f")

    @staticmethod
    def _encode_payload(obj):
        """Convert *obj* into the bytes that will be uploaded."""
        if isinstance(obj, (bytes, bytearray)):
            return bytes(obj)
        if isinstance(obj, str):
            # Strings are assumed to already hold serialized JSON.
            return obj.encode()
        # dict, list, numbers, ...: serialize with the standard encoder.
        return json.dumps(obj).encode()

    def put_json(self, obj, object_name='', name_by_time=True):
        """Upload *obj* as an ``application/json`` object.

        Args:
            obj: A ``str`` or ``bytes`` value is uploaded as-is (strings are
                UTF-8 encoded); anything else is serialized with
                ``json.dumps``.
            object_name: File-name part of the key. When *name_by_time* is
                true it is appended directly to the time-based fragment.
            name_by_time: Prefix the name with
                :meth:`get_object_name_by_time`.

        Returns:
            The full object key that was written.

        Raises:
            ValueError: If the resulting file name would be empty.
            TypeError: If *obj* is not JSON serializable.
        """
        if name_by_time:
            object_name = self.get_object_name_by_time() + object_name
        if not object_name:
            # Without a name the key would end in "/", which is not a usable
            # object key; fail before talking to the server.
            raise ValueError('object_name must not be empty when name_by_time is False')
        object_name = self.get_object_prefix() + '/' + object_name

        payload = self._encode_payload(obj)
        # The context manager closes the stream even if the upload raises.
        with BytesIO(payload) as data_stream:
            self.minio_client.put_object(
                bucket_name=self.bucket,
                object_name=object_name,
                data=data_stream,
                # put_object needs the payload size up front.
                length=len(payload),
                content_type='application/json',
            )
        # f-string kept on purpose: the formatting style expected by the
        # project logger is not visible from this file.
        logger.info(f'put object {object_name} to s3 success')
        return object_name

    def fput(self, file_path, object_name='', content_type=''):
        """Upload a local file under today's prefix.

        Args:
            file_path: Path of the file to upload.
            object_name: File-name part of the key; defaults to the base name
                of *file_path*.
            content_type: MIME type; guessed from the file extension when
                empty, falling back to ``application/octet-stream``.

        Returns:
            The full object key that was written.
        """
        if not object_name:
            # basename understands the platform's separators, so Windows
            # paths such as r'I:\dir\data.json' are handled on Windows too.
            object_name = os.path.basename(file_path)
        if not content_type:
            content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
        object_name = self.get_object_prefix() + '/' + object_name

        self.minio_client.fput_object(
            bucket_name=self.bucket,
            object_name=object_name,
            file_path=file_path,
            content_type=content_type,
        )
        logger.info(f'put object {object_name} to s3 success')
        return object_name

    def get_object(self, object_name):
        """Download an object and return its content parsed as JSON.

        Args:
            object_name: Full key of the object, as returned by
                :meth:`put_json`.

        Returns:
            The decoded JSON value.

        Raises:
            ValueError: If the object body is not valid JSON
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        """
        response = self.minio_client.get_object(
            bucket_name=self.bucket,
            object_name=object_name,
        )
        try:
            # Parse the raw bytes ourselves instead of relying on a
            # ``json()`` helper that not every HTTP response class provides.
            return json.loads(response.read())
        finally:
            # Always hand the connection back to the pool, even on errors.
            response.close()
            response.release_conn()
def main():
    """Manual smoke test: upload a small JSON document and read it back.

    Uses the default ``S3()`` configuration and logs both the stored key and
    the value fetched from it. A local file can be uploaded the same way via
    ``S3.fput``.
    """
    storage = S3()
    sample = {"dat": "test", 'ok':[1,2,3,3]}

    # Fixed file name (no time-based fragment) so the key is predictable.
    stored_key = storage.put_json(obj=sample, object_name='test.json', name_by_time=False)
    logger.info(f"{stored_key}")

    fetched = storage.get_object(object_name=stored_key)
    logger.info(f"{fetched}")


if __name__ == "__main__":
    main()
|