video_asr_job.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. from joblib import Memory
  2. import time
  3. import os
  4. import threading
  5. import os
  6. import sys
  7. sys.path.append(os.path.dirname(os.path.dirname(__file__)))
  8. from config import TEMP_DIR
  9. def create_memory(location):
  10. return Memory(location=location, verbose=0)
  11. def step1(name):
  12. print(f"{name} 执行步骤1: 获取视频链接")
  13. time.sleep(1)
  14. return name, "http://video.com/1.mp4"
  15. def step2(name, video_url):
  16. print(f"{name} 执行步骤2: 下载视频 {video_url}")
  17. time.sleep(1)
  18. return name,"/path/1.mp4"
  19. def step3(name, video_path):
  20. print(f"{name} 执行步骤3: 提取音频 {video_path}")
  21. time.sleep(1)
  22. # raise Exception("微服务器无法访问")
  23. return name, "/path/1.mp3"
  24. def step4(name, audio_path):
  25. print(f"{name} 执行步骤4: 计算语音文本 {audio_path}")
  26. time.sleep(1)
  27. return name,"语音文本内容"
  28. def user_task(name, memory):
  29. try:
  30. step1_cached = memory.cache(step1)
  31. step2_cached = memory.cache(step2)
  32. step3_cached = memory.cache(step3)
  33. step4_cached = memory.cache(step4)
  34. name, video_url = step1_cached(name)
  35. name, video_path = step2_cached(name, video_url)
  36. name, audio_path = step3_cached(name, video_path)
  37. name, text = step4_cached(name, audio_path)
  38. print(name, text)
  39. except Exception as e:
  40. print(f"出错了: {name} {e}")
  41. def main():
  42. memory1 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user1"))
  43. memory2 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user2"))
  44. memory3 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user3"))
  45. memory4 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user4"))
  46. user1_task = threading.Thread(target=user_task, args=("user1", memory1))
  47. user1_task.start()
  48. user2_task = threading.Thread(target=user_task, args=("user2", memory2))
  49. user2_task.start()
  50. user3_task = threading.Thread(target=user_task, args=("user3", memory3))
  51. user3_task.start()
  52. user_task("user4", memory4)
  53. if __name__ == "__main__":
  54. main()