qyl 1 gadu atpakaļ
vecāks
revīzija
d34083e60b

+ 2 - 3
api/asr.py

@@ -2,7 +2,7 @@ import asyncio
 import datetime
 import os
 import sys
-from typing import List
+from typing import List, Optional
 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 import jwt
 from fastapi import FastAPI,APIRouter, HTTPException, Depends, Request,Header
@@ -25,8 +25,7 @@ class AsrPostData(BaseModel):
                         example="4.30 03/30 bNJ:/ L@w.FH 踩中风口真的可以翻身吗?从一穷二白到AI逆袭,他怎么做到的? # 东方对话 # 人工智能 # ai  https://v.douyin.com/i8Df1CC/ 复制此链接,打开Dou音搜索,直接观看视频!")
 
 
-
-
+    
 @asr_router.post("/asr")
 async def asr(data:AsrPostData, user: User = Depends(verify_user)):
     save_dir = os.path.join(TEMP_DIR, "asr", user.open_id, "")

+ 10 - 83
api/comment.py

@@ -20,87 +20,20 @@ from db.user import User,UserRepo,engine
 from db.video_data import VideoData,VideoItemDocRepo
 from grpc_m.client import get_doc_vector,save_doc_vector
 from sqlmodel import Field, SQLModel,Relationship,create_engine,Session,select,func,Column
-from tool.video_get import download_video_by_item_id,get_video_download_urls,download_video,get_iframe_by_item_id
+from tool.video_get import download_video_by_item_id,get_video_download_urls,download_video
 from tool.exec_asr_client import run_asr_client
 from enum import Enum 
-# 定义视频处理状态的枚举类  
-class VideoProcessStatus(Enum):  
-    INIT = 0  
-    IFRAME_OBTAINED = 1  
-    DOWNLOAD_URLS_OBTAINED = 2  
-    VIDEO_DOWNLOADED = 3  
-    ASR_COMPLETED = 4  
-    TEXT_WRITTEN = 5  
-    VECTOR_SAVED = 6  
-    PROCESSING_ERROR = -1  
-
-async def video_item_to_vector(video_item:VideoData):
-    user = video_item.user
-    file_name = os.path.splitext(video_item.doc.path)
-    proccess_status = {"status":VideoProcessStatus.INIT.value, "msg":"获取 iframe "}
-    
-    item_iframe_data = await get_iframe_by_item_id(video_item.item_id)
-    url_str = item_iframe_data.get("iframe_code")
-    if not url_str:
-        return
-    
-    logger.info(f"成功获取 item iframe: {url_str}")
-    # 下载路径不能使用 item_id ,因为它包含特殊字符,使用 id 只有整型,也方便硕源
-    save_video_path = os.path.join(TEMP_DIR, "video", f"{file_name}.mp4")
-    if not os.path.exists(os.path.dirname(save_video_path)):
-        os.makedirs(os.path.dirname(save_video_path))
-    urls = await get_video_download_urls(url_str)
-    if not urls:
-        return
-    
-    logger.info(f"成功获取无水印链接 : {urls}")
-    res = await download_video(urls, save_video_path)
-    if not res:
-        logger.error(f"download fail item_iframe_data: {item_iframe_data} \t save_video_path: {save_video_path}")
-        return
-    
-    logger.info(f"成功下载视频 : {res}")
-    video_asr_json_path = os.path.join(TEMP_DIR, "video", f"{file_name}.json")
-    if not await run_asr_client(save_video_path, video_asr_json_path):
-        return
-    
-    logger.info(f"视频语音识别成功 : {video_asr_json_path}")
-    video_asr_json = json.load(open(video_asr_json_path))
-    asr_text = video_asr_json.get("text")
-    if not asr_text:
-        logger.error(f"获取不到语音转文本的结果 video_asr_json_path: {video_asr_json_path}")
-        return
-    
-    video_to_txt_path = os.path.join(MNT_DOUYIN_DATA, user.open_id, "docs", "_video", f"{file_name}.txt")
-    if not os.path.exists(os.path.dirname(video_to_txt_path)):
-        os.makedirs(os.path.dirname(video_to_txt_path))
-    with open(video_to_txt_path, "w") as f:
-        f.write(asr_text)
-        
-    video_model = VideoItemDocRepo().add_vedio_item_doc(user,item_id,video_to_txt_path)
-    logger.info(f"写入数据库成功 video_model: {video_model.id}")
-    
-    doc = video_model.doc
-    save_vector_err = await save_doc_vector(str(doc.id), doc.path)
-    if save_vector_err:
-        logger.error(f"save_doc_vector err {save_vector_err}")
-        return
-    
-    logger.info(f"向量转换成功 doc.id: {doc.id}")
-    DocumentsRepository().set_status(doc, 100)
+from tool.job_video_asr import video_item_to_vector
     
 async def get_video_promt(user:User, event_model:Events):
     video_summarize =  ""
     content = ""
-    session = Session(engine)
-    video_item = session.exec(
-            select(VideoData).where(VideoData.item_id == event_model.content.reply_to_item_id)
-        ).first()
+    video_item = VideoItemDocRepo().get_or_create(user, event_model.content.reply_to_item_id)
     
     # 该视频没有提取文案和向量计算
-    if not video_item:
-        video_to_txt_path = os.path.join(MNT_DOUYIN_DATA, user.open_id, "docs", "_video", f"{event_model.id}.txt")
-        video_item = VideoItemDocRepo().add_vedio_item_doc(user, event_model.content.reply_to_item_id, video_to_txt_path)
+    if not video_item.doc or video_item.doc.status != 100:
+        # video_to_txt_path = os.path.join(MNT_DOUYIN_DATA, user.open_id, "docs", "_video", f"{event_model.id}.txt")
+        # video_item = VideoItemDocRepo().add_vedio_item_doc(user, event_model.content.reply_to_item_id, video_to_txt_path)
         asyncio.create_task(video_item_to_vector(video_item))
         return ""
     else:
@@ -253,14 +186,8 @@ async def item_comment_reply(data):
             await chat_to_langchain(event_model, comment_model, user_model)
     
 
