import os
import re
import time
import page
import chat_test
from conf.config import logger,OUTPUT
from database.config import ai_yunying_db
from dataset import Table
from DrissionPage import ChromiumPage
from DrissionPage._elements.chromium_element import ChromiumElement
from DrissionPage._units.listener import DataPacket
import jsonpath
from prefect import flow,task
import chat_flow


class User:
    """Store Douyin user profiles in the ``user`` table of the given database."""

    # Keys copied verbatim from the ``user`` object of a profile response.
    _PROFILE_FIELDS = (
        "uid",
        "nickname",
        "avatar_medium",
        "sec_uid",
        "signature",
        "city",
        "ip_location",
        "province",
        "school_name",
        "follow_status",
        "follower_count",
        "total_favorited",
        "aweme_count",
    )

    def __init__(self, db=ai_yunying_db) -> None:
        self.db = db
        self.table_user: Table = self.db.get_table("user")

    # Parses the response body of this request:
    #   https://www.douyin.com/aweme/v1/web/user/profile/other/
    # A sample payload is kept in .\testm\user_profile_response.py
    def filter_from_user_profile(self, body: dict):
        """Pick the persisted profile fields out of a profile response body.

        Args:
            body: Decoded response body; must contain a ``user`` mapping.

        Returns:
            A dict with one entry per persisted field; fields missing from
            the response are ``None``.

        Raises:
            KeyError: If ``body`` has no ``user`` key.
        """
        user = body['user']
        return {field: user.get(field) for field in self._PROFILE_FIELDS}

    def save_user_profile_to_db(self, body: dict):
        """Insert the profile unless its ``uid`` already exists, then return the row.

        Args:
            body: Decoded profile response body (see ``filter_from_user_profile``).

        Returns:
            The stored row as returned by ``Table.find_one``.
        """
        save_data = self.filter_from_user_profile(body)
        # ``row_id`` is falsy when nothing was inserted, so fall back to a
        # lookup by uid in that case.
        row_id = self.table_user.insert_ignore(save_data, keys=["uid"])
        logger.info(f"插入/存在则忽略用户 id {row_id}")
        if row_id:
            return self.table_user.find_one(id=row_id)
        return self.table_user.find_one(uid=save_data['uid'])


