unread_msg.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. import datetime
  2. import os
  3. import re
  4. import time
  5. import os
  6. import sys
  7. sys.path.append('.')
  8. from dp.page import page
  9. from douyin import base
  10. from conf.config import logger,OUTPUT
  11. from database.config import minio_block
  12. from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table
  13. from dataset import Table
  14. from DrissionPage import ChromiumPage
  15. from DrissionPage._elements.chromium_element import ChromiumElement
  16. from DrissionPage._units.listener import DataPacket
  17. from prefect import flow,task
  18. from prefect.tasks import Task,TaskRun
  19. from prefect.flows import Flow
  20. from prefect.states import State,StateType
  21. from douyin.models import UnReadUserData,engine
  22. from douyin.flow import check_page, conversation
  @task
  def get_ele_msg_red_pot(ele_im: ChromiumElement, tab_id=None):
      """Click the private-message icon, then look up the unread red dot.

      Args:
          ele_im: The private-message icon element; it is clicked to open
              the conversation list.
          tab_id: Forwarded to ``base.get_tab`` to select the browser tab.

      Returns:
          A ``(list_dialog, red_dot)`` tuple: the conversation-list container
          and the red-dot element looked up inside that container.
      """
      list_dialog_locator = 'xpath://div[@data-e2e="listDlg-container"]'
      red_dot_locator = 'xpath://div[@class="hcPUqxqn"]'

      current_tab = base.get_tab(tab_id)
      ele_im.click()
      logger.info(f"点击私信图标后,获取未读消息小圆点")

      list_dialog = current_tab.ele(list_dialog_locator)
      red_dot = list_dialog.ele(red_dot_locator)
      return list_dialog, red_dot
  # NOTE(review): the storage key below is computed once, when the decorator is
  # evaluated, not on every task run -- confirm that a fixed key is intended.
  @task(
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time(),
  )
  def save_unread_user_data(data: UnReadUserData):
      """Insert one unread-message record into ``unread_table``.

      Args:
          data: The record to store.

      Returns:
          Whatever ``unread_table.insert`` returns for the inserted record.
      """
      insert_result = unread_table.insert(data)
      return insert_result
  # NOTE(review): the storage key below is computed once, when the decorator is
  # evaluated, not on every task run -- confirm that a fixed key is intended.
  @task(
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time(),
  )
  def get_im_item_user_data(msg_item_div: ChromiumElement):
      """Build an ``UnReadUserData`` from one conversation-list item.

      Each field is looked up with ``s_ele`` inside ``msg_item_div``; a field
      whose element is not found is left at whatever ``UnReadUserData()``
      initialised it to.

      Args:
          msg_item_div: The element wrapping a single conversation entry.

      Returns:
          The populated ``UnReadUserData`` instance.
      """
      record = UnReadUserData()

      # (attribute to set, locator inside the item, how to read the value)
      field_specs = (
          ("name", "xpath://div[@class='gZdlhsqq']", lambda ele: ele.text),
          # Avatar image.
          ("avator", "xpath://img", lambda ele: ele.attr('src')),
          ("msg", 'xpath://pre[@class="MnyOYvbN"]', lambda ele: ele.text),
          # Time text; the first three characters are dropped
          # (presumably a separator prefix -- TODO confirm).
          ("msg_time_txt", 'xpath://div[@class="skNuRdW_"]', lambda ele: ele.text[3:]),
      )
      for attr_name, locator, read_value in field_specs:
          found = msg_item_div.s_ele(locator)
          if found:
              setattr(record, attr_name, read_value(found))

      logger.info(f"{record}")
      return record
  # NOTE(review): the storage key below is computed once, when the decorator is
  # evaluated, not on every task run -- confirm that a fixed key is intended.
  @task(
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time(),
  )
  def get_im_unread_item_user_data(ele_msg_red_pot: ChromiumElement):
      """Collect user data for the conversation that owns an unread red dot.

      Args:
          ele_msg_red_pot: The red-dot element; its text is read as the
              unread-message count.

      Returns:
          The ``UnReadUserData`` produced by ``get_im_item_user_data`` for the
          red dot's parent element, with ``unread_msg_count`` filled in.
      """
      logger.info(f"存在未读消息,获取该的用户头像、昵称、时间等信息")

      # The red dot's parent is the complete element for one user's conversation.
      user_record = get_im_item_user_data(ele_msg_red_pot.parent())

      # NOTE(review): int() raises ValueError when the text is not a plain
      # integer -- confirm the badge never shows something like "99+".
      unread_total = int(ele_msg_red_pot.text)
      user_record.unread_msg_count = unread_total

      logger.info(f"unread_user_data {user_record}")
      return user_record
  # NOTE(review): the storage key below is computed once, when the decorator is
  # evaluated, not on every task run -- confirm that a fixed key is intended.
  @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
  def get_stranger_unread_data(stranger_txt='陌生人消息', tab_id=None) -> UnReadUserData:
      """Open the stranger-message entry and read the sender from the network.

      Starts listening for ``im/user/info`` responses, clicks the entry whose
      text is ``stranger_txt`` in the conversation list, and builds the result
      from the last user in the captured response.

      Args:
          stranger_txt: Text of the stranger-message entry to click.
          tab_id: Forwarded to ``base.get_tab`` to select the browser tab.

      Returns:
          An ``UnReadUserData`` carrying the nickname and avatar URI of the
          last user in the response's ``data`` list.

      Raises:
          RuntimeError: If no ``im/user/info`` response is captured, or the
              captured response contains no user data.
      """
      tab = base.get_tab(tab_id)
      # The request https://www.douyin.com/aweme/v1/web/im/user/info is only
      # seen when stranger messages appear, so listen before clicking.
      tab.listen.start('im/user/info')
      try:
          ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]', timeout=3)
          ele_list_dlg.ele(stranger_txt).click()

          # Start from None so a timeout or a non-matching packet produces a
          # clear error below instead of an unbound-variable crash.
          stranger_info_packet = None
          for packet in tab.listen.steps(1, timeout=3):
              if 'im/user/info' in packet.url:
                  stranger_info_packet = packet
                  break
          if stranger_info_packet is None:
              raise RuntimeError("no 'im/user/info' response captured after opening stranger messages")

          body = stranger_info_packet.response.body
          data_list = body.get('data') if isinstance(body, dict) else None
          if not data_list:
              raise RuntimeError("'im/user/info' response contains no user data")

          latest_user = data_list[-1]
          logger.info(f"unread_stranger_data {latest_user}")
          # A missing avatar_thumb yields avator=None rather than an AttributeError.
          avatar_thumb = latest_user.get("avatar_thumb") or {}
          unread_stranger_data = UnReadUserData(name=latest_user.get('nickname'), avator=avatar_thumb.get("uri"))
          logger.info(f"unread_stranger_data {unread_stranger_data}")
          return unread_stranger_data
      finally:
          # Always release the listener started above, on success or failure.
          tab.listen.stop()
  79. import DrissionPage
  # At first glance this flow looks redundant -- it only runs a few tasks, so
  # why not inline it in the main flow? Because flows must be able to run
  # concurrently, and this one may need its own new tab; run inside the main
  # flow it would block and could not run concurrently.
  @flow(log_prints=False)
  def stranger_msg_flow(home_url, tab_id=None):
      """Check the page, read the stranger message sender, and reply.

      Args:
          home_url: Forwarded to ``check_page.check_page_ok_flow``.
          tab_id: Forwarded to every step to select the browser tab.
      """
      logger.info(f"tab_id {tab_id}")

      page_state = check_page.check_page_ok_flow(home_url, tab_id)
      # Only the stranger-entry text is used; the first value is ignored here.
      _unread_items, stranger_label = page_state

      stranger_record = get_stranger_unread_data(stranger_label, tab_id)
      conversation.reply_to_user_flow(stranger_record, tab_id)
  89. from concurrent.futures import ThreadPoolExecutor, as_completed
  @flow(log_prints=False)
  def unread_msg_flow(name, tab_id=None):
      """Reply to a user and store the resulting unread-message record.

      Args:
          name: Forwarded to ``conversation.reply_to_user_flow`` as its first
              argument.
          tab_id: Forwarded to ``conversation.reply_to_user_flow`` to select
              the browser tab.

      Returns:
          The result of ``save_unread_user_data`` for the record returned by
          the reply flow.
      """
      # Pass ``tab_id``: the previous code passed ``tab``, a name that is not
      # defined anywhere in this scope, so every call raised NameError.
      unread_user_data = conversation.reply_to_user_flow(name, tab_id)
      # Record the unread message in the database.
      unread_user_data = save_unread_user_data(unread_user_data)
      return unread_user_data
  def main(
      home_url: str = 'https://www.douyin.com/user/self',
      tab_id: str = 'A017888A62FE53FAD9D85ED2662FEA34',
  ):
      """Run ``stranger_msg_flow`` once.

      Both values used to be hard-coded in the body; they are now parameters
      whose defaults are the old values, so ``main()`` behaves as before.

      Args:
          home_url: Forwarded to ``stranger_msg_flow``.
          tab_id: Id of the browser tab to run the flow in.
      """
      stranger_msg_flow(home_url, tab_id)


  if __name__ == "__main__":
      main()