浏览代码

完成模式人自动回复;改为部署方式运行流(便于并发两个流)

mrh 1 年之前
父节点
当前提交
1fdf0d6d44
共有 9 个文件被更改,包括 392 次插入159 次删除
  1. 26 0
      database/s3.py
  2. 51 24
      douyin/base.py
  3. 57 19
      douyin/chat_flow.py
  4. 0 88
      douyin/flow/check_msg.py
  5. 166 0
      douyin/flow/check_page.py
  6. 8 10
      douyin/flow/conversation.py
  7. 16 0
      douyin/flow/deployment.py
  8. 50 11
      douyin/flow/unread_msg.py
  9. 18 7
      dp/page_test.py

+ 26 - 0
database/s3.py

@@ -100,6 +100,32 @@ class S3:
         logger.info(f'put object {object_name} to s3 success')
         logger.info(f'put object {object_name} to s3 success')
         return object_name
         return object_name
     
     
+    def put(self, obj:Any, object_name='', content_type='', name_by_time=True):
+        file_path = object_name.split('/')[-1]
+        if content_type == '':
+            content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
+        if isinstance(obj, str):
+            obj=obj.encode()
+            # content_type = 'application/txt'
+        else :
+            try:
+                obj = json.dumps(obj)
+                # content_type = 'application/json'
+            except TypeError as e:
+                obj = pickle.dumps(obj)
+                # content_type = 'application/pickle'
+
+        object_name = self.get_object_prefix() + '/' + object_name
+        data_stream = BytesIO(obj)
+        return self.minio_client.put_object(  
+            bucket_name=self.bucket,   
+            data=data_stream,   
+            length=data_stream.getvalue().__len__(),
+            object_name=object_name,   
+            content_type=content_type  
+        )
+
+
     def fput(self, file_path, object_name='', content_type=''):
     def fput(self, file_path, object_name='', content_type=''):
         # 根据文件名自动获取 object_name
         # 根据文件名自动获取 object_name
         if object_name == '':
         if object_name == '':

+ 51 - 24
douyin/base.py

@@ -5,16 +5,31 @@ import pickle
 import os
 import os
 import sys
 import sys
 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
-from conf.config import OUTPUT,PAGE_OUTPUT
+from conf.config import OUTPUT,PAGE_OUTPUT,logger
 from database.config import minio_block
 from database.config import minio_block
 import prefect.runtime
 import prefect.runtime
 from dp.page import page
 from dp.page import page
-import jionlp as jio
 
 
 import time  
 import time  
 from datetime import datetime  
 from datetime import datetime  
+from prefect import runtime
+from database.config import minio_client
+from database.s3 import S3Object,S3
+
+
+HOME_URL = 'https://www.douyin.com/user/self'
+tab=page.get_tab(url=HOME_URL)
+
+def get_tab(tab_id=None):
+    if tab_id:
+        return page.get_tab(tab_id)
+    else:
+        tab_id = runtime.flow_run.parameters.get('tab_id', None)
+        if tab_id:
+            return page.get_tab(tab_id)
+        else:
+            return page.tab
 
 
-tab=page.get_tab(url='https://www.douyin.com/user/self')
 
 
 
 
 def get_object_name_by_time():
 def get_object_name_by_time():
@@ -34,32 +49,44 @@ def get_result(path:str):
     decoded_data = base64.b64decode(base64_data)
     decoded_data = base64.b64decode(base64_data)
     result = pickle.loads(decoded_data)
     result = pickle.loads(decoded_data)
     return result
     return result
