"""Prefect tasks and flow for answering unread Douyin private messages.

The flow opens a conversation in the browser tab, stores the sender's
profile, scrapes the visible chat history, types a reply and finally marks
the unread record as done.
"""
import asyncio
import datetime
import os
import re
import time
import sys
from pathlib import Path

# print(f"{Path(__file__).parent.parent.parent}")
# sys.path.append(Path(__file__).parent.parent.parent.parent)
# sys.path.append(Path(__file__).parent.parent.parent)
# sys.path.append(Path(__file__).parent.parent)
# sys.path.append(Path(__file__).parent)
# The project root must be on sys.path before the project-local imports below.
sys.path.append(r'I:\code\ai-yunying\live-online-people')

import prefect.runtime
import prefect.runtime.task_run
from dp.page import page
from database.config import minio_block
from database.s3 import S3Object
import prefect.client
from conf.config import logger,OUTPUT
from douyin import base
from DrissionPage import ChromiumPage
from DrissionPage._elements.chromium_element import ChromiumElement
from prefect import flow,task
from prefect.tasks import Task,TaskRun
from prefect.states import State,StateType
from prefect.client.schemas.objects import TaskRunInput,Parameter,TaskRunResult
from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table,engine
from douyin.flow import check_page,unread_msg
from sqlmodel import Field, SQLModel,Relationship,Column,Session,select,func,UniqueConstraint,PickleType,text

tab: ChromiumPage = page.tab

# Matches, in document order, either the text inside a <pre> element
# (group 1) or the bracketed alt text of an emoji <img> (group 2).
# NOTE(review): the tag portions of these patterns were reconstructed from the
# surrounding comments and variable names -- confirm them against the live
# chat DOM before relying on them.
_CONTENT_TOKEN_PATTERN = re.compile(
    r'<pre[^>]*>(.*?)</pre>|<img[^>]+alt="\[(.*?)\]"',
    re.DOTALL,
)

# NOTE(review): every ``result_storage_key=base.get_object_name_by_time()``
# below is evaluated once, when the module is imported, not once per task
# run -- confirm that the returned key is a template that stays unique per run.


def parse_conversations_content_text(html):
    """Flatten the HTML of a text message into a plain string.

    Text fragments are taken from ``<pre>`` elements and emoji from the
    ``alt="[name]"`` attribute of ``<img>`` elements; emoji are rendered as
    ``[name]``. Fragments are joined in the order they appear in ``html``,
    so messages that start with an emoji or contain consecutive emoji keep
    their original order.

    Args:
        html: HTML source of the message text element.

    Returns:
        The concatenated message text, or ``''`` when nothing matches.
    """
    parts = []
    for match in _CONTENT_TOKEN_PATTERN.finditer(html):
        pre_text, emoji_name = match.groups()
        if pre_text is not None:
            parts.append(pre_text)
        else:
            parts.append('[' + emoji_name + ']')
    return ''.join(parts)


def filter_aweme_detail_in_conversations_video(aweme_detail:dict):
    """Return only the fields of a video detail dict that are kept in chat history.

    Args:
        aweme_detail: Video detail mapping; missing keys yield ``None``.

    Returns:
        A dict with the keys ``nickname``, ``sec_uid``, ``preview_title``
        and ``statistics``.
    """
    kept_keys = ("nickname", "sec_uid", "preview_title", "statistics")
    return {key: aweme_detail.get(key) for key in kept_keys}


