| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- import base64
- import json
- import os
- import pickle
- import re
- import time
- import os
- import sys
- import uuid
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- import asyncio
- from uuid import UUID
- from conf.config import logger,OUTPUT
- from dataset import Table
- from DrissionPage import ChromiumPage
- from DrissionPage._elements.chromium_element import ChromiumElement
- from DrissionPage._units.listener import DataPacket
- import jsonpath
- from prefect import flow,task
- from prefect.filesystems import LocalFileSystem, S3
- import os
- import prefect.results
- from prefect.results import PersistedResult
- from prefect.serializers import PickleSerializer
- from prefect import get_client
- import prefect.client.schemas.objects
- import json
# import consul_srv_client
# ip,port = consul_srv_client.get_srv("prefect.service.consul")
# API endpoint of the prefect server (service discovery via consul is disabled;
# the address is hard coded for now).
prefect_api = f"http://{'10.0.0.1'}:{'8612'}/api"
# Set the environment variable before importing prefect.
# NOTE(review): prefect has already been imported earlier in this file, so this
# assignment may come too late for prefect to pick up the API URL — confirm.
os.environ["PREFECT_API_URL"] = prefect_api
# NOTE(review): duplicate of the earlier `from prefect import flow, task`.
from prefect import flow
@task(persist_result=True, result_storage_key="{flow_run.flow_name}_{flow_run.name}_hello.json")
def task_flow():
    """Demo task: print its result-persistence settings and return a marker string.

    The storage-key template is expanded by prefect per flow run.
    """
    # Inside the body, `task_flow` is the prefect Task wrapper, so these are
    # the task's configured serializer and storage key.
    print(task_flow.result_serializer)
    print(task_flow.result_storage_key)
    return "task_flow"
@flow(persist_result=True)
def auto_douyin(msg: str = "none"):
    """Demo flow: print a greeting, run `task_flow`, and return the greeting.

    Result persistence is enabled so the return value can be read back later
    from storage or via the prefect API.
    """
    print(f"Hello, {msg}! Local quickstart flow is running! ")
    task_flow()
    return f"Hello, {msg}! "
# Read a persisted result straight from local storage (inconvenient, but no API needed).
def get_result(path: str = r'I:\code\ai-yunying\live-online-people\output\prefect\auto-douyin_pristine-gecko_hello.json'):
    """Load a prefect persisted-result file and return the deserialized value.

    The file is JSON with a ``data`` field holding the base64-encoded pickle of
    the result (prefect's PickleSerializer layout).

    Args:
        path: Path to the persisted-result JSON file. Defaults to the
            hard-coded experiment file the original version used.

    Returns:
        The unpickled result value (also printed, as before).
    """
    with open(path, 'r') as f:
        json_data = json.load(f)
    base64_data = json_data['data']
    decoded_data = base64.b64decode(base64_data)
    # SECURITY: pickle.loads executes arbitrary code — only read result files
    # produced by a trusted prefect instance.
    result = pickle.loads(decoded_data)
    print(result)
    return result
- from prefect.client.schemas.objects import FlowRun
- from prefect.client.schemas.filters import FlowRunFilter,TaskRunFilter,TaskRunFilterSubFlowRuns,FlowRunFilterId,TaskRunFilterId
- from prefect.filesystems import LocalFileSystem
- # from prefect.runtime.task_run import TaskRun
- from prefect.client.schemas.objects import TaskRun
# Fetch results via the prefect API (slower than reading local storage).
async def get_result_by_api():
    """Inspect a known flow run through the prefect API and print its results.

    Reads the flow run, its state history, and its task runs, resolving each
    persisted result. The flow-run id is hard coded for this experiment.
    """
    flow_run_id = "cb1e2c24-988b-44aa-8980-00af79f7a842"
    client = get_client()
    flow_run: FlowRun = await client.read_flow_run(flow_run_id)
    logger.info(f"flow_run {flow_run}")
    flow_run_states = await client.read_flow_run_states(flow_run_id)
    logger.info(f"flow_run_states {flow_run_states}")
    print(f"parameters : {flow_run.parameters}")
    print(f"parent_task_run_id : {flow_run.parent_task_run_id}")
    print(f"state : {flow_run.state}")
    # With persist_result enabled, state.result() yields a PersistedResult
    # reference that must be awaited (.get()) to materialize the value.
    state_result: PersistedResult = flow_run.state.result()
    print(f"state_result : {state_result}")
    print(f"type state_result : {type(state_result)}")  # fixed message typo ("typr")
    result_value = await state_result.get()
    print(f"result_value : {result_value}")
    print(f"state_details: {flow_run.state.state_details}")
    f = FlowRunFilter(id=FlowRunFilterId(any_=[flow_run_id]))
    # NOTE(review): `t` is constructed but never passed to read_task_runs below,
    # so the subflow-exclusion filter has no effect — confirm whether this was
    # intentional (a previous version passed task_run_filter=t).
    t = TaskRunFilter()
    t.subflow_runs = TaskRunFilterSubFlowRuns(exists_=False)
    run_tasks = await client.read_task_runs(flow_run_filter=f)
    logger.info(f"run_tasks {run_tasks}")
    # Renamed loop variable: `task` shadowed the imported prefect `task` decorator.
    for run in run_tasks:
        res = run.state.result()
        logger.info(f"{run.name} {await res.get()}")
