import asyncio
import datetime
import os
import re
import time
import os
import sys
from pathlib import Path
# print(f"{Path(__file__).parent.parent.parent}")
# sys.path.append(Path(__file__).parent.parent.parent.parent)
# sys.path.append(Path(__file__).parent.parent.parent)
# sys.path.append(Path(__file__).parent.parent)
# sys.path.append(Path(__file__).parent)
# NOTE: the project imports below rely on this path entry, so it must stay ahead of them.
sys.path.append(r'I:\code\ai-yunying\live-online-people')
import prefect.runtime
import prefect.runtime.task_run
from dp.page import page
from database.config import minio_block
from database.s3 import S3Object
import prefect.client
import jionlp as jio
from conf.config import logger,OUTPUT
from douyin import base
from dataset import Table
from DrissionPage import ChromiumPage
from DrissionPage._elements.chromium_element import ChromiumElement
from DrissionPage._units.listener import DataPacket
import jsonpath
from prefect import flow,task
from prefect.tasks import Task,TaskRun
from prefect.flows import Flow
from prefect.states import State,StateType
from prefect.client.schemas.objects import TaskRunInput,Parameter,TaskRunResult
from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table,engine
from douyin.flow import check_msg,unread_msg
from sqlmodel import Field, SQLModel,Relationship,Column,Session,select,func,UniqueConstraint,PickleType,text

tab:ChromiumPage=page.tab


def parse_conversations_content_text(html):
    """Rebuild the plain text of a chat message from its HTML.

    The text of every ``<pre>`` element and the bracketed ``alt`` value of
    every ``<img>`` element (``alt="[...]"``) are collected separately and then
    interleaved: first ``<pre>`` text, first alt, second ``<pre>`` text,
    second alt, and so on until both lists are exhausted.

    Args:
        html: HTML source of the message text element.

    Returns:
        The concatenated string; each alt value is wrapped in ``[...]``.
    """
    # Text inside <pre> elements (attributes on the tag are tolerated).
    pre_pattern = re.compile(r'<pre[^>]*>(.*?)</pre>', re.DOTALL)
    # The bracketed alt attribute of <img> elements, without the brackets.
    img_alt_pattern = re.compile(r'<img[^>]+alt="\[(.*?)\]"', re.DOTALL)

    pres = pre_pattern.findall(html)
    alts = img_alt_pattern.findall(html)

    # Interleave the two lists; the longer one simply runs on after the
    # shorter one is used up.
    result_items = []
    for index in range(max(len(pres), len(alts))):
        if index < len(pres):
            result_items.append(pres[index])
        if index < len(alts):
            result_items.append('[' + alts[index] + ']')

    return ''.join(result_items)

def filter_aweme_detail_in_conversations_video(aweme_detail:dict):
    """Reduce a video detail record to the four fields kept with a chat message.

    Args:
        aweme_detail: Mapping describing a shared video.

    Returns:
        A new dict with the keys ``nickname``, ``sec_uid``, ``preview_title``
        and ``statistics`` (in that order); a key missing from the input maps
        to ``None``.
    """
    kept_fields = ("nickname", "sec_uid", "preview_title", "statistics")
    return {field: aweme_detail.get(field) for field in kept_fields}

