import asyncio
import base64
import os
import pickle
import sys
import uuid
from uuid import UUID

prefect_api = "http://10.0.0.1:8612/api"
# Set the environment variable before importing prefect.
os.environ["PREFECT_API_URL"] = prefect_api

from prefect import flow, task
from prefect.filesystems import LocalFileSystem, RemoteFileSystem
from prefect.results import PersistedResult

# Put the parent of this file's directory on sys.path before the
# project-local imports below.
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from conf.config import logger, OUTPUT
from prefect.states import State
import database.config


# Task that prints its own result-serializer and storage-key settings and
# returns a fixed marker string.
# NOTE: documented with comments instead of a docstring on purpose -- Prefect
# may pick a docstring up as the task description, which would change the
# registered metadata.
@task(
    persist_result=True,
    result_storage_key="{flow_run.flow_name}_{flow_run.name}_hello.json",
)
def task_flow() -> str:
    print(task_flow.result_serializer)
    print(task_flow.result_storage_key)
    return "task_flow"


# Quickstart flow: logs the flow object, prints a greeting and returns a
# greeting string built from ``msg``. The result is persisted
# (persist_result=True).
# NOTE: documented with comments instead of a docstring for the same reason
# as ``task_flow`` above.
@flow(persist_result=True)
def miniofilesystem(msg: str = "none"):
    logger.info(f"{miniofilesystem}")
    print(f"Hello, {msg}! minio quickstart flow is running! ")
    # task_flow()
    return f"Hello, {msg}! "


def _build_minio_block(endpoint_scheme: str = "") -> RemoteFileSystem:
    """Build the ``RemoteFileSystem`` block for the ``s3://swl`` base path.

    Credentials and endpoint are read from the ``s3_access_key``,
    ``s3_secret_key`` and ``s3_endpoint`` environment variables.

    Args:
        endpoint_scheme: Text prepended to ``s3_endpoint`` to form the
            ``endpoint_url`` (for example ``"http://"``). Empty by default,
            which passes the environment value through unchanged.

    Raises:
        KeyError: If one of the three environment variables is not set.
    """
    return RemoteFileSystem(
        basepath="s3://swl",
        settings={
            "key": os.environ["s3_access_key"],
            "secret": os.environ["s3_secret_key"],
            "client_kwargs": {
                "endpoint_url": endpoint_scheme + os.environ["s3_endpoint"],
            },
        },
    )


def read_loacal_result() -> None:
    """Log a hard-coded persisted-result reference and print a decoded sample.

    First builds a ``PersistedResult`` reference from fixed identifiers and
    logs both the reference and the value returned by ``get()``. Then decodes
    a fixed base64 string, unpickles it and prints the resulting object.
    """
    persisted = PersistedResult(
        type='reference',
        artifact_type='result',
        serializer_type='pickle',
        storage_block_id=UUID('767df9e7-2669-4270-904d-ebdd43aaf30d'),
        storage_key='c0849a099d6e467a8c6a20093ba78226',
    )
    logger.info(persisted)
    logger.info(persisted.get())

    # An earlier sample payload was assigned here and immediately overwritten,
    # so it was never decoded. Kept for reference only:
    # "gAAAAABmK1YeWb24sQcKaJVtnmeoexGo3-W8dJlNEFrLZzBhtpZPUglAkG-VNzMBnLiY14vRuOORZen24-I0wVytnbNa61LO0UY2bZqF4k6vugsPGCgPUT5vvvm7hYlG3FcGwTG1zNM6EMfGelC2vS-LTFjByMwQ8AB_YjLtkRFaJ4nHEgUexow="
    base64_data = "gAWVJAAAAAAAAACMIEhlbGxvLCBsb2FjYWwgZmlsZXN5c3RlbSB0ZXN0ISAglC4=\n"
    decoded_data = base64.b64decode(base64_data)
    # SECURITY: pickle.loads is only acceptable here because the payload is a
    # literal in this file; never feed it data from an untrusted source.
    result = pickle.loads(decoded_data)
    print(result)


def load_block_data() -> None:
    """Fetch the block document named ``"minio"`` and log its contents.

    Logs the document, its ``data`` attribute and the returned document name,
    together with their types.
    """
    # NOTE(review): the endpoint is used here without the "http://" prefix
    # that minio_file_system_test() adds -- confirm which form is intended.
    minio_block = _build_minio_block()
    # NOTE(review): _get_block_document is a private Prefect helper; it is
    # unpacked here as a (document, name) pair.
    block_document, block_document_name = asyncio.run(
        minio_block._get_block_document("minio")
    )
    logger.info(f"{block_document} {type(block_document)}")
    logger.info(f"{block_document.data}")
    logger.info(f"{block_document_name} {type(block_document_name)}")
    # block_document, block_document_name = asyncio.run(minio_block._get_block_document("anonymous-92ba6766-2461-4aa6-ba87-f101d0e127e6"))


def minio_file_system_test() -> None:
    """Round-trip a small object through the MinIO-backed remote filesystem.

    Logs the configured endpoint, writes ``b"hello"`` to the path ``"foo"``,
    reads it back and logs what was read. The function then returns; the
    flow-run experiment that follows the ``return`` is currently disabled.
    """
    logger.info(f"s3_endpoint {os.environ['s3_endpoint']}")
    # https://docs.prefect.io/latest/concepts/filesystems/#s3
    minio_block = _build_minio_block(endpoint_scheme='http://')
    minio_block.write_path("foo", b"hello")
    res = minio_block.read_path("foo")
    logger.info(f"{res}")
    # base64_data = "gAAAAABmK1YeWb24sQcKaJVtnmeoexGo3-W8dJlNEFrLZzBhtpZPUglAkG-VNzMBnLiY14vRuOORZen24-I0wVytnbNa61LO0UY2bZqF4k6vugsPGCgPUT5vvvm7hYlG3FcGwTG1zNM6EMfGelC2vS-LTFjByMwQ8AB_YjLtkRFaJ4nHEgUexow="
    # data = minio_block.parse_obj(base64_data)
    # logger.info(f"{data}")
    # minio_block.save("minio")

    # NOTE: this early return makes everything below unreachable. Remove it
    # to run the flow with the MinIO block as its result storage.
    return

    new_flow = miniofilesystem.with_options(result_storage=minio_block)
    logger.info(f"new_flow {new_flow}")
    res = new_flow(msg="loacal filesystem test", return_state=True)
    logger.info(f"res {res}")
    logger.info(f"name {res.name}")
    logger.info(f"state_details {res.state_details}")
    # data
    logger.info(f"data {res.data} {type(res.data)}")
    # res.data.get()
    logger.info(f"data {res}")


def main() -> None:
    """Script entry point: run the MinIO filesystem round-trip test."""
    # load_block_data()
    minio_file_system_test()


if __name__ == "__main__":
    main()