Ver Fonte

以 SQLModel 的方式存储数据,完成私信回复

mrh há 1 ano atrás
pai
commit
595cf95033

+ 3 - 2
ai/client.py

@@ -6,10 +6,11 @@ import os
 import sys
 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 
+from douyin.flow import conversation
 from dp.page import page
-from douyin import chat_test,base,conversation,user
+from douyin import chat_test,base,user
 from conf.config import logger,OUTPUT,WORK_DIR
-from database.config import ai_yunying_db,minio_block
+from database.config import minio_block
 from dataset import Table
 from DrissionPage import ChromiumPage
 from DrissionPage._elements.chromium_element import ChromiumElement

+ 10 - 11
ai/fallbacks.py

@@ -92,18 +92,18 @@ model_list = [
         },
     }, 
 ]
+router = Router(model_list=model_list, 
+                fallbacks=[{"g4f/gpt-3.5-turbo": ["gpt-3.5-turbo"]}, 
+                            {"g4f-sv-v2/gpt-3.5-turbo": ["gpt-3.5-turbo"]},
+                            {"chatanywhere/gpt-3.5-turbo": ["gpt-3.5-turbo"]},
+                            {"qwen": ["gpt-3.5-turbo"]},
+                            {"kimi": ["gpt-3.5-turbo"]},
+                            {"chatgml4": ["gpt-3.5-turbo"]},
+                            ], 
+                context_window_fallbacks=[{"kimi": ["gpt-3.5-turbo"]}, {"chatgml4": ["gpt-3.5-turbo"]}],
+                set_verbose=True)
 
 def main():
-    router = Router(model_list=model_list, 
-                    fallbacks=[{"g4f/gpt-3.5-turbo": ["gpt-3.5-turbo"]}, 
-                               {"g4f-sv-v2/gpt-3.5-turbo": ["gpt-3.5-turbo"]},
-                                {"chatanywhere/gpt-3.5-turbo": ["gpt-3.5-turbo"]},
-                                {"qwen": ["gpt-3.5-turbo"]},
-                                {"kimi": ["gpt-3.5-turbo"]},
-                                {"chatgml4": ["gpt-3.5-turbo"]},
-                               ], 
-                    context_window_fallbacks=[{"kimi": ["gpt-3.5-turbo"]}, {"chatgml4": ["gpt-3.5-turbo"]}],
-                    set_verbose=True)
     user_message = "你是"
     messages = [{"content": user_message, "role": "user"}]
 
@@ -111,7 +111,6 @@ def main():
     # response = router.completion(model="g4f/gpt-3.5-turbo", messages=messages,)
     response = completion(model_list=model_list, model="yiyan", messages=messages,)
 
-
     print(f"response: {response}")
 
 if __name__ == "__main__":

+ 15 - 6
database/config.py

