| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346 |
- import asyncio
- import datetime
- import os
- import re
- import time
- import os
- import sys
- from pathlib import Path
- # print(f"{Path(__file__).parent.parent.parent}")
- # sys.path.append(Path(__file__).parent.parent.parent.parent)
- # sys.path.append(Path(__file__).parent.parent.parent)
- # sys.path.append(Path(__file__).parent.parent)
- # sys.path.append(Path(__file__).parent)
- sys.path.append(r'I:\code\ai-yunying\live-online-people')
- import prefect.runtime
- import prefect.runtime.task_run
- from dp.page import page
- from database.config import minio_block
- from database.s3 import S3Object
- import prefect.client
- import jionlp as jio
- from conf.config import logger,OUTPUT
- from douyin import base
- from dataset import Table
- from DrissionPage import ChromiumPage
- from DrissionPage._elements.chromium_element import ChromiumElement
- from DrissionPage._units.listener import DataPacket
- import jsonpath
- from prefect import flow,task
- from prefect.tasks import Task,TaskRun
- from prefect.flows import Flow
- from prefect.states import State,StateType
- from prefect.client.schemas.objects import TaskRunInput,Parameter,TaskRunResult
- from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table,engine
- from douyin.flow import check_msg,unread_msg
- from sqlmodel import Field, SQLModel,Relationship,Column,Session,select,func,UniqueConstraint,PickleType,text
# Module-level handle to the browser tab provided by dp.page; every
# UI-driving helper in this module looks up and clicks elements through it.
tab: ChromiumPage = page.tab
def parse_conversations_content_text(html):
    """Flatten a chat-bubble HTML fragment into plain text.

    Text inside ``<pre>`` tags is kept verbatim and every ``<img>`` whose
    ``alt`` has the form ``"[name]"`` (emoji) is rendered as ``[name]``.
    Pieces are emitted in the order they occur in *html*.

    Args:
        html: HTML of the message's text span.

    Returns:
        The concatenated text, or ``""`` when nothing matches.
    """
    # A single alternation scanned left to right preserves document order.
    # Collecting <pre> and <img> matches separately and then interleaving
    # them alternately reorders e.g. "[emoji]text" into "text[emoji]".
    token_pattern = re.compile(
        r'<pre>(.*?)</pre>|<img[^>]+alt="\[(.*?)\]"', re.DOTALL
    )
    pieces = []
    for match in token_pattern.finditer(html):
        pre_text, alt_text = match.groups()
        if pre_text is not None:
            pieces.append(pre_text)
        else:
            pieces.append('[' + alt_text + ']')
    return ''.join(pieces)
def filter_aweme_detail_in_conversations_video(aweme_detail: dict):
    """Project a shared-video ``aweme_detail`` dict onto four fields.

    Returns a new dict with the keys ``nickname``, ``sec_uid``,
    ``preview_title`` and ``statistics`` (in that order); keys missing from
    *aweme_detail* map to ``None``.
    """
    kept_fields = ("nickname", "sec_uid", "preview_title", "statistics")
    return {field: aweme_detail.get(field) for field in kept_fields}