-  
-def save_page_info(local=False, file_name=''):
-    if not file_name:
-        file_name = datetime.now().strftime("%Y%m%d-%H%M%S_%f")
-    if local:
-        f = open(PAGE_OUTPUT/f'tab-{file_name}.html', 'w')
-        f.write(tab.html)
-
-    # tab.html
-    minio_block.write_path(f'{file_name}.html',tab.html.encode() )
-    img_bytes = tab.get_screenshot(as_bytes=True)
-    minio_block.write_path(f'{file_name}.png',img_bytes )
 
 
+def save_html_to_s3(file_name):
+    page_dir = Path(r'I:\code\ai-yunying\live-online-people\output\page\\')
+    file_path = page_dir/file_name
+    f = open(file_path, 'w')
+    f.write(tab.html)
+    s3minio = S3(bucket='public',client=minio_client)
+    base_path = '/md/ai-yunying/'
+    obj_name = base_path+ s3minio.get_object_name_by_time() + '-'+ file_name
+    res = s3minio.fput(r'I:\code\ai-yunying\live-online-people\output\page\\'+file_name, obj_name)
+    logger.info(f"{res.bucket_name } {res.object_name} {res._http_headers}")
 
 
+import pathlib
+def save_page_info(file_name='', tab_id=None, local=True,s3=False):
+    base_time_dir = pathlib.Path(datetime.now().strftime("%Y-%m-%d"))
+    tab = get_tab(tab_id)
+    png = tab.get_screenshot(as_bytes=True)
+    if local:
+        save_dir = OUTPUT/base_time_dir
+        logger.info(f"{save_dir}")
+        if not os.path.exists(save_dir):
+            os.makedirs(save_dir)
+        with open(save_dir/f'{file_name}.html', 'w') as f:
+            f.write(tab.html)
+        with open(save_dir/f'{file_name}.png', 'wb') as f:
+            f.write(png)
+    if s3:
+        s3minio = S3(bucket='swl',client=minio_client)
+        # base_path = pathlib.Path('/log') / base_time_dir
+        # obj_name = base_path+ s3minio.get_object_name_by_time() + '-'+ file_name
+        res = s3minio.put(tab.html, f'{file_name}.html')
+        logger.info(f"{res.bucket_name } {res.object_name} {res._http_headers}")
+        res = s3minio.put(png, f'{file_name}.png')
+    return base_time_dir
 
 
     
     
 def main():
 def main():
-    for row in chat_history_table:
-        time_base = row.get("create_time")
-        cn_time = row.get("time")
-        timestamp = cn_time_to_timestamp(cn_time, time_base=time_base)
-        if timestamp:
-            str_time = datetime.strftime(datetime.fromtimestamp(timestamp), '%Y-%m-%d %H:%M:%S')
-        else:
-            str_time = None
-        print(f"timestamp {timestamp} \t\t str {str_time} \t\t time_base {time_base}")
+    save_page_info(file_name='点击陌生人对话框', tab_id='A017888A62FE53FAD9D85ED2662FEA34')
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":
     main()
     main()

+ 57 - 19
douyin/chat_flow.py

@@ -4,6 +4,9 @@ import re
 import time
 import time
 import os
 import os
 import sys
 import sys
+
+import asyncio
+import prefect.runtime
 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 
 
 from dp.page import page
 from dp.page import page
@@ -21,40 +24,75 @@ from prefect.tasks import Task,TaskRun
 from prefect.flows import Flow
 from prefect.flows import Flow
 from prefect.states import State,StateType
 from prefect.states import State,StateType
 from douyin.models import UnReadUserData,engine
 from douyin.models import UnReadUserData,engine
-from douyin.flow import check_msg, conversation,unread_msg
-tab=page.tab
-
-
-
-@task
-def click_im_icon(ele_im:ChromiumElement):
-    return ele_im.click()
-   
-
-
-
-
+from douyin.flow import check_page, conversation,unread_msg
+from concurrent.futures import ThreadPoolExecutor
+from prefect.deployments import run_deployment
+from prefect import runtime
+from prefect import flows
+from prefect import get_client
+from prefect.flow_runs import wait_for_flow_run
+from prefect.client.schemas.objects import (
+    FlowRun,
+    MinimalDeploymentSchedule,
+)
+from prefect import flow, Runner
+'''
+           Deployments
+┌───────────────────────────────┐
+│ check-page-ok-flow/ai-yunying │
+│ reply-to-user-flow/ai-yunying │
+│ unread-msg-flow/ai-yunying    │
+│ stranger-msg-flow/ai-yunying  │
+└───────────────────────────────┘
+'''
 @flow(log_prints=False)
 @flow(log_prints=False)
-def im_chat_flow():
+def im_chat_flow(tab_id=None):
+    tab = base.get_tab(tab_id)
+    client = get_client()
     home_url:str='https://www.douyin.com/user/self'
     home_url:str='https://www.douyin.com/user/self'
