manager.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import asyncio
  2. import time
  3. from typing import Optional
  4. from fastapi import WebSocket
  5. from openhands.core.config import AppConfig
  6. from openhands.core.logger import openhands_logger as logger
  7. from openhands.runtime.utils.shutdown_listener import should_continue
  8. from openhands.server.session.session import Session
  9. from openhands.storage.files import FileStore
  10. class SessionManager:
  11. _sessions: dict[str, Session] = {}
  12. cleanup_interval: int = 300
  13. session_timeout: int = 600
  14. _session_cleanup_task: Optional[asyncio.Task] = None
  15. def __init__(self, config: AppConfig, file_store: FileStore):
  16. self.config = config
  17. self.file_store = file_store
  18. async def __aenter__(self):
  19. if not self._session_cleanup_task:
  20. self._session_cleanup_task = asyncio.create_task(self._cleanup_sessions())
  21. return self
  22. async def __aexit__(self, exc_type, exc_value, traceback):
  23. if self._session_cleanup_task:
  24. self._session_cleanup_task.cancel()
  25. self._session_cleanup_task = None
  26. def add_or_restart_session(self, sid: str, ws_conn: WebSocket) -> Session:
  27. if sid in self._sessions:
  28. asyncio.create_task(self._sessions[sid].close())
  29. self._sessions[sid] = Session(
  30. sid=sid, file_store=self.file_store, ws=ws_conn, config=self.config
  31. )
  32. return self._sessions[sid]
  33. def get_session(self, sid: str) -> Session | None:
  34. if sid not in self._sessions:
  35. return None
  36. return self._sessions.get(sid)
  37. async def send(self, sid: str, data: dict[str, object]) -> bool:
  38. """Sends data to the client."""
  39. session = self.get_session(sid)
  40. if session is None:
  41. logger.error(f'*** No session found for {sid}, skipping message ***')
  42. return False
  43. return await session.send(data)
  44. async def send_error(self, sid: str, message: str) -> bool:
  45. """Sends an error message to the client."""
  46. return await self.send(sid, {'error': True, 'message': message})
  47. async def send_message(self, sid: str, message: str) -> bool:
  48. """Sends a message to the client."""
  49. return await self.send(sid, {'message': message})
  50. async def _cleanup_sessions(self):
  51. while should_continue():
  52. current_time = time.time()
  53. session_ids_to_remove = []
  54. for sid, session in list(self._sessions.items()):
  55. # if session inactive for a long time, remove it
  56. if (
  57. not session.is_alive
  58. and current_time - session.last_active_ts > self.session_timeout
  59. ):
  60. session_ids_to_remove.append(sid)
  61. for sid in session_ids_to_remove:
  62. to_del_session: Session | None = self._sessions.pop(sid, None)
  63. if to_del_session is not None:
  64. await to_del_session.close()
  65. logger.info(
  66. f'Session {sid} and related resource have been removed due to inactivity.'
  67. )
  68. await asyncio.sleep(self.cleanup_interval)