conversation.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. import asyncio
  2. import datetime
  3. import os
  4. import re
  5. import time
  6. import os
  7. import sys
  8. from pathlib import Path
  9. # print(f"{Path(__file__).parent.parent.parent}")
  10. # sys.path.append(Path(__file__).parent.parent.parent.parent)
  11. # sys.path.append(Path(__file__).parent.parent.parent)
  12. # sys.path.append(Path(__file__).parent.parent)
  13. # sys.path.append(Path(__file__).parent)
  14. sys.path.append(r'I:\code\ai-yunying\live-online-people')
  15. import prefect.runtime
  16. import prefect.runtime.task_run
  17. from dp.page import page
  18. from database.config import minio_block
  19. from database.s3 import S3Object
  20. import prefect.client
  21. import jionlp as jio
  22. from conf.config import logger,OUTPUT
  23. from douyin import base
  24. from dataset import Table
  25. from DrissionPage import ChromiumPage
  26. from DrissionPage._elements.chromium_element import ChromiumElement
  27. from DrissionPage._units.listener import DataPacket
  28. import jsonpath
  29. from prefect import flow,task
  30. from prefect.tasks import Task,TaskRun
  31. from prefect.flows import Flow
  32. from prefect.states import State,StateType
  33. from prefect.client.schemas.objects import TaskRunInput,Parameter,TaskRunResult
  34. from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table,engine
  35. from douyin.flow import check_msg,unread_msg
  36. from sqlmodel import Field, SQLModel,Relationship,Column,Session,select,func,UniqueConstraint,PickleType,text
  37. tab:ChromiumPage=page.tab
  38. def parse_conversations_content_text(html):
  39. # 正则表达式匹配 <pre> 标签中的文本和 <img> 标签中的 alt 属性值
  40. pre_pattern = re.compile(r'<pre>(.*?)</pre>', re.DOTALL)
  41. img_alt_pattern = re.compile(r'<img[^>]+alt="\[(.*?)\]"', re.DOTALL)
  42. # 查找所有 <pre> 标签中的文本
  43. pres = pre_pattern.findall(html)
  44. # 查找所有 <img> 标签中的 alt 属性值
  45. alts = img_alt_pattern.findall(html)
  46. # 结合这两部分来构建最终的字符串
  47. result_items = []
  48. pre_index, alt_index = 0, 0
  49. while pre_index < len(pres) or alt_index < len(alts):
  50. if pre_index < len(pres):
  51. result_items.append(pres[pre_index])
  52. pre_index += 1
  53. if alt_index < len(alts):
  54. result_items.append('[' + alts[alt_index] + ']')
  55. alt_index += 1
  56. # 返回连接后的字符串
  57. return ''.join(result_items)
  58. def filter_aweme_detail_in_conversations_video(aweme_detail:dict):
  59. ret = {
  60. "nickname": aweme_detail.get("nickname"),
  61. "sec_uid": aweme_detail.get("sec_uid"),
  62. "preview_title" : aweme_detail.get("preview_title"),
  63. "statistics": aweme_detail.get("statistics"),
  64. }
  65. return ret
  66. def analyze_conversations_content(ele_content:ChromiumElement, conversation_detail:dict):
  67. ret = {
  68. "is_me": None,
  69. "content": {
  70. "type": None,
  71. "data": None,
  72. },
  73. }
  74. ret['is_me'] = True if 'tIJOLE11' in ele_content.attr("class") else False
  75. text = ele_content._find_elements('xpath://span[@class="WCSQFekt"]', raise_err=False)
  76. if text:
  77. text = parse_conversations_content_text(text.html)
  78. ret['content']['data'] = text
  79. ret['content']['type'] = "text"
  80. else:
  81. logger.info(f"_find_elements 'xpath://img'")
  82. imgs = ele_content._find_elements('xpath://img', index=None, raise_err=False)
  83. logger.info(f"{imgs}")
  84. if not imgs:
  85. logger.error(f"unkown_type {ele_content.html}")
  86. # 视频通常会包含封面和播放按钮,所以这里会有两个 img 元素
  87. elif len(imgs) > 1:
  88. ret['content']['type'] = "video"
  89. if conversation_detail:
  90. aweme_detail = conversation_detail.get("aweme_details").pop(0)
  91. data = filter_aweme_detail_in_conversations_video(aweme_detail)
  92. ret['content']['data'] = data
  93. elif len(imgs) == 1:
  94. ret['content']['data'] = imgs[0].attr("src")
  95. ret['content_type'] = "img"
  96. logger.info(f"ret {ret}")
  97. return ret
  98. def enter_conversation_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
  99. ele_exit_conversation = tab._find_elements('退出会话')
  100. if ele_exit_conversation:
  101. ele_exit_conversation.click()
  102. @task(on_failure=[enter_conversation_on_failure],
  103. persist_result=True,
  104. result_storage=minio_block,
  105. result_storage_key=base.get_object_name_by_time())
  106. def enter_conversation(click_user_name):
  107. tab.listen.start("www.douyin.com/aweme/v1/web")
  108. ele_listDlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
  109. ele_listDlg.ele(click_user_name).click()
  110. conversation_detail = None
  111. user_profile_packet = None
  112. # 获取该用户的基本信息
  113. for packet in tab.listen.steps(2, timeout=3):
  114. if "user/profile" in packet.url:
  115. user_profile_packet = packet
  116. if "aweme/detail" in packet.url:
  117. conversation_detail = packet
  118. conversation_detail_body = None if not conversation_detail else conversation_detail.response.body
  119. if not user_profile_packet:
  120. raise Exception("进入私信后没有捕获到 user/profile 请求")
  121. logger.debug(f"user_profile_packet {user_profile_packet}")
  122. logger.debug(f"conversation_detail {conversation_detail}")
  123. return user_profile_packet.response.body, conversation_detail_body
  124. @task(
  125. persist_result=True,
  126. result_storage=minio_block,
  127. result_storage_key=base.get_object_name_by_time())
  128. def save_enter_im_user_detail(unread_user_data:UnReadUserData, storage_key, user_profile_body):
  129. # S3Object(path="save_enter_im_user_detail_test").put((unread_user_data, storage_key, user_profile_body))
  130. unread_user_data.detail = S3Object(path=minio_block.basepath + storage_key, type=f"{tuple}")
  131. logger.info(f"S3Object {unread_user_data.detail.model_dump()}")
  132. with Session(engine) as session:
  133. statement = select(UserInfoModel).where(UserInfoModel.uid == user_profile_body.get('uid'))
  134. exist_user_info = session.exec(statement).first()
  135. if exist_user_info:
  136. user_info = exist_user_info
  137. else:
  138. user_profile_body.get('uid')
  139. logger.info(f"uid {user_profile_body.get('uid')}")
  140. logger.info(f"nickname {user_profile_body.get('nickname')}")
  141. logger.info(f"user_profile_body {user_profile_body}")
  142. user_info = user_table.dict_to_model(user_profile_body.get("user"))
  143. unread_user_data.user_info = user_info
  144. session.add(unread_user_data)
  145. session.commit()
  146. session.refresh(unread_user_data)
  147. logger.info(f"unread_user_data {unread_user_data}")
  148. return unread_user_data
  149. def get_conversation_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
  150. logger.info(f"{task}")
  151. logger.info(f"{task_run.task_inputs}")
  152. logger.info(f"{state}")
  153. logger.info(f"{state.result()}")
  154. def cn_time_to_timestamp(time_str:str, time_base=None):
  155. '''exzample
  156. for row in chat_history_table:
  157. time_base = row.get("create_time")
  158. cn_time = row.get("time")
  159. timestamp = cn_time_to_timestamp(cn_time, time_base=time_base)
  160. if timestamp:
  161. str_time = datetime.strftime(datetime.fromtimestamp(timestamp), '%Y-%m-%d %H:%M:%S')
  162. else:
  163. str_time = None
  164. print(f"timestamp {timestamp} \t\t str {str_time} \t\t time_base {time_base}")
  165. '''
  166. if not isinstance(time_str,str):
  167. return
  168. if "刚刚" in time_str:
  169. return int(time.time())
  170. try:
  171. if not time_base:
  172. time_base = time.time()
  173. res = jio.parse_time(time_str, time_base)
  174. res_time = res.get('time')[0]
  175. dt_obj = datetime.datetime.strptime(res_time, '%Y-%m-%d %H:%M:%S')
  176. # 将datetime对象转换为timestamp
  177. timestamp = datetime.datetime.timestamp(dt_obj)
  178. return int(timestamp)
  179. except Exception as e:
  180. logger.error(f"{e} time_str {time_str} time_base {time_base}")
  181. return
  182. # TODO 不使用,但保留,未来可能导出所有聊天时用到
  183. # 未读消息时,理论上无需获取所有历史,而是从未读消息显示的 time 开始获取即可。
  184. # 可认为程序启动后的未读消息才被记录,如果程序没有启动,代表用户不需要AI处理,不用管以前那些信息
  185. @task(on_failure=[get_conversation_on_failure],
  186. persist_result=True,
  187. result_storage=minio_block,
  188. result_storage_key=base.get_object_name_by_time())
  189. def get_conversations_history(unread_user_data:UnReadUserData, conversation_detail):
  190. ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
  191. # 在聊天窗口中往上滚动一点,以免待会有未读消息的时候直接被设置为已读而没有发现
  192. messageContent = ele_popShadowAnimation.ele('#messageContent')
  193. messageContent.child().scroll.up(150)
  194. eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
  195. count = 0
  196. chat_history = []
  197. # 未读消息开始于
  198. start_with_unread_time_txt = None
  199. for ele in eles_msg:
  200. logger.debug(f"ele.html {ele.html}")
  201. # 是否存在时间 <div class="kZAHYArp">18:07 </div>
  202. is_ele_time = ele._find_elements('xpath://div[@class="kZAHYArp"]', raise_err=False)
  203. ele_content:ChromiumElement = ele._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
  204. res = analyze_conversations_content(ele_content, conversation_detail)
  205. if is_ele_time:
  206. if not start_with_unread_time_txt:
  207. start_with_unread_time_txt = is_ele_time.text
  208. cn_to_time = cn_time_to_timestamp(is_ele_time.text)
  209. else:
  210. cn_to_time = None
  211. res.update({'time': cn_to_time})
  212. logger.debug(f"res {res}")
  213. chat_history.append(res)
  214. logger.info(f"{chat_history}")
  215. unread_user_data.msg_time_txt = start_with_unread_time_txt
  216. unread_user_data.chat_history = chat_history
  217. return unread_user_data
  218. @task(on_failure=[get_conversation_on_failure],
  219. persist_result=True,
  220. result_storage=minio_block,
  221. result_storage_key=base.get_object_name_by_time())
  222. def get_unread_msg_with_time(unread_user_data:UnReadUserData, conversation_detail:dict):
  223. ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
  224. # 在聊天窗口中往上滚动一点,以免待会有未读消息的时候直接被设置为已读而没有发现
  225. messageContent = ele_popShadowAnimation.ele('#messageContent')
  226. messageContent.child().scroll.up(150)
  227. # 如果消息到来,程序立即进入对话框,则通常 unread_user_data.msg_time 是 “刚刚”
  228. # 如果程序刚启动,以前就有这些未读消息,通常 unread_user_data.msg_time 是 "16:30" 或 "04-25" 或 "2023-04-25"
  229. ele_msg_start_with_time = messageContent.s_ele(unread_user_data.msg_time)
  230. unread_user_data.msg_time = cn_time_to_timestamp(ele_msg_start_with_time.text)
  231. logger.info(f"msg_start_with_time {ele_msg_start_with_time.text} - " + datetime.datetime.fromtimestamp(unread_user_data.msg_time).strftime('%Y-%m-%d %H:%M'))
  232. # 找到父级: div class="A1KpIsbL HO4aqgd4"
  233. ele_msg_start_with = ele_msg_start_with_time.parent()
  234. ele_msg = ele_msg_start_with
  235. chat_history = []
  236. while 'A1KpIsbL' in ele_msg.attr('class'):
  237. logger.debug(f"ele.html {ele_msg.html}")
  238. ele_content:ChromiumElement = ele_msg._find_elements('xpath://div[contains(@class, "SZtuWb3S")]')
  239. logger.debug(f"ele_content {ele_content.html}")
  240. logger.debug(f"ele_content class {ele_content.attr('class')}")
  241. res = analyze_conversations_content(ele_content, conversation_detail)
  242. logger.debug(f"res {res}")
  243. chat_history.append(res)
  244. ele_msg = ele_msg.prev()
  245. unread_user_data.chat_history = chat_history
  246. # {'name': '程序员马工', 'avator': 'https://p3.huoshanimg.com/aweme/100x100/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=3782654143', 'msg': '564', 'unread_msg_count': 2, 'time': 1714317757, 'chat_history': [{'is_me': True, 'content': {'type': 'text', 'data': '4公会'}}, {'is_me': False, 'content': {'type': 'text', 'data': '564'}}]}
  247. logger.info(f"unread_user_data {unread_user_data}")
  248. return unread_user_data
  249. @task(persist_result=True,
  250. result_storage=minio_block,
  251. result_storage_key=base.get_object_name_by_time())
  252. def save_unread_msg_to_db(unread_user_data:UnReadUserData):
  253. chat_history = unread_user_data.chat_history
  254. ret_chat_history = []
  255. # 当 chat['is_me']=False 时,表示该条消息是对方发送的,需要统计包含我在内有多少条总计聊天
  256. for chat in chat_history:
  257. ret_chat_history.append(db.insert(chat))
  258. logger.info(f"chat_history {ret_chat_history}")
  259. unread_user_data.chat_history = ret_chat_history
  260. return unread_user_data
  261. @task
  262. def send_msg_to_user(unread_user_data:UnReadUserData):
  263. # 找到聊天会话框
  264. # class="qbjZBApl popShadowAnimation"
  265. ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
  266. # 找到输入框
  267. # data-e2e="msg-input"
  268. ele_input = ele_popShadowAnimation.ele('xpath://div[@data-e2e="msg-input"]')
  269. ele_input.click()
  270. ele_input.input("hello")
  271. # 找到发送按钮
  272. # span class="e2e-send-msg-btn"
  273. ele_send = ele_popShadowAnimation.ele('xpath://span[contains(@class, "e2e-send-msg-btn")]')
  274. ele_send.click()
  275. ele_exit = ele_popShadowAnimation.ele('退出会话')
  276. # ele_exit.click()
  277. logger.info(f"回复成功")
  278. return {"chat_history":unread_user_data.chat_history, "reply":["hello"]}
  279. @task
  280. def set_unread_done(unread_user_data:UnReadUserData):
  281. unread_user_data.is_done = True
  282. db.update(unread_user_data)
  283. logger.info("set unread done")
  284. def reply_to_user_flow_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
  285. check_msg.check_conversion_exit()
  286. @flow(log_prints=False, on_failure=[reply_to_user_flow_on_failure])
  287. def reply_to_user_flow(unread_user_data:UnReadUserData):
  288. # 进入私信聊天窗口
  289. enter_state = enter_conversation(unread_user_data.name,return_state=True)
  290. user_profile_body, conversation_detail_body = enter_state.result()
  291. # 进入后获取到用户信息如:是否关注、粉丝数、用户作品数量、用户IP、简介等。保存到数据库中,以便后续 AI 聊天时附上用户信息
  292. unread_user_data = save_enter_im_user_detail(unread_user_data, enter_state.data.storage_key, user_profile_body)
  293. unread_user_data = get_conversations_history(unread_user_data, conversation_detail_body)
  294. reply_res = send_msg_to_user(unread_user_data)
  295. set_unread_done(unread_user_data)
  296. from prefect.results import PersistedResult
  297. import pickle
  298. async def get_from_persistedresult():
  299. res1 = await minio_block.read_path('s3://swl/prefect/20240427/031928_232222-get_conversations_history-0')
  300. def main():
  301. unread_user_data, storage_key, user_profile_body = S3Object(path="save_enter_im_user_detail_test").get()
  302. save_enter_im_user_detail(unread_user_data, storage_key, user_profile_body)
  303. # eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
  304. # ele_content:ChromiumElement = eles_msg[0]._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
  305. # logger.info(f"{ele_content.html}")
  306. # analyze_conversations_content(ele_content,res[1])
  307. # chat = [{'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '刚刚'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': True, 'content': {'type': 'text', 'data': 'df[流泪]sdghh个个'}, 'time': None}, {'is_me': True, 'content': {'type': 'text', 'data': '12312[流泪][捂脸]'}, 'time': '03:27'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '03:19'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '03:01'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '02:55'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '00:56'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '00:41'}, {'is_me': False, 'content': {'type': None, 'data': None}, 'time': '星期四 16:34'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期四 15:50'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期四 15:26'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期四 14:54'}, {'is_me': False, 'content': {'type': 'text', 'data': '1'}, 'time': '星期四 13:42'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期四 04:49'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期四 04:34'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期四 04:26'}, {'is_me': False, 'content': {'type': None, 'data': None}, 'time': '星期四 00:22'}, {'is_me': False, 'content': {'type': None, 'data': None}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '好玩'}, 'time': None}, {'is_me': False, 'content': {'type': None, 'data': None}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期三 23:49'}, {'is_me': True, 'content': {'type': 'text', 'data': '123'}, 'time': '星期三 22:05'}, {'is_me': False, 'content': {'type': 'text', 'data': '1'}, 'time': None}, {'is_me': True, 'content': {'type': 'text', 'data': '11'}, 'time': '星期三 18:07'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期三 15:13'}]
  308. # save_unread_msg_to_db(chat, 29)
  309. # asyncio.run(read_client_data())
  310. # TaskRunResult(input_type='task_run', id=UUID('b850b870-fe4d-4df3-bac3-9e8ea5e1661d'))
  311. # get_conversations_history()
  312. if __name__ == "__main__":
  313. main()