Selaa lähdekoodia

feat: support directly stream logs from container to stdout in debug mode (#5408)

Xingyao Wang 1 vuosi sitten
vanhempi
sitoutus
e0b231092a

+ 9 - 96
openhands/runtime/impl/eventstream/eventstream_runtime.py

@@ -43,6 +43,7 @@ from openhands.runtime.builder import DockerRuntimeBuilder
 from openhands.runtime.impl.eventstream.containers import remove_all_containers
 from openhands.runtime.plugins import PluginRequirement
 from openhands.runtime.utils import find_available_tcp_port
+from openhands.runtime.utils.log_streamer import LogStreamer
 from openhands.runtime.utils.request import send_request
 from openhands.runtime.utils.runtime_build import build_runtime_image
 from openhands.utils.async_utils import call_sync_from_async
@@ -58,68 +59,6 @@ def remove_all_runtime_containers():
 atexit.register(remove_all_runtime_containers)
 
 
-class LogBuffer:
-    """Synchronous buffer for Docker container logs.
-
-    This class provides a thread-safe way to collect, store, and retrieve logs
-    from a Docker container. It uses a list to store log lines and provides methods
-    for appending, retrieving, and clearing logs.
-    """
-
-    def __init__(self, container: docker.models.containers.Container, logFn: Callable):
-        self.init_msg = 'Runtime client initialized.'
-
-        self.buffer: list[str] = []
-        self.lock = threading.Lock()
-        self._stop_event = threading.Event()
-        self.log_generator = container.logs(stream=True, follow=True)
-        self.log_stream_thread = threading.Thread(target=self.stream_logs)
-        self.log_stream_thread.daemon = True
-        self.log_stream_thread.start()
-        self.log = logFn
-
-    def append(self, log_line: str):
-        with self.lock:
-            self.buffer.append(log_line)
-
-    def get_and_clear(self) -> list[str]:
-        with self.lock:
-            logs = list(self.buffer)
-            self.buffer.clear()
-            return logs
-
-    def stream_logs(self):
-        """Stream logs from the Docker container in a separate thread.
-
-        This method runs in its own thread to handle the blocking
-        operation of reading log lines from the Docker SDK's synchronous generator.
-        """
-        try:
-            for log_line in self.log_generator:
-                if self._stop_event.is_set():
-                    break
-                if log_line:
-                    decoded_line = log_line.decode('utf-8').rstrip()
-                    self.append(decoded_line)
-        except Exception as e:
-            self.log('error', f'Error streaming docker logs: {e}')
-
-    def __del__(self):
-        if self.log_stream_thread.is_alive():
-            self.log(
-                'warn',
-                "LogBuffer was not properly closed. Use 'log_buffer.close()' for clean shutdown.",
-            )
-            self.close(timeout=5)
-
-    def close(self, timeout: float = 5.0):
-        self._stop_event.set()
-        self.log_stream_thread.join(timeout)
-        # Close the log generator to release the file descriptor
-        if hasattr(self.log_generator, 'close'):
-            self.log_generator.close()
-
-
 class EventStreamRuntime(Runtime):
     """This runtime will subscribe the event stream.
     When receive an event, it will send the event to runtime-client which run inside the docker environment.
@@ -186,7 +125,7 @@ class EventStreamRuntime(Runtime):
         self.runtime_builder = DockerRuntimeBuilder(self.docker_client)
 
         # Buffer for container logs
-        self.log_buffer: LogBuffer | None = None
+        self.log_streamer: LogStreamer | None = None
 
         self.init_base_runtime(
             config,
@@ -241,7 +180,7 @@ class EventStreamRuntime(Runtime):
                 f'Container started: {self.container_name}. VSCode URL: {self.vscode_url}',
             )
 
-        self.log_buffer = LogBuffer(self.container, self.log)
+        self.log_streamer = LogStreamer(self.container, self.log)
 
         if not self.attach_to_existing:
             self.log('info', f'Waiting for client to become ready at {self.api_url}...')
@@ -407,27 +346,6 @@ class EventStreamRuntime(Runtime):
             f'attached to container: {self.container_name} {self._container_port} {self.api_url}',
         )
 
-    def _refresh_logs(self):
-        self.log('debug', 'Getting container logs...')
-
-        assert (
-            self.log_buffer is not None
-        ), 'Log buffer is expected to be initialized when container is started'
-
-        logs = self.log_buffer.get_and_clear()
-        if logs:
-            formatted_logs = '\n'.join([f'    |{log}' for log in logs])
-            self.log(
-                'debug',
-                '\n'
-                + '-' * 35
-                + 'Container logs:'
-                + '-' * 35
-                + f'\n{formatted_logs}'
-                + '\n'
-                + '-' * 80,
-            )
-
     @tenacity.retry(
         stop=tenacity.stop_after_delay(120) | stop_if_should_exit(),
         retry=tenacity.retry_if_exception_type(
@@ -446,8 +364,7 @@ class EventStreamRuntime(Runtime):
         except docker.errors.NotFound:
             raise RuntimeNotFoundError(f'Container {self.container_name} not found.')
 
-        self._refresh_logs()
-        if not self.log_buffer:
+        if not self.log_streamer:
             raise RuntimeError('Runtime client is not ready.')
 
         with send_request(
@@ -464,8 +381,8 @@ class EventStreamRuntime(Runtime):
         Parameters:
         - rm_all_containers (bool): Whether to remove all containers with the 'openhands-sandbox-' prefix
         """
-        if self.log_buffer:
-            self.log_buffer.close()
+        if self.log_streamer:
+            self.log_streamer.close()
 
         if self.session:
             self.session.close()
@@ -513,8 +430,6 @@ class EventStreamRuntime(Runtime):
                     'Action has been rejected by the user! Waiting for further user input.'
                 )
 