@@ -32,12 +32,21 @@ minio_block = RemoteFileSystem(
 minio_block:RemoteFileSystem = minio_block.load("minio")
 
 
-# s3client = Minio(os.environ["s3_endpoint"] ,
-#     access_key=os.environ["s3_access_key"],
-#     secret_key=os.environ["s3_secret_key"],
-#     secure=False
-# )
-ai_yunying_db = dataset.connect(f'postgresql://pg:pg@sv-v:5432/ai_yunying')
+minio_client = Minio(os.environ["s3_endpoint"] ,
+    access_key=os.environ["s3_access_key"],
+    secret_key=os.environ["s3_secret_key"],
+    secure=False
+)
+
+DB_URL=f'postgresql://pg:pg@sv-v:5432/ai_yunying'
+# engine = create_engine(DB_URL)
+# def create_all_table():
+#     SQLModel.metadata.create_all(engine)
+# ai_yunying_db = dataset.connect(DB_URL)
+# chat_history_table:Table = ai_yunying_db.get_table('chat_history')
+# chat_task_table:Table = ai_yunying_db.get_table('chat_task')
+# user_table:Table = ai_yunying_db.get_table('user')
+
 # content_type header json
 # print(s3client.fput_object(content_type="application/json", bucket_name='ai-yunying'))
 # print(db.tables)

+ 145 - 0
database/prefect_api.py

@@ -0,0 +1,145 @@
+import base64
+import json
+import os
+import pickle
+import re
+import time
+import os
+import sys
+import uuid
+sys.path.append(os.path.dirname(os.path.dirname(__file__)))
+
+import asyncio
+from uuid import UUID
+from conf.config import logger,OUTPUT
+from dataset import Table
+from DrissionPage import ChromiumPage
+from DrissionPage._elements.chromium_element import ChromiumElement
+from DrissionPage._units.listener import DataPacket
+import jsonpath
+from prefect import flow,task
+from prefect.filesystems import LocalFileSystem, S3
+import os
+import prefect.results 
+from prefect.results import PersistedResult
+from prefect.serializers import PickleSerializer
+from prefect import get_client
+import prefect.client.schemas.objects
+import json  
+
+# import consul_srv_client
+# ip,port = consul_srv_client.get_srv("prefect.service.consul")
+prefect_api = f"http://{'10.0.0.1'}:{'8612'}/api"
+# 在导入 prefect 之前设置环境变量
+os.environ["PREFECT_API_URL"] = prefect_api
+
+from prefect import flow
+
+@task(persist_result=True, result_storage_key="{flow_run.flow_name}_{flow_run.name}_hello.json")
+def task_flow():
+    print(task_flow.result_serializer)
+    print(task_flow.result_storage_key)
+    return "task_flow"
+
+
+@flow(persist_result=True)
+def auto_douyin(msg:str="none"):
+    print(f"Hello, {msg}! Local quickstart flow is running! ")
+    task_flow()
+    return f"Hello, {msg}!  "
+
+
+# 通过本地存储获取结果(不方便)
+def get_result():
+    path = r'I:\code\ai-yunying\live-online-people\output\prefect\auto-douyin_pristine-gecko_hello.json'
+    with open(path, 'r') as f:
+        json_data = json.load(f)
+        base64_data = json_data['data']
+        decoded_data = base64.b64decode(base64_data)
+        result = pickle.loads(decoded_data)
+        print(result)  
+
+from prefect.client.schemas.objects import FlowRun
+from prefect.client.schemas.filters import FlowRunFilter,TaskRunFilter,TaskRunFilterSubFlowRuns,FlowRunFilterId,TaskRunFilterId
+from prefect.filesystems import LocalFileSystem
+# from prefect.runtime.task_run import TaskRun
+from prefect.client.schemas.objects import TaskRun
+# 通过api获取结果,较慢
+async def get_result_by_api():
+    flow_run_id = "cb1e2c24-988b-44aa-8980-00af79f7a842"
+    client = get_client()
+    flow_run:FlowRun = await client.read_flow_run(flow_run_id)
+    logger.info(f"flow_run {flow_run}")
+    flow_run_states = await client.read_flow_run_states(flow_run_id)
+    logger.info(f"flow_run_states {flow_run_states}")
+    # print(f"Flow Run: {flow_run} {type(flow_run)}")
+    print(f"parameters : {flow_run.parameters}")
+    print(f"parent_task_run_id : {flow_run.parent_task_run_id}")
+    print(f"state : {flow_run.state}")
+    state_result:PersistedResult = flow_run.state.result()
+    print(f"state_result : {state_result}")
+    print(f"typr state_result : {type(state_result)}")
+
+    result_value = await state_result.get()
+    print(f"result_value : {result_value}")
+    print(f"state_details: {flow_run.state.state_details}")
+    f = FlowRunFilter(id=FlowRunFilterId(any_=[flow_run_id]))
+    t = TaskRunFilter()
+    t.subflow_runs = TaskRunFilterSubFlowRuns(exists_=False)
+    # run_tasks = await client.read_task_runs(flow_run_filter=f, task_run_filter=t)
+    run_tasks = await client.read_task_runs(flow_run_filter=f)
+    # run_tasks = await client._client.post("/task_runs/filter", json={"flow_runs":{"id":{"any_":["cb1e2c24-988b-44aa-8980-00af79f7a842"]}},"task_runs":{"subflow_runs":{"exists_":False}},"sort":"EXPECTED_START_TIME_DESC","limit":200,"offset":0})
+    logger.info(f"run_tasks {run_tasks}")
+    for task in run_tasks:
+        res = task.state.result()
+        logger.info(f"{task.name} {await  res.get()}")
+
+async def read_task_run(task_run_id:str, client=None)->TaskRun:
+    client = client or get_client()
+    t = TaskRunFilter(id = TaskRunFilterId(any_=[task_run_id]))
+    run_tasks = await client.read_task_runs(task_run_filter=t)
+    if run_tasks:
+        logger.info(f"{type(run_tasks[0])}")
+        return run_tasks[0]
+
+async def read_task_run_inputs(task_run:TaskRun, client=None):
+    client = client or get_client()
+    # task_inputs={'unread_user_data': [], 'storage_key': [], 'user_profile_body': [TaskRunResult(input_type='task_run', id=UUID('d7995cd0-ce86-4970-b727-f597b20f231a'))]}
+    task_run.task_inputs
+    logger.info(f"task_run_id {task_run_id}")
+    t = TaskRunFilter(id = TaskRunFilterId(any_=[task_run_id]))
+    run_tasks = await client.read_task_runs(task_run_filter=t)
+    if run_tasks:
+        logger.info(f"{type(run_tasks[0])}")
+        return run_tasks
+
+async def read_task_run_result(taskrun:TaskRun, client=None):
+    client = client or get_client()
+    t = TaskRunFilter(id = TaskRunFilterId(any_=['204660c9-8dcc-43d0-bf27-4023c9eff94d']))
+    run_tasks = await client.read_task_runs(task_run_filter=t)
+    # run_tasks = await client._client.post("/task_runs/filter", json={"flow_runs":{"id":{"any_":["cb1e2c24-988b-44aa-8980-00af79f7a842"]}},"task_runs":{"subflow_runs":{"exists_":False}},"sort":"EXPECTED_START_TIME_DESC","limit":200,"offset":0})
+    logger.info(f"run_tasks {run_tasks}")
+    for task in run_tasks:
+        res = task.state.result()
+        logger.info(f"{task.name} {await  res.get()}")
+
+
+
+if __name__ == "__main__":
+    asyncio.run(read_task_run('204660c9-8dcc-43d0-bf27-4023c9eff94d'))
+    # pickle_data = b"gAAAAABmKiLNhU5fGgCSwcDE-BgYOTAgxQJEjr0DcZRY2oSosTVM9qJ7fMnK373-v3gMrMO0ix_AmEaYQ8lfStKo71FGMofF1hOVtojEuLUXnqEkkNWk0eXefxj35mvGBNQfIL_lzBzxFuSxTTmDkOzPBP57UfNP8Q=="
+    # res = pickle.loads(pickle_data)
+    # logger.info(f"{res}")
+    # logger.info(f"STORAGE.get_block_capabilities {STORAGE.get_block_capabilities()}")
+    # # get_block_class_from_key 
+    # # logger.info(f"STORAGE.get_block_class_from_key {STORAGE.get_block_class_from_key()}")
+    # # get_block_class_from_schema 
+    # logger.info(f"STORAGE.get_block_class_from_schema {STORAGE.get_block_class_from_schema()}")
+    # # get_block_placeholder 
+    # logger.info(f"STORAGE.get_block_placeholder {STORAGE.get_block_placeholder()}")
+    # # get_description 
+    # logger.info(f"STORAGE.get_description {STORAGE.get_description()}")
+    # asyncio.run(get_result_by_api())
+
+    # new_flow = auto_douyin.with_options(result_storage=LocalFileSystem(basepath=os.path.join(OUTPUT, "prefect")))
+    # new_flow(msg="it's ok flow result")

+ 47 - 7
database/s3.py

@@ -1,18 +1,58 @@
 import os
 import sys
 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
-
+from typing import List, Dict, Any, Optional,Tuple
 from io import BytesIO
 import json
 import mimetypes
-import config
 import datetime
 from conf.config import logger
 import pickle  
+from database.config import minio_block,minio_client
+from minio import Minio
+from pathlib import Path
+from sqlmodel import Field, SQLModel,Relationship,Column,Session,select,func,UniqueConstraint,PickleType,text
 
+class S3Object(SQLModel,table=False):
+    type:Optional[str]= Field(default=None)
+    path:Optional[str] = Field(default=None)
+    len:Optional[int] = Field(default=0)
+    
+    def put(self, obj:Any, path=None):
+        self.type = type(obj).__name__  
+        data = pickle.dumps(obj)  
+        self.len = len(data)  
+        if path is None:  
+            path = self.path  
+        self.path = minio_block.write_path(path, data)  # 假设这个函数返回数据的存储路径  
+        return self  
+    
+    def get(self):
+        path = self.path
+        if path.startswith(minio_block.basepath):
+            path = path[len(minio_block.basepath):]
+        data = minio_block.read_path(path)
+        return pickle.loads(data)
+    
+    def info(self):
+        return vars(self)
+    def put_object(self, obj:Any, path=None):
+        self.client.put_object(bucket_name='swl', data=obj, object_name=path)
+        try:
+            obj_bytes = json.dumps(obj)
+        except TypeError as e:
+            print(e)
+            res = pickle.dumps(('123', '234'))
+            print("pickle res", res)
+        return self.path
+    def get_object(self, path=None):
+        if path is None:
+            path = self.path
+        return minio_block.read_path(path)
+    
 class S3:
     # s3client is minio python SDK client
-    def __init__(self, bucket='swl', client=config.s3client) -> None:
+    def __init__(self, bucket='swl', client=None) -> None:
         self.bucket = bucket
         self.minio_client = client
     # 根据本地时间自动获取对象存储前缀,如 log/2024-04-25/xxx
@@ -74,13 +114,13 @@ class S3:
         return res.json()
 
 def main():
-    s3 = S3()
     test = {"dat": "test", 'ok':[1,2,3,3]}
     # s3.fput(r'I:\code\ai-yunying\live-online-people\output\data.json', 'log/20240425/test2.json')
-    path = s3.put_json(obj=test, object_name='test.json', name_by_time=False)
-    logger.info(f"{path}")
-    res = s3.get_object(object_name=path)
+    s3 = S3Object(path="save_enter_im_user_detail_test").put(test)
+    logger.info(f"{s3.info()}")
+    res = s3.get()
     logger.info(f"{res}")
+    logger.info(f"{s3.model_dump()}")
 
 if __name__ == "__main__":
     main()

+ 18 - 1
douyin/base.py

@@ -8,8 +8,8 @@ sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 from conf.config import OUTPUT,PAGE_OUTPUT
 from database.config import minio_block
 import prefect.runtime
-from pyunit_time import Time
 from dp.page import page
+import jionlp as jio
 
 import time  
 from datetime import datetime  
@@ -46,3 +46,20 @@ def save_page_info(local=False, file_name=''):
     minio_block.write_path(f'{file_name}.html',tab.html.encode() )
     img_bytes = tab.get_screenshot(as_bytes=True)
     minio_block.write_path(f'{file_name}.png',img_bytes )
+
+
+
+    
+def main():
+    for row in chat_history_table:
+        time_base = row.get("create_time")
+        cn_time = row.get("time")
+        timestamp = cn_time_to_timestamp(cn_time, time_base=time_base)
+        if timestamp:
+            str_time = datetime.strftime(datetime.fromtimestamp(timestamp), '%Y-%m-%d %H:%M:%S')
+        else:
+            str_time = None
+        print(f"timestamp {timestamp} \t\t str {str_time} \t\t time_base {time_base}")
+
+if __name__ == "__main__":
+    main()

+ 30 - 101
douyin/chat_flow.py

@@ -7,9 +7,10 @@ import sys
 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 
 from dp.page import page
-from douyin import chat_test,base,conversation,user
+from douyin import base
 from conf.config import logger,OUTPUT
-from database.config import ai_yunying_db,minio_block
+from database.config import  minio_block
+from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table
 from dataset import Table
 from DrissionPage import ChromiumPage
 from DrissionPage._elements.chromium_element import ChromiumElement
@@ -19,125 +20,53 @@ from prefect import flow,task
 from prefect.tasks import Task,TaskRun
 from prefect.flows import Flow
 from prefect.states import State,StateType
-
+from douyin.models import UnReadUserData,engine
+from douyin.flow import check_msg, conversation,unread_msg
 tab=page.tab
 
 
 
-# 点击私信图标后,获取未读消息小圆点
-@task
-def get_ele_msg_red_pot(ele_im:ChromiumElement):
-    ele_im.click()
-    logger.info(f"点击私信图标后,获取未读消息小圆点")
-    ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
-    ele_msg_red_pot = ele_list_dlg.ele('xpath://div[@class="hcPUqxqn"]')
-    return ele_list_dlg,ele_msg_red_pot
-
-@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
-def get_im_unread_item_user_data(ele_msg_red_pot:ChromiumElement):
-    logger.info(f"存在未读消息,获取该的用户头像、昵称、时间等信息")
-    # 构建所需的数据字典  
-    data = {  
-        "name": '',  
-        "avator": '',  
-        "msg": None,
-        "unread_msg_count":None,
-        "time": ''
-    }
-
-    data["unread_msg_count"] = int(ele_msg_red_pot.text)
-    # 定位私信聊天框,一个用户私信聊天框的完整元素
-    msg_item_div = ele_msg_red_pot.parent() 
-    name = msg_item_div.s_ele("xpath://div[@class='gZdlhsqq']")
-    if name:
-        data["name"] = name.text
-    # 定位头像  
-    avatar_ele = msg_item_div.s_ele("xpath://img")
-    if avatar_ele:
-        data["avator"] = avatar_ele.attr('src')  
-    
-    msg_content = msg_item_div.s_ele('xpath://pre[@class="MnyOYvbN"]')
-    if msg_content:
-        data["msg"] = msg_content.text  
-
-    # 定位时间  
-    ele_time = msg_item_div.s_ele('xpath://div[@class="skNuRdW_"]')
-    if ele_time:
-        data["time"] = ele_time.text  
-
-    table:Table = ai_yunying_db['chat_task']
-    data['is_done'] = False
-    data['create_time'] = datetime.datetime.now()
-    id = table.insert(data)
-    unread_user_data = table.find_one(id=id)
-    logger.info(f"{unread_user_data}")
-    return unread_user_data
-
-
 @task
 def click_im_icon(ele_im:ChromiumElement):
     return ele_im.click()
-        
-@flow
-def response_im(tab:ChromiumPage, ele_im:ChromiumElement):
-    click_im_icon(ele_im)
-    ele_msg_red_pot = get_ele_msg_red_pot()
-    data,msg_item_div = get_im_unread_item_data(ele_msg_red_pot)
-
-@task
-def check_home_page(home_url:str='https://www.douyin.com/user/self'):
-    if "/user/self" not in tab.url:
-        tab.get(home_url)
-    tab.scroll.to_top()
+   
 
-@task
-def get_im_icon_ele():
-    ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]')
-    if ele_im:
-        return ele_im
-
-@task
-def get_im_icon_red_pot_ele(ele_im:ChromiumElement):
-    ele_im_red_pot = ele_im._find_elements('xpath://div[@class="LFWqFfyH isLight"]', raise_err=False)
-    # 存在私信小红点
-    ele_has_msg = ele_im_red_pot._find_elements('xpath:/div',raise_err=False)
-    return ele_has_msg
 
 
 
