Bläddra i källkod

fix session leak (#5656)

Robert Brennan 1 år sedan
förälder
incheckning
6ae84bf992
2 ändrade filer med 37 tillägg och 20 borttagningar
  1. openhands/server/session/agent_session.py (+36 -20)
  2. openhands/server/session/manager.py (+1 -0)

+ 36 - 20
openhands/server/session/agent_session.py

@@ -16,6 +16,9 @@ from openhands.security import SecurityAnalyzer, options
 from openhands.storage.files import FileStore
 from openhands.utils.async_utils import call_async_from_sync
 
+WAIT_TIME_BEFORE_CLOSE = 300
+WAIT_TIME_BEFORE_CLOSE_INTERVAL = 5
+
 
 class AgentSession:
     """Represents a session with an Agent
@@ -30,6 +33,7 @@ class AgentSession:
     controller: AgentController | None = None
     runtime: Runtime | None = None
     security_analyzer: SecurityAnalyzer | None = None
+    _initializing: bool = False
     _closed: bool = False
     loop: asyncio.AbstractEventLoop | None = None
 
@@ -111,6 +115,10 @@ class AgentSession:
         github_token: str | None = None,
         selected_repository: str | None = None,
     ):
+        if self._closed:
+            logger.warning('Session closed before starting')
+            return
+        self._initializing = True
         self._create_security_analyzer(config.security.security_analyzer)
         await self._create_runtime(
             runtime_name=runtime_name,
@@ -120,7 +128,7 @@ class AgentSession:
             selected_repository=selected_repository,
         )
 
-        self._create_controller(
+        self.controller = self._create_controller(
             agent,
             config.security.confirmation_mode,
             max_iterations,
@@ -131,9 +139,9 @@ class AgentSession:
         self.event_stream.add_event(
             ChangeAgentStateAction(AgentState.INIT), EventSource.ENVIRONMENT
         )
-        if self.controller:
-            self.controller.agent_task = self.controller.start_step_loop()
-            await self.controller.agent_task  # type: ignore
+        self.controller.agent_task = self.controller.start_step_loop()
+        self._initializing = False
+        await self.controller.agent_task  # type: ignore
 
     def close(self):
         """Closes the Agent session"""
@@ -143,6 +151,18 @@ class AgentSession:
         call_async_from_sync(self._close)
 
     async def _close(self):
+        seconds_waited = 0
+        while self._initializing:
+            logger.debug(
+                f'Waiting for initialization to finish before closing session {self.sid}'
+            )
+            await asyncio.sleep(WAIT_TIME_BEFORE_CLOSE_INTERVAL)
+            seconds_waited += WAIT_TIME_BEFORE_CLOSE_INTERVAL
+            if seconds_waited > WAIT_TIME_BEFORE_CLOSE:
+                logger.error(
+                    f'Waited too long for initialization to finish before closing session {self.sid}'
+                )
+                break
         if self.controller is not None:
             end_state = self.controller.get_state()
             end_state.save_to_session(self.sid, self.file_store)
@@ -209,18 +229,15 @@ class AgentSession:
                 )
             return
 
-        if self.runtime is not None:
-            self.runtime.clone_repo(github_token, selected_repository)
-            if agent.prompt_manager:
-                agent.prompt_manager.load_microagent_files(
-                    self.runtime.get_custom_microagents(selected_repository)
-                )
-
-            logger.debug(
-                f'Runtime initialized with plugins: {[plugin.name for plugin in self.runtime.plugins]}'
+        self.runtime.clone_repo(github_token, selected_repository)
+        if agent.prompt_manager:
+            agent.prompt_manager.load_microagent_files(
+                self.runtime.get_custom_microagents(selected_repository)
             )
-        else:
-            logger.warning('Runtime initialization failed')
+
+        logger.debug(
+            f'Runtime initialized with plugins: {[plugin.name for plugin in self.runtime.plugins]}'
+        )
 
     def _create_controller(
         self,
@@ -230,7 +247,7 @@ class AgentSession:
         max_budget_per_task: float | None = None,
         agent_to_llm_config: dict[str, LLMConfig] | None = None,
         agent_configs: dict[str, AgentConfig] | None = None,
-    ):
+    ) -> AgentController:
         """Creates an AgentController instance
 
         Parameters:
@@ -267,7 +284,7 @@ class AgentSession:
         )
         logger.debug(msg)
 
-        self.controller = AgentController(
+        controller = AgentController(
             sid=self.sid,
             event_stream=self.event_stream,
             agent=agent,
@@ -281,10 +298,9 @@ class AgentSession:
         )
         try:
             agent_state = State.restore_from_session(self.sid, self.file_store)
-            self.controller.set_initial_state(
-                agent_state, max_iterations, confirmation_mode
-            )
+            controller.set_initial_state(agent_state, max_iterations, confirmation_mode)
             logger.debug(f'Restored agent state from session, sid: {self.sid}')
         except Exception as e:
             logger.debug(f'State could not be restored: {e}')
         logger.debug('Agent controller initialized.')
+        return controller

+ 1 - 0
openhands/server/session/manager.py

@@ -308,6 +308,7 @@ class SessionManager:
             )
 
         await self._close_session(session)
+        return True
 
     async def _close_session(self, session: Session):
         logger.info(f'_close_session:{session.sid}')