-def main():
-    db_comment = CommentRepository()
-    comment_model = db_comment.select(
-        CommentContent.comment_id == "@9VxS1/qCUc80K2etd8wkUc791mHpOPuKPZ11rQqkLFMTb/b160zdRmYqig357zEBvLcp915uLvFyTsyb/e9VYg=="
-        ).first()
-    
-    comment_replies = db_comment.get_comment_and_replies(comment_model.comment_id)
-    promt = gen_prompt("马工", comment_replies)
-    logger.info(f"{promt}")
+async def main():
+    data = {'event': 'item_comment_reply', 'client_key': 'aw6aipmfdtplwtyq', 'from_user_id': '_000QadFMhmU1jNCI3JdPnyVDL6XavC70dFy', 'to_user_id': '_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk', 'content': '{"at_user_id":"","avatar":"https://p11.douyinpic.com/aweme/720x720/aweme-avatar/tos-cn-i-0813_66c4e34ae8834399bbf967c3d3c919db.jpeg?from=3782654143","comment_id":"@9VxS1/qCUc80K2etd8wkUc791mHvNP6BPpF0rg6iK1Qba/L660zdRmYqig357zEBevPVhKm8JTHRrBUxsAwWCQ==","comment_user_id":"_000QadFMhmU1jNCI3JdPnyVDL6XavC70dFy","content":"7","create_time":1710604351,"digg_count":0,"level":1,"nick_name":"程序员马工","parent_id":"@9VxS1/qCUc80K2etd8wkUc7912DgP/GCPpF2qwKuJ1YTb/X460zdRmYqig357zEBKzkoKYjBMUvl9Bs6h+CwYQ==","reply_comment_total":0,"reply_to_comment_id":"","reply_to_item_id":"@9VxS1/qCUc80K2etd8wkUc7912DgP/GCPpF2qwKuJ1YTb/X460zdRmYqig357zEBKzkoKYjBMUvl9Bs6h+CwYQ=="}', 'log_id': '202403162352304940C40D949F65E2B93E'}
+    await item_comment_reply(data)
 if __name__ == "__main__":
-    main()
+    asyncio.run(main())

+ 1 - 1
config.py

@@ -47,7 +47,7 @@ CLIENT_SECRET = os.environ["CLIENT_SECRET"]
 # 这个网址 https://open-douyin.magong.site 对应这台服务器的 192.168.1.32:8600 端口,因为这台服务器没有公网ip,所以在本地计算机无法通过  http://192.168.1.32:8600/ 访问到 fastapi 接口,只能通过 https://open-douyin.magong.site/ 访问
 HOST = '::'
 
-PORT = 8601 if os.environ.get("USER")=="mrh" else 8600
+PORT = 8602 if os.environ.get("USER")=="mrh" else 8601
 GRPC_VECTOR_HOST="192.168.2.32"
 GRPC_VECTOR_PORT=18600
 SECRET_KEY = os.environ.get("SECRET_KEY")

+ 3 - 5
db/video_data.py

@@ -26,17 +26,17 @@ class VideoItemDocRepo(DouyinBaseRepository):
     def __init__(self, model: VideoData=VideoData, engine=engine):
         super().__init__(model, engine)
     
-    def get_by_item_id(self, user, item_id, ex_session=None):
+    def get_by_item_id(self, item_id, ex_session=None):
         session = ex_session or Session(bind=self.engine)
         video_data = session.exec(
             select(VideoData).join(LinkVideoUser)
-            .where(VideoData.item_id == item_id, LinkVideoUser.user_id==user.id)
+            .where(VideoData.item_id == item_id)
         ).first()
         return video_data
     
     def get_or_create(self, user:User, item_id:str, ex_session=None)->VideoData:
         session = ex_session or Session(bind=self.engine)
-        video_data = self.get_by_item_id(user, item_id, ex_session=session)
+        video_data = self.get_by_item_id(item_id, ex_session=session)
         if not video_data:
             video_data = VideoData(item_id=item_id, user=user)
             logger.info(f"create video_data: {video_data}")
@@ -73,11 +73,9 @@ class VideoItemDocRepo(DouyinBaseRepository):
             else:
                 new_mdata = update_dat
             video_data.mdata = new_mdata  
-            logger.info(f"add {video_data.mdata}")
             session.add(video_data)
             session.commit()
             session.refresh(video_data)
-            logger.info(f"refresh {video_data.mdata}")
             return video_data.mdata
             
             

+ 28 - 0
docs/gpt.md

@@ -1,3 +1,31 @@
+
+async def get_iframe_by_item_id(item_id):  
+    client_key = os.environ.get("CLIENT_KEY")  # 从环境变量中获取 client_key  
+    base64_encoded_item_id = base64.urlsafe_b64encode(item_id.encode()).decode('utf-8')  
+      
+    async with httpx.AsyncClient() as client:  
+        response = await client.get(  
+            f"https://open.douyin.com/api/douyin/v1/video/get_iframe_by_item?item_id={base64_encoded_item_id}&client_key={client_key}"  
+        )  
+          
+        res_json = response.json()  
+        err_no = res_json.get("err_no")  
+          
+        return res_json
+import asyncio
+import os
+import sys
+def main():
+    asyncio.run(task())
+
+if __name__ == "__main__":
+    main()
+    
+这是一个第三方请求的接口示例,从环境变量中获取 CLIENT_KEY 、CLIENT_SECRET 用 httpx 异步请求相应的接口,返回 json 数据。
+参考这个示例,写一个请求用来获取 video_list:
+curl --location --request GET 'https://open.douyin.com/api/douyin/v1/video/video_list/?open_id=ba253642-0590-40bc-9bdf-9a1334b94059&cursor=0&count=10' \
+--header 'access-token: act.1d1021d2aee3d41fee2d2add43456badMFZnrhFhfWotu3Ecuiuka27L56lr' \ 
+
 ## 
 项目简介
 项目名称: swl-douyin

+ 0 - 5
douyin/video_datapy

@@ -1,5 +0,0 @@
-# https://developer.open-douyin.com/docs/resource/zh-CN/dop/develop/openapi/video-management/douyin/search-video/video-data
-# 查询特定视频的数据。
-# 由于抖音开放平台网站应用没有正是上线,无法通过用户授权查询视频列表,只能通过评论回调查询到视频 item_id ,然后根据 item_id 查询到特定视频数据
-
-

+ 4 - 4
douyin/video_get_iframe_by_item.py

@@ -47,6 +47,9 @@ sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 from config import logger,ASR_EXE,ASR_ADDRESS,TEMP_DIR
 # https://developer.open-douyin.com/docs/resource/zh-CN/dop/develop/openapi/video-management/douyin/iframe-player/get-iframe-by-video
 # 该接口没有请求限制
+'''
+ret {'data': {'iframe_code': '<iframe width="1080" height="1920" frameborder="0" src="https://open.douyin.com/player/video?vid=7259290547288870144&autoplay=0" referrerpolicy="unsafe-url" allowfullscreen></iframe>', 'video_height': 1920, 'video_title': '全国肥胖率胖子多的省份排名,有你们省吗? #减肥 #肥胖率 #广西', 'video_width': 1080}, 'err_msg': '', 'err_no': 0, 'log_id': '2024032919281191047C2C11ACAC1423AD'}
+'''
 async def get_iframe_by_item_id(item_id):  
     client_key = os.environ.get("CLIENT_KEY")  # 从环境变量中获取 client_key  
     base64_encoded_item_id = base64.urlsafe_b64encode(item_id.encode()).decode('utf-8')  
@@ -59,9 +62,7 @@ async def get_iframe_by_item_id(item_id):
         res_json = response.json()  
         err_no = res_json.get("err_no")  
           
