| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- import datetime
- import os
- import re
- import time
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
- from dp.page import page
- from douyin import base
- from conf.config import logger,OUTPUT
- from database.config import minio_block
- from dataset import Table
- from DrissionPage import ChromiumPage
- from DrissionPage._elements.chromium_element import ChromiumElement
- from DrissionPage._units.listener import DataPacket
- import jsonpath
- from prefect import flow,task
- from prefect.tasks import Task,TaskRun
- from prefect.flows import Flow
- from prefect.states import State,StateType
- from douyin import chat_test
- from douyin.models import UnReadUserData,UserInfoModel,user_table,unread_table
- # tab=page.tab
-
@task
def click_im_icon_ele():
    """Scroll the current tab to the top and click the private-message entry icon.

    The tab comes from ``base.get_tab()``. The icon is located through its
    ``data-e2e="im-entry"`` attribute with a 5 second lookup timeout.
    """
    current_tab = base.get_tab()
    # Scroll to the top before looking up the entry icon.
    current_tab.scroll.to_top()
    im_entry = current_tab.ele('xpath://div[@data-e2e="im-entry"]', timeout=5)
    logger.info(f"点击私信图标")
    im_entry.click()
@task
def get_im_icon_red_pot_ele(ele_im: ChromiumElement):
    """Return the unread-message marker nested inside the private-message icon.

    Args:
        ele_im: The private-message entry icon element.

    Returns:
        The result of looking up the ``div`` child of the red-dot container.
        When the container itself is missing, the falsy result of that first
        lookup is returned instead, so callers can simply test truthiness.
    """
    ele_im_red_pot = ele_im._find_elements(
        'xpath://div[@class="LFWqFfyH isLight"]', raise_err=False
    )
    if not ele_im_red_pot:
        # Both lookups ask not to raise, so a missing container must not blow
        # up on the chained lookup below.
        # NOTE(review): with DrissionPage's default settings, accessing an
        # attribute of a not-found element raises ElementNotFoundError —
        # confirm against the installed version.
        return ele_im_red_pot
    # A child div inside the container means a private-message red dot exists.
    ele_has_msg = ele_im_red_pot._find_elements('xpath:/div', raise_err=False)
    return ele_has_msg
@task(description='检查是否在用户主页,如果不是则跳转到用户主页')
def check_home_page(home_url):
    """Load ``home_url`` in the current tab unless it is already on the user's own page.

    Args:
        home_url: URL to open when the tab URL does not contain "/user/self".
    """
    current_tab = base.get_tab()
    already_home = "/user/self" in current_tab.url
    if already_home:
        return
    current_tab.get(home_url)
@task(description='检查聊天框是否退出,没有退出则退出')
def check_conversion_exit():
    """Leave the conversation detail panel if one is currently open.

    Looks for the conversation detail container in the current tab; when it is
    present, clicks the element matched by the text '退出会话' inside it.
    Nothing happens when no conversation panel is found.
    """
    tab = base.get_tab()
    # Very short timeout and no raise: this is only a probe for an optional
    # panel, so a miss must be cheap and silent.
    ele_conversaton = tab._find_elements(
        'xpath://div[@data-mask="conversaton-detail-content"]',
        timeout=0.1,
        raise_err=False,
    )
    if ele_conversaton:
        ele_conversaton.ele('退出会话', timeout=0.1).click()
@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
def save_unread_user_data(data: UnReadUserData):
    """Insert one unread-message record into ``unread_table``.

    Args:
        data: The record to insert.

    Returns:
        Whatever ``unread_table.insert`` returns for this record.

    NOTE(review): ``base.get_object_name_by_time()`` in the decorator is
    evaluated once, when the decorator runs, so the storage key is fixed for
    the lifetime of the process — confirm this is intended.
    """
    inserted = unread_table.insert(data)
    return inserted
@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
def get_im_item_user_data(msg_item_div: ChromiumElement):
    """Collect name, avatar, message text and time text from one conversation row.

    Args:
        msg_item_div: Element wrapping a single conversation entry.

    Returns:
        An ``UnReadUserData`` whose ``name``, ``avator``, ``msg`` and
        ``msg_time_txt`` fields are set only for the sub-elements that were
        found.
    """
    data = UnReadUserData()

    # Resolve every sub-element up front; any of them may be missing.
    name_ele = msg_item_div.s_ele("xpath://div[@class='gZdlhsqq']")
    avatar_img = msg_item_div.s_ele("xpath://img")
    content_pre = msg_item_div.s_ele('xpath://pre[@class="MnyOYvbN"]')
    time_div = msg_item_div.s_ele('xpath://div[@class="skNuRdW_"]')

    if name_ele:
        data.name = name_ele.text
    if avatar_img:
        # The avatar is taken from the image's src attribute.
        data.avator = avatar_img.attr('src')
    if content_pre:
        data.msg = content_pre.text
    if time_div:
        # The first three characters of the time text are dropped.
        # NOTE(review): presumably a fixed separator/prefix — confirm.
        data.msg_time_txt = time_div.text[3:]

    logger.info(f"{data}")
    return data