-@flow
-def check_has_im_msg_flow(home_url:str):
-    check_home_page(home_url)
-    ele_im = get_im_icon_ele()
-    ele_has_msg = get_im_icon_red_pot_ele(ele_im)
-    return ele_im, ele_has_msg
-
-
 @flow(log_prints=False)
 def im_chat_flow():
     home_url:str='https://www.douyin.com/user/self'
     print(f"home_url:{home_url}")
-    ele_im, ele_has_msg = check_has_im_msg_flow(home_url)
-    # 私信弹框的红色小红点
-    ele_list_dlg,ele_pop_red_pot = get_ele_msg_red_pot(ele_im)
-    # get_im_unread_item_data.with_options(result_storage_key=get_object_name_by_time())
-    unread_user_data = get_im_unread_item_user_data(ele_pop_red_pot)
-    user_profile_packet, conversation_detail = conversation.enter_conversation(unread_user_data, ele_list_dlg)
-    chat_history = conversation.get_conversations_history(unread_user_data, conversation_detail)
-    chat_history = conversation.save_unread_msg_to_db(chat_history, unread_user_data['unread_msg_count'])
-    reply_res = conversation.reply_to_user(chat_history, unread_user_data)
-    # 
+    ele_im, ele_has_msg = check_msg.check_has_im_msg_flow(home_url)
+    unread_user_data = unread_msg.unread_msg_flow()
+    conversation.reply_to_user_flow(unread_user_data)
+    from prefect.results import PersistedResult
 
 
-import chat_test
 def main():
-    im_chat_flow(tab=tab)
-
-if __name__ == "__main__":
     print(os.environ["PREFECT_API_URL"])
-    mg_test = chat_test.Chat()
-    mg_test.send_msg("你好")
     # auto_douyin()
     im_chat_flow()
+
+@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
+def task_test(mytes):
+    return mytes + 'ok'
+
+def test():
+    import jionlp as jio
+    import prefect
+    import json
+    state = task_test(mytes='https://www.douyin.com/user/self', return_state=True)
+    print(state.data.storage_key)
+    print(state.data.json())
+    print(type(state.data))
+    res_dict = json.loads(state.data.json())
+    res = prefect.results.PersistedResult(**res_dict)
+    print(res)
+
+if __name__ == "__main__":
+    main()
+    # test()
     # im_chat_flow.with_options(result_storage=os.path.join(OUTPUT, "prefect", "{flow_run.flow_name}.json"))
     # im_chat_flow()

+ 62 - 0
douyin/flow/check_msg.py

@@ -0,0 +1,62 @@
+import datetime
+import os
+import re
+import time
+import os
+import sys
+sys.path.append(os.path.dirname(os.path.dirname(__file__)))
+
+from dp.page import page
+from douyin import base
+from conf.config import logger,OUTPUT
+from database.config import  minio_block
+from dataset import Table
+from DrissionPage import ChromiumPage
+from DrissionPage._elements.chromium_element import ChromiumElement
+from DrissionPage._units.listener import DataPacket
+import jsonpath
+from prefect import flow,task
+from prefect.tasks import Task,TaskRun
+from prefect.flows import Flow
+from prefect.states import State,StateType
+from douyin import chat_test
+
+tab=page.tab
+     
+
+@task
+def get_im_icon_ele():
+    tab.scroll.to_top()
+    ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]')
+    if ele_im:
+        return ele_im
+
+@task
+def get_im_icon_red_pot_ele(ele_im:ChromiumElement):
+    ele_im_red_pot = ele_im._find_elements('xpath://div[@class="LFWqFfyH isLight"]', raise_err=False)
+    # 存在私信小红点
+    ele_has_msg = ele_im_red_pot._find_elements('xpath:/div',raise_err=False)
+    return ele_has_msg
+
+
+@task
+def check_home_page(home_url):
+    if "/user/self" not in tab.url:
+        tab.get(home_url)
+
+@task
+def check_conversion_exit():
+    ele_conversaton = tab._find_elements('xpath://div[@data-mask="conversaton-detail-content"]',raise_err=False)
+    if ele_conversaton:
+        ele_conversaton.ele('退出会话').click()
+
+
+@flow
+def check_has_im_msg_flow(home_url:str):
+    check_home_page(home_url)
+    check_conversion_exit()
+    mg_test = chat_test.Chat()
+    mg_test.send_msg("你好")
+    ele_im = get_im_icon_ele()
+    ele_has_msg = get_im_icon_red_pot_ele(ele_im)
+    return ele_im, ele_has_msg

