| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- import asyncio
- import base64
- import os
- import pickle
- from uuid import UUID
- import uuid
- prefect_api = f"http://10.0.0.1:8612/api"
- # 在导入 prefect 之前设置环境变量
- os.environ["PREFECT_API_URL"] = prefect_api
- from prefect import flow,task
- from prefect.filesystems import LocalFileSystem, RemoteFileSystem
- from prefect.results import PersistedResult
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- from conf.config import logger,OUTPUT
- from prefect.states import State
- import database.config
- @task(persist_result=True, result_storage_key="{flow_run.flow_name}_{flow_run.name}_hello.json")
- def task_flow():
- print(task_flow.result_serializer)
- print(task_flow.result_storage_key)
- return "task_flow"
- @flow(persist_result=True)
- def miniofilesystem(msg:str="none"):
- logger.info(f"{miniofilesystem}")
- print(f"Hello, {msg}! minio quickstart flow is running! ")
- # task_flow()
- return f"Hello, {msg}! "
- def read_loacal_result():
- res = PersistedResult(type= 'reference', artifact_type='result', serializer_type= 'pickle', storage_block_id=UUID('767df9e7-2669-4270-904d-ebdd43aaf30d'), storage_key='c0849a099d6e467a8c6a20093ba78226')
- logger.info(res)
- logger.info(res.get())
-
- base64_data = "gAAAAABmK1YeWb24sQcKaJVtnmeoexGo3-W8dJlNEFrLZzBhtpZPUglAkG-VNzMBnLiY14vRuOORZen24-I0wVytnbNa61LO0UY2bZqF4k6vugsPGCgPUT5vvvm7hYlG3FcGwTG1zNM6EMfGelC2vS-LTFjByMwQ8AB_YjLtkRFaJ4nHEgUexow="
- base64_data = "gAWVJAAAAAAAAACMIEhlbGxvLCBsb2FjYWwgZmlsZXN5c3RlbSB0ZXN0ISAglC4=\n"
- decoded_data = base64.b64decode(base64_data)
- result = pickle.loads(decoded_data)
- print(result)
- def load_block_data():
- minio_block = RemoteFileSystem(
- basepath="s3://swl",
- settings={
- "key": os.environ["s3_access_key"],
- "secret": os.environ["s3_secret_key"],
- "client_kwargs": {"endpoint_url": os.environ["s3_endpoint"]} ,
- },)
- block_document, block_document_name = asyncio.run(minio_block._get_block_document("minio"))
- logger.info(f"{block_document} {type(block_document)}")
- logger.info(f"{block_document.data}")
- logger.info(f"{block_document_name} {type(block_document_name)}")
- # block_document, block_document_name = asyncio.run(minio_block._get_block_document("anonymous-92ba6766-2461-4aa6-ba87-f101d0e127e6"))
- def minio_file_system_test():
- logger.info(f"s3_endpoint " + os.environ["s3_endpoint"])
- # https://docs.prefect.io/latest/concepts/filesystems/#s3
- minio_block = RemoteFileSystem(
- basepath="s3://swl",
- settings={
- "key": os.environ["s3_access_key"],
- "secret": os.environ["s3_secret_key"],
- "client_kwargs": {"endpoint_url": 'http://' + os.environ["s3_endpoint"]} ,
- },)
- minio_block.write_path("foo", b"hello")
- res = minio_block.read_path("foo")
- logger.info(f"{res}")
- # base64_data = "gAAAAABmK1YeWb24sQcKaJVtnmeoexGo3-W8dJlNEFrLZzBhtpZPUglAkG-VNzMBnLiY14vRuOORZen24-I0wVytnbNa61LO0UY2bZqF4k6vugsPGCgPUT5vvvm7hYlG3FcGwTG1zNM6EMfGelC2vS-LTFjByMwQ8AB_YjLtkRFaJ4nHEgUexow="
- # data = minio_block.parse_obj(base64_data)
- # logger.info(f"{data}")
- # minio_block.save("minio")
- return
- new_flow = miniofilesystem.with_options(result_storage=minio_block)
- logger.info(f"new_flow {new_flow}")
- res = new_flow(msg="loacal filesystem test",return_state=True)
- logger.info(f"res {res}")
- logger.info(f"name {res.name}")
- logger.info(f"state_details {res.state_details}")
- # data
- logger.info(f"data {res.data} {type(res.data)}")
- # res.data.get()
- logger.info(f"data {res}")
- def main():
- # load_block_data()
- minio_file_system_test()
- if __name__ == "__main__":
- main()
|