def analyze_conversations_content(ele_content: ChromiumElement, conversation_detail: dict):
    """Classify one chat bubble element and extract its payload.

    Args:
        ele_content: The bubble element (callers pass the ``div`` whose class
            contains ``SZtuWb3S``).
        conversation_detail: Response body of the ``aweme/detail`` request
            captured when the conversation was opened, or ``None``. Its
            ``aweme_details`` list is consumed front-to-back (``pop(0)``), one
            entry per video bubble, so the caller's list is mutated.

    Returns:
        ``{"is_me": bool, "content": {"type": ..., "data": ...}}`` where
        ``type`` is ``"text"``, ``"video"``, ``"img"``, or ``None`` when the
        bubble was not recognised.
    """
    ret = {
        "is_me": None,
        "content": {
            "type": None,
            "data": None,
        },
    }
    # A bubble whose class list contains "tIJOLE11" is treated as ours.
    # attr() may return None for an element without a class attribute, so
    # fall back to "" rather than raising TypeError.
    ret['is_me'] = 'tIJOLE11' in (ele_content.attr("class") or "")
    text = ele_content._find_elements('xpath://span[@class="WCSQFekt"]', raise_err=False)
    if text:
        # Plain-text bubble; emoji <img> tags are folded in as "[name]".
        ret['content']['data'] = parse_conversations_content_text(text.html)
        ret['content']['type'] = "text"
    else:
        logger.info(f"_find_elements 'xpath://img'")
        imgs = ele_content._find_elements('xpath://img', index=None, raise_err=False)
        logger.info(f"{imgs}")
        if not imgs:
            logger.error(f"unkown_type {ele_content.html}")
        # A video bubble usually renders a cover image plus a play button,
        # hence more than one <img>.
        elif len(imgs) > 1:
            ret['content']['type'] = "video"
            aweme_details = (conversation_detail or {}).get("aweme_details") or []
            if aweme_details:
                # Consume the next captured video detail for this bubble.
                aweme_detail = aweme_details.pop(0)
                ret['content']['data'] = filter_aweme_detail_in_conversations_video(aweme_detail)
            elif conversation_detail:
                # Previously an AttributeError/IndexError; now just reported.
                logger.warning("video bubble without a matching aweme_detail")
        elif len(imgs) == 1:
            # Single <img>: a picture message; keep its URL.
            ret['content']['data'] = imgs[0].attr("src")
            # Previously written to ret['content_type'], leaving content.type None.
            ret['content']['type'] = "img"
    logger.info(f"ret {ret}")
    return ret
def enter_conversation_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
    """Prefect ``on_failure`` hook for ``enter_conversation``.

    Best-effort cleanup: if an element matching "退出会话" (exit conversation)
    is found on the shared tab, click it. The hook arguments are unused.
    """
    exit_control = tab._find_elements('退出会话')
    if not exit_control:
        return
    exit_control.click()