+ 173 - 42
douyin/conversation.py → douyin/flow/conversation.py

@@ -5,14 +5,25 @@ import re
 import time
 import os
 import sys
-sys.path.append(os.path.dirname(os.path.dirname(__file__)))
-from database.config import ai_yunying_db,minio_block
+from pathlib import Path
 
-import prefect.client
+# print(f"{Path(__file__).parent.parent.parent}")
+# sys.path.append(Path(__file__).parent.parent.parent.parent)
+# sys.path.append(Path(__file__).parent.parent.parent)
+# sys.path.append(Path(__file__).parent.parent)
+# sys.path.append(Path(__file__).parent)
+sys.path.append(r'I:\code\ai-yunying\live-online-people')
+import prefect.runtime
+import prefect.runtime.task_run
 from dp.page import page
+from database.config import minio_block
+from database.s3 import S3Object
+
+import prefect.client
+import jionlp as jio
 
 from conf.config import logger,OUTPUT
-from douyin import user,base
+from douyin import base
 from dataset import Table
 from DrissionPage import ChromiumPage
 from DrissionPage._elements.chromium_element import ChromiumElement
@@ -23,7 +34,11 @@ from prefect.tasks import Task,TaskRun
 from prefect.flows import Flow
 from prefect.states import State,StateType
 from prefect.client.schemas.objects import TaskRunInput,Parameter,TaskRunResult
-tab=page.tab
+from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table,engine
+from douyin.flow import check_msg,unread_msg
+from sqlmodel import Field, SQLModel,Relationship,Column,Session,select,func,UniqueConstraint,PickleType,text
+
+tab:ChromiumPage=page.tab
 
 def parse_conversations_content_text(html):  
         # 正则表达式匹配 <pre> 标签中的文本和 <img> 标签中的 alt 属性值  
@@ -59,7 +74,7 @@ def filter_aweme_detail_in_conversations_video(aweme_detail:dict):
     }
     return ret 
 
