| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172 |
- import asyncio
- import base64
- import os
- import pickle
- from uuid import UUID
- import uuid
- prefect_api = f"http://10.0.0.1:8612/api"
- # 在导入 prefect 之前设置环境变量
- os.environ["PREFECT_API_URL"] = prefect_api
- from prefect import flow,task
- from prefect.filesystems import LocalFileSystem, S3
- from prefect.results import PersistedResult
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- from conf.config import logger,OUTPUT
- from prefect.states import State
- @task(persist_result=True, result_storage_key="{flow_run.flow_name}_{flow_run.name}_hello.json")
- def task_flow():
- print(task_flow.result_serializer)
- print(task_flow.result_storage_key)
- return "task_flow"
- @flow(persist_result=True)
- def loacalfilesystem(msg:str="none"):
- logger.info(f"{loacalfilesystem}")
- print(f"Hello, {msg}! Local quickstart flow is running! ")
- # task_flow()
- return f"Hello, {msg}! "
- from prefect import get_client
- def read_loacal_result():
- local_dir = LocalFileSystem()
- local_dir = local_dir.load('anonymous-92ba6766-2461-4aa6-ba87-f101d0e127e6')
- # client = get_client()
- # asyncio.run(client.read_block_document_by_name('anonymous-92ba6766-2461-4aa6-ba87-f101d0e127e6', block_type_slug=None))
- return
- res = PersistedResult(type= 'reference', artifact_type='result', serializer_type= 'pickle', storage_block_id=UUID('767df9e7-2669-4270-904d-ebdd43aaf30d'), storage_key='c0849a099d6e467a8c6a20093ba78226')
- logger.info(res)
- logger.info(res.get())
-
- base64_data = "gAAAAABmK1YeWb24sQcKaJVtnmeoexGo3-W8dJlNEFrLZzBhtpZPUglAkG-VNzMBnLiY14vRuOORZen24-I0wVytnbNa61LO0UY2bZqF4k6vugsPGCgPUT5vvvm7hYlG3FcGwTG1zNM6EMfGelC2vS-LTFjByMwQ8AB_YjLtkRFaJ4nHEgUexow="
- base64_data = "gAWVJAAAAAAAAACMIEhlbGxvLCBsb2FjYWwgZmlsZXN5c3RlbSB0ZXN0ISAglC4=\n"
- decoded_data = base64.b64decode(base64_data)
- result = pickle.loads(decoded_data)
- print(result)
- def local_file_system_test():
- local_dir = LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect2"),)
- new_flow = loacalfilesystem.with_options(result_storage=local_dir)
- logger.info(f"new_flow {new_flow}")
- res = new_flow(msg="loacal filesystem test",return_state=True)
- logger.info(f"res {res}")
- logger.info(f"name {res.name}")
- logger.info(f"state_details {res.state_details}")
- # data
- logger.info(f"data {res.data} {type(res.data)}")
- # res.data.get()
- logger.info(f"data {res}")
- def main():
- read_loacal_result()
- if __name__ == "__main__":
- main()
|