| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266 |
- import asyncio
- import json
- from typing import Any
- from joblib import Memory
- import time
- import os
- import threading
- import os
- import sys
- sys.path.append(os.path.dirname(os.path.dirname(__file__)))
- from enum import Enum, auto
- from grpc_m.client import save_doc_vector
- from tool.video_get import get_iframe_by_item_id,get_video_download_urls,download_video,download_video_by_item_id
- from tool.exec_asr_client import run_asr_client
- from config import MNT_DOUYIN_DATA, TEMP_DIR
- from db.user import *
- from db.docs import DocumentsRepository
- # 定义视频处理状态的枚举类
class VideoProcessStatus(Enum):
    """Stages of the video -> text -> document -> vector pipeline.

    Each member's value is a ``(code, msg)`` tuple: ``code`` is the integer
    written to ``mdata['process']['status']`` by ``VideoProcessManager`` and
    ``msg`` is the default (Chinese) description of the stage.
    """

    # Explicit integer codes instead of ``(auto(), ...)``: ``auto()`` is only
    # expanded inside a tuple since Python 3.11.1.  On older interpreters the
    # code would be an ``enum.auto`` instance rather than an int, which breaks
    # comparisons against persisted integer statuses (and presumably storing
    # it in mdata).  These are the same values newer Pythons produce (1..6),
    # so already-stored statuses stay valid.
    INIT = (1, "正在启动。。。")
    GET_DOWNLOAD_URLS = (2, "获取视频无水印链接")
    DOWNLOADED_VIDEO = (3, "下载视频成功")
    VIDEO_ASR = (4, "视频转文本")
    USER_ADD_DOC = (5, "新增视频文档保存到 _video 目录、数据库记录")
    VECTOR_SAVED = (6, "向量转换")

    @property
    def code(self) -> int:
        """Return the integer status code of this stage."""
        return self.value[0]

    @property
    def msg(self) -> str:
        """Return the default description of this stage."""
        return self.value[1]

    @classmethod
    def all_status_descriptions(cls) -> dict:
        """Return a ``{code: msg}`` mapping for every stage in definition order."""
        return {item.code: item.msg for item in cls}