-            self._refresh_logs()
-
             assert action.timeout is not None
 
             try:
@@ -533,7 +448,7 @@ class EventStreamRuntime(Runtime):
                 raise RuntimeError(
                     f'Runtime failed to return execute_action before the requested timeout of {action.timeout}s'
                 )
-            self._refresh_logs()
+
             return obs
 
     def run(self, action: CmdRunAction) -> Observation:
@@ -564,7 +479,6 @@ class EventStreamRuntime(Runtime):
         if not os.path.exists(host_src):
             raise FileNotFoundError(f'Source file {host_src} does not exist')
 
-        self._refresh_logs()
         try:
             if recursive:
                 # For recursive copy, create a zip file
@@ -609,14 +523,13 @@ class EventStreamRuntime(Runtime):
             self.log(
                 'debug', f'Copy completed: host:{host_src} -> runtime:{sandbox_dest}'
             )
-            self._refresh_logs()
 
     def list_files(self, path: str | None = None) -> list[str]:
         """List files in the sandbox.
 
         If path is None, list files in the sandbox's initial working directory (e.g., /workspace).
         """
-        self._refresh_logs()
+
         try:
             data = {}
             if path is not None:
@@ -637,7 +550,7 @@ class EventStreamRuntime(Runtime):
 
     def copy_from(self, path: str) -> Path:
         """Zip all files in the sandbox and return as a stream of bytes."""
