import asyncio
import base64
import hashlib
import json
import os
from datetime import timedelta
from enum import Enum, auto
from typing import Optional

import httpx
import prefect
from prefect import Flow, flow, runtime, task
from prefect.filesystems import LocalFileSystem, S3
from prefect.states import Completed, Failed
from prefect.tasks import task_input_hash
from pydantic import BaseModel

from video_get import get_iframe_by_item_id, get_video_download_urls, download_video
from exec_asr_client import run_asr_client  # speech-to-text (ASR) entry point
from grpc_m.client import save_doc_vector
from config import TEMP_DIR, logger

# Downloaded videos are written under <TEMP_DIR>/video.
VIDEO_TEMP_DIR = os.path.join(TEMP_DIR, "video")


def cache_key_byfilename(data, args):
    """Build a cache key from the flow's ``file_name`` parameter and the task-run name.

    Intended for use as a Prefect ``cache_key_fn``; both arguments are ignored.
    Not referenced by any task decorator in this module at present.

    Returns:
        ``"<file_name>_<task run name>"``.
    """
    file_name = runtime.flow_run.parameters.get('file_name')
    name = runtime.task_run.name
    logger.info(f"{file_name}_{name}")
    return f"{file_name}_{name}"


@task
async def video_process_init(video_temp_dir):
    """Ensure ``video_temp_dir`` exists (creating parents as needed) and return it."""
    # exist_ok=True replaces the check-then-create pattern, which could race with
    # another process creating the same directory between the check and makedirs.
    os.makedirs(video_temp_dir, exist_ok=True)
    return video_temp_dir


@task(cache_key_fn=task_input_hash)
async def get_iframe_task(item_id):
    """Fetch the player-page iframe embed code for the video ``item_id``.

    Cached by input hash (``task_input_hash``), so repeated calls with the same
    ``item_id`` reuse the cached result unless the task's cache is refreshed.

    Returns:
        The iframe code, or a falsy value (after logging an error) when the
        response does not contain one.
    """
    logger.info(f"获取播放页 iframe 代码,item_id: {item_id}")
    response = await get_iframe_by_item_id(item_id)
    # Expected shape: {'data': {'iframe_code': ""}}. Tolerate a missing/null
    # "data" (or response) so a bad response reaches the error log below instead
    # of raising AttributeError.
    iframe = ((response or {}).get("data") or {}).get("iframe_code")
    if not iframe:
        logger.error(f"获取 iframe 失败, {response} - {item_id}")
    return iframe


# Fetching watermark-free download links costs about as much as loading the
# iframe, so caching buys little; the returned links also change frequently,
# so this task is deliberately not cached.
@task
async def get_urls_task(iframe):
    """Resolve watermark-free download URLs for a video from its iframe code.

    Returns:
        The ``"data"`` field of the ``get_video_download_urls`` response.
    """
    logger.info(f"获取视频无水印下载链接,iframe: {iframe}")
    response = await get_video_download_urls(iframe)
    urls = response.get("data")
    return urls


def custom_cache_download_task(data, args):
    """Cache key for ``download_task``: the flow's ``file_name`` plus the target path.

    ``data`` is ignored; ``args`` is the mapping of the task's parameters, from
    which ``save_file_path`` is read.

    Returns:
        ``"<file_name>_<save_file_path>"``.
    """
    file_name = runtime.flow_run.parameters.get('file_name')
    save_file_path = args.get('save_file_path')
    key = f"{file_name}_{save_file_path}"
    return key
@task(cache_key_fn=custom_cache_download_task)
async def download_task(urls, save_file_path):
    """Download the video from ``urls`` to ``save_file_path``.

    Cached per (flow ``file_name``, ``save_file_path``) via
    ``custom_cache_download_task``.

    Returns:
        Whatever ``download_video`` returns; ``download_video_flow`` treats a
        falsy value as a failed download.
    """
    logger.info(f"runtime.flow_run.parameters {runtime.flow_run.parameters}")
    logger.info(f"下载视频 urls : {urls}, save_file_path : {save_file_path}")
    res = await download_video(urls, save_file_path)
    return res