-
-
@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
def get_im_unread_item_user_data(ele_msg_red_pot: ChromiumElement):
    """Build the unread-message record for the conversation owning a red-dot badge.

    Args:
        ele_msg_red_pot: The unread badge element; its text is parsed as the
            unread count and its parent is treated as the conversation row.

    Returns:
        The record produced by ``get_im_item_user_data`` for the parent row,
        with ``unread_msg_count`` filled in.
    """
    logger.info(f"存在未读消息,获取该的用户头像、昵称、时间等信息")
    # The badge's parent is the complete conversation entry for one user.
    conversation_row = ele_msg_red_pot.parent()
    unread_user_data = get_im_item_user_data(conversation_row)
    badge_count = int(ele_msg_red_pot.text)
    unread_user_data.unread_msg_count = badge_count
    logger.info(f"unread_user_data {unread_user_data}")
    return unread_user_data
- '''
- @flow(log_prints=False)
- def unread_msg_flow():
- tab = base.get_tab()
- ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]')
- # 无论有没有消息,都点击顶部私信图标
- click_im_icon_ele(ele_im)
- # get_im_unread_item_data.with_options(result_storage_key=get_object_name_by_time())
- unread_user_data = get_im_unread_item_user_data()
- # 记录未读消息到数据库
- unread_user_data = save_unread_user_data(unread_user_data)
- return unread_user_data
- '''
@task(description=f'在弹出的私信框中,获取未读消息的元素')
def get_listDlg_unread_msg(ele_list_dlg: ChromiumElement):
    """Collect unread-message records from the opened private-message list dialog.

    Args:
        ele_list_dlg: The list dialog container element.

    Returns:
        A tuple ``(unread_items, stranger_txt)``: the records built for every
        unread badge found, and '陌生人消息' when an element with that text is
        present in the dialog (empty string otherwise).
    """
    # index=None asks for every matching unread badge, not just the first.
    red_dot_badges = ele_list_dlg._find_elements(
        'xpath://div[@class="hcPUqxqn"]', index=None, raise_err=False
    )

    unread_items = []
    for badge in red_dot_badges:
        conversation_row = badge.parent()
        unread_user_data = get_im_item_user_data(conversation_row)
        unread_user_data.unread_msg_count = int(badge.text)
        unread_items.append(unread_user_data)
        logger.info(f"存在未读消息,获取该的用户头像、昵称、时间等信息: {unread_user_data}")

    stranger_entry = ele_list_dlg._find_elements('陌生人消息', raise_err=False)
    stranger_txt = ''
    if stranger_entry:
        stranger_txt = '陌生人消息'
        # NOTE(review): the source's indentation was lost; this log call is
        # assumed to run only when the stranger entry exists — confirm.
        logger.info(f"{stranger_txt}")

    return unread_items, stranger_txt
- from prefect.tasks import Task,TaskRun
- from prefect.flows import Flow,FlowRun
def check_page_flow_on_failure(flow: Flow, flow_run: FlowRun, state: State, **kwargs):
    """Failure hook: save page information for the failed flow run and log the result.

    Args:
        flow: The flow the hook is attached to (unused here).
        flow_run: The failed run; its id names the saved page info and its
            ``tab_id`` parameter selects the tab.
        state: The run's state (unused here).
        **kwargs: Extra hook arguments (unused here).
    """
    snapshot_name = f"check_page_flow_on_failure-{flow_run.id}"
    failed_tab_id = flow_run.parameters['tab_id']
    res = base.save_page_info(snapshot_name, tab_id=failed_tab_id)
    logger.info(f"{res}")
@flow(log_prints=False, on_failure=[check_page_flow_on_failure],persist_result=True, result_storage=minio_block)
def check_page_ok_flow(home_url: str, tab_id=None):
    """Bring the page to a known state, open the message list and collect unread items.

    Steps: ensure the user home page is shown, leave any open conversation,
    click the private-message icon, then read the list dialog.

    Args:
        home_url: URL passed to ``check_home_page``.
        tab_id: Identifier passed to ``base.get_tab`` to pick the tab used for
            locating the list dialog.

    Returns:
        The ``(unread_items, stranger_txt)`` tuple from
        ``get_listDlg_unread_msg``.

    NOTE(review): the tasks called here obtain their tab via
    ``base.get_tab()`` without ``tab_id`` — confirm they act on the same tab.
    """
    page_tab = base.get_tab(tab_id)

    check_home_page(home_url)
    check_conversion_exit()
    # The private-message icon is clicked whether or not there are messages.
    click_im_icon_ele()

    # The chat helper is instantiated but not used further in this flow.
    chat_helper = chat_test.Chat()

    list_dialog = page_tab.ele('xpath://div[@data-e2e="listDlg-container"]', timeout=3)
    dialog_result = get_listDlg_unread_msg(list_dialog)
    unread_items, stranger_txt = dialog_result
    logger.info(f"{unread_items,stranger_txt}")
    return unread_items, stranger_txt
def main():
    """Run ``check_page_ok_flow`` once with the home URL argument '1'."""
    # Alternative entry kept for reference: check_page_ok_flow.serve(name='serv')
    check_page_ok_flow('1')


if __name__ == "__main__":
    main()
|