from joblib import Memory import time import os import threading import os import sys sys.path.append(os.path.dirname(os.path.dirname(__file__))) from config import TEMP_DIR def create_memory(location): return Memory(location=location, verbose=0) def step1(name): print(f"{name} 执行步骤1: 获取视频链接") time.sleep(1) return name, "http://video.com/1.mp4" def step2(name, video_url): print(f"{name} 执行步骤2: 下载视频 {video_url}") time.sleep(1) return name,"/path/1.mp4" def step3(name, video_path): print(f"{name} 执行步骤3: 提取音频 {video_path}") time.sleep(1) # raise Exception("微服务器无法访问") return name, "/path/1.mp3" def step4(name, audio_path): print(f"{name} 执行步骤4: 计算语音文本 {audio_path}") time.sleep(1) return name,"语音文本内容" def user_task(name, memory): try: step1_cached = memory.cache(step1) step2_cached = memory.cache(step2) step3_cached = memory.cache(step3) step4_cached = memory.cache(step4) name, video_url = step1_cached(name) name, video_path = step2_cached(name, video_url) name, audio_path = step3_cached(name, video_path) name, text = step4_cached(name, audio_path) print(name, text) except Exception as e: print(f"出错了: {name} {e}") def main(): memory1 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user1")) memory2 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user2")) memory3 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user3")) memory4 = create_memory(os.path.join(TEMP_DIR, "joblib_cache", "user4")) user1_task = threading.Thread(target=user_task, args=("user1", memory1)) user1_task.start() user2_task = threading.Thread(target=user_task, args=("user2", memory2)) user2_task.start() user3_task = threading.Thread(target=user_task, args=("user3", memory3)) user3_task.start() user_task("user4", memory4) if __name__ == "__main__": main()