-    print(f"home_url:{home_url}")
-    ele_im, ele_has_msg = check_msg.check_has_im_msg_flow(home_url)
-    unread_user_data = unread_msg.unread_msg_flow()
+    logger.info(f"start im_chat_flow {tab.tab_id} ")
+    # timeout=0 代表立即返回不阻塞
+    # flow_run:FlowRun = run_deployment(name='check-page-ok-flow/ai-yunying', parameters=dict(home_url=home_url,tab_id=tab.tab_id),timeout=0)
+    unread_items,stranger_txt = check_page.check_page_ok_flow(home_url,tab.tab_id)
+    
+    # flow_run = run_deployment('check-page-ok-flow/ai-yunying', parameters=dict(home_url=home_url,tab_id=tab.tab_id),timeout=0)
+    # flow_run = asyncio.run(wait_for_flow_run(flow_run.id, log_states=True))
+    # logger.info(f"wait_for_flow_run {flow_run.state.result()} ")
+    flows_run = []
+    if stranger_txt:
+        new_tab = base.page.new_tab(home_url)
+        # unread_msg.stranger_msg_flow(stranger_txt, new_tab.tab_id)
+        # timeout=0 代表立即返回不阻塞
+        unread_msg.stranger_msg_flow
+        flow_run = run_deployment('stranger-msg-flow/ai-yunying', parameters=dict(home_url=home_url,tab_id=new_tab.tab_id),timeout=0)
+        flows_run.append(flow_run)
+    if unread_items:
+        # conversation.reply_to_user_flow(unread_items[-1], tab_id)
+        conversation.reply_to_user_flow
+        flow_run = run_deployment('reply-to-user-flow/ai-yunying', parameters=dict(unread_items[-1], tab_id),timeout=0)
+        flows_run.append(flow_run)
+    flow_run = asyncio.run(wait_for_flow_run(flow_run.id, log_states=True))
+
+    # unread_user_data = unread_msg.unread_msg_flow()
+    logger.info(f"flow_run {flow_run} ")
+    return True
     conversation.reply_to_user_flow(unread_user_data)
     conversation.reply_to_user_flow(unread_user_data)
     from prefect.results import PersistedResult
     from prefect.results import PersistedResult
 
 
 
 
 def main():
 def main():
+
     print(os.environ["PREFECT_API_URL"])
     print(os.environ["PREFECT_API_URL"])
     # auto_douyin()
     # auto_douyin()
-    im_chat_flow()
+    im_chat_flow(tab_id=page.tab_id)
+    # im_chat_flow.serve('serv')
+    # run_deployment(name='serv', )
 
 
 @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
 @task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
 def task_test(mytes):
 def task_test(mytes):
     return mytes + 'ok'
     return mytes + 'ok'
 
 
 def test():
 def test():
-    import jionlp as jio
     import prefect
     import prefect
     import json
     import json
     state = task_test(mytes='https://www.douyin.com/user/self', return_state=True)
     state = task_test(mytes='https://www.douyin.com/user/self', return_state=True)

+ 0 - 88
douyin/flow/check_msg.py