def analyze_conversations_content(ele_content:ChromiumElement, conversation_detail:dict):
    """Classify one chat bubble as text, video or image and extract its payload.

    Args:
        ele_content: Element wrapping the content of a single message.
        conversation_detail: Optional mapping whose ``"aweme_details"`` list
            supplies video metadata. One entry is consumed (popped) per
            video message, so the mapping is mutated.

    Returns:
        ``{"is_me": bool, "content": {"type": ..., "data": ...}}`` where
        ``type`` is ``"text"``, ``"video"``, ``"img"`` or ``None`` when the
        bubble could not be classified.
    """
    content = {
        "type": None,
        "data": None,
    }
    ret = {
        "is_me": None,
        "content": content,
    }
    # ``attr`` yields nothing when the element has no class; treat that as "".
    ret['is_me'] = 'tIJOLE11' in (ele_content.attr("class") or "")

    ele_text = ele_content._find_elements('xpath://span[@class="WCSQFekt"]', raise_err=False)
    if ele_text:
        content['data'] = parse_conversations_content_text(ele_text.html)
        content['type'] = "text"
    else:
        logger.info(f"_find_elements 'xpath://img'")
        imgs = ele_content._find_elements('xpath://img', index=None, raise_err=False)
        logger.info(f"{imgs}")
        if not imgs:
            logger.error(f"unkown_type {ele_content.html}")
        elif len(imgs) > 1:
            # A video bubble normally holds a cover image and a play button,
            # hence two <img> elements.
            content['type'] = "video"
            if conversation_detail:
                aweme_details = conversation_detail.get("aweme_details")
                if aweme_details:
                    content['data'] = filter_aweme_detail_in_conversations_video(aweme_details.pop(0))
                else:
                    # More video bubbles than captured details: keep the
                    # message with data=None instead of failing the scrape.
                    logger.warning("video message without a matching aweme_details entry")
        else:
            # Exactly one <img>: a plain picture message.
            content['data'] = imgs[0].attr("src")
            content['type'] = "img"
    logger.info(f"ret {ret}")
    return ret


def enter_conversation_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
    """Failure hook for ``enter_conversation``: leave the conversation if one is open."""
    ele_exit_conversation = tab._find_elements('退出会话')
    if ele_exit_conversation:
        ele_exit_conversation.click()


