| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- import multiprocessing
- import subprocess
- import os
- import signal
- from typing import List, Dict
- from config.logu import logger, get_logger
- class ProcessManager:
- def __init__(self):
- self.processes: Dict[str, multiprocessing.Process] = {}
- def start_process(self, command: str, process_name: str):
- """启动一个子进程并记录它"""
- if process_name in self.processes and self.processes[process_name].is_alive():
- logger.info(f"Process {process_name} is already running.")
- return
- parent_conn, child_conn = multiprocessing.Pipe()
- p = multiprocessing.Process(target=self._run_command, args=(command, child_conn))
- p.start()
- pid = parent_conn.recv() # 获取子进程的PID
- logger.info(f"Started process {process_name} with PID: {pid}")
- self.processes[process_name] = p
- return pid
- def stop_process(self, process_name: str):
- """停止指定名称的子进程"""
- if process_name in self.processes and self.processes[process_name].is_alive():
- p = self.processes[process_name]
- try:
- os.kill(p.pid, signal.CTRL_BREAK_EVENT) # 向Windows上的子进程发送CTRL_BREAK事件
- except Exception as e:
- logger.info(f"Failed to send CTRL_BREAK_EVENT to process {p.pid}: {e}")
- p.terminate()
- p.join()
- del self.processes[process_name]
- logger.info(f"Stopped process {process_name}")
- def stop_all_processes(self):
- """停止所有子进程"""
- for process_name in list(self.processes.keys()):
- self.stop_process(process_name)
- @staticmethod
- def _run_command(command, conn):
- """在子进程中运行命令并将PID通过管道返回给主进程"""
- try:
- process = subprocess.Popen(
- command,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- creationflags=subprocess.CREATE_NEW_PROCESS_GROUP # 对于Windows有用
- )
- # 将子进程的PID发送给父进程
- conn.send(process.pid)
- conn.close() # 关闭管道,避免阻塞
-
- # 持续运行,不调用 process.communicate()
- process.wait() # 等待子进程结束(如果需要)
- except Exception as e:
- conn.send((None, str(e)))
- conn.close()
- process_manager = ProcessManager()
|