gpt.md 40 KB

项目简介 项目名称: swl-douyin

项目简介: 一个基于 FastAPI 框架的网站后端,用于用户登录、文档上传、文档向量转换、抖音 API 获取用户信息、视频评论回复等功能。

项目结构如下:

api/ 目录下包含了一系列处理用户登录鉴权、上传文档、微信支付等相关功能的 Python 脚本。 douyin/ 目录下是与抖音 API 相关的接口,用于获取 access_token 和用户信息。 grpc_m/ 目录下为 gRPC 相关的客户端和服务端代码,其中微服务 VectorService 用于接收文档并将其转换为向量数据存储。

├── api │ ├── jwt.py │ ├── login.py │ ├── readme.md │ ├── swl.http │ ├── upload.py │ ├── weixin_pay.py ├── config.py ├── db │ ├── base.py │ ├── docs.py │ ├── engine.py │ ├── readme.md │ ├── user.py ├── douyin │ ├── access_token.py │ ├── user_info.py ├── grpc_m │ ├── client.py │ ├── send_data_to_vector.py │ ├── vector_service.proto │ ├── vector_service_pb2.py │ ├── vector_service_pb2_grpc.py ├── main.py ├── readme.md 项目功能:

用户通过抖音扫码登录 用户上传文档 将文档转换为向量 获取抖音用户公开信息 评论抖音视频

class Documents(SQLModel, table=True):  
    id: UUID4 = Field(default_factory=uuid.uuid1, primary_key=True,index=True)  # 使用 UUID v1 作为主键 
    open_id: str = Field(foreign_key="useroauthtoken.open_id",index=True)  # 关联到用户表的外键  
    path: str = Field(nullable=False) # 相对路径
    status: int = Field(nullable=False) # 文档状态  
    update_time: datetime = Field(default_factory=datetime.now)  # 创建时间、更新时间
    __table_args__ = (UniqueConstraint('open_id', 'path', name='uq_documents'),) 

如上向量文档数据库模型,用来存储用户自定义上传的文档,以便于后续向量化处理。 同时用户自己也有许多视频,我需要把视频也转化成文本,每一个视频 item_id ,对应一个文档 Documents.id ,并且进行向量化。 对于自定义文档和视频文本两者的数据关系,我应该如何定义表结构? 如果自定义文档我假设有一个分类表,可以按不同文档分类:

class Categories(SQLModel, table=True):  
    id: UUID4 = Field(default_factory=uuid.uuid1, primary_key=True)  # 使用 UUID v1 作为主键 
    open_id: str = Field(foreign_key="useroauthtoken.open_id",index=True)  # 关联到用户表的外键  
    name: str = Field(default="default", index=True)  # 分类的名称,添加索引以优化查询性能  
    update_time: datetime = Field(default_factory=datetime.now)  # 创建时间、更新时间
    # 添加联合唯一约束  
    __table_args__ = (UniqueConstraint('open_id', 'name', name='uq_open_id_ctname'),)


class DocumentCategories(SQLModel, table=True):
    id: UUID4 = Field(foreign_key="documents.id",index=True, primary_key=True)  # 关联到文档表的外键  
    category_id: UUID4 = Field(foreign_key="categories.id",index=True)  # 关联到分类表的外键  
    __table_args__ = (UniqueConstraint('id', 'category_id', ),)

那么视频对应的一个文档,又该如何关联这些表结构或分类?给我最佳合理的程序设计、表结构设计架构。

级联数据结构

# Event: {'event': 'item_comment_reply', 'client_key': 'aw6aipmfdtplwtyq', 'from_user_id': '_000QadFMhmU1jNCI3JdPnyVDL6XavC70dFy', 'to_user_id': '_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk', 'content': CommentContent, 'log_id': '20240227180303CB402D84F669A90052F5'}
# CommentContent: {"at_user_id":"","avatar":"https://p11.douyinpic.com/aweme/720x720/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=3782654143","comment_id":"@9VxS1/qCUc80K2etd8wkUc791mHpPPGEPZB1rgyvKVQRavj960zdRmYqig357zEBaMZS/8P5ZrVUQVC/27oJAA==","comment_user_id":"_000QadFMhmU1jNCI3JdPnyVDL6XavC70dFy","content":"测试","create_time":1709028183,"digg_count":0,"level":1,"nick_name":"程序员马工","parent_id":"@9VxS1/qCUc80K2etd8wkUc7912DgP/GCPpF2qwKuJ1YTb/X460zdRmYqig357zEBKzkoKYjBMUvl9Bs6h+CwYQ==","reply_comment_total":0,"reply_to_comment_id":"","reply_to_item_id":"@9VxS1/qCUc80K2etd8wkUc7912DgP/GCPpF2qwKuJ1YTb/X460zdRmYqig357zEBKzkoKYjBMUvl9Bs6h+CwYQ=="}
class CommentContent(SQLModel, table=True):  
  
    id: int = Field(primary_key=True)  
    at_user_id: Optional[str] = Field(default=None) 
    avatar: Optional[str] = Field(default=None) 
    # 该条评论的id
    comment_id: Optional[str] = Field(default=None) 
    # 发评论的用户uid
    comment_user_id: Optional[str] = Field(default=None) 
    # 评论内容
    content: Optional[str] = Field(default=None) 
    # 该评论回复的总父级评论。也就是说在视频中直接看到的评论。
    parent_id: Optional[str] = Field(default=None) 
    reply_comment_total: Optional[int] = Field(default=None) 
    reply_to_comment_id: Optional[str] = Field(default=None) 
    # 视频 id
    reply_to_item_id: Optional[str] = Field(default=None) 
    nick_name: Optional[str] = Field(default=None)
    event: "Events" = Relationship(back_populates="content", link_model=EventCommentLink)