@task(on_failure=[enter_conversation_on_failure],
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def enter_conversation(click_user_name):
    """Open the conversation with a user and capture the requests it triggers.

    Args:
        click_user_name: Locator (the user's name) of the entry to click in
            the conversation list.

    Returns:
        A tuple ``(user_profile_body, conversation_detail_body)``; the second
        item is ``None`` when no ``aweme/detail`` request was captured.

    Raises:
        Exception: If no ``user/profile`` request was captured.
    """
    tab.listen.start("www.douyin.com/aweme/v1/web")
    ele_listDlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
    ele_listDlg.ele(click_user_name).click()

    conversation_detail = None
    user_profile_packet = None
    # Collect the user's basic information from the captured packets.
    for packet in tab.listen.steps(2, timeout=3):
        if "user/profile" in packet.url:
            user_profile_packet = packet
        if "aweme/detail" in packet.url:
            conversation_detail = packet

    conversation_detail_body = conversation_detail.response.body if conversation_detail else None
    if not user_profile_packet:
        raise Exception("进入私信后没有捕获到 user/profile 请求")
    logger.debug(f"user_profile_packet {user_profile_packet}")
    logger.debug(f"conversation_detail {conversation_detail}")
    return user_profile_packet.response.body, conversation_detail_body


@task(persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def save_enter_im_user_detail(unread_user_data:UnReadUserData, storage_key, user_profile_body):
    """Attach the sender's profile to the unread record and persist it.

    Args:
        unread_user_data: Unread-message record to update and save.
        storage_key: Storage key of the persisted ``enter_conversation``
            result; appended to ``minio_block.basepath``.
        user_profile_body: Body of the captured ``user/profile`` response.

    Returns:
        The refreshed ``unread_user_data`` record.
    """
    # S3Object(path="save_enter_im_user_detail_test").put((unread_user_data, storage_key, user_profile_body))
    unread_user_data.detail = S3Object(path=minio_block.basepath + storage_key, type=f"{tuple}")
    logger.info(f"S3Object {unread_user_data.detail.model_dump()}")

    user_dict = user_profile_body.get("user")
    uid = user_profile_body.get('uid')
    if uid is None and isinstance(user_dict, dict):
        # NOTE(review): the model below is built from the nested "user" dict,
        # so the uid presumably lives there too; fall back to it so that an
        # existing user row can actually be found -- confirm payload shape.
        uid = user_dict.get('uid')

    with Session(engine) as session:
        statement = select(UserInfoModel).where(UserInfoModel.uid == uid)
        user_info = session.exec(statement).first()
        if not user_info:
            logger.info(f"uid {user_profile_body.get('uid')}")
            logger.info(f"nickname {user_profile_body.get('nickname')}")
            logger.info(f"user_profile_body {user_profile_body}")
            user_info = user_table.dict_to_model(user_dict)
        unread_user_data.user_info = user_info
        session.add(unread_user_data)
        session.commit()
        session.refresh(unread_user_data)
        logger.info(f"unread_user_data {unread_user_data}")
    return unread_user_data


def get_conversation_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
    """Failure hook that logs the task, its inputs and the failed state."""
    logger.info(f"{task}")
    logger.info(f"{task_run.task_inputs}")
    logger.info(f"{state}")
    # A failed state re-raises its exception from result() by default, which
    # would abort this hook; ask for the exception object instead.
    logger.info(f"{state.result(raise_on_failure=False)}")


def cn_time_to_timestamp(time_str:str, time_base=None):
    """Convert a Chinese time expression to a Unix timestamp.

    Example::

        for row in chat_history_table:
            time_base = row.get("create_time")
            cn_time = row.get("time")
            timestamp = cn_time_to_timestamp(cn_time, time_base=time_base)
            if timestamp:
                str_time = datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
            else:
                str_time = None
            print(f"timestamp {timestamp} \t\t str {str_time} \t\t time_base {time_base}")

    Args:
        time_str: Time text as shown in the chat UI.
        time_base: Reference time passed to ``jionlp.parse_time``; defaults
            to the current time when falsy.

    Returns:
        The timestamp as ``int``, or ``None`` when ``time_str`` is not a
        string or cannot be parsed (the error is logged).
    """
    if not isinstance(time_str, str):
        return None
    if "刚刚" in time_str:
        return int(time.time())

    # jionlp loads its data on import, so import it only once it is really
    # needed, i.e. after the cheap early returns above.
    import jionlp as jio

    if not time_base:
        time_base = time.time()
    try:
        res = jio.parse_time(time_str, time_base)
        res_time = res.get('time')[0]
        dt_obj = datetime.datetime.strptime(res_time, '%Y-%m-%d %H:%M:%S')
        # Convert the datetime object to a timestamp.
        return int(dt_obj.timestamp())
    except Exception as e:
        logger.error(f"{e} time_str {time_str} time_base {time_base}")
        return None


# TODO: kept for a possible future "export the whole chat" feature.
# For unread messages there is in theory no need to fetch the full history;
# starting from the time shown on the unread message is enough. Only unread
# messages that arrive after the program started are recorded: if the program
# was not running, the user did not want AI handling, so older messages can
# be ignored.
@task(on_failure=[get_conversation_on_failure],
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def get_conversations_history(unread_user_data:UnReadUserData, conversation_detail):
    """Scrape every message currently rendered in the open chat window.

    Args:
        unread_user_data: Record that receives the scraped history.
        conversation_detail: Passed to ``analyze_conversations_content`` to
            resolve video messages.

    Returns:
        ``unread_user_data`` with ``chat_history`` set to the scraped
        messages and ``msg_time_txt`` set to the first time label found.
    """
    ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
    # Scroll up a little inside the chat window so that messages arriving
    # later are not marked as read immediately without being noticed.
    messageContent = ele_popShadowAnimation.ele('#messageContent')
    messageContent.child().scroll.up(150)

    eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
    chat_history = []
    # Text of the first time label encountered.
    start_with_unread_time_txt = None
    for ele in eles_msg:
        logger.debug(f"ele.html {ele.html}")
        # Optional time label of the message, e.g. a kZAHYArp div showing 18:07.
        ele_time = ele._find_elements('xpath://div[@class="kZAHYArp"]', raise_err=False)
        ele_content:ChromiumElement = ele._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
        res = analyze_conversations_content(ele_content, conversation_detail)

        cn_to_time = None
        if ele_time:
            if not start_with_unread_time_txt:
                start_with_unread_time_txt = ele_time.text
            cn_to_time = cn_time_to_timestamp(ele_time.text)
        res.update({'time': cn_to_time})
        logger.debug(f"res {res}")
        chat_history.append(res)

    logger.info(f"{chat_history}")
    unread_user_data.msg_time_txt = start_with_unread_time_txt
    unread_user_data.chat_history = chat_history
    return unread_user_data


@task(on_failure=[get_conversation_on_failure],
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def get_unread_msg_with_time(unread_user_data:UnReadUserData, conversation_detail:dict):
    """Collect messages starting at the one labelled with the unread time.

    Starting from the message whose time label matches
    ``unread_user_data.msg_time``, walks through previous siblings for as
    long as they are message containers.

    Args:
        unread_user_data: Record whose ``msg_time`` holds the time text to
            locate; it is replaced by the parsed timestamp (``None`` when
            parsing fails).
        conversation_detail: Passed to ``analyze_conversations_content``.

    Returns:
        ``unread_user_data`` with ``chat_history`` set.
    """
    ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
    # Scroll up a little inside the chat window so that messages arriving
    # later are not marked as read immediately without being noticed.
    messageContent = ele_popShadowAnimation.ele('#messageContent')
    messageContent.child().scroll.up(150)

    # If the program enters the dialog right when a message arrives,
    # msg_time is usually "刚刚" (just now). If the unread messages already
    # existed when the program started, msg_time usually looks like "16:30",
    # "04-25" or "2023-04-25".
    ele_msg_start_with_time = messageContent.s_ele(unread_user_data.msg_time)
    start_time_text = ele_msg_start_with_time.text
    unread_user_data.msg_time = cn_time_to_timestamp(start_time_text)
    if unread_user_data.msg_time is None:
        # Parsing failed; do not crash on fromtimestamp(None).
        parsed_time = None
    else:
        parsed_time = datetime.datetime.fromtimestamp(unread_user_data.msg_time).strftime('%Y-%m-%d %H:%M')
    logger.info(f"msg_start_with_time {start_time_text} - {parsed_time}")

    # Go up to the parent: div class="A1KpIsbL HO4aqgd4".
    ele_msg = ele_msg_start_with_time.parent()
    chat_history = []
    # Stop when siblings run out or the sibling is not a message container.
    while ele_msg and 'A1KpIsbL' in (ele_msg.attr('class') or ''):
        logger.debug(f"ele.html {ele_msg.html}")
        ele_content:ChromiumElement = ele_msg._find_elements('xpath://div[contains(@class, "SZtuWb3S")]')
        logger.debug(f"ele_content {ele_content.html}")
        logger.debug(f"ele_content class {ele_content.attr('class')}")
        res = analyze_conversations_content(ele_content, conversation_detail)
        logger.debug(f"res {res}")
        chat_history.append(res)
        ele_msg = ele_msg.prev()

    unread_user_data.chat_history = chat_history
    # Sample record:
    # {'name': '程序员马工', 'avator': 'https://p3.huoshanimg.com/aweme/100x100/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=3782654143', 'msg': '564', 'unread_msg_count': 2, 'time': 1714317757, 'chat_history': [{'is_me': True, 'content': {'type': 'text', 'data': '4公会'}}, {'is_me': False, 'content': {'type': 'text', 'data': '564'}}]}
    logger.info(f"unread_user_data {unread_user_data}")
    return unread_user_data


@task(persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def save_unread_msg_to_db(unread_user_data:UnReadUserData):
    """Insert every chat message of the record into the database.

    Args:
        unread_user_data: Record whose ``chat_history`` is to be stored.

    Returns:
        ``unread_user_data`` with ``chat_history`` replaced by the values
        returned from ``db.insert``.
    """
    # chat['is_me'] is False for messages sent by the other party.
    ret_chat_history = [db.insert(chat) for chat in unread_user_data.chat_history]
    logger.info(f"chat_history {ret_chat_history}")
    unread_user_data.chat_history = ret_chat_history
    return unread_user_data


@task
def send_msg_to_user(unread_user_data:UnReadUserData):
    """Type a reply into the open conversation and leave it.

    NOTE(review): the click on the send button is commented out, so the text
    is typed but not submitted even though success is logged.

    Args:
        unread_user_data: Record whose ``chat_history`` is echoed back.

    Returns:
        ``{"chat_history": ..., "reply": ["hello"]}``.
    """
    # Locate the chat dialog: class="qbjZBApl popShadowAnimation".
    ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
    # Locate the input box: data-e2e="msg-input".
    ele_input = ele_popShadowAnimation.ele('xpath://div[@data-e2e="msg-input"]')
    ele_input.click()
    ele_input.input("hello")
    # Locate the send button: span class="e2e-send-msg-btn".
    ele_send = ele_popShadowAnimation.ele('xpath://span[contains(@class, "e2e-send-msg-btn")]')
    # ele_send.click()
    ele_exit = ele_popShadowAnimation.ele('退出会话')
    ele_exit.click()
    logger.info(f"回复成功")
    return {"chat_history":unread_user_data.chat_history, "reply":["hello"]}


@task
def set_unread_done(unread_user_data:UnReadUserData):
    """Flag the unread record as handled and write it back to the database."""
    unread_user_data.is_done = True
    db.update(unread_user_data)
    logger.info("set unread done")


def reply_to_user_flow_on_failure(flow=None, flow_run=None, state=None, **kwargs):
    """Failure hook for ``reply_to_user_flow``: make sure the conversation is closed.

    The arguments are unused. They are all optional so the hook works
    whether it is invoked positionally or with ``flow=``/``flow_run=``/
    ``state=`` keywords.
    """
    check_page.check_conversion_exit()


@flow(log_prints=False, on_failure=[reply_to_user_flow_on_failure])
def reply_to_user_flow(unread_user_data:UnReadUserData, tab_id=None):
    """Handle one unread conversation end to end.

    Args:
        unread_user_data: Unread-message record of the user to reply to.
        tab_id: Unused.
    """
    # Open the private-message chat window.
    enter_state = enter_conversation(unread_user_data.name, return_state=True)
    user_profile_body, conversation_detail_body = enter_state.result()
    # Store the captured user information (follow state, follower count,
    # number of posts, IP, bio, ...) so it can accompany later AI replies.
    unread_user_data = save_enter_im_user_detail(unread_user_data, enter_state.data.storage_key, user_profile_body)
    unread_user_data = get_conversations_history(unread_user_data, conversation_detail_body)
    send_msg_to_user(unread_user_data)
    set_unread_done(unread_user_data)


from prefect.results import PersistedResult
import pickle


async def get_from_persistedresult():
    """Read one hard-coded persisted task result from the MinIO block and return it."""
    return await minio_block.read_path('s3://swl/prefect/20240427/031928_232222-get_conversations_history-0')


def main():
    """Debug entry point: replay ``save_enter_im_user_detail`` from a stored fixture."""
    unread_user_data, storage_key, user_profile_body = S3Object(path="save_enter_im_user_detail_test").get()
    save_enter_im_user_detail(unread_user_data, storage_key, user_profile_body)
    # eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
    # ele_content:ChromiumElement = eles_msg[0]._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
    # logger.info(f"{ele_content.html}")
    # analyze_conversations_content(ele_content,res[1])
    # Excerpt of a scraped chat history kept as sample data (the full dump
    # was several dozen entries of the same shape):
    # chat = [
    #     {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None},
    #     {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '刚刚'},
    #     {'is_me': True, 'content': {'type': 'text', 'data': 'df[流泪]sdghh个个'}, 'time': None},
    #     {'is_me': True, 'content': {'type': 'text', 'data': '12312[流泪][捂脸]'}, 'time': '03:27'},
    #     {'is_me': False, 'content': {'type': None, 'data': None}, 'time': '星期四 16:34'},
    #     {'is_me': False, 'content': {'type': 'text', 'data': '好玩'}, 'time': None},
    #     {'is_me': True, 'content': {'type': 'text', 'data': '123'}, 'time': '星期三 22:05'},
    #     {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期三 15:13'},
    # ]
    # save_unread_msg_to_db(chat, 29)
    # asyncio.run(read_client_data())
    # TaskRunResult(input_type='task_run', id=UUID('b850b870-fe4d-4df3-bac3-9e8ea5e1661d'))
    # get_conversations_history()


if __name__ == "__main__":
    main()