async def read_task_run(task_run_id: str, client=None) -> TaskRun:
    """Look up a single TaskRun by id via the prefect API.

    Args:
        task_run_id: The task-run UUID string to fetch.
        client: Optional prefect client; a fresh one is created when omitted.

    Returns:
        The first matching TaskRun, or None when nothing matched.
    """
    client = client or get_client()
    id_filter = TaskRunFilter(id=TaskRunFilterId(any_=[task_run_id]))
    matches = await client.read_task_runs(task_run_filter=id_filter)
    if not matches:
        return None
    logger.info(f"{type(matches[0])}")
    return matches[0]
async def read_task_run_inputs(task_run: TaskRun, client=None):
    """Log a task run's upstream inputs and re-read the run through the API.

    Fixes a NameError in the original: `task_run_id` was referenced but never
    defined — it is now derived from ``task_run.id``.

    Args:
        task_run: The task run whose inputs/identity to inspect.
        client: Optional prefect client; a fresh one is created when omitted.

    Returns:
        The list of matching task runs, or None when nothing matched.
    """
    client = client or get_client()
    # task_inputs example shape:
    # {'unread_user_data': [], 'storage_key': [],
    #  'user_profile_body': [TaskRunResult(input_type='task_run', id=UUID('...'))]}
    logger.info(f"task_inputs {task_run.task_inputs}")
    task_run_id = str(task_run.id)
    logger.info(f"task_run_id {task_run_id}")
    t = TaskRunFilter(id=TaskRunFilterId(any_=[task_run_id]))
    run_tasks = await client.read_task_runs(task_run_filter=t)
    if run_tasks:
        logger.info(f"{type(run_tasks[0])}")
        return run_tasks
async def read_task_run_result(taskrun: TaskRun, client=None):
    """Resolve and log the persisted result of ``taskrun``.

    Fixes the original, which ignored its parameter and queried a hard-coded
    task-run id; the filter now uses ``taskrun.id``.

    Args:
        taskrun: The task run whose result to resolve.
        client: Optional prefect client; a fresh one is created when omitted.
    """
    client = client or get_client()
    t = TaskRunFilter(id=TaskRunFilterId(any_=[str(taskrun.id)]))
    run_tasks = await client.read_task_runs(task_run_filter=t)
    logger.info(f"run_tasks {run_tasks}")
    # `run` (not `task`) so the imported prefect `task` decorator isn't shadowed.
    for run in run_tasks:
        res = run.state.result()
        logger.info(f"{run.name} {await res.get()}")
if __name__ == "__main__":
    # Ad-hoc entry point: fetch one known task run through the prefect API.
    asyncio.run(read_task_run('204660c9-8dcc-43d0-bf27-4023c9eff94d'))
    # Earlier experiments kept for reference:
    # pickle_data = b"gAAAAABmKiLNhU5fGgCSwcDE-BgYOTAgxQJEjr0DcZRY2oSosTVM9qJ7fMnK373-v3gMrMO0ix_AmEaYQ8lfStKo71FGMofF1hOVtojEuLUXnqEkkNWk0eXefxj35mvGBNQfIL_lzBzxFuSxTTmDkOzPBP57UfNP8Q=="
    # res = pickle.loads(pickle_data)
    # logger.info(f"{res}")
    # logger.info(f"STORAGE.get_block_capabilities {STORAGE.get_block_capabilities()}")
    # # get_block_class_from_key
    # # logger.info(f"STORAGE.get_block_class_from_key {STORAGE.get_block_class_from_key()}")
    # # get_block_class_from_schema
    # logger.info(f"STORAGE.get_block_class_from_schema {STORAGE.get_block_class_from_schema()}")
    # # get_block_placeholder
    # logger.info(f"STORAGE.get_block_placeholder {STORAGE.get_block_placeholder()}")
    # # get_description
    # logger.info(f"STORAGE.get_description {STORAGE.get_description()}")
    # asyncio.run(get_result_by_api())
    # new_flow = auto_douyin.with_options(result_storage=LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect")))
    # new_flow(msg="it's ok flow result")
|