"""Scratch script: persist Prefect flow/task results and read them back, either from
local storage or through the Prefect API."""
import asyncio
import base64
import json
import os
import pickle
import sys

sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from conf.config import logger, OUTPUT

# import consul_srv_client
# ip, port = consul_srv_client.get_srv("prefect.service.consul")
prefect_api = f"http://{'10.0.0.1'}:{'8612'}/api"

# Set the environment variable before importing prefect.
os.environ["PREFECT_API_URL"] = prefect_api

from prefect import flow, task, get_client
from prefect.filesystems import LocalFileSystem
from prefect.results import PersistedResult
from prefect.client.schemas.objects import FlowRun, TaskRun
from prefect.client.schemas.filters import (
    FlowRunFilter,
    FlowRunFilterId,
    TaskRunFilter,
    TaskRunFilterId,
    TaskRunFilterSubFlowRuns,
)


@task(persist_result=True, result_storage_key="{flow_run.flow_name}_{flow_run.name}_hello.json")
def task_flow():
    print(task_flow.result_serializer)
    print(task_flow.result_storage_key)
    return "task_flow"


@flow(persist_result=True)
def auto_douyin(msg: str = "none"):
    print(f"Hello, {msg}! Local quickstart flow is running! ")
    task_flow()
    return f"Hello, {msg}! "


# Read a persisted result from local storage (inconvenient: the file is a
# base64-encoded pickle inside a JSON wrapper).
def get_result():
    path = r'I:\code\ai-yunying\live-online-people\output\prefect\auto-douyin_pristine-gecko_hello.json'
    with open(path, 'r') as f:
        json_data = json.load(f)
    base64_data = json_data['data']
    decoded_data = base64.b64decode(base64_data)
    result = pickle.loads(decoded_data)
    print(result)


# Read results through the API (slower).
async def get_result_by_api():
    flow_run_id = "cb1e2c24-988b-44aa-8980-00af79f7a842"
    client = get_client()
    flow_run: FlowRun = await client.read_flow_run(flow_run_id)
    logger.info(f"flow_run {flow_run}")
    flow_run_states = await client.read_flow_run_states(flow_run_id)
    logger.info(f"flow_run_states {flow_run_states}")
    # print(f"Flow Run: {flow_run} {type(flow_run)}")
    print(f"parameters : {flow_run.parameters}")
    print(f"parent_task_run_id : {flow_run.parent_task_run_id}")
    print(f"state : {flow_run.state}")
    state_result: PersistedResult = flow_run.state.result()
    print(f"state_result : {state_result}")
    print(f"type state_result : {type(state_result)}")
    result_value = await state_result.get()
    print(f"result_value : {result_value}")
    print(f"state_details: {flow_run.state.state_details}")

    f = FlowRunFilter(id=FlowRunFilterId(any_=[flow_run_id]))
    t = TaskRunFilter()
    t.subflow_runs = TaskRunFilterSubFlowRuns(exists_=False)
    # run_tasks = await client.read_task_runs(flow_run_filter=f, task_run_filter=t)
    run_tasks = await client.read_task_runs(flow_run_filter=f)
    # run_tasks = await client._client.post("/task_runs/filter", json={"flow_runs":{"id":{"any_":["cb1e2c24-988b-44aa-8980-00af79f7a842"]}},"task_runs":{"subflow_runs":{"exists_":False}},"sort":"EXPECTED_START_TIME_DESC","limit":200,"offset":0})
    logger.info(f"run_tasks {run_tasks}")
    for tr in run_tasks:
        res = tr.state.result()
        logger.info(f"{tr.name} {await res.get()}")


async def read_task_run(task_run_id: str, client=None) -> TaskRun:
    client = client or get_client()
    t = TaskRunFilter(id=TaskRunFilterId(any_=[task_run_id]))
    run_tasks = await client.read_task_runs(task_run_filter=t)
    if run_tasks:
        logger.info(f"{type(run_tasks[0])}")
        return run_tasks[0]


async def read_task_run_inputs(task_run: TaskRun, client=None):
    """Resolve the upstream task runs referenced in task_run.task_inputs."""
    client = client or get_client()
    # task_inputs example:
    # {'unread_user_data': [], 'storage_key': [], 'user_profile_body': [TaskRunResult(input_type='task_run', id=UUID('d7995cd0-ce86-4970-b727-f597b20f231a'))]}
    input_ids = [
        ref.id
        for refs in task_run.task_inputs.values()
        for ref in refs
        if getattr(ref, "id", None)
    ]
    logger.info(f"input task_run ids {input_ids}")
    if not input_ids:
        return []
    t = TaskRunFilter(id=TaskRunFilterId(any_=input_ids))
    run_tasks = await client.read_task_runs(task_run_filter=t)
    if run_tasks:
        logger.info(f"{type(run_tasks[0])}")
    return run_tasks


async def read_task_run_result(task_run: TaskRun, client=None):
    client = client or get_client()
    t = TaskRunFilter(id=TaskRunFilterId(any_=[task_run.id]))
    run_tasks = await client.read_task_runs(task_run_filter=t)
    logger.info(f"run_tasks {run_tasks}")
    for tr in run_tasks:
        res = tr.state.result()
        logger.info(f"{tr.name} {await res.get()}")


if __name__ == "__main__":
    asyncio.run(read_task_run('204660c9-8dcc-43d0-bf27-4023c9eff94d'))
    # pickle_data = b"gAAAAABmKiLNhU5fGgCSwcDE-BgYOTAgxQJEjr0DcZRY2oSosTVM9qJ7fMnK373-v3gMrMO0ix_AmEaYQ8lfStKo71FGMofF1hOVtojEuLUXnqEkkNWk0eXefxj35mvGBNQfIL_lzBzxFuSxTTmDkOzPBP57UfNP8Q=="
    # res = pickle.loads(pickle_data)
    # logger.info(f"{res}")
    # logger.info(f"STORAGE.get_block_capabilities {STORAGE.get_block_capabilities()}")
    # # get_block_class_from_key
    # # logger.info(f"STORAGE.get_block_class_from_key {STORAGE.get_block_class_from_key()}")
    # # get_block_class_from_schema
    # logger.info(f"STORAGE.get_block_class_from_schema {STORAGE.get_block_class_from_schema()}")
    # # get_block_placeholder
    # logger.info(f"STORAGE.get_block_placeholder {STORAGE.get_block_placeholder()}")
    # # get_description
    # logger.info(f"STORAGE.get_description {STORAGE.get_description()}")
    # asyncio.run(get_result_by_api())
    # new_flow = auto_douyin.with_options(result_storage=LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect")))
    # new_flow(msg="it's ok flow result")
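
# A minimal usage sketch (an addition, not part of the original script): open the
# Prefect client once via its async context manager and pass it into the helpers above,
# instead of letting each helper fall back to a bare get_client(). The function name
# example_read_with_shared_client is hypothetical and the default task run id is just
# the placeholder already used in __main__.
async def example_read_with_shared_client(task_run_id: str = '204660c9-8dcc-43d0-bf27-4023c9eff94d'):
    async with get_client() as client:
        task_run = await read_task_run(task_run_id, client=client)
        if task_run is not None:
            await read_task_run_inputs(task_run, client=client)
            await read_task_run_result(task_run, client=client)

# asyncio.run(example_read_with_shared_client())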