class Events(SQLModel, table=True):  
    id: int = Field(default=None, primary_key=True)  
    event: str  
    client_key:str
    # 发起评论的人,可以是自己,也可以是别人
    from_user_id: str  
    # to_user_id 一直是自己的 open_id,你授权了思维链管理视频评论,订阅事件,当你自己的视频被评论时,to user 就是自己
    to_user_id: str  
    # 使用 Relationship 定义与 CommentContent 的关系  
    content: Optional[CommentContent] = Relationship(back_populates="event", link_model=EventCommentLink)
    log_id:str

class EventRepository(DouyinBaseRepository):
    def __init__(self, engine=engine):  
        super().__init__(CommentContent, engine)  
        self.model:Events

    def save_item_comment_reply(self, data:dict) -> Tuple[Events,CommentContent]:
        with Session(self.engine) as session:
            content = json.loads(data.pop("content"))
            content_model = CommentContent(**content)
            event_model = Events(**data)
            event_model.content = content_model
            session.add(event_model)
            session.commit()
            return event_model,content_model

上述模型是参考的 SQLmodel 官方的代码风格,定义了3个数据表,并用表关联的方式 save_item_comment_reply 存放数据。具体说明请看注释。 不过 parent_id 、 reply_to_comment_id 是两个复杂的字段, 假如用户在视频中发表了评论,这条评论视为根极评论假设 comment_id = 1,则 parent_id 是视频 id 也就是 reply_to_item_id, reply_to_comment_id='' ,因为仅仅是评论视频而没有产生回复。

如果 根极评论 comment_id = 1 被人评论,则产生子评论 comment_id = 2, parent_id 则变成 comment_id = 1 ,reply_to_comment_id='' ,因为仅仅是评论根极评论而没有产生回复。

如果子评论 comment_id = 2 被人回复,则产生新的评论数据 comment_id = 3 , 因为在根极评论 comment_id = 1下的回复,父级 parent_id 仍然是 comment_id = 1 ,但是 reply_to_comment_id=comment_id = 2

基于以上逻辑,能否根据 comment_id 、parent_id 、 reply_to_comment_id 等这几个重要的字段递归查询发生了哪些对话。然后按照顺序返回对话内容

嵌套数据模型

