| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- import datetime
- import os
- import re
- import time
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- from dp.page import page
- from douyin import chat_test,base,conversation,user
- from conf.config import logger,OUTPUT
- from database.config import ai_yunying_db,minio_block
- from dataset import Table
- from DrissionPage import ChromiumPage
- from DrissionPage._elements.chromium_element import ChromiumElement
- from DrissionPage._units.listener import DataPacket
- import jsonpath
- from prefect import flow,task
- from prefect.tasks import Task,TaskRun
- from prefect.flows import Flow
- from prefect.states import State,StateType
# Browser tab taken from the project's page wrapper; the tasks below operate on it directly.
tab = page.tab
# After clicking the direct-message icon, fetch the unread-message badge.
@task
def get_ele_msg_red_pot(ele_im: ChromiumElement):
    """Click the direct-message icon and look up the dialog and its badge.

    Clicks ``ele_im``, then searches the module-level ``tab`` for the
    ``listDlg-container`` dialog and, within that result, for the element
    with class ``hcPUqxqn`` (the unread-message dot, per the log message).

    Args:
        ele_im: Element that gets clicked before the lookups.

    Returns:
        A ``(dialog, badge)`` tuple holding the two lookup results.
    """
    ele_im.click()
    logger.info("点击私信图标后,获取未读消息小圆点")
    dialog = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
    badge = dialog.ele('xpath://div[@class="hcPUqxqn"]')
    return dialog, badge
def _parse_unread_count(badge_text: str) -> int:
    """Return the first run of digits in *badge_text* as an int (``"99+"`` -> 99)."""
    digits = re.search(r"\d+", badge_text or "")
    if digits is None:
        # Same exception type that a bare int() would raise, with a clearer message.
        raise ValueError(f"unread badge text has no digits: {badge_text!r}")
    return int(digits.group())


# NOTE(review): base.get_object_name_by_time() is called once, when this
# decorator is evaluated at import time, so every run of the task shares that
# single result_storage_key -- confirm that is intended.
@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
def get_im_unread_item_user_data(ele_msg_red_pot: ChromiumElement):
    """Collect one unread conversation's summary and store it in ``chat_task``.

    Reads the unread count from the badge, then looks inside the badge's
    parent element for the nickname, avatar URL, message preview and time.
    Fields whose element is not found keep their defaults. The record is
    inserted into the ``chat_task`` table with ``is_done=False`` and the
    current local time, then read back by its new id.

    Args:
        ele_msg_red_pot: Badge element whose text holds the unread count.

    Returns:
        The row returned by ``table.find_one`` for the inserted record.

    Raises:
        ValueError: If the badge text contains no digits.
    """
    logger.info("存在未读消息,获取该的用户头像、昵称、时间等信息")
    # Key spelling (including "avator") is kept as-is: the keys become the
    # fields of the stored record.
    data = {
        "name": '',
        "avator": '',
        "msg": None,
        # Tolerates a capped badge such as "99+", which int() would reject.
        "unread_msg_count": _parse_unread_count(ele_msg_red_pot.text),
        "time": '',
    }

    # The badge's parent is treated as the complete list item of one conversation.
    msg_item_div = ele_msg_red_pot.parent()

    # Nickname
    name_ele = msg_item_div.s_ele("xpath://div[@class='gZdlhsqq']")
    if name_ele:
        data["name"] = name_ele.text

    # Avatar
    avatar_ele = msg_item_div.s_ele("xpath://img")
    if avatar_ele:
        data["avator"] = avatar_ele.attr('src')

    # Message preview
    msg_content = msg_item_div.s_ele('xpath://pre[@class="MnyOYvbN"]')
    if msg_content:
        data["msg"] = msg_content.text

    # Time
    ele_time = msg_item_div.s_ele('xpath://div[@class="skNuRdW_"]')
    if ele_time:
        data["time"] = ele_time.text

    table: Table = ai_yunying_db['chat_task']
    data['is_done'] = False
    data['create_time'] = datetime.datetime.now()
    # Named row_id rather than id so the builtin is not shadowed.
    row_id = table.insert(data)
    unread_user_data = table.find_one(id=row_id)
    logger.info(f"{unread_user_data}")
    return unread_user_data
@task
def click_im_icon(ele_im: ChromiumElement):
    """Click ``ele_im`` and pass back whatever ``click()`` returns."""
    click_result = ele_im.click()
    return click_result