@@ -1,88 +0,0 @@
-import datetime
-import os
-import re
-import time
-import os
-import sys
-sys.path.append(os.path.dirname(os.path.dirname(__file__)))
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
-from dp.page import page
-from douyin import base
-from conf.config import logger,OUTPUT
-from database.config import  minio_block
-from dataset import Table
-from DrissionPage import ChromiumPage
-from DrissionPage._elements.chromium_element import ChromiumElement
-from DrissionPage._units.listener import DataPacket
-import jsonpath
-from prefect import flow,task
-from prefect.tasks import Task,TaskRun
-from prefect.flows import Flow
-from prefect.states import State,StateType
-from douyin import chat_test
-
-tab=page.tab
-     
-
-@task
-def get_im_icon_ele():
-    tab.scroll.to_top()
-    ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]')
-    if ele_im:
-        return ele_im
-
-@task
-def get_im_icon_red_pot_ele(ele_im:ChromiumElement):
-    ele_im_red_pot = ele_im._find_elements('xpath://div[@class="LFWqFfyH isLight"]', raise_err=False)
-    # 存在私信小红点
-    ele_has_msg = ele_im_red_pot._find_elements('xpath:/div',raise_err=False)
-    return ele_has_msg
-
-
-@task
-def check_home_page(home_url):
-    if "/user/self" not in tab.url:
-        tab.get(home_url)
-
-@task
-def check_conversion_exit():
-    ele_conversaton = tab._find_elements('xpath://div[@data-mask="conversaton-detail-content"]',raise_err=False)
-    if ele_conversaton:
-        ele_conversaton.ele('退出会话').click()
-
-
-@flow
-def check_has_im_msg_flow(home_url:str):
-    check_home_page(home_url)
-    check_conversion_exit()
-    mg_test = chat_test.Chat()
-    mg_test.send_msg("你好")
-    ele_im = get_im_icon_ele()
-    ele_has_msg = get_im_icon_red_pot_ele(ele_im)
-    return ele_im, ele_has_msg
-
-def main():
-    from database.s3 import S3Object,S3
-    from database.config import minio_client
-    from pathlib import Path
-    # res = minio_client.fput_object(bucket_name='public', object_name=base_path+file_name, file_path=r"I:\code\ai-yunying\live-online-people\output\page\\"+file_name,content_type= 'text/html')
-    
-    file_name = '点击陌生人对话框.html'
-    page_dir = Path(r'I:\code\ai-yunying\live-online-people\output\page\\')
-    file_path = page_dir/file_name
-    f = open(file_path, 'w')
-    f.write(tab.html)
-    s3minio = S3(bucket='public',client=minio_client)
-    base_path = '/md/ai-yunying/'
-    obj_name = base_path+ s3minio.get_object_name_by_time() + '-'+ file_name
-    res = s3minio.fput(r'I:\code\ai-yunying\live-online-people\output\page\\'+file_name, obj_name)
-    logger.info(f"{res.bucket_name } {res.object_name} {res._http_headers}")
-    # ele_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
-    # logger.info(f"{ele_dlg.text}")
-    # check_has_im_msg_flow(home_url="https://www.douyin.com/user/self")
-    # S3Object(path='runtime/check_msg-stranger-msg.html').put(tab.html)
-    # f = open(r'I:\code\ai-yunying\live-online-people\output\page\点击陌生人消息.html', 'w')
-    # f.write(tab.html)
-
-if __name__ == "__main__":
-    main()

+ 166 - 0
douyin/flow/check_page.py

