processes_mgr.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. import multiprocessing
  2. import subprocess
  3. import os
  4. import signal
  5. from typing import List, Dict
  6. from config.logu import logger, get_logger
  7. class ProcessManager:
  8. def __init__(self):
  9. self.processes: Dict[str, multiprocessing.Process] = {}
  10. def start_process(self, command: str, process_name: str):
  11. """启动一个子进程并记录它"""
  12. if process_name in self.processes and self.processes[process_name].is_alive():
  13. logger.info(f"Process {process_name} is already running.")
  14. return
  15. parent_conn, child_conn = multiprocessing.Pipe()
  16. p = multiprocessing.Process(target=self._run_command, args=(command, child_conn))
  17. p.start()
  18. pid = parent_conn.recv() # 获取子进程的PID
  19. logger.info(f"Started process {process_name} with PID: {pid}")
  20. self.processes[process_name] = p
  21. return pid
  22. def stop_process(self, process_name: str):
  23. """停止指定名称的子进程"""
  24. if process_name in self.processes and self.processes[process_name].is_alive():
  25. p = self.processes[process_name]
  26. try:
  27. os.kill(p.pid, signal.CTRL_BREAK_EVENT) # 向Windows上的子进程发送CTRL_BREAK事件
  28. except Exception as e:
  29. logger.info(f"Failed to send CTRL_BREAK_EVENT to process {p.pid}: {e}")
  30. p.terminate()
  31. p.join()
  32. del self.processes[process_name]
  33. logger.info(f"Stopped process {process_name}")
  34. def stop_all_processes(self):
  35. """停止所有子进程"""
  36. for process_name in list(self.processes.keys()):
  37. self.stop_process(process_name)
  38. @staticmethod
  39. def _run_command(command, conn):
  40. """在子进程中运行命令并将PID通过管道返回给主进程"""
  41. try:
  42. process = subprocess.Popen(
  43. command,
  44. stdout=subprocess.PIPE,
  45. stderr=subprocess.PIPE,
  46. creationflags=subprocess.CREATE_NEW_PROCESS_GROUP # 对于Windows有用
  47. )
  48. # 将子进程的PID发送给父进程
  49. conn.send(process.pid)
  50. conn.close() # 关闭管道,避免阻塞
  51. # 持续运行,不调用 process.communicate()
  52. process.wait() # 等待子进程结束(如果需要)
  53. except Exception as e:
  54. conn.send((None, str(e)))
  55. conn.close()
  56. process_manager = ProcessManager()