| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- from typing import List, Dict, Any, Optional,Tuple
- from io import BytesIO
- import json
- import mimetypes
- import datetime
- from conf.config import logger
- import pickle
- from database.config import minio_block,minio_client
- from minio import Minio
- from pathlib import Path
- from sqlmodel import Field, SQLModel,Relationship,Column,Session,select,func,UniqueConstraint,PickleType,text
- import s3fs
def get_client(key='miniokey...', secret='asecretkey...', endpoint_url='https://...'):
    """Build and return an ``s3fs`` filesystem handle for the object store.

    The defaults are the placeholder values that were previously hard-coded
    in the body; pass real credentials explicitly rather than committing
    them to source control.

    Args:
        key: Access key for the S3-compatible endpoint.
        secret: Secret key paired with ``key``.
        endpoint_url: Base URL of the S3-compatible service.

    Returns:
        The configured ``s3fs.S3FileSystem`` instance.
    """
    # The filesystem object used to be created and then dropped, so callers
    # always received None; hand it back instead.
    return s3fs.S3FileSystem(
        key=key,
        secret=secret,
        endpoint_url=endpoint_url,
    )
class S3Object(SQLModel, table=False):
    """Descriptor of a pickled Python object stored through ``minio_block``.

    Fields:
        type: Class name of the object most recently stored with ``put``.
        path: Storage path of the object (set by ``put``).
        len: Size in bytes of the pickled payload.
    """

    # NOTE: ``type`` and ``len`` shadow the builtins only inside this class
    # body; the methods below still resolve the real builtins.
    type: Optional[str] = Field(default=None)
    path: Optional[str] = Field(default=None)
    len: Optional[int] = Field(default=0)

    def put(self, obj: Any, path=None):
        """Pickle ``obj``, write it via ``minio_block`` and record its metadata.

        Args:
            obj: Any picklable object.
            path: Destination path; falls back to ``self.path`` when omitted.

        Returns:
            ``self``, so calls can be chained.
        """
        self.type = type(obj).__name__
        data = pickle.dumps(obj)
        self.len = len(data)
        if path is None:
            path = self.path
        # Presumably write_path returns the path the data was stored under;
        # verify against minio_block.
        self.path = minio_block.write_path(path, data)
        return self

    def get(self):
        """Read the object back from ``self.path`` and unpickle it.

        Raises:
            ValueError: If ``self.path`` has not been set.
        """
        path = self.path
        if path is None:
            # Previously this surfaced as an opaque AttributeError on None.
            raise ValueError("S3Object.path is not set; call put() first")
        # read_path is given the path without the leading base path.
        if path.startswith(minio_block.basepath):
            path = path[len(minio_block.basepath):]
        data = minio_block.read_path(path)
        # SECURITY: pickle.loads can execute arbitrary code; only read
        # objects that were written by trusted code.
        return pickle.loads(data)

    def info(self):
        """Return the instance attribute dictionary (``vars(self)``)."""
        return vars(self)

    def put_object(self, obj: Any, path=None):
        """Upload ``obj`` to the ``swl`` bucket through ``minio_client``.

        The object is serialised as JSON when possible and pickled otherwise.
        The model fields (``type``, ``path``, ``len``) are not modified.

        Args:
            obj: Object to upload.
            path: Object name in the bucket; falls back to ``self.path``.

        Returns:
            The object name that was used for the upload.
        """
        if path is None:
            path = self.path
        try:
            payload = json.dumps(obj).encode()
        except TypeError:
            # Not JSON-serialisable: fall back to pickle.
            payload = pickle.dumps(obj)
        # The old body called ``self.client``, which this model never
        # defines, and passed the raw object without a length. Use the
        # imported module-level client and a sized byte stream instead.
        minio_client.put_object(
            bucket_name='swl',
            object_name=path,
            data=BytesIO(payload),
            length=len(payload),
        )
        return path

    def get_object(self, path=None):
        """Return the raw stored bytes at ``path`` (default ``self.path``).

        NOTE(review): unlike ``get``, this does not strip
        ``minio_block.basepath`` from the path - confirm which form
        callers pass.
        """
        if path is None:
            path = self.path
        return minio_block.read_path(path)
