| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- import base64
- import hashlib
- import os
- import json
- import asyncio
- import prefect
- from typing import Optional
- from prefect import Flow, task
- import httpx
- from prefect import flow, task
- from pydantic import BaseModel
- from video_get import get_iframe_by_item_id, get_video_download_urls, download_video
- from exec_asr_client import run_asr_client # 导入语音转文本的函数
- from grpc_m.client import save_doc_vector
- from enum import Enum, auto
- from prefect.tasks import task_input_hash
- from datetime import timedelta
- from config import TEMP_DIR,logger
- from prefect.filesystems import LocalFileSystem, S3
- # TEMP_DIR = os.path.join("/home/user/code/open-douyin", "temp")
- from prefect import runtime
- from prefect.states import Completed, Failed
# Directory where downloaded videos are saved: the "video" subfolder of TEMP_DIR (from config).
VIDEO_TEMP_DIR = os.path.join(TEMP_DIR, "video")
def cache_key_byfilename(data, args):
    """Build a Prefect cache key from the flow's ``file_name`` and the task-run name.

    Meant to be used as a task ``cache_key_fn``. The key is
    ``"<file_name>_<task_run_name>"``, so results are cached per file name
    rather than per input values. Both parts come from ``prefect.runtime``;
    ``data`` and ``args`` are ignored. If the flow run has no ``file_name``
    parameter, the key starts with ``None``.
    """
    flow_file_name = runtime.flow_run.parameters.get('file_name')
    task_run_name = runtime.task_run.name
    key = f"{flow_file_name}_{task_run_name}"
    logger.info(key)
    return key
@task
async def video_process_init(video_temp_dir):
    """Ensure the working directory for downloaded videos exists.

    Args:
        video_temp_dir: Directory to create, including any missing parents.

    Returns:
        The same ``video_temp_dir``, so it can be handed to downstream tasks.

    Raises:
        FileExistsError: if ``video_temp_dir`` exists but is not a directory.
    """
    # exist_ok=True replaces the exists()-then-makedirs() pair. That pair could
    # raise FileExistsError if another process created the directory between
    # the check and the create.
    os.makedirs(video_temp_dir, exist_ok=True)
    return video_temp_dir
- # @task(cache_key_fn=cache_key_byfilename, cache_expiration=timedelta(minutes=50))
@task(cache_key_fn=task_input_hash)
async def get_iframe_task(item_id):
    """Fetch the embeddable player-page iframe code for a video.

    Args:
        item_id: Video item id passed to ``get_iframe_by_item_id``.

    Returns:
        ``response["data"]["iframe_code"]``, or ``None`` when the response,
        its ``data`` field or the ``iframe_code`` is missing/empty. An error
        is logged in that case.
    """
    logger.info(f"获取播放页 iframe 代码,item_id: {item_id}")
    response = await get_iframe_by_item_id(item_id)
    # Expected shape: {'data': {'iframe_code': "..."}}.
    # Guard each level: chaining .get() directly raised AttributeError when
    # "data" was absent or null, instead of reaching the error log below.
    data = (response or {}).get("data") or {}
    iframe = data.get("iframe_code")
    if not iframe:
        logger.error(f"获取 iframe 失败, {response} - {item_id}")
    # NOTE(review): with cache_key_fn=task_input_hash and no expiration, a
    # None (failed) result is cached as well. The same item_id then keeps
    # returning None until the cache is refreshed. Confirm this is intended.
    return iframe
# Fetching the watermark-free links costs about as much as loading the iframe,
# so caching saves little network traffic. The returned links also change
# frequently, so this task is deliberately left uncached.
@task
async def get_urls_task(iframe):
    """Resolve watermark-free download URLs for a video from its iframe code.

    Returns:
        The ``data`` field of the ``get_video_download_urls`` response
        (``None`` if that key is absent).
    """
    logger.info(f"获取视频无水印下载链接,iframe: {iframe}")
    payload = await get_video_download_urls(iframe)
    return payload.get("data")
def custom_cache_download_task(data, args):
    """Cache key for ``download_task``: ``"<flow file_name>_<save_file_path>"``.

    ``file_name`` is read from the current flow run's parameters and
    ``save_file_path`` from the task's own arguments. ``data`` is unused.
    Missing values appear as ``None`` in the key.
    """
    return "{}_{}".format(
        runtime.flow_run.parameters.get('file_name'),
        args.get('save_file_path'),
    )
@task(cache_key_fn=custom_cache_download_task)
async def download_task(urls, save_file_path):
    """Download a video from ``urls`` into ``save_file_path``.

    Results are cached per (flow ``file_name``, ``save_file_path``) via
    ``custom_cache_download_task``.

    Returns:
        Whatever ``download_video`` returns. ``download_video_flow`` treats a
        falsy value as a failed download.
    """
    # NOTE(review): a falsy (failed) result is cached under the same key too.
    logger.info(f"runtime.flow_run.parameters {runtime.flow_run.parameters}")
    logger.info(f"下载视频 urls : {urls}, save_file_path : {save_file_path}")
    return await download_video(urls, save_file_path)
- # @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=50))
@task
async def asr_task(audio_file, output_json_path):
    """Run speech-to-text on ``audio_file`` via ``run_asr_client``.

    Args:
        audio_file: Media file handed to ``run_asr_client``.
        output_json_path: Output path handed to ``run_asr_client``. Presumably
            the transcript JSON is written there (``vector_task`` reads JSON
            from this path). Confirm against ``run_asr_client``.

    Returns:
        ``output_json_path`` when ``run_asr_client`` returns a truthy value.

    Raises:
        RuntimeError: if ``run_asr_client`` returns a falsy value.
    """
    if not await run_asr_client(audio_file, output_json_path):
        # RuntimeError subclasses Exception, so existing `except Exception`
        # handlers still catch it. The message now identifies what failed
        # instead of the bare "err".
        raise RuntimeError(
            f"ASR failed: audio_file={audio_file!r}, "
            f"output_json_path={output_json_path!r}"
        )
    return output_json_path
