| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265 |
- import os
- import re
- import time
- import page
- import chat_test
- from conf.config import logger,OUTPUT
- from database.config import ai_yunying_db
- from dataset import Table
- from DrissionPage import ChromiumPage
- from DrissionPage._elements.chromium_element import ChromiumElement
- from DrissionPage._units.listener import DataPacket
- import jsonpath
- from prefect import flow,task
- import chat_flow
class User:
    """Store Douyin user profiles in the ``user`` table of the given database."""

    # Profile keys copied verbatim from the response's ``user`` object into
    # the row that gets stored; the order here is the order of the row dict.
    _PROFILE_FIELDS = (
        "uid",
        "nickname",
        "avatar_medium",
        "sec_uid",
        "signature",
        "city",
        "ip_location",
        "province",
        "school_name",
        "follow_status",
        "follower_count",
        "total_favorited",
        "aweme_count",
    )

    def __init__(self, db=ai_yunying_db) -> None:
        """Keep a handle on *db* and on its ``user`` table."""
        self.db = db
        self.table_user: Table = db.get_table("user")

    # Parses the body returned by requests to:
    #   https://www.douyin.com/aweme/v1/web/user/profile/other/
    # A sample of that response lives in: .\testm\user_profile_response.py
    def filter_from_user_profile(self, body: dict):
        """Pick the stored profile fields out of a profile response body.

        Args:
            body: Decoded response body; must contain a ``"user"`` mapping.

        Returns:
            A dict with one entry per stored field; fields absent from the
            response are present with value ``None``.

        Raises:
            KeyError: If *body* has no ``"user"`` key.
        """
        profile = body['user']
        return {field: profile.get(field) for field in self._PROFILE_FIELDS}

    def save_user_profile_to_db(self, body: dict):
        """Insert the profile unless its ``uid`` already exists; return the row.

        Args:
            body: Decoded profile response body (see
                ``filter_from_user_profile``).

        Returns:
            Whatever ``table_user.find_one`` yields for the user: looked up
            by the id returned from ``insert_ignore`` when that id is truthy,
            otherwise looked up by ``uid``.
        """
        row = self.filter_from_user_profile(body)
        row_id = self.table_user.insert_ignore(row, keys=["uid"])
        logger.info(f"插入/存在则忽略用户 id {row_id}")
        if not row_id:
            # Nothing usable came back from the insert (presumably the uid
            # already existed - confirm against the dataset library), so
            # fetch the row by uid instead.
            return self.table_user.find_one(uid=row['uid'])
        return self.table_user.find_one(id=row_id)