示例数据: data = {'event': 'item_comment_reply', 'from_user_id': 'mg', 'to_user_id': 'wang', 'content': {data_content}...} data_content ={"comment_id":"@9VxS1","comment_user_id":"mg","content":"这是编程吗","parent_id":"video1","reply_comment_total":0,"reply_to_comment_id":"0","reply_to_item_id":"video1",nick_name":"程序员马工"}

假设有json 数据 data 里面有一个字段是 'content' ,它的字段是字典,如示例中的 data_content 。那么python SQLmodel 、SQLAlmodel 2.0 基于 PostgreSQL 是否支持这样的子嵌套数据库模型:

from sqlmodel import Field, SQLModel  
from typing import Dict, Any  
  
class Content(SQLModel):  
    comment_id: str  
    comment_user_id: str  
    content: str  
    parent_id: str  
    reply_comment_total: int  
    reply_to_comment_id: str  
    reply_to_item_id: str  
    nick_name: str  
  
    # 如果你希望这个模型也作为一个Pydantic模型进行验证,可以添加一个`__root__`字段  
    # 但这在作为数据库模型的一部分时通常是不必要的  
    #__root__: Dict[str, Any]  
  
class EventData(SQLModel, table=True):  
    id: int = Field(default=None, primary_key=True)  
    event: str  
    from_user_id: str  
    to_user_id: str  
    content: Content = Field(sa_column_kwargs={'type': 'jsonb'})  # 使用PostgreSQL的JSONB类型

如果支持,给一个 session.add 示例

R

from sqlmodel import create_engine, Session  
from your_module import EventData, Content  # 假设EventData和Content是从你的模块中导入的  
  
# 创建数据库引擎  
engine = create_engine('postgresql://user:password@localhost/dbname')  
  
# 创建表(如果它们还不存在)  
SQLModel.metadata.create_all(engine)  
  
# 创建一个新的会话  
with Session(engine) as session:  
    # 创建一个Content实例  
    content_data = Content(  
        comment_id="@9VxS1",  
        comment_user_id="mg",  
        content="这是编程吗",  
        parent_id="video1",  
        reply_comment_total=0,  
        reply_to_comment_id="0",  
        reply_to_item_id="video1",  
        nick_name="程序员马工"  
    )  
      
    # 创建一个EventData实例,并将Content实例赋值给它的content字段  
    event_data = EventData(  
        event="item_comment_reply",  
        from_user_id="mg",  
        to_user_id="wang",  
        content=content_data  
    )  
      
    # 将EventData实例添加到会话中  
    session.add(event_data)  
      
    # 提交会话以保存更改到数据库  
    session.commit()  
  
# 会话在with块结束时自动关闭,不需要手动关闭  
# 所有在会话中添加并提交的实例现在都应该已经保存到数据库中了

用户管理

项目简介 swl-douyin

你当前在 open-douyin 这个目录下面。这个项目是一个基于 Fastapi 框架的网站后端。用户实现授权第三方平台抖音登录,注册到我的网站中。网站包含了文档上传、删除文档、文档向量转换、抖音 API 获取用户信息、视频评论回复等的功能。

git ls-files .gitignore

api/jwt.py Fastapi 用户鉴权登录,扫码后会通过 jwt token 解析到用户的 open_id,查询 open_id 是否在数据库中,如果存在说明登录成功,允许用户访问后台数据

api/login.py 抖音扫码登录后,前端通过扫码结果的 code 信息请求 login 到这个端点上,然后 login.py 通过 code 访问 抖音 API 如果返回成功,则登录成功,并记录和返回携带用户 open_id 的 Token

api/comment.py 视频产生评论时,抖音向 Fastapi 后端服务器 post 请求的用户评论 webhook

api/readme.md : 说明文档

api/upload.py 前端请求上传文件的路径

api/weixin_pay.py: 用户通过微信支付扫码充值。充值后才能使用网站的付费应用

config.py : 本项目的配置文件

db/base.py : 数据库增删改查基类

db/docs.py : 用户上传的文档 docs 数据库模型定义,操作数据库的实现

db/engine.py 数据库引擎,用法: from db.engine import engine,create_all_table

db/readme.md 本目录的说明文档

db/user.py 用户登录的相关数据库定义和模型 class DouyinBaseRepository(BaseRepository):

def __init__(self, model: SQLModel, engine=engine):  
    super().__init__(model, engine)  

def add_or_update(self, dict_data: dict) -> SQLModel:

class UserInfoRepository(DouyinBaseRepository):

def __init__(self, engine=engine):  
    super().__init__(UserInfo, engine)  
    self.model:UserInfo

Database manager class

class UserOAuthRepository(DouyinBaseRepository):

def __init__(self, engine=engine):  
    super().__init__(UserOAuthToken, engine)  
    self.model:UserOAuthToken

douyin/access_token.py 抖音 API 接口,请求用户 access_toekn ,用户获取抖音用户公开信息 async def get_access_token(code) async def refresh_access_token(refresh_token):

db/comment.py 抖音 API 接口,用户对自己的视频评论进行回复。 示例: async def reply_to_comment(open_id: str, access_token: str, content: str='', item_id:str='', comment_id='')

douyin/user_info.py 抖音 API 接口,获取抖音用户公开信息

grpc_m/client.py gRPC 客户端,用于将文档发送到我的另一个 微服务 vector server 并转换成向量数据存放在该服务中

grpc_m/send_data_to_vector.py 发送文档数据到向量服务器

grpc_m/vector_service.proto gRPC 协议

grpc_m/vector_service_pb2.py grpc_m/vector_service_pb2.pyi grpc_m/vector_service_pb2_grpc.py

main.py 主程序入口,已经完成代码

readme.md

现在 db 和 douyin 目录还是相对独立的,保持单一职责,没有太多耦合。可是当 comment 评论发生的时候,需要一系列复杂判断,例如向数据库 db 获取用户 id 等信息,从 id 中获取数据库用户文档元数据 docs ,回复 reply_to_comment 时还需要判断 access_token 是否过期,refresh_access_token还需要向db数据库查询。因此有了非常多的逻辑和耦合
因此我想新建一个文件: douyin/manage_user.py

python from db.user import UserInfo,UserOAuthRepository,UserOAuthToken,UserInfoRepository from db.docs import DocStatus,DocumentCategories,DocumentCategoriesRepository,Documents,DocumentBase,DocumentsRepository from douyin.access_token import get_access_token,refresh_access_token from douyin.comment import reply_to_comment

class UserManage:

def __init__(self, open_id) -> None:
    self.open_id = open_id

def get_token_model(self) -> UserOAuthToken:
    res = UserOAuthRepository().select(UserOAuthToken.open_id == self.open_id)
    return res.first()

def get_info_model(self) -> UserInfo:
    res = UserInfoRepository().select(UserInfo.open_id == self.open_id)
    return res.first()

def get_access_token(self) -> str:
    model = self.get_token_model()
    return model.access_token
用来统一管理所有业务。 get_access_token 理应 自行判断 token过期,如果过期则自动刷新 access_token ,如果刷新 token 也过期,才需要提示错误。
这是 model :
class UserOAuthToken(SQLModel, table=True):  
    id: Optional[int] = Field(primary_key=True)
    access_token:str
    expires_in: Optional[int] = None
    open_id:str = Field(index=True)
    refresh_expires_in: Optional[int] = None
    refresh_token:str
    scope: str
    update_time: datetime = Field(default_factory=datetime.now)  # 添加时间戳字段 
    __table_args__ = (UniqueConstraint('open_id'),)  

expires_in int64 access_token 接口调用凭证超时时间,单位(秒) 
refresh_expires_in int64 refresh_token 凭证超时时间,单位(秒)
refresh_access_token 函数正常示例:
{
  "data": {
    "description": "",
    "error_code": 0,
    "expires_in": "86400",
    "refresh_token": "refresh_token"
  },
  "message": "success"
}

异常示例
{
  "data": {
    "description": "Parameter error",
    "error_code": 2100005
  },
  "extra": {
    "logid": "2020070614111601022506808001045D59",
    "now": 1594015876138
  }
}
我是否可以直接修改数据库字段 expire_in ,表明 token 过期时间?这样才能判断字段是否过期,如果需要,请帮我完成代码


## 项目简介 swl-douyin
你当前在 open-douyin 这个目录下面。这个项目是一个基于 Fastapi 框架的网站后端。用户实现授权第三方平台抖音登录,注册到我的网站中。网站包含了文档上传、删除文档、文档向量转换、抖音 API 获取用户信息、视频评论回复等的功能。

git ls-files
.gitignore

api/jwt.py
Fastapi 用户鉴权登录,扫码后会通过 jwt token 解析到用户的 open_id,查询 open_id 是否在数据库中,如果存在说明登录成功,允许用户访问后台数据

api/login.py
抖音扫码登录后,前端通过扫码结果的 code 信息请求 login 到这个端点上,然后 login.py 通过 code 访问 抖音 API 如果返回成功,则登录成功,并记录和返回携带用户 open_id 的 Token


api/readme.md :
说明文档

api/swl.http
http 请求草稿

api/upload.py
前端请求上传文件的路径

api/weixin_pay copy.py
api/weixin_pay.py:
用户通过微信支付扫码充值。充值后才能使用网站的付费应用

config.py :
本项目的配置文件

db/base.py :
数据库增删改查基类

db/docs.py :
用户上传的文档 docs 数据库模型定义,操作数据库的实现

db/engine.py
数据库引擎,用法: from db.engine import engine,create_all_table

db/readme.md
本目录的说明文档

db/user.py
用户登录的相关数据库定义和模型

douyin/access_token.py
抖音 API 接口,请求用户 access_toekn ,用户获取抖音用户公开信息

douyin/user_info.py
抖音 API 接口,获取抖音用户公开信息

grpc_m/client.py
gRPC 客户端,用于将文档发送到我的另一个 微服务 vector server 并转换成向量数据存放在该服务中

grpc_m/send_data_to_vector.py
发送文档数据到向量服务器

grpc_m/vector_service.proto
gRPC 协议

grpc_m/vector_service_pb2.py
grpc_m/vector_service_pb2.pyi
grpc_m/vector_service_pb2_grpc.py


main.py
主程序入口,已经完成代码

readme.md

在另一个微服务 LangChain 项目中 main.py 如下:

import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from concurrent import futures
import time  
import grpc  
import logging  
import uuid  
from db_vector.vector_for_douyin import save_user_doc_to_vector  # 假设这里有必要的导入  
from grpc_m import vector_service_pb2, vector_service_pb2_grpc  
from config import MNT_DOUYIN_DATA


class VectorService(vector_service_pb2_grpc.VectorServiceServicer):  
    def SaveDocToVector(self, request, context):  
        category_id = request.category_id
        user_doc_relative_path = request.user_doc_relative_path
        mnt_user_docs_path = os.path.join(MNT_DOUYIN_DATA, user_doc_relative_path)
        if not mnt_user_docs_path:
            return vector_service_pb2.SaveDocToVectorResponse(status=vector_service_pb2.ErrorCode.MNT_DOUYIN_DATA_ERROR)  
        status = save_user_doc_to_vector(category_id, mnt_user_docs_path)  # 这个函数需要根据实际情况来实现  
        return vector_service_pb2.SaveDocToVectorResponse(status=status)  


def serve():
    port = "18600"
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=30))  
    vector_service_pb2_grpc.add_VectorServiceServicer_to_server(VectorService(), server)
    server.add_insecure_port("0.0.0.0:" + port)
    server.start()
    print("Server started, listening on " + port)
    server.wait_for_termination()
  