-        self._refresh_logs()
+
         try:
             params = {'path': path}
             with send_request(

+ 32 - 21
openhands/runtime/impl/modal/modal_runtime.py

@@ -12,7 +12,7 @@ from openhands.core.config import AppConfig
 from openhands.events import EventStream
 from openhands.runtime.impl.eventstream.eventstream_runtime import (
     EventStreamRuntime,
-    LogBuffer,
+    LogStreamer,
 )
 from openhands.runtime.plugins import PluginRequirement
 from openhands.runtime.utils.command import get_remote_startup_command
@@ -32,24 +32,38 @@ def bytes_shim(string_generator) -> Generator[bytes, None, None]:
         yield line.encode('utf-8')
 
 
-class ModalLogBuffer(LogBuffer):
-    """Synchronous buffer for Modal sandbox logs.
+class ModalLogStreamer(LogStreamer):
+    """Streams Modal sandbox logs to stdout.
 
-    This class provides a thread-safe way to collect, store, and retrieve logs
-    from a Modal sandbox. It uses a list to store log lines and provides methods
-    for appending, retrieving, and clearing logs.
+    This class provides a way to stream logs from a Modal sandbox directly to stdout
+    through the provided logging function.
     """
 
-    def __init__(self, sandbox: modal.Sandbox):
-        self.init_msg = 'Runtime client initialized.'
-
-        self.buffer: list[str] = []
-        self.lock = threading.Lock()
+    def __init__(
+        self,
+        sandbox: modal.Sandbox,
+        logFn: Callable,
+    ):
+        self.log = logFn
         self._stop_event = threading.Event()
         self.log_generator = bytes_shim(sandbox.stderr)
-        self.log_stream_thread = threading.Thread(target=self.stream_logs)
-        self.log_stream_thread.daemon = True
-        self.log_stream_thread.start()
+
+        # Start the stdout streaming thread
+        self.stdout_thread = threading.Thread(target=self._stream_logs)
+        self.stdout_thread.daemon = True
+        self.stdout_thread.start()
+
+    def _stream_logs(self):
+        """Stream logs from the Modal sandbox."""
+        try:
+            for log_line in self.log_generator:
+                if self._stop_event.is_set():
+                    break
+                if log_line:
+                    decoded_line = log_line.decode('utf-8').rstrip()
+                    self.log('debug', f'[inside sandbox] {decoded_line}')
+        except Exception as e:
+            self.log('error', f'Error streaming modal logs: {e}')
 
 
 class ModalRuntime(EventStreamRuntime):
@@ -109,7 +123,7 @@ class ModalRuntime(EventStreamRuntime):
         self.action_semaphore = threading.Semaphore(1)  # Ensure one action at a time
 
         # Buffer for container logs
-        self.log_buffer: LogBuffer | None = None
+        self.log_streamer: LogStreamer | None = None
 
         if self.config.sandbox.runtime_extra_deps:
             self.log(
@@ -156,7 +170,7 @@ class ModalRuntime(EventStreamRuntime):
 
             self.send_status_message('STATUS$CONTAINER_STARTED')
 
-        self.log_buffer = ModalLogBuffer(self.sandbox)
+        self.log_streamer = ModalLogStreamer(self.sandbox, self.log)
         if self.sandbox is None:
             raise Exception('Sandbox not initialized')
         tunnel = self.sandbox.tunnels()[self.container_port]
@@ -278,11 +292,8 @@ echo 'export INPUTRC=/etc/inputrc' >> /etc/bash.bashrc
 
     def close(self):
         """Closes the ModalRuntime and associated objects."""
-        # if self.temp_dir_handler:
-        # self.temp_dir_handler.__exit__(None, None, None)
-
-        if self.log_buffer:
-            self.log_buffer.close()
+        if self.log_streamer:
+            self.log_streamer.close()
 
         if self.session:
             self.session.close()

+ 29 - 53
openhands/runtime/impl/runloop/runloop_runtime.py

@@ -12,49 +12,43 @@ from runloop_api_client.types.shared_params import LaunchParameters
 from openhands.core.config import AppConfig
 from openhands.core.logger import openhands_logger as logger
 from openhands.events import EventStream
-from openhands.runtime.impl.eventstream.eventstream_runtime import (
-    EventStreamRuntime,
-    LogBuffer,
-)
+from openhands.runtime.impl.eventstream.eventstream_runtime import EventStreamRuntime
 from openhands.runtime.plugins import PluginRequirement
 from openhands.runtime.utils.command import get_remote_startup_command
+from openhands.runtime.utils.log_streamer import LogStreamer
 from openhands.runtime.utils.request import send_request
 from openhands.utils.tenacity_stop import stop_if_should_exit
 
 CONTAINER_NAME_PREFIX = 'openhands-runtime-'
 
 
-class RunloopLogBuffer(LogBuffer):
-    """Synchronous buffer for Runloop devbox logs.
+class RunloopLogStreamer(LogStreamer):
+    """Streams Runloop devbox logs to stdout.
 
-    This class provides a thread-safe way to collect, store, and retrieve logs
-    from a Docker container. It uses a list to store log lines and provides methods
-    for appending, retrieving, and clearing logs.
+    This class provides a way to stream logs from a Runloop devbox directly to stdout
+    through the provided logging function.
     """
 
-    def __init__(self, runloop_api_client: Runloop, devbox_id: str):
-        self.client_ready = False
-        self.init_msg = 'Runtime client initialized.'
-
-        self.buffer: list[str] = []
-        self.lock = threading.Lock()
-        self._stop_event = threading.Event()
+    def __init__(
+        self,
+        runloop_api_client: Runloop,
+        devbox_id: str,
+        logFn: Callable,
+    ):
         self.runloop_api_client = runloop_api_client
         self.devbox_id = devbox_id
+        self.log = logFn
         self.log_index = 0
-        self.log_stream_thread = threading.Thread(target=self.stream_logs)
-        self.log_stream_thread.daemon = True
-        self.log_stream_thread.start()
-
-    def stream_logs(self):
-        """Stream logs from the Docker container in a separate thread.
+        self._stop_event = threading.Event()
 
-        This method runs in its own thread to handle the blocking
-        operation of reading log lines from the Docker SDK's synchronous generator.
-        """
+        # Start the stdout streaming thread
+        self.stdout_thread = threading.Thread(target=self._stream_logs)
+        self.stdout_thread.daemon = True
+        self.stdout_thread.start()
 
+    def _stream_logs(self):
+        """Stream logs from the Runloop devbox."""
         try:
-            # TODO(Runloop) Replace with stream
             while True:
                 raw_logs = self.runloop_api_client.devboxes.logs.list(
                     self.devbox_id
@@ -70,29 +64,11 @@ class RunloopLogBuffer(LogBuffer):
                     break
                 if logs:
                     for log_line in logs:
-                        self.append(log_line)
-                        if self.init_msg in log_line:
-                            self.client_ready = True
+                        self.log('debug', f'[inside devbox] {log_line}')
 
                 time.sleep(1)
         except Exception as e:
-            logger.error(f'Error streaming runloop logs: {e}')
-
-    # NB: Match LogBuffer behavior on below methods
-
-    def get_and_clear(self) -> list[str]:
-        with self.lock:
-            logs = list(self.buffer)
-            self.buffer.clear()
-            return logs
-
-    def append(self, log_line: str):
-        with self.lock:
-            self.buffer.append(log_line)
-
-    def close(self, timeout: float = 5.0):
-        self._stop_event.set()
-        self.log_stream_thread.join(timeout)
+            self.log('error', f'Error streaming runloop logs: {e}')
 
 
 class RunloopRuntime(EventStreamRuntime):
@@ -132,7 +108,7 @@ class RunloopRuntime(EventStreamRuntime):
             headless_mode,
         )
         # Buffer for container logs
-        self.log_buffer: LogBuffer | None = None
+        self.log_streamer: LogStreamer | None = None
         self._vscode_url: str | None = None
 
     @tenacity.retry(
@@ -224,7 +200,9 @@ class RunloopRuntime(EventStreamRuntime):
         )
 
         # Hook up logs
-        self.log_buffer = RunloopLogBuffer(self.runloop_api_client, self.devbox.id)
+        self.log_streamer = RunloopLogStreamer(
+            self.runloop_api_client, self.devbox.id, logger.info
+        )
         self.api_url = tunnel.url
         logger.info(f'Container started. Server url: {self.api_url}')
 
@@ -248,9 +226,7 @@ class RunloopRuntime(EventStreamRuntime):
         reraise=(ConnectionRefusedError,),
     )
     def _wait_until_alive(self):
