|
|
@@ -1,63 +1,225 @@
|
|
|
-import multiprocessing
|
|
|
-import subprocess
|
|
|
+import asyncio
|
|
|
import os
|
|
|
import signal
|
|
|
-from typing import List, Dict
|
|
|
-from utils.logu import logger, get_logger
|
|
|
+import subprocess
|
|
|
+import sys
|
|
|
+import time
|
|
|
+import atexit
|
|
|
+import platform
|
|
|
+from pathlib import Path
|
|
|
+from typing import Dict, Optional
|
|
|
+from utils.logu import get_logger,LOG_DIR
|
|
|
+from collections.abc import MutableMapping
|
|
|
+
|
|
|
+# Windows特定导入
|
|
|
+if platform.system() == 'Windows':
|
|
|
+ import win32api
|
|
|
+ import win32job
|
|
|
+ import win32con
|
|
|
+
|
|
|
+logger = get_logger('process_mgr')
|
|
|
|
|
|
class ProcessManager:
|
|
|
def __init__(self):
|
|
|
- self.processes: Dict[str, multiprocessing.Process] = {}
|
|
|
-
|
|
|
- def start_process(self, command: str, process_name: str):
|
|
|
- """启动一个子进程并记录它"""
|
|
|
- if process_name in self.processes and self.processes[process_name].is_alive():
|
|
|
- logger.info(f"Process {process_name} is already running.")
|
|
|
- return
|
|
|
-
|
|
|
- parent_conn, child_conn = multiprocessing.Pipe()
|
|
|
- p = multiprocessing.Process(target=self._run_command, args=(command, child_conn))
|
|
|
- p.start()
|
|
|
- pid = parent_conn.recv() # 获取子进程的PID
|
|
|
- logger.info(f"Started process {process_name} with PID: {pid}")
|
|
|
- self.processes[process_name] = p
|
|
|
- return pid
|
|
|
- def stop_process(self, process_name: str):
|
|
|
- """停止指定名称的子进程"""
|
|
|
- if process_name in self.processes and self.processes[process_name].is_alive():
|
|
|
- p = self.processes[process_name]
|
|
|
+ self.processes: MutableMapping[str, dict] = {}
|
|
|
+ self.job_object = None
|
|
|
+ self.lock = asyncio.Lock()
|
|
|
+
|
|
|
+ if platform.system() == 'Windows':
|
|
|
+ self._create_windows_job()
|
|
|
+
|
|
|
+ self._register_signal_handlers()
|
|
|
+ atexit.register(self._sync_cleanup)
|
|
|
+
|
|
|
+ def _create_windows_job(self):
|
|
|
+ """创建Windows作业对象用于进程生命周期管理"""
|
|
|
+ try:
|
|
|
+ self.job_object = win32job.CreateJobObject(None, "")
|
|
|
+ info = win32job.QueryInformationJobObject(
|
|
|
+ self.job_object,
|
|
|
+ win32job.JobObjectExtendedLimitInformation
|
|
|
+ )
|
|
|
+ info['BasicLimitInformation']['LimitFlags'] = (
|
|
|
+ win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE |
|
|
|
+ win32job.JOB_OBJECT_LIMIT_SILENT_BREAKAWAY_OK
|
|
|
+ )
|
|
|
+ win32job.SetInformationJobObject(
|
|
|
+ self.job_object,
|
|
|
+ win32job.JobObjectExtendedLimitInformation,
|
|
|
+ info
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Windows job object creation failed: {e}")
|
|
|
+
|
|
|
+ def _register_signal_handlers(self):
|
|
|
+ """跨平台信号处理注册"""
|
|
|
+ if platform.system() == 'Windows':
|
|
|
+ self._setup_windows_ctrl_handler()
|
|
|
+ else:
|
|
|
+ self._setup_unix_signal_handlers()
|
|
|
+ def _setup_windows_ctrl_handler(self):
|
|
|
+ """Windows控制台事件处理"""
|
|
|
+ try:
|
|
|
+ import win32api
|
|
|
+ win32api.SetConsoleCtrlHandler(self._windows_ctrl_handler, True)
|
|
|
+ except ImportError:
|
|
|
+ logger.warning("pywin32 not installed, Windows signal handling disabled")
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Windows control handler setup failed: {str(e)}")
|
|
|
+
|
|
|
+ def _windows_ctrl_handler(self, dwCtrlType):
|
|
|
+ """Windows控制台事件回调"""
|
|
|
+ if dwCtrlType in {win32con.CTRL_C_EVENT, win32con.CTRL_BREAK_EVENT}:
|
|
|
+ asyncio.create_task(self.cleanup())
|
|
|
+ return True # 表示已处理该事件
|
|
|
+ return False # 继续传递事件
|
|
|
+
|
|
|
+ def _setup_unix_signal_handlers(self):
|
|
|
+ """Unix信号处理配置"""
|
|
|
+ loop = asyncio.get_running_loop()
|
|
|
+ for sig in (signal.SIGTERM, signal.SIGINT):
|
|
|
+ try:
|
|
|
+ loop.add_signal_handler(sig, self._unix_signal_handler, sig)
|
|
|
+ except NotImplementedError:
|
|
|
+ logger.warning(f"Signal {sig} not supported on this platform")
|
|
|
+
|
|
|
+ def _unix_signal_handler(self, signum):
|
|
|
+ """Unix信号处理回调"""
|
|
|
+ logger.info(f"Received signal {signum.name}")
|
|
|
+ asyncio.create_task(self.cleanup())
|
|
|
+ def _signal_handler(self):
|
|
|
+ """信号处理入口"""
|
|
|
+ asyncio.create_task(self.cleanup())
|
|
|
+
|
|
|
+ async def start_process(
|
|
|
+ self,
|
|
|
+ name: str,
|
|
|
+ command: list,
|
|
|
+ log_dir: Path = LOG_DIR / "process_mgr"
|
|
|
+ ) -> Optional[int]:
|
|
|
+ """启动并管理后台进程"""
|
|
|
+ async with self.lock:
|
|
|
+ if name in self.processes:
|
|
|
+ logger.warning(f"Process {name} already exists")
|
|
|
+ return None
|
|
|
+
|
|
|
+ log_dir.mkdir(parents=True, exist_ok=True)
|
|
|
+ log_file = log_dir / f"{name}.log"
|
|
|
+
|
|
|
try:
|
|
|
- os.kill(p.pid, signal.CTRL_BREAK_EVENT) # 向Windows上的子进程发送CTRL_BREAK事件
|
|
|
+ # 使用二进制追加模式打开日志文件
|
|
|
+ log_fd = open(log_file, "ab")
|
|
|
+
|
|
|
+ # 创建子进程
|
|
|
+ process = await asyncio.create_subprocess_exec(
|
|
|
+ *command,
|
|
|
+ stdout=log_fd,
|
|
|
+ stderr=subprocess.STDOUT,
|
|
|
+ stdin=subprocess.DEVNULL,
|
|
|
+ start_new_session=True # 重要:创建新会话/进程组
|
|
|
+ )
|
|
|
+
|
|
|
+ # Windows作业对象绑定
|
|
|
+ if platform.system() == 'Windows' and self.job_object:
|
|
|
+ self._bind_to_windows_job(process.pid)
|
|
|
+
|
|
|
+ self.processes[name] = {
|
|
|
+ "process": process,
|
|
|
+ "log_file": log_file,
|
|
|
+ "start_time": time.time(),
|
|
|
+ "log_fd": log_fd
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.info(f"Started process {name} (PID: {process.pid})")
|
|
|
+ return process.pid
|
|
|
+
|
|
|
except Exception as e:
|
|
|
- logger.info(f"Failed to send CTRL_BREAK_EVENT to process {p.pid}: {e}")
|
|
|
- p.terminate()
|
|
|
- p.join()
|
|
|
- del self.processes[process_name]
|
|
|
- logger.info(f"Stopped process {process_name}")
|
|
|
-
|
|
|
- def stop_all_processes(self):
|
|
|
- """停止所有子进程"""
|
|
|
- for process_name in list(self.processes.keys()):
|
|
|
- self.stop_process(process_name)
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def _run_command(command, conn):
|
|
|
- """在子进程中运行命令并将PID通过管道返回给主进程"""
|
|
|
+ logger.error(f"Failed to start {name}: {str(e)}")
|
|
|
+ if 'log_fd' in locals():
|
|
|
+ log_fd.close()
|
|
|
+ return None
|
|
|
+
|
|
|
+ def _bind_to_windows_job(self, pid: int):
|
|
|
+ """将进程绑定到Windows作业对象"""
|
|
|
try:
|
|
|
- process = subprocess.Popen(
|
|
|
- command,
|
|
|
- stdout=subprocess.PIPE,
|
|
|
- stderr=subprocess.PIPE,
|
|
|
- creationflags=subprocess.CREATE_NEW_PROCESS_GROUP # 对于Windows有用
|
|
|
+ h_process = win32api.OpenProcess(
|
|
|
+ win32con.PROCESS_ALL_ACCESS, False, pid
|
|
|
)
|
|
|
- # 将子进程的PID发送给父进程
|
|
|
- conn.send(process.pid)
|
|
|
- conn.close() # 关闭管道,避免阻塞
|
|
|
-
|
|
|
- # 持续运行,不调用 process.communicate()
|
|
|
- # process.wait() # 等待子进程结束(如果需要)
|
|
|
+ win32job.AssignProcessToJobObject(self.job_object, h_process)
|
|
|
+ win32api.CloseHandle(h_process)
|
|
|
except Exception as e:
|
|
|
- conn.send((None, str(e)))
|
|
|
- conn.close()
|
|
|
+ logger.error(f"Windows job assignment failed: {str(e)}")
|
|
|
+
|
|
|
+ async def stop_process(self, name: str) -> bool:
|
|
|
+ """停止指定进程"""
|
|
|
+ async with self.lock:
|
|
|
+ if name not in self.processes:
|
|
|
+ return False
|
|
|
+
|
|
|
+ proc_info = self.processes[name]
|
|
|
+ process = proc_info["process"]
|
|
|
+ log_fd = proc_info["log_fd"]
|
|
|
+
|
|
|
+ try:
|
|
|
+ # 进程已自然退出
|
|
|
+ if process.returncode is not None:
|
|
|
+ del self.processes[name]
|
|
|
+ log_fd.close()
|
|
|
+ return True
|
|
|
+
|
|
|
+ # 跨平台终止逻辑
|
|
|
+ if platform.system() == 'Windows':
|
|
|
+ subprocess.run(
|
|
|
+ ["taskkill", "/F", "/T", "/PID", str(process.pid)],
|
|
|
+ check=True,
|
|
|
+ stdout=subprocess.DEVNULL,
|
|
|
+ stderr=subprocess.DEVNULL
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ # 发送信号到整个进程组
|
|
|
+ os.killpg(os.getpgid(process.pid), signal.SIGTERM)
|
|
|
+
|
|
|
+ # 等待进程终止
|
|
|
+ await process.wait()
|
|
|
+ logger.info(f"Stopped process {name} (PID: {process.pid})")
|
|
|
+ return True
|
|
|
+
|
|
|
+ except subprocess.CalledProcessError:
|
|
|
+ logger.warning(f"Force terminating {name} (PID: {process.pid})")
|
|
|
+ process.terminate()
|
|
|
+ await process.wait()
|
|
|
+ return True
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error stopping {name}: {str(e)}")
|
|
|
+ return False
|
|
|
+ finally:
|
|
|
+ log_fd.close()
|
|
|
+ del self.processes[name]
|
|
|
+
|
|
|
+ async def cleanup(self):
|
|
|
+ """异步清理所有资源"""
|
|
|
+ async with self.lock:
|
|
|
+ logger.info("Performing async cleanup...")
|
|
|
+ for name in list(self.processes.keys()):
|
|
|
+ await self.stop_process(name)
|
|
|
+
|
|
|
+ # 清理Windows作业对象
|
|
|
+ if platform.system() == 'Windows' and self.job_object:
|
|
|
+ win32api.CloseHandle(self.job_object)
|
|
|
+ self.job_object = None
|
|
|
+
|
|
|
+ def _sync_cleanup(self):
|
|
|
+ """同步清理用于atexit"""
|
|
|
+ if platform.system() == 'Windows':
|
|
|
+ asyncio.run(self.cleanup())
|
|
|
+ else:
|
|
|
+ try:
|
|
|
+ loop = asyncio.get_event_loop()
|
|
|
+ if loop.is_running():
|
|
|
+ loop.create_task(self.cleanup())
|
|
|
+ else:
|
|
|
+ loop.run_until_complete(self.cleanup())
|
|
|
+ except RuntimeError:
|
|
|
+ pass
|
|
|
+
|
|
|
|
|
|
-process_manager = ProcessManager()
|