if __name__ == '__main__':  
    serve()

gRPC grpc_m/vector_service.proto 文件:

syntax = "proto3";  
  
package grpc_m;  
// 定义错误枚举类型
enum ErrorCode {
    SUCCESS = 0;
    DOC_CONVERT_ERROR = 1;
    VECTOR_SERVER_ERROR = 2;
}

service VectorService {  
    rpc SaveDocToVector (SaveDocToVectorRequest) returns (SaveDocToVectorResponse) {}  
}  
  
message SaveDocToVectorRequest {  
    string category_id = 1;  
    string user_doc_relative_path = 2; 
}  

message SaveDocToVectorResponse {  
    ErrorCode status = 1;  
}

具体逻辑:前端用户上传文档 网站后端服务 Fastapi 接收鉴权成功 - Fastapi 请求 LangChain 微服务将文档保存到向量数据库 - 前端用户发起文档对话 - 网站后端服务 Fastapi 接收鉴权成功 - 后端Fastapi请求 LangChain 微服务 进行向量相似度计算,文本片段组合成 prompt 请求 LLM api ,将 LLM 回复的结果给后端 - 后端服务 Fastapi将结果返回给前端用户

目前我已经完成了 api/upload.py 文档上传 - status = save_user_doc_to_vector(category_id, mnt_user_docs_path) 保存在向量 请在这两个微服务 Fastapi 后端和 LangChain 项目中,实现文档对话的代码,你会如何实现?包括在两个项目中的 shell 指令新建目录及文件发给我