-
class S3:
    """Convenience wrapper around a MinIO Python SDK client.

    Objects uploaded with ``put`` / ``put_json`` are stored under a
    date-based prefix such as ``log/2024-04-25/``.
    """

    def __init__(self, bucket='swl', client=None) -> None:
        """Remember the target bucket and the MinIO SDK client to use."""
        self.bucket = bucket
        self.minio_client: Minio = client

    def get_object_prefix(self, dir='log'):
        """Return ``<dir>/<today's local date>``, e.g. ``log/2024-04-25``."""
        return dir + '/' + str(datetime.date.today())

    def get_object_name_by_time(self):
        """Return a path-safe local timestamp formatted ``yymmdd-HHMMSS_ffffff``.

        ``%f`` yields all six microsecond digits; the value is not truncated
        to milliseconds.
        """
        return datetime.datetime.now().strftime("%y%m%d-%H%M%S_%f")

    def put_json(self, obj, object_name='', name_by_time=True):
        """Upload ``obj`` as ``application/json`` under the dated prefix.

        Args:
            obj: ``str`` (encoded as UTF-8), ``bytes``/``bytearray`` (sent
                unchanged), or any JSON-serialisable value.
            object_name: Suffix of the object name; may be empty when
                ``name_by_time`` is true.
            name_by_time: When true, prepend the current timestamp to
                ``object_name``.

        Returns:
            The full object name that was written.

        Raises:
            TypeError: If ``obj`` cannot be serialised to JSON.
        """
        if name_by_time:
            object_name = self.get_object_name_by_time() + object_name
        object_name = self.get_object_prefix() + '/' + object_name

        # Previously anything that was not a dict was blindly ``.encode()``d,
        # which crashed for lists, numbers and bytes.
        if isinstance(obj, (bytes, bytearray)):
            payload = bytes(obj)
        elif isinstance(obj, str):
            payload = obj.encode()
        else:
            payload = json.dumps(obj).encode()

        # The SDK needs the payload size up front; the context manager
        # closes the stream even if the upload raises.
        with BytesIO(payload) as data_stream:
            self.minio_client.put_object(
                bucket_name=self.bucket,
                data=data_stream,
                object_name=object_name,
                content_type='application/json',
                length=len(payload),
            )
        logger.info(f'put object {object_name} to s3 success')
        return object_name

    def put(self, obj: Any, object_name='', content_type='', name_by_time=True):
        """Upload an arbitrary object under the dated prefix.

        Args:
            obj: ``str`` is encoded as UTF-8; other values are serialised as
                JSON, falling back to pickle when JSON raises ``TypeError``.
            object_name: Object name below the dated prefix; its last path
                component is used to guess the content type.
            content_type: Explicit MIME type; guessed from the name when
                empty, defaulting to ``application/octet-stream``.
            name_by_time: Accepted for signature parity with ``put_json``;
                it is not applied here, so existing object names are kept.

        Returns:
            Whatever the client's ``put_object`` returns.
        """
        if content_type == '':
            file_name = object_name.split('/')[-1]
            content_type = mimetypes.guess_type(file_name)[0] or 'application/octet-stream'

        if isinstance(obj, str):
            payload = obj.encode()
        else:
            try:
                # json.dumps returns str; BytesIO needs bytes, so encode it
                # (the missing encode made every JSON upload raise TypeError).
                payload = json.dumps(obj).encode()
            except TypeError:
                # NOTE: this also covers bytes input, which is therefore
                # stored pickled rather than raw.
                payload = pickle.dumps(obj)

        object_name = self.get_object_prefix() + '/' + object_name
        return self.minio_client.put_object(
            bucket_name=self.bucket,
            data=BytesIO(payload),
            length=len(payload),
            object_name=object_name,
            content_type=content_type,
        )

    def fput(self, file_path, object_name='', content_type=''):
        """Upload a local file.

        Args:
            file_path: Path of the file to upload.
            object_name: Target object name; defaults to the file's base name.
            content_type: MIME type; guessed from the file extension when
                empty, defaulting to ``application/octet-stream``.

        Returns:
            Whatever the client's ``fput_object`` returns.
        """
        if object_name == '':
            # Path.name honours the platform's separator, unlike split('/').
            object_name = Path(file_path).name
        if content_type == '':
            content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
        return self.minio_client.fput_object(
            bucket_name=self.bucket,
            file_path=file_path,
            object_name=object_name,
            content_type=content_type,
        )

    def get_object(self, object_name):
        """Download ``object_name`` from the bucket and parse it as JSON.

        Returns:
            The decoded JSON value.
        """
        response = self.minio_client.get_object(
            bucket_name=self.bucket,
            object_name=object_name,
        )
        try:
            # Parse the body ourselves rather than relying on the response
            # object exposing a ``json()`` helper.
            return json.loads(response.read())
        finally:
            # The response holds a pooled connection until it is closed
            # and released.
            response.close()
            response.release_conn()
def main():
    """Smoke test: store a small dict through S3Object, read it back, log both."""
    payload = {"dat": "test", 'ok': [1, 2, 3, 3]}
    # Example of a direct file upload through the S3 wrapper:
    # s3.fput(r'I:\code\ai-yunying\live-online-people\output\data.json', 'log/20240425/test2.json')
    stored = S3Object(path="save_enter_im_user_detail_test")
    stored = stored.put(payload)
    logger.info(f"{stored.info()}")
    restored = stored.get()
    logger.info(f"{restored}")
    logger.info(f"{stored.model_dump()}")


if __name__ == "__main__":
    main()
|