"""Douyin private-message (IM) automation built on DrissionPage.

``User`` stores profile data taken from Douyin web API responses in the
``user`` table.  ``Chat`` drives the browser tab: it checks the IM entry for
unread messages, opens conversations and parses the messages shown in the
chat panel.
"""
# The import order is kept exactly as it was: the project modules may have
# import-time side effects, so they are deliberately not regrouped.
import os
import re
import time
import page
import chat_test
from conf.config import logger,OUTPUT
from database.config import ai_yunying_db
from dataset import Table
from DrissionPage import ChromiumPage
from DrissionPage._elements.chromium_element import ChromiumElement
from DrissionPage._units.listener import DataPacket
import jsonpath
from prefect import flow,task
import chat_flow


class User:
    """Persist Douyin user profiles in the ``user`` table of ``db``."""

    # Keys copied from the ``user`` object of a profile response, in the order
    # they are written to the row.
    _PROFILE_FIELDS = (
        "uid",
        "nickname",
        "avatar_medium",
        "sec_uid",
        "signature",
        "city",
        "ip_location",
        "province",
        "school_name",
        "follow_status",
        "follower_count",
        "total_favorited",
        "aweme_count",
    )

    def __init__(self, db=ai_yunying_db) -> None:
        self.db = db
        self.table_user: Table = self.db.get_table("user")

    def filter_from_user_profile(self, body: dict) -> dict:
        """Extract the stored profile fields from a profile response body.

        The body is the response of
        https://www.douyin.com/aweme/v1/web/user/profile/other/
        (a sample payload lives in testm/user_profile_response.py).

        Args:
            body: Decoded response; must contain a ``"user"`` mapping.

        Returns:
            A dict with one entry per name in ``_PROFILE_FIELDS``; fields
            absent from the response are ``None``.

        Raises:
            KeyError: If ``body`` has no ``"user"`` key.
        """
        user = body['user']
        return {field: user.get(field) for field in self._PROFILE_FIELDS}

    def save_user_profile_to_db(self, body: dict):
        """Insert the profile unless its ``uid`` already exists, then return the row.

        Args:
            body: Decoded profile response, see ``filter_from_user_profile``.

        Returns:
            Whatever ``find_one`` returns for the inserted row, or for the row
            with the same ``uid`` when ``insert_ignore`` reported no new id.
        """
        save_data = self.filter_from_user_profile(body)
        row_id = self.table_user.insert_ignore(save_data, keys=["uid"])
        logger.info(f"插入/存在则忽略用户 id {row_id}")
        if row_id:
            return self.table_user.find_one(id=row_id)
        # A falsy id means nothing was inserted; look the row up by uid.
        return self.table_user.find_one(uid=save_data['uid'])