项目架构建议

项目简介 swl-douyin

你当前在 open-douyin 这个目录下面。这个项目是一个基于 Fastapi 框架的网站后端。用户实现授权第三方平台抖音登录,注册到我的网站中。网站包含了文档上传、删除文档、文档向量转换、对话聊天测试的功能。

git ls-files .gitignore

api/jwt.py Fastapi 用户鉴权登录,扫码后会通过 jwt token 解析到用户的 open_id,查询 open_id 是否在数据库中,如果存在说明登录成功,允许用户访问后台数据

api/login.py 抖音扫码登录后,前端通过扫码结果的 code 信息请求 login 到这个端点上,然后 login.py 通过 code 访问 抖音 API 如果返回成功,则登录成功,并记录和返回携带用户 open_id 的 Token

api/readme.md : 说明文档

api/swl.http http 请求草稿

api/upload.py 前端请求上传文件的路径

api/weixin_pay copy.py api/weixin_pay.py: 用户通过微信支付扫码充值。充值后才能使用网站的付费应用

config.py : 本项目的配置文件

db/base.py : 数据库增删改查基类

db/docs.py : 用户上传的文档 docs 数据库模型定义,操作数据库的实现

db/engine.py 数据库引擎,用法: from db.engine import engine,create_all_table

db/readme.md 本目录的说明文档

db/user.py 用户登录的相关数据库定义和模型

douyin/access_token.py 抖音 API 接口,请求用户 access_toekn ,用户获取抖音用户公开信息

douyin/user_info.py 抖音 API 接口,获取抖音用户公开信息

grpc_m/client.py gRPC 客户端,用于将文档发送到我的另一个 微服务 vector server 并转换成向量数据存放在该服务中

grpc_m/send_data_to_vector.py 发送文档数据到向量服务器

grpc_m/vector_service.proto gRPC 协议

grpc_m/vector_service_pb2.py grpc_m/vector_service_pb2.pyi grpc_m/vector_service_pb2_grpc.py

main.py 主程序入口

readme.md


在另一个微服务 vector 项目中主文件如下:

python import os import sys sys.path.append(os.path.dirname(os.path.dirname(file))) from concurrent import futures import time
import grpc
import logging
import uuid
from db_vector.vector_for_douyin import save_user_doc_to_vector # 假设这里有必要的导入
from grpc_m import vector_service_pb2, vector_service_pb2_grpc
from config import MNT_DOUYIN_DATA

class VectorService(vector_service_pb2_grpc.VectorServiceServicer):

def SaveDocToVector(self, request, context):  
    category_id = request.category_id
    user_doc_relative_path = request.user_doc_relative_path
    mnt_user_docs_path = os.path.join(MNT_DOUYIN_DATA, user_doc_relative_path)
    if not mnt_user_docs_path:
        return vector_service_pb2.SaveDocToVectorResponse(status=vector_service_pb2.ErrorCode.MNT_DOUYIN_DATA_ERROR)  
    status = save_user_doc_to_vector(category_id, mnt_user_docs_path)  # 这个函数需要根据实际情况来实现  
    return vector_service_pb2.SaveDocToVectorResponse(status=status)  

def serve():

port = "18600"
server = grpc.server(futures.ThreadPoolExecutor(max_workers=30))  
vector_service_pb2_grpc.add_VectorServiceServicer_to_server(VectorService(), server)
server.add_insecure_port("0.0.0.0:" + port)
server.start()
print("Server started, listening on " + port)
server.wait_for_termination()

if name == 'main':

serve()
swl-douyin 向 vector 向量服务器发起请求,调用 SaveDocToVector 将文档保存到向量服务器,然后向量服务器返回向量数据库的有关 id 等信息,现在 swl-douyin 可以根据自己的向量文档与 LLM 大模型对话,我是通过 API 的方式调用 LLM 模型基于LangChain 框架,我是否新建另一个微服务称为 LangChain-LLM ,来对 vector 微服务发起请求然后返回进行文档对话聊天,还是应该跟 vector 向量服务器用的是同一个微服务?因为 vector 本质上也是调用了 LangChain 的内置库来做文本向量转换的。

我的设想:  前端用户上传文档 网站后端服务 Fastapi 接收鉴权成功 - Fastapi 请求 vector 微服务将文档保存到向量数据库 - 前端用户发起文档对话 - 网站后端服务 Fastapi 接收鉴权成功 - 后端Fastapi请求 LangChain-LLM 微服务 - LangChain-LLM微服务 请求 vector 进行向量相似度计算,并返回文本片段 - LangChain-LLM 微服务收到文本片段组合成 prompt - LangChain-LLM 微服务请求 LLM api - LangChain-LLM 微服务将返回的结果返回给后端 - 后端服务 Fastapi将结果返回给前端用户