class VideoProcessManager:
    """Drive one video through download -> ASR -> document -> vector.

    The outcome of every step is written into the video's ``mdata['process']``
    through ``VideoItemDocRepo`` (see ``update_process``), so progress and
    failures of a run can be inspected later.
    """

    def __init__(self, video_item: VideoData):
        # joblib cannot pickle complex objects (ORM rows with relationships,
        # objects bound to a DB connection).  This manager's bound methods are
        # wrapped with joblib ``Memory.cache``, so only plain values are copied
        # out of ``video_item``.
        self.video_id = video_item.id
        self.filename = str(video_item.id)
        self.item_id = video_item.item_id

    @staticmethod
    def _log_step(process_dict: Dict[str, Any], msg: str) -> None:
        """Log *msg* prefixed with the ``step:<status>/<total>`` progress marker."""
        logger.info(f"step:{process_dict['status']}/{process_dict['total_status']} {msg}")

    def get_process(self) -> Optional[Dict[str, Any]]:
        """Return the stored ``process`` dict from the video's mdata, or None.

        ``is_update=False`` is passed to ``VideoItemDocRepo.mdata``; presumably
        this makes it a read-only lookup (confirm against the repository).
        """
        mdata = VideoItemDocRepo().mdata(self.video_id, is_update=False)
        if mdata and 'process' in mdata:
            return mdata['process']
        return None

    def update_process(self, status: VideoProcessStatus, msg: str = "", err_code: int = 0,
                       data: Any = None):
        """Record the result of a pipeline step in ``mdata['process']``.

        Args:
            status: stage being reported.
            msg: message to store; ``status.msg`` is used when it is empty.
            err_code: 0 on success, non-zero on failure.
            data: step payload (callers pass dicts, lists or path strings);
                stored as ``{}`` when falsy.

        Returns:
            The process dict that was written.
        """
        process_dict = self.get_process() or {}
        process_dict['status'] = status.code
        process_dict['msg'] = msg or status.msg
        process_dict['err_code'] = err_code
        process_dict['data'] = data or {}
        process_dict['total_status'] = len(VideoProcessStatus)
        VideoItemDocRepo().mdata(self.video_id, {'process': process_dict})
        return process_dict

    async def video_process_init(self):
        """Create the temporary video directory and record the INIT stage."""
        video_temp_dir = os.path.join(TEMP_DIR, "video")
        # Create the directory itself, not only its parent: later steps write
        # <id>.mp4 / <id>.json into it.  exist_ok avoids racing on reruns.
        os.makedirs(video_temp_dir, exist_ok=True)

        # Microservice warm-up, stale-data cleanup, node health checks, etc.
        # can be added here.
        self.update_process(VideoProcessStatus.INIT)
        logger.info("video_process_init")

    async def get_download_urls(self, item_id):
        """Resolve the watermark-free download URLs for *item_id*.

        Returns:
            The ``data`` field of the download-URL response.

        Raises:
            Exception: when the iframe lookup or the URL lookup reports an error.
        """
        response = await get_iframe_by_item_id(item_id)
        if response.get("err_no"):
            msg = "获取iframe失败"
            process_dict = self.update_process(
                VideoProcessStatus.GET_DOWNLOAD_URLS, msg=msg, err_code=1,
                data={"res": response})
            self._log_step(process_dict, msg)
            raise Exception(msg)
        iframe = (response.get("data") or {}).get("iframe_code")

        response = await get_video_download_urls(iframe)
        if response.get("code"):
            msg = "获取无水印链接失败"
            process_dict = self.update_process(
                VideoProcessStatus.GET_DOWNLOAD_URLS, err_code=1, msg=msg, data=response)
            self._log_step(process_dict, msg)
            raise Exception(msg)
        urls = response.get("data")
        process_dict = self.update_process(VideoProcessStatus.GET_DOWNLOAD_URLS, data=urls)
        self._log_step(process_dict, f"{process_dict['msg']}成功")
        return urls

    async def download_video_from_urls(self, urls: List) -> str:
        """Download the video to ``TEMP_DIR/video/<video id>.mp4``.

        Returns:
            The path reported by ``download_video``.

        Raises:
            Exception: whatever ``download_video`` raised, after it is recorded.
        """
        save_video_path = os.path.join(TEMP_DIR, "video", f"{self.filename}.mp4")
        try:
            res_path = await download_video(urls, save_video_path)
        except Exception as e:
            # Store a string (not the exception object) and never an empty
            # message, which would fall back to the "success" text.
            msg = str(e) or repr(e)
            process_dict = self.update_process(VideoProcessStatus.DOWNLOADED_VIDEO, msg=msg, err_code=1)
            self._log_step(process_dict, msg)
            # Re-raise like the other steps: returning None would let the next
            # step (and the caller's cache) proceed with no video.
            raise
        process_dict = self.update_process(VideoProcessStatus.DOWNLOADED_VIDEO, data=res_path)
        self._log_step(process_dict, process_dict['msg'])
        return res_path

    async def video_to_asr(self, save_video_res_path):
        """Transcribe the downloaded video and return the parsed ASR JSON.

        The ASR client is expected to write its result to
        ``TEMP_DIR/video/<video id>.json``, which is then loaded.

        Raises:
            Exception: when the ASR client reports failure or the result has
                no ``text``.
        """
        video_asr_json_path = os.path.join(TEMP_DIR, "video", f"{self.filename}.json")
        if not await run_asr_client(save_video_res_path, video_asr_json_path):
            msg = f"视频语音识别失败,{save_video_res_path} \t {video_asr_json_path}"
            process_dict = self.update_process(
                VideoProcessStatus.VIDEO_ASR, err_code=1, msg=msg,
                data={"video_path": save_video_res_path})
            self._log_step(process_dict, msg)
            raise Exception(msg)
        msg = f"视频语音识别成功 : {video_asr_json_path}"
        process_dict = self.update_process(
            VideoProcessStatus.VIDEO_ASR, msg=msg, data={"video_path": save_video_res_path})
        self._log_step(process_dict, msg)

        # Binary mode: json detects UTF-8/16/32 itself instead of depending on
        # the locale encoding; ``with`` closes the handle.
        with open(video_asr_json_path, "rb") as f:
            video_asr_json = json.load(f)
        if not video_asr_json.get("text"):
            msg = f"获取不到语音转文本的结果 video_asr_json_path: {video_asr_json_path}"
            process_dict = self.update_process(
                VideoProcessStatus.VIDEO_ASR, err_code=1, msg=msg,
                data={"video_asr_json_path": video_asr_json_path})
            self._log_step(process_dict, msg)
            raise Exception(msg)
        return video_asr_json

    async def txt_to_vector(self, video_asr_json):
        """Save the ASR text as a user document, then vectorize it.

        The text goes to ``MNT_DOUYIN_DATA/<open_id>/docs/_video/<id>.txt``.
        A document record is added via ``add_vedio_item_doc``, and the vector
        is computed with ``save_doc_vector``.  On success the document status
        is set to 100.

        Raises:
            Exception: any failure after the text path is prepared.  The error
                is first recorded as a failed VECTOR_SAVED step.
        """
        video_model: VideoData = VideoItemDocRepo().get(self.video_id)
        # Use the freshly loaded record; the manager only keeps plain ids.
        user: User = video_model.user
        video_to_txt_path = os.path.join(
            MNT_DOUYIN_DATA, user.open_id, "docs", "_video", f"{self.filename}.txt")
        os.makedirs(os.path.dirname(video_to_txt_path), exist_ok=True)
        try:
            asr_text = video_asr_json.get("text")
            # ASR text may contain non-ASCII (e.g. Chinese); do not depend on
            # the locale encoding.
            with open(video_to_txt_path, "w", encoding="utf-8") as f:
                f.write(asr_text)

            video_model = VideoItemDocRepo().add_vedio_item_doc(user, self.item_id, video_to_txt_path)
            doc: Documents = video_model.doc
            msg = f"文件保存成功 doc: {doc.path} {doc.categories[-1].name} "
            process_dict = self.update_process(VideoProcessStatus.USER_ADD_DOC, msg=msg, data=doc.path)
            self._log_step(process_dict, msg)

            save_vector_err = await save_doc_vector(str(doc.id), doc.path)
            if save_vector_err:
                msg = f"向量转换失败 {doc}"
                raise Exception(msg)
            msg = f"向量转换成功 doc.path: {doc.path}"
            DocumentsRepository().set_status(doc, 100)
            process_dict = self.update_process(
                VideoProcessStatus.VECTOR_SAVED, err_code=0, msg=msg, data=doc.path)
            self._log_step(process_dict, msg)
        except Exception as e:
            # Record the actual error, not a possibly stale/unbound success msg.
            msg = str(e) or repr(e)
            process_dict = self.update_process(
                VideoProcessStatus.VECTOR_SAVED, err_code=1, msg=msg, data=video_to_txt_path)
            self._log_step(process_dict, msg)
            raise