class Chat:
    """Drive the Douyin web private-message (IM) UI through a browser tab.

    The tab comes from ``page.page.tab``; element lookups rely on hard-coded
    XPath expressions and CSS class names of the Douyin web page.
    """

    homepage = 'https://www.douyin.com/user/self'

    # Compiled once at class creation instead of on every parse call.
    # <pre>...</pre> holds text fragments; <img alt="[name]"> holds emoji.
    _PRE_PATTERN = re.compile(r'<pre>(.*?)</pre>', re.DOTALL)
    _IMG_ALT_PATTERN = re.compile(r'<img[^>]+alt="\[(.*?)\]"', re.DOTALL)

    def __init__(self, db=ai_yunying_db) -> None:
        """Attach to the shared tab, open the home page and build helpers.

        Args:
            db: Database handle forwarded to ``User``.
        """
        self.tab_home = page.page.tab
        self.check_home_page()
        self.user = User(db)
        # NOTE(review): a greeting is sent through chat_test.Chat on every
        # construction; this looks like a smoke test - confirm it is wanted.
        self.mg_test = chat_test.Chat()
        self.mg_test.send_msg("你好")

    def check_has_im_msg(self):
        """Locate the IM entry icon and its unread badge.

        Returns:
            A tuple ``(ele_im, ele_has_msg)``: the IM entry element and the
            result of looking for a child ``div`` inside the badge container.
            ``run`` treats a truthy second item as "there are unread
            messages".
        """
        self.check_home_page()
        self.tab_home.scroll.to_top()
        ele_im = self.tab_home.ele('xpath://div[@data-e2e="im-entry"]')
        ele_im_red_pot = ele_im._find_elements('xpath://div[@class="LFWqFfyH isLight"]', raise_err=False)
        # A child div inside the badge container means the red dot is shown.
        ele_has_msg = ele_im_red_pot._find_elements('xpath:/div', raise_err=False)
        return ele_im, ele_has_msg

    def run(self):
        """Hand the IM entry to ``chat_flow.response_im`` once, then return.

        NOTE(review): the unconditional ``return`` below makes the polling
        loop unreachable. It is preserved as found because the intended
        behavior cannot be determined from this file; remove the ``return``
        to re-enable polling.
        """
        stranger_msg_count = 0
        ele_im, ele_has_msg = self.check_has_im_msg()
        chat_flow.response_im(tab=self.tab_home, ele_im=ele_im)
        return
        # --- unreachable from here on (see NOTE above) ---
        while True:
            ele_im, ele_has_msg = self.check_has_im_msg()
            if ele_has_msg:
                logger.info(f"有未读消息 {ele_has_msg.text}")
                chat_flow.response_im(tab=self.tab_home, ele_im=ele_im)
            stranger_msg_count += 1
            if stranger_msg_count > 10:
                # Every 11th pass, open the IM panel and look for the
                # "stranger messages" entry.
                stranger_msg_count = 0
                ele_im.click()
                ele_stranger = ele_im._find_elements("陌生人消息", raise_err=False)
                if ele_stranger:
                    logger.info("有陌生人消息")
                    self.response_stranger()
            time.sleep(1)

    def parse_conversations_content_text(self, html):
        """Rebuild a message string from ``<pre>`` text and ``<img alt>`` emoji.

        All ``<pre>`` bodies and all bracketed ``alt`` values are collected
        separately and then interleaved pairwise (first pre, first alt,
        second pre, second alt, ...); leftovers of the longer list follow.

        Args:
            html: Markup of one message bubble.

        Returns:
            The concatenated text, with each emoji rendered as ``[name]``.
            An empty string when nothing matches.
        """
        # Local import: itertools is not imported at file level.
        from itertools import zip_longest

        pres = self._PRE_PATTERN.findall(html)
        alts = self._IMG_ALT_PATTERN.findall(html)

        result_items = []
        for pre, alt in zip_longest(pres, alts):
            # zip_longest pads the shorter list with None; skip the padding.
            if pre is not None:
                result_items.append(pre)
            if alt is not None:
                result_items.append('[' + alt + ']')
        return ''.join(result_items)

    def analyze_conversations_content(self, ele_content: ChromiumElement, conversation_detail: DataPacket):
        """Classify one message bubble as text, video or image.

        Args:
            ele_content: The bubble element of a single message.
            conversation_detail: Captured ``aweme/detail`` packet, or a falsy
                value when none was captured; used as the video payload.

        Returns:
            ``{"is_me": bool, "content": {"type": ..., "data": ...}}`` where
            ``type`` is ``"text"``, ``"video"``, ``"image"`` or ``None`` when
            the bubble matched none of the known layouts.
        """
        ret = {
            "is_me": None,
            "content": {
                "type": None,
                "data": None,
            },
        }
        # attr() may yield None when the attribute is missing; treat that as
        # "no marker class" instead of raising TypeError on the `in` test.
        css_class = ele_content.attr("class") or ""
        ret['is_me'] = 'tIJOLE11' in css_class

        text = ele_content._find_elements('xpath://span[@class="WCSQFekt"]', raise_err=False)
        if text:
            # NOTE(review): `.text` is handed to an HTML parser; confirm it
            # still contains the <pre>/<img> markup the parser looks for.
            ret['content']['data'] = self.parse_conversations_content_text(text.text)
            ret['content']['type'] = "text"
            return ret

        imgs = ele_content._find_elements('xpath://img', raise_err=False)
        # A video normally has a cover plus a play button, i.e. two <img>
        # elements, hence the list check.
        # NOTE(review): confirm this lookup can actually return a list.
        if imgs and isinstance(imgs, list):
            ret['content']['type'] = "video"
            if conversation_detail:
                ret['content']['data'] = conversation_detail.response.body
            return ret

        img = ele_content._find_elements('xpath://div[@class="UDDVxYoC"]', raise_err=False)
        if img:
            # Fill the same nested shape as the text/video branches rather
            # than replacing the "content" dict with a bare string.
            ret['content']['data'] = img.attr("src")
            ret['content']['type'] = "image"
        return ret

    def get_all_conversations(self, ele_popShadowAnimation: ChromiumElement, conversation_detail: DataPacket):
        """Log every message found in the open conversation dialog.

        Args:
            ele_popShadowAnimation: The conversation dialog element.
            conversation_detail: Forwarded to
                ``analyze_conversations_content``.
        """
        # Every message row in the dialog: class="A1KpIsbL HO4aqgd4"
        eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
        for ele in eles_msg:
            logger.info(f"ele.html {ele.html}")
            # Optional timestamp, e.g. <div class="kZAHYArp">18:07 </div>
            is_ele_time = ele._find_elements('xpath://div[@class="kZAHYArp"]', raise_err=False)
            ele_content: ChromiumElement = ele._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
            res = self.analyze_conversations_content(ele_content, conversation_detail)
            res.update({'time': is_ele_time.text if is_ele_time else None})
            logger.info(f"res {res}")

    def enter_conversation(self, conversation_item: ChromiumElement):
        """Collect the packets fired when a conversation opens; save the user.

        Args:
            conversation_item: The list entry that was clicked; only used in
                the failure log message.

        Returns:
            ``(dialog_element, user_profile_packet, conversation_detail)`` on
            success. ``None`` when anything fails - in that case the listener
            is stopped and the dialog is closed before returning.
        """
        # The conversation dialog: class="qbjZBApl popShadowAnimation"
        ele_popShadowAnimation = self.tab_home.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
        try:
            user_profile_packet = None
            conversation_detail = None
            # Look at up to two captured packets for the profile and the
            # aweme-detail responses.
            for packet in self.tab_home.listen.steps(2, timeout=3):
                if "user/profile" in packet.url:
                    user_profile_packet = packet
                if "aweme/detail" in packet.url:
                    conversation_detail = packet
                if user_profile_packet and conversation_detail:
                    break
            logger.info(f"user_profile_packet {user_profile_packet}")
            logger.info(f"conversation_detail {conversation_detail}")
            # If no profile packet was captured this raises AttributeError,
            # which is deliberately routed to the handler below.
            self.user.save_user_profile_to_db(user_profile_packet.response.body)
        except Exception as e:
            logger.exception(f"获取用户信息失败 {e} conversation_item {conversation_item}")
            self.tab_home.listen.stop()
            ele_popShadowAnimation.ele('退出会话').click()
            return None
        return ele_popShadowAnimation, user_profile_packet, conversation_detail

    # ele_im: the private-message icon element at the top of the page
    def response_frined(self, ele_im: ChromiumElement):
        """Open the first conversation with an unread badge and log its messages.

        Args:
            ele_im: The IM entry icon element; clicked to open the list.

        NOTE(review): the early ``return`` after ``get_all_conversations``
        makes the reply-sending code unreachable. It is preserved as found
        because the intended behavior cannot be determined from this file.
        """
        logger.info("回复好友")
        ele_im.click()
        self.tab_home.listen.start("www.douyin.com/aweme/v1/web")
        # The IM popup: data-e2e="listDlg-container"
        ele_list_dlg = self.tab_home.ele('xpath://div[@data-e2e="listDlg-container"]')
        logger.info(f"ele_list_dlg {ele_list_dlg}")

        conversation_items = ele_list_dlg.eles('xpath://div[@data-e2e="conversation-item"]')
        logger.info(f"conversation_items {conversation_items}")
        name = ""
        ele_msg_red_pot = None
        conversation_item = None
        for item in conversation_items:
            conversation_item = item
            logger.info(f"{item.html}")
            # Unread badge of this entry: class="hcPUqxqn"
            ele_msg_red_pot = item._find_elements('xpath://div[@class="hcPUqxqn"]', raise_err=False)
            if ele_msg_red_pot:
                # Display name of the peer (currently not used further).
                name = item.ele('xpath://div[@class="gZdlhsqq"]').text
                break
        if not ele_msg_red_pot:
            logger.error("没有找到元素 ele_msg_red_pot")
            return

        # Clicking the badge opens that conversation.
        ele_msg_red_pot.click()

        conversation = self.enter_conversation(conversation_item)
        if conversation is None:
            # enter_conversation already logged the failure and closed the
            # dialog; unpacking None here would raise TypeError.
            return
        ele_popShadowAnimation, _user_profile_packet, conversation_detail = conversation
        self.get_all_conversations(ele_popShadowAnimation, conversation_detail)
        return
        # --- unreachable from here on (see NOTE in the docstring) ---
        # Message input box: data-e2e="msg-input"
        ele_input = ele_popShadowAnimation.ele('xpath://div[@data-e2e="msg-input"]')
        ele_input.click()
        ele_input.input("hello")
        # Send button: span class="e2e-send-msg-btn"
        ele_send = ele_popShadowAnimation.ele('xpath://span[contains(@class, "e2e-send-msg-btn")]')
        ele_send.click()
        ele_popShadowAnimation.ele('退出会话').click()
        logger.info("回复成功")

    def response_stranger(self):
        """Dump the current page HTML to ``OUTPUT/page/<timestamp>``.

        The file name is the local time formatted as
        ``%Y-%m-%d_%H-%M-%S``; the directory is created when missing.
        """
        logger.info("回复陌生人")
        dump_dir = os.path.join(OUTPUT, 'page')
        os.makedirs(dump_dir, exist_ok=True)
        dump_path = os.path.join(dump_dir, time.strftime("%Y-%m-%d_%H-%M-%S"))
        # The mode belongs to open(), not to os.path.join(); UTF-8 is set
        # explicitly so the page text does not depend on the locale encoding.
        with open(dump_path, 'w', encoding='utf-8') as f:
            f.write(self.tab_home.html)

    def check_home_page(self):
        """Navigate the tab to ``homepage`` unless its URL already contains ``/user/self``."""
        if "/user/self" not in self.tab_home.url:
            self.tab_home.get(self.homepage)
def main():
    """Create a ``Chat`` instance and call its ``run()`` method."""
    chat = Chat()
    chat.run()
- # ele_im, ele_has_msg = self.check_has_im_msg()
- # ele_im.click()
- # self.tab_home.listen.clear()
- # self.tab_home.listen.start("www.douyin.com", method="GET")
- # ele_popShadowAnimation = self.tab_home.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
- # ele_im = self.tab_home.ele('xpath://div[@data-e2e="im-entry"]')
- # logger.info(f"{ele_im.html}")
- # # <div class="LFWqFfyH isLight">
- # has_msg_ele = ele_im.ele('xpath://div[@class="LFWqFfyH isLight"]')
- # logger.info(f"{has_msg_ele.html}")
# Script entry point: call main() only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()
|