prefect_localfilesystem.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. import asyncio
  2. import base64
  3. import os
  4. import pickle
  5. from uuid import UUID
  6. import uuid
  7. prefect_api = f"http://10.0.0.1:8612/api"
  8. # 在导入 prefect 之前设置环境变量
  9. os.environ["PREFECT_API_URL"] = prefect_api
  10. from prefect import flow,task
  11. from prefect.filesystems import LocalFileSystem, S3
  12. from prefect.results import PersistedResult
  13. import os
  14. import sys
  15. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  16. from conf.config import logger,OUTPUT
  17. from prefect.states import State
  18. @task(persist_result=True, result_storage_key="{flow_run.flow_name}_{flow_run.name}_hello.json")
  19. def task_flow():
  20. print(task_flow.result_serializer)
  21. print(task_flow.result_storage_key)
  22. return "task_flow"
  23. @flow(persist_result=True)
  24. def loacalfilesystem(msg:str="none"):
  25. logger.info(f"{loacalfilesystem}")
  26. print(f"Hello, {msg}! Local quickstart flow is running! ")
  27. # task_flow()
  28. return f"Hello, {msg}! "
  29. from prefect import get_client
  30. def read_loacal_result():
  31. local_dir = LocalFileSystem()
  32. local_dir = local_dir.load('anonymous-92ba6766-2461-4aa6-ba87-f101d0e127e6')
  33. # client = get_client()
  34. # asyncio.run(client.read_block_document_by_name('anonymous-92ba6766-2461-4aa6-ba87-f101d0e127e6', block_type_slug=None))
  35. return
  36. res = PersistedResult(type= 'reference', artifact_type='result', serializer_type= 'pickle', storage_block_id=UUID('767df9e7-2669-4270-904d-ebdd43aaf30d'), storage_key='c0849a099d6e467a8c6a20093ba78226')
  37. logger.info(res)
  38. logger.info(res.get())
  39. base64_data = "gAAAAABmK1YeWb24sQcKaJVtnmeoexGo3-W8dJlNEFrLZzBhtpZPUglAkG-VNzMBnLiY14vRuOORZen24-I0wVytnbNa61LO0UY2bZqF4k6vugsPGCgPUT5vvvm7hYlG3FcGwTG1zNM6EMfGelC2vS-LTFjByMwQ8AB_YjLtkRFaJ4nHEgUexow="
  40. base64_data = "gAWVJAAAAAAAAACMIEhlbGxvLCBsb2FjYWwgZmlsZXN5c3RlbSB0ZXN0ISAglC4=\n"
  41. decoded_data = base64.b64decode(base64_data)
  42. result = pickle.loads(decoded_data)
  43. print(result)
  44. def local_file_system_test():
  45. local_dir = LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect2"),)
  46. new_flow = loacalfilesystem.with_options(result_storage=local_dir)
  47. logger.info(f"new_flow {new_flow}")
  48. res = new_flow(msg="loacal filesystem test",return_state=True)
  49. logger.info(f"res {res}")
  50. logger.info(f"name {res.name}")
  51. logger.info(f"state_details {res.state_details}")
  52. # data
  53. logger.info(f"data {res.data} {type(res.data)}")
  54. # res.data.get()
  55. logger.info(f"data {res}")
  56. def main():
  57. read_loacal_result()
  58. if __name__ == "__main__":
  59. main()