process_mgr.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. import asyncio
  2. import os
  3. import signal
  4. import subprocess
  5. import sys
  6. import time
  7. import atexit
  8. import platform
  9. from pathlib import Path
  10. from typing import Dict, Optional
  11. from utils.logu import get_logger,LOG_DIR
  12. from collections.abc import MutableMapping
  13. # Windows特定导入
  14. if platform.system() == 'Windows':
  15. import win32api
  16. import win32job
  17. import win32con
  # Module logger, process-manager class and the shared module-level instance.
logger = get_logger('process_mgr')


class ProcessManager:
    """Start, track and stop named background subprocesses.

    Each managed process has its combined stdout/stderr appended to
    ``<log_dir>/<name>.log``. On Windows every process is additionally
    assigned to a job object created with
    ``JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE``.

    ``self.processes`` is guarded by ``self.lock``. ``asyncio.Lock`` is not
    re-entrant, so code that already holds the lock must call the private
    ``_stop_locked`` helper rather than the public ``stop_process``.
    """

    def __init__(self):
        # name -> {"process", "log_file", "start_time", "log_fd", "pid"}
        self.processes: MutableMapping[str, dict] = {}
        self.job_object = None
        self.lock = asyncio.Lock()
        # Strong references to fire-and-forget tasks (signal-triggered
        # cleanup) so they are not garbage-collected before they finish.
        self._background_tasks: set = set()
        if platform.system() == 'Windows':
            self._create_windows_job()
        # Signal and atexit hooks are currently disabled. Note that the Unix
        # signal setup calls asyncio.get_running_loop(), which raises when
        # this object is constructed at import time outside a running loop.
        # self._register_signal_handlers()
        # atexit.register(self._sync_cleanup)

    def _create_windows_job(self):
        """Create the Windows job object that managed processes are bound to.

        ``self.job_object`` is only assigned once the job is fully configured.
        If any step fails, the error is logged and ``self.job_object`` stays
        ``None``, so later processes are simply not bound to a job.
        """
        try:
            job = win32job.CreateJobObject(None, "")
            info = win32job.QueryInformationJobObject(
                job,
                win32job.JobObjectExtendedLimitInformation,
            )
            # NOTE(review): SILENT_BREAKAWAY_OK presumably lets descendants of
            # job members leave the job, in which case they would survive
            # job close -- confirm this is intended.
            info['BasicLimitInformation']['LimitFlags'] = (
                win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
                | win32job.JOB_OBJECT_LIMIT_SILENT_BREAKAWAY_OK
            )
            win32job.SetInformationJobObject(
                job,
                win32job.JobObjectExtendedLimitInformation,
                info,
            )
        except Exception as e:
            logger.error(f"Windows job object creation failed: {e}")
            return
        self.job_object = job

    def _register_signal_handlers(self):
        """Install the platform-appropriate shutdown handlers."""
        if platform.system() == 'Windows':
            self._setup_windows_ctrl_handler()
        else:
            self._setup_unix_signal_handlers()

    def _setup_windows_ctrl_handler(self):
        """Register ``_windows_ctrl_handler`` as a console control handler."""
        try:
            import win32api
            win32api.SetConsoleCtrlHandler(self._windows_ctrl_handler, True)
        except ImportError:
            logger.warning("pywin32 not installed, Windows signal handling disabled")
        except Exception as e:
            logger.error(f"Windows control handler setup failed: {str(e)}")

    def _windows_ctrl_handler(self, dwCtrlType):
        """Console control callback: on Ctrl+C / Ctrl+Break, clean up and exit.

        The process exits via ``os._exit`` (status 0 on success, 1 if cleanup
        raised). Returns ``False`` for every other event type.
        """
        if dwCtrlType in {win32con.CTRL_C_EVENT, win32con.CTRL_BREAK_EVENT}:
            try:
                # Run cleanup on a private event loop.
                # NOTE(review): presumably this callback runs outside the main
                # event loop; asyncio Process objects created on the main loop
                # may not be awaitable from here -- verify before enabling.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                loop.run_until_complete(self.cleanup())
                loop.close()
                os._exit(0)  # hard exit; never returns
            except Exception as e:
                logger.error(f"Critical error during cleanup: {str(e)}")
                os._exit(1)
        return False

    def _setup_unix_signal_handlers(self):
        """Route SIGTERM/SIGINT to ``_unix_signal_handler`` on the running loop.

        Raises:
            RuntimeError: if called while no event loop is running.
        """
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGTERM, signal.SIGINT):
            try:
                loop.add_signal_handler(sig, self._unix_signal_handler, sig)
            except NotImplementedError:
                logger.warning(f"Signal {sig} not supported on this platform")

    def _unix_signal_handler(self, signum):
        """Schedule ``cleanup()`` in response to *signum* (a ``signal.Signals``)."""
        logger.info(f"Received signal {signum.name}")
        task = asyncio.create_task(self.cleanup())
        # Keep a reference until the task finishes so it is not collected.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def start_process(
        self,
        name: str,
        command: list,
        cwd: Optional[str] = None,
        log_dir: Path = LOG_DIR / "process_mgr",
        env: Optional[Dict[str, str]] = None,
    ) -> Optional[int]:
        """Launch *command* in the background and register it under *name*.

        Args:
            name: Unique key for the process; also the log file's stem.
            command: Program followed by its arguments, passed to
                ``asyncio.create_subprocess_exec``.
            cwd: Working directory for the child. ``None`` (the default) means
                the current working directory at call time.
            log_dir: Directory that receives ``<name>.log`` (append mode);
                created if missing. A ``str`` is accepted as well.
            env: Environment for the child; ``None`` inherits the parent's.

        Returns:
            The child's PID, or ``None`` if *name* is already registered or the
            launch failed (the error is logged). Entries stay registered after
            the child exits on its own, until ``stop_process``/``cleanup``.

        Raises:
            OSError: if *log_dir* cannot be created.
        """
        if cwd is None:
            # Resolve per call; a def-time os.getcwd() default would be stale.
            cwd = os.getcwd()
        log_dir = Path(log_dir)
        async with self.lock:
            if name in self.processes:
                logger.warning(f"Process {name} already exists")
                return None
            log_dir.mkdir(parents=True, exist_ok=True)
            log_file = log_dir / f"{name}.log"
            log_fd = None
            try:
                log_fd = open(log_file, "ab")
                process = await asyncio.create_subprocess_exec(
                    *command,
                    stdout=log_fd,
                    cwd=cwd,
                    stderr=subprocess.STDOUT,
                    stdin=subprocess.DEVNULL,
                    # New session => the child leads its own process group,
                    # which stop_process signals as a whole on Unix.
                    start_new_session=True,
                    env=env,
                )
                if platform.system() == 'Windows' and self.job_object:
                    self._bind_to_windows_job(process.pid)
                self.processes[name] = {
                    "process": process,
                    "log_file": log_file,
                    "start_time": time.time(),
                    "log_fd": log_fd,
                    "pid": process.pid,
                }
                # map(str, ...) so path-like arguments cannot make this log
                # line fail after the process has already been started.
                logger.info(
                    f"Started process {name} (PID: {process.pid}) , "
                    f"{' '.join(map(str, command))}"
                )
                return process.pid
            except Exception as e:
                logger.error(f"Failed to start {name}: {str(e)}")
                if log_fd is not None:
                    log_fd.close()
                return None

    def _bind_to_windows_job(self, pid: int):
        """Assign process *pid* to ``self.job_object``; failures are logged."""
        h_process = None
        try:
            h_process = win32api.OpenProcess(
                win32con.PROCESS_ALL_ACCESS, False, pid
            )
            win32job.AssignProcessToJobObject(self.job_object, h_process)
        except Exception as e:
            logger.error(f"Windows job assignment failed: {str(e)}")
        finally:
            # Close the process handle even when the assignment fails.
            if h_process is not None:
                win32api.CloseHandle(h_process)

    async def stop_process(self, name: str, timeout: Optional[float] = None) -> bool:
        """Stop the process registered as *name* and unregister it.

        Args:
            name: Key passed to ``start_process``.
            timeout: Seconds to wait for a graceful exit before force-killing.
                ``None`` (the default) waits indefinitely.

        Returns:
            ``False`` if *name* is unknown or stopping hit an unexpected error;
            ``True`` otherwise. In every case where *name* was registered, the
            entry is removed and its log file handle closed.
        """
        async with self.lock:
            return await self._stop_locked(name, timeout)

    async def _stop_locked(self, name: str, timeout: Optional[float] = None) -> bool:
        """Body of ``stop_process``; the caller must already hold ``self.lock``."""
        # Unregister exactly once, up front, whatever happens below.
        proc_info = self.processes.pop(name, None)
        if proc_info is None:
            return False
        process = proc_info["process"]
        log_fd = proc_info["log_fd"]
        try:
            if process.returncode is not None:
                # Already exited on its own; only resources need releasing.
                return True
            if platform.system() == 'Windows':
                try:
                    # /T targets the process tree; run the blocking call in a
                    # worker thread so the event loop keeps running.
                    await asyncio.to_thread(
                        subprocess.run,
                        ["taskkill", "/F", "/T", "/PID", str(process.pid)],
                        check=True,
                        stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL,
                    )
                except subprocess.CalledProcessError:
                    logger.warning(f"Force terminating {name} (PID: {process.pid})")
                    try:
                        process.terminate()
                    except ProcessLookupError:
                        pass  # exited between the returncode check and now
            else:
                try:
                    os.killpg(os.getpgid(process.pid), signal.SIGTERM)
                except ProcessLookupError:
                    pass  # exited between the returncode check and now
            await self._wait_for_exit(name, process, timeout)
            logger.info(f"Stopped process {name} (PID: {process.pid})")
            return True
        except Exception as e:
            # NOTE(review): the child may still be running here, but it is no
            # longer tracked.
            logger.error(f"Error stopping {name}: {str(e)}")
            return False
        finally:
            log_fd.close()

    async def _wait_for_exit(self, name: str, process, timeout: Optional[float]) -> None:
        """Wait for *process* to exit, force-killing it after *timeout* seconds."""
        if timeout is None:
            await process.wait()
            return
        try:
            await asyncio.wait_for(process.wait(), timeout)
        except asyncio.TimeoutError:
            logger.warning(
                f"Process {name} (PID: {process.pid}) did not exit within "
                f"{timeout}s, killing"
            )
            try:
                if platform.system() == 'Windows':
                    process.kill()
                else:
                    # start_new_session=True made the child its own process
                    # group leader, so its PID is also its process group ID.
                    os.killpg(process.pid, signal.SIGKILL)
            except ProcessLookupError:
                pass
            await process.wait()

    async def cleanup(self, timeout: Optional[float] = None):
        """Stop every managed process, then close the Windows job object.

        Args:
            timeout: Forwarded per process; see ``stop_process``.
        """
        async with self.lock:
            logger.info("Performing async cleanup...")
            for name in list(self.processes):
                # Already holding the (non-re-entrant) lock: use the locked
                # helper; calling stop_process here would deadlock.
                await self._stop_locked(name, timeout)
            if platform.system() == 'Windows' and self.job_object:
                win32api.CloseHandle(self.job_object)
                self.job_object = None

    def _sync_cleanup(self):
        """Synchronous wrapper around ``cleanup()``, intended for ``atexit``.

        Its ``atexit`` registration in ``__init__`` is currently commented out.
        NOTE(review): ``asyncio.get_event_loop()`` is deprecated when no loop
        is current, and the RuntimeError branch silently skips cleanup.
        """
        if platform.system() == 'Windows':
            asyncio.run(self.cleanup())
        else:
            try:
                loop = asyncio.get_event_loop()
                if loop.is_running():
                    loop.create_task(self.cleanup())
                else:
                    loop.run_until_complete(self.cleanup())
            except RuntimeError:
                pass


# Module-level instance created at import time.
process_manager = ProcessManager()