-
def create_memory(location):
    """Return a silent (``verbose=0``) joblib ``Memory`` cache rooted at *location*."""
    memory = Memory(location=location, verbose=0)
    return memory
async def video_item_to_vector(video_item: VideoData):
    """Run the whole video -> vector pipeline for *video_item*, best effort.

    Every step is wrapped with a joblib ``Memory.cache`` rooted at
    ``TEMP_DIR/joblib_cache/<user id>/<video id>``.  Presumably this lets a
    rerun reuse the results of steps that already succeeded.  Any error is
    logged with its traceback and swallowed; the coroutine returns None.
    """
    try:
        filename = str(video_item.id)
        manager = VideoProcessManager(video_item)
        mem_path = os.path.join(TEMP_DIR, "joblib_cache", str(video_item.user.id), filename)
        logger.info(f"get user menory path {mem_path}")
        user_memcache = create_memory(mem_path)

        # mdata (or its "process" entry) may be empty/None for a fresh video.
        process = (video_item.mdata or {}).get("process") or {}
        status = process.get("status", VideoProcessStatus.INIT.code)
        # If this job ran before, status != INIT.  To recompute, remove the
        # status from the DB record manually.
        # NOTE(review): that only re-runs init; the cached step results under
        # mem_path are still reused unless that directory is cleared as well.
        if status == VideoProcessStatus.INIT.code:
            await manager.video_process_init()

        urls = await user_memcache.cache(manager.get_download_urls)(video_item.item_id)
        save_video_res_path = await user_memcache.cache(manager.download_video_from_urls)(urls)
        video_asr_json = await user_memcache.cache(manager.video_to_asr)(save_video_res_path)
        await user_memcache.cache(manager.txt_to_vector)(video_asr_json)
    except Exception as e:
        # Deliberately best-effort: log and do not propagate.
        logger.exception(f"出错了: {e}")
    return None