另一个设想是不需要 LangChain-LLM 微服务,而是使用 vector 微服务,向量计算向量搜素和LLM请求都作为一个项目整体。

我的上述文档目录和项目架构是否合理?给我改进的建议

# 回复评论事件
你当前在 open-douyin 这个目录下面。这个项目是一个基于 Fastapi 框架的网站后端。用户实现授权第三方平台抖音登录,注册到我的网站中。网站包含了文档上传、删除文档、文档向量转换、抖音 API 获取用户信息、视频评论回复等的功能。

git ls-files
.gitignore

api/jwt.py
Fastapi 用户鉴权登录,扫码后会通过 jwt token 解析到用户的 open_id,查询 open_id 是否在数据库中,如果存在说明登录成功,允许用户访问后台数据

api/login.py
抖音扫码登录后,前端通过扫码结果的 code 信息请求 login 到这个端点上,然后 login.py 通过 code 访问 抖音 API 如果返回成功,则登录成功,并记录和返回携带用户 open_id 的 Token


api/readme.md :
说明文档

api/swl.http
http 请求草稿

api/upload.py
前端请求上传文件的路径

api/weixin_pay copy.py
api/weixin_pay.py:
用户通过微信支付扫码充值。充值后才能使用网站的付费应用

config.py :
本项目的配置文件

db/base.py :
数据库增删改查基类

db/docs.py :
用户上传的文档 docs 数据库模型定义,操作数据库的实现

db/engine.py
数据库引擎,用法: from db.engine import engine,create_all_table

db/readme.md
本目录的说明文档

db/user.py
用户登录的相关数据库定义和模型

douyin/access_token.py
抖音 API 接口,请求用户 access_toekn ,用户获取抖音用户公开信息

douyin/user_info.py
抖音 API 接口,获取抖音用户公开信息

grpc_m/client.py
gRPC 客户端,用于将文档发送到我的另一个 微服务 vector server 并转换成向量数据存放在该服务中

grpc_m/send_data_to_vector.py
发送文档数据到向量服务器

grpc_m/vector_service.proto
gRPC 协议

grpc_m/vector_service_pb2.py
grpc_m/vector_service_pb2.pyi
grpc_m/vector_service_pb2_grpc.py


main.py
主程序入口,已经完成代码

readme.md

帮我在 douyin/comment.py 写一个向抖音 API 发起回复视频评论的函数,以供给别的任务调用:

curl --location --request POST 'https://open.douyin.com/item/comment/reply/?open_id=43eae150-12eb-4035-8d40-3f9a2bfcdf4e' \
--header 'Content-Type: application/json' \
--header 'access-token: act.0308b91b5f4858eedd25b827fa7ed69ewKUKlr2o6b9zZOe17m4iKlMyZjJB' \
--data-raw '{
    "content": "评论",
    "item_id": "@9VwW0qOBUN9mJyD3bNxvGc791GLuPPGFOJx4rQOvLVAXbvj+60zdRmYqig357zEBXe9yX1kW9iI9RBd5IYhjRg==",
    "comment_id": "@9VwW0qOBUN9mJyD3bNxvGc791GLuP/mAOZ1yqQmlK1QQbfX460zdRmYqig357zEBlemy4tGnuHCtvx71JWmUEw=="
}'

在另一个文件的 task 中,我可以这样调用:

async def comment_respone_task(data):
    comment_data = {  
        "comment_id": data.get("content").get("comment_id"),  
        "content": data.get("content").get("content"),  
        "from_user_id": data.get("from_user_id"),  
        "reply_to_item_id": data.get("content").get("reply_to_item_id"),  
    }
    comment_model:Comment = CommentRepository().add_or_update(Comment(**comment_data))
    res = CategoriesRepository().select(Categories.open_id == comment_data.get("from_user_id"))
    categ:Categories = res.first()
    if categ:
        res = await langchain_chat(categ.id, comment_data.get("content"))
        # 以下是向抖音发起评论的请求
    else:
        logger.error(f"can not find {comment_data.get("from_user_id")} in CategoriesRepository")
        comment_model.status = CommentStatus.ERROR
        CommentRepository().add_or_update(comment_model)

请问 Fastapi 是否支持创建异步 client 让 douyin/comment.py 文件请求?如果不支持则使用 httpx

接收评论事件

你当前在 open-douyin 这个目录下面。这个项目是一个基于 Fastapi 框架的网站后端。用户实现授权第三方平台抖音登录,注册到我的网站中。网站包含了文档上传、删除文档、文档向量转换、抖音 API 获取用户信息、视频评论回复等的功能。

git ls-files
.gitignore

api/jwt.py
Fastapi 用户鉴权登录,扫码后会通过 jwt token 解析到用户的 open_id,查询 open_id 是否在数据库中,如果存在说明登录成功,允许用户访问后台数据

