unread_msg.py 3.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. import datetime
  2. import os
  3. import re
  4. import time
  5. import os
  6. import sys
  7. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  8. from dp.page import page
  9. from douyin import base
  10. from conf.config import logger,OUTPUT
  11. from database.config import minio_block
  12. from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table
  13. from dataset import Table
  14. from DrissionPage import ChromiumPage
  15. from DrissionPage._elements.chromium_element import ChromiumElement
  16. from DrissionPage._units.listener import DataPacket
  17. import jsonpath
  18. from prefect import flow,task
  19. from prefect.tasks import Task,TaskRun
  20. from prefect.flows import Flow
  21. from prefect.states import State,StateType
  22. from douyin.models import UnReadUserData,engine
  23. from douyin.flow import check_msg, conversation
  # Module-level handle to the browser tab exposed by the project's `page` object;
  # the tasks and the flow below run their element lookups against it.
  tab=page.tab
  # After clicking the private-message icon, fetch the unread-message red dot.
  @task
  def get_ele_msg_red_pot(ele_im:ChromiumElement):
      """Open the private-message popup and look up its unread red dot.

      Args:
          ele_im: The private-message entry icon; it is clicked to open the popup.

      Returns:
          A ``(dialog_container, red_dot)`` tuple: the element matched by
          ``data-e2e="listDlg-container"`` on the tab, and the red-dot ``div``
          looked up inside that container.
      """
      ele_im.click()
      logger.info(f"点击私信图标后,获取未读消息小圆点")

      # The conversation list container is searched on the shared tab ...
      dialog_container = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
      # ... and the red dot is searched starting from that container.
      # NOTE(review): the class name looks auto-generated and may change with site releases.
      red_dot = dialog_container.ele('xpath://div[@class="hcPUqxqn"]')

      return dialog_container, red_dot
  @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
  def save_unread_user_data(data:UnReadUserData):
      """Insert one unread-message record into ``unread_table``.

      Args:
          data: The unread-message record to store.

      Returns:
          Whatever ``unread_table.insert`` returns for the inserted record
          (presumably the new row's primary key -- confirm against the table type).
      """
      insert_result = unread_table.insert(data)
      return insert_result
  @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
  def get_im_item_user_data(msg_item_div:ChromiumElement):
      """Read the sender's details out of one conversation row.

      Each field is filled in only when its element is found in the row;
      otherwise the corresponding attribute of the record is left untouched.

      Args:
          msg_item_div: The element wrapping a single conversation entry.

      Returns:
          An ``UnReadUserData`` with ``name``, ``avator``, ``msg`` and
          ``msg_time_txt`` set from whichever elements were present.
      """
      # Build the record that will hold the scraped fields.
      record = UnReadUserData()

      # Nickname
      nickname_ele = msg_item_div.s_ele("xpath://div[@class='gZdlhsqq']")
      if nickname_ele:
          record.name = nickname_ele.text

      # Avatar: the image lookup on the row, stored as its `src` URL.
      # The attribute name `avator` is spelled as used by this code.
      img_ele = msg_item_div.s_ele("xpath://img")
      if img_ele:
          record.avator = img_ele.attr('src')

      # Latest message text
      content_ele = msg_item_div.s_ele('xpath://pre[@class="MnyOYvbN"]')
      if content_ele:
          record.msg = content_ele.text

      # Timestamp text; the first three characters are dropped
      # (presumably a separator prefix in the rendered text -- confirm).
      time_ele = msg_item_div.s_ele('xpath://div[@class="skNuRdW_"]')
      if time_ele:
          time_text = time_ele.text
          record.msg_time_txt = time_text[3:]

      logger.info(f"{record}")
      return record
  @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
  def get_im_unread_item_user_data(ele_msg_red_pot:ChromiumElement):
      """Collect sender details and the unread count for the conversation flagged by a red dot.

      Args:
          ele_msg_red_pot: The unread red-dot element. Its parent is treated as
              the conversation row, and its text as the unread count.

      Returns:
          The ``UnReadUserData`` produced by ``get_im_item_user_data`` with
          ``unread_msg_count`` filled in.

      Raises:
          ValueError: If the red dot's text contains no digits at all.
      """
      logger.info("存在未读消息,获取该的用户头像、昵称、时间等信息")

      # The red dot's parent is the complete chat row for one user.
      msg_item_div = ele_msg_red_pot.parent()

      # The decorator's storage key is evaluated once, when the module is
      # imported; request a fresh key per call so repeated runs in the same
      # process do not reuse that one key.
      item_task = get_im_item_user_data.with_options(
          result_storage_key=base.get_object_name_by_time()
      )
      unread_user_data = item_task(msg_item_div)

      # Take the first run of digits instead of int()-ing the whole text, so a
      # decorated count (e.g. a capped "99+") still parses. A text without any
      # digits keeps raising ValueError, as int() did before, but with a
      # message that names the offending text.
      badge_text = ele_msg_red_pot.text
      digits = re.search(r"\d+", badge_text)
      if digits is None:
          raise ValueError(f"unread badge text has no digits: {badge_text!r}")
      unread_user_data.unread_msg_count = int(digits.group())

      logger.info(f"unread_user_data {unread_user_data}")
      return unread_user_data
  @flow(log_prints=False)
  def unread_msg_flow():
      """Open the private-message popup, scrape the unread conversation and store it.

      Steps: locate the private-message entry icon on the shared tab, click it
      and look up the unread red dot, read the sender details and unread count
      from the flagged conversation, then insert the record into the database.

      Returns:
          The result of ``save_unread_user_data`` for the stored record, or
          ``None`` when no unread red dot was found.
      """
      ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]')

      # Red dot inside the private-message popup. The dialog container that is
      # returned alongside it is not needed here.
      _, ele_pop_red_pot = get_ele_msg_red_pot(ele_im)

      # A failed lookup yields a falsy placeholder element; without this guard
      # the next task would fail on `.parent()` whenever there is nothing unread.
      if not ele_pop_red_pot:
          logger.info("No unread red dot found; nothing to record")
          return None

      # The storage keys in the task decorators are evaluated once at import
      # time, so every run in the same process would reuse them. Ask for a
      # fresh key on each call instead.
      read_task = get_im_unread_item_user_data.with_options(
          result_storage_key=base.get_object_name_by_time()
      )
      unread_user_data = read_task(ele_pop_red_pot)

      # Record the unread message in the database.
      save_task = save_unread_user_data.with_options(
          result_storage_key=base.get_object_name_by_time()
      )
      insert_result = save_task(unread_user_data)
      return insert_result