chat.py 11 KB


  1. import os
  2. import re
  3. import time
  4. import page
  5. import chat_test
  6. from conf.config import logger,OUTPUT
  7. from database.config import ai_yunying_db
  8. from dataset import Table
  9. from DrissionPage import ChromiumPage
  10. from DrissionPage._elements.chromium_element import ChromiumElement
  11. from DrissionPage._units.listener import DataPacket
  12. import jsonpath
  13. from prefect import flow,task
  14. import chat_flow
  15. class User:
  16. def __init__(self, db=ai_yunying_db) -> None:
  17. self.db = db
  18. self.table_user:Table = self.db.get_table("user")
  19. # 解析来自这个包请求返回的信息: https://www.douyin.com/aweme/v1/web/user/profile/other/
  20. # 返回的包内容如: .\testm\user_profile_response.py
  21. def filter_from_user_profile(self, body:dict):
  22. user = body['user']
  23. save_data = {
  24. "uid": user.get('uid'),
  25. "nickname": user.get('nickname'),
  26. "avatar_medium": user.get("avatar_medium"),
  27. "sec_uid": user.get('sec_uid'),
  28. "signature": user.get('signature'),
  29. "city": user.get('city'),
  30. "ip_location": user.get('ip_location'),
  31. "province": user.get('province'),
  32. "school_name" : user.get('school_name'),
  33. "follow_status": user.get('follow_status'),
  34. "follower_count" : user.get('follower_count'),
  35. "total_favorited" : user.get('total_favorited'),
  36. "aweme_count": user.get('aweme_count'),
  37. }
  38. return save_data
  39. def save_user_profile_to_db(self, body:dict):
  40. save_data = self.filter_from_user_profile(body)
  41. id = self.table_user.insert_ignore(save_data, keys=["uid"])
  42. logger.info(f"插入/存在则忽略用户 id {id}")
  43. if id:
  44. return self.table_user.find_one(id=id)
  45. else:
  46. return self.table_user.find_one(uid=save_data['uid'])
  47. class Chat:
  48. homepage = 'https://www.douyin.com/user/self'
  49. def __init__(self, db=ai_yunying_db) -> None:
  50. self.tab_home = page.page.tab
  51. # logger.info(f"{self.tab_home.url}")
  52. self.check_home_page()
  53. self.user = User(db)
  54. self.mg_test = chat_test.Chat()
  55. self.mg_test.send_msg("你好")
  56. def check_has_im_msg(self):
  57. self.check_home_page()
  58. self.tab_home.scroll.to_top()
  59. ele_im = self.tab_home.ele('xpath://div[@data-e2e="im-entry"]')
  60. ele_im_red_pot = ele_im._find_elements('xpath://div[@class="LFWqFfyH isLight"]', raise_err=False)
  61. # 存在私信小红点
  62. ele_has_msg = ele_im_red_pot._find_elements('xpath:/div',raise_err=False)
  63. return ele_im, ele_has_msg
  64. def run(self):
  65. stranger_msg_count = 0
  66. ele_im, ele_has_msg = self.check_has_im_msg()
  67. chat_flow.response_im(tab=self.tab_home, ele_im=ele_im)
  68. return
  69. while True:
  70. ele_im, ele_has_msg = self.check_has_im_msg()
  71. if ele_has_msg:
  72. logger.info(f"有未读消息 {ele_has_msg.text}")
  73. chat_flow.response_im(tab=self.tab_home, ele_im=ele_im)
  74. stranger_msg_count +=1
  75. if stranger_msg_count > 10:
  76. stranger_msg_count = 0
  77. ele_im.click()
  78. ele_stranger = ele_im._find_elements("陌生人消息", raise_err=False)
  79. if ele_stranger:
  80. logger.info(f"有陌生人消息")
  81. self.response_stranger()
  82. time.sleep(1)
  83. def parse_conversations_content_text(self, html):
  84. # 正则表达式匹配 <pre> 标签中的文本和 <img> 标签中的 alt 属性值
  85. pre_pattern = re.compile(r'<pre>(.*?)</pre>', re.DOTALL)
  86. img_alt_pattern = re.compile(r'<img[^>]+alt="\[(.*?)\]"', re.DOTALL)
  87. # 查找所有 <pre> 标签中的文本
  88. pres = pre_pattern.findall(html)
  89. # 查找所有 <img> 标签中的 alt 属性值
  90. alts = img_alt_pattern.findall(html)
  91. # 结合这两部分来构建最终的字符串
  92. result_items = []
  93. pre_index, alt_index = 0, 0
  94. while pre_index < len(pres) or alt_index < len(alts):
  95. if pre_index < len(pres):
  96. result_items.append(pres[pre_index])
  97. pre_index += 1
  98. if alt_index < len(alts):
  99. result_items.append('[' + alts[alt_index] + ']')
  100. alt_index += 1
  101. # 返回连接后的字符串
  102. return ''.join(result_items)
  103. def analyze_conversations_content(self,ele_content:ChromiumElement, conversation_detail:DataPacket):
  104. ret = {
  105. "is_me": None,
  106. "content": {
  107. "type": None,
  108. "data": None,
  109. },
  110. }
  111. ret['is_me'] = True if 'tIJOLE11' in ele_content.attr("class") else False
  112. text = ele_content._find_elements('xpath://span[@class="WCSQFekt"]', raise_err=False)
  113. if text:
  114. text = self.parse_conversations_content_text(text.text)
  115. ret['content']['data'] = text
  116. ret['content']['type'] = "text"
  117. else:
  118. imgs = ele_content._find_elements('xpath://img', raise_err=False)
  119. # 视频通常会包含封面和播放按钮,所以这里会有两个 img 元素
  120. if imgs and isinstance(imgs, list):
  121. ret['content']['type'] = "video"
  122. if conversation_detail:
  123. data = conversation_detail.response.body
  124. ret['content']['data'] = data
  125. else:
  126. img = ele_content._find_elements('xpath://div[@class="UDDVxYoC"]', raise_err=False)
  127. if img:
  128. ret['content'] = img.attr("src")
  129. ret['content_type'] = "image"
  130. return ret
  131. # logger.info(f"ele_content.html {ele_content.html}")
  132. # if ret['is_me']:
  133. # logger.info(f"me {ele_content.s_ele('xpath://span').text}")
  134. # else:
  135. # logger.info(f"user {ele_content.s_ele('xpath://span').text}")
  136. def get_all_conversations(self, ele_popShadowAnimation:ChromiumElement, conversation_detail:DataPacket):
  137. # logger.info(f"{ele_popShadowAnimation.html}")
  138. # 查找聊天框内所有的消息 class="A1KpIsbL HO4aqgd4"
  139. eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
  140. i = 0
  141. for ele in eles_msg:
  142. logger.info(f"ele.html {ele.html}")
  143. # 是否存在时间 <div class="kZAHYArp">18:07 </div>
  144. is_ele_time = ele._find_elements('xpath://div[@class="kZAHYArp"]', raise_err=False)
  145. ele_content:ChromiumElement = ele._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
  146. res = self.analyze_conversations_content(ele_content, conversation_detail)
  147. res.update({'time': is_ele_time.text if is_ele_time else None})
  148. logger.info(f"res {res}")
  149. def enter_conversation(self, conversation_item:ChromiumElement):
  150. # 找到聊天会话框
  151. # class="qbjZBApl popShadowAnimation"
  152. ele_popShadowAnimation = self.tab_home.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
  153. user_info = None
  154. try:
  155. user_profile_packet = None
  156. conversation_detail = None
  157. # 获取该用户的基本信息
  158. for packet in self.tab_home.listen.steps(2, timeout=3):
  159. if "user/profile" in packet.url:
  160. user_profile_packet = packet
  161. if "aweme/detail" in packet.url:
  162. conversation_detail = packet
  163. if user_profile_packet and conversation_detail:
  164. break
  165. logger.info(f"user_profile_packet {user_profile_packet}")
  166. logger.info(f"conversation_detail {conversation_detail}")
  167. user_info = self.user.save_user_profile_to_db(user_profile_packet.response.body)
  168. except Exception as e:
  169. logger.exception(f"获取用户信息失败 {e} conversation_item {conversation_item}")
  170. self.tab_home.listen.stop()
  171. ele_popShadowAnimation.ele('退出会话').click()
  172. return
  173. return ele_popShadowAnimation,user_profile_packet,conversation_detail
  174. # ele_im 顶部私信图标元素
  175. def response_frined(self, ele_im:ChromiumElement):
  176. logger.info(f"回复好友")
  177. ele_im.click()
  178. self.tab_home.listen.start("www.douyin.com/aweme/v1/web")
  179. # 找到私信弹出窗口
  180. # data-e2e="listDlg-container"
  181. ele_list_dlg = self.tab_home.ele('xpath://div[@data-e2e="listDlg-container"]')
  182. logger.info(f"ele_list_dlg {ele_list_dlg}")
  183. # logger.info(f"ele_list_dlg.html {ele_list_dlg.html}")
  184. # "conversation-item"
  185. conversation_items = ele_list_dlg.eles('xpath://div[@data-e2e="conversation-item"]')
  186. logger.info(f"conversation_items {conversation_items}")
  187. name = ""
  188. ele_msg_red_pot = None
  189. conversation_item = None
  190. for item in conversation_items:
  191. conversation_item = item
  192. logger.info(f"{item.html}")
  193. # 找到未读消息小红点
  194. # class="hcPUqxqn"
  195. ele_msg_red_pot = item._find_elements('xpath://div[@class="hcPUqxqn"]', raise_err=False)
  196. if ele_msg_red_pot:
  197. name = item.ele('xpath://div[@class="gZdlhsqq"]').text
  198. break
  199. if not ele_msg_red_pot:
  200. logger.error(f"没有找到元素 ele_msg_red_pot")
  201. return
  202. # 点击未读消息小红点
  203. ele_msg_red_pot.click()
  204. ele_popShadowAnimation,user_profile_packet,conversation_detail = self.enter_conversation(conversation_item)
  205. self.get_all_conversations(ele_popShadowAnimation, conversation_detail)
  206. return
  207. # 找到输入框
  208. # data-e2e="msg-input"
  209. ele_input = ele_popShadowAnimation.ele('xpath://div[@data-e2e="msg-input"]')
  210. ele_input.click()
  211. ele_input.input("hello")
  212. # 找到发送按钮
  213. # span class="e2e-send-msg-btn"
  214. ele_send = ele_popShadowAnimation.ele('xpath://span[contains(@class, "e2e-send-msg-btn")]')
  215. ele_send.click()
  216. ele_popShadowAnimation.ele('退出会话').click()
  217. logger.info(f"回复成功")
  218. def response_stranger(self):
  219. logger.info(f"回复陌生人")
  220. with open(os.path.join(OUTPUT, 'page', time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime(time.time())), 'w')) as f:
  221. f.write(self.tab_home.html)
  222. def check_home_page(self):
  223. if "/user/self" not in self.tab_home.url:
  224. self.tab_home.get(self.homepage)
  225. def main():
  226. self = Chat()
  227. self.run()
  228. # ele_im, ele_has_msg = self.check_has_im_msg()
  229. # ele_im.click()
  230. # self.tab_home.listen.clear()
  231. # self.tab_home.listen.start("www.douyin.com", method="GET")
  232. # ele_popShadowAnimation = self.tab_home.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
  233. # ele_im = self.tab_home.ele('xpath://div[@data-e2e="im-entry"]')
  234. # logger.info(f"{ele_im.html}")
  235. # # <div class="LFWqFfyH isLight">
  236. # has_msg_ele = ele_im.ele('xpath://div[@class="LFWqFfyH isLight"]')
  237. # logger.info(f"{has_msg_ele.html}")
  238. if __name__ == "__main__":
  239. main()