| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
# Experiments with persisting Prefect flow/task results and reading them back.
import base64
import json
import os
import pickle
import re
import time
import sys
import uuid
import asyncio
from uuid import UUID

# Put the parent of this file's directory on sys.path (presumably so that the
# `conf` / `database` packages imported below can be resolved).
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

# Service discovery through consul is disabled; the API address is fixed instead.
# import consul_srv_client
# ip,port = consul_srv_client.get_srv("prefect.service.consul")
prefect_api = "http://10.0.0.1:8612/api"

# The environment variable has to be set BEFORE prefect is imported.
# Previously this assignment ran after the prefect imports, contradicting that
# requirement, so it now precedes every non-stdlib import (any of which might
# import prefect transitively).
os.environ["PREFECT_API_URL"] = prefect_api

from conf.config import logger, OUTPUT  # noqa: E402
from database.config import ai_yunying_db  # noqa: E402
from dataset import Table  # noqa: E402
from DrissionPage import ChromiumPage  # noqa: E402
from DrissionPage._elements.chromium_element import ChromiumElement  # noqa: E402
from DrissionPage._units.listener import DataPacket  # noqa: E402
import jsonpath  # noqa: E402
from prefect import flow, task  # noqa: E402
from prefect.filesystems import LocalFileSystem, S3  # noqa: E402
import prefect.results  # noqa: E402
from prefect.results import PersistedResult  # noqa: E402
from prefect.serializers import PickleSerializer  # noqa: E402
from prefect import get_client  # noqa: E402
import prefect.client.schemas.objects  # noqa: E402


@task(
    persist_result=True,
    result_storage_key="{flow_run.flow_name}_{flow_run.name}_hello.json",
)
def task_flow():
    """Print this task's result serializer and storage key, then return "task_flow"."""
    # Printed in the same order as before: serializer first, storage key second.
    for option_name in ("result_serializer", "result_storage_key"):
        print(getattr(task_flow, option_name))
    return "task_flow"


@flow(persist_result=True)
def auto_douyin(msg: str = "none"):
    """Print a greeting, run ``task_flow`` once, and return the greeting.

    Args:
        msg: Text interpolated into the greeting.

    Returns:
        The string ``"Hello, <msg>! "`` (note the trailing space).
    """
    greeting = f"Hello, {msg}! "
    print(f"{greeting}Local quickstart flow is running! ")
    task_flow()
    return greeting


# Path that the original version of get_result had hard-coded.
_DEFAULT_RESULT_PATH = r'I:\code\ai-yunying\live-online-people\output\prefect\auto-douyin_pristine-gecko_hello.json'


# Fetch the result from local storage (inconvenient)
def get_result(path=_DEFAULT_RESULT_PATH):
    """Read a persisted result file, print the decoded value and return it.

    The file must be a JSON document whose ``"data"`` field holds a
    base64-encoded pickle payload (presumably as written by Prefect's pickle
    serializer -- confirm against the Prefect version in use).

    Args:
        path: Location of the JSON result file. Defaults to the path that was
            previously hard-coded.

    Returns:
        The unpickled Python object.

    Raises:
        OSError: If the file cannot be opened.
        KeyError: If the JSON document has no ``"data"`` field.
    """
    with open(path, 'r', encoding='utf-8') as f:
        json_data = json.load(f)

    decoded_data = base64.b64decode(json_data['data'])
    # SECURITY: pickle.loads can execute arbitrary code. Only point this at
    # result files produced by a trusted source.
    result = pickle.loads(decoded_data)
    print(result)
    return result


- from prefect.client.schemas.objects import FlowRun
- from prefect.client.schemas.filters import FlowRunFilter,TaskRunFilter,TaskRunFilterSubFlowRuns,FlowRunFilterId
- from prefect.filesystems import LocalFileSystem
# Fetch the result through the API (relatively slow)
async def get_result_by_api(flow_run_id="cb1e2c24-988b-44aa-8980-00af79f7a842"):
    """Read a flow run through the Prefect API and log/print its results.

    Logs the flow run and its state history, prints the flow run's
    parameters, parent task run id, state, persisted result and state
    details, then logs the resolved result of every task run that belongs to
    the flow run.

    Args:
        flow_run_id: Id of the flow run to inspect. Defaults to the id that
            was previously hard-coded.
    """
    # The client is context-managed so its connection is closed on exit;
    # the earlier version created it and never closed it.
    async with get_client() as client:
        flow_run: FlowRun = await client.read_flow_run(flow_run_id)
        logger.info(f"flow_run {flow_run}")
        flow_run_states = await client.read_flow_run_states(flow_run_id)
        logger.info(f"flow_run_states {flow_run_states}")

        print(f"parameters : {flow_run.parameters}")
        print(f"parent_task_run_id : {flow_run.parent_task_run_id}")
        print(f"state : {flow_run.state}")

        # The state yields a result reference; .get() resolves the stored value.
        state_result: PersistedResult = flow_run.state.result()
        print(f"state_result : {state_result}")
        print(f"typr state_result : {type(state_result)}")
        result_value = await state_result.get()
        print(f"result_value : {result_value}")
        print(f"state_details: {flow_run.state.state_details}")

        # Only task runs of this flow run are requested. To also exclude
        # subflow runs, pass
        # task_run_filter=TaskRunFilter(subflow_runs=TaskRunFilterSubFlowRuns(exists_=False)).
        run_filter = FlowRunFilter(id=FlowRunFilterId(any_=[flow_run_id]))
        run_tasks = await client.read_task_runs(flow_run_filter=run_filter)
        logger.info(f"run_tasks {run_tasks}")

        for task_run in run_tasks:
            task_result = task_run.state.result()
            logger.info(f"{task_run.name} {await task_result.get()}")