api/login.py
抖音扫码登录后,前端通过扫码结果的 code 信息请求 login 到这个端点上,然后 login.py 通过 code 访问 抖音 API 如果返回成功,则登录成功,并记录和返回携带用户 open_id 的 Token


api/readme.md :
说明文档

api/swl.http
http 请求草稿

api/upload.py
前端请求上传文件的路径

api/weixin_pay copy.py
api/weixin_pay.py:
用户通过微信支付扫码充值。充值后才能使用网站的付费应用

config.py :
本项目的配置文件

db/base.py :
数据库增删改查基类

db/docs.py :
用户上传的文档 docs 数据库模型定义,操作数据库的实现

db/engine.py
数据库引擎,用法: from db.engine import engine,create_all_table

db/readme.md
本目录的说明文档

db/user.py
用户登录的相关数据库定义和模型

douyin/access_token.py
抖音 API 接口,请求用户 access_toekn ,用户获取抖音用户公开信息

douyin/user_info.py
抖音 API 接口,获取抖音用户公开信息

grpc_m/client.py
gRPC 客户端,用于将文档发送到我的另一个 微服务 vector server 并转换成向量数据存放在该服务中

grpc_m/send_data_to_vector.py
发送文档数据到向量服务器

grpc_m/vector_service.proto
gRPC 协议

grpc_m/vector_service_pb2.py
grpc_m/vector_service_pb2.pyi
grpc_m/vector_service_pb2_grpc.py


main.py
主程序入口,已经完成代码

readme.md

现在我新建了一个 api/webhook.py :

import json
import hashlib  
import hmac  
from fastapi import FastAPI,APIRouter,  File, HTTPException, Depends, Request,Header, UploadFile  
from fastapi.responses import JSONResponse 
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials  
from config import logger,CLIENT_KEY
webhook_route = APIRouter()  

@webhook_route.post("/webhook")  
async def webhook(request: Request):  
    data = await request.json()  
    signature = request.headers.get('X-Douyin-Signature')  
    bytes = request.body()
    logger.info(f"{bytes} {signature}")
    verify = verify_signature(request.body(), signature, CLIENT_KEY)
    logger.info(f"{verify}")
    # 验证签名  
    # if not verify_signature(request.body(), signature, CLIENT_KEY):  
    #     raise HTTPException(status_code=403, detail="Invalid signature")  
  
    event = data.get("event")  
    # 处理验证请求  
    if event == "verify_webhook":  
        challenge = data.get("content").get("challenge")  
        return JSONResponse(content={"challenge": challenge})  
    handle_event(data)
    # 处理其他类型的消息  
    client_key = data.get("client_key")  
    from_user_id = data.get("from_user_id")  
    content = data.get("content")  
    log_id = data.get("log_id")  
    
    # 在这里添加处理不同类型消息的逻辑  
    # ...  
  
    # 返回成功的响应  
    return JSONResponse(status_code=200, content={"message": "success"})  
  
async def handle_event(data:dict):
    pass

抖音会对 /webhook post 如下信息:

{
    "event": "item_comment_reply",
    "from_user_id": "",
    "to_user_id": "",
    "client_key": "",
    "content": {}
}
其中 "content" 的内容为字典,内容示例如下:
{
    "comment_id":"",                    //评论id
    "comment_user_id",              //发评论的用户uid
    "content":,                             //评论内容
    "create_time":123123,           //评论创建时间(秒级时间戳)
    "digg_count":0,                     //该评论的点赞量
    "reply_comment_total":1,    //该评论下的回复评论量
    "reply_to_comment_id":"",   //该评论回复的上一级评论的评论id
    "reply_to_item_id":"@9VxT0uuFWsE7M3Koc4olFM791WbsNPGKOp1wrgiiJ1ERbfD060zdRmYqig357zEB1tSgbExoci7R0e1yFxAIMw==",//该评论回复的视频id
    "at_user_id": "",                   //评论@的用户uid
    "avatar": "https://uuue/ehdne",
    "nick_name": "xiaoming"
}

简要流程说明: 接收并验证 Webhook 请求:

首先,验证请求的签名以确保其来自抖音服务器。 解析请求体,获取事件类型和相关数据。 处理特定事件:

对于 item_comment_reply 事件,提取所需的字段,如 comment_id、content、from_user_id 和 reply_to_item_id。 数据存储与状态标记:

将提取的数据存储到数据库中,同时标记一个状态字段,例如 processing,表示该评论正在处理中。 存储时,可以考虑使用事务来确保数据的一致性。 异步处理:

使用 FastAPI 的后台任务功能(如依赖于 Celery 的任务队列)异步处理评论内容。这样可以避免阻塞 webhook 端点。 在后台任务中,可以将评论内容发送给 AI 大模型或其他微服务进行处理。 处理完成后的操作:

一旦 AI 大模型或其他微服务处理完成,更新数据库中的状态字段,例如设置为 processed。 同时,存储处理结果或相关标识。 回复抖音 API:

根据处理结果,使用适当的逻辑构造回复内容。 通过调用抖音 API 发送回复评论的请求。注意,这一步可能需要处理 API 调用的结果,例如检查是否成功发送了回复。

