# prefect_dp.py — prefect result-persistence experiments
  1. import base64
  2. import json
  3. import os
  4. import pickle
  5. import re
  6. import time
  7. import os
  8. import sys
  9. import uuid
  10. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  11. import asyncio
  12. from uuid import UUID
  13. from conf.config import logger,OUTPUT
  14. from database.config import ai_yunying_db
  15. from dataset import Table
  16. from DrissionPage import ChromiumPage
  17. from DrissionPage._elements.chromium_element import ChromiumElement
  18. from DrissionPage._units.listener import DataPacket
  19. import jsonpath
  20. from prefect import flow,task
  21. from prefect.filesystems import LocalFileSystem, S3
  22. import os
  23. import prefect.results
  24. from prefect.results import PersistedResult
  25. from prefect.serializers import PickleSerializer
  26. from prefect import get_client
  27. import prefect.client.schemas.objects
  28. import json
  29. # import consul_srv_client
  30. # ip,port = consul_srv_client.get_srv("prefect.service.consul")
  31. prefect_api = f"http://{'10.0.0.1'}:{'8612'}/api"
  32. # 在导入 prefect 之前设置环境变量
  33. os.environ["PREFECT_API_URL"] = prefect_api
  34. from prefect import flow
  35. @task(persist_result=True, result_storage_key="{flow_run.flow_name}_{flow_run.name}_hello.json")
  36. def task_flow():
  37. print(task_flow.result_serializer)
  38. print(task_flow.result_storage_key)
  39. return "task_flow"
  40. @flow(persist_result=True)
  41. def auto_douyin(msg:str="none"):
  42. print(f"Hello, {msg}! Local quickstart flow is running! ")
  43. task_flow()
  44. return f"Hello, {msg}! "
  45. # 通过本地存储获取结果(不方便)
  46. def get_result():
  47. path = r'I:\code\ai-yunying\live-online-people\output\prefect\auto-douyin_pristine-gecko_hello.json'
  48. with open(path, 'r') as f:
  49. json_data = json.load(f)
  50. base64_data = json_data['data']
  51. decoded_data = base64.b64decode(base64_data)
  52. result = pickle.loads(decoded_data)
  53. print(result)
  54. from prefect.client.schemas.objects import FlowRun
  55. from prefect.client.schemas.filters import FlowRunFilter,TaskRunFilter,TaskRunFilterSubFlowRuns,FlowRunFilterId
  56. from prefect.filesystems import LocalFileSystem
# Fetch the result through the Prefect API (slower than reading local storage).
async def get_result_by_api():
    """Debug helper: fetch a flow run, its states, persisted result, and task runs.

    Everything is printed/logged rather than returned. Requires the server at
    PREFECT_API_URL to be reachable and the hard-coded flow run id to exist.
    """
    # Hard-coded id of a previously executed flow run — TODO parameterize.
    flow_run_id = "cb1e2c24-988b-44aa-8980-00af79f7a842"
    client = get_client()
    flow_run: FlowRun = await client.read_flow_run(flow_run_id)
    logger.info(f"flow_run {flow_run}")
    flow_run_states = await client.read_flow_run_states(flow_run_id)
    logger.info(f"flow_run_states {flow_run_states}")
    # print(f"Flow Run: {flow_run} {type(flow_run)}")
    print(f"parameters : {flow_run.parameters}")
    print(f"parent_task_run_id : {flow_run.parent_task_run_id}")
    print(f"state : {flow_run.state}")
    # state.result() yields a PersistedResult reference; the actual payload is
    # retrieved lazily by the awaited .get() below.
    state_result: PersistedResult = flow_run.state.result()
    print(f"state_result : {state_result}")
    print(f"typr state_result : {type(state_result)}")
    result_value = await state_result.get()
    print(f"result_value : {result_value}")
    print(f"state_details: {flow_run.state.state_details}")
    f = FlowRunFilter(id=FlowRunFilterId(any_=[flow_run_id]))
    t = TaskRunFilter()
    # NOTE(review): `t` (exclude subflow runs) is built but never passed to
    # read_task_runs below — confirm whether the commented call was intended.
    t.subflow_runs = TaskRunFilterSubFlowRuns(exists_=False)
    # run_tasks = await client.read_task_runs(flow_run_filter=f, task_run_filter=t)
    run_tasks = await client.read_task_runs(flow_run_filter=f)
    # run_tasks = await client._client.post("/task_runs/filter", json={"flow_runs":{"id":{"any_":["cb1e2c24-988b-44aa-8980-00af79f7a842"]}},"task_runs":{"subflow_runs":{"exists_":False}},"sort":"EXPECTED_START_TIME_DESC","limit":200,"offset":0})
    logger.info(f"run_tasks {run_tasks}")
    # NOTE(review): loop variable `task` shadows the imported prefect `task`
    # decorator for the rest of this function's scope.
    for task in run_tasks:
        res = task.state.result()
        logger.info(f"{task.name} {await res.get()}")
  85. from prefect.blocks.core import Block
