Browse Source

fix file descriptor leaks (#4988)

Co-authored-by: openhands <openhands@all-hands.dev>
Robert Brennan 1 year ago
parent
commit
c784151765

+ 4 - 0
openhands/controller/agent_controller.py

@@ -65,6 +65,7 @@ class AgentController:
     parent: 'AgentController | None' = None
     delegate: 'AgentController | None' = None
     _pending_action: Action | None = None
+    _closed: bool = False
     filter_out: ClassVar[tuple[type[Event], ...]] = (
         NullAction,
         NullObservation,
@@ -160,6 +161,7 @@ class AgentController:
 
         # unsubscribe from the event stream
         self.event_stream.unsubscribe(EventStreamSubscriber.AGENT_CONTROLLER, self.id)
+        self._closed = True
 
     def log(self, level: str, message: str, extra: dict | None = None):
         """Logs a message to the agent controller's logger.
@@ -194,6 +196,8 @@ class AgentController:
 
         self.log('info', 'Starting step loop...')
         while should_continue():
+            if self._closed:
+                break
             try:
                 await self._step()
             except asyncio.CancelledError:

+ 7 - 5
openhands/runtime/impl/eventstream/eventstream_runtime.py

@@ -111,6 +111,9 @@ class LogBuffer:
     def close(self, timeout: float = 5.0):
         self._stop_event.set()
         self.log_stream_thread.join(timeout)
+        # Close the log generator to release the file descriptor
+        if hasattr(self.log_generator, 'close'):
+            self.log_generator.close()
 
 
 class EventStreamRuntime(Runtime):
@@ -232,6 +235,8 @@ class EventStreamRuntime(Runtime):
                 f'Container started: {self.container_name}. VSCode URL: {self.vscode_url}',
             )
 
+        self.log_buffer = LogBuffer(self.container, self.log)
+
         if not self.attach_to_existing:
             self.log('info', f'Waiting for client to become ready at {self.api_url}...')
             self.send_status_message('STATUS$WAITING_FOR_CLIENT')
@@ -358,7 +363,6 @@ class EventStreamRuntime(Runtime):
                 environment=environment,
                 volumes=volumes,
             )
-            self.log_buffer = LogBuffer(self.container, self.log)
             self.log('debug', f'Container started. Server url: {self.api_url}')
             self.send_status_message('STATUS$CONTAINER_STARTED')
         except docker.errors.APIError as e:
@@ -385,11 +389,9 @@ class EventStreamRuntime(Runtime):
             raise e
 
     def _attach_to_container(self):
-        container = self.docker_client.containers.get(self.container_name)
-        self.log_buffer = LogBuffer(container, self.log)
-        self.container = container
         self._container_port = 0
-        for port in container.attrs['NetworkSettings']['Ports']:
+        self.container = self.docker_client.containers.get(self.container_name)
+        for port in self.container.attrs['NetworkSettings']['Ports']:  # type: ignore
             self._container_port = int(port.split('/')[0])
             break
         self._host_port = self._container_port

+ 5 - 1
openhands/runtime/utils/request.py

@@ -58,5 +58,9 @@ def send_request(
     **kwargs: Any,
 ) -> requests.Response:
     response = session.request(method, url, **kwargs)
-    response.raise_for_status()
+    try:
+        response.raise_for_status()
+    finally:
+        response.close()
+
     return response

+ 6 - 4
openhands/server/github.py

@@ -115,13 +115,15 @@ async def get_github_user(token: str) -> str:
         github handle of the user
     """
     logger.debug('Fetching GitHub user info from token')
+    g = Github(token)
     try:
-        g = Github(token)
         user = await call_sync_from_async(g.get_user)
-        login = user.login
-        logger.info(f'Successfully retrieved GitHub user: {login}')
-        return login
     except GithubException as e:
         logger.error(f'Error making request to GitHub API: {str(e)}')
         logger.error(e)
         raise
+    finally:
+        g.close()
+    login = user.login
+    logger.info(f'Successfully retrieved GitHub user: {login}')
+    return login

+ 6 - 2
openhands/server/session/session.py

@@ -51,7 +51,12 @@ class Session:
 
     def close(self):
         self.is_alive = False
-        self.agent_session.close()
+        try:
+            if self.websocket is not None:
+                asyncio.run_coroutine_threadsafe(self.websocket.close(), self.loop)
+                self.websocket = None
+        finally:
+            self.agent_session.close()
 
     async def loop_recv(self):
         try:
@@ -107,7 +112,6 @@ class Session:
         agent_config = self.config.get_agent_config(agent_cls)
         agent = Agent.get_cls(agent_cls)(llm, agent_config)
 
-        # Create the agent session
         try:
             await self.agent_session.start(
                 runtime_name=self.config.runtime,