@@ -0,0 +1,166 @@
+import datetime
+import os
+import re
+import time
+import os
+import sys
+sys.path.append(os.path.dirname(os.path.dirname(__file__)))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
+from dp.page import page
+from douyin import base
+from conf.config import logger,OUTPUT
+from database.config import  minio_block
+from dataset import Table
+from DrissionPage import ChromiumPage
+from DrissionPage._elements.chromium_element import ChromiumElement
+from DrissionPage._units.listener import DataPacket
+import jsonpath
+from prefect import flow,task
+from prefect.tasks import Task,TaskRun
+from prefect.flows import Flow
+from prefect.states import State,StateType
+from douyin import chat_test
+from douyin.models import UnReadUserData,UserInfoModel,user_table,unread_table
+
+# tab=page.tab
+     
+
+@task
+def click_im_icon_ele():
+    tab = base.get_tab()
+    tab.scroll.to_top()
+    ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]', timeout=5)
+    logger.info(f"点击私信图标")
+    ele_im.click()
+
+@task
+def get_im_icon_red_pot_ele(ele_im:ChromiumElement):
+    ele_im_red_pot = ele_im._find_elements('xpath://div[@class="LFWqFfyH isLight"]', raise_err=False)
+    # 存在私信小红点
+    ele_has_msg = ele_im_red_pot._find_elements('xpath:/div',raise_err=False)
+    return ele_has_msg
+
+
+@task(description='检查是否在用户主页,如果不是则跳转到用户主页')
+def check_home_page(home_url):
+    tab = base.get_tab()
+    if "/user/self" not in tab.url:
+        tab.get(home_url)
+
+@task(description='检查聊天框是否退出,没有退出则退出')
+def check_conversion_exit():
+    tab = base.get_tab()
+    tab.s_ele()
+    ele_conversaton = tab._find_elements('xpath://div[@data-mask="conversaton-detail-content"]',timeout=0.1, raise_err=False)
+    if ele_conversaton:
+        ele_conversaton.ele('退出会话',timeout=0.1).click()
+
+
+
+@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
+def save_unread_user_data(data:UnReadUserData):
+    return unread_table.insert(data)
+
+@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
+def get_im_item_user_data(msg_item_div:ChromiumElement):
+    # 构建所需的数据字典  
+    data = UnReadUserData()
+    name = msg_item_div.s_ele("xpath://div[@class='gZdlhsqq']")
+    if name:
+        data.name = name.text
+    # 定位头像  
+    avatar_ele = msg_item_div.s_ele("xpath://img")
+    if avatar_ele:
+        data.avator = avatar_ele.attr('src')  
+    
+    msg_content = msg_item_div.s_ele('xpath://pre[@class="MnyOYvbN"]')
+    if msg_content:
+        data.msg = msg_content.text  
+
+    # 定位时间  
+    ele_time = msg_item_div.s_ele('xpath://div[@class="skNuRdW_"]')
+    if ele_time:
+        data.msg_time_txt = ele_time.text[3:]  
+    logger.info(f"{data}")
+    return data
+   
+    
+
+@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
+def get_im_unread_item_user_data(ele_msg_red_pot:ChromiumElement):
+    logger.info(f"存在未读消息,获取该的用户头像、昵称、时间等信息")
+    # 定位私信聊天框,一个用户私信聊天框的完整元素
+    msg_item_div = ele_msg_red_pot.parent() 
+    unread_user_data = get_im_item_user_data(msg_item_div)
+    unread_user_data.unread_msg_count = int(ele_msg_red_pot.text)
+    logger.info(f"unread_user_data {unread_user_data}")
+    return unread_user_data
+
+'''
+@flow(log_prints=False)
+def unread_msg_flow():
+    tab = base.get_tab()
+    ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]')
+    # 无论有没有消息,都点击顶部私信图标
+    click_im_icon_ele(ele_im)
+    # get_im_unread_item_data.with_options(result_storage_key=get_object_name_by_time())
+    unread_user_data = get_im_unread_item_user_data()
+    # 记录未读消息到数据库
+    unread_user_data = save_unread_user_data(unread_user_data)
+    return unread_user_data
+'''
+@task(description=f'在弹出的私信框中,获取未读消息的元素')
+def get_listDlg_unread_msg(ele_list_dlg:ChromiumElement):
+    # 所有未读小红点
+    ele_msg_red_pots = ele_list_dlg._find_elements('xpath://div[@class="hcPUqxqn"]', index=None, raise_err=False)
+    unread_items = []
+    for ele_msg_red_pot in ele_msg_red_pots:
+        msg_item_div = ele_msg_red_pot.parent() 
+        unread_user_data = get_im_item_user_data(msg_item_div)
+        unread_user_data.unread_msg_count = int(ele_msg_red_pot.text)
+        unread_items.append(unread_user_data)
+        logger.info(f"存在未读消息,获取该的用户头像、昵称、时间等信息: {unread_user_data}")
+
+    ele_stranger = ele_list_dlg._find_elements('陌生人消息', raise_err=False)
+    stranger_txt = ''
+    if ele_stranger:
+        stranger_txt = '陌生人消息'
+    logger.info(f"{stranger_txt}")
+    return unread_items,stranger_txt
+
+from prefect.tasks import Task,TaskRun
+from prefect.flows import Flow,FlowRun
+def check_page_flow_on_failure(flow: Flow, flow_run: FlowRun, state: State, **kwargs):
+    # logger.info(f"{flow}")
+    # logger.info(f"{flow_run.id}")
+    res = base.save_page_info(f"check_page_flow_on_failure-{flow_run.id}", tab_id=flow_run.parameters['tab_id'])
+    logger.info(f"{res}")
+
+@flow(log_prints=False, on_failure=[check_page_flow_on_failure],persist_result=True, result_storage=minio_block)
+def check_page_ok_flow(home_url:str, tab_id=None):
+    tab = base.get_tab(tab_id)  
+    check_home_page(home_url)
+    check_conversion_exit()
+    # 无论有没有消息,都点击顶部私信图标
+    click_im_icon_ele()
+    mg_test = chat_test.Chat()
+    # mg_test.send_msg("你好")
+    ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]',timeout=3)
+    unread_items,stranger_txt = get_listDlg_unread_msg(ele_list_dlg)
+    logger.info(f"{unread_items,stranger_txt}")
+
+    return unread_items,stranger_txt
+def main():
+    # ele_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
+    # logger.info(f"{ele_dlg.text}")
+    # check_page_ok_flow.serve(name='serv')
+    check_page_ok_flow('1')
+    # stra = tab._find_elements('xpath:/html/body/div[2]/div/div[3]/div/div/header/div/div/div[2]/div/pace-island/div/ul[2]/div/li/div/div/div[3]/div/div/div/div/div[2]/div/div[2]/div/div/div[2]/div/div')
+    # logger.info(f"stra {stra.text}")
+
+    # S3Object(path='runtime/check_msg-stranger-msg.html').put(tab.html)
+    # f = open(r'I:\code\ai-yunying\live-online-people\output\page\点击陌生人消息.html', 'w')
+    # f.write(tab.html)
+
+if __name__ == "__main__":
+    main()