class Chat:
    """Drive the Douyin web page to read and answer private messages."""

    homepage = 'https://www.douyin.com/user/self'

    def __init__(self, db=ai_yunying_db) -> None:
        self.tab_home = page.page.tab
        self.check_home_page()
        self.user = User(db)
        self.mg_test = chat_test.Chat()
        # NOTE(review): this sends a message as a side effect of construction;
        # confirm it is intended outside of manual testing.
        self.mg_test.send_msg("你好")

    def check_has_im_msg(self):
        """Locate the private-message entry and its unread marker.

        Returns:
            A tuple ``(ele_im, ele_has_msg)``: the message entry element and
            the unread-marker lookup result, which is falsy when there is no
            unread marker.
        """
        self.check_home_page()
        self.tab_home.scroll.to_top()
        ele_im = self.tab_home.ele('xpath://div[@data-e2e="im-entry"]')
        ele_im_red_pot = ele_im._find_elements('xpath://div[@class="LFWqFfyH isLight"]', raise_err=False)
        if not ele_im_red_pot:
            # No red-dot container: do not query a missing element, report
            # "no unread message" with the same falsy lookup result.
            return ele_im, ele_im_red_pot
        # A child div inside the container means the unread red dot is shown.
        ele_has_msg = ele_im_red_pot._find_elements('xpath:/div',raise_err=False)
        return ele_im, ele_has_msg

    def run(self):
        """Answer pending private messages once.

        NOTE(review): the early ``return`` below makes the polling loop
        unreachable. It looks like a debugging shortcut and is kept as-is;
        remove it to enable continuous polling.
        """
        stranger_msg_count = 0
        ele_im, ele_has_msg = self.check_has_im_msg()
        chat_flow.response_im(tab=self.tab_home, ele_im=ele_im)
        return
        # NOTE(review): nesting of this loop was reconstructed from a copy of
        # the file without indentation -- confirm before enabling it.
        while True:
            ele_im, ele_has_msg = self.check_has_im_msg()
            if ele_has_msg:
                logger.info(f"有未读消息 {ele_has_msg.text}")
                chat_flow.response_im(tab=self.tab_home, ele_im=ele_im)
            stranger_msg_count +=1
            # Check the stranger-message entry only once every 10 rounds.
            if stranger_msg_count > 10:
                stranger_msg_count = 0
                ele_im.click()
                ele_stranger = ele_im._find_elements("陌生人消息", raise_err=False)
                if ele_stranger:
                    logger.info(f"有陌生人消息")
                    self.response_stranger()
            time.sleep(1)

    def parse_conversations_content_text(self, html):
        """Extract message text and emoji placeholders from bubble HTML.

        Collects the contents of every ``<pre>`` tag and the bracketed ``alt``
        value of every ``<img>`` tag, then interleaves them one by one
        (first pre, first alt, second pre, second alt, ...).

        Args:
            html: HTML of a message bubble; ``None`` or ``''`` yields ``''``.

        Returns:
            The concatenated text, with each alt value wrapped in ``[...]``.
        """
        if not html:
            return ''
        # NOTE(review): the tag parts of both patterns were lost in the copy
        # of the file under review (the first literal was split across lines,
        # the second began at ``]+alt=``). They are reconstructed from the
        # accompanying comments -- confirm against the original source.
        pre_pattern = re.compile(r'<pre\b[^>]*>(.*?)</pre>', re.DOTALL)
        img_alt_pattern = re.compile(r'<img[^>]+alt="\[(.*?)\]"', re.DOTALL)

        pres = pre_pattern.findall(html)
        alts = img_alt_pattern.findall(html)

        # Interleave the two lists; the longer one supplies the tail.
        result_items = []
        for idx in range(max(len(pres), len(alts))):
            if idx < len(pres):
                result_items.append(pres[idx])
            if idx < len(alts):
                result_items.append('[' + alts[idx] + ']')
        return ''.join(result_items)

    def analyze_conversations_content(self,ele_content:ChromiumElement, conversation_detail:DataPacket):
        """Classify one message bubble and extract its payload.

        Args:
            ele_content: Element wrapping a single message bubble.
            conversation_detail: Captured ``aweme/detail`` packet, or a falsy
                value when none was captured; its response body is used as
                the payload of video messages.

        Returns:
            ``{"is_me": bool, "content": {"type": ..., "data": ...}}`` where
            ``type`` is ``"text"``, ``"video"``, ``"image"`` or ``None`` when
            the bubble matched none of the known layouts.
        """
        ret = {
            "is_me": None,
            "content": {
                "type": None,
                "data": None,
            },
        }
        # attr() may yield None when the element has no class attribute;
        # treat that as "not sent by me" instead of raising TypeError.
        css_class = ele_content.attr("class") or ""
        ret['is_me'] = 'tIJOLE11' in css_class

        text = ele_content._find_elements('xpath://span[@class="WCSQFekt"]', raise_err=False)
        if text:
            # The parser matches HTML tags, so it needs the markup; plain
            # text is the fallback when the markup holds no <pre>/<img alt>.
            ret['content']['data'] = self.parse_conversations_content_text(text.html) or text.text
            ret['content']['type'] = "text"
            return ret

        imgs = ele_content._find_elements('xpath://img', raise_err=False)
        # A video bubble usually holds a cover and a play button, i.e. two
        # img elements.
        # NOTE(review): this branch only fires when the lookup returns a
        # list -- confirm _find_elements does so with these arguments.
        if imgs and isinstance(imgs, list):
            ret['content']['type'] = "video"
            if conversation_detail:
                ret['content']['data'] = conversation_detail.response.body
            return ret

        img = ele_content._find_elements('xpath://div[@class="UDDVxYoC"]', raise_err=False)
        if img:
            # Fill the nested "content" dict like the other branches do; the
            # previous code replaced the dict with the src string and added
            # a stray top-level "content_type" key.
            ret['content']['data'] = img.attr("src")
            ret['content']['type'] = "image"
        return ret
        # logger.info(f"ele_content.html {ele_content.html}")
        # if ret['is_me']:
        #     logger.info(f"me {ele_content.s_ele('xpath://span').text}")
        # else:
        #     logger.info(f"user {ele_content.s_ele('xpath://span').text}")

    def get_all_conversations(self, ele_popShadowAnimation:ChromiumElement, conversation_detail:DataPacket):
        # logger.info(f"{ele_popShadowAnimation.html}")
        # 查找聊天框内所有的消息 class="A1KpIsbL HO4aqgd4"
        eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
        i = 0
        for ele in eles_msg:
            logger.info(f"ele.html {ele.html}")
            # 是否存在时间 
