项目简介 项目名称: swl-douyin
项目简介: 一个基于 FastAPI 框架的网站后端,用于用户登录、文档上传、文档向量转换、抖音 API 获取用户信息、视频评论回复等功能。
项目结构如下:
api/ 目录下包含了一系列处理用户登录鉴权、上传文档、微信支付等相关功能的 Python 脚本。 douyin/ 目录下是与抖音 API 相关的接口,用于获取 access_token 和用户信息。 grpc_m/ 目录下为 gRPC 相关的客户端和服务端代码,其中微服务 VectorService 用于接收文档并将其转换为向量数据存储。
├── api │ ├── jwt.py │ ├── login.py │ ├── readme.md │ ├── swl.http │ ├── upload.py │ ├── weixin_pay.py ├── config.py ├── db │ ├── base.py │ ├── docs.py │ ├── engine.py │ ├── readme.md │ ├── user.py ├── douyin │ ├── access_token.py │ ├── user_info.py ├── grpc_m │ ├── client.py │ ├── send_data_to_vector.py │ ├── vector_service.proto │ ├── vector_service_pb2.py │ ├── vector_service_pb2_grpc.py ├── main.py ├── readme.md 项目功能:
用户通过抖音扫码登录 用户上传文档 将文档转换为向量 获取抖音用户公开信息 评论抖音视频
class Documents(SQLModel, table=True):
id: UUID4 = Field(default_factory=uuid.uuid1, primary_key=True,index=True) # 使用 UUID v1 作为主键
open_id: str = Field(foreign_key="useroauthtoken.open_id",index=True) # 关联到用户表的外键
path: str = Field(nullable=False) # 相对路径
status: int = Field(nullable=False) # 文档状态
update_time: datetime = Field(default_factory=datetime.now) # 创建时间、更新时间
__table_args__ = (UniqueConstraint('open_id', 'path', name='uq_documents'),)
如上向量文档数据库模型,用来存储用户自定义上传的文档,以便于后续向量化处理。 同时用户自己也有许多视频,我需要把视频也转化成文本,每一个视频 item_id ,对应一个文档 Documents.id ,并且进行向量化。 对于自定义文档和视频文本两者的数据关系,我应该如何定义表结构? 如果自定义文档我假设有一个分类表,可以按不同文档分类:
class Categories(SQLModel, table=True):
id: UUID4 = Field(default_factory=uuid.uuid1, primary_key=True) # 使用 UUID v1 作为主键
open_id: str = Field(foreign_key="useroauthtoken.open_id",index=True) # 关联到用户表的外键
name: str = Field(default="default", index=True) # 分类的名称,添加索引以优化查询性能
update_time: datetime = Field(default_factory=datetime.now) # 创建时间、更新时间
# 添加联合唯一约束
__table_args__ = (UniqueConstraint('open_id', 'name', name='uq_open_id_ctname'),)
class DocumentCategories(SQLModel, table=True):
id: UUID4 = Field(foreign_key="documents.id",index=True, primary_key=True) # 关联到文档表的外键
category_id: UUID4 = Field(foreign_key="categories.id",index=True) # 关联到分类表的外键
__table_args__ = (UniqueConstraint('id', 'category_id', ),)
那么视频对应的一个文档,又该如何关联这些表结构或分类?给我最佳合理的程序设计、表结构设计架构。
# Event: {'event': 'item_comment_reply', 'client_key': 'aw6aipmfdtplwtyq', 'from_user_id': '_000QadFMhmU1jNCI3JdPnyVDL6XavC70dFy', 'to_user_id': '_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk', 'content': CommentContent, 'log_id': '20240227180303CB402D84F669A90052F5'}
# CommentContent: {"at_user_id":"","avatar":"https://p11.douyinpic.com/aweme/720x720/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=3782654143","comment_id":"@9VxS1/qCUc80K2etd8wkUc791mHpPPGEPZB1rgyvKVQRavj960zdRmYqig357zEBaMZS/8P5ZrVUQVC/27oJAA==","comment_user_id":"_000QadFMhmU1jNCI3JdPnyVDL6XavC70dFy","content":"测试","create_time":1709028183,"digg_count":0,"level":1,"nick_name":"程序员马工","parent_id":"@9VxS1/qCUc80K2etd8wkUc7912DgP/GCPpF2qwKuJ1YTb/X460zdRmYqig357zEBKzkoKYjBMUvl9Bs6h+CwYQ==","reply_comment_total":0,"reply_to_comment_id":"","reply_to_item_id":"@9VxS1/qCUc80K2etd8wkUc7912DgP/GCPpF2qwKuJ1YTb/X460zdRmYqig357zEBKzkoKYjBMUvl9Bs6h+CwYQ=="}
class CommentContent(SQLModel, table=True):
id: int = Field(primary_key=True)
at_user_id: Optional[str] = Field(default=None)
avatar: Optional[str] = Field(default=None)
# 该条评论的id
comment_id: Optional[str] = Field(default=None)
# 发评论的用户uid
comment_user_id: Optional[str] = Field(default=None)
# 评论内容
content: Optional[str] = Field(default=None)
# 该评论回复的总父级评论。也就是说在视频中直接看到的评论。
parent_id: Optional[str] = Field(default=None)
reply_comment_total: Optional[int] = Field(default=None)
reply_to_comment_id: Optional[str] = Field(default=None)
# 视频 id
reply_to_item_id: Optional[str] = Field(default=None)
nick_name: Optional[str] = Field(default=None)
event: "Events" = Relationship(back_populates="content", link_model=EventCommentLink)
class Events(SQLModel, table=True):
id: int = Field(default=None, primary_key=True)
event: str
client_key:str
# 发起评论的人,可以是自己,也可以是别人
from_user_id: str
# to_user_id 一直是自己的 open_id,你授权了思维链管理视频评论,订阅事件,当你自己的视频被评论时,to user 就是自己
to_user_id: str
# 使用 Relationship 定义与 CommentContent 的关系
content: Optional[CommentContent] = Relationship(back_populates="event", link_model=EventCommentLink)
log_id:str
class EventRepository(DouyinBaseRepository):
def __init__(self, engine=engine):
super().__init__(CommentContent, engine)
self.model:Events
def save_item_comment_reply(self, data:dict) -> Tuple[Events,CommentContent]:
with Session(self.engine) as session:
content = json.loads(data.pop("content"))
content_model = CommentContent(**content)
event_model = Events(**data)
event_model.content = content_model
session.add(event_model)
session.commit()
return event_model,content_model
上述模型是参考的 SQLmodel 官方的代码风格,定义了3个数据表,并用表关联的方式 save_item_comment_reply 存放数据。具体说明请看注释。 不过 parent_id 、 reply_to_comment_id 是两个复杂的字段, 假如用户在视频中发表了评论,这条评论视为根极评论假设 comment_id = 1,则 parent_id 是视频 id 也就是 reply_to_item_id, reply_to_comment_id='' ,因为仅仅是评论视频而没有产生回复。
如果 根极评论 comment_id = 1 被人评论,则产生子评论 comment_id = 2, parent_id 则变成 comment_id = 1 ,reply_to_comment_id='' ,因为仅仅是评论根极评论而没有产生回复。
如果子评论 comment_id = 2 被人回复,则产生新的评论数据 comment_id = 3 , 因为在根极评论 comment_id = 1下的回复,父级 parent_id 仍然是 comment_id = 1 ,但是 reply_to_comment_id=comment_id = 2
基于以上逻辑,能否根据 comment_id 、parent_id 、 reply_to_comment_id 等这几个重要的字段递归查询发生了哪些对话。然后按照顺序返回对话内容
示例数据: data = {'event': 'item_comment_reply', 'from_user_id': 'mg', 'to_user_id': 'wang', 'content': {data_content}...} data_content ={"comment_id":"@9VxS1","comment_user_id":"mg","content":"这是编程吗","parent_id":"video1","reply_comment_total":0,"reply_to_comment_id":"0","reply_to_item_id":"video1",nick_name":"程序员马工"}
假设有json 数据 data 里面有一个字段是 'content' ,它的字段是字典,如示例中的 data_content 。那么python SQLmodel 、SQLAlmodel 2.0 基于 PostgreSQL 是否支持这样的子嵌套数据库模型:
from sqlmodel import Field, SQLModel
from typing import Dict, Any
class Content(SQLModel):
comment_id: str
comment_user_id: str
content: str
parent_id: str
reply_comment_total: int
reply_to_comment_id: str
reply_to_item_id: str
nick_name: str
# 如果你希望这个模型也作为一个Pydantic模型进行验证,可以添加一个`__root__`字段
# 但这在作为数据库模型的一部分时通常是不必要的
#__root__: Dict[str, Any]
class EventData(SQLModel, table=True):
id: int = Field(default=None, primary_key=True)
event: str
from_user_id: str
to_user_id: str
content: Content = Field(sa_column_kwargs={'type': 'jsonb'}) # 使用PostgreSQL的JSONB类型
如果支持,给一个 session.add 示例
from sqlmodel import create_engine, Session
from your_module import EventData, Content # 假设EventData和Content是从你的模块中导入的
# 创建数据库引擎
engine = create_engine('postgresql://user:password@localhost/dbname')
# 创建表(如果它们还不存在)
SQLModel.metadata.create_all(engine)
# 创建一个新的会话
with Session(engine) as session:
# 创建一个Content实例
content_data = Content(
comment_id="@9VxS1",
comment_user_id="mg",
content="这是编程吗",
parent_id="video1",
reply_comment_total=0,
reply_to_comment_id="0",
reply_to_item_id="video1",
nick_name="程序员马工"
)
# 创建一个EventData实例,并将Content实例赋值给它的content字段
event_data = EventData(
event="item_comment_reply",
from_user_id="mg",
to_user_id="wang",
content=content_data
)
# 将EventData实例添加到会话中
session.add(event_data)
# 提交会话以保存更改到数据库
session.commit()
# 会话在with块结束时自动关闭,不需要手动关闭
# 所有在会话中添加并提交的实例现在都应该已经保存到数据库中了
你当前在 open-douyin 这个目录下面。这个项目是一个基于 Fastapi 框架的网站后端。用户实现授权第三方平台抖音登录,注册到我的网站中。网站包含了文档上传、删除文档、文档向量转换、抖音 API 获取用户信息、视频评论回复等的功能。
git ls-files .gitignore
api/jwt.py Fastapi 用户鉴权登录,扫码后会通过 jwt token 解析到用户的 open_id,查询 open_id 是否在数据库中,如果存在说明登录成功,允许用户访问后台数据
api/login.py 抖音扫码登录后,前端通过扫码结果的 code 信息请求 login 到这个端点上,然后 login.py 通过 code 访问 抖音 API 如果返回成功,则登录成功,并记录和返回携带用户 open_id 的 Token
api/comment.py 视频产生评论时,抖音向 Fastapi 后端服务器 post 请求的用户评论 webhook
api/readme.md : 说明文档
api/upload.py 前端请求上传文件的路径
api/weixin_pay.py: 用户通过微信支付扫码充值。充值后才能使用网站的付费应用
config.py : 本项目的配置文件
db/base.py : 数据库增删改查基类
db/docs.py : 用户上传的文档 docs 数据库模型定义,操作数据库的实现
db/engine.py 数据库引擎,用法: from db.engine import engine,create_all_table
db/readme.md 本目录的说明文档
db/user.py 用户登录的相关数据库定义和模型 class DouyinBaseRepository(BaseRepository):
def __init__(self, model: SQLModel, engine=engine):
super().__init__(model, engine)
def add_or_update(self, dict_data: dict) -> SQLModel:
class UserInfoRepository(DouyinBaseRepository):
def __init__(self, engine=engine):
super().__init__(UserInfo, engine)
self.model:UserInfo
class UserOAuthRepository(DouyinBaseRepository):
def __init__(self, engine=engine):
super().__init__(UserOAuthToken, engine)
self.model:UserOAuthToken
douyin/access_token.py 抖音 API 接口,请求用户 access_toekn ,用户获取抖音用户公开信息 async def get_access_token(code) async def refresh_access_token(refresh_token):
db/comment.py 抖音 API 接口,用户对自己的视频评论进行回复。 示例: async def reply_to_comment(open_id: str, access_token: str, content: str='', item_id:str='', comment_id='')
douyin/user_info.py 抖音 API 接口,获取抖音用户公开信息
grpc_m/client.py gRPC 客户端,用于将文档发送到我的另一个 微服务 vector server 并转换成向量数据存放在该服务中
grpc_m/send_data_to_vector.py 发送文档数据到向量服务器
grpc_m/vector_service.proto gRPC 协议
grpc_m/vector_service_pb2.py grpc_m/vector_service_pb2.pyi grpc_m/vector_service_pb2_grpc.py
main.py 主程序入口,已经完成代码
readme.md
现在 db 和 douyin 目录还是相对独立的,保持单一职责,没有太多耦合。可是当 comment 评论发生的时候,需要一系列复杂判断,例如向数据库 db 获取用户 id 等信息,从 id 中获取数据库用户文档元数据 docs ,回复 reply_to_comment 时还需要判断 access_token 是否过期,refresh_access_token还需要向db数据库查询。因此有了非常多的逻辑和耦合
因此我想新建一个文件: douyin/manage_user.py
python from db.user import UserInfo,UserOAuthRepository,UserOAuthToken,UserInfoRepository from db.docs import DocStatus,DocumentCategories,DocumentCategoriesRepository,Documents,DocumentBase,DocumentsRepository from douyin.access_token import get_access_token,refresh_access_token from douyin.comment import reply_to_comment
class UserManage:
def __init__(self, open_id) -> None:
self.open_id = open_id
def get_token_model(self) -> UserOAuthToken:
res = UserOAuthRepository().select(UserOAuthToken.open_id == self.open_id)
return res.first()
def get_info_model(self) -> UserInfo:
res = UserInfoRepository().select(UserInfo.open_id == self.open_id)
return res.first()
def get_access_token(self) -> str:
model = self.get_token_model()
return model.access_token
用来统一管理所有业务。 get_access_token 理应 自行判断 token过期,如果过期则自动刷新 access_token ,如果刷新 token 也过期,才需要提示错误。
这是 model :
class UserOAuthToken(SQLModel, table=True):
id: Optional[int] = Field(primary_key=True)
access_token:str
expires_in: Optional[int] = None
open_id:str = Field(index=True)
refresh_expires_in: Optional[int] = None
refresh_token:str
scope: str
update_time: datetime = Field(default_factory=datetime.now) # 添加时间戳字段
__table_args__ = (UniqueConstraint('open_id'),)
expires_in int64 access_token 接口调用凭证超时时间,单位(秒)
refresh_expires_in int64 refresh_token 凭证超时时间,单位(秒)
refresh_access_token 函数正常示例:
{
"data": {
"description": "",
"error_code": 0,
"expires_in": "86400",
"refresh_token": "refresh_token"
},
"message": "success"
}
异常示例
{
"data": {
"description": "Parameter error",
"error_code": 2100005
},
"extra": {
"logid": "2020070614111601022506808001045D59",
"now": 1594015876138
}
}
我是否可以直接修改数据库字段 expire_in ,表明 token 过期时间?这样才能判断字段是否过期,如果需要,请帮我完成代码
## 项目简介 swl-douyin
你当前在 open-douyin 这个目录下面。这个项目是一个基于 Fastapi 框架的网站后端。用户实现授权第三方平台抖音登录,注册到我的网站中。网站包含了文档上传、删除文档、文档向量转换、抖音 API 获取用户信息、视频评论回复等的功能。
git ls-files
.gitignore
api/jwt.py
Fastapi 用户鉴权登录,扫码后会通过 jwt token 解析到用户的 open_id,查询 open_id 是否在数据库中,如果存在说明登录成功,允许用户访问后台数据
api/login.py
抖音扫码登录后,前端通过扫码结果的 code 信息请求 login 到这个端点上,然后 login.py 通过 code 访问 抖音 API 如果返回成功,则登录成功,并记录和返回携带用户 open_id 的 Token
api/readme.md :
说明文档
api/swl.http
http 请求草稿
api/upload.py
前端请求上传文件的路径
api/weixin_pay copy.py
api/weixin_pay.py:
用户通过微信支付扫码充值。充值后才能使用网站的付费应用
config.py :
本项目的配置文件
db/base.py :
数据库增删改查基类
db/docs.py :
用户上传的文档 docs 数据库模型定义,操作数据库的实现
db/engine.py
数据库引擎,用法: from db.engine import engine,create_all_table
db/readme.md
本目录的说明文档
db/user.py
用户登录的相关数据库定义和模型
douyin/access_token.py
抖音 API 接口,请求用户 access_toekn ,用户获取抖音用户公开信息
douyin/user_info.py
抖音 API 接口,获取抖音用户公开信息
grpc_m/client.py
gRPC 客户端,用于将文档发送到我的另一个 微服务 vector server 并转换成向量数据存放在该服务中
grpc_m/send_data_to_vector.py
发送文档数据到向量服务器
grpc_m/vector_service.proto
gRPC 协议
grpc_m/vector_service_pb2.py
grpc_m/vector_service_pb2.pyi
grpc_m/vector_service_pb2_grpc.py
main.py
主程序入口,已经完成代码
readme.md
在另一个微服务 LangChain 项目中 main.py 如下:
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
from concurrent import futures
import time
import grpc
import logging
import uuid
from db_vector.vector_for_douyin import save_user_doc_to_vector # 假设这里有必要的导入
from grpc_m import vector_service_pb2, vector_service_pb2_grpc
from config import MNT_DOUYIN_DATA
class VectorService(vector_service_pb2_grpc.VectorServiceServicer):
def SaveDocToVector(self, request, context):
category_id = request.category_id
user_doc_relative_path = request.user_doc_relative_path
mnt_user_docs_path = os.path.join(MNT_DOUYIN_DATA, user_doc_relative_path)
if not mnt_user_docs_path:
return vector_service_pb2.SaveDocToVectorResponse(status=vector_service_pb2.ErrorCode.MNT_DOUYIN_DATA_ERROR)
status = save_user_doc_to_vector(category_id, mnt_user_docs_path) # 这个函数需要根据实际情况来实现
return vector_service_pb2.SaveDocToVectorResponse(status=status)
def serve():
port = "18600"
server = grpc.server(futures.ThreadPoolExecutor(max_workers=30))
vector_service_pb2_grpc.add_VectorServiceServicer_to_server(VectorService(), server)
server.add_insecure_port("0.0.0.0:" + port)
server.start()
print("Server started, listening on " + port)
server.wait_for_termination()
if __name__ == '__main__':
serve()
gRPC grpc_m/vector_service.proto 文件:
syntax = "proto3";
package grpc_m;
// 定义错误枚举类型
enum ErrorCode {
SUCCESS = 0;
DOC_CONVERT_ERROR = 1;
VECTOR_SERVER_ERROR = 2;
}
service VectorService {
rpc SaveDocToVector (SaveDocToVectorRequest) returns (SaveDocToVectorResponse) {}
}
message SaveDocToVectorRequest {
string category_id = 1;
string user_doc_relative_path = 2;
}
message SaveDocToVectorResponse {
ErrorCode status = 1;
}
具体逻辑:前端用户上传文档 网站后端服务 Fastapi 接收鉴权成功 - Fastapi 请求 LangChain 微服务将文档保存到向量数据库 - 前端用户发起文档对话 - 网站后端服务 Fastapi 接收鉴权成功 - 后端Fastapi请求 LangChain 微服务 进行向量相似度计算,文本片段组合成 prompt 请求 LLM api ,将 LLM 回复的结果给后端 - 后端服务 Fastapi将结果返回给前端用户
目前我已经完成了 api/upload.py 文档上传 - status = save_user_doc_to_vector(category_id, mnt_user_docs_path) 保存在向量 请在这两个微服务 Fastapi 后端和 LangChain 项目中,实现文档对话的代码,你会如何实现?包括在两个项目中的 shell 指令新建目录及文件发给我
你当前在 open-douyin 这个目录下面。这个项目是一个基于 Fastapi 框架的网站后端。用户实现授权第三方平台抖音登录,注册到我的网站中。网站包含了文档上传、删除文档、文档向量转换、对话聊天测试的功能。
git ls-files .gitignore
api/jwt.py Fastapi 用户鉴权登录,扫码后会通过 jwt token 解析到用户的 open_id,查询 open_id 是否在数据库中,如果存在说明登录成功,允许用户访问后台数据
api/login.py 抖音扫码登录后,前端通过扫码结果的 code 信息请求 login 到这个端点上,然后 login.py 通过 code 访问 抖音 API 如果返回成功,则登录成功,并记录和返回携带用户 open_id 的 Token
api/readme.md : 说明文档
api/swl.http http 请求草稿
api/upload.py 前端请求上传文件的路径
api/weixin_pay copy.py api/weixin_pay.py: 用户通过微信支付扫码充值。充值后才能使用网站的付费应用
config.py : 本项目的配置文件
db/base.py : 数据库增删改查基类
db/docs.py : 用户上传的文档 docs 数据库模型定义,操作数据库的实现
db/engine.py 数据库引擎,用法: from db.engine import engine,create_all_table
db/readme.md 本目录的说明文档
db/user.py 用户登录的相关数据库定义和模型
douyin/access_token.py 抖音 API 接口,请求用户 access_toekn ,用户获取抖音用户公开信息
douyin/user_info.py 抖音 API 接口,获取抖音用户公开信息
grpc_m/client.py gRPC 客户端,用于将文档发送到我的另一个 微服务 vector server 并转换成向量数据存放在该服务中
grpc_m/send_data_to_vector.py 发送文档数据到向量服务器
grpc_m/vector_service.proto gRPC 协议
grpc_m/vector_service_pb2.py grpc_m/vector_service_pb2.pyi grpc_m/vector_service_pb2_grpc.py
main.py 主程序入口
readme.md
在另一个微服务 vector 项目中主文件如下:
python
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(file)))
from concurrent import futures
import time
import grpc
import logging
import uuid
from db_vector.vector_for_douyin import save_user_doc_to_vector # 假设这里有必要的导入
from grpc_m import vector_service_pb2, vector_service_pb2_grpc
from config import MNT_DOUYIN_DATA
class VectorService(vector_service_pb2_grpc.VectorServiceServicer):
def SaveDocToVector(self, request, context):
category_id = request.category_id
user_doc_relative_path = request.user_doc_relative_path
mnt_user_docs_path = os.path.join(MNT_DOUYIN_DATA, user_doc_relative_path)
if not mnt_user_docs_path:
return vector_service_pb2.SaveDocToVectorResponse(status=vector_service_pb2.ErrorCode.MNT_DOUYIN_DATA_ERROR)
status = save_user_doc_to_vector(category_id, mnt_user_docs_path) # 这个函数需要根据实际情况来实现
return vector_service_pb2.SaveDocToVectorResponse(status=status)
def serve():
port = "18600"
server = grpc.server(futures.ThreadPoolExecutor(max_workers=30))
vector_service_pb2_grpc.add_VectorServiceServicer_to_server(VectorService(), server)
server.add_insecure_port("0.0.0.0:" + port)
server.start()
print("Server started, listening on " + port)
server.wait_for_termination()
if name == 'main':
serve()
swl-douyin 向 vector 向量服务器发起请求,调用 SaveDocToVector 将文档保存到向量服务器,然后向量服务器返回向量数据库的有关 id 等信息,现在 swl-douyin 可以根据自己的向量文档与 LLM 大模型对话,我是通过 API 的方式调用 LLM 模型基于LangChain 框架,我是否新建另一个微服务称为 LangChain-LLM ,来对 vector 微服务发起请求然后返回进行文档对话聊天,还是应该跟 vector 向量服务器用的是同一个微服务?因为 vector 本质上也是调用了 LangChain 的内置库来做文本向量转换的。
我的设想: 前端用户上传文档 网站后端服务 Fastapi 接收鉴权成功 - Fastapi 请求 vector 微服务将文档保存到向量数据库 - 前端用户发起文档对话 - 网站后端服务 Fastapi 接收鉴权成功 - 后端Fastapi请求 LangChain-LLM 微服务 - LangChain-LLM微服务 请求 vector 进行向量相似度计算,并返回文本片段 - LangChain-LLM 微服务收到文本片段组合成 prompt - LangChain-LLM 微服务请求 LLM api - LangChain-LLM 微服务将返回的结果返回给后端 - 后端服务 Fastapi将结果返回给前端用户
另一个设想是不需要 LangChain-LLM 微服务,而是使用 vector 微服务,向量计算向量搜素和LLM请求都作为一个项目整体。
我的上述文档目录和项目架构是否合理?给我改进的建议
# 回复评论事件
你当前在 open-douyin 这个目录下面。这个项目是一个基于 Fastapi 框架的网站后端。用户实现授权第三方平台抖音登录,注册到我的网站中。网站包含了文档上传、删除文档、文档向量转换、抖音 API 获取用户信息、视频评论回复等的功能。
git ls-files
.gitignore
api/jwt.py
Fastapi 用户鉴权登录,扫码后会通过 jwt token 解析到用户的 open_id,查询 open_id 是否在数据库中,如果存在说明登录成功,允许用户访问后台数据
api/login.py
抖音扫码登录后,前端通过扫码结果的 code 信息请求 login 到这个端点上,然后 login.py 通过 code 访问 抖音 API 如果返回成功,则登录成功,并记录和返回携带用户 open_id 的 Token
api/readme.md :
说明文档
api/swl.http
http 请求草稿
api/upload.py
前端请求上传文件的路径
api/weixin_pay copy.py
api/weixin_pay.py:
用户通过微信支付扫码充值。充值后才能使用网站的付费应用
config.py :
本项目的配置文件
db/base.py :
数据库增删改查基类
db/docs.py :
用户上传的文档 docs 数据库模型定义,操作数据库的实现
db/engine.py
数据库引擎,用法: from db.engine import engine,create_all_table
db/readme.md
本目录的说明文档
db/user.py
用户登录的相关数据库定义和模型
douyin/access_token.py
抖音 API 接口,请求用户 access_toekn ,用户获取抖音用户公开信息
douyin/user_info.py
抖音 API 接口,获取抖音用户公开信息
grpc_m/client.py
gRPC 客户端,用于将文档发送到我的另一个 微服务 vector server 并转换成向量数据存放在该服务中
grpc_m/send_data_to_vector.py
发送文档数据到向量服务器
grpc_m/vector_service.proto
gRPC 协议
grpc_m/vector_service_pb2.py
grpc_m/vector_service_pb2.pyi
grpc_m/vector_service_pb2_grpc.py
main.py
主程序入口,已经完成代码
readme.md
帮我在 douyin/comment.py 写一个向抖音 API 发起回复视频评论的函数,以供给别的任务调用:
curl --location --request POST 'https://open.douyin.com/item/comment/reply/?open_id=43eae150-12eb-4035-8d40-3f9a2bfcdf4e' \
--header 'Content-Type: application/json' \
--header 'access-token: act.0308b91b5f4858eedd25b827fa7ed69ewKUKlr2o6b9zZOe17m4iKlMyZjJB' \
--data-raw '{
"content": "评论",
"item_id": "@9VwW0qOBUN9mJyD3bNxvGc791GLuPPGFOJx4rQOvLVAXbvj+60zdRmYqig357zEBXe9yX1kW9iI9RBd5IYhjRg==",
"comment_id": "@9VwW0qOBUN9mJyD3bNxvGc791GLuP/mAOZ1yqQmlK1QQbfX460zdRmYqig357zEBlemy4tGnuHCtvx71JWmUEw=="
}'
在另一个文件的 task 中,我可以这样调用:
async def comment_respone_task(data):
comment_data = {
"comment_id": data.get("content").get("comment_id"),
"content": data.get("content").get("content"),
"from_user_id": data.get("from_user_id"),
"reply_to_item_id": data.get("content").get("reply_to_item_id"),
}
comment_model:Comment = CommentRepository().add_or_update(Comment(**comment_data))
res = CategoriesRepository().select(Categories.open_id == comment_data.get("from_user_id"))
categ:Categories = res.first()
if categ:
res = await langchain_chat(categ.id, comment_data.get("content"))
# 以下是向抖音发起评论的请求
else:
logger.error(f"can not find {comment_data.get("from_user_id")} in CategoriesRepository")
comment_model.status = CommentStatus.ERROR
CommentRepository().add_or_update(comment_model)
请问 Fastapi 是否支持创建异步 client 让 douyin/comment.py 文件请求?如果不支持则使用 httpx
你当前在 open-douyin 这个目录下面。这个项目是一个基于 Fastapi 框架的网站后端。用户实现授权第三方平台抖音登录,注册到我的网站中。网站包含了文档上传、删除文档、文档向量转换、抖音 API 获取用户信息、视频评论回复等的功能。
git ls-files
.gitignore
api/jwt.py
Fastapi 用户鉴权登录,扫码后会通过 jwt token 解析到用户的 open_id,查询 open_id 是否在数据库中,如果存在说明登录成功,允许用户访问后台数据
api/login.py
抖音扫码登录后,前端通过扫码结果的 code 信息请求 login 到这个端点上,然后 login.py 通过 code 访问 抖音 API 如果返回成功,则登录成功,并记录和返回携带用户 open_id 的 Token
api/readme.md :
说明文档
api/swl.http
http 请求草稿
api/upload.py
前端请求上传文件的路径
api/weixin_pay copy.py
api/weixin_pay.py:
用户通过微信支付扫码充值。充值后才能使用网站的付费应用
config.py :
本项目的配置文件
db/base.py :
数据库增删改查基类
db/docs.py :
用户上传的文档 docs 数据库模型定义,操作数据库的实现
db/engine.py
数据库引擎,用法: from db.engine import engine,create_all_table
db/readme.md
本目录的说明文档
db/user.py
用户登录的相关数据库定义和模型
douyin/access_token.py
抖音 API 接口,请求用户 access_toekn ,用户获取抖音用户公开信息
douyin/user_info.py
抖音 API 接口,获取抖音用户公开信息
grpc_m/client.py
gRPC 客户端,用于将文档发送到我的另一个 微服务 vector server 并转换成向量数据存放在该服务中
grpc_m/send_data_to_vector.py
发送文档数据到向量服务器
grpc_m/vector_service.proto
gRPC 协议
grpc_m/vector_service_pb2.py
grpc_m/vector_service_pb2.pyi
grpc_m/vector_service_pb2_grpc.py
main.py
主程序入口,已经完成代码
readme.md
现在我新建了一个 api/webhook.py :
import json
import hashlib
import hmac
from fastapi import FastAPI,APIRouter, File, HTTPException, Depends, Request,Header, UploadFile
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from config import logger,CLIENT_KEY
webhook_route = APIRouter()
@webhook_route.post("/webhook")
async def webhook(request: Request):
data = await request.json()
signature = request.headers.get('X-Douyin-Signature')
bytes = request.body()
logger.info(f"{bytes} {signature}")
verify = verify_signature(request.body(), signature, CLIENT_KEY)
logger.info(f"{verify}")
# 验证签名
# if not verify_signature(request.body(), signature, CLIENT_KEY):
# raise HTTPException(status_code=403, detail="Invalid signature")
event = data.get("event")
# 处理验证请求
if event == "verify_webhook":
challenge = data.get("content").get("challenge")
return JSONResponse(content={"challenge": challenge})
handle_event(data)
# 处理其他类型的消息
client_key = data.get("client_key")
from_user_id = data.get("from_user_id")
content = data.get("content")
log_id = data.get("log_id")
# 在这里添加处理不同类型消息的逻辑
# ...
# 返回成功的响应
return JSONResponse(status_code=200, content={"message": "success"})
async def handle_event(data:dict):
pass
抖音会对 /webhook post 如下信息:
{
"event": "item_comment_reply",
"from_user_id": "",
"to_user_id": "",
"client_key": "",
"content": {}
}
其中 "content" 的内容为字典,内容示例如下:
{
"comment_id":"", //评论id
"comment_user_id", //发评论的用户uid
"content":, //评论内容
"create_time":123123, //评论创建时间(秒级时间戳)
"digg_count":0, //该评论的点赞量
"reply_comment_total":1, //该评论下的回复评论量
"reply_to_comment_id":"", //该评论回复的上一级评论的评论id
"reply_to_item_id":"@9VxT0uuFWsE7M3Koc4olFM791WbsNPGKOp1wrgiiJ1ERbfD060zdRmYqig357zEB1tSgbExoci7R0e1yFxAIMw==",//该评论回复的视频id
"at_user_id": "", //评论@的用户uid
"avatar": "https://uuue/ehdne",
"nick_name": "xiaoming"
}
简要流程说明: 接收并验证 Webhook 请求:
首先,验证请求的签名以确保其来自抖音服务器。 解析请求体,获取事件类型和相关数据。 处理特定事件:
对于 item_comment_reply 事件,提取所需的字段,如 comment_id、content、from_user_id 和 reply_to_item_id。 数据存储与状态标记:
将提取的数据存储到数据库中,同时标记一个状态字段,例如 processing,表示该评论正在处理中。 存储时,可以考虑使用事务来确保数据的一致性。 异步处理:
使用 FastAPI 的后台任务功能(如依赖于 Celery 的任务队列)异步处理评论内容。这样可以避免阻塞 webhook 端点。 在后台任务中,可以将评论内容发送给 AI 大模型或其他微服务进行处理。 处理完成后的操作:
一旦 AI 大模型或其他微服务处理完成,更新数据库中的状态字段,例如设置为 processed。 同时,存储处理结果或相关标识。 回复抖音 API:
根据处理结果,使用适当的逻辑构造回复内容。 通过调用抖音 API 发送回复评论的请求。注意,这一步可能需要处理 API 调用的结果,例如检查是否成功发送了回复。
我想把收到的用户评论 "comment_id" 和 "content" 、"from_user_id" "reply_to_item_id" 取出来,进行后续处理。因为知道这几个参数才能准确定位是哪一个用户、哪一条视频、哪一条评论。随后我可能会根据 content 发送给 AI 大模型或者其他微服务,进行一系列计算和处理。最终处理完成后,会将之前得到的参数请求给 抖音 API ,回复该条视频的评论信息。 我的思路看看是否正确:并完成功能:
用户在抖音生活服务内的数据变更时,可通过 webhook 的方式将消息推送给开发者,开发者可以根据需要订阅不同消息。消息仅包含基本的信息,如需要查询详细数据,可在收到消息后主动调用相关接口获取明细数据。 注意事项: 目前仅支持同一个应用配置一个接收消息地址,开发者需根据event字段区分消息类型对content内容进行不同的处理。 前三步参考文档技术服务商接入指南和自研开发者接入指南。 第四步提供的消息地址需满足抖音侧请求验证功能,才能配置成功。 抖音侧发送的 POST 验证请求示例如下:
{ "event": "verify_webhook", "client_key": "", "content": { "challenge": 12345 } }
当开发者收到抖音的验证请求时,需要解析出 challenge 值,并立即返回该 challenge 值作为响应。需要注意,返回内容需要放入响应的 Body 里,不能直接返回;并且返回内容为 text 格式的 json 数据。响应示例如下:
{ "challenge": 12345 }
当 推送事件不是 verify_webhook 时: HTTP method POST Msg-Id :同一实体下同一action的msg_id相同,服务商可根据msg_id对于消息去重 X-Douyin-Signature: 抖音侧签名,服务商可根据签名判断该消息是否来自抖音开放平台 Content-Type: 固定值application/json
请求体字段 event 消息类型,用于区分各类消息
client_key 对应服务商平台或开发者平台中的APPID,应用ID
from_user_id 标识用户身份的openId,同一用户在不同的APPID中openId不相同
content 消息内容,根据需要解析消息内容,不同类型的消息内容不同
log_id 抖音内部日志id,可提供给抖音方便排查问题
请求体示例
{
"event": "life_trade_order_notify",
"client_key": "axxxxxxxxxxxxx",
"from_user_id": "f6e35c98-1e53-4943-ad6d-f476f869de",
"content": "{\"action\": \"pay_success\",\"msg_time\": 1665991178,\"order\": {\"order_id\": \"123\",\"pay_amount\": 1, \"original_amount\": 1, \"account_id\": \"123\",\"create_time\": 1665991178,\"pay_time\": 1665991178}}",
"log_id": "202210101930530102281180650970B5AF"
}
验签方式 用户可通过请求头中的 X-Douyin-Signature 字段判断该消息是否来自抖音开放平台。 抖音服务端会将应用的 AppSecret 和消息体使用 sha1 哈希作为 X-Douyin-Signature 的 value。开发者可以自行使用 AppSecret 和收到的消息体进行 sha1 哈希,与该请求头进行比对,确认消息推送请求来自抖音。 java 版验签 demo
import org.apache.commons.codec.digest.DigestUtils; // sha1算法库
// 获取消息中body
String str, wholeStr = "";
try{
BufferedReader br = re.getReader();
while((str = br.readLine()) != null){
wholeStr += str;
}
} catch (Exception e){
log.warn("获取请求内容失败");
}
// 获取请求头中的加签信息
String signature = re.getHeader("X-Douyin-Signature");
String data = appSecret + wholeStr;
String sign = DigestUtils.sha1Hex(data);
if(!sign.equals(signature)){
log.error("验签失败");
}
响应内容 开发者收到消息推送后,http code 响应 200 且响应时间小于 3s,抖音侧即认为推送成功。
若开发者 http 响应 code 非 200 或响应时间超过 3s,抖音侧会间隔 500ms 发起重试,最大重试次数为 3 次。
抖音侧收到成功请求时也可能会继续重复推送,请务必使用请求头中 Msg-Id 进行消息去重处理。
上述是抖音官方事件订阅的文档说明,你作为一个开发者,需要完成上述所需功能,并成功订阅事件通知。当前路径 api 目录中,是 Fastapi 框架下的项目 endpoint 路径,你需要用 shell 指令创建相关文件,然后完成 py 代码:
from fastapi import FastAPI,APIRouter, File, HTTPException, Depends, Request,Header, UploadFile
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from config import logger,CLIENT_KEY
webhook_route = APIRouter()
@webhook_route.post("/webhook")
async def webhook(request: Request):
data = await request.json()
signature = request.headers.get('X-Douyin-Signature')
logger.info(f"{request.headers}")
logger.info(f"{data}")