-def analyze_conversations_content(ele_content:ChromiumElement, conversation_detail:DataPacket):
+def analyze_conversations_content(ele_content:ChromiumElement, conversation_detail:dict):
         ret = {
             "is_me": None,
             "content": {
@@ -101,9 +116,12 @@ def enter_conversation_on_failure(task: Task, task_run: TaskRun, state: State, *
       persist_result=True, 
       result_storage=minio_block, 
       result_storage_key=base.get_object_name_by_time())
-def enter_conversation(data:dict, ele_list_dlg:ChromiumElement):
+def enter_conversation(click_user_name):
     tab.listen.start("www.douyin.com/aweme/v1/web")
-    ele_list_dlg.ele(data['name']).click()
+    ele_listDlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
+    ele_listDlg.ele(click_user_name).click()
+    conversation_detail = None
+    user_profile_packet = None
     # 获取该用户的基本信息
     for packet in tab.listen.steps(2, timeout=3):
         if "user/profile" in packet.url:
@@ -111,63 +129,161 @@ def enter_conversation(data:dict, ele_list_dlg:ChromiumElement):
         if "aweme/detail" in packet.url:
             conversation_detail = packet
     conversation_detail_body = None if not conversation_detail else conversation_detail.response.body
-    logger.info(f"user_profile_packet {user_profile_packet}")
-    logger.info(f"conversation_detail {conversation_detail}")
-    user.user.save_user_profile_to_db(user_profile_packet.response.body)
+    if not user_profile_packet:
+        raise Exception("进入私信后没有捕获到 user/profile 请求")
+    logger.debug(f"user_profile_packet {user_profile_packet}")
+    logger.debug(f"conversation_detail {conversation_detail}")
     return user_profile_packet.response.body, conversation_detail_body
 
+@task(
+      persist_result=True, 
+      result_storage=minio_block, 
+      result_storage_key=base.get_object_name_by_time())
+def save_enter_im_user_detail(unread_user_data:UnReadUserData, storage_key, user_profile_body):
+    # S3Object(path="save_enter_im_user_detail_test").put((unread_user_data, storage_key, user_profile_body))
+    unread_user_data.detail = S3Object(path=minio_block.basepath + storage_key, type=f"{tuple}")
+    logger.info(f"S3Object {unread_user_data.detail.model_dump()}")
+    with Session(engine) as session:
+        statement = select(UserInfoModel).where(UserInfoModel.uid == user_profile_body.get('uid'))
+        exist_user_info = session.exec(statement).first()
+        if exist_user_info:
+            user_info = exist_user_info
+        else:
+            user_profile_body.get('uid')
+            logger.info(f"uid {user_profile_body.get('uid')}")
+            logger.info(f"nickname {user_profile_body.get('nickname')}")
+            logger.info(f"user_profile_body {user_profile_body}")
+            user_info = user_table.dict_to_model(user_profile_body.get("user"))
+        unread_user_data.user_info = user_info
+        session.add(unread_user_data)
+        session.commit()
+        session.refresh(unread_user_data)
+    logger.info(f"unread_user_data {unread_user_data}")
+    return unread_user_data
 def get_conversation_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
     logger.info(f"{task}")
     logger.info(f"{task_run.task_inputs}")
     logger.info(f"{state}")
     logger.info(f"{state.result()}")
 
+
+def cn_time_to_timestamp(time_str:str, time_base=None):
+    '''exzample
+        for row in chat_history_table:
+        time_base = row.get("create_time")
+        cn_time = row.get("time")
+        timestamp = cn_time_to_timestamp(cn_time, time_base=time_base)
+        if timestamp:
+            str_time = datetime.strftime(datetime.fromtimestamp(timestamp), '%Y-%m-%d %H:%M:%S')
+        else:
+            str_time = None
+        print(f"timestamp {timestamp} \t\t str {str_time} \t\t time_base {time_base}")
+    '''
+    if not isinstance(time_str,str):
+        return 
+    if "刚刚" in time_str:
+        return int(time.time())
+    try:
+        if not time_base:
+            time_base = time.time()
+
+        res = jio.parse_time(time_str, time_base)
+        res_time = res.get('time')[0]
+        dt_obj = datetime.datetime.strptime(res_time, '%Y-%m-%d %H:%M:%S')
+        # 将datetime对象转换为timestamp
+        timestamp = datetime.datetime.timestamp(dt_obj)
+        return int(timestamp)
+    except Exception as e:
+        logger.error(f"{e} time_str {time_str} time_base {time_base}")
+        return 
+
+
+# TODO 不使用,但保留,未来可能导出所有聊天时用到
+# 未读消息时,理论上无需获取所有历史,而是从未读消息显示的 time 开始获取即可。
+# 可认为程序启动后的未读消息才被记录,如果程序没有启动,代表用户不需要AI处理,不用管以前那些信息
 @task(on_failure=[get_conversation_on_failure],
       persist_result=True, 
       result_storage=minio_block, 
       result_storage_key=base.get_object_name_by_time())
-def get_conversations_history(unread_user_data:dict, conversation_detail:dict):
+def get_conversations_history(unread_user_data:UnReadUserData, conversation_detail):
     ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
+    # 在聊天窗口中往上滚动一点,以免待会有未读消息的时候直接被设置为已读而没有发现
+    messageContent = ele_popShadowAnimation.ele('#messageContent')
+    messageContent.child().scroll.up(150)
     eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
     count = 0
     chat_history = []
+    # 未读消息开始于
+    start_with_unread_time_txt = None
     for ele in eles_msg:
         logger.debug(f"ele.html {ele.html}")
         # 是否存在时间 <div class="kZAHYArp">18:07 </div>
         is_ele_time = ele._find_elements('xpath://div[@class="kZAHYArp"]', raise_err=False)
         ele_content:ChromiumElement = ele._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
         res = analyze_conversations_content(ele_content, conversation_detail)
-        res.update({'time': is_ele_time.text if is_ele_time else None})
+        if is_ele_time:
+            if not start_with_unread_time_txt:
+                start_with_unread_time_txt = is_ele_time.text
+            cn_to_time = cn_time_to_timestamp(is_ele_time.text)
+        else:
+            cn_to_time = None
+        res.update({'time': cn_to_time})
         logger.debug(f"res {res}")
         chat_history.append(res)
     logger.info(f"{chat_history}")
-    return chat_history
+    unread_user_data.msg_time_txt = start_with_unread_time_txt
+    unread_user_data.chat_history = chat_history
+    return unread_user_data
+
+@task(on_failure=[get_conversation_on_failure],
+      persist_result=True, 
+      result_storage=minio_block, 
+      result_storage_key=base.get_object_name_by_time())
+def get_unread_msg_with_time(unread_user_data:UnReadUserData, conversation_detail:dict):
+    ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
+    # 在聊天窗口中往上滚动一点,以免待会有未读消息的时候直接被设置为已读而没有发现
+    messageContent = ele_popShadowAnimation.ele('#messageContent')
+    messageContent.child().scroll.up(150)
+    # 如果消息到来,程序立即进入对话框,则通常 unread_user_data.msg_time 是 “刚刚”
+    # 如果程序刚启动,以前就有这些未读消息,通常 unread_user_data.msg_time 是 "16:30" 或 "04-25" 或 "2023-04-25"
+
+    ele_msg_start_with_time = messageContent.s_ele(unread_user_data.msg_time)
+    unread_user_data.msg_time = cn_time_to_timestamp(ele_msg_start_with_time.text)
+    logger.info(f"msg_start_with_time {ele_msg_start_with_time.text} - " + datetime.datetime.fromtimestamp(unread_user_data.msg_time).strftime('%Y-%m-%d %H:%M'))
+    # 找到父级: div class="A1KpIsbL HO4aqgd4"
+    ele_msg_start_with = ele_msg_start_with_time.parent()
+    ele_msg = ele_msg_start_with
+    chat_history = []
+    while 'A1KpIsbL' in ele_msg.attr('class'):
+        logger.debug(f"ele.html {ele_msg.html}")
+        ele_content:ChromiumElement = ele_msg._find_elements('xpath://div[contains(@class, "SZtuWb3S")]')
+        logger.debug(f"ele_content {ele_content.html}")
+        logger.debug(f"ele_content class {ele_content.attr('class')}")
+        res = analyze_conversations_content(ele_content, conversation_detail)
+        logger.debug(f"res {res}")
+        chat_history.append(res)
+        ele_msg = ele_msg.prev()
+    unread_user_data.chat_history = chat_history
+    # {'name': '程序员马工', 'avator': 'https://p3.huoshanimg.com/aweme/100x100/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=3782654143', 'msg': '564', 'unread_msg_count': 2, 'time': 1714317757, 'chat_history': [{'is_me': True, 'content': {'type': 'text', 'data': '4公会'}}, {'is_me': False, 'content': {'type': 'text', 'data': '564'}}]}
+    logger.info(f"unread_user_data {unread_user_data}")
+    return unread_user_data
 
 @task(persist_result=True, 
       result_storage=minio_block, 
       result_storage_key=base.get_object_name_by_time())
-def save_unread_msg_to_db(chat_history:list, unread_num:int):
-    chat_history_table:Table = ai_yunying_db['chat_history']
+def save_unread_msg_to_db(unread_user_data:UnReadUserData):
+    chat_history = unread_user_data.chat_history
     ret_chat_history = []
     # 当 chat['is_me']=False 时,表示该条消息是对方发送的,需要统计包含我在内有多少条总计聊天
-    should_save_num = 0
-    for i in range(len(chat_history)):
-        chat_history[i]['create_time'] = datetime.datetime.now()
-        id = chat_history_table.insert(chat_history[i])
-        ret_chat_history.append(chat_history_table.find_one(id=id))
-        if not chat_history[i]['is_me']:
-            unread_num -= 1
-        should_save_num += 1
-        if unread_num <= 0:
-            break
-    logger.info(f"should_save_num {should_save_num}")
+    for chat in chat_history:
+        ret_chat_history.append(db.insert(chat))
     logger.info(f"chat_history {ret_chat_history}")
-    # return chat_history
+    unread_user_data.chat_history = ret_chat_history
+    return unread_user_data
 
-@task(persist_result=True, 
-      result_storage=minio_block, 
-      result_storage_key=base.get_object_name_by_time())
-def reply_to_user(chat_history:list, unread_user_data):
+@task
+def send_msg_to_user(unread_user_data:UnReadUserData):
+    
     # 找到聊天会话框
     # class="qbjZBApl popShadowAnimation"
     ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
@@ -181,12 +297,31 @@ def reply_to_user(chat_history:list, unread_user_data):
     # span class="e2e-send-msg-btn"
     ele_send = ele_popShadowAnimation.ele('xpath://span[contains(@class, "e2e-send-msg-btn")]')
     ele_send.click()
-    ele_popShadowAnimation.ele('退出会话').click()
+    ele_exit = ele_popShadowAnimation.ele('退出会话')
+    # ele_exit.click()
 
     logger.info(f"回复成功")
-    table:Table = ai_yunying_db['chat_task']
-    table.update(unread_user_data, ['id'])
-    return {"chat_history":chat_history, "reply":["hello"]}
+    return {"chat_history":unread_user_data.chat_history, "reply":["hello"]}
+
+@task
+def set_unread_done(unread_user_data:UnReadUserData):
+    unread_user_data.is_done = True 
+    db.update(unread_user_data)
+    logger.info("set unread done")
+
+def reply_to_user_flow_on_failure(task: Task, task_run: TaskRun, state: State, **kwargs):
+    check_msg.check_conversion_exit()
+
+@flow(log_prints=False, on_failure=[reply_to_user_flow_on_failure])
+def reply_to_user_flow(unread_user_data:UnReadUserData):
+    # 进入私信聊天窗口
+    enter_state = enter_conversation(unread_user_data.name,return_state=True)
+    user_profile_body, conversation_detail_body = enter_state.result()
+    # 进入后获取到用户信息如:是否关注、粉丝数、用户作品数量、用户IP、简介等。保存到数据库中,以便后续 AI 聊天时附上用户信息
+    unread_user_data = save_enter_im_user_detail(unread_user_data, enter_state.data.storage_key, user_profile_body)
+    unread_user_data = get_conversations_history(unread_user_data, conversation_detail_body)
+    reply_res = send_msg_to_user(unread_user_data)
+    set_unread_done(unread_user_data)
 
 from prefect.results import PersistedResult
 import pickle
@@ -195,12 +330,8 @@ async def get_from_persistedresult():
 
 
 def main():
-    res = base.get_result("s3://swl/prefect/20240427/142756_653126-get_im_unread_item_user_data")
-    logger.info(f"{res}")
-    unread_msg_count = int(res['unread_msg_count'])
-    logger.info(res["unread_msg_count"] + f" {type(res['unread_msg_count'])}" )
-    logger.info(f"{unread_msg_count} 类型 {type(unread_msg_count)}")
-    # ele_popShadowAnimation = tab.ele('xpath://div[@class="qbjZBApl popShadowAnimation"]')
+    unread_user_data, storage_key, user_profile_body = S3Object(path="save_enter_im_user_detail_test").get()
+    save_enter_im_user_detail(unread_user_data, storage_key, user_profile_body)
     # eles_msg = ele_popShadowAnimation.s_eles('xpath://div[@class="A1KpIsbL HO4aqgd4"]')
     # ele_content:ChromiumElement = eles_msg[0]._find_elements('xpath://div[contains(@class, "SZtuWb3S")]', raise_err=False)
     # logger.info(f"{ele_content.html}")

+ 85 - 0
douyin/flow/unread_msg.py

@@ -0,0 +1,85 @@
+import datetime
+import os
+import re
+import time
+import os
+import sys
+sys.path.append(os.path.dirname(os.path.dirname(__file__)))
+
+from dp.page import page
+from douyin import base
+from conf.config import logger,OUTPUT
+from database.config import  minio_block
+from douyin.models import UnReadUserData,UserInfoModel,db,unread_table,user_table
+from dataset import Table
+from DrissionPage import ChromiumPage
+from DrissionPage._elements.chromium_element import ChromiumElement
+from DrissionPage._units.listener import DataPacket
+import jsonpath
+from prefect import flow,task
+from prefect.tasks import Task,TaskRun
+from prefect.flows import Flow
+from prefect.states import State,StateType
+from douyin.models import UnReadUserData,engine
+from douyin.flow import check_msg, conversation
+tab=page.tab
+
+# 点击私信图标后,获取未读消息小圆点
+@task
+def get_ele_msg_red_pot(ele_im:ChromiumElement):
+    ele_im.click()
+    logger.info(f"点击私信图标后,获取未读消息小圆点")
+    ele_list_dlg = tab.ele('xpath://div[@data-e2e="listDlg-container"]')
+    ele_msg_red_pot = ele_list_dlg.ele('xpath://div[@class="hcPUqxqn"]')
+    return ele_list_dlg,ele_msg_red_pot
+
+@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
+def save_unread_user_data(data:UnReadUserData):
+    return unread_table.insert(data)
+
+@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
+def get_im_item_user_data(msg_item_div:ChromiumElement):
+    # 构建所需的数据字典  
+    data = UnReadUserData()
+    name = msg_item_div.s_ele("xpath://div[@class='gZdlhsqq']")
+    if name:
+        data.name = name.text
+    # 定位头像  
+    avatar_ele = msg_item_div.s_ele("xpath://img")
+    if avatar_ele:
+        data.avator = avatar_ele.attr('src')  
+    
+    msg_content = msg_item_div.s_ele('xpath://pre[@class="MnyOYvbN"]')
+    if msg_content:
+        data.msg = msg_content.text  
+
+    # 定位时间  
+    ele_time = msg_item_div.s_ele('xpath://div[@class="skNuRdW_"]')
+    if ele_time:
+        data.msg_time_txt = ele_time.text[3:]  
+    logger.info(f"{data}")
+    return data
+   
+    
+
+@task(persist_result=True, result_storage=minio_block, result_storage_key=base.get_object_name_by_time())
+def get_im_unread_item_user_data(ele_msg_red_pot:ChromiumElement):
+    logger.info(f"存在未读消息,获取该的用户头像、昵称、时间等信息")
+    # 定位私信聊天框,一个用户私信聊天框的完整元素
+    msg_item_div = ele_msg_red_pot.parent() 
+    unread_user_data = get_im_item_user_data(msg_item_div)
+    unread_user_data.unread_msg_count = int(ele_msg_red_pot.text)
+    logger.info(f"unread_user_data {unread_user_data}")
+    return unread_user_data
+
+
+@flow(log_prints=False)
+def unread_msg_flow():
+    ele_im = tab.ele('xpath://div[@data-e2e="im-entry"]')
+    # 私信弹框的红色小红点
+    ele_list_dlg,ele_pop_red_pot = get_ele_msg_red_pot(ele_im)
+    # get_im_unread_item_data.with_options(result_storage_key=get_object_name_by_time())
+    unread_user_data = get_im_unread_item_user_data(ele_pop_red_pot)
+    # 记录未读消息到数据库
+    unread_user_data = save_unread_user_data(unread_user_data)
+    return unread_user_data

+ 0 - 0
douyin/im_msg.py


+ 199 - 0
douyin/models.py

@@ -0,0 +1,199 @@
+
+import datetime
+import json
+import pydantic
+from typing import List, Dict, Any, Optional,Tuple
+import os
+import sys
+sys.path.append(os.path.dirname(os.path.dirname(__file__)))
+from conf.config import logger
+from sqlmodel import SQLModel, Field, Relationship, create_engine
+import pickle
+from database.config import DB_URL
+from database.s3 import S3Object
+from sqlmodel import Field, SQLModel,Relationship,Column,Session,select,func,UniqueConstraint,PickleType,text
+from sqlalchemy.dialects.postgresql import ARRAY, JSON
+from sqlalchemy.engine.cursor import CursorResult
+from sqlalchemy.sql.elements import BinaryExpression
+from pydantic import Json  
+from typing import TypeVar, Type ,Generic 
+
+engine = create_engine(DB_URL)
+def create_all_table():
+    SQLModel.metadata.create_all(engine)
+
+T = TypeVar('T', bound="BaseSQLModel")
+
+class BaseSQLModel(SQLModel):
+
+    # 自动过滤无关字段,任何 dict_data 中多余的字段都会被过滤掉
+    @classmethod
+    def dict_to_model(cls, dict_data: dict, model:T=None) -> T:
+        if not model:
+            model:T = cls
+        # print("dict_to_model ", model)
+        clean_data = {k: v for k, v in dict_data.items() if hasattr(model, k)}
+        obj_model:T = model(**clean_data)
+        return obj_model
+
+    class Config:
+        arbitrary_types_allowed = True
+
+
+class UserInfoModel(BaseSQLModel, table=True):
+    __tablename__ = 'user_data'
+    id:int = Field(default=None, primary_key=True)  
+    uid: Optional[str] = Field(nullable=False)
+    nickname: Optional[str] 
+    avatar_medium:Optional[Json] = Field(sa_column=Column(JSON))
+    sec_uid: Optional[str] 
+    signature: Optional[str] 
+    city: Optional[str] 
+    ip_location: Optional[str] 
+    province: Optional[str] 
+    school_name: Optional[str] 
+    follow_status: Optional[str] 
+    follower_count: Optional[int] 
+    total_favorited: Optional[int]  
+    aweme_count: Optional[int]  
+
+    unread_user_data: list["UnReadUserData"] = Relationship(back_populates="user_info")
+
+class UnReadUserData(BaseSQLModel, table=True):
+    __tablename__ = 'unread_user_data'
+    id:int = Field(default=None, primary_key=True)  
+    name: Optional[str]  = Field(default=None)  
+    avator: Optional[str]  = Field(default=None)  
+    msg: Optional[str]  = Field(default=None)  
+    unread_msg_count: Optional[int] = Field(default=None)  
+    msg_time:Optional[datetime.datetime]  = Field(default=None)  
+    msg_time_txt:Optional[str]  = Field(default=None)  
+    chat_history:Optional[Json] = Field(sa_column=Column(JSON))
+    detail: Optional[S3Object] = Field(sa_column=Column(PickleType))
+    create_time:datetime.datetime = Field(default_factory=datetime.datetime.now)
+    is_done:Optional[bool] = Field(default=False)
+
+    user_info_id: Optional[int | None] = Field(default=None, foreign_key="user_data.id")
+    user_info:Optional[UserInfoModel|None] = Relationship(back_populates="unread_user_data", sa_relationship_kwargs={"lazy": "joined","cascade": "all, delete-orphan", "single_parent":True})
+
+
+
+create_all_table()
+
+
+class DataBase(Generic[T]):  
+    def __init__(self, engine=engine) -> None:
+        self.engine = engine
+    
+    def fine_one(self, model: Type[T], *where:BinaryExpression) -> T:
+        session = Session(engine)
+        statement = select(model).where(*where)
+        res = session.exec(statement)
+        if res:
+            return res.first()
+
+    def insert(self, data:T|dict|str, model_type: T=None) -> T:
+        model = self._get_model_from_data(data, model_type)
+        with Session(engine) as session:
+            session.add(model)
+            session.commit()
+            session.refresh(model)
+            return model
+    
+    def update(self, model: SQLModel, update_fields: List[str] = None):
+        return self.insert(model)
+        
+    def insert_ignore(self, model: T, unique_keys: List[str] = ['id']) -> T:
+        model = self._get_model_from_data(model)
+        with Session(engine) as session:
+            statement = select(model.__class__)
+            for key in unique_keys:
+                statement = statement.where(getattr(model.__class__, key) == key)
+            result = session.exec(statement).first()
+            if result:
+                return result
+            return self.insert(model)
+        
+    @classmethod
+    def dict_to_model(self, dict_data: dict, model:T) -> T:
+        # print("dict_to_model ", model)
+        clean_data = {k: v for k, v in dict_data.items() if hasattr(model, k)}
+        obj_model:T = model(**clean_data)
+        return obj_model
+    
+    def exec(self, query:str):
+        '''
+        E.g.:
+            db = DateBase()
+            db.exec("SELECT * FROM chat_task LIMIT 1").first() # will return dict obj
+            db.exec("...").all() # will return list[dict]
+        '''
+        with Session(engine) as session:
+            result:CursorResult = session.exec(text(query))
+            mappings = result.mappings()
+            return mappings
+
+    def _get_model_from_data(self, data:str|dict|T, model_type) -> T:
+        if isinstance(data, SQLModel):
+            return data
+        if isinstance(data, dict):
+            return self.dict_to_model(data, model_type)
+        if isinstance(data, str):
+            return self.dict_to_model(json.loads(data), model_type)
+        raise Exception("data type not support")
+
+class Table(DataBase[T]):
+    def __init__(self, model_type: Type[T], engine=engine, ) -> None:
+        self.model_type = model_type
+        super().__init__(engine)
+    def fine_one(self, *where: BinaryExpression) -> T:
+        res = super().fine_one(self.model_type, *where)
+        return res
+    def insert(self, data: T | dict) -> T:
+        return super().insert(data, self.model_type)
+    def update(self, model: T) -> T:
+        return super().update(model)
+    def dict_to_model(self, dict_data: dict) -> T:
+        return super().dict_to_model(dict_data, self.model_type)
+    def _get_model_from_data(self, data: str | Dict | T, model_type=None) -> T:
+
+        return super()._get_model_from_data(data, self.model_type)
+
+# 为了检查器能够完成类型检查,这里定义的时候声明了类型
+db = DataBase[UnReadUserData|UserInfoModel]()
+unread_table = Table[UnReadUserData](UnReadUserData)
+user_table = Table[UserInfoModel](UserInfoModel)
+def main():
+    import json
+    import time
+    unread_user_data = UnReadUserData(name=f"name{time.time()}", avator="avator", msg="msg", detail=S3Object(path="test", type=tuple))
+    exist_user_info = user_table.fine_one(UserInfoModel.id == 3)
+    logger.info(f"{exist_user_info}")
+    if exist_user_info:
+        user_info = exist_user_info
+    unread_user_data.user_info = user_info
+    return unread_table.update(unread_user_data)
+    return
+    db = DataBase()
+    uf = UserInfoModel(nickname="test", uid=12)
+    ud = UnReadUserData(name=f"name{time.time()}", avator="avator", msg="msg", detail=S3Object.put({"a":1, "b":2}, 'test122'))
+    ud.user_info = uf
+    db.insert(ud)
+    # res = db.fine_one(UnReadUserData, UnReadUserData.id==2)
+    print(f"{ud.detail}")
+    print(f"{ud.detail.get()}")
+    return
+    with Session(engine) as session:
+        query = text("SELECT * FROM chat_task LIMIT 1")  
+        result = session.exec(query)
+        print(result)
+        print(type(result))
+        mappings = result.mappings()
+        print("result.mappings() ", mappings)
+        print(type(mappings))
+        print("result.mappings() first ", mappings.all())
+        return result
+
+
+if __name__ == "__main__":
+    main()

+ 0 - 0
douyin/reply_flow.py


+ 0 - 1
douyin/user.py

@@ -1,5 +1,4 @@
 from conf.config import logger,OUTPUT
-from database.config import ai_yunying_db
 from dataset import Table
 
 

+ 23 - 0
testm/cn_time.py

@@ -0,0 +1,23 @@
+import os
+import sys
+import time
+sys.path.append(os.path.dirname(os.path.dirname(__file__)))
+
+from pyunit_time import Time
+from datetime import datetime,timedelta
+import jionlp as jio
+res = jio.parse_time(u'昨天 10:51')
+print(res)
+res = jio.parse_time(u'星期五 21:43')
+print(res)
+res = jio.parse_time(u'20:43')
+print(res)
+res_time = res.get('time')[0]
+print(type(res_time))
+# 将字符串时间转换为datetime对象
+dt_obj = datetime.strptime(res_time, '%Y-%m-%d %H:%M:%S')
+
+# 将datetime对象转换为timestamp
+timestamp = datetime.timestamp(dt_obj)
+timestamp = int(time.time())
+print(datetime.strftime(datetime.fromtimestamp(timestamp), '%Y-%m-%d %H:%M:%S'))

+ 31 - 0
testm/edgedb_test.py

@@ -0,0 +1,31 @@
+import datetime
+import edgedb
+import os
+
+os.environ['EDGEDB_CLIENT_SECURITY '] = 'insecure'
+client = edgedb.create_client(host='10.0.0.1', port=5656, tls_security='insecure')
+# https://docs.edgedb.com/easy-edgedb/zh/chapter1
+
+client.run("""
+type Person {
+  required property name -> str;
+}
+
+type Movie {
+  required property title -> str;
+  multi link actors -> Person;
+};
+""")
+client.query("""
+    INSERT User {
+        name := <str>$name,
+        dob := <cal::local_date>$dob
+    }
+""", name="Bob", dob=datetime.date(1984, 3, 1))
+
+user_set = client.query(
+    "SELECT User {name, dob} FILTER .name = <str>$name", name="Bob")
+# *user_set* now contains
+# Set{Object{name := 'Bob', dob := datetime.date(1984, 3, 1)}}
+
+client.close()

+ 20 - 15
testm/medata.py

@@ -1,20 +1,25 @@
 import base64  
 from pydantic import BaseModel, ValidationError  
-  
-def find_base64_truncation(encoded_data):  
-    # 将输入字符串划分为4字符的组  
-    for i in range(0, len(encoded_data), 4):  
-        chunk = encoded_data[i:i+4]  
-        try:  
-            # 尝试解码当前4字符组  
-            res = base64.b64decode(chunk)  
-            print(f"解码成功,截断点位于索引 {i}, 字符组为 '{chunk}', 解码结果为: {res.decode()}")  
-        except Exception as e:  
-            # 如果解码失败,打印出错误位置和可能的截断点  
-            print(f"可能的截断点位于索引 {i}, 字符组为 '{chunk}', 错误信息: {e}")  
-            break  # 假设你有一个 Base64 编码的字符串  
-encoded_data = "gAAAAABmK1YeWb24sQcKaJVtnmeoexGo3-W8dJlNEFrLZzBhtpZPUglAkG-VNzMBnLiY14vRuOORZen24-I0wVytnbNa61LO0UY2bZqF4k6vugsPGCgPUT5vvvm7hYlG3FcGwTG1zNM6EMfGelC2vS-LTFjByMwQ8AB_YjLtkRFaJ4nHEgUexow="
-find_base64_truncation(encoded_data=encoded_data)
+import pickle
+import json
+
+print(json.dumps(('123', '234')))
+data = pickle
+from typing import Any
+class PickleObj:
+    def __init__(self, obj:Any) -> None:
+        pass
+    
+    def put(self, obj, client):
+        pass
+
+try:
+    print(json.dumps(data))
+except TypeError as e:
+    print(e)
+    res = pickle.dumps(('123', '234'))
+    print("pickle res", res)
+    print(pickle.loads(res))
 # Base64 解码  
 # decoded_data = base64.b64decode(encoded_data)  
 # print(decoded_data)  

+ 186 - 0
testm/models.py

@@ -0,0 +1,186 @@
+import datetime
+import pydantic
+from typing import List, Dict, Any, Optional
+import sqlalchemy
+import os
+import sys
+sys.path.append(os.path.dirname(os.path.dirname(__file__)))
+
+from database.config import chat_task_table,DB_URL,ai_yunying_db
+# from sqlmodel import Field, SQLModel,Relationship,Column,Session,select,func,UniqueConstraint
+from typing import Optional,Dict,Tuple
+from sqlalchemy.dialects.postgresql import ARRAY  
+
+'''
+class UserInfo(SQLModel, table=True):  
+    id: Optional[int] = Field(default=None, primary_key=True)  
+    avatar: Optional[str] = Field(default=None)  
+    avatar_larger: Optional[str] = Field(default=None)  
+    client_key:  Optional[str] = Field(default=None)  
+    e_account_role:  Optional[str] = Field(default=None)    
+    nickname:  Optional[str] = Field(default=None)   
+    # 外键约束有助于:级联操作、避免冗余、数据完整性
+    open_id: Optional[str] = Field(index=True, unique=True)
+    union_id:  Optional[str] = Field(default=None)  
+    update_time: datetime.datetime = Field(default_factory=datetime.datetime.now)  # 添加时间戳字段 
+
+'''
+'''
+class UnReadUserData(SQLModel, table=True):
+    id:Optional[int] = Field(default=None, primary_key=True)  
+    name: Optional[str]  = Field(default=None)  
+    avator: Optional[str]  = Field(default=None)  
+    msg: Optional[str]  = Field(default=None)  
+    unread_msg_count: int = Field(default=None)  
+    msg_time: int | None = Field(default=None)  
+    # chat_history: list = Field(sa_column=Column(ARRAY(String)),default=[])
+    detail: int = Field(default=None)  
+    create_time:datetime.datetime = Field(default_factory=datetime.datetime.now)
+    is_done:bool = Field(default=False)
+'''
+'''
+class UnReadUserData(pydantic.BaseModel):
+    id:Optional[int] 
+    name: Optional[str]
+    avatar: Optional[str] 
+    msg: Optional[str]
+    unread_msg_count: Optional[int]
+    msg_time: Optional[int|str] 
+    chat_history: Optional[list|dict|str]
+    detail: Optional[int]
+    create_time: Optional[datetime.datetime]
+    is_done: Optional[bool]
+
+    @property
+    def chat_history(self):
+        return self.chat_history
+'''
+'''
+class UnReadUserData(Schema):
+    id:Optional[int] 
+    name = fields.Str()
+    avatar = fields.Str()
+    msg = fields.Str()
+    unread_msg_count = fields.Int()
+    msg_time = fields.Str()
+    chat_history = fields.List(fields.Dict())
+    detail = fields.Str()
+    create_time = fields.Time()
+    is_done = fields.Bool()
+'''
+'''
+# https://github.com/ponyorm/pony
+# https://docs.ponyorm.org/
+from pony.orm import *
+from pony import orm
+db = Database()
+db.bind(provider='postgres', user='pg', password='pg', host='sv-v',port=5432, database='ai_yunying')
+
+class Person(db.Entity):
+    name = Required(str)
+    age = Required(int)
+    cars = Set('Car')
+
+class Car(db.Entity):
+    make = Required(str)
+    model = Required(str)
+    owner = Required(Person)
+db.generate_mapping(create_tables=True)
+    p1 = Person(name='John', age=20)
+    p2 = Person(name='Mary', age=22)
+    p3 = Person(name='Bob', age=30)
+    c1 = Car(make='Toyota', model='Prius', owner=p2)
+    c2 = Car(make='Ford', model='Explorer', owner=p3)
+    commit()
+'''
+
+'''
+from sqlalchemy import Column, Integer, String, Boolean,DateTime
+from sqlalchemy.dialects.postgresql import JSON
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+from sqlalchemy import create_engine  
+from sqlalchemy.orm import sessionmaker  
+from dataset import Table
+from sqlalchemy.orm import as_declarative 
+
+class UnReadUserData(Base):
+    __tablename__ = 'unread_user_data'
+
+    id = Column(Integer, primary_key=True, )
+    name = Column(String)
+    avatar = Column(String)
+    msg = Column(DateTime)
+    unread_msg_count = Column(Integer)
+    msg_time = Column(String)  # 或者其他适合的类型
+    chat_history = Column(JSON)
+    detail = Column(JSON)
+    create_time = Column(DateTime)
+    is_done = Column(Boolean)
+    
+    def to_dict(self, ignore=['id']): 
+        ret = {}
+        for c in self.__table__.columns:
+             if c.name not in ignore:
+                ret[c.name] = getattr(self, c.name)
+        return ret
+# 创建引擎  
+engine = create_engine(DB_URL, echo=False)  
+  
+# 创建表(如果表不存在)  
+Base.metadata.create_all(engine)  
+  
+# 创建会话制造工厂  
+def add_uu():
+    Session = sessionmaker(bind=engine)
+    session = Session()  
+    session.add(uu)  
+    session.commit()  
+    session.close()
+
+def dataset_get():
+    print(uu)
+    print(uu.chat_history)
+    print(type(uu))
+    unread_user_data_table = ai_yunying_db['unread_user_data']
+    # id = chat_task_table.insert(d)
+    res = unread_user_data_table.find_one(id=1)
+    chat_history = res.get('chat_history')
+    print(res.get('chat_history'))
+    print(type(chat_history))
+
+'''
+
+def main():
+    import json
+    import time
+    d = {'name': '程序员马工', 'avatar': 'https://p3.huoshanimg.com/aweme/100x100/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=3782654143', 'msg': '你好', 'msg_time': '刚刚', 'chat_history': [{1:"555"}, {'你好':555}, {1:555}], 'detail': {'1':123}, 'unread_msg_count': 2}
+    # uu  = UnReadUserData(name='sdfasd', avatar='12312')
+    d['msg_time'] = time.time()
+    uu  = UnReadUserData(**d)
+    # unread_user_data_table:Table = ai_yunying_db['unread_user_data']
+    unread_user_data_table = chat_task_table
+    id = unread_user_data_table.insert(uu.to_dict(), types={"chat_history": JSON, "detail": JSON})
+    res = unread_user_data_table.find_one(id=id)
+    chat_history = res.get('chat_history')
+    print("chat_history " , chat_history)
+    print(type(chat_history))
+    detail = res.get('detail')
+    print(detail)
+    print(type(detail))
+    msg_time = res.get('msg_time')
+    print("msg_time ", msg_time)
+    print(type(msg_time))
+    # chat_history = json.loads(chat_history)
+    return
+    create_all_table()
+    with Session(engine) as session:
+        session.add(user_data)
+        session.commit()
+    # data = user_data.dict()
+    # data.pop('id')
+    # chat_task_table.insert(data)
+
+if __name__ == "__main__":
+    main()