# prefect_api.py

import asyncio
import base64
import json
import os
import pickle
import sys

sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from conf.config import logger, OUTPUT

# import consul_srv_client
# ip, port = consul_srv_client.get_srv("prefect.service.consul")
prefect_api = f"http://{'10.0.0.1'}:{'8612'}/api"
# Set the API URL *before* importing prefect so the client picks it up at import time.
os.environ["PREFECT_API_URL"] = prefect_api

from prefect import flow, task, get_client
from prefect.filesystems import LocalFileSystem
from prefect.results import PersistedResult
from prefect.client.schemas.objects import FlowRun, TaskRun
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterId,
    TaskRunFilter,
    TaskRunFilterId,
    TaskRunFilterSubFlowRuns,
)


@task(persist_result=True, result_storage_key="{flow_run.flow_name}_{flow_run.name}_hello.json")
def task_flow():
    print(task_flow.result_serializer)
    print(task_flow.result_storage_key)
    return "task_flow"


@flow(persist_result=True)
def auto_douyin(msg: str = "none"):
    print(f"Hello, {msg}! Local quickstart flow is running!")
    task_flow()
    return f"Hello, {msg}!"
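

# A minimal sketch (not in the original file) of running the flow with an
# explicit result-storage block -- the same pattern as the commented-out
# with_options() call at the bottom of this file. The basepath is an
# assumption; point it at any writable directory.
def run_with_local_storage():
    storage = LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect"))
    new_flow = auto_douyin.with_options(result_storage=storage)
    return new_flow(msg="it's ok flow result")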


# Read a result from local storage (inconvenient): persisted results are JSON
# files whose "data" field holds a base64-encoded pickle payload.
def get_result():
    path = r'I:\code\ai-yunying\live-online-people\output\prefect\auto-douyin_pristine-gecko_hello.json'
    with open(path, 'r') as f:
        json_data = json.load(f)
    base64_data = json_data['data']
    decoded_data = base64.b64decode(base64_data)
    result = pickle.loads(decoded_data)
    print(result)
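

# A hedged generalization of get_result(): decode any persisted-result file
# with the same {"data": "<base64 pickle>"} layout read above. The helper
# name is ours, not part of the original file.
def decode_persisted_result(path: str):
    with open(path, 'r') as f:
        payload = json.load(f)
    return pickle.loads(base64.b64decode(payload['data']))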


# Fetch results via the API (slower).
async def get_result_by_api():
    flow_run_id = "cb1e2c24-988b-44aa-8980-00af79f7a842"
    client = get_client()
    flow_run: FlowRun = await client.read_flow_run(flow_run_id)
    logger.info(f"flow_run {flow_run}")
    flow_run_states = await client.read_flow_run_states(flow_run_id)
    logger.info(f"flow_run_states {flow_run_states}")
    # print(f"Flow Run: {flow_run} {type(flow_run)}")
    print(f"parameters : {flow_run.parameters}")
    print(f"parent_task_run_id : {flow_run.parent_task_run_id}")
    print(f"state : {flow_run.state}")
    state_result: PersistedResult = flow_run.state.result()
    print(f"state_result : {state_result}")
    print(f"type state_result : {type(state_result)}")
    result_value = await state_result.get()
    print(f"result_value : {result_value}")
    print(f"state_details: {flow_run.state.state_details}")
    f = FlowRunFilter(id=FlowRunFilterId(any_=[flow_run_id]))
    t = TaskRunFilter()
    t.subflow_runs = TaskRunFilterSubFlowRuns(exists_=False)
    # run_tasks = await client.read_task_runs(flow_run_filter=f, task_run_filter=t)
    run_tasks = await client.read_task_runs(flow_run_filter=f)
    # Equivalent raw filter request:
    # run_tasks = await client._client.post("/task_runs/filter", json={"flow_runs":{"id":{"any_":["cb1e2c24-988b-44aa-8980-00af79f7a842"]}},"task_runs":{"subflow_runs":{"exists_":False}},"sort":"EXPECTED_START_TIME_DESC","limit":200,"offset":0})
    logger.info(f"run_tasks {run_tasks}")
    for task_run in run_tasks:  # renamed from `task` to avoid shadowing the imported decorator
        res = task_run.state.result()
        logger.info(f"{task_run.name} {await res.get()}")
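

# Sketch (our addition): the commented-out filter above excludes task runs
# that represent subflows; this helper applies both filters in one call.
async def read_leaf_task_runs(flow_run_id: str, client=None):
    client = client or get_client()
    f = FlowRunFilter(id=FlowRunFilterId(any_=[flow_run_id]))
    t = TaskRunFilter(subflow_runs=TaskRunFilterSubFlowRuns(exists_=False))
    return await client.read_task_runs(flow_run_filter=f, task_run_filter=t)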


async def read_task_run(task_run_id: str, client=None) -> TaskRun:
    client = client or get_client()
    t = TaskRunFilter(id=TaskRunFilterId(any_=[task_run_id]))
    run_tasks = await client.read_task_runs(task_run_filter=t)
    if run_tasks:
        logger.info(f"{type(run_tasks[0])}")
        return run_tasks[0]
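

# Alternative sketch (our addition): the Prefect client also exposes a direct
# single-run read, which avoids building a TaskRunFilter for one id. Passing
# the id as a str mirrors how read_flow_run is called above.
async def read_task_run_direct(task_run_id: str, client=None) -> TaskRun:
    client = client or get_client()
    return await client.read_task_run(task_run_id)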


async def read_task_run_inputs(task_run: TaskRun, client=None):
    client = client or get_client()
    # task_inputs maps parameter names to upstream references, e.g.
    # {'unread_user_data': [], 'storage_key': [], 'user_profile_body':
    #  [TaskRunResult(input_type='task_run', id=UUID('d7995cd0-ce86-4970-b727-f597b20f231a'))]}
    upstream_ids = [
        str(ref.id)
        for refs in task_run.task_inputs.values()
        for ref in refs
        if getattr(ref, "id", None)
    ]
    logger.info(f"upstream task_run ids {upstream_ids}")
    if not upstream_ids:
        return []
    t = TaskRunFilter(id=TaskRunFilterId(any_=upstream_ids))
    run_tasks = await client.read_task_runs(task_run_filter=t)
    if run_tasks:
        logger.info(f"{type(run_tasks[0])}")
    return run_tasks
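

# Usage sketch (our addition): walk one level upstream from a task run and
# log the persisted result of each input task, composing the helpers above.
async def trace_upstream(task_run_id: str):
    client = get_client()
    task_run = await read_task_run(task_run_id, client)
    for upstream in await read_task_run_inputs(task_run, client):
        res = upstream.state.result()
        logger.info(f"upstream {upstream.name}: {await res.get()}")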


async def read_task_run_result(taskrun: TaskRun, client=None):
    client = client or get_client()
    t = TaskRunFilter(id=TaskRunFilterId(any_=[str(taskrun.id)]))
    run_tasks = await client.read_task_runs(task_run_filter=t)
    # run_tasks = await client._client.post("/task_runs/filter", json={"flow_runs":{"id":{"any_":["cb1e2c24-988b-44aa-8980-00af79f7a842"]}},"task_runs":{"subflow_runs":{"exists_":False}},"sort":"EXPECTED_START_TIME_DESC","limit":200,"offset":0})
    logger.info(f"run_tasks {run_tasks}")
    for task_run in run_tasks:
        res = task_run.state.result()
        logger.info(f"{task_run.name} {await res.get()}")


if __name__ == "__main__":
    asyncio.run(read_task_run('204660c9-8dcc-43d0-bf27-4023c9eff94d'))
    # pickle_data = b"gAAAAABmKiLNhU5fGgCSwcDE-BgYOTAgxQJEjr0DcZRY2oSosTVM9qJ7fMnK373-v3gMrMO0ix_AmEaYQ8lfStKo71FGMofF1hOVtojEuLUXnqEkkNWk0eXefxj35mvGBNQfIL_lzBzxFuSxTTmDkOzPBP57UfNP8Q=="
    # res = pickle.loads(pickle_data)
    # logger.info(f"{res}")
    # logger.info(f"STORAGE.get_block_capabilities {STORAGE.get_block_capabilities()}")
    # # get_block_class_from_key
    # # logger.info(f"STORAGE.get_block_class_from_key {STORAGE.get_block_class_from_key()}")
    # # get_block_class_from_schema
    # logger.info(f"STORAGE.get_block_class_from_schema {STORAGE.get_block_class_from_schema()}")
    # # get_block_placeholder
    # logger.info(f"STORAGE.get_block_placeholder {STORAGE.get_block_placeholder()}")
    # # get_description
    # logger.info(f"STORAGE.get_description {STORAGE.get_description()}")
    # asyncio.run(get_result_by_api())
    # new_flow = auto_douyin.with_options(result_storage=LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect")))
    # new_flow(msg="it's ok flow result")