+ 8 - 10
douyin/flow/conversation.py

@@ -20,22 +20,18 @@ from database.config import minio_block
 from database.s3 import S3Object
 from database.s3 import S3Object
 
 
 import prefect.client
 import prefect.client
-import jionlp as jio
+
 
 
 from conf.config import logger,OUTPUT
 from conf.config import logger,OUTPUT
 from douyin import base
 from douyin import base
-from dataset import Table
 from DrissionPage import ChromiumPage
 from DrissionPage import ChromiumPage
 from DrissionPage._elements.chromium_element import ChromiumElement
 from DrissionPage._elements.chromium_element import ChromiumElement
-from DrissionPage._units.listener import DataPacket
-import jsonpath
 from prefect import flow,task
 from prefect import flow,task
 from prefect.tasks import Task,TaskRun
 from prefect.tasks import Task,TaskRun
-from prefect.flows import Flow
 from prefect.states import State,StateType
 from prefect.states import State,StateType
 from prefect.client.schemas.objects import TaskRunInput,Parameter,TaskRunResult
 from prefect.client.schemas.objects import TaskRunInput,Parameter,TaskRunResult
 from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table,engine
 from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table,engine
-from douyin.flow import check_msg,unread_msg
+from douyin.flow import check_page,unread_msg
 from sqlmodel import Field, SQLModel,Relationship,Column,Session,select,func,UniqueConstraint,PickleType,text
 from sqlmodel import Field, SQLModel,Relationship,Column,Session,select,func,UniqueConstraint,PickleType,text
 
 
 tab:ChromiumPage=page.tab
 tab:ChromiumPage=page.tab
