chat_flow.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import datetime
  2. import os
  3. import re
  4. import time
  5. import os
  6. import sys
  7. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  8. from dp.page import page
  9. from douyin import chat_test,base,conversation,user
  10. from conf.config import logger,OUTPUT
  11. from database.config import ai_yunying_db,minio_block
  12. from dataset import Table
  13. from DrissionPage import ChromiumPage
  14. from DrissionPage._elements.chromium_element import ChromiumElement
  15. from DrissionPage._units.listener import DataPacket
  16. import jsonpath
  17. from prefect import flow,task
  18. from prefect.tasks import Task,TaskRun
  19. from prefect.flows import Flow
  20. from prefect.states import State,StateType
  21. tab=page.tab
  22. # 点击私信图标后,获取未读消息小圆点
  23. @task
  24. def get_ele_msg_red_pot(ele_im:ChromiumElement):
  25. ele_im.click()
  26. logger.info(f"点击私信图标后,获取未读消息小圆点")
  27. ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
  28. ele_msg_red_pot = ele_list_dlg.ele('xpath://div[@class="hcPUqxqn"]')
  29. return ele_list_dlg,ele_msg_red_pot
  30. @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
  31. def get_im_unread_item_user_data(ele_msg_red_pot:ChromiumElement):
  32. logger.info(f"存在未读消息,获取该的用户头像、昵称、时间等信息")
  33. # 构建所需的数据字典
  34. data = {
  35. "name": '',
  36. "avator": '',
  37. "msg": None,
  38. "unread_msg_count":None,
  39. "time": ''
  40. }
  41. data["unread_msg_count"] = int(ele_msg_red_pot.text)
  42. # 定位私信聊天框,一个用户私信聊天框的完整元素
  43. msg_item_div = ele_msg_red_pot.parent()
  44. name = msg_item_div.s_ele("xpath://div[@class='gZdlhsqq']")
  45. if name:
  46. data["name"] = name.text
  47. # 定位头像
  48. avatar_ele = msg_item_div.s_ele("xpath://img")
  49. if avatar_ele:
  50. data["avator"] = avatar_ele.attr('src')
  51. msg_content = msg_item_div.s_ele('xpath://pre[@class="MnyOYvbN"]')
  52. if msg_content:
  53. data["msg"] = msg_content.text
  54. # 定位时间
  55. ele_time = msg_item_div.s_ele('xpath://div[@class="skNuRdW_"]')
  56. if ele_time:
  57. data["time"] = ele_time.text
  58. table:Table = ai_yunying_db['chat_task']
  59. data['is_done'] = False
  60. data['create_time'] = datetime.datetime.now()
  61. id = table.insert(data)
  62. unread_user_data = table.find_one(id=id)
  63. logger.info(f"{unread_user_data}")
  64. return unread_user_data
  65. @task
  66. def click_im_icon(ele_im:ChromiumElement):
  67. return ele_im.click()
  68. @flow
  69. def response_im(tab:ChromiumPage, ele_im:ChromiumElement):
  70. click_im_icon(ele_im)
  71. ele_msg_red_pot = get_ele_msg_red_pot()
  72. data,msg_item_div = get_im_unread_item_data(ele_msg_red_pot)
  73. @task
  74. def check_home_page(home_url:str='https://www.douyin.com/user/self'):
  75. if "/user/self" not in tab.url:
  76. tab.get(home_url)
  77. tab.scroll.to_top()
  78. @task
  79. def get_im_icon_ele():
  80. ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]')
  81. if ele_im:
  82. return ele_im
  83. @task
  84. def get_im_icon_red_pot_ele(ele_im:ChromiumElement):
  85. ele_im_red_pot = ele_im._find_elements('xpath://div[@class="LFWqFfyH isLight"]', raise_err=False)
  86. # 存在私信小红点
  87. ele_has_msg = ele_im_red_pot._find_elements('xpath:/div',raise_err=False)
  88. return ele_has_msg
  89. @flow
  90. def check_has_im_msg_flow(home_url:str):
  91. check_home_page(home_url)
  92. ele_im = get_im_icon_ele()
  93. ele_has_msg = get_im_icon_red_pot_ele(ele_im)
  94. return ele_im, ele_has_msg
  95. @flow(log_prints=False)
  96. def im_chat_flow():
  97. home_url:str='https://www.douyin.com/user/self'
  98. print(f"home_url:{home_url}")
  99. ele_im, ele_has_msg = check_has_im_msg_flow(home_url)
  100. # 私信弹框的红色小红点
  101. ele_list_dlg,ele_pop_red_pot = get_ele_msg_red_pot(ele_im)
  102. # get_im_unread_item_data.with_options(result_storage_key=get_object_name_by_time())
  103. unread_user_data = get_im_unread_item_user_data(ele_pop_red_pot)
  104. user_profile_packet, conversation_detail = conversation.enter_conversation(unread_user_data, ele_list_dlg)
  105. chat_history = conversation.get_conversations_history(unread_user_data, conversation_detail)
  106. chat_history = conversation.save_unread_msg_to_db(chat_history, unread_user_data['unread_msg_count'])
  107. reply_res = conversation.reply_to_user(chat_history, unread_user_data)
  108. #
  109. import chat_test
  110. def main():
  111. im_chat_flow(tab=tab)
  112. if __name__ == "__main__":
  113. print(os.environ["PREFECT_API_URL"])
  114. mg_test = chat_test.Chat()
  115. mg_test.send_msg("你好")
  116. # auto_douyin()
  117. im_chat_flow()
  118. # im_chat_flow.with_options(result_storage=os.path.join(OUTPUT, "prefect", "{flow_run.flow_name}.json"))
  119. # im_chat_flow()