@task
async def vector_task(output_json_path, collection_name, doc_path, server_addr):
    """Store a document vector via ``save_doc_vector``.

    Args:
        output_json_path: ASR output JSON file. It is parsed and its
            ``"text"`` field read (see NOTE below).
        collection_name: Target collection, passed to ``save_doc_vector``.
        doc_path: Document path, passed to ``save_doc_vector``.
        server_addr: Server address, passed to ``save_doc_vector``.

    Returns:
        The response of ``save_doc_vector``.
    """
    # Close the file deterministically. json.load(open(...)) left the handle
    # open until garbage collection. Decode as UTF-8 explicitly, because JSON
    # text is UTF-8 and the platform default encoding may differ.
    with open(output_json_path, encoding="utf-8") as fp:
        video_asr_json = json.load(fp)
    # NOTE(review): asr_text is extracted but never passed to save_doc_vector,
    # which only receives doc_path. Confirm whether the transcript should be
    # forwarded (the call's parameters may need adjusting).
    asr_text = video_asr_json.get("text")
    return await save_doc_vector(collection_name, doc_path, server_addr)
@flow
async def get_download_url(item_id):
    """Subflow: resolve watermark-free download URLs for ``item_id``.

    First fetches the iframe code with ``get_iframe_task``, then the URLs with
    ``get_urls_task``. If no iframe is obtained, returns a ``Failed`` state,
    which fails this subflow.
    """
    run = runtime.flow_run
    logger.info(f"runtime.flow_run.parameters {run.parameters}")
    logger.info(f"runtime.flow_run.id {run.id}")
    logger.info(f"runtime.flow_run.flow_name {run.flow_name}")

    iframe = await get_iframe_task(item_id)
    if iframe:
        return await get_urls_task(iframe)
    return Failed(message=f"获取 iframe 失败,item_id: {item_id}")
def generate_result_storage():
    """Return ``VIDEO_TEMP_DIR/<file_name>`` for the current flow run.

    ``file_name`` is read from the current flow run's parameters.
    NOTE(review): not referenced in the visible code. It is presumably meant
    as a per-run result-storage hook; confirm it is still used.

    Raises:
        KeyError: if the current flow run has no ``file_name`` parameter.
    """
    file_name = runtime.flow_run.parameters["file_name"]
    return os.path.join(VIDEO_TEMP_DIR, file_name)
@flow(flow_run_name="{file_name}")
async def download_video_flow(file_name, item_id, refresh_cache=False):
    """Download the watermark-free video for ``item_id`` to ``VIDEO_TEMP_DIR/<file_name>.mp4``.

    Args:
        file_name: Base name of the saved ``.mp4``. It is also the flow-run
            name and part of ``download_task``'s cache key.
        item_id: Video item id.
        refresh_cache: Assigned to the ``refresh_cache`` option of
            ``get_iframe_task`` and ``download_task``. In Prefect, True makes
            them recompute and overwrite any cached result.

    Returns:
        A ``Failed`` state if ``download_task`` returns a falsy value,
        otherwise ``None``.
    """
    # NOTE(review): these assignments mutate the shared module-level task
    # objects. The setting therefore also applies to any other flow run in
    # this process until it is set again. get_iframe_task runs inside the
    # get_download_url subflow, so a local .with_options() copy would not
    # reach it.
    get_iframe_task.refresh_cache = refresh_cache
    download_task.refresh_cache = refresh_cache
    logger.info(f"runtime.flow_run.parameters {runtime.flow_run.parameters}")
    logger.info(f"runtime.flow_run.id {runtime.flow_run.id}")
    logger.info(f"runtime.flow_run.flow_name {runtime.flow_run.flow_name}")
    urls = await get_download_url(item_id)

    save_video_path = os.path.join(VIDEO_TEMP_DIR, file_name + ".mp4")
    path = await download_task(urls, save_video_path)
    if not path:
        # Report the intended destination. `path` is always falsy on this
        # branch, so interpolating it hid where the file should have gone.
        return Failed(message=f"下载视频失败,urls : {urls}, save_file_path : {save_video_path}")
    logger.info(f"download urls: {urls}")
    logger.info(f"download path: {path}")
    # TODO: planned follow-up steps, previously left as commented-out code:
    #   1. Transcribe the downloaded file:
    #      output_json_path = await asr_task(path, <output_json_path>)
    #   2. Store the transcript vector:
    #      await vector_task(output_json_path, <collection_name>, <doc_path>, <server_addr>)
# Run the flow manually.
if __name__ == "__main__":
    # Sample item_id for a manual run; replace it with a real one.
    # Alternative sample:
    # "@9VxS1/qCUc80K2etd8wkUc7912DgP/GCPpF2qwKuJ1YTb/X460zdRmYqig357zEBKzkoKYjBMUvl9Bs6h+CwYQ=="
    sample_item_id = (
        "@9VxS1/qCUc80K2etd8wkUc791mfoNf+EMpZzqQKiLVIaaPD660zdRmYqig357zEBdcJfEwgOpm1bLVAnSdwvLg=="
    )
    asyncio.run(download_video_flow("11", sample_item_id))
|