if __name__ == "__main__":
    # Manual smoke run: process one hard-coded user/video through the pipeline.
    db_user = UserRepo()
    db_video = VideoItemDocRepo()
    # Look the user up by a hard-coded open_id.
    user:User = db_user.get_by_open_id("_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk")
    item_id = "@9VxS1/qCUc80K2etd8wkUc7912DgP/GCPpF2qwKuJ1YTb/X460zdRmYqig357zEBKzkoKYjBMUvl9Bs6h+CwYQ=="
    # Presumably returns the existing video record for (user, item_id) or creates one.
    video_item = db_video.get_or_create(user, item_id)
    asyncio.run(video_item_to_vector(video_item))
    # Legacy experiment kept for reference: four users, each with its own
    # joblib cache, running a 4-step pipeline in threads (step3 has a
    # commented-out raise, apparently for trying failure/resume behavior).
    # video_item_to_vector()
    # logger.info(f"{len(VideoProcessStatus.__members__.items())}")
    # def step1(name):
    #     print(f"{name} 执行步骤1: 获取视频链接")
    #     time.sleep(1)
    #     return name, "http://video.com/1.mp4"
    # def step2(name, video_url):
    #     print(f"{name} 执行步骤2: 下载视频 {video_url}")
    #     time.sleep(1)
    #     return name,"/path/1.mp4"
    # def step3(name, video_path):
    #     print(f"{name} 执行步骤3: 提取音频 {video_path}")
    #     time.sleep(1)
    #     # raise Exception("微服务器无法访问")
    #     return name, "/path/1.mp3"
    # def step4(name, audio_path):
    #     print(f"{name} 执行步骤4: 计算语音文本 {audio_path}")
    #     time.sleep(1)
    #     return name,"语音文本内容"
    # def user_task(name, memory):
    #     try:
    #         step1_cached = memory.cache(step1)
    #         step2_cached = memory.cache(step2)
    #         step3_cached = memory.cache(step3)
    #         step4_cached = memory.cache(step4)
    #
    #         name, video_url = step1_cached(name)
    #         name, video_path = step2_cached(name, video_url)
    #         name, audio_path = step3_cached(name, video_path)
    #         name, text = step4_cached(name, audio_path)
    #         print(name, text)
    #     except Exception as e:
    #         print(f"出错了: {name} {e}")
    # def main():
    #     memory1 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user1"))
    #     memory2 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user2"))
    #     memory3 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user3"))
    #     memory4 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user4"))
    #     user1_task = threading.Thread(target=user_task, args=("user1", memory1))
    #     user1_task.start()
    #
    #     user2_task = threading.Thread(target=user_task, args=("user2", memory2))
    #     user2_task.start()
    #
    #     user3_task = threading.Thread(target=user_task, args=("user3", memory3))
    #     user3_task.start()
    #
    #     user_task("user4", memory4)
|