"""Prefect tasks and flow that inspect the private-message (IM) panel of a page.

The flow makes sure the browser tab is on the user's home page, closes any
open conversation, opens the private-message list and collects the entries
that carry an unread badge.
"""
import datetime
import os
import re
import time
import sys
from typing import Optional

# The project packages live one and two directories above this file; both
# must be on sys.path before the project imports below are resolved.
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

from dp.page import page
from douyin import base
from conf.config import logger, OUTPUT
from database.config import minio_block
from dataset import Table
from DrissionPage import ChromiumPage
from DrissionPage._elements.chromium_element import ChromiumElement
from DrissionPage._units.listener import DataPacket
import jsonpath
from prefect import flow, task
from prefect.tasks import Task, TaskRun
from prefect.flows import Flow, FlowRun
from prefect.states import State, StateType
from douyin import chat_test
from douyin.models import UnReadUserData, UserInfoModel, user_table, unread_table

# tab=page.tab

# First run of digits inside an unread badge (e.g. "3", " 12 ", "99+").
_UNREAD_COUNT_RE = re.compile(r"\d+")


def _parse_unread_count(badge_text: Optional[str]) -> Optional[int]:
    """Return the number shown in an unread badge, or None if it has no digits."""
    match = _UNREAD_COUNT_RE.search(badge_text or "")
    return int(match.group()) if match else None


def _apply_unread_count(data: UnReadUserData, badge_text: Optional[str]) -> None:
    """Store the badge count on *data*; keep the model default if unparseable.

    A badge whose text is not a plain integer must not abort the whole scan,
    so the problem is logged and the item is still reported.
    """
    count = _parse_unread_count(badge_text)
    if count is None:
        logger.warning(f"unread badge text is not numeric: {badge_text!r}")
        return
    data.unread_msg_count = count


@task
def click_im_icon_ele():
    """Scroll the current tab to the top and click the private-message icon."""
    tab = base.get_tab()
    tab.scroll.to_top()
    ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]', timeout=5)
    logger.info("点击私信图标")
    ele_im.click()


@task
def get_im_icon_red_pot_ele(ele_im: ChromiumElement):
    """Look up the red-dot marker nested under the private-message icon.

    Args:
        ele_im: The private-message icon element.

    Returns:
        Whatever the nested lookup yields for the child ``div`` of the badge
        container.
    """
    # NOTE(review): if the badge container is missing, the second lookup runs
    # on the "not found" result of the first one - confirm that is intended.
    ele_im_red_pot = ele_im._find_elements(
        'xpath://div[@class="LFWqFfyH isLight"]', raise_err=False
    )
    # A child div here means the private-message red dot is present.
    ele_has_msg = ele_im_red_pot._find_elements('xpath:/div', raise_err=False)
    return ele_has_msg


@task(description='检查是否在用户主页,如果不是则跳转到用户主页')
def check_home_page(home_url):
    """Navigate the current tab to *home_url* unless its URL contains "/user/self"."""
    tab = base.get_tab()
    if "/user/self" not in tab.url:
        tab.get(home_url)


@task(description='检查聊天框是否退出,没有退出则退出')
def check_conversion_exit():
    """Leave the conversation detail view if one is currently open."""
    tab = base.get_tab()
    # Short timeouts: the conversation view is either already there or not.
    ele_conversaton = tab._find_elements(
        'xpath://div[@data-mask="conversaton-detail-content"]',
        timeout=0.1,
        raise_err=False,
    )
    if ele_conversaton:
        ele_conversaton.ele('退出会话', timeout=0.1).click()


# NOTE(review): base.get_object_name_by_time() is evaluated once, when the
# decorator runs at import time, so every run of a task below shares the same
# result_storage_key - confirm this is intended.
@task(
    persist_result=True,
    result_storage=minio_block,
    result_storage_key=base.get_object_name_by_time(),
)
def save_unread_user_data(data: UnReadUserData):
    """Insert *data* into ``unread_table`` and return the insert result."""
    return unread_table.insert(data)


@task(
    persist_result=True,
    result_storage=minio_block,
    result_storage_key=base.get_object_name_by_time(),
)
def get_im_item_user_data(msg_item_div: ChromiumElement):
    """Extract name, avatar, message text and time label from one list entry.

    Args:
        msg_item_div: The element wrapping a single conversation entry.

    Returns:
        An ``UnReadUserData`` with every field that could be located filled
        in; fields whose element is missing keep the model's default.
    """
    # Build the data container that is filled in below.
    data = UnReadUserData()

    name = msg_item_div.s_ele("xpath://div[@class='gZdlhsqq']")
    if name:
        data.name = name.text

    # Locate the avatar.
    avatar_ele = msg_item_div.s_ele("xpath://img")
    if avatar_ele:
        data.avator = avatar_ele.attr('src')

    msg_content = msg_item_div.s_ele('xpath://pre[@class="MnyOYvbN"]')
    if msg_content:
        data.msg = msg_content.text

    # Locate the time label.
    ele_time = msg_item_div.s_ele('xpath://div[@class="skNuRdW_"]')
    if ele_time:
        # The first three characters are dropped - presumably a separator
        # prefix in front of the time; TODO confirm against the live markup.
        data.msg_time_txt = ele_time.text[3:]

    logger.info(f"{data}")
    return data


