| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import datetime
- import os
- import re
- import time
- import os
- import sys
- import asyncio
- import prefect.runtime
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- from dp.page import page
- from douyin import base
- from conf.config import logger,OUTPUT
- from database.config import minio_block
- from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table
- from dataset import Table
- from DrissionPage import ChromiumPage
- from DrissionPage._elements.chromium_element import ChromiumElement
- from DrissionPage._units.listener import DataPacket
- import jsonpath
- from prefect import flow,task
- from prefect.tasks import Task,TaskRun
- from prefect.flows import Flow
- from prefect.states import State,StateType
- from douyin.models import UnReadUserData,engine
- from douyin.flow import check_page, conversation,unread_msg
- from concurrent.futures import ThreadPoolExecutor
- from prefect.deployments import run_deployment
- from prefect import runtime
- from prefect import flows
- from prefect import get_client
- from prefect.flow_runs import wait_for_flow_run
- from prefect.client.schemas.objects import (
- FlowRun,
- MinimalDeploymentSchedule,
- )
- from prefect import flow, Runner
- '''
- Deployments
- ┌───────────────────────────────┐
- │ check-page-ok-flow/ai-yunying │
- │ reply-to-user-flow/ai-yunying │
- │ unread-msg-flow/ai-yunying │
- │ stranger-msg-flow/ai-yunying │
- └───────────────────────────────┘
- '''
@flow(log_prints=False)
def im_chat_flow(tab_id=None):
    """Check the Douyin page and dispatch follow-up deployments for pending messages.

    The flow resolves the browser tab through ``base.get_tab``, runs
    ``check_page.check_page_ok_flow`` inline against the self-profile URL, and
    then triggers:

    * ``stranger-msg-flow/ai-yunying`` in a freshly opened tab when
      ``stranger_txt`` is truthy;
    * ``reply-to-user-flow/ai-yunying`` for the last entry of ``unread_items``
      when that list is non-empty.

    Every triggered flow run is then awaited and its final state logged.

    Args:
        tab_id: Identifier handed to ``base.get_tab``; defaults to ``None``.

    Returns:
        ``True`` once all triggered flow runs (possibly none) have finished.
    """
    import inspect

    tab = base.get_tab(tab_id)
    home_url: str = 'https://www.douyin.com/user/self'
    logger.info(f"start im_chat_flow {tab.tab_id} ")
    unread_items, stranger_txt = check_page.check_page_ok_flow(home_url, tab.tab_id)

    flows_run = []
    if stranger_txt:
        new_tab = base.page.new_tab(home_url)
        # timeout=0 makes run_deployment return immediately instead of blocking.
        flow_run = run_deployment(
            'stranger-msg-flow/ai-yunying',
            parameters=dict(home_url=home_url, tab_id=new_tab.tab_id),
            timeout=0,
        )
        flows_run.append(flow_run)
    if unread_items:
        # run_deployment needs a name -> value mapping; dict() cannot be built
        # from two positional values. Bind the values to the parameter names of
        # the flow function so the mapping follows its real signature.
        # NOTE(review): assumes the 'reply-to-user-flow/ai-yunying' deployment
        # wraps conversation.reply_to_user_flow — TODO confirm.
        reply_signature = inspect.signature(conversation.reply_to_user_flow.fn)
        reply_parameters = dict(
            reply_signature.bind(unread_items[-1], tab_id).arguments
        )
        flow_run = run_deployment(
            'reply-to-user-flow/ai-yunying',
            parameters=reply_parameters,
            timeout=0,
        )
        flows_run.append(flow_run)

    # Wait for every triggered run, not only the last one; when nothing was
    # triggered the loop is skipped instead of touching an unbound variable.
    for pending_run in flows_run:
        finished_run = asyncio.run(
            wait_for_flow_run(pending_run.id, log_states=True)
        )
        logger.info(f"flow_run {finished_run} ")
    return True
- from prefect.results import PersistedResult
def main():
    """Print the configured Prefect API URL, then run ``im_chat_flow`` on the page's tab.

    Raises:
        KeyError: If ``PREFECT_API_URL`` is missing from the environment.
    """
    prefect_api_url = os.environ["PREFECT_API_URL"]
    print(prefect_api_url)
    # Disabled alternatives kept for reference: auto_douyin(),
    # im_chat_flow.serve('serv') and run_deployment(name='serv').
    current_tab_id = page.tab_id
    im_chat_flow(tab_id=current_tab_id)
@task(
    persist_result=True,
    result_storage=minio_block,
    result_storage_key=base.get_object_name_by_time(),
)
def task_test(mytes):
    """Return ``mytes`` with the literal ``'ok'`` appended.

    The task is declared with ``persist_result=True`` and ``minio_block`` as
    its result storage.
    """
    # NOTE(review): the storage key above is computed once, when the decorator
    # arguments are evaluated — confirm a single key for all runs is intended.
    suffix = 'ok'
    return mytes + suffix
def test():
    """Run ``task_test`` and print details of the result stored on its state.

    Prints the storage key, the JSON form and the type of ``state.data``, then
    rebuilds a ``PersistedResult`` from that JSON and prints it.
    """
    import json
    import prefect

    final_state = task_test(
        mytes='https://www.douyin.com/user/self',
        return_state=True,
    )
    persisted = final_state.data
    print(persisted.storage_key)
    print(persisted.json())
    print(type(persisted))

    # Round-trip through JSON to check the result can be reconstructed.
    payload = json.loads(persisted.json())
    rebuilt = prefect.results.PersistedResult(**payload)
    print(rebuilt)
if __name__ == "__main__":
    # Script entry point: only main() is executed. Disabled alternatives kept
    # for reference: test(), and im_chat_flow.with_options(result_storage=
    # os.path.join(OUTPUT, "prefect", "{flow_run.flow_name}.json")) followed by
    # im_chat_flow().
    main()
|