manager.py 2.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. import asyncio
  2. import time
  3. from typing import Optional
  4. from fastapi import WebSocket
  5. from opendevin.core.logger import opendevin_logger as logger
  6. from .session import Session
  7. class SessionManager:
  8. _sessions: dict[str, Session] = {}
  9. cleanup_interval: int = 300
  10. session_timeout: int = 600
  11. def __init__(self):
  12. asyncio.create_task(self._cleanup_sessions())
  13. def add_or_restart_session(self, sid: str, ws_conn: WebSocket) -> Session:
  14. if sid in self._sessions:
  15. asyncio.create_task(self._sessions[sid].close())
  16. self._sessions[sid] = Session(sid=sid, ws=ws_conn)
  17. return self._sessions[sid]
  18. def get_session(self, sid: str) -> Session | None:
  19. if sid not in self._sessions:
  20. return None
  21. return self._sessions.get(sid)
  22. async def send(self, sid: str, data: dict[str, object]) -> bool:
  23. """Sends data to the client."""
  24. if sid not in self._sessions:
  25. return False
  26. return await self._sessions[sid].send(data)
  27. async def send_error(self, sid: str, message: str) -> bool:
  28. """Sends an error message to the client."""
  29. return await self.send(sid, {'error': True, 'message': message})
  30. async def send_message(self, sid: str, message: str) -> bool:
  31. """Sends a message to the client."""
  32. return await self.send(sid, {'message': message})
  33. async def _cleanup_sessions(self):
  34. while True:
  35. current_time = time.time()
  36. session_ids_to_remove = []
  37. for sid, session in list(self._sessions.items()):
  38. # if session inactive for a long time, remove it
  39. if (
  40. not session.is_alive
  41. and current_time - session.last_active_ts > self.session_timeout
  42. ):
  43. session_ids_to_remove.append(sid)
  44. for sid in session_ids_to_remove:
  45. to_del_session: Optional[Session] = self._sessions.pop(sid, None)
  46. if to_del_session is not None:
  47. await to_del_session.close()
  48. logger.info(
  49. f'Session {sid} and related resource have been removed due to inactivity.'
  50. )
  51. await asyncio.sleep(self.cleanup_interval)