import multiprocessing import subprocess import os import signal from typing import List, Dict from config.logu import logger, get_logger class ProcessManager: def __init__(self): self.processes: Dict[str, multiprocessing.Process] = {} def start_process(self, command: str, process_name: str): """启动一个子进程并记录它""" if process_name in self.processes and self.processes[process_name].is_alive(): logger.info(f"Process {process_name} is already running.") return parent_conn, child_conn = multiprocessing.Pipe() p = multiprocessing.Process(target=self._run_command, args=(command, child_conn)) p.start() pid = parent_conn.recv() # 获取子进程的PID logger.info(f"Started process {process_name} with PID: {pid}") self.processes[process_name] = p return pid def stop_process(self, process_name: str): """停止指定名称的子进程""" if process_name in self.processes and self.processes[process_name].is_alive(): p = self.processes[process_name] try: os.kill(p.pid, signal.CTRL_BREAK_EVENT) # 向Windows上的子进程发送CTRL_BREAK事件 except Exception as e: logger.info(f"Failed to send CTRL_BREAK_EVENT to process {p.pid}: {e}") p.terminate() p.join() del self.processes[process_name] logger.info(f"Stopped process {process_name}") def stop_all_processes(self): """停止所有子进程""" for process_name in list(self.processes.keys()): self.stop_process(process_name) @staticmethod def _run_command(command, conn): """在子进程中运行命令并将PID通过管道返回给主进程""" try: process = subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP # 对于Windows有用 ) # 将子进程的PID发送给父进程 conn.send(process.pid) conn.close() # 关闭管道,避免阻塞 # 持续运行,不调用 process.communicate() process.wait() # 等待子进程结束(如果需要) except Exception as e: conn.send((None, str(e))) conn.close() process_manager = ProcessManager()