| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- import datetime
- import os
- import re
- import time
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- from dp.page import page
- from douyin import base
- from conf.config import logger,OUTPUT
- from database.config import minio_block
- from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table
- from dataset import Table
- from DrissionPage import ChromiumPage
- from DrissionPage._elements.chromium_element import ChromiumElement
- from DrissionPage._units.listener import DataPacket
- import jsonpath
- from prefect import flow,task
- from prefect.tasks import Task,TaskRun
- from prefect.flows import Flow
- from prefect.states import State,StateType
- from douyin.models import UnReadUserData,engine
- from douyin.flow import check_msg, conversation
# Browser tab taken from the project's shared `page` object (dp.page);
# every task in this module locates elements through this handle.
tab = page.tab
# Click the private-message icon, then fetch the unread-message red dot.
@task
def get_ele_msg_red_pot(ele_im: ChromiumElement):
    """Open the private-message (IM) panel and look up the unread red dot.

    Clicks ``ele_im``, then searches the module-level ``tab`` for the message
    list dialog (``data-e2e="listDlg-container"``) and, inside that dialog,
    for the first ``div`` whose class is ``hcPUqxqn`` (the unread dot).

    NOTE(review): ``hcPUqxqn`` looks like a build-generated class name and may
    change when the site is redeployed -- confirm before relying on it.

    Args:
        ele_im: The IM entry element to click.

    Returns:
        A ``(list_dialog, red_dot)`` tuple. When a lookup finds nothing, the
        slot holds whatever DrissionPage returns for a failed lookup
        (presumably a falsy ``NoneElement`` under default settings -- confirm).
    """
    dialog_xpath = 'xpath://div[@data-e2e="listDlg-container"]'
    red_dot_xpath = 'xpath://div[@class="hcPUqxqn"]'

    ele_im.click()
    logger.info("点击私信图标后,获取未读消息小圆点")

    list_dialog = tab.ele(dialog_xpath)
    red_dot = list_dialog.ele(red_dot_xpath)
    return list_dialog, red_dot
@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
def save_unread_user_data(data: UnReadUserData):
    """Insert one unread-message record into ``unread_table``.

    Returns exactly what ``unread_table.insert`` returns. If ``unread_table``
    is a ``dataset.Table`` that is the new row's primary key, and ``insert``
    expects a dict-like row -- TODO confirm ``UnReadUserData`` satisfies that.

    NOTE(review): ``result_storage_key`` above is computed once, when this
    module is imported, so every run of this task persists its result under
    that same fixed key.
    """
    inserted = unread_table.insert(data)
    return inserted
@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
def get_im_item_user_data(msg_item_div: ChromiumElement):
    """Scrape nickname, avatar, last message and time from one IM list row.

    Every field is looked up with ``s_ele`` on ``msg_item_div``. A field whose
    element is not found (falsy lookup result) is left at the default that
    ``UnReadUserData()`` gives it.

    Args:
        msg_item_div: The element wrapping a single conversation row.

    Returns:
        The populated ``UnReadUserData`` instance.
    """
    record = UnReadUserData()

    # (model attribute, xpath, how to read the value from the found element),
    # evaluated in this order.
    field_specs = (
        ("name", "xpath://div[@class='gZdlhsqq']", lambda found: found.text),
        # Avatar: first <img> in the row. The model field is spelled "avator".
        ("avator", "xpath://img", lambda found: found.attr('src')),
        ("msg", 'xpath://pre[@class="MnyOYvbN"]', lambda found: found.text),
        # Drops the first three characters of the time text.
        # NOTE(review): presumably a fixed prefix before the timestamp -- confirm.
        ("msg_time_txt", 'xpath://div[@class="skNuRdW_"]', lambda found: found.text[3:]),
    )

    for attr_name, xpath, extract in field_specs:
        found_ele = msg_item_div.s_ele(xpath)
        if not found_ele:
            continue
        setattr(record, attr_name, extract(found_ele))

    logger.info(f"{record}")
    return record
-
-
_UNREAD_COUNT_RE = re.compile(r"\d+")


def _parse_unread_count(badge_text):
    """Return the number shown on an unread badge, e.g. "3" -> 3, "99+" -> 99.

    Raises:
        ValueError: if the badge text contains no digits.
    """
    match = _UNREAD_COUNT_RE.search(badge_text or "")
    if match is None:
        raise ValueError(f"unread badge text contains no digits: {badge_text!r}")
    return int(match.group())


@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
def get_im_unread_item_user_data(ele_msg_red_pot: ChromiumElement):
    """Build the ``UnReadUserData`` record for the row carrying an unread dot.

    The dot's parent element is taken as the whole conversation row. Its
    fields are scraped by ``get_im_item_user_data``, and the number shown on
    the dot is stored as ``unread_msg_count``.

    Args:
        ele_msg_red_pot: The unread red-dot element of one conversation row.

    Returns:
        The populated ``UnReadUserData``.

    Raises:
        ValueError: if the dot's text contains no digits.
    """
    logger.info("存在未读消息,获取该的用户头像、昵称、时间等信息")
    # The dot's parent is the complete element of one user's conversation row.
    msg_item_div = ele_msg_red_pot.parent()
    # NOTE(review): this calls a @task from inside another @task. Some Prefect
    # versions (2.x) reject nested task calls at runtime; if so, use
    # get_im_item_user_data.fn(msg_item_div) instead -- confirm installed version.
    unread_user_data = get_im_item_user_data(msg_item_div)
    # Plain int() would crash on overflow badges such as "99+"; take the digits.
    unread_user_data.unread_msg_count = _parse_unread_count(ele_msg_red_pot.text)
    logger.info(f"unread_user_data {unread_user_data}")
    return unread_user_data
@flow(log_prints=False)
def unread_msg_flow():
    """Open the IM panel, scrape the first unread conversation and store it.

    Returns:
        Whatever ``save_unread_user_data`` returns (the database insert
        result), or ``None`` when no unread red dot was found.
    """
    ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]')
    # Red unread dot inside the IM pop-up; the dialog element is not needed here.
    _ele_list_dlg, ele_pop_red_pot = get_ele_msg_red_pot(ele_im)
    if not ele_pop_red_pot:
        # Nothing unread: stop here instead of scraping a missing element.
        # A failed DrissionPage lookup yields a falsy result under default settings.
        logger.info("未找到未读消息小圆点,跳过")
        return None
    unread_user_data = get_im_unread_item_user_data(ele_pop_red_pot)
    # Record the unread message in the database.
    insert_result = save_unread_user_data(unread_user_data)
    return insert_result
|