# chat_flow.py (4.1 KB)
  1. import datetime
  2. import os
  3. import re
  4. import time
  5. import os
  6. import sys
  7. import asyncio
  8. import prefect.runtime
  9. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  10. from dp.page import page
  11. from douyin import base
  12. from conf.config import logger,OUTPUT
  13. from database.config import minio_block
  14. from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table
  15. from dataset import Table
  16. from DrissionPage import ChromiumPage
  17. from DrissionPage._elements.chromium_element import ChromiumElement
  18. from DrissionPage._units.listener import DataPacket
  19. import jsonpath
  20. from prefect import flow,task
  21. from prefect.tasks import Task,TaskRun
  22. from prefect.flows import Flow
  23. from prefect.states import State,StateType
  24. from douyin.models import UnReadUserData,engine
  25. from douyin.flow import check_page, conversation,unread_msg
  26. from concurrent.futures import ThreadPoolExecutor
  27. from prefect.deployments import run_deployment
  28. from prefect import runtime
  29. from prefect import flows
  30. from prefect import get_client
  31. from prefect.flow_runs import wait_for_flow_run
  32. from prefect.client.schemas.objects import (
  33. FlowRun,
  34. MinimalDeploymentSchedule,
  35. )
  36. from prefect import flow, Runner
  37. '''
  38. Deployments
  39. ┌───────────────────────────────┐
  40. │ check-page-ok-flow/ai-yunying │
  41. │ reply-to-user-flow/ai-yunying │
  42. │ unread-msg-flow/ai-yunying │
  43. │ stranger-msg-flow/ai-yunying │
  44. └───────────────────────────────┘
  45. '''
  46. @flow(log_prints=False)
  47. def im_chat_flow(tab_id=None):
  48. tab = base.get_tab(tab_id)
  49. client = get_client()
  50. home_url:str='https://www.douyin.com/user/self'
  51. logger.info(f"start im_chat_flow {tab.tab_id} ")
  52. # timeout=0 代表立即返回不阻塞
  53. # flow_run:FlowRun = run_deployment(name='check-page-ok-flow/ai-yunying', parameters=dict(home_url=home_url,tab_id=tab.tab_id),timeout=0)
  54. unread_items,stranger_txt = check_page.check_page_ok_flow(home_url,tab.tab_id)
  55. # flow_run = run_deployment('check-page-ok-flow/ai-yunying', parameters=dict(home_url=home_url,tab_id=tab.tab_id),timeout=0)
  56. # flow_run = asyncio.run(wait_for_flow_run(flow_run.id, log_states=True))
  57. # logger.info(f"wait_for_flow_run {flow_run.state.result()} ")
  58. flows_run = []
  59. if stranger_txt:
  60. new_tab = base.page.new_tab(home_url)
  61. # unread_msg.stranger_msg_flow(stranger_txt, new_tab.tab_id)
  62. # timeout=0 代表立即返回不阻塞
  63. unread_msg.stranger_msg_flow
  64. flow_run = run_deployment('stranger-msg-flow/ai-yunying', parameters=dict(home_url=home_url,tab_id=new_tab.tab_id),timeout=0)
  65. flows_run.append(flow_run)
  66. if unread_items:
  67. # conversation.reply_to_user_flow(unread_items[-1], tab_id)
  68. conversation.reply_to_user_flow
  69. flow_run = run_deployment('reply-to-user-flow/ai-yunying', parameters=dict(unread_items[-1], tab_id),timeout=0)
  70. flows_run.append(flow_run)
  71. flow_run = asyncio.run(wait_for_flow_run(flow_run.id, log_states=True))
  72. # unread_user_data = unread_msg.unread_msg_flow()
  73. logger.info(f"flow_run {flow_run} ")
  74. return True
  75. conversation.reply_to_user_flow(unread_user_data)
  76. from prefect.results import PersistedResult
  77. def main():
  78. print(os.environ["PREFECT_API_URL"])
  79. # auto_douyin()
  80. im_chat_flow(tab_id=page.tab_id)
  81. # im_chat_flow.serve('serv')
  82. # run_deployment(name='serv', )
  83. @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
  84. def task_test(mytes):
  85. return mytes + 'ok'
  86. def test():
  87. import prefect
  88. import json
  89. state = task_test(mytes='https://www.douyin.com/user/self', return_state=True)
  90. print(state.data.storage_key)
  91. print(state.data.json())
  92. print(type(state.data))
  93. res_dict = json.loads(state.data.json())
  94. res = prefect.results.PersistedResult(**res_dict)
  95. print(res)
  96. if __name__ == "__main__":
  97. main()
  98. # test()
  99. # im_chat_flow.with_options(result_storage=os.path.join(OUTPUT, "prefect", "{flow_run.flow_name}.json"))
  100. # im_chat_flow()