Browse Source

Feat: add stream output to exec_run (#1625)

* Feat: add stream output to exec_run

* Using command timeout to control the exec_box's timeout.
* add bash -c to source command to compatible for sh.

Signed-off-by: ifuryst <ifuryst@gmail.com>

* Feat: add stream output to SSHBox execute

Signed-off-by: ifuryst <ifuryst@gmail.com>

* fix the test case fail.

Signed-off-by: ifuryst <ifuryst@gmail.com>

* fix the test case import wrong path for method.

Signed-off-by: ifuryst <ifuryst@gmail.com>

---------

Signed-off-by: ifuryst <ifuryst@gmail.com>
Leo 1 year ago
parent
commit
e89cc8f19b

+ 9 - 3
evaluation/regression/run_tests.py

@@ -13,9 +13,15 @@ if __name__ == '__main__':
         python script_name.py [--OPENAI_API_KEY=<api_key>] [--model=<model_name>]
 
     """
-    parser = argparse.ArgumentParser(description='This script runs pytest with specific arguments and configuration.')
-    parser.add_argument('--OPENAI_API_KEY', type=str, required=True, help='Your OpenAI API key')
-    parser.add_argument('--model', type=str, required=True, help='The model name to use')
+    parser = argparse.ArgumentParser(
+        description='This script runs pytest with specific arguments and configuration.'
+    )
+    parser.add_argument(
+        '--OPENAI_API_KEY', type=str, required=True, help='Your OpenAI API key'
+    )
+    parser.add_argument(
+        '--model', type=str, required=True, help='The model name to use'
+    )
 
     parser_args = parser.parse_args()
     config.config['OPENAI_API_KEY'] = parser_args.OPENAI_API_KEY

+ 3 - 0
opendevin/core/schema/__init__.py

@@ -2,10 +2,13 @@ from .action import ActionType
 from .agent import AgentState
 from .config import ConfigType
 from .observation import ObservationType
+from .stream import CancellableStream, StreamMixin
 
 __all__ = [
     'ActionType',
     'ObservationType',
     'ConfigType',
     'AgentState',
+    'CancellableStream',
+    'StreamMixin',
 ]

+ 27 - 0
opendevin/core/schema/stream.py

@@ -0,0 +1,27 @@
+from abc import ABC, abstractmethod
+from typing import Union
+
+
+class StreamMixin:
+    def __init__(self, generator):
+        self.generator = generator
+        self.closed = False
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        if self.closed:
+            raise StopIteration
+        else:
+            return next(self.generator)
+
+
+class CancellableStream(StreamMixin, ABC):
+    @abstractmethod
+    def close(self):
+        pass
+
+    @abstractmethod
+    def exit_code(self) -> Union[int, None]:
+        pass

+ 90 - 30
opendevin/runtime/docker/exec_box.py

@@ -1,6 +1,6 @@
 import atexit
-import concurrent.futures
 import os
+import shlex
 import sys
 import tarfile
 import time
@@ -14,6 +14,7 @@ from opendevin.const.guide_url import TROUBLESHOOTING_URL
 from opendevin.core.config import config
 from opendevin.core.exceptions import SandboxInvalidBackgroundCommandError
 from opendevin.core.logger import opendevin_logger as logger
+from opendevin.core.schema import CancellableStream
 from opendevin.runtime.docker.process import DockerProcess, Process
 from opendevin.runtime.sandbox import Sandbox
 
@@ -22,6 +23,76 @@ InputType = namedtuple('InputType', ['content'])
 OutputType = namedtuple('OutputType', ['content'])
 
 
+ExecResult = namedtuple('ExecResult', 'exit_code,output')
+""" A result of Container.exec_run with the properties ``exit_code`` and
+    ``output``. """
+
+
+class DockerExecCancellableStream(CancellableStream):
+    # Reference: https://github.com/docker/docker-py/issues/1989
+    def __init__(self, _client, _id, _output):
+        super().__init__(self.read_output())
+        self._id = _id
+        self._client = _client
+        self._output = _output
+
+    def close(self):
+        self.closed = True
+
+    def exit_code(self):
+        return self.inspect()['ExitCode']
+
+    def inspect(self):
+        return self._client.api.exec_inspect(self._id)
+
+    def read_output(self):
+        for chunk in self._output:
+            yield chunk.decode('utf-8')
+
+
+def container_exec_run(
+    container,
+    cmd,
+    stdout=True,
+    stderr=True,
+    stdin=False,
+    tty=False,
+    privileged=False,
+    user='',
+    detach=False,
+    stream=False,
+    socket=False,
+    environment=None,
+    workdir=None,
+) -> ExecResult:
+    exec_id = container.client.api.exec_create(
+        container.id,
+        cmd,
+        stdout=stdout,
+        stderr=stderr,
+        stdin=stdin,
+        tty=tty,
+        privileged=privileged,
+        user=user,
+        environment=environment,
+        workdir=workdir,
+    )['Id']
+
+    output = container.client.api.exec_start(
+        exec_id, detach=detach, tty=tty, stream=stream, socket=socket
+    )
+
+    if stream:
+        return ExecResult(
+            None, DockerExecCancellableStream(container.client, exec_id, output)
+        )
+
+    if socket:
+        return ExecResult(None, output)
+
+    return ExecResult(container.client.api.exec_inspect(exec_id)['ExitCode'], output)
+
+
 class DockerExecBox(Sandbox):
     instance_id: str
     container_image: str
@@ -106,38 +177,27 @@ class DockerExecBox(Sandbox):
         bg_cmd = self.background_commands[id]
         return bg_cmd.read_logs()
 
-    def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]:
+    def execute(
+        self, cmd: str, stream: bool = False, timeout: int | None = None
+    ) -> tuple[int, str | CancellableStream]:
         timeout = timeout if timeout is not None else self.timeout
+        wrapper = f'timeout {self.timeout}s bash -c {shlex.quote(cmd)}'
+        _exit_code, _output = container_exec_run(
+            self.container,
+            wrapper,
+            stream=stream,
+            workdir=self.sandbox_workspace_dir,
+            environment=self._env,
+        )
 
-        # TODO: each execute is not stateful! We need to keep track of the current working directory
-        def run_command(container, command):
-            return container.exec_run(
-                command, workdir=self.sandbox_workspace_dir, environment=self._env
-            )
+        if stream:
+            return _exit_code, _output
 
-        # Use ThreadPoolExecutor to control command and set timeout
-        with concurrent.futures.ThreadPoolExecutor() as executor:
-            future = executor.submit(
-                run_command, self.container, self.get_exec_cmd(cmd)
-            )
-            try:
-                exit_code, logs = future.result(timeout=timeout)
-            except concurrent.futures.TimeoutError:
-                logger.exception(
-                    'Command timed out, killing process...', exc_info=False
-                )
-                pid = self.get_pid(cmd)
-                if pid is not None:
-                    self.container.exec_run(
-                        f'kill -9 {pid}',
-                        workdir=self.sandbox_workspace_dir,
-                        environment=self._env,
-                    )
-                return -1, f'Command: "{cmd}" timed out'
-        logs_out = logs.decode('utf-8')
-        if logs_out.endswith('\n'):
-            logs_out = logs_out[:-1]
-        return exit_code, logs_out
+        print(_output)
+        _output = _output.decode('utf-8')
+        if _output.endswith('\n'):
+            _output = _output[:-1]
+        return _exit_code, _output
 
     def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False):
         # mkdir -p sandbox_dest if it doesn't exist

+ 4 - 1
opendevin/runtime/docker/local_box.py

@@ -5,6 +5,7 @@ import sys
 
 from opendevin.core.config import config
 from opendevin.core.logger import opendevin_logger as logger
+from opendevin.core.schema import CancellableStream
 from opendevin.runtime.docker.process import DockerProcess, Process
 from opendevin.runtime.sandbox import Sandbox
 
@@ -33,7 +34,9 @@ class LocalBox(Sandbox):
         atexit.register(self.cleanup)
         super().__init__()
 
-    def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]:
+    def execute(
+        self, cmd: str, stream: bool = False, timeout: int | None = None
+    ) -> tuple[int, str | CancellableStream]:
         timeout = timeout if timeout is not None else self.timeout
         try:
             completed_process = subprocess.run(

+ 77 - 5
opendevin/runtime/docker/ssh_box.py

@@ -1,6 +1,7 @@
 import atexit
 import json
 import os
+import re
 import sys
 import tarfile
 import tempfile
@@ -10,12 +11,13 @@ from collections import namedtuple
 from glob import glob
 
 import docker
-from pexpect import pxssh
+from pexpect import exceptions, pxssh
 
 from opendevin.const.guide_url import TROUBLESHOOTING_URL
 from opendevin.core.config import config
 from opendevin.core.exceptions import SandboxInvalidBackgroundCommandError
 from opendevin.core.logger import opendevin_logger as logger
+from opendevin.core.schema import CancellableStream
 from opendevin.runtime.docker.process import DockerProcess, Process
 from opendevin.runtime.plugins import (
     JupyterRequirement,
@@ -29,6 +31,71 @@ InputType = namedtuple('InputType', ['content'])
 OutputType = namedtuple('OutputType', ['content'])
 
 
+class SSHExecCancellableStream(CancellableStream):
+    def __init__(self, ssh, cmd, timeout):
+        super().__init__(self.read_output())
+        self.ssh = ssh
+        self.cmd = cmd
+        self.timeout = timeout
+
+    def close(self):
+        self.closed = True
+
+    def exit_code(self):
+        self.ssh.sendline('echo $?')
+        success = self.ssh.prompt(timeout=self.timeout)
+        if not success:
+            return -1
+
+        _exit_code = self.ssh.before.strip()
+        return int(_exit_code)
+
+    def read_output(self):
+        st = time.time()
+        buf = ''
+        crlf = '\r\n'
+        lf = '\n'
+        prompt_len = len(self.ssh.PROMPT)
+        while True:
+            try:
+                if self.closed:
+                    break
+                _output = self.ssh.read_nonblocking(timeout=1)
+                if not _output:
+                    continue
+
+                buf += _output
+
+                if len(buf) < prompt_len:
+                    continue
+
+                match = re.search(self.ssh.PROMPT, buf)
+                if match:
+                    idx, _ = match.span()
+                    yield buf[:idx].replace(crlf, lf)
+                    buf = ''
+                    break
+
+                res = buf[:-prompt_len]
+                if len(res) == 0 or res.find(crlf) == -1:
+                    continue
+                buf = buf[-prompt_len:]
+                yield res.replace(crlf, lf)
+            except exceptions.TIMEOUT:
+                if time.time() - st < self.timeout:
+                    match = re.search(self.ssh.PROMPT, buf)
+                    if match:
+                        idx, _ = match.span()
+                        yield buf[:idx].replace(crlf, lf)
+                        break
+                    continue
+                else:
+                    yield buf.replace(crlf, lf)
+                break
+            except exceptions.EOF:
+                break
+
+
 def split_bash_commands(commands):
     # States
     NORMAL = 0
@@ -128,6 +195,7 @@ class DockerSSHBox(Sandbox):
 
     _ssh_password: str
     _ssh_port: int
+    ssh: pxssh.pxssh
 
     cur_background_id = 0
     background_commands: dict[int, Process] = {}
@@ -344,9 +412,10 @@ class DockerSSHBox(Sandbox):
             f'Command: "{cmd}" timed out. Sending SIGINT to the process: {command_output}',
         )
 
-    def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]:
+    def execute(
+        self, cmd: str, stream: bool = False, timeout: int | None = None
+    ) -> tuple[int, str | CancellableStream]:
         timeout = timeout if timeout is not None else self.timeout
-
         commands = split_bash_commands(cmd)
         if len(commands) > 1:
             all_output = ''
@@ -354,11 +423,14 @@ class DockerSSHBox(Sandbox):
                 exit_code, output = self.execute(command)
                 if all_output:
                     all_output += '\r\n'
-                all_output += output
+                all_output += str(output)
                 if exit_code != 0:
                     return exit_code, all_output
             return 0, all_output
+
         self.ssh.sendline(cmd)
+        if stream:
+            return 0, SSHExecCancellableStream(self.ssh, cmd, self.timeout)
         success = self.ssh.prompt(timeout=timeout)
         if not success:
             logger.exception('Command timed out, killing process...', exc_info=False)
@@ -499,7 +571,7 @@ class DockerSSHBox(Sandbox):
         exit_code, result = self.execute('pwd')
         if exit_code != 0:
             raise Exception('Failed to get working directory')
-        return result.strip()
+        return str(result).strip()
 
     @property
     def user_id(self):

+ 4 - 1
opendevin/runtime/e2b/sandbox.py

@@ -9,6 +9,7 @@ from e2b.sandbox.exception import (
 
 from opendevin.core.config import config
 from opendevin.core.logger import opendevin_logger as logger
+from opendevin.core.schema import CancellableStream
 from opendevin.runtime.e2b.process import E2BProcess
 from opendevin.runtime.process import Process
 from opendevin.runtime.sandbox import Sandbox
@@ -72,7 +73,9 @@ class E2BBox(Sandbox):
         assert isinstance(proc, E2BProcess)
         return '\n'.join([m.line for m in proc.output_messages])
 
-    def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]:
+    def execute(
+        self, cmd: str, stream: bool = False, timeout: int | None = None
+    ) -> tuple[int, str | CancellableStream]:
         timeout = timeout if timeout is not None else self.timeout
         process = self.sandbox.process.start(cmd, env_vars=self._env)
         try:

+ 24 - 6
opendevin/runtime/plugins/mixin.py

@@ -2,13 +2,16 @@ import os
 from typing import Protocol
 
 from opendevin.core.logger import opendevin_logger as logger
+from opendevin.core.schema import CancellableStream
 from opendevin.runtime.plugins.requirement import PluginRequirement
 
 
 class SandboxProtocol(Protocol):
     # https://stackoverflow.com/questions/51930339/how-do-i-correctly-add-type-hints-to-mixin-classes
 
-    def execute(self, cmd: str) -> tuple[int, str]: ...
+    def execute(
+        self, cmd: str, stream: bool = False
+    ) -> tuple[int, str | CancellableStream]: ...
 
     def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False): ...
 
@@ -36,12 +39,27 @@ class PluginMixin:
             logger.info(
                 f'Initializing plugin [{requirement.name}] by executing [{abs_path_to_bash_script}] in the sandbox.'
             )
-            exit_code, output = self.execute(abs_path_to_bash_script)
-            if exit_code != 0:
-                raise RuntimeError(
-                    f'Failed to initialize plugin {requirement.name} with exit code {exit_code} and output: {output}'
+            exit_code, output = self.execute(abs_path_to_bash_script, stream=True)
+            if isinstance(output, CancellableStream):
+                for line in output:
+                    if line.endswith('\n'):
+                        line = line[:-1]
+                    logger.info(line)
+                _exit_code = output.exit_code()
+                output.close()
+                if _exit_code != 0:
+                    raise RuntimeError(
+                        f'Failed to initialize plugin {requirement.name} with exit code {_exit_code} and output {output}'
+                    )
+                logger.info(f'Plugin {requirement.name} initialized successfully')
+            else:
+                if exit_code != 0:
+                    raise RuntimeError(
+                        f'Failed to initialize plugin {requirement.name} with exit code {exit_code} and output: {output}'
+                    )
+                logger.info(
+                    f'Plugin {requirement.name} initialized successfully\n:{output}'
                 )
-            logger.info(f'Plugin {requirement.name} initialized successfully.')
 
         if len(requirements) > 0:
             exit_code, output = self.execute('source ~/.bashrc')

+ 4 - 1
opendevin/runtime/sandbox.py

@@ -1,6 +1,7 @@
 import os
 from abc import ABC, abstractmethod
 
+from opendevin.core.schema import CancellableStream
 from opendevin.runtime.docker.process import Process
 from opendevin.runtime.plugins.mixin import PluginMixin
 
@@ -19,7 +20,9 @@ class Sandbox(ABC, PluginMixin):
         self._env[key] = value
 
     @abstractmethod
-    def execute(self, cmd: str, timeout: int | None = None) -> tuple[int, str]:
+    def execute(
+        self, cmd: str, stream: bool = False, timeout: int | None = None
+    ) -> tuple[int, str | CancellableStream]:
         pass
 
     @abstractmethod

+ 1 - 1
opendevin/runtime/server/runtime.py

@@ -75,7 +75,7 @@ class ServerRuntime(Runtime):
         try:
             exit_code, output = self.sandbox.execute(command)
             return CmdOutputObservation(
-                command_id=-1, content=output, command=command, exit_code=exit_code
+                command_id=-1, content=str(output), command=command, exit_code=exit_code
             )
         except UnicodeDecodeError:
             return ErrorObservation('Command output could not be decoded as utf-8')

+ 4 - 1
opendevin/server/session/manager.py

@@ -91,7 +91,10 @@ class SessionManager:
             session_ids_to_remove = []
             for sid, session in list(self._sessions.items()):
                 # if session inactive for a long time, remove it
-                if not session.is_alive and current_time - session.last_active_ts > self.session_timeout:
+                if (
+                    not session.is_alive
+                    and current_time - session.last_active_ts > self.session_timeout
+                ):
                     session_ids_to_remove.append(sid)
 
             for sid in session_ids_to_remove:

+ 1 - 1
tests/test_fileops.py

@@ -3,7 +3,7 @@ from pathlib import Path
 import pytest
 
 from opendevin.core.config import config
-from opendevin.events.action import files
+from opendevin.runtime.server import files
 
 SANDBOX_PATH_PREFIX = '/workspace'
 

+ 1 - 1
tests/unit/test_sandbox.py

@@ -145,7 +145,7 @@ def test_ssh_box_multi_line_cmd_run_as_devin(temp_dir):
         config, 'sandbox_type', new='ssh'
     ):
         for box in [DockerSSHBox(), DockerExecBox()]:
-            exit_code, output = box.execute('pwd\nls -l')
+            exit_code, output = box.execute('pwd && ls -l')
             assert exit_code == 0, (
                 'The exit code should be 0 for ' + box.__class__.__name__
             )