"""Script entry point that runs the Douyin IM chat flow through Prefect."""
import datetime
import os
import re
import time
import os
import sys
import asyncio
import prefect.runtime

# Make the project root importable before any project-local import below.
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from dp.page import page
from douyin import base
from conf.config import logger, OUTPUT
from database.config import minio_block
from douyin.models import UnReadUserData, UserInfoModel, db, unread_table, user_table
from dataset import Table
from DrissionPage import ChromiumPage
from DrissionPage._elements.chromium_element import ChromiumElement
from DrissionPage._units.listener import DataPacket
import jsonpath
from prefect import flow, task
from prefect.tasks import Task, TaskRun
from prefect.flows import Flow
from prefect.states import State, StateType
from douyin.models import UnReadUserData, engine
from douyin.flow import check_page, conversation, unread_msg
from concurrent.futures import ThreadPoolExecutor
from prefect.deployments import run_deployment
from prefect import runtime
from prefect import flows
from prefect import get_client
from prefect.flow_runs import wait_for_flow_run
from prefect.client.schemas.objects import (
    FlowRun,
    MinimalDeploymentSchedule,
)
from prefect import flow, Runner

'''
Deployments
┌───────────────────────────────┐
│ check-page-ok-flow/ai-yunying │
│ reply-to-user-flow/ai-yunying │
│ unread-msg-flow/ai-yunying    │
│ stranger-msg-flow/ai-yunying  │
└───────────────────────────────┘
'''


@flow(log_prints=False)
def im_chat_flow(tab_id=None):
    """Check the Douyin home page and dispatch the message-handling deployments.

    The page check runs in-process via ``check_page.check_page_ok_flow``. For
    each kind of pending work it reports, the matching deployment is started
    with ``timeout=0`` and every started run is then awaited.

    Args:
        tab_id: Identifier handed to ``base.get_tab`` to pick the browser tab.

    Returns:
        ``True`` once every started flow run has been awaited, including the
        case where nothing needed to be started.
    """
    tab = base.get_tab(tab_id)
    home_url: str = 'https://www.douyin.com/user/self'
    logger.info(f"start im_chat_flow {tab.tab_id} ")

    unread_items, stranger_txt = check_page.check_page_ok_flow(home_url, tab.tab_id)

    flows_run = []
    if stranger_txt:
        # Stranger messages are handled in a fresh tab of their own.
        new_tab = base.page.new_tab(home_url)
        # timeout=0 means: return immediately instead of blocking on the run.
        flows_run.append(
            run_deployment(
                'stranger-msg-flow/ai-yunying',
                parameters=dict(home_url=home_url, tab_id=new_tab.tab_id),
                timeout=0,
            )
        )
    if unread_items:
        # The original built the parameters with dict(item, tab_id), which
        # raises TypeError because dict() takes at most one positional argument.
        # NOTE(review): the key "unread_user_data" is assumed from how
        # conversation.reply_to_user_flow was called elsewhere in this file;
        # confirm it against the deployed flow's signature.
        flows_run.append(
            run_deployment(
                'reply-to-user-flow/ai-yunying',
                parameters=dict(unread_user_data=unread_items[-1], tab_id=tab.tab_id),
                timeout=0,
            )
        )

    # Await every started run; when nothing was started there is nothing to
    # wait for (the original hit an unbound ``flow_run`` in that case).
    for pending_run in flows_run:
        finished_run = asyncio.run(wait_for_flow_run(pending_run.id, log_states=True))
        logger.info(f"flow_run {finished_run} ")
    return True


from prefect.results import PersistedResult


def main():
    """Print the configured Prefect API URL and run the chat flow on the current tab.

    Raises:
        KeyError: if the ``PREFECT_API_URL`` environment variable is not set.
    """
    print(os.environ["PREFECT_API_URL"])
    im_chat_flow(tab_id=page.tab_id)


# The storage key argument is evaluated once, when the decorator runs at import time.
@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
def task_test(mytes):
    """Return ``mytes`` with the suffix ``'ok'`` appended (result-persistence probe)."""
    return mytes + 'ok'


def test():
    """Run ``task_test`` and print its persisted-result metadata.

    Also rebuilds a ``PersistedResult`` from the JSON form to show that the
    metadata round-trips.
    """
    import prefect
    import json

    state = task_test(mytes='https://www.douyin.com/user/self', return_state=True)
    persisted = state.data
    print(persisted.storage_key)
    print(persisted.json())
    print(type(persisted))
    res_dict = json.loads(persisted.json())
    res = prefect.results.PersistedResult(**res_dict)
    print(res)


if __name__ == "__main__":
    main()