def analyze_conversations_content(ele_content:ChromiumElement, conversation_detail:dict):
    """Classify one chat message element as text, image or video and extract its payload.

    Args:
        ele_content: The message content element.
        conversation_detail: Mapping with an ``aweme_details`` list, or a falsy
            value when no detail is available. One entry is popped from the
            front of that list for every video message, so the caller's list
            is consumed as messages are analysed.

    Returns:
        ``{"is_me": bool, "content": {"type": ..., "data": ...}}`` where
        ``type`` is ``"text"``, ``"img"``, ``"video"`` or ``None`` (nothing
        recognised) and ``data`` is the parsed text, the image ``src``, the
        filtered video detail, or ``None``.
    """
    ret = {
        "is_me": None,
        "content": {
            "type": None,
            "data": None,
        },
    }
    # A missing class attribute counts as "not me" instead of raising TypeError.
    # presumably 'tIJOLE11' is the CSS class of the account's own messages — confirm.
    ret['is_me'] = 'tIJOLE11' in (ele_content.attr("class") or "")

    ele_text = ele_content._find_elements('xpath://span[@class="WCSQFekt"]', raise_err=False)
    if ele_text:
        ret['content']['data'] = parse_conversations_content_text(ele_text.html)
        ret['content']['type'] = "text"
        return ret

    logger.info("_find_elements 'xpath://img'")
    imgs = ele_content._find_elements('xpath://img', index=None, raise_err=False)
    logger.info(f"{imgs}")
    if not imgs:
        logger.error(f"unkown_type {ele_content.html}")
    elif len(imgs) > 1:
        # A video usually carries a cover and a play button, hence two <img> elements.
        ret['content']['type'] = "video"
        aweme_details = conversation_detail.get("aweme_details") if conversation_detail else None
        # Only consume a detail entry when one is actually left.
        if aweme_details:
            ret['content']['data'] = filter_aweme_detail_in_conversations_video(aweme_details.pop(0))
    else:
        ret['content']['data'] = imgs[0].attr("src")
        # Fixed: the type used to be written to a stray top-level 'content_type' key.
        ret['content']['type'] = "img"
    logger.info(f"ret {ret}")
    return ret

def enter_conversation_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
    """Failure hook of ``enter_conversation``: click the '退出会话' element when it is found.

    The hook arguments are accepted but not used.
    """
    exit_button = tab._find_elements('退出会话')
    if not exit_button:
        return
    exit_button.click()

