conversation.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. import asyncio
  2. import datetime
  3. import os
  4. import re
  5. import time
  6. import os
  7. import sys
  8. from pathlib import Path
  9. # print(f"{Path(__file__).parent.parent.parent}")
  10. # sys.path.append(Path(__file__).parent.parent.parent.parent)
  11. # sys.path.append(Path(__file__).parent.parent.parent)
  12. # sys.path.append(Path(__file__).parent.parent)
  13. # sys.path.append(Path(__file__).parent)
  14. sys.path.append(r'I:\code\ai-yunying\live-online-people')
  15. import prefect.runtime
  16. import prefect.runtime.task_run
  17. from dp.page import page
  18. from database.config import minio_block
  19. from database.s3 import S3Object
  20. import prefect.client
  21. from conf.config import logger,OUTPUT
  22. from douyin import base
  23. from DrissionPage import ChromiumPage
  24. from DrissionPage._elements.chromium_element import ChromiumElement
  25. from prefect import flow,task
  26. from prefect.tasks import Task,TaskRun
  27. from prefect.states import State,StateType
  28. from prefect.client.schemas.objects import TaskRunInput,Parameter,TaskRunResult
  29. from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table,engine
  30. from douyin.flow import check_page,unread_msg
  31. from sqlmodel import Field, SQLModel,Relationship,Column,Session,select,func,UniqueConstraint,PickleType,text
  32. tab:ChromiumPage=page.tab
  33. def parse_conversations_content_text(html):
  34. # 正则表达式匹配 <pre> 标签中的文本和 <img> 标签中的 alt 属性值
  35. pre_pattern = re.compile(r'<pre>(.*?)</pre>', re.DOTALL)
  36. img_alt_pattern = re.compile(r'<img[^>]+alt="\[(.*?)\]"', re.DOTALL)
  37. # 查找所有 <pre> 标签中的文本
  38. pres = pre_pattern.findall(html)
  39. # 查找所有 <img> 标签中的 alt 属性值
  40. alts = img_alt_pattern.findall(html)
  41. # 结合这两部分来构建最终的字符串
  42. result_items = []
  43. pre_index, alt_index = 0, 0
  44. while pre_index < len(pres) or alt_index < len(alts):
  45. if pre_index < len(pres):
  46. result_items.append(pres[pre_index])
  47. pre_index += 1
  48. if alt_index < len(alts):
  49. result_items.append('[' + alts[alt_index] + ']')
  50. alt_index += 1
  51. # 返回连接后的字符串
  52. return ''.join(result_items)
  53. def filter_aweme_detail_in_conversations_video(aweme_detail:dict):
  54. ret = {
  55. "nickname": aweme_detail.get("nickname"),
  56. "sec_uid": aweme_detail.get("sec_uid"),
  57. "preview_title" : aweme_detail.get("preview_title"),
  58. "statistics": aweme_detail.get("statistics"),
  59. }
  60. return ret
  61. def analyze_conversations_content(ele_content:ChromiumElement, conversation_detail:dict):
  62. ret = {
  63. "is_me": None,
  64. "content": {
  65. "type": None,
  66. "data": None,
  67. },
  68. }
  69. ret['is_me'] = True if 'tIJOLE11' in ele_content.attr("class") else False
  70. text = ele_content._find_elements('xpath://span[@class="WCSQFekt"]', raise_err=False)
  71. if text:
  72. text = parse_conversations_content_text(text.html)
  73. ret['content']['data'] = text
  74. ret['content']['type'] = "text"
  75. else:
  76. logger.info(f"_find_elements 'xpath://img'")
  77. imgs = ele_content._find_elements('xpath://img', index=None, raise_err=False)
  78. logger.info(f"{imgs}")
  79. if not imgs:
  80. logger.error(f"unkown_type {ele_content.html}")
  81. # 视频通常会包含封面和播放按钮,所以这里会有两个 img 元素
  82. elif len(imgs) > 1:
  83. ret['content']['type'] = "video"
  84. if conversation_detail:
  85. aweme_detail = conversation_detail.get("aweme_details").pop(0)
  86. data = filter_aweme_detail_in_conversations_video(aweme_detail)
  87. ret['content']['data'] = data
  88. elif len(imgs) == 1:
  89. ret['content']['data'] = imgs[0].attr("src")
  90. ret['content_type'] = "img"
  91. logger.info(f"ret {ret}")
  92. return ret
  93. def enter_conversation_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
  94. ele_exit_conversation = tab._find_elements('退出会话')
  95. if ele_exit_conversation:
  96. ele_exit_conversation.click()
  97. @task(on_failure=[enter_conversation_on_failure],
  98. persist_result=True,
  99. result_storage=minio_block,
  100. result_storage_key=base.get_object_name_by_time())
  101. def enter_conversation(click_user_name):
  102. tab.listen.start("www.douyin.com/aweme/v1/web")
  103. ele_listDlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
  104. ele_listDlg.ele(click_user_name).click()
  105. conversation_detail = None
  106. user_profile_packet = None
  107. # 获取该用户的基本信息
  108. for packet in tab.listen.steps(2, timeout=3):
  109. if "user/profile" in packet.url:
  110. user_profile_packet = packet
  111. if "aweme/detail" in packet.url:
  112. conversation_detail = packet
  113. conversation_detail_body = None if not conversation_detail else conversation_detail.response.body
  114. if not user_profile_packet:
  115. raise Exception("进入私信后没有捕获到 user/profile 请求")
  116. logger.debug(f"user_profile_packet {user_profile_packet}")
  117. logger.debug(f"conversation_detail {conversation_detail}")
  118. return user_profile_packet.response.body, conversation_detail_body
  119. @task(
  120. persist_result=True,
  121. result_storage=minio_block,
  122. result_storage_key=base.get_object_name_by_time())
  123. def save_enter_im_user_detail(unread_user_data:UnReadUserData, storage_key, user_profile_body):
  124. # S3Object(path="save_enter_im_user_detail_test").put((unread_user_data, storage_key, user_profile_body))
  125. unread_user_data.detail = S3Object(path=minio_block.basepath + storage_key, type=f"{tuple}")
  126. logger.info(f"S3Object {unread_user_data.detail.model_dump()}")
  127. with Session(engine) as session:
  128. statement = select(UserInfoModel).where(UserInfoModel.uid == user_profile_body.get('uid'))
  129. exist_user_info = session.exec(statement).first()
  130. if exist_user_info:
  131. user_info = exist_user_info
  132. else:
  133. user_profile_body.get('uid')
  134. logger.info(f"uid {user_profile_body.get('uid')}")
  135. logger.info(f"nickname {user_profile_body.get('nickname')}")
  136. logger.info(f"user_profile_body {user_profile_body}")
  137. user_info = user_table.dict_to_model(user_profile_body.get("user"))
  138. unread_user_data.user_info = user_info
  139. session.add(unread_user_data)
  140. session.commit()
  141. session.refresh(unread_user_data)
  142. logger.info(f"unread_user_data {unread_user_data}")
  143. return unread_user_data
  144. def get_conversation_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
  145. logger.info(f"{task}")
  146. logger.info(f"{task_run.task_inputs}")
  147. logger.info(f"{state}")
  148. logger.info(f"{state.result()}")
  149. def cn_time_to_timestamp(time_str:str, time_base=None):
  150. # 因为 jionlp 启动需要加载数据,所以只有在需要用到的时候才导入
  151. import jionlp as jio
  152. '''exzample
  153. for row in chat_history_table:
  154. time_base = row.get("create_time")
  155. cn_time = row.get("time")
  156. timestamp = cn_time_to_timestamp(cn_time, time_base=time_base)
  157. if timestamp:
  158. str_time = datetime.strftime(datetime.fromtimestamp(timestamp), '%Y-%m-%d %H:%M:%S')
  159. else:
  160. str_time = None
  161. print(f"timestamp {timestamp} \t\t str {str_time} \t\t time_base {time_base}")
  162. '''
  163. if not isinstance(time_str,str):
  164. return
  165. if "刚刚" in time_str:
  166. return int(time.time())
  167. try:
  168. if not time_base:
  169. time_base = time.time()
  170. res = jio.parse_time(time_str, time_base)
  171. res_time = res.get('time')[0]
  172. dt_obj = datetime.datetime.strptime(res_time, '%Y-%m-%d %H:%M:%S')
  173. # 将datetime对象转换为timestamp
  174. timestamp = datetime.datetime.timestamp(dt_obj)
  175. return int(timestamp)
  176. except Exception as e:
  177. logger.error(f"{e} time_str {time_str} time_base {time_base}")
  178. return
  179. # TODO 不使用,但保留,未来可能导出所有聊天时用到
  180. # 未读消息时,理论上无需获取所有历史,而是从未读消息显示的 time 开始获取即可。
  181. # 可认为程序启动后的未读消息才被记录,如果程序没有启动,代表用户不需要AI处理,不用管以前那些信息
  182. @task(on_failure=[get_conversation_on_failure],
  183. persist_result=True,
  184. result_storage=minio_block,
  185. result_storage_key=base.get_object_name_by_time())
  186. def get_conversations_history(unread_user_data:UnReadUserData, conversation_detail):
  187. ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
  188. # 在聊天窗口中往上滚动一点,以免待会有未读消息的时候直接被设置为已读而没有发现
  189. messageContent = ele_popShadowAnimation.ele('#messageContent')
  190. messageContent.child().scroll.up(150)
  191. eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
  192. count = 0
  193. chat_history = []
  194. # 未读消息开始于
  195. start_with_unread_time_txt = None
  196. for ele in eles_msg:
  197. logger.debug(f"ele.html {ele.html}")
  198. # 是否存在时间 <div class="kZAHYArp">18:07 </div>
  199. is_ele_time = ele._find_elements('xpath://div[@class="kZAHYArp"]', raise_err=False)
  200. ele_content:ChromiumElement = ele._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
  201. res = analyze_conversations_content(ele_content, conversation_detail)
  202. if is_ele_time:
  203. if not start_with_unread_time_txt:
  204. start_with_unread_time_txt = is_ele_time.text
  205. cn_to_time = cn_time_to_timestamp(is_ele_time.text)
  206. else:
  207. cn_to_time = None
  208. res.update({'time': cn_to_time})
  209. logger.debug(f"res {res}")
  210. chat_history.append(res)
  211. logger.info(f"{chat_history}")
  212. unread_user_data.msg_time_txt = start_with_unread_time_txt
  213. unread_user_data.chat_history = chat_history
  214. return unread_user_data
  215. @task(on_failure=[get_conversation_on_failure],
  216. persist_result=True,
  217. result_storage=minio_block,
  218. result_storage_key=base.get_object_name_by_time())
  219. def get_unread_msg_with_time(unread_user_data:UnReadUserData, conversation_detail:dict):
  220. ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
  221. # 在聊天窗口中往上滚动一点,以免待会有未读消息的时候直接被设置为已读而没有发现
  222. messageContent = ele_popShadowAnimation.ele('#messageContent')
  223. messageContent.child().scroll.up(150)
  224. # 如果消息到来,程序立即进入对话框,则通常 unread_user_data.msg_time 是 “刚刚”
  225. # 如果程序刚启动,以前就有这些未读消息,通常 unread_user_data.msg_time 是 "16:30" 或 "04-25" 或 "2023-04-25"
  226. ele_msg_start_with_time = messageContent.s_ele(unread_user_data.msg_time)
  227. unread_user_data.msg_time = cn_time_to_timestamp(ele_msg_start_with_time.text)
  228. logger.info(f"msg_start_with_time {ele_msg_start_with_time.text} - " + datetime.datetime.fromtimestamp(unread_user_data.msg_time).strftime('%Y-%m-%d %H:%M'))
  229. # 找到父级: div class="A1KpIsbL HO4aqgd4"
  230. ele_msg_start_with = ele_msg_start_with_time.parent()
  231. ele_msg = ele_msg_start_with
  232. chat_history = []
  233. while 'A1KpIsbL' in ele_msg.attr('class'):
  234. logger.debug(f"ele.html {ele_msg.html}")
  235. ele_content:ChromiumElement = ele_msg._find_elements('xpath://div[contains(@class, "SZtuWb3S")]')
  236. logger.debug(f"ele_content {ele_content.html}")
  237. logger.debug(f"ele_content class {ele_content.attr('class')}")
  238. res = analyze_conversations_content(ele_content, conversation_detail)
  239. logger.debug(f"res {res}")
  240. chat_history.append(res)
  241. ele_msg = ele_msg.prev()
  242. unread_user_data.chat_history = chat_history
  243. # {'name': '程序员马工', 'avator': 'https://p3.huoshanimg.com/aweme/100x100/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=3782654143', 'msg': '564', 'unread_msg_count': 2, 'time': 1714317757, 'chat_history': [{'is_me': True, 'content': {'type': 'text', 'data': '4公会'}}, {'is_me': False, 'content': {'type': 'text', 'data': '564'}}]}
  244. logger.info(f"unread_user_data {unread_user_data}")
  245. return unread_user_data
  246. @task(persist_result=True,
  247. result_storage=minio_block,
  248. result_storage_key=base.get_object_name_by_time())
  249. def save_unread_msg_to_db(unread_user_data:UnReadUserData):
  250. chat_history = unread_user_data.chat_history
  251. ret_chat_history = []
  252. # 当 chat['is_me']=False 时,表示该条消息是对方发送的,需要统计包含我在内有多少条总计聊天
  253. for chat in chat_history:
  254. ret_chat_history.append(db.insert(chat))
  255. logger.info(f"chat_history {ret_chat_history}")
  256. unread_user_data.chat_history = ret_chat_history
  257. return unread_user_data
  258. @task
  259. def send_msg_to_user(unread_user_data:UnReadUserData):
  260. # 找到聊天会话框
  261. # class="qbjZBApl popShadowAnimation"
  262. ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
  263. # 找到输入框
  264. # data-e2e="msg-input"
  265. ele_input = ele_popShadowAnimation.ele('xpath://div[@data-e2e="msg-input"]')
  266. ele_input.click()
  267. ele_input.input("hello")
  268. # 找到发送按钮
  269. # span class="e2e-send-msg-btn"
  270. ele_send = ele_popShadowAnimation.ele('xpath://span[contains(@class, "e2e-send-msg-btn")]')
  271. # ele_send.click()
  272. ele_exit = ele_popShadowAnimation.ele('退出会话')
  273. ele_exit.click()
  274. logger.info(f"回复成功")
  275. return {"chat_history":unread_user_data.chat_history, "reply":["hello"]}
  276. @task
  277. def set_unread_done(unread_user_data:UnReadUserData):
  278. unread_user_data.is_done = True
  279. db.update(unread_user_data)
  280. logger.info("set unread done")
  281. def reply_to_user_flow_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
  282. check_page.check_conversion_exit()
  283. @flow(log_prints=False, on_failure=[reply_to_user_flow_on_failure])
  284. def reply_to_user_flow(unread_user_data:UnReadUserData, tab_id=None):
  285. # 进入私信聊天窗口
  286. enter_state = enter_conversation(unread_user_data.name,return_state=True)
  287. user_profile_body, conversation_detail_body = enter_state.result()
  288. # 进入后获取到用户信息如:是否关注、粉丝数、用户作品数量、用户IP、简介等。保存到数据库中,以便后续 AI 聊天时附上用户信息
  289. unread_user_data = save_enter_im_user_detail(unread_user_data, enter_state.data.storage_key, user_profile_body)
  290. unread_user_data = get_conversations_history(unread_user_data, conversation_detail_body)
  291. reply_res = send_msg_to_user(unread_user_data)
  292. set_unread_done(unread_user_data)
  293. from prefect.results import PersistedResult
  294. import pickle
  295. async def get_from_persistedresult():
  296. res1 = await minio_block.read_path('s3://swl/prefect/20240427/031928_232222-get_conversations_history-0')
  297. def main():
  298. unread_user_data, storage_key, user_profile_body = S3Object(path="save_enter_im_user_detail_test").get()
  299. save_enter_im_user_detail(unread_user_data, storage_key, user_profile_body)
  300. # eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
  301. # ele_content:ChromiumElement = eles_msg[0]._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
  302. # logger.info(f"{ele_content.html}")
  303. # analyze_conversations_content(ele_content,res[1])
  304. # chat = [{'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '刚刚'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': True, 'content': {'type': 'text', 'data': 'df[流泪]sdghh个个'}, 'time': None}, {'is_me': True, 'content': {'type': 'text', 'data': '12312[流泪][捂脸]'}, 'time': '03:27'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '03:19'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '03:01'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '02:55'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '00:56'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '00:41'}, {'is_me': False, 'content': {'type': None, 'data': None}, 'time': '星期四 16:34'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期四 15:50'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期四 15:26'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期四 14:54'}, {'is_me': False, 'content': {'type': 'text', 'data': '1'}, 'time': '星期四 13:42'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期四 04:49'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期四 04:34'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期四 04:26'}, {'is_me': False, 'content': {'type': None, 'data': None}, 'time': '星期四 00:22'}, {'is_me': False, 'content': {'type': None, 'data': None}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '好玩'}, 'time': None}, {'is_me': False, 'content': {'type': None, 'data': None}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期三 23:49'}, {'is_me': True, 'content': {'type': 'text', 'data': '123'}, 'time': '星期三 22:05'}, {'is_me': False, 'content': {'type': 'text', 'data': '1'}, 'time': None}, {'is_me': True, 'content': {'type': 'text', 'data': '11'}, 'time': '星期三 18:07'}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期三 15:13'}]
  305. # save_unread_msg_to_db(chat, 29)
  306. # asyncio.run(read_client_data())
  307. # TaskRunResult(input_type='task_run', id=UUID('b850b870-fe4d-4df3-bac3-9e8ea5e1661d'))
  308. # get_conversations_history()
  309. if __name__ == "__main__":
  310. main()