18:07
is_ele_time = ele._find_elements('xpath://div[@class="kZAHYArp"]', raise_err=False) ele_content:ChromiumElement = ele._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False) res = self.analyze_conversations_content(ele_content, conversation_detail) res.update({'time': is_ele_time.text if is_ele_time else None}) logger.info(f"res {res}") def enter_conversation(self, conversation_item:ChromiumElement): # 找到聊天会话框 # class="qbjZBApl popShadowAnimation" ele_popShadowAnimation = self.tab_home.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]') user_info = None try: user_profile_packet = None conversation_detail = None # 获取该用户的基本信息 for packet in self.tab_home.listen.steps(2, timeout=3): if "user/profile" in packet.url: user_profile_packet = packet if "aweme/detail" in packet.url: conversation_detail = packet if user_profile_packet and conversation_detail: break logger.info(f"user_profile_packet {user_profile_packet}") logger.info(f"conversation_detail {conversation_detail}") user_info = self.user.save_user_profile_to_db(user_profile_packet.response.body) except Exception as e: logger.exception(f"获取用户信息失败 {e} conversation_item {conversation_item}") self.tab_home.listen.stop() ele_popShadowAnimation.ele('退出会话').click() return return ele_popShadowAnimation,user_profile_packet,conversation_detail # ele_im 顶部私信图标元素 def response_frined(self, ele_im:ChromiumElement): logger.info(f"回复好友") ele_im.click() self.tab_home.listen.start("www.douyin.com/aweme/v1/web") # 找到私信弹出窗口 # data-e2e="listDlg-container" ele_list_dlg = self.tab_home.ele('xpath://div[@data-e2e="listDlg-container"]') logger.info(f"ele_list_dlg {ele_list_dlg}") # logger.info(f"ele_list_dlg.html {ele_list_dlg.html}") # "conversation-item" conversation_items = ele_list_dlg.eles('xpath://div[@data-e2e="conversation-item"]') logger.info(f"conversation_items {conversation_items}") name = "" ele_msg_red_pot = None conversation_item = None for item in conversation_items: conversation_item = item 
logger.info(f"{item.html}") # 找到未读消息小红点 # class="hcPUqxqn" ele_msg_red_pot = item._find_elements('xpath://div[@class="hcPUqxqn"]', raise_err=False) if ele_msg_red_pot: name = item.ele('xpath://div[@class="gZdlhsqq"]').text break if not ele_msg_red_pot: logger.error(f"没有找到元素 ele_msg_red_pot") return # 点击未读消息小红点 ele_msg_red_pot.click() ele_popShadowAnimation,user_profile_packet,conversation_detail = self.enter_conversation(conversation_item) self.get_all_conversations(ele_popShadowAnimation, conversation_detail) return # 找到输入框 # data-e2e="msg-input" ele_input = ele_popShadowAnimation.ele('xpath://div[@data-e2e="msg-input"]') ele_input.click() ele_input.input("hello") # 找到发送按钮 # span class="e2e-send-msg-btn" ele_send = ele_popShadowAnimation.ele('xpath://span[contains(@class, "e2e-send-msg-btn")]') ele_send.click() ele_popShadowAnimation.ele('退出会话').click() logger.info(f"回复成功") def response_stranger(self): logger.info(f"回复陌生人") with open(os.path.join(OUTPUT, 'page', time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime(time.time())), 'w')) as f: f.write(self.tab_home.html) def check_home_page(self): if "/user/self" not in self.tab_home.url: self.tab_home.get(self.homepage) def main(): self = Chat() self.run() # ele_im, ele_has_msg = self.check_has_im_msg() # ele_im.click() # self.tab_home.listen.clear() # self.tab_home.listen.start("www.douyin.com", method="GET") # ele_popShadowAnimation = self.tab_home.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]') # ele_im = self.tab_home.ele('xpath://div[@data-e2e="im-entry"]') # logger.info(f"{ele_im.html}") # #
# has_msg_ele = ele_im.ele('xpath://div[@class="LFWqFfyH isLight"]') # logger.info(f"{has_msg_ele.html}") if __name__ == "__main__": main()