comment.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. import asyncio
  2. import hashlib
  3. import json
  4. from typing import List
  5. import os
  6. import sys
  7. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  8. from db.comment import CommentRepository
  9. from db.docs import CategoriesRepository,Categories
  10. from db.user_oauth import UserOAuthRepository,UserOAuthToken
  11. from config import logger,TEMP_DIR,MNT_DOUYIN_DATA
  12. from grpc_m.send_data_to_vector import langchain_chat,simarity_search
  13. import douyin.comment_reply
  14. from fastapi import FastAPI,APIRouter, File, HTTPException
  15. from db.comment import CommentContent,Events,EventRepository
  16. from db.user_info import UserInfo,UserInfoRepository
  17. from db.docs import DocumentsRepository,Documents
  18. from db.user import User,UserRepo,engine
  19. from db.video_data import VideoData,VideoItemDocRepo
  20. from grpc_m.client import get_doc_vector,save_doc_vector
  21. from sqlmodel import Field, SQLModel,Relationship,create_engine,Session,select,func,Column
  22. from tool.video_get import download_video_by_item_id,get_video_download_urls,download_video,get_iframe_by_item_id
  23. from tool.exec_asr_client import run_asr_client
  24. from enum import Enum
  25. # 定义视频处理状态的枚举类
  26. class VideoProcessStatus(Enum):
  27. INIT = 0
  28. IFRAME_OBTAINED = 1
  29. DOWNLOAD_URLS_OBTAINED = 2
  30. VIDEO_DOWNLOADED = 3
  31. ASR_COMPLETED = 4
  32. TEXT_WRITTEN = 5
  33. VECTOR_SAVED = 6
  34. PROCESSING_ERROR = -1
  35. async def video_item_to_vector(video_item:VideoData):
  36. user = video_item.user
  37. file_name = os.path.splitext(video_item.doc.path)
  38. proccess_status = {"status":VideoProcessStatus.INIT.value, "msg":"获取 iframe "}
  39. item_iframe_data = await get_iframe_by_item_id(video_item.item_id)
  40. url_str = item_iframe_data.get("iframe_code")
  41. if not url_str:
  42. return
  43. logger.info(f"成功获取 item iframe: {url_str}")
  44. # 下载路径不能使用 item_id ,因为它包含特殊字符,使用 id 只有整型,也方便硕源
  45. save_video_path = os.path.join(TEMP_DIR, "video", f"{file_name}.mp4")
  46. if not os.path.exists(os.path.dirname(save_video_path)):
  47. os.makedirs(os.path.dirname(save_video_path))
  48. urls = await get_video_download_urls(url_str)
  49. if not urls:
  50. return
  51. logger.info(f"成功获取无水印链接 : {urls}")
  52. res = await download_video(urls, save_video_path)
  53. if not res:
  54. logger.error(f"download fail item_iframe_data: {item_iframe_data} \t save_video_path: {save_video_path}")
  55. return
  56. logger.info(f"成功下载视频 : {res}")
  57. video_asr_json_path = os.path.join(TEMP_DIR, "video", f"{file_name}.json")
  58. if not await run_asr_client(save_video_path, video_asr_json_path):
  59. return
  60. logger.info(f"视频语音识别成功 : {video_asr_json_path}")
  61. video_asr_json = json.load(open(video_asr_json_path))
  62. asr_text = video_asr_json.get("text")
  63. if not asr_text:
  64. logger.error(f"获取不到语音转文本的结果 video_asr_json_path: {video_asr_json_path}")
  65. return
  66. video_to_txt_path = os.path.join(MNT_DOUYIN_DATA, user.open_id, "docs", "_video", f"{file_name}.txt")
  67. if not os.path.exists(os.path.dirname(video_to_txt_path)):
  68. os.makedirs(os.path.dirname(video_to_txt_path))
  69. with open(video_to_txt_path, "w") as f:
  70. f.write(asr_text)
  71. video_model = VideoItemDocRepo().add_vedio_item_doc(user,item_id,video_to_txt_path)
  72. logger.info(f"写入数据库成功 video_model: {video_model.id}")
  73. doc = video_model.doc
  74. save_vector_err = await save_doc_vector(str(doc.id), doc.path)
  75. if save_vector_err:
  76. logger.error(f"save_doc_vector err {save_vector_err}")
  77. return
  78. logger.info(f"向量转换成功 doc.id: {doc.id}")
  79. DocumentsRepository().set_status(doc, 100)
  80. async def get_video_promt(user:User, event_model:Events):
  81. video_summarize = ""
  82. content = ""
  83. session = Session(engine)
  84. video_item = session.exec(
  85. select(VideoData).where(VideoData.item_id == event_model.content.reply_to_item_id)
  86. ).first()
  87. # 该视频没有提取文案和向量计算
  88. if not video_item:
  89. video_to_txt_path = os.path.join(MNT_DOUYIN_DATA, user.open_id, "docs", "_video", f"{event_model.id}.txt")
  90. video_item = VideoItemDocRepo().add_vedio_item_doc(user, event_model.content.reply_to_item_id, video_to_txt_path)
  91. asyncio.create_task(video_item_to_vector(video_item))
  92. return ""
  93. else:
  94. if video_item.doc.summarize:
  95. video_summarize = f"<summarize>{video_item.doc.summarize}<summarize/>"
  96. doc_query_res = await get_doc_vector(video_item.doc.id, event_model.content.content)
  97. content = ""
  98. chunks = ""
  99. for item in doc_query_res:
  100. chunk_data = item.chunk
  101. metadata_dict = dict(item.metadata)
  102. score_value = item.score
  103. uuid_value = item.uuid
  104. chunks += f'<chunks score="{item.score}">{item.chunk}<chunks/>'
  105. if chunks:
  106. content = f'<content>{chunks}<content/>'
  107. video_promt = f"""<video relpath="{video_item.doc.path}">{video_summarize}{content}<video/>"""
  108. logger.info(f"video_promt: {video_promt}")
  109. return video_promt
  110. def gen_prompt(nickname, content_models:List[CommentContent], video_promt):
  111. role = """
  112. 你是一个抖音视频的创作者的客服助理,你的昵称是'{nick_name}'。你发布了一个视频,<vedio_content> 是视频的部分文案片段。
  113. <chat> 是用户在视频评论、或用户各自的讨论。你需要回答最后一条 chat 信息。""".format(nick_name=nickname)
  114. logger.info(f"{role}")
  115. prompt = """
  116. {role}
  117. <chat>
  118. {chat}
  119. <chat/>
  120. <docs>
  121. {docs}
  122. <docs/>
  123. """
  124. chat_record = ''
  125. for model in content_models:
  126. chat_record += f"{model.nick_name}: {model.content}\n"
  127. return prompt.format(chat=chat_record, role=role, docs = "docs", vedio_content="none")
  128. async def chat_to_langchain(event_model:Events, comment_model:CommentContent, user_model:User):
  129. oauth_model:UserOAuthToken = user_model.oauth
  130. # 递归查找对话
  131. comment_replies = CommentRepository().get_comment_and_replies(comment_model.comment_id)
  132. query = comment_replies[-1].content
  133. video_promt = await get_video_promt(user_model, event_model)
  134. prompt = gen_prompt(user_model.info.nickname, comment_replies, video_promt)
  135. logger.info(f"query:{query} prompt: {prompt}")
  136. # langchain_res = await langchain_chat(str("4ff71182-5c43-497f-ba16-5b3ba252e478"), prompt)
  137. langchain_res = "这是一个示例回复"
  138. if not langchain_res:
  139. logger.error(f"langchain_chat {langchain_res} ")
  140. return
  141. response = await douyin.comment_reply.reply_to_comment(oauth_model.open_id,oauth_model.access_token, content=langchain_res, comment_id=comment_model.comment_id, item_id=comment_model.reply_to_item_id)
  142. if not response.get('data').get('error_code'):
  143. logger.info(f"回复评论成功: {langchain_res}")
  144. else:
  145. # 一般是秘钥过期、参数错误。还有一种特殊情况,两个账号都授权了思维链,一个账号在另一个账号下是 event_model.to_user_id 恰好授权了思维链,但是他是在别的授权
  146. logger.error(f"回复评论失败: {response}")
  147. # async def save_video_item(open_id, item_id):
  148. # hash_object = hashlib.md5(item_id.encode())
  149. # hex_dig = hash_object.hexdigest()
  150. # download_dir = os.path.join(TEMP_DIR,"video")
  151. # if not os.path.exists(download_dir):
  152. # os.makedirs(download_dir)
  153. # file_path = os.path.join(download_dir, hex_dig + ".mp4")
  154. # try:
  155. # file_path = await download_video(item_id,file_path)
  156. # if file_path:
  157. # pass
  158. # except Exception as e:
  159. # logger.exception(e)
  160. # if not os.path.exists(file_path):
  161. # os.remove(file_path)
  162. '''
  163. data = {'event': 'item_comment_reply', 'client_key': 'aw6aipmfdtplwtyq', 'from_user_id': '_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk', 'to_user_id': '_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk', 'content': '', 'log_id': '021708779405655fdbdfdbdfdbdfdbd0000000000000008cb19c7'}
  164. data_content = json.loads(data.get("content"))
  165. data_content = {"at_user_id":"","avatar":"https://p26.douyinpic.com/aweme/720x720/aweme-avatar/tos-cn-i-0813_a2afe121cfee43c7856b1ec0d6997690.jpeg?from=3782654143","comment_id":"@9VxS1/qCUc80K2etd8wkUc791mbgPP2DPZV2qA6mLFEQaPT960zdRmYqig357zEBoZm7vZ+ZZZz6H3mOVdTOlw==","comment_user_id":"_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk","content":"测试","create_time":1708779397,"digg_count":0,"level":1,"nick_name":"王孙草爱搞钱","parent_id":"7259290547288870144","reply_comment_total":0,"reply_to_comment_id":"0","reply_to_item_id":"@9VxS1/qCUc80K2etd8wkUc7912DgP/GCPpF2qwKuJ1YTb/X460zdRmYqig357zEBKzkoKYjBMUvl9Bs6h+CwYQ=="}
  166. 定义:
  167. - from_user_id: 在你视频下评论的人
  168. - to_user_id: 你自己的 open_id ,只有授权了思维链,才会收到自己视频的评论事件
  169. - 评论: 点开视频评论区就能看到的评论列表,或者在视频评论区中被人评论。
  170. - 例如:打开抖音视频,在评论区中,任何类似 “头像 昵称 \n 评论” 的内容,都可以看做一条评论。示例值: parent_id 、 comment_id
  171. - parent_id 父级评论id ,如果该评论没有父级评论,默认使用视频id作为父级
  172. - 回复评论: 回复“视频评论”的评论,也就是点开视频评论中“更多回复”中展示的各种子评论。
  173. - 例如:在抖音视频评论区中,点击“展开更多” ,看到类似 “头像 昵称 > 昵称 \n 评论” 这样的内容,都可以看做是回复
  174. - 示例值: reply_to_comment_id 、 comment_id
  175. - 如果回复了视频评论, reply_to_comment_id = 0
  176. - 如果回复了子评论, reply_to_comment_id = 被回复的 comment_id
  177. '''
  178. # 如果抖音API支持的话,理论上每一条 AI 回复的评论都会添加尾部注释: (此消息由@思维链AI助手辅助生成)。
  179. # 这不仅仅是打广告,也是为了观众区分哪一条是人工回复,哪一条是 AI 回复。
  180. async def item_comment_reply(data):
  181. logger.info(f"new item_comment_reply event, comment_data: {data}")
  182. db_events = EventRepository()
  183. db_comment = CommentRepository()
  184. event_model,comment_model = db_events.save_item_comment_reply(data)
  185. event_model:Events
  186. comment_model:CommentContent
  187. user_model:UserInfo = UserRepo().get_by_open_id(event_model.to_user_id)
  188. # 理论上不太可能 not user_model ,因为用户登录后授权,数据库 User 中就记录了用户信息。
  189. # 登录扫码时 url scopes=item.comment ,则就会收到 item_comment_reply 的回调事件
  190. if not user_model:
  191. logger.info(f"没有授权登录到思维链 {event_model.to_user_id}")
  192. return
  193. '''原则上不能让AI自己回复自己的评论,原因如下
  194. - 如果 AI 回答不完整,想手动回复该条评论,会导致让AI再次回复你自己手动评论的内容
  195. - 如果你回复自己的视频,在评论区阐述自己的观点,AI会回复你这条评论。但这很矛盾,你为什么要自己阐述完又让AI补充?
  196. - 会陷入死循环,自己回复自己。虽然技术上可以做到不让AI回复AI产生的评论,但意义何在?既然你选择手动评论,说明这段对话中已经不需要AI
  197. '''
  198. # 任何一条评论都可能 @某个用户,当 @思维链AI助手 或者 @自己 时,允许回复一次
  199. if comment_model.at_user_id == event_model.to_user_id:
  200. return
  201. if comment_model.comment_user_id == event_model.to_user_id:
  202. return
  203. # 如果是回复他人的视频评论
  204. elif comment_model.reply_to_comment_id == '':
  205. exist_comment:CommentContent = CommentRepository().select(
  206. CommentContent.comment_id == comment_model.parent_id,
  207. CommentContent.comment_user_id != event_model.to_user_id).first()
  208. # 如果存在,说明这条评论自己账号发表的
  209. if exist_comment:
  210. logger.info(f"收到 AI 发表评论的回调 {comment_model} exist_comment_id:{exist_comment.id}")
  211. pass
  212. else:
  213. await chat_to_langchain(event_model, comment_model, user_model)
  214. # 如果是回复他人的回复
  215. elif comment_model.reply_to_comment_id:
  216. # 查询这个回复事件是回复哪一条已有评论。
  217. exist_comment:CommentContent = CommentRepository().select(
  218. CommentContent.comment_id == comment_model.reply_to_comment_id,
  219. CommentContent.comment_user_id == event_model.to_user_id).first()
  220. logger.info(f"reply to exist_comment {exist_comment}")
  221. # 查询到已存在的评论,是回复自己
  222. if exist_comment:
  223. await chat_to_langchain(event_model, comment_model, user_model)
  224. def main():
  225. db_comment = CommentRepository()
  226. comment_model = db_comment.select(
  227. CommentContent.comment_id == "@9VxS1/qCUc80K2etd8wkUc791mHpOPuKPZ11rQqkLFMTb/b160zdRmYqig357zEBvLcp915uLvFyTsyb/e9VYg=="
  228. ).first()
  229. comment_replies = db_comment.get_comment_and_replies(comment_model.comment_id)
  230. promt = gen_prompt("马工", comment_replies)
  231. logger.info(f"{promt}")
  232. if __name__ == "__main__":
  233. main()