class Chat:
    """Automate the Douyin private-message UI of the logged-in user."""

    homepage = 'https://www.douyin.com/user/self'

    # NOTE(review): the literal HTML tags inside these two patterns were
    # missing from the reviewed copy of this file and have been reconstructed
    # (<pre ...>text</pre> and <img ... alt="[name]">). Confirm against the
    # upstream source.
    _PRE_TEXT_RE = re.compile(r'<pre[^>]*>(.*?)</pre>', re.DOTALL)
    _IMG_ALT_RE = re.compile(r'<img[^>]+alt="\[(.*?)\]"', re.DOTALL)

    def __init__(self, db=ai_yunying_db) -> None:
        self.tab_home = page.page.tab
        self.check_home_page()
        self.user = User(db)
        self.mg_test = chat_test.Chat()
        # NOTE(review): a message is sent on every construction; confirm this
        # is still wanted outside of testing.
        self.mg_test.send_msg("你好")

    def check_has_im_msg(self):
        """Locate the IM entry icon and its unread-message red dot.

        Returns:
            ``(ele_im, ele_has_msg)``: the IM entry element and the red-dot
            element. ``ele_has_msg`` is falsy when there is no unread marker.
        """
        self.check_home_page()
        self.tab_home.scroll.to_top()
        ele_im = self.tab_home.ele('xpath://div[@data-e2e="im-entry"]')
        ele_im_red_pot = ele_im._find_elements('xpath://div[@class="LFWqFfyH isLight"]', raise_err=False)
        if not ele_im_red_pot:
            # No red-dot container: report "no unread messages" instead of
            # calling a method on the empty lookup result.
            return ele_im, ele_im_red_pot
        # A child div inside the container is the private-message red dot.
        ele_has_msg = ele_im_red_pot._find_elements('xpath:/div', raise_err=False)
        return ele_im, ele_has_msg

    def run(self):
        """Handle the IM entry once via ``chat_flow.response_im``.

        NOTE(review): the early ``return`` below makes the polling loop
        unreachable. It is kept as found because enabling the loop changes
        what the bot does. The loop's indentation was reconstructed.
        """
        stranger_msg_count = 0
        ele_im, ele_has_msg = self.check_has_im_msg()
        chat_flow.response_im(tab=self.tab_home, ele_im=ele_im)
        return

        while True:
            ele_im, ele_has_msg = self.check_has_im_msg()
            if ele_has_msg:
                logger.info(f"有未读消息 {ele_has_msg.text}")
                chat_flow.response_im(tab=self.tab_home, ele_im=ele_im)

            # Look for stranger messages only on every 11th pass.
            stranger_msg_count += 1
            if stranger_msg_count > 10:
                stranger_msg_count = 0
                ele_im.click()
                ele_stranger = ele_im._find_elements("陌生人消息", raise_err=False)
                if ele_stranger:
                    logger.info(f"有陌生人消息")
                    self.response_stranger()
            time.sleep(1)

    def parse_conversations_content_text(self, html):
        """Build the plain text of a message from its HTML.

        Text inside ``<pre>`` tags and the bracketed ``alt`` values of
        ``<img>`` tags are collected separately and interleaved by position
        (first text, first alt, second text, ...), so the result follows the
        document order only when texts and images strictly alternate.

        Args:
            html: HTML of the message text node; falsy input gives ``''``.

        Returns:
            The joined string; each alt value is wrapped in ``[...]``.
        """
        if not html:
            return ''
        pres = self._PRE_TEXT_RE.findall(html)
        alts = self._IMG_ALT_RE.findall(html)

        result_items = []
        for index in range(max(len(pres), len(alts))):
            if index < len(pres):
                result_items.append(pres[index])
            if index < len(alts):
                result_items.append('[' + alts[index] + ']')
        return ''.join(result_items)

    def analyze_conversations_content(self, ele_content: ChromiumElement, conversation_detail: DataPacket):
        """Classify one message element and extract its payload.

        Args:
            ele_content: Content element of a single message.
            conversation_detail: Captured ``aweme/detail`` packet, or ``None``;
                its response body is used as the payload of video messages.

        Returns:
            ``{"is_me": bool, "content": {"type": ..., "data": ...}}`` where
            ``type`` is ``"text"``, ``"video"``, ``"image"`` or ``None`` when
            nothing was recognised.
        """
        ret = {
            "is_me": None,
            "content": {
                "type": None,
                "data": None,
            },
        }
        # A missing class attribute must not raise; treat it as "not me".
        ret['is_me'] = 'tIJOLE11' in (ele_content.attr("class") or "")

        text = ele_content._find_elements('xpath://span[@class="WCSQFekt"]', raise_err=False)
        if text:
            # The parser matches tags, so it needs the HTML rather than the
            # tag-free ``.text``.
            ret['content']['data'] = self.parse_conversations_content_text(text.html)
            ret['content']['type'] = "text"
            return ret

        imgs = ele_content._find_elements('xpath://img', raise_err=False)
        # A video usually has a cover and a play button, i.e. two img elements.
        # NOTE(review): confirm that this lookup can return a list at all;
        # if it always yields a single element the video branch never runs.
        if imgs and isinstance(imgs, list):
            ret['content']['type'] = "video"
            if conversation_detail:
                ret['content']['data'] = conversation_detail.response.body
        else:
            img = ele_content._find_elements('xpath://div[@class="UDDVxYoC"]', raise_err=False)
            if img:
                # Fill the nested dict like the other branches do, instead of
                # replacing ``content`` with a bare string.
                ret['content']['data'] = img.attr("src")
                ret['content']['type'] = "image"
        return ret

    def get_all_conversations(self, ele_popShadowAnimation: ChromiumElement, conversation_detail: DataPacket):
        """Parse and log every message shown in an open chat panel.

        Args:
            ele_popShadowAnimation: The chat panel element.
            conversation_detail: Passed through to
                ``analyze_conversations_content``.

        Returns:
            One result dict per parsed message, each extended with a
            ``"time"`` key (``None`` when the message has no timestamp).
        """
        # Every message in the chat box carries class "A1KpIsbL HO4aqgd4".
        eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
        results = []
        for ele in eles_msg:
            logger.info(f"ele.html {ele.html}")
            # Optional timestamp node, e.g. "18:07".
            is_ele_time = ele._find_elements('xpath://div[@class="kZAHYArp"]', raise_err=False)
            ele_content = ele._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
            if not ele_content:
                # Nothing to analyse; skip rather than fail the whole scan.
                logger.info("消息元素中未找到内容节点,已跳过")
                continue
            res = self.analyze_conversations_content(ele_content, conversation_detail)
            res.update({'time': is_ele_time.text if is_ele_time else None})
            logger.info(f"res {res}")
            results.append(res)
        return results

    def enter_conversation(self, conversation_item: ChromiumElement):
        """Collect the packets of a just-opened conversation and save its user.

        Reads up to two packets from the tab's listener, keeping the
        ``user/profile`` and ``aweme/detail`` ones, then stores the profile.

        Args:
            conversation_item: The clicked list item; only used in the error log.

        Returns:
            ``(chat_panel, user_profile_packet, conversation_detail)`` on
            success. ``None`` on failure, after stopping the listener and
            leaving the conversation.
        """
        # The chat panel has class "qbjZBApl popShadowAnimation".
        ele_popShadowAnimation = self.tab_home.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
        user_profile_packet = None
        conversation_detail = None
        try:
            for packet in self.tab_home.listen.steps(2, timeout=3):
                if "user/profile" in packet.url:
                    user_profile_packet = packet
                if "aweme/detail" in packet.url:
                    conversation_detail = packet
                if user_profile_packet and conversation_detail:
                    break
            logger.info(f"user_profile_packet {user_profile_packet}")
            logger.info(f"conversation_detail {conversation_detail}")
            if user_profile_packet is None:
                # Name the real cause instead of failing on ``None.response``.
                raise LookupError("未捕获到 user/profile 数据包")
            self.user.save_user_profile_to_db(user_profile_packet.response.body)
        except Exception as e:
            logger.exception(f"获取用户信息失败 {e} conversation_item {conversation_item}")
            self.tab_home.listen.stop()
            ele_popShadowAnimation.ele('退出会话').click()
            return None
        return ele_popShadowAnimation, user_profile_packet, conversation_detail

    def response_frined(self, ele_im: ChromiumElement):
        """Open the first conversation with an unread marker and parse it.

        Args:
            ele_im: The private-message icon at the top of the page.

        NOTE(review): the ``return`` after ``get_all_conversations`` makes the
        reply-sending code unreachable. It is kept as found because removing
        it would start sending messages.
        """
        logger.info(f"回复好友")
        ele_im.click()
        self.tab_home.listen.start("www.douyin.com/aweme/v1/web")
        # The private-message popup has data-e2e="listDlg-container".
        ele_list_dlg = self.tab_home.ele('xpath://div[@data-e2e="listDlg-container"]')
        logger.info(f"ele_list_dlg {ele_list_dlg}")
        conversation_items = ele_list_dlg.eles('xpath://div[@data-e2e="conversation-item"]')
        logger.info(f"conversation_items {conversation_items}")

        ele_msg_red_pot = None
        conversation_item = None
        for item in conversation_items:
            conversation_item = item
            logger.info(f"{item.html}")
            # The unread red dot has class "hcPUqxqn".
            ele_msg_red_pot = item._find_elements('xpath://div[@class="hcPUqxqn"]', raise_err=False)
            if ele_msg_red_pot:
                break

        if not ele_msg_red_pot:
            logger.error(f"没有找到元素 ele_msg_red_pot")
            return

        # Clicking the red dot opens the conversation.
        ele_msg_red_pot.click()
        entered = self.enter_conversation(conversation_item)
        if entered is None:
            # enter_conversation already logged the failure and cleaned up.
            return
        ele_popShadowAnimation, _, conversation_detail = entered
        self.get_all_conversations(ele_popShadowAnimation, conversation_detail)
        return

        # The input box has data-e2e="msg-input".
        ele_input = ele_popShadowAnimation.ele('xpath://div[@data-e2e="msg-input"]')
        ele_input.click()
        ele_input.input("hello")
        # The send button is a span whose class contains "e2e-send-msg-btn".
        ele_send = ele_popShadowAnimation.ele('xpath://span[contains(@class, "e2e-send-msg-btn")]')
        ele_send.click()
        ele_popShadowAnimation.ele('退出会话').click()
        logger.info(f"回复成功")

    def response_stranger(self):
        """Dump the current page HTML to ``OUTPUT/page/<timestamp>``.

        The file name is the local time formatted as ``%Y-%m-%d_%H-%M-%S``.
        """
        logger.info(f"回复陌生人")
        dump_dir = os.path.join(OUTPUT, 'page')
        os.makedirs(dump_dir, exist_ok=True)
        dump_path = os.path.join(dump_dir, time.strftime("%Y-%m-%d_%H-%M-%S"))
        # The mode belongs to open(), not to os.path.join(); passing it to the
        # latter opened a read-only file at "<timestamp>/w".
        with open(dump_path, 'w', encoding='utf-8') as f:
            f.write(self.tab_home.html)

    def check_home_page(self):
        """Navigate the tab to ``homepage`` unless it is already on it."""
        if "/user/self" not in self.tab_home.url:
            self.tab_home.get(self.homepage)


def main():
    """Create a ``Chat`` and run it once."""
    chat = Chat()
    chat.run()


if __name__ == "__main__":
    main()