"""Manage named background subprocesses and their log files with asyncio."""

import asyncio
import atexit
import os
import platform
import signal
import subprocess
import sys
import time
from collections.abc import MutableMapping
from pathlib import Path
from typing import Dict, Optional

from utils.logu import get_logger, LOG_DIR

# Windows-specific imports
if platform.system() == 'Windows':
    import win32api
    import win32job
    import win32con

logger = get_logger('process_mgr')


class ProcessManager:
    """Start, track and stop named child processes.

    Each managed process is recorded in ``self.processes`` under its name
    with the keys ``"process"``, ``"log_file"``, ``"start_time"``,
    ``"log_fd"`` and ``"pid"``. All mutations of that mapping happen while
    ``self.lock`` is held.
    """

    def __init__(self):
        self.processes: MutableMapping[str, dict] = {}
        self.job_object = None
        # The lock is created eagerly, at construction time.
        self.lock = asyncio.Lock()
        # Strong reference to the cleanup task spawned by the Unix signal
        # handler, so it cannot be garbage-collected while still running.
        self._cleanup_task = None
        if platform.system() == 'Windows':
            self._create_windows_job()
        # Signal-handler and atexit registration are currently disabled:
        # self._register_signal_handlers()
        # atexit.register(self._sync_cleanup)

    def _create_windows_job(self):
        """Create the Windows job object used to manage process lifetime.

        The job is configured with KILL_ON_JOB_CLOSE and
        SILENT_BREAKAWAY_OK. Failures are logged and otherwise ignored.
        """
        try:
            self.job_object = win32job.CreateJobObject(None, "")
            info = win32job.QueryInformationJobObject(
                self.job_object,
                win32job.JobObjectExtendedLimitInformation
            )
            info['BasicLimitInformation']['LimitFlags'] = (
                win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE |
                win32job.JOB_OBJECT_LIMIT_SILENT_BREAKAWAY_OK
            )
            win32job.SetInformationJobObject(
                self.job_object,
                win32job.JobObjectExtendedLimitInformation,
                info
            )
        except Exception as e:
            logger.error(f"Windows job object creation failed: {e}")

    def _register_signal_handlers(self):
        """Register the platform-appropriate termination handlers."""
        if platform.system() == 'Windows':
            self._setup_windows_ctrl_handler()
        else:
            self._setup_unix_signal_handlers()

    def _setup_windows_ctrl_handler(self):
        """Install the Windows console control-event handler."""
        try:
            import win32api
            win32api.SetConsoleCtrlHandler(self._windows_ctrl_handler, True)
        except ImportError:
            logger.warning("pywin32 not installed, Windows signal handling disabled")
        except Exception as e:
            logger.error(f"Windows control handler setup failed: {str(e)}")

    def _windows_ctrl_handler(self, dwCtrlType):
        """Handle a Windows console control event.

        For CTRL_C / CTRL_BREAK, runs ``cleanup()`` on a fresh event loop and
        then terminates the interpreter with ``os._exit`` (0 on success, 1 if
        cleanup raised). Returns ``False`` for any other event type.
        """
        if dwCtrlType in {win32con.CTRL_C_EVENT, win32con.CTRL_BREAK_EVENT}:
            try:
                # Run the cleanup on a dedicated event loop.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                loop.run_until_complete(self.cleanup())
                loop.close()
                # Exit the application; os._exit() never returns.
                os._exit(0)
            except Exception as e:
                logger.error(f"Critical error during cleanup: {str(e)}")
                os._exit(1)
        return False

    def _setup_unix_signal_handlers(self):
        """Register SIGTERM/SIGINT handlers on the running event loop.

        Must be called while an event loop is running, because it uses
        ``asyncio.get_running_loop()``.
        """
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            try:
                loop.add_signal_handler(sig, self._unix_signal_handler, sig)
            except NotImplementedError:
                logger.warning(f"Signal {sig} not supported on this platform")

    def _unix_signal_handler(self, signum):
        """Schedule ``cleanup()`` in response to a Unix signal."""
        logger.info(f"Received signal {signum.name}")
        # Keep a reference: the event loop only holds weak references to
        # tasks, so an unreferenced task may be collected before it finishes.
        self._cleanup_task = asyncio.create_task(self.cleanup())

    async def start_process(
        self,
        name: str,
        command: list,
        cwd: str = os.getcwd(),
        log_dir: Path = LOG_DIR / "process_mgr"
    ) -> Optional[int]:
        """Start a background process and register it under ``name``.

        stdout and stderr of the child are appended to
        ``<log_dir>/<name>.log``; stdin is connected to DEVNULL.

        Args:
            name: Unique key for the process.
            command: Program and arguments, passed to
                ``asyncio.create_subprocess_exec``.
            cwd: Working directory for the child. NOTE: the default is
                evaluated once, when this method is defined.
            log_dir: Directory for the log file; created if missing.

        Returns:
            The child's PID, or ``None`` if ``name`` is already registered
            or the process could not be started.
        """
        async with self.lock:
            if name in self.processes:
                logger.warning(f"Process {name} already exists")
                return None

            log_dir.mkdir(parents=True, exist_ok=True)
            log_file = log_dir / f"{name}.log"

            log_fd = None
            try:
                log_fd = open(log_file, "ab")
                process = await asyncio.create_subprocess_exec(
                    *command,
                    stdout=log_fd,
                    cwd=cwd,
                    stderr=subprocess.STDOUT,
                    stdin=subprocess.DEVNULL,
                    start_new_session=True
                )

                if platform.system() == 'Windows' and self.job_object:
                    self._bind_to_windows_job(process.pid)

                self.processes[name] = {
                    "process": process,
                    "log_file": log_file,
                    "start_time": time.time(),
                    "log_fd": log_fd,
                    "pid": process.pid
                }
                logger.info(f"Started process {name} (PID: {process.pid})")
                return process.pid
            except Exception as e:
                logger.error(f"Failed to start {name}: {str(e)}")
                # Only close the log handle if it was actually opened.
                if log_fd is not None:
                    log_fd.close()
                return None

    def _bind_to_windows_job(self, pid: int):
        """Assign the process with ``pid`` to the Windows job object."""
        try:
            h_process = win32api.OpenProcess(
                win32con.PROCESS_ALL_ACCESS, False, pid
            )
            win32job.AssignProcessToJobObject(self.job_object, h_process)
            win32api.CloseHandle(h_process)
        except Exception as e:
            logger.error(f"Windows job assignment failed: {str(e)}")

    async def _stop_process_locked(self, name: str) -> bool:
        """Stop ``name`` and drop its registry entry; caller holds the lock.

        This helper exists so that both ``stop_process`` and ``cleanup`` can
        share the logic without re-acquiring ``self.lock`` (``asyncio.Lock``
        is not reentrant).

        Returns:
            ``True`` if the process had already exited or was stopped,
            ``False`` if ``name`` is unknown or stopping raised an
            unexpected error. In every case where ``name`` was registered,
            its log handle is closed and its entry removed.
        """
        proc_info = self.processes.get(name)
        if proc_info is None:
            return False

        process = proc_info["process"]
        log_fd = proc_info["log_fd"]

        try:
            if process.returncode is not None:
                # Already exited; the finally block does the bookkeeping.
                return True

            if platform.system() == 'Windows':
                subprocess.run(
                    ["taskkill", "/F", "/T", "/PID", str(process.pid)],
                    check=True,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL
                )
            else:
                # The child was started in its own session, so signal its
                # whole process group.
                os.killpg(os.getpgid(process.pid), signal.SIGTERM)

            await process.wait()
            logger.info(f"Stopped process {name} (PID: {process.pid})")
            return True
        except subprocess.CalledProcessError:
            logger.warning(f"Force terminating {name} (PID: {process.pid})")
            try:
                process.terminate()
            except ProcessLookupError:
                # The process is already gone; nothing left to terminate.
                pass
            await process.wait()
            return True
        except Exception as e:
            logger.error(f"Error stopping {name}: {str(e)}")
            return False
        finally:
            log_fd.close()
            # pop() rather than del: removal must never raise from finally.
            self.processes.pop(name, None)

    async def stop_process(self, name: str) -> bool:
        """Stop the process registered under ``name``.

        Returns:
            ``True`` if the process was stopped (or had already exited),
            ``False`` if ``name`` is not registered or stopping failed.
        """
        async with self.lock:
            return await self._stop_process_locked(name)

    async def cleanup(self):
        """Stop every managed process and release the Windows job handle."""
        async with self.lock:
            logger.info("Performing async cleanup...")
            # Iterate over a snapshot: stopping removes entries from the map.
            for name in list(self.processes.keys()):
                # The lock is already held, so call the lock-free helper;
                # stop_process() would wait forever on this same lock.
                await self._stop_process_locked(name)

            if platform.system() == 'Windows' and self.job_object:
                win32api.CloseHandle(self.job_object)
                self.job_object = None

    def _sync_cleanup(self):
        """Synchronous wrapper around ``cleanup()`` intended for atexit."""
        if platform.system() == 'Windows':
            asyncio.run(self.cleanup())
        else:
            try:
                loop = asyncio.get_event_loop()
                if loop.is_running():
                    loop.create_task(self.cleanup())
                else:
                    loop.run_until_complete(self.cleanup())
            except RuntimeError as e:
                # Best effort only: no usable event loop at shutdown.
                logger.warning(f"Synchronous cleanup skipped: {e}")


process_manager = ProcessManager()