@task
async def asr_task(audio_file, output_json_path):
    """Run speech-to-text on ``audio_file`` and return ``output_json_path``.

    Raises:
        RuntimeError: if ``run_asr_client`` returns a falsy value.
    """
    # NOTE(review): assumes run_asr_client writes its transcript JSON to
    # output_json_path; returning the transcript text instead may be preferable.
    if not await run_asr_client(audio_file, output_json_path):
        raise RuntimeError(f"ASR failed: audio_file={audio_file}, output_json_path={output_json_path}")
    return output_json_path


@task
async def vector_task(output_json_path, collection_name, doc_path, server_addr):
    """Store document vectors via ``save_doc_vector`` and return its response.

    The ASR JSON at ``output_json_path`` is loaded and its ``"text"`` field read,
    but that text is not currently passed to ``save_doc_vector``.
    """
    # Context manager closes the handle (the original leaked it).
    with open(output_json_path, encoding="utf-8") as fp:
        video_asr_json = json.load(fp)
    # NOTE(review): asr_text is unused; the save_doc_vector arguments may need to
    # be adjusted to include it.
    asr_text = video_asr_json.get("text")
    response = await save_doc_vector(collection_name, doc_path, server_addr)
    return response


@flow
async def get_download_url(item_id):
    """Subflow: resolve download URLs for ``item_id`` via its player iframe.

    Returns:
        The URLs from ``get_urls_task``, or a ``Failed`` state when no iframe
        could be obtained.
    """
    logger.info(f"runtime.flow_run.parameters {runtime.flow_run.parameters}")
    logger.info(f"runtime.flow_run.id {runtime.flow_run.id}")
    logger.info(f"runtime.flow_run.flow_name {runtime.flow_run.flow_name}")
    iframe = await get_iframe_task(item_id)
    if not iframe:
        return Failed(message=f"获取 iframe 失败,item_id: {item_id}")
    urls = await get_urls_task(iframe)
    return urls


def generate_result_storage():
    """Return ``<VIDEO_TEMP_DIR>/<file_name>`` for the current flow run.

    Reads the ``file_name`` parameter of the running flow (``KeyError`` if absent).
    Not referenced elsewhere in this module.
    """
    file_name = runtime.flow_run.parameters["file_name"]
    return os.path.join(VIDEO_TEMP_DIR, file_name)


@flow(flow_run_name="{file_name}")
async def download_video_flow(file_name, item_id, refresh_cache=False):
    """Download the video for ``item_id`` to ``<VIDEO_TEMP_DIR>/<file_name>.mp4``.

    Args:
        file_name: Base name (no extension) of the saved file; also the flow-run name.
        item_id: Video item id passed to ``get_download_url``.
        refresh_cache: When true, bypass cached results of ``get_iframe_task``
            and ``download_task``.

    Returns:
        A ``Failed`` state if the download returned a falsy value, else ``None``.
    """
    # NOTE: this mutates the module-level task objects, so the setting persists for
    # later runs in the same process. get_iframe_task runs inside the
    # get_download_url subflow, so a local with_options() copy would not reach it.
    get_iframe_task.refresh_cache = refresh_cache
    download_task.refresh_cache = refresh_cache
    logger.info(f"runtime.flow_run.parameters {runtime.flow_run.parameters}")
    logger.info(f"runtime.flow_run.id {runtime.flow_run.id}")
    logger.info(f"runtime.flow_run.flow_name {runtime.flow_run.flow_name}")
    urls = await get_download_url(item_id)
    save_video_path = os.path.join(VIDEO_TEMP_DIR, file_name + ".mp4")
    path = await download_task(urls, save_video_path)
    if not path:
        # Report the path we tried to write; `path` is falsy in this branch.
        return Failed(message=f"下载视频失败,urls : {urls}, save_file_path : {save_video_path}")
    logger.info(f"download urls: {urls}")
    logger.info(f"download path: {path}")
    # TODO: extend the pipeline: run asr_task on the downloaded file to produce a
    # transcript JSON, then vector_task to store vectors for it (with the real
    # collection name), and return the vector-store response.


# Run the flow
if __name__ == "__main__":
    item_id = "@9VxS1/qCUc80K2etd8wkUc791mfoNf+EMpZzqQKiLVIaaPD660zdRmYqig357zEBdcJfEwgOpm1bLVAnSdwvLg=="
    asyncio.run(download_video_flow("11", item_id))