-        if err_no != 0:  
-            raise Exception(f"Error fetching IFrame code: {res_json.get('err_msg')}")  
-        return res_json.get("data")
+        return res_json
     
 
 
@@ -74,7 +75,6 @@ async def task():
     # print(f"{vid}")
     
 import asyncio
-import aiofiles
 import os
 import sys
 def main():

+ 34 - 0
douyin/video_list.py

@@ -0,0 +1,34 @@
+import os  
+import httpx  
+# https://developer.open-douyin.com/docs/resource/zh-CN/dop/develop/openapi/video-management/douyin/search-video/account-video-list
+async def get_video_list(open_id, access_token, cursor=0, count=5, ):  
+    url = f"https://open.douyin.com/api/douyin/v1/video/video_list/?open_id={open_id}&cursor={cursor}&count={count}"  
+    headers = {  
+        "access-token": access_token  
+    }  
+      
+    async with httpx.AsyncClient() as client:  
+        response = await client.get(url, headers=headers)  
+          
+        if response.status_code == 200:  
+            res_json = response.json()  
+            return res_json  
+        else:  
+            response.raise_for_status()  # 如果响应状态码不是200,则抛出异常  
+
+import asyncio
+import os
+import sys
+def main():
+    # 示例用法:  
+    # 假设你已经从某处获取了 access_token、open_id 等必要的信息  
+    access_token = "act.3.WteK8MAbbNrIIRR1eaIqe1jmco-jOmnLAbfu6MmzC5x9VMTjSJTxxUub1XP2GQbqn_Jl-B25kD1hJO7dwpsn2VqGghm_qTe-fPzZxyBFSz30r-HFNz8tmNxeokTT20l3irP9seYtBEeuod-xNH73n5m9Z6MZLZ_B-E7FcA=="
+    open_id = "_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk"  # 示例 open_id,请替换为实际的 open_id  
+    cursor = 0  # 示例 cursor,根据需要调整  
+    count = 10  # 示例 count,根据需要调整  
+    res = asyncio.run(get_video_list(open_id=open_id, access_token=access_token))
+    print(res)
+
+if __name__ == "__main__":
+    main()
+  

+ 24 - 3
grpc_m/client.py

@@ -7,12 +7,32 @@ import asyncio
 from grpclib.client import Channel
 
 from grpc_m.vector import vector_grpc, vector_pb2  
+from grpc_m.health import health_pb2, health_grpc
 from db.docs import DocumentsRepository  
 from db.user_oauth import test_add  # 假设这里有必要的导入来获取open_id  
 from config import logger,GRPC_VECTOR
+# cp -r /home/user/code/LangChain/grpc_m/vector /home/user/code/open-douyin/grpc_m;cp -r /home/user/code/LangChain/grpc_m/health /home/user/code/open-douyin/grpc_m
+async def health_check(server_addr=GRPC_VECTOR) ->bool:
+    host,port = server_addr.split(":")
+    async with Channel(host,port) as channel:
+        stub = health_grpc.HealthStub(channel)
+        response = await stub.Check(health_pb2.HealthCheckRequest())
+        return response.status
 
-async def save_doc_vector(collection_name,user_doc_relative_path) ->vector_pb2.SearchResponses:
-    host,port = GRPC_VECTOR.split(":")
+async def save_doc_vector(collection_name,user_doc_relative_path, server_addr=GRPC_VECTOR) ->vector_pb2.SearchResponses:
+    host,port = server_addr.split(":")
+    async with Channel(host,port) as channel:  
+        stub = vector_grpc.VectorServiceStub(channel)  
+          
+        # 准备gRPC请求并发送  
+        request =  vector_pb2.SaveDocToVectorRequest(collection_name=collection_name, user_doc_relative_path=user_doc_relative_path)  
+        responses:vector_pb2.SaveDocToVectorResponse = await stub.SaveDocToVector(request)  
+        data = responses.status
+        return data
+
+
+async def save_doc_vector(collection_name,user_doc_relative_path, server_addr=GRPC_VECTOR) ->vector_pb2.SearchResponses:
+    host,port = server_addr.split(":")
     async with Channel(host,port) as channel:  
         stub = vector_grpc.VectorServiceStub(channel)  
           
@@ -50,7 +70,8 @@ async def get_doc_vector(collection_name, query) ->vector_pb2.SearchResponses:
 
 
 async def run():  
-    res = await get_doc_vector("思维链-文档说明", "什么是思维链")
+    # res = await get_doc_vector("思维链-文档说明", "什么是思维链")
+    res = await health_check()
     logger.info(f"{res}")
     
 if __name__ == '__main__':  

+ 24 - 0
grpc_m/health/health.proto

@@ -0,0 +1,24 @@
+syntax = "proto3";
+
+package grpc.health.v1;
+
+// python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpclib_python_out=. grpc_m/health/health.proto
+message HealthCheckRequest {
+  string service = 1;
+}
+
+message HealthCheckResponse {
+  enum ServingStatus {
+    UNKNOWN = 0;
+    SERVING = 1;
+    NOT_SERVING = 2;
+    SERVICE_UNKNOWN = 3;  // Used only by the Watch method.
+  }
+  ServingStatus status = 1;
+}
+
+service Health {
+  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
+
+  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
+}

+ 56 - 0
grpc_m/health/health_grpc.py

@@ -0,0 +1,56 @@
+# Generated by the Protocol Buffers compiler. DO NOT EDIT!
+# source: grpc_m/health/health.proto
+# plugin: grpclib.plugin.main
+import abc
+import typing
+
+import grpclib.const
+import grpclib.client
+if typing.TYPE_CHECKING:
+    import grpclib.server
+
+import grpc_m.health.health_pb2
+
+
+class HealthBase(abc.ABC):
+
+    @abc.abstractmethod
+    async def Check(self, stream: 'grpclib.server.Stream[grpc_m.health.health_pb2.HealthCheckRequest, grpc_m.health.health_pb2.HealthCheckResponse]') -> None:
+        pass
+
+    @abc.abstractmethod
+    async def Watch(self, stream: 'grpclib.server.Stream[grpc_m.health.health_pb2.HealthCheckRequest, grpc_m.health.health_pb2.HealthCheckResponse]') -> None:
+        pass
+
+    def __mapping__(self) -> typing.Dict[str, grpclib.const.Handler]:
+        return {
+            '/grpc.health.v1.Health/Check': grpclib.const.Handler(
+                self.Check,
+                grpclib.const.Cardinality.UNARY_UNARY,
+                grpc_m.health.health_pb2.HealthCheckRequest,
+                grpc_m.health.health_pb2.HealthCheckResponse,
+            ),
+            '/grpc.health.v1.Health/Watch': grpclib.const.Handler(
+                self.Watch,
+                grpclib.const.Cardinality.UNARY_STREAM,
+                grpc_m.health.health_pb2.HealthCheckRequest,
+                grpc_m.health.health_pb2.HealthCheckResponse,
+            ),
+        }
+
+
+class HealthStub:
+
+    def __init__(self, channel: grpclib.client.Channel) -> None:
+        self.Check = grpclib.client.UnaryUnaryMethod(
+            channel,
+            '/grpc.health.v1.Health/Check',
+            grpc_m.health.health_pb2.HealthCheckRequest,
+            grpc_m.health.health_pb2.HealthCheckResponse,
+        )
+        self.Watch = grpclib.client.UnaryStreamMethod(
+            channel,
+            '/grpc.health.v1.Health/Watch',
+            grpc_m.health.health_pb2.HealthCheckRequest,
+            grpc_m.health.health_pb2.HealthCheckResponse,
+        )