我想把收到的用户评论 "comment_id" 和 "content" 、"from_user_id" "reply_to_item_id" 取出来,进行后续处理。因为知道这几个参数才能准确定位是哪一个用户、哪一条视频、哪一条评论。随后我可能会根据 content 发送给 AI 大模型或者其他微服务,进行一系列计算和处理。最终处理完成后,会将之前得到的参数请求给 抖音 API ,回复该条视频的评论信息。 我的思路看看是否正确:并完成功能:

  • handle_event 中根据消息类型建立一个 task ,例如 item_comment_reply 创建一个处理评论的 task
  • 处理评论的 task 中,需要将参数保存到数据库
  • SQLmodel 定义数据表 comment ,使用一个接口将数据存放到数据库
  • ... 请完成相应的示例代码

抖音 webhook

用户在抖音生活服务内的数据变更时,可通过 webhook 的方式将消息推送给开发者,开发者可以根据需要订阅不同消息。消息仅包含基本的信息,如需要查询详细数据,可在收到消息后主动调用相关接口获取明细数据。 注意事项: 目前仅支持同一个应用配置一个接收消息地址,开发者需根据event字段区分消息类型对content内容进行不同的处理。 前三步参考文档技术服务商接入指南和自研开发者接入指南。 第四步提供的消息地址需满足抖音侧请求验证功能,才能配置成功。 抖音侧发送的 POST 验证请求示例如下:

{ "event": "verify_webhook", "client_key": "", "content": { "challenge": 12345 } }

当开发者收到抖音的验证请求时,需要解析出 challenge 值,并立即返回该 challenge 值作为响应。需要注意,返回内容需要放入响应的 Body 里,不能直接返回;并且返回内容为 text 格式的 json 数据。响应示例如下:

{ "challenge": 12345 }

当 推送事件不是 verify_webhook 时: HTTP method POST Msg-Id :同一实体下同一action的msg_id相同,服务商可根据msg_id对于消息去重 X-Douyin-Signature: 抖音侧签名,服务商可根据签名判断该消息是否来自抖音开放平台 Content-Type: 固定值application/json

请求体字段 event 消息类型,用于区分各类消息

client_key 对应服务商平台或开发者平台中的APPID,应用ID

from_user_id 标识用户身份的openId,同一用户在不同的APPID中openId不相同

content 消息内容,根据需要解析消息内容,不同类型的消息内容不同

log_id 抖音内部日志id,可提供给抖音方便排查问题 请求体示例 { "event": "life_trade_order_notify", "client_key": "axxxxxxxxxxxxx", "from_user_id": "f6e35c98-1e53-4943-ad6d-f476f869de", "content": "{\"action\": \"pay_success\",\"msg_time\": 1665991178,\"order\": {\"order_id\": \"123\",\"pay_amount\": 1, \"original_amount\": 1, \"account_id\": \"123\",\"create_time\": 1665991178,\"pay_time\": 1665991178}}", "log_id": "202210101930530102281180650970B5AF" }

验签方式 用户可通过请求头中的 X-Douyin-Signature 字段判断该消息是否来自抖音开放平台。 抖音服务端会将应用的 AppSecret 和消息体使用 sha1 哈希作为 X-Douyin-Signature 的 value。开发者可以自行使用 AppSecret 和收到的消息体进行 sha1 哈希,与该请求头进行比对,确认消息推送请求来自抖音。 java 版验签 demo

    import org.apache.commons.codec.digest.DigestUtils; // sha1算法库


    // 获取消息中body
    String str, wholeStr = "";
    try{
        BufferedReader br = re.getReader();
        while((str = br.readLine()) != null){
            wholeStr += str;
        }
    } catch (Exception e){
        log.warn("获取请求内容失败");
    }
    // 获取请求头中的加签信息
    String  signature = re.getHeader("X-Douyin-Signature");
    String data = appSecret + wholeStr;
    String sign = DigestUtils.sha1Hex(data);
    if(!sign.equals(signature)){
        log.error("验签失败");
    }

响应内容 开发者收到消息推送后,http code 响应 200 且响应时间小于 3s,抖音侧即认为推送成功。

若开发者 http 响应 code 非 200 或响应时间超过 3s,抖音侧会间隔 500ms 发起重试,最大重试次数为 3 次。

抖音侧收到成功请求时也可能会继续重复推送,请务必使用请求头中 Msg-Id 进行消息去重处理。

上述是抖音官方事件订阅的文档说明,你作为一个开发者,需要完成上述所需功能,并成功订阅事件通知。当前路径 api 目录中,是 Fastapi 框架下的项目 endpoint 路径,你需要用 shell 指令创建相关文件,然后完成 py 代码:

from fastapi import FastAPI,APIRouter,  File, HTTPException, Depends, Request,Header, UploadFile  
from fastapi.responses import JSONResponse 
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials  
from config import logger,CLIENT_KEY
webhook_route = APIRouter()  


@webhook_route.post("/webhook")  
async def webhook(request: Request):  
    data = await request.json()  
    signature = request.headers.get('X-Douyin-Signature')  
    logger.info(f"{request.headers}")
    logger.info(f"{data}")