check_page.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. import datetime
  2. import os
  3. import re
  4. import time
  5. import os
  6. import sys
  7. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  8. sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
  9. from dp.page import page
  10. from douyin import base
  11. from conf.config import logger,OUTPUT
  12. from database.config import minio_block
  13. from dataset import Table
  14. from DrissionPage import ChromiumPage
  15. from DrissionPage._elements.chromium_element import ChromiumElement
  16. from DrissionPage._units.listener import DataPacket
  17. import jsonpath
  18. from prefect import flow,task
  19. from prefect.tasks import Task,TaskRun
  20. from prefect.flows import Flow
  21. from prefect.states import State,StateType
  22. from douyin import chat_test
  23. from douyin.models import UnReadUserData,UserInfoModel,user_table,unread_table
  24. # tab=page.tab
  25. @task
  26. def click_im_icon_ele():
  27. tab = base.get_tab()
  28. tab.scroll.to_top()
  29. ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]', timeout=5)
  30. logger.info(f"点击私信图标")
  31. ele_im.click()
  32. @task
  33. def get_im_icon_red_pot_ele(ele_im:ChromiumElement):
  34. ele_im_red_pot = ele_im._find_elements('xpath://div[@class="LFWqFfyH isLight"]', raise_err=False)
  35. # 存在私信小红点
  36. ele_has_msg = ele_im_red_pot._find_elements('xpath:/div',raise_err=False)
  37. return ele_has_msg
  38. @task(description='检查是否在用户主页,如果不是则跳转到用户主页')
  39. def check_home_page(home_url):
  40. tab = base.get_tab()
  41. if "/user/self" not in tab.url:
  42. tab.get(home_url)
  43. @task(description='检查聊天框是否退出,没有退出则退出')
  44. def check_conversion_exit():
  45. tab = base.get_tab()
  46. tab.s_ele()
  47. ele_conversaton = tab._find_elements('xpath://div[@data-mask="conversaton-detail-content"]',timeout=0.1, raise_err=False)
  48. if ele_conversaton:
  49. ele_conversaton.ele('退出会话',timeout=0.1).click()
  50. @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
  51. def save_unread_user_data(data:UnReadUserData):
  52. return unread_table.insert(data)
  53. @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
  54. def get_im_item_user_data(msg_item_div:ChromiumElement):
  55. # 构建所需的数据字典
  56. data = UnReadUserData()
  57. name = msg_item_div.s_ele("xpath://div[@class='gZdlhsqq']")
  58. if name:
  59. data.name = name.text
  60. # 定位头像
  61. avatar_ele = msg_item_div.s_ele("xpath://img")
  62. if avatar_ele:
  63. data.avator = avatar_ele.attr('src')
  64. msg_content = msg_item_div.s_ele('xpath://pre[@class="MnyOYvbN"]')
  65. if msg_content:
  66. data.msg = msg_content.text
  67. # 定位时间
  68. ele_time = msg_item_div.s_ele('xpath://div[@class="skNuRdW_"]')
  69. if ele_time:
  70. data.msg_time_txt = ele_time.text[3:]
  71. logger.info(f"{data}")
  72. return data
  73. @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
  74. def get_im_unread_item_user_data(ele_msg_red_pot:ChromiumElement):
  75. logger.info(f"存在未读消息,获取该的用户头像、昵称、时间等信息")
  76. # 定位私信聊天框,一个用户私信聊天框的完整元素
  77. msg_item_div = ele_msg_red_pot.parent()
  78. unread_user_data = get_im_item_user_data(msg_item_div)
  79. unread_user_data.unread_msg_count = int(ele_msg_red_pot.text)
  80. logger.info(f"unread_user_data {unread_user_data}")
  81. return unread_user_data
  82. '''
  83. @flow(log_prints=False)
  84. def unread_msg_flow():
  85. tab = base.get_tab()
  86. ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]')
  87. # 无论有没有消息,都点击顶部私信图标
  88. click_im_icon_ele(ele_im)
  89. # get_im_unread_item_data.with_options(result_storage_key=get_object_name_by_time())
  90. unread_user_data = get_im_unread_item_user_data()
  91. # 记录未读消息到数据库
  92. unread_user_data = save_unread_user_data(unread_user_data)
  93. return unread_user_data
  94. '''
  95. @task(description=f'在弹出的私信框中,获取未读消息的元素')
  96. def get_listDlg_unread_msg(ele_list_dlg:ChromiumElement):
  97. # 所有未读小红点
  98. ele_msg_red_pots = ele_list_dlg._find_elements('xpath://div[@class="hcPUqxqn"]', index=None, raise_err=False)
  99. unread_items = []
  100. for ele_msg_red_pot in ele_msg_red_pots:
  101. msg_item_div = ele_msg_red_pot.parent()
  102. unread_user_data = get_im_item_user_data(msg_item_div)
  103. unread_user_data.unread_msg_count = int(ele_msg_red_pot.text)
  104. unread_items.append(unread_user_data)
  105. logger.info(f"存在未读消息,获取该的用户头像、昵称、时间等信息: {unread_user_data}")
  106. ele_stranger = ele_list_dlg._find_elements('陌生人消息', raise_err=False)
  107. stranger_txt = ''
  108. if ele_stranger:
  109. stranger_txt = '陌生人消息'
  110. logger.info(f"{stranger_txt}")
  111. return unread_items,stranger_txt
  112. from prefect.tasks import Task,TaskRun
  113. from prefect.flows import Flow,FlowRun
  114. def check_page_flow_on_failure(flow: Flow, flow_run: FlowRun, state: State, **kwargs):
  115. # logger.info(f"{flow}")
  116. # logger.info(f"{flow_run.id}")
  117. res = base.save_page_info(f"check_page_flow_on_failure-{flow_run.id}", tab_id=flow_run.parameters['tab_id'])
  118. logger.info(f"{res}")
  119. @flow(log_prints=False, on_failure=[check_page_flow_on_failure],persist_result=True, result_storage=minio_block)
  120. def check_page_ok_flow(home_url:str, tab_id=None):
  121. tab = base.get_tab(tab_id)
  122. check_home_page(home_url)
  123. check_conversion_exit()
  124. # 无论有没有消息,都点击顶部私信图标
  125. click_im_icon_ele()
  126. mg_test = chat_test.Chat()
  127. # mg_test.send_msg("你好")
  128. ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]',timeout=3)
  129. unread_items,stranger_txt = get_listDlg_unread_msg(ele_list_dlg)
  130. logger.info(f"{unread_items,stranger_txt}")
  131. return unread_items,stranger_txt
  132. def main():
  133. # ele_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
  134. # logger.info(f"{ele_dlg.text}")
  135. # check_page_ok_flow.serve(name='serv')
  136. check_page_ok_flow('1')
  137. # stra = tab._find_elements('xpath:/html/body/div[2]/div/div[3]/div/div/header/div/div/div[2]/div/pace-island/div/ul[2]/div/li/div/div/div[3]/div/div/div/div/div[2]/div/div[2]/div/div/div[2]/div/div')
  138. # logger.info(f"stra {stra.text}")
  139. # S3Object(path='runtime/check_msg-stranger-msg.html').put(tab.html)
  140. # f = open(r'I:\code\ai-yunying\live-online-people\output\page\点击陌生人消息.html', 'w')
  141. # f.write(tab.html)
  142. if __name__ == "__main__":
  143. main()