+ 32 - 0
grpc_m/health/health_pb2.py

@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# source: grpc_m/health/health.proto
+# Protobuf Python Version: 4.25.0
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1agrpc_m/health/health.proto\x12\x0egrpc.health.v1\"%\n\x12HealthCheckRequest\x12\x0f\n\x07service\x18\x01 \x01(\t\"\xa9\x01\n\x13HealthCheckResponse\x12\x41\n\x06status\x18\x01 \x01(\x0e\x32\x31.grpc.health.v1.HealthCheckResponse.ServingStatus\"O\n\rServingStatus\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0b\n\x07SERVING\x10\x01\x12\x0f\n\x0bNOT_SERVING\x10\x02\x12\x13\n\x0fSERVICE_UNKNOWN\x10\x03\x32\xae\x01\n\x06Health\x12P\n\x05\x43heck\x12\".grpc.health.v1.HealthCheckRequest\x1a#.grpc.health.v1.HealthCheckResponse\x12R\n\x05Watch\x12\".grpc.health.v1.HealthCheckRequest\x1a#.grpc.health.v1.HealthCheckResponse0\x01\x62\x06proto3')
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'grpc_m.health.health_pb2', _globals)
+if _descriptor._USE_C_DESCRIPTORS == False:
+  DESCRIPTOR._options = None
+  _globals['_HEALTHCHECKREQUEST']._serialized_start=46
+  _globals['_HEALTHCHECKREQUEST']._serialized_end=83
+  _globals['_HEALTHCHECKRESPONSE']._serialized_start=86
+  _globals['_HEALTHCHECKRESPONSE']._serialized_end=255
+  _globals['_HEALTHCHECKRESPONSE_SERVINGSTATUS']._serialized_start=176
+  _globals['_HEALTHCHECKRESPONSE_SERVINGSTATUS']._serialized_end=255
+  _globals['_HEALTH']._serialized_start=258
+  _globals['_HEALTH']._serialized_end=432
+# @@protoc_insertion_point(module_scope)

+ 28 - 0
grpc_m/health/health_pb2.pyi