# NOTE(review): base.get_object_name_by_time() is called once, when this decorator
# is evaluated, so every run of the task gets the same storage key — confirm that
# this is intended.
@task(on_failure=[enter_conversation_on_failure],
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def enter_conversation(click_user_name):
    """Open a user's conversation and capture the requests it triggers.

    Starts listening on ``www.douyin.com/aweme/v1/web``, clicks the entry
    matching ``click_user_name`` inside the ``listDlg-container`` element and
    inspects up to two captured packets (3 second timeout).

    Args:
        click_user_name: Locator of the conversation entry to click.

    Returns:
        A tuple ``(user_profile_body, conversation_detail_body)``; the second
        item is ``None`` when no ``aweme/detail`` request was captured.

    Raises:
        Exception: When no ``user/profile`` request was captured.
    """
    tab.listen.start("www.douyin.com/aweme/v1/web")
    dialog_list = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
    dialog_list.ele(click_user_name).click()

    # Last packet seen for each URL fragment of interest.
    captured = {"user/profile": None, "aweme/detail": None}
    for packet in tab.listen.steps(2, timeout=3):
        for fragment in captured:
            if fragment in packet.url:
                captured[fragment] = packet

    user_profile_packet = captured["user/profile"]
    conversation_detail = captured["aweme/detail"]
    conversation_detail_body = conversation_detail.response.body if conversation_detail else None
    if not user_profile_packet:
        raise Exception("进入私信后没有捕获到 user/profile 请求")
    logger.debug(f"user_profile_packet {user_profile_packet}")
    logger.debug(f"conversation_detail {conversation_detail}")
    return user_profile_packet.response.body, conversation_detail_body

@task(
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def save_enter_im_user_detail(unread_user_data:UnReadUserData, storage_key, user_profile_body):
    """Link ``unread_user_data`` to its stored detail and its user record, then save it.

    Args:
        unread_user_data: Record to update and store.
        storage_key: Key that is appended to ``minio_block.basepath`` to form
            the path of the ``S3Object`` stored in ``unread_user_data.detail``.
        user_profile_body: Mapping whose ``"user"`` entry is converted into a
            user record when no stored user has the same uid.

    Returns:
        The refreshed ``unread_user_data`` after the commit.
    """
    # S3Object(path="save_enter_im_user_detail_test").put((unread_user_data, storage_key, user_profile_body))
    unread_user_data.detail = S3Object(path=minio_block.basepath + storage_key, type=f"{tuple}")
    logger.info(f"S3Object {unread_user_data.detail.model_dump()}")

    # The user record is built from user_profile_body["user"], so fall back to
    # the uid found there when the top level of the body does not carry one.
    # Without this the lookup below compares against None and can never find
    # an already stored user.
    user_payload = user_profile_body.get("user")
    uid = user_profile_body.get('uid')
    if uid is None and isinstance(user_payload, dict):
        uid = user_payload.get('uid')

    with Session(engine) as session:
        statement = select(UserInfoModel).where(UserInfoModel.uid == uid)
        user_info = session.exec(statement).first()
        if not user_info:
            logger.info(f"uid {uid}")
            logger.info(f"nickname {user_profile_body.get('nickname')}")
            logger.info(f"user_profile_body {user_profile_body}")
            user_info = user_table.dict_to_model(user_payload)
        unread_user_data.user_info = user_info
        session.add(unread_user_data)
        session.commit()
        session.refresh(unread_user_data)
    logger.info(f"unread_user_data {unread_user_data}")
    return unread_user_data
def get_conversation_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
    """Failure hook: log the task, the task run's inputs, the state and the state's result."""
    emit = logger.info
    emit(f"{task}")
    emit(f"{task_run.task_inputs}")
    emit(f"{state}")
    # state.result() is evaluated last, after the other three lines are logged.
    emit(f"{state.result()}")


def cn_time_to_timestamp(time_str:str, time_base=None):
    """Convert a time expression such as "刚刚" or "16:30" into a Unix timestamp.

    Example::

        for row in chat_history_table:
            time_base = row.get("create_time")
            timestamp = cn_time_to_timestamp(row.get("time"), time_base=time_base)
            if timestamp:
                str_time = datetime.datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
            else:
                str_time = None
            print(f"timestamp {timestamp} \t\t str {str_time} \t\t time_base {time_base}")

    Args:
        time_str: The time expression. Anything that is not a ``str`` yields
            ``None``.
        time_base: Reference time handed to ``jio.parse_time``; the current
            time is used when it is falsy. It is ignored when ``time_str``
            contains "刚刚".

    Returns:
        The timestamp in whole seconds, or ``None`` when ``time_str`` is not a
        string or parsing fails (the failure is logged).
    """
    if not isinstance(time_str, str):
        return None
    # "刚刚" is answered with the current time, without calling the parser.
    if "刚刚" in time_str:
        return int(time.time())
    try:
        time_base = time_base or time.time()
        parsed = jio.parse_time(time_str, time_base)
        # The first entry of the 'time' list is read as 'YYYY-mm-dd HH:MM:SS'.
        first_time = parsed.get('time')[0]
        moment = datetime.datetime.strptime(first_time, '%Y-%m-%d %H:%M:%S')
        return int(moment.timestamp())
    except Exception as e:
        logger.error(f"{e} time_str {time_str} time_base {time_base}")
        return None


# TODO: marked as not in use, but kept because exporting a complete chat may need it later.
# For unread messages there is in theory no need to fetch the whole history;
# reading from the time shown on the unread message onwards is enough.
# Only unread messages that arrive after the program has started are recorded:
# if the program was not running, the user did not want AI handling, so earlier
# messages can be ignored.
@task(on_failure=[get_conversation_on_failure],
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def get_conversations_history(unread_user_data:UnReadUserData, conversation_detail):
    """Read every message element of the open chat dialog into ``unread_user_data``.

    Args:
        unread_user_data: Record that receives ``msg_time_txt`` (text of the
            first time label met) and ``chat_history`` (one dict per message).
        conversation_detail: Passed through to ``analyze_conversations_content``.

    Returns:
        The updated ``unread_user_data``.
    """
    ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
    # Scroll up a little inside the chat window so that unread messages arriving
    # later are not silently marked as read before they are noticed.
    messageContent = ele_popShadowAnimation.ele('#messageContent')
    messageContent.child().scroll.up(150)
    eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
    chat_history = []
    # Text of the time label at which the unread messages start.
    start_with_unread_time_txt = None
    for ele in eles_msg:
        logger.debug(f"ele.html {ele.html}")
        # Optional time label, e.g. <div class="kZAHYArp">18:07</div>
        is_ele_time = ele._find_elements('xpath://div[@class="kZAHYArp"]', raise_err=False)
        ele_content:ChromiumElement = ele._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
        res = analyze_conversations_content(ele_content, conversation_detail)
        if is_ele_time:
            if not start_with_unread_time_txt:
                start_with_unread_time_txt = is_ele_time.text
            cn_to_time = cn_time_to_timestamp(is_ele_time.text)
        else:
            cn_to_time = None
        res.update({'time': cn_to_time})
        logger.debug(f"res {res}")
        chat_history.append(res)
    logger.info(f"{chat_history}")
    unread_user_data.msg_time_txt = start_with_unread_time_txt
    unread_user_data.chat_history = chat_history
    return unread_user_data


@task(on_failure=[get_conversation_on_failure],
      persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def get_unread_msg_with_time(unread_user_data:UnReadUserData, conversation_detail:dict):
    """Collect the messages starting at the element that shows the unread time label.

    The element whose text matches ``unread_user_data.msg_time`` is located, its
    parent is taken as the first message, and ``prev()`` is followed for as long
    as the element's class contains ``A1KpIsbL``.

    Args:
        unread_user_data: Record whose ``msg_time`` holds the label text on
            entry. On return ``msg_time`` holds the parsed timestamp (``None``
            when parsing failed) and ``chat_history`` the collected messages.
        conversation_detail: Passed through to ``analyze_conversations_content``.

    Returns:
        The updated ``unread_user_data``.
    """
    ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
    # Scroll up a little inside the chat window so that unread messages arriving
    # later are not silently marked as read before they are noticed.
    messageContent = ele_popShadowAnimation.ele('#messageContent')
    messageContent.child().scroll.up(150)
    # If the program enters the dialog right when the message arrives,
    # unread_user_data.msg_time is usually “刚刚”.
    # If the program has just started and the unread messages were already there,
    # it is usually "16:30", "04-25" or "2023-04-25".
    ele_msg_start_with_time = messageContent.s_ele(unread_user_data.msg_time)
    unread_user_data.msg_time = cn_time_to_timestamp(ele_msg_start_with_time.text)
    # cn_time_to_timestamp returns None when the label cannot be parsed; do not
    # let the log line crash the task in that case.
    if unread_user_data.msg_time is None:
        readable_time = None
    else:
        readable_time = datetime.datetime.fromtimestamp(unread_user_data.msg_time).strftime('%Y-%m-%d %H:%M')
    logger.info(f"msg_start_with_time {ele_msg_start_with_time.text} - {readable_time}")
    # Find the parent: div class="A1KpIsbL HO4aqgd4"
    ele_msg = ele_msg_start_with_time.parent()
    chat_history = []
    # Stop when there is no further element or it has no class attribute,
    # instead of raising TypeError on `in None`.
    while ele_msg and 'A1KpIsbL' in (ele_msg.attr('class') or ''):
        logger.debug(f"ele.html {ele_msg.html}")
        ele_content:ChromiumElement = ele_msg._find_elements('xpath://div[contains(@class, "SZtuWb3S")]')
        logger.debug(f"ele_content {ele_content.html}")
        logger.debug(f"ele_content class {ele_content.attr('class')}")
        res = analyze_conversations_content(ele_content, conversation_detail)
        logger.debug(f"res {res}")
        chat_history.append(res)
        ele_msg = ele_msg.prev()
    unread_user_data.chat_history = chat_history
    # Example of a resulting record:
    # {'name': '程序员马工', 'avator': 'https://p3.huoshanimg.com/aweme/100x100/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=3782654143', 'msg': '564', 'unread_msg_count': 2, 'time': 1714317757, 'chat_history': [{'is_me': True, 'content': {'type': 'text', 'data': '4公会'}}, {'is_me': False, 'content': {'type': 'text', 'data': '564'}}]}
    logger.info(f"unread_user_data {unread_user_data}")
    return unread_user_data


@task(persist_result=True,
      result_storage=minio_block,
      result_storage_key=base.get_object_name_by_time())
def save_unread_msg_to_db(unread_user_data:UnReadUserData):
    """Insert every chat entry through ``db.insert`` and keep the returned values.

    Returns:
        ``unread_user_data`` with ``chat_history`` replaced by the list of
        ``db.insert`` results.
    """
    # When chat['is_me'] is False the message was sent by the other party; the
    # total number of messages (mine included) still has to be counted.
    ret_chat_history = [db.insert(chat) for chat in unread_user_data.chat_history]
    logger.info(f"chat_history {ret_chat_history}")
    unread_user_data.chat_history = ret_chat_history
    return unread_user_data


@task
def send_msg_to_user(unread_user_data:UnReadUserData):
    """Type "hello" into the open chat dialog and click the send button.

    Returns:
        ``{"chat_history": unread_user_data.chat_history, "reply": ["hello"]}``.
    """
    # Find the chat dialog: class="qbjZBApl popShadowAnimation"
    ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
    # Find the input box: data-e2e="msg-input"
    ele_input = ele_popShadowAnimation.ele('xpath://div[@data-e2e="msg-input"]')
    ele_input.click()
    ele_input.input("hello")
    # Find the send button: span class="e2e-send-msg-btn"
    ele_send = ele_popShadowAnimation.ele('xpath://span[contains(@class, "e2e-send-msg-btn")]')
    ele_send.click()
    # The exit element is still looked up, but the click is disabled.
    ele_exit = ele_popShadowAnimation.ele('退出会话')
    # ele_exit.click()
    logger.info("回复成功")
    return {"chat_history":unread_user_data.chat_history, "reply":["hello"]}


@task
def set_unread_done(unread_user_data:UnReadUserData):
    """Set ``is_done`` on the record and write it back through ``db.update``."""
    unread_user_data.is_done = True
    db.update(unread_user_data)
    logger.info("set unread done")


def reply_to_user_flow_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
    """Failure hook of ``reply_to_user_flow``: call ``check_msg.check_conversion_exit()``."""
    check_msg.check_conversion_exit()


@flow(log_prints=False, on_failure=[reply_to_user_flow_on_failure])
def reply_to_user_flow(unread_user_data:UnReadUserData):
    """Enter a user's conversation, store the user, read the history, reply, and mark it done."""
    # Enter the private-message chat window.
    enter_state = enter_conversation(unread_user_data.name,return_state=True)
    user_profile_body, conversation_detail_body = enter_state.result()
    # Entering yields user information (e.g. follow status, follower count,
    # number of works, IP, bio). Store it so that later AI chats can attach it.
    unread_user_data = save_enter_im_user_detail(unread_user_data, enter_state.data.storage_key, user_profile_body)
    unread_user_data = get_conversations_history(unread_user_data, conversation_detail_body)
    send_msg_to_user(unread_user_data)
    set_unread_done(unread_user_data)


from prefect.results import PersistedResult
import pickle


async def get_from_persistedresult():
    """Read one fixed persisted-result path from ``minio_block``; the value is not returned."""
    res1 = await minio_block.read_path('s3://swl/prefect/20240427/031928_232222-get_conversations_history-0')


def main():
    """Replay ``save_enter_im_user_detail`` with the arguments stored under a test key."""
    unread_user_data, storage_key, user_profile_body = S3Object(path="save_enter_im_user_detail_test").get()
    save_enter_im_user_detail(unread_user_data, storage_key, user_profile_body)
    # Debugging scaffolding kept from development (disabled):
    # eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
    # ele_content:ChromiumElement = eles_msg[0]._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
    # logger.info(f"{ele_content.html}")
    # analyze_conversations_content(ele_content,res[1])
    # Sample chat history (abbreviated here; the original comment listed several dozen entries of the same shape):
    # chat = [{'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': None}, {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '刚刚'}, {'is_me': True, 'content': {'type': 'text', 'data': 'df[流泪]sdghh个个'}, 'time': None}, {'is_me': True, 'content': {'type': 'text', 'data': '12312[流泪][捂脸]'}, 'time': '03:27'}, {'is_me': False, 'content': {'type': None, 'data': None}, 'time': '星期四 16:34'}, ..., {'is_me': False, 'content': {'type': 'text', 'data': '你好'}, 'time': '星期三 15:13'}]
    # save_unread_msg_to_db(chat, 29)
    # asyncio.run(read_client_data())
    # TaskRunResult(input_type='task_run', id=UUID('b850b870-fe4d-4df3-bac3-9e8ea5e1661d'))
    # get_conversations_history()


if __name__ == "__main__":
    main()