@@ -168,6 +164,8 @@ def get_conversation_on_failure(task: Task, task_run: TaskRun, state: State, **k
 
 
 
 
 def cn_time_to_timestamp(time_str:str, time_base=None):
 def cn_time_to_timestamp(time_str:str, time_base=None):
+    # 因为 jionlp 启动需要加载数据,所以只有在需要用到的时候才导入
+    import jionlp as jio
     '''exzample
     '''exzample
         for row in chat_history_table:
         for row in chat_history_table:
         time_base = row.get("create_time")
         time_base = row.get("create_time")
@@ -296,9 +294,9 @@ def send_msg_to_user(unread_user_data:UnReadUserData):
     # 找到发送按钮
     # 找到发送按钮
     # span class="e2e-send-msg-btn"
     # span class="e2e-send-msg-btn"
     ele_send = ele_popShadowAnimation.ele('xpath://span[contains(@class, "e2e-send-msg-btn")]')
     ele_send = ele_popShadowAnimation.ele('xpath://span[contains(@class, "e2e-send-msg-btn")]')
-    ele_send.click()
+    # ele_send.click()
     ele_exit = ele_popShadowAnimation.ele('退出会话')
     ele_exit = ele_popShadowAnimation.ele('退出会话')
-    # ele_exit.click()
+    ele_exit.click()
 
 
     logger.info(f"回复成功")
     logger.info(f"回复成功")
     return {"chat_history":unread_user_data.chat_history, "reply":["hello"]}
     return {"chat_history":unread_user_data.chat_history, "reply":["hello"]}
@@ -310,10 +308,10 @@ def set_unread_done(unread_user_data:UnReadUserData):
     logger.info("set unread done")
     logger.info("set unread done")
 
 
 def reply_to_user_flow_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
 def reply_to_user_flow_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
-    check_msg.check_conversion_exit()
+    check_page.check_conversion_exit()
 
 
 @flow(log_prints=False, on_failure=[reply_to_user_flow_on_failure])
 @flow(log_prints=False, on_failure=[reply_to_user_flow_on_failure])
-def reply_to_user_flow(unread_user_data:UnReadUserData):
+def reply_to_user_flow(unread_user_data:UnReadUserData, tab_id=None):
     # 进入私信聊天窗口
     # 进入私信聊天窗口
     enter_state = enter_conversation(unread_user_data.name,return_state=True)
     enter_state = enter_conversation(unread_user_data.name,return_state=True)
     user_profile_body, conversation_detail_body = enter_state.result()
     user_profile_body, conversation_detail_body = enter_state.result()

+ 16 - 0
douyin/flow/deployment.py

@@ -0,0 +1,16 @@
+import sys
+sys.path.append(r'.')
+import time
+from prefect.deployments import run_deployment
+from prefect import flow, serve
+from douyin.flow import check_page,conversation,unread_msg
+
+def main():
+    check_page_deploy = check_page.check_page_ok_flow.to_deployment('ai-yunying')
+    reply_to_user_deploy = conversation.reply_to_user_flow.to_deployment('ai-yunying')
+    unread_msg_depoly = unread_msg.unread_msg_flow.to_deployment('ai-yunying')
+    stranger_msg_depoly = unread_msg.stranger_msg_flow.to_deployment('ai-yunying')
+    serve(check_page_deploy,reply_to_user_deploy, unread_msg_depoly, stranger_msg_depoly)
+
+if __name__ == "__main__":
+    main()

+ 50 - 11
douyin/flow/unread_msg.py

@@ -4,7 +4,7 @@ import re
 import time
 import time
 import os
 import os
 import sys
 import sys
-sys.path.append(os.path.dirname(os.path.dirname(__file__)))
+sys.path.append('.')
 
 
 from dp.page import page
 from dp.page import page
 from douyin import base
 from douyin import base
@@ -15,18 +15,17 @@ from dataset import Table
 from DrissionPage import ChromiumPage
 from DrissionPage import ChromiumPage
 from DrissionPage._elements.chromium_element import ChromiumElement
 from DrissionPage._elements.chromium_element import ChromiumElement
 from DrissionPage._units.listener import DataPacket
 from DrissionPage._units.listener import DataPacket
-import jsonpath
 from prefect import flow,task
 from prefect import flow,task
 from prefect.tasks import Task,TaskRun
 from prefect.tasks import Task,TaskRun
 from prefect.flows import Flow
 from prefect.flows import Flow
 from prefect.states import State,StateType
 from prefect.states import State,StateType
 from douyin.models import UnReadUserData,engine
 from douyin.models import UnReadUserData,engine
-from douyin.flow import check_msg, conversation
-tab=page.tab
+from douyin.flow import check_page, conversation
 
 
 # 点击私信图标后,获取未读消息小圆点
 # 点击私信图标后,获取未读消息小圆点
 @task
 @task
-def get_ele_msg_red_pot(ele_im:ChromiumElement):
+def get_ele_msg_red_pot(ele_im:ChromiumElement, tab_id=None):
+    tab = base.get_tab(tab_id)
     ele_im.click()
     ele_im.click()
     logger.info(f"点击私信图标后,获取未读消息小圆点")
     logger.info(f"点击私信图标后,获取未读消息小圆点")
     ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
     ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
@@ -72,14 +71,54 @@ def get_im_unread_item_user_data(ele_msg_red_pot:ChromiumElement):
     logger.info(f"unread_user_data {unread_user_data}")
     logger.info(f"unread_user_data {unread_user_data}")
     return unread_user_data
     return unread_user_data
 
 
+@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
+def get_stranger_unread_data(stranger_txt='陌生人消息', tab_id=None) -> UnReadUserData:
+    tab = base.get_tab(tab_id)
+    # 只有出现陌生人消息时,才会看到请求 https://www.douyin.com/aweme/v1/web/im/user/info 
+    tab.listen.start('im/user/info')
+    ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]',timeout=3)
+    ele_list_dlg.ele(stranger_txt).click()
+    for packet in tab.listen.steps(1, timeout=3):
+        if 'im/user/info' in packet.url:
+            stranger_info_packet = packet
+    data_list = stranger_info_packet.response.body.get('data')
+    logger.info(f"unread_stranger_data {data_list[-1]}")
+
+    unread_stranger_data = UnReadUserData(name=data_list[-1].get('nickname'), avator=data_list[-1].get("avatar_thumb").get("uri"))
+    logger.info(f"unread_stranger_data {unread_stranger_data}")
+    return unread_stranger_data
+
 
 
+import DrissionPage
+# 乍一看这个流好像有点多余,只有简单几条任务,为何不在主流中直接写呢?
+# 因为要保证并发运行流,并且这个流可能要用新的 tab 运行,在主流中会阻塞无法并发
 @flow(log_prints=False)
 @flow(log_prints=False)