@@ -0,0 +1,28 @@
+from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import message as _message
+from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union
+
+DESCRIPTOR: _descriptor.FileDescriptor
+
+class HealthCheckRequest(_message.Message):
+    __slots__ = ("service",)
+    SERVICE_FIELD_NUMBER: _ClassVar[int]
+    service: str
+    def __init__(self, service: _Optional[str] = ...) -> None: ...
+
+class HealthCheckResponse(_message.Message):
+    __slots__ = ("status",)
+    class ServingStatus(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
+        __slots__ = ()
+        UNKNOWN: _ClassVar[HealthCheckResponse.ServingStatus]
+        SERVING: _ClassVar[HealthCheckResponse.ServingStatus]
+        NOT_SERVING: _ClassVar[HealthCheckResponse.ServingStatus]
+        SERVICE_UNKNOWN: _ClassVar[HealthCheckResponse.ServingStatus]
+    UNKNOWN: HealthCheckResponse.ServingStatus
+    SERVING: HealthCheckResponse.ServingStatus
+    NOT_SERVING: HealthCheckResponse.ServingStatus
+    SERVICE_UNKNOWN: HealthCheckResponse.ServingStatus
+    STATUS_FIELD_NUMBER: _ClassVar[int]
+    status: HealthCheckResponse.ServingStatus
+    def __init__(self, status: _Optional[_Union[HealthCheckResponse.ServingStatus, str]] = ...) -> None: ...

+ 4 - 2
main.py

@@ -60,8 +60,10 @@ def main():
     logger.debug(f"http://sv-v2.magong.site:{PORT}  仅支持 ipv6 ,直连、满速、无延迟。缺点是不支持 https 协议,因为不经过 Caddy 代理,直达 Fastapi 没有配置 https")
     logger.debug(f"https://open-douyin.magong.site  内网穿透隧道,cloudflare tunnel ,经常访问不了")
     logger.info(f"http://localhost:{PORT} ⭐ 推荐。 vscode 会自动建立一条本地隧道,可以在本地浏览器直接打开")
-    logger.info(f"扫码登录 https://open.douyin.com/platform/oauth/connect/?client_key=aw6aipmfdtplwtyq&response_type=code&scope=user_info,renew_refresh_token,trial.whitelist,item.comment&redirect_uri=https://api.magong.site/swl/douyin/verify_callback")
-    logger.info(f"https://swl-8l9.pages.dev/  访问前端网站")
+    # logger.info(f"扫码登录 https://open.douyin.com/platform/oauth/connect/?client_key=aw6aipmfdtplwtyq&response_type=code&scope=user_info,renew_refresh_token,trial.whitelist,item.comment&redirect_uri=https://api.magong.site/swl/douyin/verify_callback")
+    # "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJfMDAwTGlWX28wRkdLTXdhWmdxR01DdmNWU2YtVUFuTlVfa2siLCJleHAiOjE3MjAzNDMyOTR9.vmTMpzXb-Z4uc963e0odepOT8AcvgYELtng0PDKBeDA"
+    logger.info(f"扫码登录 https://open.douyin.com/platform/oauth/connect/?client_key=aw6aipmfdtplwtyq&response_type=code&scope=user_info,renew_refresh_token&redirect_uri=https://api.magong.site/swl/douyin/verify_callback")
+    logger.info(f"https://swl.magong.site/  访问前端网站")
     uvicorn.run(app, host=None, port=PORT, log_level="info")
 
 if __name__ == "__main__":

+ 57 - 0
tool/consul_srv_client.py

@@ -0,0 +1,57 @@
+import random
+import re
+import socket
+import struct
+from dns import resolver,exception
+from cachetools import TTLCache
+# 配置自定义 DNS 服务器
+custom_resolver = resolver.Resolver()
+custom_resolver.nameservers = ['10.0.0.1', '10.0.0.32', '10.0.0.2']
+cache = TTLCache(maxsize=100, ttl=90)
+
+# 设置端口号(这里假设所有服务器都使用 8600 端口)
+for server in custom_resolver.nameservers:
+    custom_resolver.port = 8600  # 注意:通常不需要对每个服务器单独设置,除非它们使用不同端口
+
+
+def extract_ip_prefix(encoded_address:str):
+    match = encoded_address.split(".")[0]
+    return  match
+
+def decode_address(encoded_address):
+    hex_address = extract_ip_prefix(encoded_address)
+    binary_address = bytes.fromhex(hex_address)
+    return socket.inet_ntoa(binary_address)    
+
+def get_srv(srv_record_name):
+    try:
+        if srv_record_name not in cache:
+            answer = custom_resolver.resolve(srv_record_name, 'SRV')
+            cache[srv_record_name] = answer
+        else:
+            answer = cache[srv_record_name]
+        
+        selected_rdata = random.choice(answer)
+        ip_address = decode_address(selected_rdata.target.to_text())
+        print(f"Service: {srv_record_name} - {ip_address}:{selected_rdata.port} ")
+        return ip_address, selected_rdata.port
+    except exception.DNSException as e:
+        print(f"DNS exception: {e}")
+        raise
+
+def del_cache(srv_record_name=""):
+
+    if srv_record_name:
+        cache.pop(srv_record_name, None)
+    else:
+        cache.clear()
+
+def main():
+    # 查询 SRV 记录
+    srv_record_name = 'video-get.service.consul'
+    # srv_record_name = 'consul.service.consul'
+    get_srv(srv_record_name)
+    get_srv("prefect.service.consul")
+
+if __name__ == "__main__":
+    main()

+ 11 - 0
tool/create_deployment.py

@@ -0,0 +1,11 @@
+from prefect import flow
+
+if __name__ == "__main__":
+    flow.from_source(
+        source="https://github.com/discdiver/demos.git",
+        entrypoint="my_gh_workflow.py:repo_info",
+    ).deploy(
+        name="my-first-deployment",
+        work_pool_name="my-managed-pool",
+        cron="0 1 * * *",
+    )

+ 1 - 1
tool/exec_asr_client.py

@@ -3,7 +3,7 @@ from config import *
 python_exec = sys.executable
 asr_client_py = os.path.join(WORK_DIR, "tool", "funasr_wss_client.py")
 
-async def run_asr_client(audio_file, output_json):
+async def run_asr_client(audio_file, output_json, asr_address=ASR_ADDRESS):
     host, port = ASR_ADDRESS.split(':')  
       
     cmd = [  

+ 266 - 0
tool/job_video_asr.py

@@ -0,0 +1,266 @@
+import asyncio
+import json
+from typing import Any
+from joblib import Memory
+import time
+import os
+import threading
+import os
+import sys
+sys.path.append(os.path.dirname(os.path.dirname(__file__)))
+from enum import Enum, auto
+from grpc_m.client import save_doc_vector
+from tool.video_get import get_iframe_by_item_id,get_video_download_urls,download_video,download_video_by_item_id
+from tool.exec_asr_client import run_asr_client
+
+
+from config import MNT_DOUYIN_DATA, TEMP_DIR
+from db.user import *
+from db.docs import DocumentsRepository
+
+
+# 定义视频处理状态的枚举类  
+class VideoProcessStatus(Enum):  
+    INIT = (auto(), "正在启动。。。")  
+    GET_DOWNLOAD_URLS = (auto(), "获取视频无水印链接")  
+    DOWNLOADED_VIDEO = (auto(), "下载视频成功")  
+    VIDEO_ASR = (auto(), "视频转文本")  
+    USER_ADD_DOC = (auto(), "新增视频文档保存到 _video 目录、数据库记录")  
+    VECTOR_SAVED = (auto(), "向量转换")  
+    
+    @property
+    def code(self):
+        """获取状态码"""
+        return self.value[0]
+
+    @property
+    def msg(self):
+        """获取状态码信息"""
+        return self.value[1]
+    @classmethod  
+    def all_status_descriptions(cls):  
+        return {item.code: item.msg for item in cls} 
+
+class VideoProcessManager:      
+    def __init__(self, video_item:VideoData):  
+        # 注意,由于 joblib 无法 pikle 复杂对象,尤其是关系表或者连接了数据库节点,因此需要将内容提取出简单的变量
+        self.video_id = video_item.id
+        self.filename = str(video_item.id)
+        self.item_id = video_item.item_id
+
+    def get_process(self) -> Optional[Dict[str, Any]]:  
+        mdata = VideoItemDocRepo().mdata(self.video_id, is_update=False)  
+        if mdata and 'process' in mdata:  
+            return mdata['process']  
+        return None  
+  
+    def update_process(self, status:VideoProcessStatus, msg:str="", err_code: int = 0, data: Optional[Dict[str, Any]] = None):  
+        process_dict = self.get_process() or {}  
+        process_dict['status'] = status.code  
+        if not msg:
+            msg = status.msg  
+        process_dict['msg'] = msg
+        process_dict['err_code'] = err_code  
+        process_dict['data'] = data or {}    
+        process_dict['total_status'] = len(VideoProcessStatus.__members__.items())
+        # 更新mdata  
+        VideoItemDocRepo().mdata(self.video_id, {'process': process_dict})      
+        return process_dict
+    
+    async def video_process_init(self):
+        video_temp_dir = os.path.join(TEMP_DIR, "video")
+        if not os.path.exists(video_temp_dir):
+            os.makedirs(os.path.dirname(video_temp_dir))
+        
+        # 可以在这里初始化微服务、清除旧数据、节点健康检查等等
+        self.update_process(VideoProcessStatus.INIT)
+        logger.info(f"video_process_init")
+        
+    async def get_download_urls(self, item_id):
+        response = await get_iframe_by_item_id(item_id)
+        if response.get("err_no"):
+            msg = "获取iframe失败"
+            process_dict = self.update_process(VideoProcessStatus.GET_DOWNLOAD_URLS, 1, data={"res":{response}})
+            logger.info(f"step:{process_dict['status']}/{process_dict['total_status']} {msg}")
+            raise Exception(msg)
+        data = response.get("data")
+        iframe = data.get("iframe_code")
+        
+        response = await get_video_download_urls(iframe)
+        err_code = response.get("code")
+        if err_code:
+            msg = "获取无水印链接失败"
+            process_dict = self.update_process(VideoProcessStatus.GET_DOWNLOAD_URLS, err_code=1, msg=msg, data=response)
+            logger.info(f"step:{process_dict['status']}/{process_dict['total_status']} {msg}")
+            raise Exception(msg)
+        urls = response.get("data")
+        process_dict = self.update_process(VideoProcessStatus.GET_DOWNLOAD_URLS, data=urls)
+        logger.info(f"step:{process_dict['status']}/{process_dict['total_status']} {process_dict['msg']}成功")
+        return urls
+
+    async def download_video_from_urls(self, urls:List) ->str:
+        save_video_path = os.path.join(TEMP_DIR, "video", f"{self.filename}.mp4")
+        try:
+            res_path = await download_video(urls, save_video_path)
+            process_dict = self.update_process(VideoProcessStatus.DOWNLOADED_VIDEO, data=res_path)
+            logger.info(f"step:{process_dict['status']}/{process_dict['total_status']} {process_dict['msg']}")
+            return res_path
+        except Exception as e:
+            self.update_process(VideoProcessStatus.DOWNLOADED_VIDEO, msg=e, err_code=1)
+
+
+    async def video_to_asr(self, save_video_res_path):
+        video_asr_json_path = os.path.join(TEMP_DIR, "video", f"{self.filename}.json")
+        if not await run_asr_client(save_video_res_path, video_asr_json_path):
+            msg = f"视频语音识别失败,{save_video_res_path} \t {video_asr_json_path}"
+            process_dict = self.update_process(VideoProcessStatus.VIDEO_ASR, err_code=1, msg=msg, data={"video_path":save_video_res_path})
+            logger.info(f"step:{process_dict['status']}/{process_dict['total_status']} {msg}")
+            raise Exception(msg)
+        msg = f"视频语音识别成功 : {video_asr_json_path}"
+        process_dict = self.update_process(VideoProcessStatus.VIDEO_ASR, msg=msg, data={"video_path":save_video_res_path})
+        logger.info(f"step:{process_dict['status']}/{process_dict['total_status']} {msg}")
+        video_asr_json = json.load(open(video_asr_json_path))
+        asr_text = video_asr_json.get("text")
+        if not asr_text:
+            msg = f"获取不到语音转文本的结果 video_asr_json_path: {video_asr_json_path}"
+            process_dict = self.update_process(VideoProcessStatus.VIDEO_ASR, err_code=1, msg=msg, data={"video_asr_json_path":video_asr_json_path})
+            logger.info(f"step:{process_dict['status']}/{process_dict['total_status']} {msg}")
+            raise Exception(msg)
+        return video_asr_json
+
+    
+        
+    async def txt_to_vector(self, video_asr_json):
+        video_model:VideoData = VideoItemDocRepo().get(self.video_id)
+        user:User = video_item.user
+        video_to_txt_path = os.path.join(MNT_DOUYIN_DATA, user.open_id, "docs", "_video", f"{self.filename}.txt")
+        if not os.path.exists(os.path.dirname(video_to_txt_path)):
+            os.makedirs(os.path.dirname(video_to_txt_path))
+        try:
+            asr_text = video_asr_json.get("text")
+            with open(video_to_txt_path, "w") as f:
+                f.write(asr_text)
+            
+            video_model = VideoItemDocRepo().add_vedio_item_doc(user,item_id,video_to_txt_path)
+            doc:Documents = video_model.doc
+            msg = f"文件保存成功 doc: {doc.path} {doc.categories[-1].name} "
+            process_dict = self.update_process(VideoProcessStatus.USER_ADD_DOC, msg=msg, data=doc.path)
+            logger.info(f"step:{process_dict['status']}/{process_dict['total_status']} {msg}")
+            
+            
+            save_vector_err = await save_doc_vector(str(doc.id), doc.path)
+            if save_vector_err:
+                msg = f"向量转换失败 {doc}"
+                raise Exception(msg)
+            msg = f"向量转换成功 doc.path: {doc.path}"
+            DocumentsRepository().set_status(doc, 100)
+            process_dict = self.update_process(VideoProcessStatus.VECTOR_SAVED, err_code=0, msg=msg, data=doc.path)
+            logger.info(f"step:{process_dict['status']}/{process_dict['total_status']} {msg}")
+
+        except Exception as e:
+            process_dict = self.update_process(VideoProcessStatus.VECTOR_SAVED, err_code=1, msg=msg, data=video_to_txt_path)
+            logger.info(f"step:{process_dict['status']}/{process_dict['total_status']} {msg}")
+            raise Exception(e)
+        
+
+def create_memory(location):
+    return Memory(location=location, verbose=0)
+
+
+async def video_item_to_vector(video_item:VideoData):
+    try:   
+        filename = str(video_item.id)
+        proces_status = VideoProcessManager(video_item)
+        mem_path = os.path.join(TEMP_DIR, "joblib_cache", str(video_item.user.id), filename) 
+        logger.info(f"get user menory path {mem_path}")
+        user_memcache = create_memory(mem_path)
+        
+        status = video_item.mdata.get("process", {}).get("status", VideoProcessStatus.INIT.code)
+        # 如果以前执行过 job video_item_to_vector ,则 status != init
+        # 如果要重新向量计算,可以在数据库中手动将 status 去掉
+        if status == VideoProcessStatus.INIT.code:
+            await proces_status.video_process_init()
+        
+        get_download_urls_cached = user_memcache.cache(proces_status.get_download_urls)
+        urls = await get_download_urls_cached(video_item.item_id)
+        
+        download_video_from_urls_cached = user_memcache.cache(proces_status.download_video_from_urls)
+        save_video_res_path = await download_video_from_urls_cached(urls)
+        
+        video_to_asr_cached = user_memcache.cache(proces_status.video_to_asr)
+        video_asr_json = await video_to_asr_cached(save_video_res_path)
+        
+        txt_to_vector_cached = user_memcache.cache(proces_status.txt_to_vector)
+        await txt_to_vector_cached(video_asr_json)
+        
+    except Exception as e:
+        logger.exception(f"出错了: {e}")
+        # raise e
+    
+    return
+
+
+if __name__ == "__main__":
+    db_user = UserRepo()
+    db_video = VideoItemDocRepo()
+    user:User = db_user.get_by_open_id("_000LiV_o0FGKMwaZgqGMCvcVSf-UAnNU_kk")
+    item_id = "@9VxS1/qCUc80K2etd8wkUc7912DgP/GCPpF2qwKuJ1YTb/X460zdRmYqig357zEBKzkoKYjBMUvl9Bs6h+CwYQ=="
+    video_item = db_video.get_or_create(user, item_id)
+
+    asyncio.run(video_item_to_vector(video_item))
+    # video_item_to_vector()
+    # logger.info(f"{len(VideoProcessStatus.__members__.items())}")
+
+
+# def step1(name):
+#     print(f"{name} 执行步骤1: 获取视频链接")
+#     time.sleep(1)
+#     return name, "http://video.com/1.mp4"
+
+# def step2(name, video_url):
+#     print(f"{name} 执行步骤2: 下载视频 {video_url}")
+#     time.sleep(1)
+#     return name,"/path/1.mp4"
+
+# def step3(name, video_path):
+#     print(f"{name} 执行步骤3: 提取音频 {video_path}")
+#     time.sleep(1)
+#     # raise Exception("微服务器无法访问")
+#     return name, "/path/1.mp3"
+
+# def step4(name, audio_path):
+#     print(f"{name} 执行步骤4: 计算语音文本 {audio_path}")
+#     time.sleep(1)
+#     return name,"语音文本内容"
+
+# def user_task(name, memory):
+#     try:    
+#         step1_cached = memory.cache(step1)
+#         step2_cached = memory.cache(step2)
+#         step3_cached = memory.cache(step3)
+#         step4_cached = memory.cache(step4)
+        
+#         name, video_url = step1_cached(name)
+#         name, video_path = step2_cached(name, video_url)
+#         name, audio_path = step3_cached(name, video_path)
+#         name, text = step4_cached(name, audio_path)
+#         print(name, text)
+#     except Exception as e:
+#         print(f"出错了: {name} {e}")
+
+# def main():
+#     memory1 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user1"))
+#     memory2 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user2"))
+#     memory3 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user3"))
+#     memory4 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user4"))
+
+#     user1_task = threading.Thread(target=user_task, args=("user1", memory1))
+#     user1_task.start()
+    
+#     user2_task = threading.Thread(target=user_task, args=("user2", memory2))
+#     user2_task.start()
+    
+#     user3_task = threading.Thread(target=user_task, args=("user3", memory3))
+#     user3_task.start()
+    
+#     user_task("user4", memory4)

+ 23 - 0
tool/prefect_cloud.py

@@ -0,0 +1,23 @@
+import os
+import consul_srv_client
+ip,port = consul_srv_client.get_srv("prefect.service.consul")
+prefect_api = f"http://{ip}:{port}/api"
+# 在导入 prefect 之前设置环境变量
+os.environ["PREFECT_API_URL"] = prefect_api
+
+from prefect import flow
+
+# 
+@flow(log_prints=True)
+def quickstart_flow(msg:str="none"):
+    print(f"Hello, {msg}! Local quickstart flow is running! ")
+
+if __name__ == "__main__":
+    print(os.environ["PREFECT_API_URL"])
+    # cli 触发: prefect deployment run -p msg="fuck you 123" 'quickstart-flow/my-quickstart-flow-local'
+    # API 触发: 
+    quickstart_flow.serve(
+        name="my-quickstart-flow-local",
+        tags=["testing", "tutorial"],
+        description="Given a GitHub repository, logs repository statistics for that repo.",
+        version="tutorial/deployments",)

+ 0 - 68
tool/video_asr_job.py

@@ -1,68 +0,0 @@
-from joblib import Memory
-import time
-import os
-import threading
-import os
-import sys
-sys.path.append(os.path.dirname(os.path.dirname(__file__)))
-
-from config import TEMP_DIR
-
-def create_memory(location):
-    return Memory(location=location, verbose=0)
-
-def step1(name):
-    print(f"{name} 执行步骤1: 获取视频链接")
-    time.sleep(1)
-    return name, "http://video.com/1.mp4"
-
-def step2(name, video_url):
-    print(f"{name} 执行步骤2: 下载视频 {video_url}")
-    time.sleep(1)
-    return name,"/path/1.mp4"
-
-def step3(name, video_path):
-    print(f"{name} 执行步骤3: 提取音频 {video_path}")
-    time.sleep(1)
-    # raise Exception("微服务器无法访问")
-    return name, "/path/1.mp3"
-
-def step4(name, audio_path):
-    print(f"{name} 执行步骤4: 计算语音文本 {audio_path}")
-    time.sleep(1)
-    return name,"语音文本内容"
-
-def user_task(name, memory):
-    try:    
-        step1_cached = memory.cache(step1)
-        step2_cached = memory.cache(step2)
-        step3_cached = memory.cache(step3)
-        step4_cached = memory.cache(step4)
-        
-        name, video_url = step1_cached(name)
-        name, video_path = step2_cached(name, video_url)
-        name, audio_path = step3_cached(name, video_path)
-        name, text = step4_cached(name, audio_path)
-        print(name, text)
-    except Exception as e:
-        print(f"出错了: {name} {e}")
-
-def main():
-    memory1 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user1"))
-    memory2 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user2"))
-    memory3 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user3"))
-    memory4 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user4"))
-
-    user1_task = threading.Thread(target=user_task, args=("user1", memory1))
-    user1_task.start()
-    
-    user2_task = threading.Thread(target=user_task, args=("user2", memory2))
-    user2_task.start()
-    
-    user3_task = threading.Thread(target=user_task, args=("user3", memory3))
-    user3_task.start()
-    
-    user_task("user4", memory4)
-
-if __name__ == "__main__":
-    main()

+ 141 - 0
tool/video_asr_prefect.py

@@ -0,0 +1,141 @@
+import base64
+import hashlib
+import os  
+import json  
+import asyncio
+import prefect
+from typing import Optional  
+from prefect import Flow, task  
+import httpx
+from prefect import flow, task
+from pydantic import BaseModel
+from video_get import get_iframe_by_item_id, get_video_download_urls, download_video  
+from exec_asr_client import run_asr_client  # 导入语音转文本的函数  
+from grpc_m.client import save_doc_vector
+
+from enum import Enum, auto  
+from prefect.tasks import task_input_hash
+from datetime import timedelta
+from config import TEMP_DIR,logger
+from prefect.filesystems import LocalFileSystem, S3
+# TEMP_DIR = os.path.join("/home/user/code/open-douyin", "temp")
+from prefect import runtime
+from prefect.states import Completed, Failed
+VIDEO_TEMP_DIR = os.path.join(TEMP_DIR, "video")  
+
+def cache_key_byfilename(data, args):  
+    file_name = runtime.flow_run.parameters.get('file_name')
+    name = runtime.task_run.name
+    logger.info(f"{file_name}_{name}")
+    return f"{file_name}_{name}"
+
+@task
+async def video_process_init(video_temp_dir):  
+    if not os.path.exists(video_temp_dir):  
+        os.makedirs(video_temp_dir)  
+    return video_temp_dir
+
+# @task(cache_key_fn=cache_key_byfilename, cache_expiration=timedelta(minutes=50))
+@task(cache_key_fn=task_input_hash)
+async def get_iframe_task(item_id):
+    logger.info(f"获取播放页 iframe 代码,item_id: {item_id}")
+    response = await get_iframe_by_item_id(item_id)  
+    # {'data': {'iframe_code':""}}
+    iframe = response.get("data").get("iframe_code")
+    if not iframe:
+        logger.error(f"获取 iframe 失败, {response}  - {item_id}")
+    return iframe
+
+# 获取无水印链接相当于加载 iframe ,不会增加多少网络负载或损耗,因此不推荐缓存,况且无水印链接 return 时常会变,也不建议缓存
+@task
+async def get_urls_task(iframe):  
+    logger.info(f"获取视频无水印下载链接,iframe: {iframe}")
+    response = await get_video_download_urls(iframe)  
+    urls = response.get("data")
+    return urls
+
+def custom_cache_download_task(data, args):  
+    file_name = runtime.flow_run.parameters.get('file_name')
+    save_file_path = args.get('save_file_path')
+    key = f"{file_name}_{save_file_path}"
+    return key
+
+
+@task(cache_key_fn=custom_cache_download_task)
+async def download_task(urls, save_file_path):  
+    logger.info(f"runtime.flow_run.parameters {runtime.flow_run.parameters}")
+    logger.info(f"下载视频 urls : {urls}, save_file_path : {save_file_path}")
+    res = await download_video(urls, save_file_path) 
+    return res
+
+# @task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=50))
+@task
+async def asr_task(audio_file, output_json_path):  
+    # 假设 run_asr_client 返回一个包含转录文本的 JSON 字符串或路径  
+    if await run_asr_client(audio_file, output_json_path):  # None 是占位符,根据你的实现进行调整
+        return output_json_path  # 或者返回实际的转录文本,取决于你的需求  
+    else:
+        raise Exception("err")
+
+@task  
+async def vector_task(output_json_path, collection_name, doc_path, server_addr):
+    video_asr_json = json.load(open(output_json_path))
+    asr_text = video_asr_json.get("text")
+    response = await save_doc_vector(collection_name, doc_path, server_addr)  # 这里可能需要调整参数以匹配你的实现  
+    return response    
+
+@flow
+async def get_download_url(item_id):  
+    logger.info(f"runtime.flow_run.parameters {runtime.flow_run.parameters}")
+    logger.info(f"runtime.flow_run.id {runtime.flow_run.id}")
+    logger.info(f"runtime.flow_run.flow_name {runtime.flow_run.flow_name}")
+    iframe = await get_iframe_task(item_id)
+    if not iframe:
+        return Failed(message=f"获取 iframe 失败,item_id: {item_id}")
+    urls = await get_urls_task(iframe)
+    return urls
+
+def generate_result_storage():
+    flow_name = runtime.flow_run.flow_name
+
+    parameters = runtime.flow_run.parameters
+    file_name = parameters["file_name"]
+
+    return os.path.join(VIDEO_TEMP_DIR, file_name)
+
+@flow(flow_run_name="{file_name}")
+async def download_video_flow(file_name , item_id, refresh_cache=False):  
+    get_iframe_task.refresh_cache = refresh_cache
+    download_task.refresh_cache = refresh_cache
+    logger.info(f"runtime.flow_run.parameters {runtime.flow_run.parameters}")
+    logger.info(f"runtime.flow_run.id {runtime.flow_run.id}")
+    logger.info(f"runtime.flow_run.flow_name {runtime.flow_run.flow_name}")
+    urls = await get_download_url(item_id)  
+    
+    save_video_path = os.path.join(VIDEO_TEMP_DIR, file_name + ".mp4")
+    path = await download_task(urls, save_video_path)
+    if not path:
+        return Failed(message=f"下载视频失败,urls : {urls}, save_file_path : {path}")
+    logger.info(f"download urls: {urls}")
+    logger.info(f"download path: {path}")
+    # logger.info(f"Cache location: {cache_location}")
+    # return path,urls
+    # iframe = await get_iframe_task(item_id)  
+    # urls = await get_urls_task(iframe)  
+    # save_file_path = os.path.join(video_temp_dir, f"{item_id}.mp4")  # 示例文件名  
+    # downloaded_file_path = await download_task(urls, save_file_path)  # 确保 download_task 正确返回下载的文件路径  
+      
+    # # 添加语音转文本任务  
+    # output_json_path = await asr_task(downloaded_file_path)  
+      
+    # # 添加向量转换存储任务(这里我们假设转录文本直接用于向量转换,但你可能需要额外的处理)  
+    # vector_response = await vector_task(output_json_path, "your_collection_name", output_json_path)  # 替换 "your_collection_name" 为实际的集合名称  
+      
+    # 这里可以添加更多的任务处理或返回结果等逻辑...  
+    # return vector_response  
+# 运行流  
+if __name__ == "__main__":  
+    import asyncio  
+    # item_id = "@9VxS1/qCUc80K2etd8wkUc7912DgP/GCPpF2qwKuJ1YTb/X460zdRmYqig357zEBKzkoKYjBMUvl9Bs6h+CwYQ=="  # 从某处获取实际的 item_id  
+    item_id = "@9VxS1/qCUc80K2etd8wkUc791mfoNf+EMpZzqQKiLVIaaPD660zdRmYqig357zEBdcJfEwgOpm1bLVAnSdwvLg=="
+    asyncio.run(download_video_flow("11", item_id))

+ 9 - 12
tool/video_get.py

@@ -21,7 +21,7 @@ ret = {
     ]
 }
 '''
-async def get_video_download_urls(url, video_type="", platform=""):  
+async def get_video_download_urls(url, video_type="", platform="", server_addr="http://10.0.0.12:9082/video-get"):  
     payload = {  
         "url": url,  
         "type": video_type,  
@@ -29,9 +29,9 @@ async def get_video_download_urls(url, video_type="", platform=""):
     }  
       
     async with httpx.AsyncClient() as client:  
-        logger.info(f"{json.dumps(payload)  }")
+        logger.debug(f"{json.dumps(payload)  }")
         response = await client.post(  
-            "http://10.0.0.12:9082/video-get",  
+            server_addr,  
             headers={"Content-Type": "application/json"},  
             content=json.dumps(payload)  
         )  
@@ -39,12 +39,9 @@ async def get_video_download_urls(url, video_type="", platform=""):
         res_json = response.json()  
           
         # 假设响应中包含'err_no'和可能的'err_msg'字段来表示错误  
-        err_no = res_json.get("code")  
+        # err_no = res_json.get("code")  
           
-        if err_no != 0:  
-            logger.error(f"Error fetching video data: {url} \tres_json: {res_json}")  
-            return
-        return res_json.get("data")
+        return res_json
 
 
 async def download_video(url, save_file_path) ->str:
@@ -64,14 +61,14 @@ async def download_video(url, save_file_path) ->str:
                         file.write(chunk)  
                 return save_file_path
             else:  
-                logger.error(f"Failed to download video,status: {response.status_code},url: {url}")    
+                logger.info(f"try to download video faild,status: {response.status_code},url: {url}")    
                 continue
-
+        # raise Exception(f"遍历所有 urls 都无法下载 {urls}")
 
 async def download_video_by_item_id(item_id, save_file_path):
-    data = await get_iframe_by_item_id(item_id)
+    response = await get_iframe_by_item_id(item_id)
     # # iframe = '<iframe width="1080" height="1920" frameborder="0" src="https://open.douyin.com/player/video?vid=7259290547288870144&autoplay=0" referrerpolicy="unsafe-url" allowfullscreen></iframe>',
-    iframe = data.get("iframe_code")
+    iframe = response.get("data").get("iframe_code")
     urls = await get_video_download_urls(iframe)
     return await download_video(urls, save_file_path)