def localfilesystem_load():
    """Exploration helper: load an anonymous LocalFileSystem block and log its internals.

    Resolves a previously stored anonymous block document from the Prefect
    server and logs several private bookkeeping attributes. Side effects only
    (logging); returns None. Requires the Prefect API to be reachable.
    """
    # print(os.environ["PREFECT_API_URL"])
    # get_result()
    local_file_system_block = LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect2"))
    logger.info(f"local_file_system_block._block_schema_capabilities {local_file_system_block._block_schema_capabilities}")
    logger.info(f"local_file_system_block._block_type_name {local_file_system_block._block_type_name}")
    # Block.load() resolves the named (anonymous) block document via the server.
    STORAGE: Block = local_file_system_block.load('anonymous-92ba6766-2461-4aa6-ba87-f101d0e127e6')
    logger.info(f"STORAGE {STORAGE} {type(STORAGE)}")
    # NOTE(review): _get_block_document is a private coroutine API and may
    # change across prefect versions.
    block_document, block_document_name = asyncio.run(STORAGE._get_block_document('anonymous-92ba6766-2461-4aa6-ba87-f101d0e127e6'))
    logger.info(f"{block_document} {type(block_document)}")
    logger.info(f"{block_document.data}")
    logger.info(f"{block_document_name} {type(block_document_name)}")
    logger.info(f"STORAGE.data {STORAGE.data}")
    logger.info(f"STORAGE._block_document_id {STORAGE._block_document_id}")
    # _block_type_id _block_schema_id _block_document_name _block_type_slug
    logger.info(f"STORAGE._block_type_id {STORAGE._block_type_id}")
    logger.info(f"STORAGE._block_schema_id {STORAGE._block_schema_id}")
    logger.info(f"STORAGE._block_document_name {STORAGE._block_document_name}")
    logger.info(f"STORAGE._block_type_slug {STORAGE._block_type_slug}")
    # NOTE(review): `res` is constructed but never used; the original file's
    # indentation was lost, so this line's scope (function vs module level) is
    # ambiguous — placed inside the function here, confirm intended scope.
    res = PersistedResult(type='reference', artifact_type=None, artifact_description=None, serializer_type='pickle', storage_block_id=UUID('cbca441d-038d-43c0-a531-94f126101911'), storage_key='auto-douyin_mega-potoo_hello.json')
if __name__ == "__main__":
    # Ad-hoc experiment: build a LocalFileSystem storage block and convert it
    # into an anonymous block document locally (no server round-trip), then log it.
    local_file_system_block = LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect2"))
    # NOTE(review): _to_block_document is a private API; the random UUIDs are
    # placeholders, not real registered schema/type ids.
    doc = local_file_system_block._to_block_document(name="test",
                                                     block_schema_id=uuid.uuid4(),
                                                     block_type_id=uuid.uuid4(),
                                                     is_anonymous=True
                                                     )
    logger.info(f"{doc}")
    # Earlier experiments, kept for reference:
    # pickle_data = b"gAAAAABmKiLNhU5fGgCSwcDE-BgYOTAgxQJEjr0DcZRY2oSosTVM9qJ7fMnK373-v3gMrMO0ix_AmEaYQ8lfStKo71FGMofF1hOVtojEuLUXnqEkkNWk0eXefxj35mvGBNQfIL_lzBzxFuSxTTmDkOzPBP57UfNP8Q=="
    # res = pickle.loads(pickle_data)
    # logger.info(f"{res}")
    # logger.info(f"STORAGE.get_block_capabilities {STORAGE.get_block_capabilities()}")
    # # get_block_class_from_key
    # # logger.info(f"STORAGE.get_block_class_from_key {STORAGE.get_block_class_from_key()}")
    # # get_block_class_from_schema
    # logger.info(f"STORAGE.get_block_class_from_schema {STORAGE.get_block_class_from_schema()}")
    # # get_block_placeholder
    # logger.info(f"STORAGE.get_block_placeholder {STORAGE.get_block_placeholder()}")
    # # get_description
    # logger.info(f"STORAGE.get_description {STORAGE.get_description()}")
    # asyncio.run(get_result_by_api())
    # new_flow = auto_douyin.with_options(result_storage=LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect")))
    # new_flow(msg="it's ok flow result")