- from prefect.blocks.core import Block
def localfilesystem_load():
    """Load a saved LocalFileSystem block by name and log what it exposes.

    Logs the capabilities and type name of a freshly built block, the loaded
    block itself, its block document (fetched by name) and several of the
    loaded block's identifying attributes.
    """
    document_name = 'anonymous-92ba6766-2461-4aa6-ba87-f101d0e127e6'

    fs_block = LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect2"))
    logger.info(f"local_file_system_block._block_schema_capabilities {fs_block._block_schema_capabilities}")
    logger.info(f"local_file_system_block._block_type_name {fs_block._block_type_name}")

    storage: Block = fs_block.load(document_name)
    logger.info(f"STORAGE {storage} {type(storage)}")

    # _get_block_document is a coroutine, so it is driven with asyncio.run.
    fetched = asyncio.run(storage._get_block_document(document_name))
    block_document, block_document_name = fetched
    logger.info(f"{block_document} {type(block_document)}")
    logger.info(f"{block_document.data}")
    logger.info(f"{block_document_name} {type(block_document_name)}")

    logger.info(f"STORAGE.data {storage.data}")
    identifying_attrs = (
        "_block_document_id",
        "_block_type_id",
        "_block_schema_id",
        "_block_document_name",
        "_block_type_slug",
    )
    for attr in identifying_attrs:
        logger.info(f"STORAGE.{attr} {getattr(storage, attr)}")

    # NOTE(review): this reference is built but never used afterwards.
    res = PersistedResult(
        type='reference',
        artifact_type=None,
        artifact_description=None,
        serializer_type='pickle',
        storage_block_id=UUID('cbca441d-038d-43c0-a531-94f126101911'),
        storage_key='auto-douyin_mega-potoo_hello.json',
    )

-
if __name__ == "__main__":
    # Build an anonymous block document for a local file system block, using
    # freshly generated schema/type ids, and log it.
    fs_block = LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect2"))
    generated_ids = {
        "block_schema_id": uuid.uuid4(),
        "block_type_id": uuid.uuid4(),
    }
    doc = fs_block._to_block_document(name="test", is_anonymous=True, **generated_ids)
    logger.info(f"{doc}")

    # Earlier experiments, kept for reference:
    # pickle_data = b"gAAAAABmKiLNhU5fGgCSwcDE-BgYOTAgxQJEjr0DcZRY2oSosTVM9qJ7fMnK373-v3gMrMO0ix_AmEaYQ8lfStKo71FGMofF1hOVtojEuLUXnqEkkNWk0eXefxj35mvGBNQfIL_lzBzxFuSxTTmDkOzPBP57UfNP8Q=="
    # res = pickle.loads(pickle_data)
    # logger.info(f"{res}")
    # logger.info(f"STORAGE.get_block_capabilities {STORAGE.get_block_capabilities()}")
    # # get_block_class_from_key
    # # logger.info(f"STORAGE.get_block_class_from_key {STORAGE.get_block_class_from_key()}")
    # # get_block_class_from_schema
    # logger.info(f"STORAGE.get_block_class_from_schema {STORAGE.get_block_class_from_schema()}")
    # # get_block_placeholder
    # logger.info(f"STORAGE.get_block_placeholder {STORAGE.get_block_placeholder()}")
    # # get_description
    # logger.info(f"STORAGE.get_description {STORAGE.get_description()}")
    # asyncio.run(get_result_by_api())
    # new_flow = auto_douyin.with_options(result_storage=LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect")))
    # new_flow(msg="it's ok flow result")
|