job_video_asr.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. import asyncio
  2. import json
  3. from typing import Any
  4. from joblib import Memory
  5. import time
  6. import os
  7. import threading
  8. import os
  9. import sys
  10. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  11. from enum import Enum, auto
  12. from grpc_m.client import save_doc_vector
  13. from tool.video_get import get_iframe_by_item_id,get_video_download_urls,download_video,download_video_by_item_id
  14. from tool.exec_asr_client import run_asr_client
  15. from config import MNT_DOUYIN_DATA, TEMP_DIR
  16. from db.user import *
  17. from db.docs import DocumentsRepository
  # Enum of the processing stages of the video job.
  class VideoProcessStatus(Enum):
      """Stages of the video -> ASR -> vector job.

      Each member's value is a ``(code, msg)`` tuple:

      - ``code`` is the integer persisted in ``mdata['process']['status']``.
      - ``msg`` is the default human-readable description of the stage.

      Codes are written out explicitly instead of using ``auto()``. ``auto()``
      nested inside a tuple is only resolved to an int on Python 3.11.1+; on
      older interpreters ``code`` would be an ``auto`` object that can neither
      be stored nor compared with a persisted status. Explicit numbers also
      keep stored codes stable if members are ever reordered. The numbers are
      exactly what ``auto()`` produces on 3.11.1+.
      """
      INIT = (1, "正在启动。。。")
      GET_DOWNLOAD_URLS = (2, "获取视频无水印链接")
      DOWNLOADED_VIDEO = (3, "下载视频成功")
      VIDEO_ASR = (4, "视频转文本")
      USER_ADD_DOC = (5, "新增视频文档保存到 _video 目录、数据库记录")
      VECTOR_SAVED = (6, "向量转换")

      @property
      def code(self):
          """Integer status code (first element of the value tuple)."""
          return self.value[0]

      @property
      def msg(self):
          """Default status message (second element of the value tuple)."""
          return self.value[1]

      @classmethod
      def all_status_descriptions(cls):
          """Return a ``{code: msg}`` mapping for every stage, in definition order."""
          return {item.code: item.msg for item in cls}
  class VideoProcessManager:
      """Drive the video -> ASR -> vector pipeline for a single video item.

      Each step stores its progress in the item's ``mdata['process']`` (via
      ``VideoItemDocRepo().mdata``) and logs a ``step:<code>/<total> <msg>``
      line. A failing step records ``err_code=1`` and then raises, so the
      joblib-cached wrappers in ``video_item_to_vector`` do not memoise a
      failed result.
      """

      def __init__(self, video_item:VideoData):
          # joblib cannot pickle complex objects -- especially ORM relationships
          # or objects bound to a database session -- so only plain values are
          # copied out of the model here.
          self.video_id = video_item.id
          self.filename = str(video_item.id)
          self.item_id = video_item.item_id

      def get_process(self) -> Optional[Dict[str, Any]]:
          """Return the stored ``process`` dict from the item's mdata, or None if absent."""
          mdata = VideoItemDocRepo().mdata(self.video_id, is_update=False)
          if mdata and 'process' in mdata:
              return mdata['process']
          return None

      def update_process(self, status:VideoProcessStatus, msg:str="", err_code: int = 0, data: Any = None):
          """Persist the current pipeline step into the item's ``mdata['process']``.

          Args:
              status: the step being reported.
              msg: message to store; ``status.msg`` is used when empty.
              err_code: 0 on success, non-zero on failure.
              data: step-specific payload (dict, list or str); stored as ``{}`` when falsy.

          Returns:
              The ``process`` dict that was written.
          """
          process_dict = self.get_process() or {}
          process_dict['status'] = status.code
          process_dict['msg'] = msg or status.msg
          process_dict['err_code'] = err_code
          process_dict['data'] = data or {}
          # Total number of stages, so progress can be displayed as status/total_status.
          process_dict['total_status'] = len(VideoProcessStatus)
          VideoItemDocRepo().mdata(self.video_id, {'process': process_dict})
          return process_dict

      def _report(self, status:VideoProcessStatus, msg:str="", err_code:int=0, data:Any=None, log_msg:Optional[str]=None) -> Dict[str, Any]:
          """Record *status* via ``update_process`` and log a ``step:<code>/<total>`` line.

          The stored message is logged unless *log_msg* is given. Returns the
          written ``process`` dict.
          """
          process_dict = self.update_process(status, msg=msg, err_code=err_code, data=data)
          text = process_dict['msg'] if log_msg is None else log_msg
          logger.info(f"step:{process_dict['status']}/{process_dict['total_status']} {text}")
          return process_dict

      async def video_process_init(self):
          """Ensure ``<TEMP_DIR>/video`` exists and record the INIT stage."""
          video_temp_dir = os.path.join(TEMP_DIR, "video")
          # Later steps write <TEMP_DIR>/video/<id>.mp4 and .json, so this exact
          # directory must exist; exist_ok makes reruns (and races) harmless.
          os.makedirs(video_temp_dir, exist_ok=True)
          # Place to initialise micro-services, clear stale data, run node health checks, etc.
          self.update_process(VideoProcessStatus.INIT)
          logger.info("video_process_init")

      async def get_download_urls(self, item_id):
          """Resolve the watermark-free download URLs of the video *item_id*.

          Fetches the item's iframe first, then the download URLs behind that iframe.

          Returns:
              The ``data`` field of the download-URL response.

          Raises:
              Exception: if either request reports an error (recorded before raising).
          """
          response = await get_iframe_by_item_id(item_id)
          if response.get("err_no"):
              msg = "获取iframe失败"
              # Keep the raw response as the payload to help debugging.
              self._report(VideoProcessStatus.GET_DOWNLOAD_URLS, err_code=1, msg=msg, data={"res": response})
              raise Exception(msg)
          data = response.get("data")
          iframe = data.get("iframe_code")
          response = await get_video_download_urls(iframe)
          if response.get("code"):
              msg = "获取无水印链接失败"
              self._report(VideoProcessStatus.GET_DOWNLOAD_URLS, err_code=1, msg=msg, data=response)
              raise Exception(msg)
          urls = response.get("data")
          success_log = f"{VideoProcessStatus.GET_DOWNLOAD_URLS.msg}成功"
          self._report(VideoProcessStatus.GET_DOWNLOAD_URLS, data=urls, log_msg=success_log)
          return urls

      async def download_video_from_urls(self, urls:List) ->str:
          """Download the video from *urls* to ``<TEMP_DIR>/video/<video id>.mp4``.

          Returns:
              The path reported by ``download_video``.

          Raises:
              Exception: re-raised after the failure is recorded. Raising instead
              of returning None matters because this step is memoised with joblib:
              a returned None would be cached and handed to ASR on every rerun.
          """
          save_video_path = os.path.join(TEMP_DIR, "video", f"{self.filename}.mp4")
          try:
              res_path = await download_video(urls, save_video_path)
          except Exception as e:
              # Store text, not the exception object; fall back to repr for message-less errors.
              self._report(VideoProcessStatus.DOWNLOADED_VIDEO, msg=str(e) or repr(e), err_code=1)
              raise
          self._report(VideoProcessStatus.DOWNLOADED_VIDEO, data=res_path)
          return res_path

      async def video_to_asr(self, save_video_res_path):
          """Run speech recognition on the downloaded video.

          The ASR client writes its result to ``<TEMP_DIR>/video/<video id>.json``.

          Returns:
              The parsed ASR JSON; its ``text`` field is guaranteed non-empty.

          Raises:
              Exception: if the ASR client fails or yields no text.
          """
          video_asr_json_path = os.path.join(TEMP_DIR, "video", f"{self.filename}.json")
          if not await run_asr_client(save_video_res_path, video_asr_json_path):
              msg = f"视频语音识别失败,{save_video_res_path} \t {video_asr_json_path}"
              self._report(VideoProcessStatus.VIDEO_ASR, err_code=1, msg=msg, data={"video_path":save_video_res_path})
              raise Exception(msg)
          msg = f"视频语音识别成功 : {video_asr_json_path}"
          self._report(VideoProcessStatus.VIDEO_ASR, msg=msg, data={"video_path":save_video_res_path})
          # Close the handle deterministically and decode as UTF-8 (the text is Chinese).
          with open(video_asr_json_path, encoding="utf-8") as f:
              video_asr_json = json.load(f)
          if not video_asr_json.get("text"):
              msg = f"获取不到语音转文本的结果 video_asr_json_path: {video_asr_json_path}"
              self._report(VideoProcessStatus.VIDEO_ASR, err_code=1, msg=msg, data={"video_asr_json_path":video_asr_json_path})
              raise Exception(msg)
          return video_asr_json

      async def txt_to_vector(self, video_asr_json):
          """Save the ASR text as a user document and convert it to vectors.

          The steps are:

          1. Write the text to ``<MNT_DOUYIN_DATA>/<open_id>/docs/_video/<video id>.txt``.
          2. Register the file via ``add_vedio_item_doc``.
          3. Send it to ``save_doc_vector``.
          4. Set the document status to 100 on success.

          Raises:
              Exception: re-raised after the failing stage (USER_ADD_DOC or
              VECTOR_SAVED) is recorded with ``err_code=1``.
          """
          video_model:VideoData = VideoItemDocRepo().get(self.video_id)
          user:User = video_model.user
          video_to_txt_path = os.path.join(MNT_DOUYIN_DATA, user.open_id, "docs", "_video", f"{self.filename}.txt")
          os.makedirs(os.path.dirname(video_to_txt_path), exist_ok=True)
          # Which stage is running, so a failure is recorded against the right step.
          stage = VideoProcessStatus.USER_ADD_DOC
          try:
              asr_text = video_asr_json.get("text")
              with open(video_to_txt_path, "w", encoding="utf-8") as f:
                  f.write(asr_text)
              video_model = VideoItemDocRepo().add_vedio_item_doc(user, self.item_id, video_to_txt_path)
              doc:Documents = video_model.doc
              msg = f"文件保存成功 doc: {doc.path} {doc.categories[-1].name} "
              self._report(VideoProcessStatus.USER_ADD_DOC, msg=msg, data=doc.path)

              stage = VideoProcessStatus.VECTOR_SAVED
              save_vector_err = await save_doc_vector(str(doc.id), doc.path)
              if save_vector_err:
                  raise Exception(f"向量转换失败 {doc}")
              DocumentsRepository().set_status(doc, 100)
              msg = f"向量转换成功 doc.path: {doc.path}"
              self._report(VideoProcessStatus.VECTOR_SAVED, err_code=0, msg=msg, data=doc.path)
          except Exception as e:
              # str(e) is always bound (unlike a msg assigned mid-try) and carries the real cause.
              self._report(stage, err_code=1, msg=str(e) or repr(e), data=video_to_txt_path)
              raise
  def create_memory(location):
      """Build a joblib ``Memory`` cache rooted at *location* with ``verbose=0``."""
      memory = Memory(location=location, verbose=0)
      return memory
  async def video_item_to_vector(video_item:VideoData):
      """Run the whole pipeline for *video_item*, memoising each step with joblib.

      Step results are cached under
      ``<TEMP_DIR>/joblib_cache/<user id>/<video id>``, so a rerun with the same
      inputs reuses the results of steps that already succeeded.

      ``video_process_init`` runs only while the stored process status is INIT
      (or missing). If the job ran before, the status is no longer INIT; to
      recompute the vectors, remove ``status`` from the DB record by hand.
      NOTE(review): that does not clear the joblib cache directory, so cached
      steps may still be reused -- confirm this is intended.

      Any exception is logged (not re-raised); always returns None.
      """
      try:
          manager = VideoProcessManager(video_item)
          cache_dir = os.path.join(TEMP_DIR, "joblib_cache", str(video_item.user.id), str(video_item.id))
          logger.info(f"get user menory path {cache_dir}")
          memory = create_memory(cache_dir)

          init_code = VideoProcessStatus.INIT.code
          process_info = video_item.mdata.get("process", {})
          if process_info.get("status", init_code) == init_code:
              await manager.video_process_init()

          urls = await memory.cache(manager.get_download_urls)(video_item.item_id)
          video_path = await memory.cache(manager.download_video_from_urls)(urls)
          asr_json = await memory.cache(manager.video_to_asr)(video_path)
          await memory.cache(manager.txt_to_vector)(asr_json)
      except Exception as err:
          logger.exception(f"出错了: {err}")
      return None
  if __name__ == "__main__":
      # Manual run: push one video item through the whole pipeline.
      # Usage: python job_video_asr.py [open_id] [item_id]
      # Without arguments, the sample user/video below is processed.
      DEFAULT_OPEN_ID = "_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk"
      DEFAULT_ITEM_ID = "@9VxS1/qCUc80K2etd8wkUc7912DgP/GCPpF2qwKuJ1YTb/X460zdRmYqig357zEBKzkoKYjBMUvl9Bs6h+CwYQ=="
      open_id = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_OPEN_ID
      item_id = sys.argv[2] if len(sys.argv) > 2 else DEFAULT_ITEM_ID

      db_user = UserRepo()
      db_video = VideoItemDocRepo()
      user:User = db_user.get_by_open_id(open_id)
      # Fail with a clear message instead of passing None into get_or_create.
      if user is None:
          sys.exit(f"user not found: {open_id}")
      video_item = db_video.get_or_create(user, item_id)
      asyncio.run(video_item_to_vector(video_item))