# NOTE(review): this task and get_listDlg_unread_msg call the task
# get_im_item_user_data from inside a task; whether nested task calls are
# allowed depends on the installed Prefect version - confirm.
@task(
    persist_result=True,
    result_storage=minio_block,
    result_storage_key=base.get_object_name_by_time(),
)
def get_im_unread_item_user_data(ele_msg_red_pot: ChromiumElement):
    """Build the unread record for the entry that owns *ele_msg_red_pot*.

    Args:
        ele_msg_red_pot: The unread badge element of one conversation entry.

    Returns:
        The ``UnReadUserData`` of the entry, with ``unread_msg_count`` set
        from the badge text when that text contains a number.
    """
    logger.info("存在未读消息,获取该的用户头像、昵称、时间等信息")
    # The badge's parent is the complete element of one conversation entry.
    msg_item_div = ele_msg_red_pot.parent()
    unread_user_data = get_im_item_user_data(msg_item_div)
    _apply_unread_count(unread_user_data, ele_msg_red_pot.text)
    logger.info(f"unread_user_data {unread_user_data}")
    return unread_user_data


# A disabled flow, unread_msg_flow, used to live here as an unused string
# literal. It located the "im-entry" icon, clicked it, called
# get_im_unread_item_user_data() and stored the result through
# save_unread_user_data(). It no longer matches the current task signatures.


@task(description='在弹出的私信框中,获取未读消息的元素')
def get_listDlg_unread_msg(ele_list_dlg: ChromiumElement):
    """Collect every unread entry shown in the private-message list dialog.

    Args:
        ele_list_dlg: The list dialog container element.

    Returns:
        A tuple ``(unread_items, stranger_txt)``: the list of
        ``UnReadUserData`` records, and ``'陌生人消息'`` when that text is
        found in the dialog, otherwise ``''``.
    """
    # All unread badges; index=None asks for every match instead of one.
    ele_msg_red_pots = ele_list_dlg._find_elements(
        'xpath://div[@class="hcPUqxqn"]', index=None, raise_err=False
    )
    unread_items = []
    for ele_msg_red_pot in ele_msg_red_pots:
        msg_item_div = ele_msg_red_pot.parent()
        unread_user_data = get_im_item_user_data(msg_item_div)
        _apply_unread_count(unread_user_data, ele_msg_red_pot.text)
        unread_items.append(unread_user_data)
        logger.info(f"存在未读消息,获取该的用户头像、昵称、时间等信息: {unread_user_data}")

    ele_stranger = ele_list_dlg._find_elements('陌生人消息', raise_err=False)
    stranger_txt = ''
    if ele_stranger:
        stranger_txt = '陌生人消息'
        logger.info(f"{stranger_txt}")
    return unread_items, stranger_txt


def check_page_flow_on_failure(flow: Flow, flow_run: FlowRun, state: State, **kwargs):
    """Failure hook: save the page info of the tab the failed run was using."""
    # .get() so that a missing parameter cannot make the hook itself raise
    # and hide the original failure.
    res = base.save_page_info(
        f"check_page_flow_on_failure-{flow_run.id}",
        tab_id=flow_run.parameters.get('tab_id'),
    )
    logger.info(f"{res}")


@flow(
    log_prints=False,
    on_failure=[check_page_flow_on_failure],
    persist_result=True,
    result_storage=minio_block,
)
def check_page_ok_flow(home_url: str, tab_id=None):
    """Open the private-message list and report its unread entries.

    Args:
        home_url: URL to load when the tab is not on the user's own page.
        tab_id: Identifier passed to ``base.get_tab`` to pick the tab.

    Returns:
        The ``(unread_items, stranger_txt)`` tuple produced by
        ``get_listDlg_unread_msg``.
    """
    tab = base.get_tab(tab_id)
    # NOTE(review): the three tasks below call base.get_tab() without tab_id,
    # so they may act on a different tab than `tab` - confirm.
    check_home_page(home_url)
    check_conversion_exit()
    # Click the private-message icon whether or not there are new messages.
    click_im_icon_ele()
    # NOTE(review): the Chat instance is currently unused; the call is kept
    # in case its constructor has side effects - confirm and remove if not.
    mg_test = chat_test.Chat()
    ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]', timeout=3)
    unread_items, stranger_txt = get_listDlg_unread_msg(ele_list_dlg)
    logger.info(f"{unread_items,stranger_txt}")
    return unread_items, stranger_txt


def main():
    """Run the flow once in-process."""
    # Alternative: check_page_ok_flow.serve(name='serv')
    check_page_ok_flow('1')


if __name__ == "__main__":
    main()