manager.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. import asyncio
  2. import time
  3. from fastapi import WebSocket
  4. from openhands.core.config import AppConfig
  5. from openhands.core.logger import openhands_logger as logger
  6. from openhands.runtime.utils.shutdown_listener import should_continue
  7. from openhands.server.session.session import Session
  8. from openhands.storage.files import FileStore
  9. class SessionManager:
  10. _sessions: dict[str, Session] = {}
  11. cleanup_interval: int = 300
  12. session_timeout: int = 600
  13. def __init__(self, config: AppConfig, file_store: FileStore):
  14. asyncio.create_task(self._cleanup_sessions())
  15. self.config = config
  16. self.file_store = file_store
  17. def add_or_restart_session(self, sid: str, ws_conn: WebSocket) -> Session:
  18. if sid in self._sessions:
  19. asyncio.create_task(self._sessions[sid].close())
  20. self._sessions[sid] = Session(
  21. sid=sid, file_store=self.file_store, ws=ws_conn, config=self.config
  22. )
  23. return self._sessions[sid]
  24. def get_session(self, sid: str) -> Session | None:
  25. if sid not in self._sessions:
  26. return None
  27. return self._sessions.get(sid)
  28. async def send(self, sid: str, data: dict[str, object]) -> bool:
  29. """Sends data to the client."""
  30. session = self.get_session(sid)
  31. if session is None:
  32. logger.error(f'*** No session found for {sid}, skipping message ***')
  33. return False
  34. return await session.send(data)
  35. async def send_error(self, sid: str, message: str) -> bool:
  36. """Sends an error message to the client."""
  37. return await self.send(sid, {'error': True, 'message': message})
  38. async def send_message(self, sid: str, message: str) -> bool:
  39. """Sends a message to the client."""
  40. return await self.send(sid, {'message': message})
  41. async def _cleanup_sessions(self):
  42. while should_continue():
  43. current_time = time.time()
  44. session_ids_to_remove = []
  45. for sid, session in list(self._sessions.items()):
  46. # if session inactive for a long time, remove it
  47. if (
  48. not session.is_alive
  49. and current_time - session.last_active_ts > self.session_timeout
  50. ):
  51. session_ids_to_remove.append(sid)
  52. for sid in session_ids_to_remove:
  53. to_del_session: Session | None = self._sessions.pop(sid, None)
  54. if to_del_session is not None:
  55. await to_del_session.close()
  56. logger.info(
  57. f'Session {sid} and related resource have been removed due to inactivity.'
  58. )
  59. await asyncio.sleep(self.cleanup_interval)