-
@flow
def response_im(tab: ChromiumPage, ele_im: ChromiumElement):
    """Open the message list and record the first unread conversation.

    Args:
        tab: Not used by this body; the tasks it calls work on the
            module-level ``tab``.
        ele_im: Direct-message icon element to click.

    Returns:
        The ``chat_task`` record produced by ``get_im_unread_item_user_data``.
    """
    # get_ele_msg_red_pot clicks ele_im itself, so a separate click_im_icon
    # call here would click the icon twice.
    _ele_list_dlg, ele_msg_red_pot = get_ele_msg_red_pot(ele_im)
    return get_im_unread_item_user_data(ele_msg_red_pot)
@task
def check_home_page(home_url: str = 'https://www.douyin.com/user/self'):
    """Ensure the shared tab is on the profile page, then scroll to the top.

    Args:
        home_url: Address loaded when the tab's current URL does not contain
            ``/user/self``.
    """
    already_home = "/user/self" in tab.url
    if not already_home:
        tab.get(home_url)
    # NOTE(review): the scroll runs whether or not navigation happened --
    # confirm this is the intended nesting.
    tab.scroll.to_top()
@task
def get_im_icon_ele():
    """Look up the ``im-entry`` element on the shared tab.

    Returns:
        The lookup result when it is truthy, otherwise ``None``.
    """
    im_entry = tab.ele('xpath://div[@data-e2e="im-entry"]')
    return im_entry or None
@task
def get_im_icon_red_pot_ele(ele_im: ChromiumElement):
    """Search the direct-message icon for its red-dot child element.

    Args:
        ele_im: Element searched for the ``LFWqFfyH isLight`` wrapper div.

    Returns:
        The result of looking up ``/div`` on that wrapper. Both lookups pass
        ``raise_err=False``.
    """
    badge_wrapper = ele_im._find_elements('xpath://div[@class="LFWqFfyH isLight"]', raise_err=False)
    # Presumably a hit here means the icon shows a direct-message red dot.
    return badge_wrapper._find_elements('xpath:/div', raise_err=False)
@flow
def check_has_im_msg_flow(home_url: str):
    """Go to the home page and probe the direct-message icon for a red dot.

    Args:
        home_url: Forwarded to ``check_home_page``.

    Returns:
        ``(icon, red_dot)``: the result of ``get_im_icon_ele`` and the result
        of ``get_im_icon_red_pot_ele`` for that icon.
    """
    check_home_page(home_url)
    im_icon = get_im_icon_ele()
    return im_icon, get_im_icon_red_pot_ele(im_icon)
@flow(log_prints=False)
def im_chat_flow():
    """Reply to the first unread direct-message conversation, if there is one.

    Steps: check the home page and locate the direct-message icon, open the
    message list, record the unread conversation in the database, enter the
    conversation, load its history, store the unread messages and send a
    reply. The flow ends early when the opened dialog has no unread badge.
    """
    home_url: str = 'https://www.douyin.com/user/self'
    print(f"home_url:{home_url}")
    # The icon-level red dot is returned too but is not consulted here.
    ele_im, _ele_has_msg = check_has_im_msg_flow(home_url)

    # Red dot inside the message-list dialog.
    ele_list_dlg, ele_pop_red_pot = get_ele_msg_red_pot(ele_im)
    if not ele_pop_red_pot:
        # Without a badge there is nothing to read a count from, so stop here
        # rather than fail inside the next task.
        logger.info("no unread conversation badge found; nothing to reply to")
        return

    # NOTE: overriding result_storage_key per run via with_options() was
    # sketched here in a commented-out line but never enabled.
    unread_user_data = get_im_unread_item_user_data(ele_pop_red_pot)
    _user_profile_packet, conversation_detail = conversation.enter_conversation(unread_user_data, ele_list_dlg)
    chat_history = conversation.get_conversations_history(unread_user_data, conversation_detail)
    chat_history = conversation.save_unread_msg_to_db(chat_history, unread_user_data['unread_msg_count'])
    conversation.reply_to_user(chat_history, unread_user_data)
- #
- import chat_test
def main():
    """Run the direct-message reply flow once."""
    # im_chat_flow declares no parameters, so it is called without arguments;
    # its tasks work on the module-level tab.
    im_chat_flow()
if __name__ == "__main__":
    # Indexing (not .get) means a missing PREFECT_API_URL raises KeyError
    # before anything else runs.
    prefect_api_url = os.environ["PREFECT_API_URL"]
    print(prefect_api_url)

    # Send a greeting through the chat_test helper, then run the flow.
    greeter = chat_test.Chat()
    greeter.send_msg("你好")
    # auto_douyin()
    im_chat_flow()
    # im_chat_flow.with_options(result_storage=os.path.join(OUTPUT, "prefect", "{flow_run.flow_name}.json"))
    # im_chat_flow()
|