import datetime
import os
import re
import time
import os
import sys

# Make the project root importable before any project-local import below.
sys.path.append('.')

from dp.page import page
from douyin import base
from conf.config import logger, OUTPUT
from database.config import minio_block
from douyin.models import UnReadUserData, UserInfoModel, db, unread_table, user_table
from dataset import Table
import DrissionPage
from DrissionPage import ChromiumPage
from DrissionPage._elements.chromium_element import ChromiumElement
from DrissionPage._units.listener import DataPacket
from prefect import flow, task
from prefect.tasks import Task, TaskRun
from prefect.flows import Flow
from prefect.states import State, StateType
from douyin.models import UnReadUserData, engine
from douyin.flow import check_page, conversation


@task
def get_ele_msg_red_pot(ele_im: ChromiumElement, tab_id=None):
    """Click the private-message icon and look up the unread-message red dot.

    Args:
        ele_im: The private-message icon element; it is clicked first.
        tab_id: Identifier passed to ``base.get_tab`` to select the tab.

    Returns:
        A ``(ele_list_dlg, ele_msg_red_pot)`` tuple: the message-list dialog
        container and the red-dot element searched for inside it.
    """
    tab = base.get_tab(tab_id)
    ele_im.click()
    logger.info("点击私信图标后,获取未读消息小圆点")
    ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
    ele_msg_red_pot = ele_list_dlg.ele('xpath://div[@class="hcPUqxqn"]')
    return ele_list_dlg, ele_msg_red_pot