-def unread_msg_flow():
-    ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]')
-    # 私信弹框的红色小红点
-    ele_list_dlg,ele_pop_red_pot = get_ele_msg_red_pot(ele_im)
+def stranger_msg_flow(home_url, tab_id=None):
+    # tab = base.get_tab(tab_id)
+    logger.info(f"tab_id {tab_id}")
+    unread_items,stranger_txt = check_page.check_page_ok_flow(home_url, tab_id)
+    unread_stranger_data = get_stranger_unread_data(stranger_txt, tab_id)
+    conversation.reply_to_user_flow(unread_stranger_data, tab_id)
+
+
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+@flow(log_prints=False)
+def unread_msg_flow(name, tab_id=None):
+
+    # ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
+    # ele_list_dlg_stranger= ele_list_dlg.ele(name)
+    # ele_list_dlg_stranger.click()
     # get_im_unread_item_data.with_options(result_storage_key=get_object_name_by_time())
     # get_im_unread_item_data.with_options(result_storage_key=get_object_name_by_time())
-    unread_user_data = get_im_unread_item_user_data(ele_pop_red_pot)
+    unread_user_data = conversation.reply_to_user_flow(name,tab)
     # 记录未读消息到数据库
     # 记录未读消息到数据库
     unread_user_data = save_unread_user_data(unread_user_data)
     unread_user_data = save_unread_user_data(unread_user_data)
-    return unread_user_data
+    return unread_user_data
+
+def main():
+    home_url:str='https://www.douyin.com/user/self'
+    # tab = page.new_tab()
+    stranger_msg_flow(home_url, 'A017888A62FE53FAD9D85ED2662FEA34')
+
+if __name__ == "__main__":
+    main()

+ 18 - 7
dp/page_test.py

@@ -1,18 +1,17 @@
 import os
 import os
 import sys
 import sys
 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
+from douyin.flow import check_page
 from dp.page import page
 from dp.page import page
 from conf.config import logger,OUTPUT
 from conf.config import logger,OUTPUT
 from database.config import  minio_block
 from database.config import  minio_block
-
+from database.s3 import S3Object,S3
+from database.config import minio_client
+from pathlib import Path
+from douyin import chat_test
 tab=page.tab
 tab=page.tab
 
 
-def main():
-    from database.s3 import S3Object,S3
-    from database.config import minio_client
-    from pathlib import Path
-    # res = minio_client.fput_object(bucket_name='public', object_name=base_path+file_name, file_path=r"I:\code\ai-yunying\live-online-people\output\page\\"+file_name,content_type= 'text/html')
-    
+def save_html_to_s3():
     file_name = '点击陌生人对话框.html'
     file_name = '点击陌生人对话框.html'
     page_dir = Path(r'I:\code\ai-yunying\live-online-people\output\page\\')
     page_dir = Path(r'I:\code\ai-yunying\live-online-people\output\page\\')
     file_path = page_dir/file_name
     file_path = page_dir/file_name
@@ -23,6 +22,18 @@ def main():
     obj_name = base_path+ s3minio.get_object_name_by_time() + '-'+ file_name
     obj_name = base_path+ s3minio.get_object_name_by_time() + '-'+ file_name
     res = s3minio.fput(r'I:\code\ai-yunying\live-online-people\output\page\\'+file_name, obj_name)
     res = s3minio.fput(r'I:\code\ai-yunying\live-online-people\output\page\\'+file_name, obj_name)
     logger.info(f"{res.bucket_name } {res.object_name} {res._http_headers}")
     logger.info(f"{res.bucket_name } {res.object_name} {res._http_headers}")
+
+def check_msg():
+    from douyin.flow import conversation,unread_msg
+    
+
+def main():
+    chat_test.Chat()
+
+    # save_html_to_s3()
+
+    # res = minio_client.fput_object(bucket_name='public', object_name=base_path+file_name, file_path=r"I:\code\ai-yunying\live-online-people\output\page\\"+file_name,content_type= 'text/html')
+    
     # ele_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
     # ele_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
     # logger.info(f"{ele_dlg.text}")
     # logger.info(f"{ele_dlg.text}")
     # check_has_im_msg_flow(home_url="https://www.douyin.com/user/self")
     # check_has_im_msg_flow(home_url="https://www.douyin.com/user/self")