import datetime import os import re import time import os import sys sys.path.append(os.path.dirname(os.path.dirname(__file__))) from dp.page import page from douyin import base from conf.config import logger,OUTPUT from database.config import minio_block from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table from dataset import Table from DrissionPage import ChromiumPage from DrissionPage._elements.chromium_element import ChromiumElement from DrissionPage._units.listener import DataPacket import jsonpath from prefect import flow,task from prefect.tasks import Task,TaskRun from prefect.flows import Flow from prefect.states import State,StateType from douyin.models import UnReadUserData,engine from douyin.flow import check_msg, conversation tab=page.tab # 点击私信图标后,获取未读消息小圆点 @task def get_ele_msg_red_pot(ele_im:ChromiumElement): ele_im.click() logger.info(f"点击私信图标后,获取未读消息小圆点") ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]') ele_msg_red_pot = ele_list_dlg.ele('xpath://div[@class="hcPUqxqn"]') return ele_list_dlg,ele_msg_red_pot @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time()) def save_unread_user_data(data:UnReadUserData): return unread_table.insert(data) @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time()) def get_im_item_user_data(msg_item_div:ChromiumElement): # 构建所需的数据字典 data = UnReadUserData() name = msg_item_div.s_ele("xpath://div[@class='gZdlhsqq']") if name: data.name = name.text # 定位头像 avatar_ele = msg_item_div.s_ele("xpath://img") if avatar_ele: data.avator = avatar_ele.attr('src') msg_content = msg_item_div.s_ele('xpath://pre[@class="MnyOYvbN"]') if msg_content: data.msg = msg_content.text # 定位时间 ele_time = msg_item_div.s_ele('xpath://div[@class="skNuRdW_"]') if ele_time: data.msg_time_txt = ele_time.text[3:] logger.info(f"{data}") return data @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time()) def get_im_unread_item_user_data(ele_msg_red_pot:ChromiumElement): logger.info(f"存在未读消息,获取该的用户头像、昵称、时间等信息") # 定位私信聊天框,一个用户私信聊天框的完整元素 msg_item_div = ele_msg_red_pot.parent() unread_user_data = get_im_item_user_data(msg_item_div) unread_user_data.unread_msg_count = int(ele_msg_red_pot.text) logger.info(f"unread_user_data {unread_user_data}") return unread_user_data @flow(log_prints=False) def unread_msg_flow(): ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]') # 私信弹框的红色小红点 ele_list_dlg,ele_pop_red_pot = get_ele_msg_red_pot(ele_im) # get_im_unread_item_data.with_options(result_storage_key=get_object_name_by_time()) unread_user_data = get_im_unread_item_user_data(ele_pop_red_pot) # 记录未读消息到数据库 unread_user_data = save_unread_user_data(unread_user_data) return unread_user_data