# NOTE(review): ``base.get_object_name_by_time()`` is evaluated once, when the
# decorator runs at import time, so every run of a given task shares the same
# ``result_storage_key`` -- confirm this is intended. The same applies to each
# task below that uses this pattern.
@task(persist_result=True, result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def save_unread_user_data(data: UnReadUserData):
    """Insert ``data`` into ``unread_table`` and return the insert result."""
    return unread_table.insert(data)


@task(persist_result=True, result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def get_im_item_user_data(msg_item_div: ChromiumElement):
    """Build an ``UnReadUserData`` from one conversation item element.

    Each field is only set when the corresponding child element is found.

    Args:
        msg_item_div: Element wrapping a single conversation entry.

    Returns:
        The populated ``UnReadUserData`` instance.
    """
    # Container for the scraped fields.
    data = UnReadUserData()

    name = msg_item_div.s_ele("xpath://div[@class='gZdlhsqq']")
    if name:
        data.name = name.text

    # Locate the avatar image.
    avatar_ele = msg_item_div.s_ele("xpath://img")
    if avatar_ele:
        data.avator = avatar_ele.attr('src')

    msg_content = msg_item_div.s_ele('xpath://pre[@class="MnyOYvbN"]')
    if msg_content:
        data.msg = msg_content.text

    # Locate the message time.
    ele_time = msg_item_div.s_ele('xpath://div[@class="skNuRdW_"]')
    if ele_time:
        # Drops the first three characters of the text; presumably a fixed
        # prefix in front of the timestamp -- TODO confirm.
        data.msg_time_txt = ele_time.text[3:]

    logger.info(f"{data}")
    return data


@task(persist_result=True, result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def get_im_unread_item_user_data(ele_msg_red_pot: ChromiumElement):
    """Collect the user data for the conversation that owns a red dot.

    Args:
        ele_msg_red_pot: The unread red-dot element; its text is parsed as
            the unread message count.

    Returns:
        The ``UnReadUserData`` for that conversation, with
        ``unread_msg_count`` filled in.

    Raises:
        ValueError: If the red dot's text is not an integer.
    """
    logger.info("存在未读消息,获取该的用户头像、昵称、时间等信息")
    # The red dot's parent is used as the complete element of one user's
    # private-message conversation entry.
    msg_item_div = ele_msg_red_pot.parent()
    # NOTE(review): this calls another @task from inside a task; whether that
    # is permitted depends on the installed Prefect version -- confirm.
    unread_user_data = get_im_item_user_data(msg_item_div)
    unread_user_data.unread_msg_count = int(ele_msg_red_pot.text)
    logger.info(f"unread_user_data {unread_user_data}")
    return unread_user_data


@task(persist_result=True, result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def get_stranger_unread_data(stranger_txt='陌生人消息', tab_id=None) -> UnReadUserData:
    """Open the stranger-messages entry and read the sender from the network.

    Starts listening for ``im/user/info`` requests, clicks the entry whose
    text is ``stranger_txt`` in the message-list dialog, and builds an
    ``UnReadUserData`` from the last item of the response's ``data`` list.

    Args:
        stranger_txt: Text used to locate the stranger-messages entry.
        tab_id: Identifier passed to ``base.get_tab`` to select the tab.

    Returns:
        An ``UnReadUserData`` carrying the stranger's nickname and avatar URI.

    Raises:
        RuntimeError: If no ``im/user/info`` packet is captured before the
            listener times out, or if the response carries no ``data`` items.
    """
    tab = base.get_tab(tab_id)
    # The request https://www.douyin.com/aweme/v1/web/im/user/info is only
    # seen when stranger messages are present.
    tab.listen.start('im/user/info')
    ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]', timeout=3)
    ele_list_dlg.ele(stranger_txt).click()

    # Pre-bind so a timeout or a non-matching packet gives a clear error
    # instead of an UnboundLocalError further down.
    stranger_info_packet = None
    for packet in tab.listen.steps(1, timeout=3):
        if 'im/user/info' in packet.url:
            stranger_info_packet = packet
    if stranger_info_packet is None:
        raise RuntimeError(
            "no 'im/user/info' packet captured after clicking "
            f"{stranger_txt!r}"
        )

    data_list = stranger_info_packet.response.body.get('data')
    if not data_list:
        raise RuntimeError("'im/user/info' response contains no 'data' items")

    latest = data_list[-1]
    logger.info(f"unread_stranger_data {latest}")
    unread_stranger_data = UnReadUserData(
        name=latest.get('nickname'),
        avator=latest.get("avatar_thumb").get("uri"),
    )
    logger.info(f"unread_stranger_data {unread_stranger_data}")
    return unread_stranger_data


# At first glance the flow below looks redundant -- it only runs a few simple
# tasks, so why not write them directly in the main flow?
# This is a separate flow (rather than inline tasks in the main flow) because
# flows must be able to run concurrently and this one may need to run in a new
# tab; inside the main flow it would block and could not run concurrently.
@flow(log_prints=False)
def stranger_msg_flow(home_url, tab_id=None):
    """Check the page, read the stranger's data, then reply to that user.

    Args:
        home_url: URL handed to ``check_page.check_page_ok_flow``.
        tab_id: Identifier of the browser tab every step should operate on.
    """
    logger.info(f"tab_id {tab_id}")
    # Only the stranger-entry text is needed here; the first item is unused.
    _unread_items, stranger_txt = check_page.check_page_ok_flow(home_url, tab_id)
    unread_stranger_data = get_stranger_unread_data(stranger_txt, tab_id)
    conversation.reply_to_user_flow(unread_stranger_data, tab_id)


from concurrent.futures import ThreadPoolExecutor, as_completed


@flow(log_prints=False)
def unread_msg_flow(name, tab_id=None):
    """Reply to the given user and record the unread message in the database.

    Args:
        name: Passed as the first argument to
            ``conversation.reply_to_user_flow``.
        tab_id: Identifier of the browser tab to operate on.

    Returns:
        The result of ``save_unread_user_data`` for the replied conversation.
    """
    # Pass ``tab_id`` (the original referenced an undefined name ``tab``),
    # matching how ``stranger_msg_flow`` calls the same sub-flow.
    unread_user_data = conversation.reply_to_user_flow(name, tab_id)
    # Record the unread message in the database.
    unread_user_data = save_unread_user_data(unread_user_data)
    return unread_user_data


def main(home_url: str = 'https://www.douyin.com/user/self',
         tab_id='A017888A62FE53FAD9D85ED2662FEA34'):
    """Run ``stranger_msg_flow`` against a given tab.

    Args:
        home_url: Page URL passed to the flow.
        tab_id: Tab identifier passed to the flow. The default is a
            hard-coded id, presumably of an already-open tab on the
            author's machine -- override it for any other session.
    """
    stranger_msg_flow(home_url, tab_id)


if __name__ == "__main__":
    main()