@task(on_failure=[enter_conversation_on_failure],
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def enter_conversation(click_user_name):
    """Open the private-message conversation with *click_user_name*.

    Starts listening for ``www.douyin.com/aweme/v1/web`` requests, clicks the
    matching entry in the conversation-list dialog, then examines up to two
    captured packets (3 s timeout) for the ``user/profile`` and
    ``aweme/detail`` responses.

    Returns:
        ``(user_profile_body, conversation_detail_body)``; the second item is
        ``None`` when no ``aweme/detail`` packet was captured.

    Raises:
        Exception: if no ``user/profile`` packet was captured.
    """
    # NOTE(review): result_storage_key is computed once, when this module is
    # imported. Unless get_object_name_by_time() returns a Prefect template,
    # every run shares one storage key — confirm this is intended.
    tab.listen.start("www.douyin.com/aweme/v1/web")
    list_dialog = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
    list_dialog.ele(click_user_name).click()

    # Capture the user's basic profile (and any shared-video details).
    profile_packet = None
    detail_packet = None
    for captured in tab.listen.steps(2, timeout=3):
        if "user/profile" in captured.url:
            profile_packet = captured
        if "aweme/detail" in captured.url:
            detail_packet = captured

    detail_body = detail_packet.response.body if detail_packet else None
    if not profile_packet:
        raise Exception("进入私信后没有捕获到 user/profile 请求")
    logger.debug(f"user_profile_packet {profile_packet}")
    logger.debug(f"conversation_detail {detail_packet}")
    return profile_packet.response.body, detail_body
@task(
    persist_result=True,
    result_storage=minio_block,
    result_storage_key=base.get_object_name_by_time())
def save_enter_im_user_detail(unread_user_data: UnReadUserData, storage_key, user_profile_body):
    """Attach the opened user's profile to *unread_user_data* and commit it.

    Args:
        unread_user_data: The unread-conversation row to update.
        storage_key: Storage key of the persisted ``enter_conversation``
            result; saved on ``unread_user_data.detail`` as an S3 pointer under
            ``minio_block.basepath``.
        user_profile_body: Response body of the ``user/profile`` request. The
            user model is built from its ``"user"`` member.

    Returns:
        ``unread_user_data`` after commit and refresh.
    """
    unread_user_data.detail = S3Object(path=minio_block.basepath + storage_key, type=f"{tuple}")
    logger.info(f"S3Object {unread_user_data.detail.model_dump()}")
    profile_user = user_profile_body.get("user") or {}
    # The UserInfoModel is built from body["user"] below, so its uid is looked up
    # there as well; a top-level "uid" is only used as a fallback.
    uid = profile_user.get("uid")
    if uid is None:
        uid = user_profile_body.get("uid")
    with Session(engine) as session:
        user_info = None
        if uid is not None:
            # Only query with a real uid: `column == None` renders as IS NULL
            # in SQLAlchemy and could match an unrelated row.
            statement = select(UserInfoModel).where(UserInfoModel.uid == uid)
            user_info = session.exec(statement).first()
        if user_info is None:
            logger.info(f"uid {uid}")
            logger.info(f"nickname {profile_user.get('nickname')}")
            logger.info(f"user_profile_body {user_profile_body}")
            user_info = user_table.dict_to_model(user_profile_body.get("user"))
        unread_user_data.user_info = user_info
        session.add(unread_user_data)
        session.commit()
        session.refresh(unread_user_data)
    logger.info(f"unread_user_data {unread_user_data}")
    return unread_user_data
def get_conversation_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
    """Prefect ``on_failure`` hook: log the task, its inputs, and the final state."""
    logger.info(f"{task}")
    logger.info(f"{task_run.task_inputs}")
    logger.info(f"{state}")
    # State.result() re-raises the task's exception for a failed state by
    # default, which made this hook fail too. With raise_on_failure=False
    # the exception object is returned and logged instead.
    logger.info(f"{state.result(raise_on_failure=False)}")
def cn_time_to_timestamp(time_str: str, time_base=None):
    """Convert a Chinese chat-time label into an integer Unix timestamp.

    A label containing "刚刚" ("just now") always maps to the current time,
    even when *time_base* is given. Any other label (e.g. "16:30", "04-25",
    "星期四 16:34" as shown in the chat UI) is handed to ``jio.parse_time``
    relative to *time_base*, and the first point of the parsed ``time``
    range is converted.

    Args:
        time_str: The label to convert; non-strings yield ``None``.
        time_base: Reference passed to ``jio.parse_time``; replaced by
            ``time.time()`` when falsy.

    Returns:
        Seconds since the epoch as ``int``, or ``None`` when *time_str* is not
        a string or could not be parsed (the error is logged).

    Example::

        for row in chat_history_table:
            timestamp = cn_time_to_timestamp(row.get("time"), time_base=row.get("create_time"))
            if timestamp:
                str_time = datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
            else:
                str_time = None
    """
    if not isinstance(time_str, str):
        return None
    if "刚刚" in time_str:
        return int(time.time())
    try:
        if not time_base:
            time_base = time.time()
        parsed = jio.parse_time(time_str, time_base)
        first_point = parsed.get('time')[0]
        moment = datetime.datetime.strptime(first_point, '%Y-%m-%d %H:%M:%S')
        return int(moment.timestamp())
    except Exception as e:
        logger.error(f"{e} time_str {time_str} time_base {time_base}")
        return None
# TODO: kept for a possible future "export the whole chat history" feature.
# NOTE(review): the original TODO said this task is unused, but
# reply_to_user_flow calls it — update whichever is out of date.
# For unread messages the full history is not needed; reading from the time
# shown on the unread entry onward is enough. Only unread messages that
# arrive while the program is running are recorded; if the program was not
# running, the user did not want AI handling, so older messages are ignored.
@task(on_failure=[get_conversation_on_failure],
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def get_conversations_history(unread_user_data: UnReadUserData, conversation_detail):
    """Scrape every visible message row of the open conversation.

    Each row's bubble is classified with ``analyze_conversations_content`` and
    given a ``time`` key: the timestamp of the row's time label, or ``None``
    when the row has none.

    Returns:
        *unread_user_data* with ``chat_history`` set to the scraped entries and
        ``msg_time_txt`` set to the text of the first time label found (or
        ``None``).
    """
    chat_window = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
    # Scroll the message pane up a little so that messages arriving meanwhile
    # are not marked as read before they are noticed.
    message_pane = chat_window.ele('#messageContent')
    message_pane.child().scroll.up(150)
    message_rows = chat_window.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')

    history = []
    first_time_label_text = None  # where the unread messages start
    for row in message_rows:
        logger.debug(f"ele.html {row.html}")
        # Optional time label, e.g. <div class="kZAHYArp">18:07 </div>
        time_label = row._find_elements('xpath://div[@class="kZAHYArp"]', raise_err=False)
        bubble = row._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
        entry = analyze_conversations_content(bubble, conversation_detail)
        timestamp = None
        if time_label:
            if not first_time_label_text:
                first_time_label_text = time_label.text
            timestamp = cn_time_to_timestamp(time_label.text)
        entry['time'] = timestamp
        logger.debug(f"res {entry}")
        history.append(entry)

    logger.info(f"{history}")
    unread_user_data.msg_time_txt = first_time_label_text
    unread_user_data.chat_history = history
    return unread_user_data
@task(on_failure=[get_conversation_on_failure],
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def get_unread_msg_with_time(unread_user_data: UnReadUserData, conversation_detail: dict):
    """Collect chat bubbles starting at the unread-message time label.

    Finds the element in the message pane matching ``unread_user_data.msg_time``,
    replaces ``msg_time`` with that label's timestamp, then walks from the
    label's parent row through earlier siblings (``prev()``) for as long as
    they are message rows (class contains ``A1KpIsbL``), analysing each bubble.

    Returns:
        *unread_user_data* with ``msg_time`` and ``chat_history`` updated.
    """
    chat_window = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
    # Scroll up a little so messages arriving meanwhile are not marked read
    # before they are noticed.
    messageContent = chat_window.ele('#messageContent')
    messageContent.child().scroll.up(150)
    # If the program opens the chat as soon as a message arrives, msg_time is
    # usually "刚刚" ("just now"); for messages that were already unread when
    # the program started it is e.g. "16:30", "04-25" or "2023-04-25".
    ele_msg_start_with_time = messageContent.s_ele(unread_user_data.msg_time)
    unread_user_data.msg_time = cn_time_to_timestamp(ele_msg_start_with_time.text)
    # cn_time_to_timestamp returns None on parse failure; fromtimestamp(None)
    # would raise TypeError inside this log call.
    if unread_user_data.msg_time is None:
        readable_time = "unparsed"
    else:
        readable_time = datetime.datetime.fromtimestamp(unread_user_data.msg_time).strftime('%Y-%m-%d %H:%M')
    logger.info(f"msg_start_with_time {ele_msg_start_with_time.text} - " + readable_time)
    # The label's parent is the message row: div class="A1KpIsbL HO4aqgd4"
    ele_msg = ele_msg_start_with_time.parent()
    chat_history = []
    # Stop cleanly when prev() runs out of siblings (DrissionPage returns a
    # falsy empty element) or a row has no class attribute (attr() -> None);
    # previously both cases raised instead of ending the walk.
    while ele_msg and 'A1KpIsbL' in (ele_msg.attr('class') or ''):
        logger.debug(f"ele.html {ele_msg.html}")
        ele_content = ele_msg._find_elements('xpath://div[contains(@class, "SZtuWb3S")]')
        logger.debug(f"ele_content {ele_content.html}")
        logger.debug(f"ele_content class {ele_content.attr('class')}")
        res = analyze_conversations_content(ele_content, conversation_detail)
        logger.debug(f"res {res}")
        chat_history.append(res)
        ele_msg = ele_msg.prev()
    unread_user_data.chat_history = chat_history
    # Example result: {'name': ..., 'msg': '564', 'unread_msg_count': 2,
    #   'time': 1714317757, 'chat_history': [{'is_me': True, 'content': {'type': 'text', 'data': '4公会'}},
    #   {'is_me': False, 'content': {'type': 'text', 'data': '564'}}]}
    logger.info(f"unread_user_data {unread_user_data}")
    return unread_user_data
@task(persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def save_unread_msg_to_db(unread_user_data: UnReadUserData):
    """Insert each chat entry via ``db.insert`` and store the returned values.

    ``unread_user_data.chat_history`` is replaced, in order, by whatever
    ``db.insert`` returned for each entry.

    NOTE(review): an earlier comment talked about counting messages sent by
    the other party (``is_me`` False); no counting is done here.
    """
    inserted = [db.insert(chat) for chat in unread_user_data.chat_history]
    logger.info(f"chat_history {inserted}")
    unread_user_data.chat_history = inserted
    return unread_user_data
@task
def send_msg_to_user(unread_user_data: UnReadUserData, reply_text: str = "hello"):
    """Type *reply_text* into the open conversation and press send.

    Args:
        unread_user_data: The conversation being answered; only its
            ``chat_history`` is read (echoed back in the result).
        reply_text: Text to send. Defaults to ``"hello"``, the value that was
            previously hard-coded, so existing callers are unaffected.

    Returns:
        ``{"chat_history": unread_user_data.chat_history, "reply": [reply_text]}``.
    """
    # Conversation pane: div class="qbjZBApl popShadowAnimation"
    chat_window = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
    # Message input box: div data-e2e="msg-input"
    input_box = chat_window.ele('xpath://div[@data-e2e="msg-input"]')
    input_box.click()
    input_box.input(reply_text)
    # Send button: span whose class contains "e2e-send-msg-btn"
    send_button = chat_window.ele('xpath://span[contains(@class, "e2e-send-msg-btn")]')
    send_button.click()
    # This task does not leave the conversation. The original looked up the
    # "退出会话" control without ever using it; that dead lookup is dropped.
    logger.info(f"回复成功")
    return {"chat_history": unread_user_data.chat_history, "reply": [reply_text]}
@task
def set_unread_done(unread_user_data: UnReadUserData):
    """Flag *unread_user_data* as handled and persist it through ``db.update``."""
    unread_user_data.is_done = True
    db.update(unread_user_data)
    logger.info("set unread done")
def reply_to_user_flow_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
    """``on_failure`` hook of ``reply_to_user_flow``; runs ``check_msg.check_conversion_exit()``.

    NOTE(review): this hook is registered on a *flow*, so Prefect presumably
    passes (flow, flow_run, state) positionally; the Task/TaskRun names and
    annotations are misleading. The arguments are not used.
    """
    check_msg.check_conversion_exit()
@flow(log_prints=False, on_failure=[reply_to_user_flow_on_failure])
def reply_to_user_flow(unread_user_data: UnReadUserData):
    """Handle one unread conversation end to end.

    The flow opens the chat, saves the captured user profile, and scrapes the
    visible history. It then sends a reply and marks the entry done.
    """
    # Open the private chat; return_state=True exposes the persisted result's
    # storage key, which is recorded together with the user.
    enter_state = enter_conversation(unread_user_data.name, return_state=True)
    user_profile_body, conversation_detail_body = enter_state.result()
    # Save the user details captured on entry (follow status, follower count,
    # number of posts, IP, bio, ...) so later AI replies can include them.
    unread_user_data = save_enter_im_user_detail(
        unread_user_data, enter_state.data.storage_key, user_profile_body
    )
    unread_user_data = get_conversations_history(unread_user_data, conversation_detail_body)
    # The reply payload returned by send_msg_to_user is not used further.
    send_msg_to_user(unread_user_data)
    set_unread_done(unread_user_data)
- from prefect.results import PersistedResult
- import pickle
async def get_from_persistedresult(path='s3://swl/prefect/20240427/031928_232222-get_conversations_history-0'):
    """Read a persisted result from ``minio_block`` and return it.

    Args:
        path: Storage path to read. Defaults to the path that was previously
            hard-coded (a stored ``get_conversations_history`` result).

    Returns:
        Whatever ``minio_block.read_path(path)`` returns. The original awaited
        it into an unused local and always returned ``None``.
    """
    return await minio_block.read_path(path)
def main():
    """Debug entry point: replay ``save_enter_im_user_detail`` on a stored fixture.

    Loads the ``(unread_user_data, storage_key, user_profile_body)`` tuple from
    the ``"save_enter_im_user_detail_test"`` S3 object and passes it straight
    back into ``save_enter_im_user_detail``.
    """
    fixture = S3Object(path="save_enter_im_user_detail_test").get()
    unread_user_data, storage_key, user_profile_body = fixture
    save_enter_im_user_detail(unread_user_data, storage_key, user_profile_body)
if __name__ == "__main__":
    # Running this module directly replays the debug fixture via main().
    main()
|