-        # NB(Runloop): Remote logs are not guaranteed realtime, removing client_ready check from logs
-        self._refresh_logs()
-        if not self.log_buffer:
+        if not self.log_streamer:
             raise RuntimeError('Runtime client is not ready.')
         response = send_request(
             self.session,
@@ -266,8 +242,8 @@ class RunloopRuntime(EventStreamRuntime):
             raise RuntimeError(msg)
 
     def close(self, rm_all_containers: bool | None = True):
-        if self.log_buffer:
-            self.log_buffer.close()
+        if self.log_streamer:
+            self.log_streamer.close()
 
         if self.session:
             self.session.close()

+ 51 - 0
openhands/runtime/utils/log_streamer.py

@@ -0,0 +1,51 @@
+import threading
+from typing import Callable
+
+import docker
+
+
+class LogStreamer:
+    """Streams Docker container logs to stdout.
+
+    This class provides a way to stream logs from a Docker container directly to stdout
+    through the provided logging function.
+    """
+
+    def __init__(
+        self,
+        container: docker.models.containers.Container,
+        logFn: Callable,
+    ):
+        self.log = logFn
+        self.log_generator = container.logs(stream=True, follow=True)
+        self._stop_event = threading.Event()
+
+        # Start the stdout streaming thread
+        self.stdout_thread = threading.Thread(target=self._stream_logs)
+        self.stdout_thread.daemon = True
+        self.stdout_thread.start()
+
+    def _stream_logs(self):
+        """Stream logs from the Docker container to stdout."""
+        try:
+            for log_line in self.log_generator:
+                if self._stop_event.is_set():
+                    break
+                if log_line:
+                    decoded_line = log_line.decode('utf-8').rstrip()
+                    self.log('debug', f'[inside container] {decoded_line}')
+        except Exception as e:
+            self.log('error', f'Error streaming docker logs to stdout: {e}')
+
+    def __del__(self):
+        if self.stdout_thread and self.stdout_thread.is_alive():
+            self.close(timeout=5)
+
+    def close(self, timeout: float = 5.0):
+        """Clean shutdown of the log streaming."""
+        self._stop_event.set()
+        if self.stdout_thread and self.stdout_thread.is_alive():
+            self.stdout_thread.join(timeout)
+        # Close the log generator to release the file descriptor
+        if hasattr(self.log_generator, 'close'):
+            self.log_generator.close()