
[Arch] Implement EventStream Runtime Client with Jupyter Support using Agnostic Sandbox (#2879)

* support loading a particular runtime class via config.runtime (defaults to server so existing behavior is unchanged)

* move image agnostic util to shared runtime util

* move dependency

* include poetry.lock in sdist

* accept port as arg for client

* make client start server with specified port

* update image agnostic utility for eventstream runtime

* make client and runtime work with the REST API

* rename execute_server

* add plugins to initialize components inside the EventStream runtime;
clean up runtime methods to delegate everything to the container

* remove redundant ls -alh

* fix jupyter

* improve logging in agnostic sandbox

* improve logging of test function

* add read & edit

* update agnostic sandbox

* support setting work dir at start

* fix file read/write test

* fix unit test

* update testcase

* Fix unit test again

* fix unit test again again
Xingyao Wang, 1 year ago
parent
commit
e45d46c993

+ 3 - 2
opendevin/core/main.py

@@ -15,8 +15,8 @@ from opendevin.events.action import MessageAction
 from opendevin.events.event import Event
 from opendevin.events.observation import AgentStateChangedObservation
 from opendevin.llm.llm import LLM
+from opendevin.runtime import get_runtime_cls
 from opendevin.runtime.sandbox import Sandbox
-from opendevin.runtime.server.runtime import ServerRuntime
 
 
 def read_task_from_file(file_path: str) -> str:
@@ -79,7 +79,8 @@ async def run_agent_controller(
     )
 
     # runtime and tools
-    runtime = ServerRuntime(event_stream=event_stream, sandbox=sandbox)
+    runtime_cls = get_runtime_cls(config.runtime)
+    runtime = runtime_cls(event_stream=event_stream, sandbox=sandbox)
     runtime.init_sandbox_plugins(controller.agent.sandbox_plugins)
     runtime.init_runtime_tools(
         controller.agent.runtime_tools,

+ 31 - 1
opendevin/runtime/__init__.py

@@ -1,6 +1,36 @@
+from typing import TYPE_CHECKING, Type
+
 from .docker.local_box import LocalBox
 from .docker.ssh_box import DockerSSHBox
 from .e2b.sandbox import E2BBox
 from .sandbox import Sandbox
 
-__all__ = ['Sandbox', 'DockerSSHBox', 'E2BBox', 'LocalBox']
+if TYPE_CHECKING:
+    from .runtime import Runtime
+
+
+def get_runtime_cls(name: str) -> Type['Runtime']:
+    # Local imports to avoid circular imports
+    if name == 'server':
+        from .server.runtime import ServerRuntime
+
+        return ServerRuntime
+    elif name == 'client':
+        from .client.runtime import EventStreamRuntime
+
+        return EventStreamRuntime
+    elif name == 'e2b':
+        from .e2b.runtime import E2BRuntime
+
+        return E2BRuntime
+    else:
+        raise ValueError(f'Runtime {name} not supported')
+
+
+__all__ = [
+    'DockerSSHBox',
+    'E2BBox',
+    'LocalBox',
+    'Sandbox',
+    'get_runtime_cls',
+]

+ 205 - 196
opendevin/runtime/client/client.py

@@ -1,229 +1,238 @@
+import argparse
 import asyncio
 import os
-import websockets
+from pathlib import Path
+
 import pexpect
-import json
-import shutil
-import re
-from typing import Any
-from websockets.exceptions import ConnectionClosed
-from opendevin.events.serialization import event_to_dict, event_from_dict
-from opendevin.events.observation import Observation
-from opendevin.runtime.plugins import PluginRequirement
+from fastapi import FastAPI, HTTPException, Request
+from pydantic import BaseModel
+from uvicorn import run
+
+from opendevin.core.logger import opendevin_logger as logger
 from opendevin.events.action import (
     Action,
     CmdRunAction,
+    FileReadAction,
+    FileWriteAction,
     IPythonRunCellAction,
 )
 from opendevin.events.observation import (
     CmdOutputObservation,
     ErrorObservation,
+    FileReadObservation,
+    FileWriteObservation,
     Observation,
-    IPythonRunCellObservation
 )
+from opendevin.events.serialization import event_from_dict, event_to_dict
 from opendevin.runtime.plugins import (
-    AgentSkillsRequirement,
-    JupyterRequirement,
-    PluginRequirement,
+    ALL_PLUGINS,
+    JupyterPlugin,
+    Plugin,
 )
+from opendevin.runtime.server.files import insert_lines, read_lines
+
+app = FastAPI()
+
+
+class ActionRequest(BaseModel):
+    action: dict
+
+
+class RuntimeClient:
+    """RuntimeClient is running inside docker sandbox.
+    It is responsible for executing actions received from OpenDevin backend and producing observations.
+    """
+
+    def __init__(self, plugins_to_load: list[Plugin], work_dir: str) -> None:
+        self._init_bash_shell(work_dir)
+        self.lock = asyncio.Lock()
+        self.plugins: dict[str, Plugin] = {}
+
+        for plugin in plugins_to_load:
+            plugin.initialize()
+            self.plugins[plugin.name] = plugin
+            logger.info(f'Initializing plugin: {plugin.name}')
+
+    def _init_bash_shell(self, work_dir: str) -> None:
+        self.shell = pexpect.spawn('/bin/bash', encoding='utf-8', echo=False)
+        self.__bash_expect = r'\[PEXPECT\][\$\#] '
+        self.__bash_PS1 = r'\u@\h:\w [PEXPECT]\$ '
+        self.shell.sendline(f'export PS1="{self.__bash_PS1}"')
+        self.shell.expect(self.__bash_expect)
+        self.shell.sendline(f'cd {work_dir}')
+        self.shell.expect(self.__bash_expect)
+
+    def _execute_bash(self, command, keep_prompt: bool = True):
+        logger.info(f'Received command: {command}')
+        self.shell.sendline(command)
+        self.shell.expect(self.__bash_expect)
+        output = self.shell.before + '$ '
+        if not keep_prompt:
+            # remove the last line of the output (the prompt)
+            # e.g., user@host:~$
+            output = '\r\n'.join(output.split('\r\n')[:-1])
 
-class RuntimeClient():
-    # This runtime will listen to the websocket
-    # When receive an event, it will run the action and send the observation back to the websocket
-
-    def __init__(self) -> None:
-        self.init_shell()
-        # TODO: code will block at init_websocket, maybe we can open a subprocess to run websocket forever
-        # In case we need to run other code after init_websocket
-        self.init_websocket()
-
-    def init_websocket(self) -> None:
-        server = websockets.serve(self.listen, "0.0.0.0", 8080)
-        loop = asyncio.get_event_loop()
-        loop.run_until_complete(server)
-        loop.run_forever()
-    
-    def init_shell(self) -> None:
-        # run as root
-        self.shell = pexpect.spawn('/bin/bash', encoding='utf-8')
+        self.shell.sendline('echo $?')
         self.shell.expect(r'[$#] ')
+        exit_code = int(self.shell.before.split('\r\n')[0].strip())
+        return output, exit_code
 
-    async def listen(self, websocket):
-        try:
-            async for message in websocket:
-                event_str = json.loads(message)
-                event = event_from_dict(event_str)
-                if isinstance(event, Action):
-                    observation = self.run_action(event)
-                    await websocket.send(json.dumps(event_to_dict(observation)))
-        except ConnectionClosed:
-            print("Connection closed")
-    
-    def run_action(self, action) -> Observation:
-        # Should only receive Action CmdRunAction and IPythonRunCellAction
-        action_type = action.action  # type: ignore[attr-defined]
-        observation = getattr(self, action_type)(action)
-        # TODO: see comments in https://github.com/OpenDevin/OpenDevin/pull/2603#discussion_r1668994137
-        observation._parent = action.id  # type: ignore[attr-defined]
+    async def run_action(self, action) -> Observation:
+        action_type = action.action
+        observation = await getattr(self, action_type)(action)
+        observation._parent = action.id
         return observation
-    
-    def run(self, action: CmdRunAction) -> Observation:
-        return self._run_command(action.command)
-    
-    def _run_command(self, command: str) -> Observation:
+
+    async def run(self, action: CmdRunAction) -> CmdOutputObservation:
         try:
-            output, exit_code = self.execute(command)
+            output, exit_code = self._execute_bash(action.command)
             return CmdOutputObservation(
-                command_id=-1, content=str(output), command=command, exit_code=exit_code
+                command_id=-1,
+                content=str(output),
+                command=action.command,
+                exit_code=exit_code,
             )
         except UnicodeDecodeError:
-            return ErrorObservation('Command output could not be decoded as utf-8')
+            raise RuntimeError('Command output could not be decoded as utf-8')
 
-    def clean_up(self,input_text):
-        # Remove escape sequences
-        cleaned_text = re.sub(r'\x1b\[[0-9;?]*[a-zA-Z]', '', input_text)
-        # Remove carriage returns and other control characters
-        cleaned_text = re.sub(r'[\r\n\t]', '', cleaned_text)
-        return cleaned_text
+    async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
+        if 'jupyter' in self.plugins:
+            _jupyter_plugin: JupyterPlugin = self.plugins['jupyter']  # type: ignore
+            return await _jupyter_plugin.run(action)
+        else:
+            raise RuntimeError(
+                'JupyterRequirement not found. Unable to run IPython action.'
+            )
 
-    def execute(self, command):
-        print(f"Received command: {command}")
-        self.shell.sendline(command)
-        self.shell.expect(r'[$#] ')
-        output = self.shell.before.strip().split('\r\n', 1)[1].strip()
-        # Get the exit code
-        self.shell.sendline('echo $?')
-        self.shell.expect(r'[$#] ')
-        exit_code = self.clean_up(self.shell.before.strip().split('\r\n')[1].strip())
-        return output, exit_code
+    def get_working_directory(self):
+        result, exit_code = self._execute_bash('pwd', keep_prompt=False)
+        if exit_code != 0:
+            raise RuntimeError('Failed to get working directory')
+        return result.strip()
+
+    def _resolve_path(self, path: str, working_dir: str) -> str:
+        filepath = Path(path)
+        if not filepath.is_absolute():
+            return str(Path(working_dir) / filepath)
+        return str(filepath)
+
+    async def read(self, action: FileReadAction) -> Observation:
+        # NOTE: the client code is running inside the sandbox,
+        # so there's no need to check permission
+        working_dir = self.get_working_directory()
+        filepath = self._resolve_path(action.path, working_dir)
+        try:
+            with open(filepath, 'r', encoding='utf-8') as file:
+                lines = read_lines(file.readlines(), action.start, action.end)
+        except FileNotFoundError:
+            return ErrorObservation(
+                f'File not found: {filepath}. Your current working directory is {working_dir}.'
+            )
+        except UnicodeDecodeError:
+            return ErrorObservation(f'File could not be decoded as utf-8: {filepath}.')
+        except IsADirectoryError:
+            return ErrorObservation(
+                f'Path is a directory: {filepath}. You can only read files'
+            )
 
-    def run_ipython(self, action: IPythonRunCellAction) -> Observation:
-        obs = self._run_command(
-            ("cat > /tmp/opendevin_jupyter_temp.py <<'EOL'\n" f'{action.code}\n' 'EOL'),
-        )
-        # run the code
-        obs = self._run_command('cat /tmp/opendevin_jupyter_temp.py | execute_cli')
-        output = obs.content
-        if 'pip install' in action.code:
-            print(output)
-            package_names = action.code.split(' ', 2)[-1]
-            is_single_package = ' ' not in package_names
-
-            if 'Successfully installed' in output:
-                restart_kernel = 'import IPython\nIPython.Application.instance().kernel.do_shutdown(True)'
-                if (
-                    'Note: you may need to restart the kernel to use updated packages.'
-                    in output
-                ):
-                    self._run_command(
-                        (
-                            "cat > /tmp/opendevin_jupyter_temp.py <<'EOL'\n"
-                            f'{restart_kernel}\n'
-                            'EOL'
-                        )
-                    )
-                    obs = self._run_command(
-                        'cat /tmp/opendevin_jupyter_temp.py | execute_cli'
-                    )
-                    output = '[Package installed successfully]'
-                    if "{'status': 'ok', 'restart': True}" != obs.content.strip():
-                        print(obs.content)
-                        output += (
-                            '\n[But failed to restart the kernel to load the package]'
-                        )
-                    else:
-                        output += (
-                            '\n[Kernel restarted successfully to load the package]'
-                        )
+        code_view = ''.join(lines)
+        return FileReadObservation(path=filepath, content=code_view)
 
-                    # re-init the kernel after restart
-                    if action.kernel_init_code:
-                        obs = self._run_command(
-                            (
-                                f"cat > /tmp/opendevin_jupyter_init.py <<'EOL'\n"
-                                f'{action.kernel_init_code}\n'
-                                'EOL'
-                            ),
-                        )
-                        obs = self._run_command(
-                            'cat /tmp/opendevin_jupyter_init.py | execute_cli',
+    async def write(self, action: FileWriteAction) -> Observation:
+        working_dir = self.get_working_directory()
+        filepath = self._resolve_path(action.path, working_dir)
+
+        insert = action.content.split('\n')
+        try:
+            if not os.path.exists(os.path.dirname(filepath)):
+                os.makedirs(os.path.dirname(filepath))
+            mode = 'w' if not os.path.exists(filepath) else 'r+'
+            try:
+                with open(filepath, mode, encoding='utf-8') as file:
+                    if mode != 'w':
+                        all_lines = file.readlines()
+                        new_file = insert_lines(
+                            insert, all_lines, action.start, action.end
                         )
-            elif (
-                is_single_package
-                and f'Requirement already satisfied: {package_names}' in output
-            ):
-                output = '[Package already installed]'
-        return IPythonRunCellObservation(content=output, code=action.code)
+                    else:
+                        new_file = [i + '\n' for i in insert]
+
+                    file.seek(0)
+                    file.writelines(new_file)
+                    file.truncate()
+            except FileNotFoundError:
+                return ErrorObservation(f'File not found: {filepath}')
+            except IsADirectoryError:
+                return ErrorObservation(
+                    f'Path is a directory: {filepath}. You can only write to files'
+                )
+            except UnicodeDecodeError:
+                return ErrorObservation(
+                    f'File could not be decoded as utf-8: {filepath}'
+                )
+        except PermissionError:
+            return ErrorObservation(f'Malformed paths not permitted: {filepath}')
+        return FileWriteObservation(content='', path=filepath)
 
     def close(self):
         self.shell.close()
-    
-    ############################################################################ 
-    # Initialization work inside sandbox image
-    ############################################################################ 
-
-    # init_runtime_tools do in EventStreamRuntime
 
-    def init_sandbox_plugins(self, requirements: list[PluginRequirement]) -> None:
-        # TODO:: test after settle donw the way to move code into sandbox
-        for requirement in requirements:
-            self._source_bashrc()
-
-            shutil.copytree(requirement.host_src, requirement.sandbox_dest)
-
-            # Execute the bash script
-            abs_path_to_bash_script = os.path.join(
-                requirement.sandbox_dest, requirement.bash_script_path
-            )
 
-            print(
-                    f'Initializing plugin [{requirement.name}] by executing [{abs_path_to_bash_script}] in the sandbox.'
-                )
-            output, exit_code = self.execute(abs_path_to_bash_script)
-            if exit_code != 0:
-                raise RuntimeError(
-                    f'Failed to initialize plugin {requirement.name} with exit code {exit_code} and output: {output}'
-                )
-            print(f'Plugin {requirement.name} initialized successfully.')
-        if len(requirements) > 0:
-            self._source_bashrc()
-    
-    def _source_bashrc(self):
-        output, exit_code = self.execute(
-            'source /opendevin/bash.bashrc && source ~/.bashrc'
-        )
-        if exit_code != 0:
-            raise RuntimeError(
-                f'Failed to source /opendevin/bash.bashrc and ~/.bashrc with exit code {exit_code} and output: {output}'
-            )
-        print('Sourced /opendevin/bash.bashrc and ~/.bashrc successfully')
-
-
-def test_run_commond():
-    client = RuntimeClient()
-    command = CmdRunAction(command="ls -l")
-    obs = client.run_action(command)
-    print(obs)
-    command = CmdRunAction(command="pwd")
-    obs = client.run_action(command)
-    print(obs)
-
-
-def test_shell(message):
-    shell = pexpect.spawn('/bin/bash', encoding='utf-8')
-    shell.expect(r'[$#] ')
-    print(f"Received command: {message}")
-    shell.sendline(message)
-    shell.expect(r'[$#] ')
-    output = shell.before.strip().split('\r\n', 1)[1].strip()
-    shell.close()
-    print(output)
-
-if __name__ == "__main__":
-    # print(test_shell("ls -l"))
-    # client = RuntimeClient()
-    test_run_commond()
-    # client.init_sandbox_plugins([AgentSkillsRequirement,JupyterRequirement])
-    # print(test_shell("whoami"))
-
-    
+# def test_run_commond():
+#     client = RuntimeClient()
+#     command = CmdRunAction(command='ls -l')
+#     obs = client.run_action(command)
+#     print(obs)
+
+# def test_shell(message):
+#     shell = pexpect.spawn('/bin/bash', encoding='utf-8')
+#     shell.expect(r'[$#] ')
+#     print(f'Received command: {message}')
+#     shell.sendline(message)
+#     shell.expect(r'[$#] ')
+#     output = shell.before.strip().split('\r\n', 1)[1].strip()
+#     print(f'Output: {output}')
+#     shell.close()
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser()
+    parser.add_argument('port', type=int, help='Port to listen on')
+    parser.add_argument('--working-dir', type=str, help='Working directory')
+    parser.add_argument('--plugins', type=str, help='Plugins to initialize', nargs='+')
+    # example: python client.py 8000 --working-dir /workspace --plugins JupyterRequirement
+    args = parser.parse_args()
+
+    plugins_to_load: list[Plugin] = []
+    if args.plugins:
+        for plugin in args.plugins:
+            if plugin not in ALL_PLUGINS:
+                raise ValueError(f'Plugin {plugin} not found')
+            plugins_to_load.append(ALL_PLUGINS[plugin]())  # type: ignore
+
+    client = RuntimeClient(plugins_to_load, work_dir=args.working_dir)
+
+    @app.middleware('http')
+    async def one_request_at_a_time(request: Request, call_next):
+        async with client.lock:
+            response = await call_next(request)
+        return response
+
+    @app.post('/execute_action')
+    async def execute_action(action_request: ActionRequest):
+        try:
+            action = event_from_dict(action_request.action)
+            if not isinstance(action, Action):
+                raise HTTPException(status_code=400, detail='Invalid action type')
+            observation = await client.run_action(action)
+            return event_to_dict(observation)
+        except Exception as e:
+            logger.error(f'Error processing command: {str(e)}')
+            raise HTTPException(status_code=500, detail=str(e))
+
+    @app.get('/alive')
+    async def alive():
+        return {'status': 'ok'}
+
+    logger.info(f'Starting action execution API on port {args.port}')
+    print(f'Starting action execution API on port {args.port}')
+    run(app, host='0.0.0.0', port=args.port)
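
For reference, a hedged sketch of exercising the client's new HTTP API from the host, the same way EventStreamRuntime does. Port 8000 is an example; in practice the runtime picks a free port via find_available_tcp_port() and passes it to the client as a CLI argument.

import asyncio

import aiohttp

from opendevin.events.action import CmdRunAction
from opendevin.events.serialization import event_to_dict, observation_from_dict

async def main():
    async with aiohttp.ClientSession() as session:
        # health check, mirroring EventStreamRuntime._wait_until_alive()
        async with session.get('http://localhost:8000/alive') as resp:
            assert resp.status == 200

        # send a single action and decode the resulting observation
        action = CmdRunAction(command='echo hello')
        async with session.post(
            'http://localhost:8000/execute_action',
            json={'action': event_to_dict(action)},
        ) as resp:
            obs = observation_from_dict(await resp.json())
            print(obs)

asyncio.run(main())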

+ 254 - 145
opendevin/runtime/client/runtime.py

@@ -1,25 +1,15 @@
-from typing import Any
 import asyncio
-import json
-import websockets
-import docker
+import atexit
 import uuid
-from opendevin.events.serialization.action import ACTION_TYPE_TO_CLASS
-from opendevin.events.action.action import Action
-from opendevin.events.event import Event
-from opendevin.events.observation import Observation
-from opendevin.events.stream import EventStream
-from opendevin.events.serialization import event_to_dict, observation_from_dict
-from opendevin.runtime.runtime import Runtime
-from opendevin.runtime.server.browse import browse
-from opendevin.runtime.server.files import read_file, write_file
-from opendevin.runtime.plugins import PluginRequirement
+from typing import Optional
+
+import aiohttp
+import docker
+import tenacity
+
 from opendevin.core.config import config
-from opendevin.events.observation import (
-    ErrorObservation,
-    NullObservation,
-    Observation,
-)
+from opendevin.core.logger import opendevin_logger as logger
+from opendevin.events import EventSource, EventStream, EventStreamSubscriber
 from opendevin.events.action import (
     AgentRecallAction,
     BrowseInteractiveAction,
@@ -29,83 +19,156 @@ from opendevin.events.action import (
     FileWriteAction,
     IPythonRunCellAction,
 )
-import asyncio
-from opendevin.events import EventSource, EventStream, EventStreamSubscriber
+from opendevin.events.action.action import Action
+from opendevin.events.event import Event
+from opendevin.events.observation import (
+    ErrorObservation,
+    NullObservation,
+    Observation,
+)
+from opendevin.events.serialization import event_to_dict, observation_from_dict
+from opendevin.events.serialization.action import ACTION_TYPE_TO_CLASS
+from opendevin.runtime.plugins import (
+    AgentSkillsRequirement,
+    JupyterRequirement,
+    PluginRequirement,
+)
+from opendevin.runtime.runtime import Runtime
+from opendevin.runtime.utils import find_available_tcp_port
+from opendevin.runtime.utils.image_agnostic import get_od_sandbox_image
+
 
 class EventStreamRuntime(Runtime):
     # This runtime will subscribe the event stream
     # When receive an event, it will send the event to od-runtime-client which run inside the docker environment
-    
-    # websocket uri
-    uri = 'ws://localhost:8080'
+
     container_name_prefix = 'opendevin-sandbox-'
-    docker_client: docker.DockerClient
 
-    def __init__(self, event_stream: EventStream, sid: str = 'default',container_image: str | None = None):
-        # We don't need sandbox in this runtime, because it's equal to a websocket sandbox
-        self._init_event_stream(event_stream)
-        self._init_websocket()
-        self._init_docker(sid,container_image)
-
-    def _init_docker(self,sid,container_image):
-        self.container_image = container_image
-        # (
-        #     config.sandbox_container_image
-        #     if container_image is None
-        #     else container_image
-        # )
+    def __init__(
+        self,
+        event_stream: EventStream,
+        sid: str = 'default',
+        container_image: str | None = None,
+        plugins: list[PluginRequirement] | None = None,
+    ):
+        self._port = find_available_tcp_port()
+        self.api_url = f'http://localhost:{self._port}'
+        self.session: Optional[aiohttp.ClientSession] = None
+
         self.instance_id = (
             sid + str(uuid.uuid4()) if sid is not None else str(uuid.uuid4())
         )
+        self.docker_client: docker.DockerClient = self._init_docker_client()
+        self.container_image = (
+            config.sandbox.container_image
+            if container_image is None
+            else container_image
+        )
+        self.container_image = get_od_sandbox_image(
+            self.container_image, self.docker_client, is_eventstream_runtime=True
+        )
         self.container_name = self.container_name_prefix + self.instance_id
+        atexit.register(self.close)
+
+        # We don't need sandbox in this runtime, because it's equal to a websocket sandbox
+        self._init_event_stream(event_stream)
+        self.plugins = plugins if plugins is not None else []
+        self.container = self._init_container(
+            self.sandbox_workspace_dir,
+            mount_dir=config.workspace_mount_path,
+            plugins=plugins,
+        )
+
+    @staticmethod
+    def _init_docker_client() -> docker.DockerClient:
         try:
-            self.docker_client = docker.from_env()
-            self._init_sandbox()
+            return docker.from_env()
         except Exception as ex:
-            print(
-                "Launch docker client failed. Please make sure you have installed docker and started the docker daemon."
+            logger.error(
+                'Launch docker client failed. Please make sure you have installed docker and started the docker daemon.'
             )
             raise ex
-    
-    def _init_event_stream(self,event_stream: EventStream):
-        self.event_stream = event_stream
-        self.event_stream.subscribe(EventStreamSubscriber.RUNTIME, self.on_event)
 
-    def _init_websocket(self):
-        self.websocket = None
-        # TODO: need to initialization globally only once
-        # self.loop = asyncio.new_event_loop()
-        # asyncio.set_event_loop(self.loop)
-        # self.loop.run_until_complete(self._init_websocket_connect())
-    
-    async def _init_websocket_connect(self):
-        self.websocket = await websockets.connect(self.uri)
-    
-    def _init_sandbox(self):
+    @tenacity.retry(
+        stop=tenacity.stop_after_attempt(5),
+        wait=tenacity.wait_exponential(multiplier=1, min=4, max=60),
+    )
+    def _init_container(
+        self,
+        sandbox_workspace_dir: str,
+        mount_dir: str = config.workspace_mount_path,
+        plugins: list[PluginRequirement] | None = None,
+    ):
+        """Start a container and return the container object.
+
+        Args:
+            mount_dir: str: The directory (on host machine) to mount to the container
+            sandbox_workspace_dir: str: working directory in the container, also the target directory for the mount
+        """
+
         try:
             # start the container
-            mount_dir = config.workspace_mount_path
-            self.container = self.docker_client.containers.run(
+            logger.info(
+                f'Starting container with image: {self.container_image} and name: {self.container_name}'
+            )
+            if plugins is None:
+                plugins = []
+            plugin_names = ' '.join([plugin.name for plugin in plugins])
+            container = self.docker_client.containers.run(
                 self.container_image,
-                command='tail -f /dev/null',
-                # TODO: test the port mapping in mac and linux
-                # network_mode='host',
-                working_dir=self.sandbox_workspace_dir,
+                command=(
+                    f'/opendevin/miniforge3/bin/mamba run --no-capture-output -n base '
+                    'PYTHONUNBUFFERED=1 poetry run '
+                    f'python -u -m opendevin.runtime.client.client {self._port} '
+                    f'--working-dir {sandbox_workspace_dir} '
+                    f'--plugins {plugin_names} '
+                ),
+                # TODO: test it in mac and linux
+                network_mode='host',
+                working_dir='/opendevin/code/',
                 name=self.container_name,
                 detach=True,
-                ports={'8080/tcp': 8080},
-                volumes={mount_dir: {'bind': self.sandbox_workspace_dir, 'mode': 'rw'}},
+                volumes={mount_dir: {'bind': sandbox_workspace_dir, 'mode': 'rw'}},
             )
-            print('Container started')
+            logger.info(f'Container started. Server url: {self.api_url}')
+            return container
         except Exception as e:
-            print('Failed to start container')
+            logger.error('Failed to start container')
+            logger.exception(e)
+            self.close(close_client=False)
             raise e
 
+    def _init_event_stream(self, event_stream: EventStream):
+        self.event_stream = event_stream
+        self.event_stream.subscribe(EventStreamSubscriber.RUNTIME, self.on_event)
+
+    async def _ensure_session(self):
+        if self.session is None or self.session.closed:
+            self.session = aiohttp.ClientSession()
+        return self.session
+
+    @tenacity.retry(
+        stop=tenacity.stop_after_attempt(5),
+        wait=tenacity.wait_exponential(multiplier=2, min=4, max=600),
+    )
+    async def _wait_until_alive(self):
+        async with aiohttp.ClientSession() as session:
+            async with session.get(f'{self.api_url}/alive') as response:
+                if response.status == 200:
+                    return
+                else:
+                    logger.error(
+                        f'Action execution API is not alive. Response: {response}'
+                    )
+                    raise RuntimeError(
+                        f'Action execution API is not alive. Response: {response}'
+                    )
+
     @property
     def sandbox_workspace_dir(self):
         return config.workspace_mount_path_in_sandbox
 
-    def close(self):
+    def close(self, close_client: bool = True):
         containers = self.docker_client.containers.list(all=True)
         for container in containers:
             try:
@@ -113,18 +176,19 @@ class EventStreamRuntime(Runtime):
                     container.remove(force=True)
             except docker.errors.NotFound:
                 pass
-        self.docker_client.close()
-    
+        if close_client:
+            self.docker_client.close()
+
     async def on_event(self, event: Event) -> None:
-        print("EventStreamRuntime: on_event triggered")
+        print('EventStreamRuntime: on_event triggered')
         if isinstance(event, Action):
             observation = await self.run_action(event)
-            print("EventStreamRuntime: observation", observation)
+            print('EventStreamRuntime: observation', observation)
             # observation._cause = event.id  # type: ignore[attr-defined]
             source = event.source if event.source else EventSource.AGENT
             await self.event_stream.add_event(observation, source)
-    
-    async def run_action(self, action: Action) -> Observation:
+
+    async def run_action(self, action: Action, timeout: int = 600) -> Observation:
         """
         Run an action and return the resulting observation.
         If the action is not runnable in any runtime, a NullObservation is returned.
@@ -140,66 +204,92 @@ class EventStreamRuntime(Runtime):
             return ErrorObservation(
                 f'Action {action_type} is not supported in the current runtime.'
             )
-        observation = await getattr(self, action_type)(action)
-        # TODO: fix ID problem, see comments https://github.com/OpenDevin/OpenDevin/pull/2603#discussion_r1668994137
-        observation._parent = action.id  # type: ignore[attr-defined]
-        return observation
-    
-    async def run(self, action: CmdRunAction) -> Observation:
-        return await self._run_command(action)
-    
-    async def _run_command(
-        self, action: Action, _stream: bool = False, timeout: int | None = None
-    ) -> Observation:
-        # Send action into websocket and get the result
-        # TODO: need to initialization globally only once
-        self.websocket = await websockets.connect(self.uri)
-        if self.websocket is None:
-            raise Exception("WebSocket is not connected.")
+
+        # Run action in od-runtime-client
+        session = await self._ensure_session()
+        await self._wait_until_alive()
         try:
-            await self.websocket.send(json.dumps(event_to_dict(action)))
-            output = await asyncio.wait_for(self.websocket.recv(), timeout=timeout)
-            output = json.loads(output)
-            print("Received output: ", output)
+            async with session.post(
+                f'{self.api_url}/execute_action',
+                json={'action': event_to_dict(action)},
+                timeout=timeout,
+            ) as response:
+                if response.status == 200:
+                    output = await response.json()
+                    obs = observation_from_dict(output)
+                    obs._cause = action.id  # type: ignore[attr-defined]
+                    return obs
+                else:
+                    error_message = await response.text()
+                    logger.error(f'Error from server: {error_message}')
+                    obs = ErrorObservation(f'Command execution failed: {error_message}')
         except asyncio.TimeoutError:
-            print("No response received within the timeout period.")
-        await self.websocket.close()
-        return observation_from_dict(output)
-        
-    async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
-        return await self._run_command(action)
+            logger.error('No response received within the timeout period.')
+            obs = ErrorObservation('Command execution timed out')
+        except Exception as e:
+            logger.error(f'Error during command execution: {e}')
+            obs = ErrorObservation(f'Command execution failed: {str(e)}')
+        # TODO: fix ID problem, see comments https://github.com/OpenDevin/OpenDevin/pull/2603#discussion_r1668994137
+        obs._parent = action.id  # type: ignore[attr-defined]
+        return obs
 
-    ############################################################################ 
-    # Keep the same with other runtimes
-    ############################################################################ 
+    async def run(self, action: CmdRunAction) -> Observation:
+        return await self.run_action(action)
+
+    async def run_ipython(self, action: IPythonRunCellAction) -> Observation:
+        return await self.run_action(action)
 
-    def get_working_directory(self):
-        # TODO: should we get this from od-runtime-client
-        return config.workspace_base
-    
     async def read(self, action: FileReadAction) -> Observation:
-        working_dir = self.get_working_directory()
-        return await read_file(action.path, working_dir, action.start, action.end)
-    
+        return await self.run_action(action)
+
     async def write(self, action: FileWriteAction) -> Observation:
-        working_dir = self.get_working_directory()
-        return await write_file(
-            action.path, working_dir, action.content, action.start, action.end
-        )
-    
+        return await self.run_action(action)
+
     async def browse(self, action: BrowseURLAction) -> Observation:
-        return await browse(action, self.browser)
+        return await self.run_action(action)
 
     async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
-        return await browse(action, self.browser)
+        return await self.run_action(action)
 
     async def recall(self, action: AgentRecallAction) -> Observation:
-        return NullObservation('')
+        return await self.run_action(action)
+
+    ############################################################################
+    # Keep the same with other runtimes
+    ############################################################################
+
+    def get_working_directory(self):
+        # FIXME: this is not needed for the agent - we keep this
+        # method to be consistent with the other runtimes
+        # but eventually we will remove this method across all runtimes
+        # when we use EventStreamRuntime to replace the other sandbox-based runtime
+        raise NotImplementedError(
+            'This method is not implemented in the runtime client.'
+        )
+
+    # async def read(self, action: FileReadAction) -> Observation:
+    #     working_dir = self.get_working_directory()
+    #     return await read_file(action.path, working_dir, action.start, action.end)
+
+    # async def write(self, action: FileWriteAction) -> Observation:
+    #     working_dir = self.get_working_directory()
+    #     return await write_file(
+    #         action.path, working_dir, action.content, action.start, action.end
+    #     )
+
+    # async def browse(self, action: BrowseURLAction) -> Observation:
+    #     return await browse(action, self.browser)
+
+    # async def browse_interactive(self, action: BrowseInteractiveAction) -> Observation:
+    #     return await browse(action, self.browser)
 
-    ############################################################################ 
+    # async def recall(self, action: AgentRecallAction) -> Observation:
+    #     return NullObservation('')
+
+    ############################################################################
     # Initialization work inside sandbox image
-    ############################################################################ 
-    
+    ############################################################################
+
     # init_runtime_tools directly does what Runtime does
 
     # Do in the od_runtime_client
@@ -208,53 +298,72 @@ class EventStreamRuntime(Runtime):
         pass
 
 
-    
 def test_run_command():
-    sid = "test"
+    sid = 'test'
     cli_session = 'main' + ('_' + sid if sid else '')
     event_stream = EventStream(cli_session)
     runtime = EventStreamRuntime(event_stream)
-    asyncio.run(runtime._run_command(CmdRunAction('ls -l')))
+    asyncio.run(runtime.run_action(CmdRunAction('ls -l')))
+
 
 async def test_event_stream():
-    sid = "test"
+    sid = 'test'
     cli_session = 'main' + ('_' + sid if sid else '')
     event_stream = EventStream(cli_session)
-    runtime = EventStreamRuntime(event_stream)
+    runtime = EventStreamRuntime(
+        event_stream,
+        sid,
+        'ubuntu:22.04',
+        plugins=[JupyterRequirement(), AgentSkillsRequirement()],
+    )
     # Test run command
     action_cmd = CmdRunAction(command='ls -l')
-    print(await runtime.run_action(action_cmd))
+    logger.info(action_cmd, extra={'msg_type': 'ACTION'})
+    logger.info(await runtime.run_action(action_cmd), extra={'msg_type': 'OBSERVATION'})
 
     # Test run ipython
-    test_code = "print('Hello, `World`!\n')"
-    action_opython = IPythonRunCellAction(code=test_code)
-    print(await runtime.run_action(action_opython))
+    test_code = "print('Hello, `World`!\\n')"
+    action_ipython = IPythonRunCellAction(code=test_code)
+    logger.info(action_ipython, extra={'msg_type': 'ACTION'})
+    logger.info(
+        await runtime.run_action(action_ipython), extra={'msg_type': 'OBSERVATION'}
+    )
 
-    # Test read file
+    # Test read file (file should not exist)
     action_read = FileReadAction(path='hello.sh')
-    print(await runtime.run_action(action_read))
+    logger.info(action_read, extra={'msg_type': 'ACTION'})
+    logger.info(
+        await runtime.run_action(action_read), extra={'msg_type': 'OBSERVATION'}
+    )
 
     # Test write file
     action_write = FileWriteAction(content='echo "Hello, World!"', path='hello.sh')
-    print(await runtime.run_action(action_write))
+    logger.info(action_write, extra={'msg_type': 'ACTION'})
+    logger.info(
+        await runtime.run_action(action_write), extra={'msg_type': 'OBSERVATION'}
+    )
+
+    # Test read file (file should exist)
+    action_read = FileReadAction(path='hello.sh')
+    logger.info(action_read, extra={'msg_type': 'ACTION'})
+    logger.info(
+        await runtime.run_action(action_read), extra={'msg_type': 'OBSERVATION'}
+    )
 
     # Test browse
     action_browse = BrowseURLAction(url='https://google.com')
-    print(await runtime.run_action(action_browse))
+    logger.info(action_browse, extra={'msg_type': 'ACTION'})
+    logger.info(
+        await runtime.run_action(action_browse), extra={'msg_type': 'OBSERVATION'}
+    )
 
     # Test recall
     action_recall = AgentRecallAction(query='who am I?')
-    print(await runtime.run_action(action_recall))
+    logger.info(action_recall, extra={'msg_type': 'ACTION'})
+    logger.info(
+        await runtime.run_action(action_recall), extra={'msg_type': 'OBSERVATION'}
+    )
 
-def test_docker_launch():
-    sid = "test"
-    cli_session = 'main' + ('_' + sid if sid else '')
-    event_stream = EventStream(cli_session)
-    runtime = EventStreamRuntime(event_stream,sid,"ghcr.io/opendevin/sandbox:main")
-    runtime.close()
 
-if __name__ == "__main__":
+if __name__ == '__main__':
     asyncio.run(test_event_stream())
-
-
-    
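
The runtime polls the in-container API before sending any action. A small sketch, isolating the tenacity-based backoff used by _wait_until_alive() so the retry behaviour is easier to see; the URL is illustrative.

import asyncio

import aiohttp
import tenacity

@tenacity.retry(
    stop=tenacity.stop_after_attempt(5),
    wait=tenacity.wait_exponential(multiplier=2, min=4, max=600),
)
async def wait_until_alive(api_url: str) -> None:
    async with aiohttp.ClientSession() as session:
        async with session.get(f'{api_url}/alive') as response:
            if response.status != 200:
                # raising triggers another attempt with exponential backoff
                raise RuntimeError(f'API not alive yet: {response.status}')

asyncio.run(wait_until_alive('http://localhost:8000'))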

+ 11 - 3
opendevin/runtime/plugins/__init__.py

@@ -1,14 +1,22 @@
 # Requirements
-from .agent_skills import AgentSkillsRequirement
-from .jupyter import JupyterRequirement
+from .agent_skills import AgentSkillsPlugin, AgentSkillsRequirement
+from .jupyter import JupyterPlugin, JupyterRequirement
 from .mixin import PluginMixin
-from .requirement import PluginRequirement
+from .requirement import Plugin, PluginRequirement
 from .swe_agent_commands import SWEAgentCommandsRequirement
 
 __all__ = [
+    'Plugin',
     'PluginMixin',
     'PluginRequirement',
     'AgentSkillsRequirement',
+    'AgentSkillsPlugin',
     'JupyterRequirement',
+    'JupyterPlugin',
     'SWEAgentCommandsRequirement',
 ]
+
+ALL_PLUGINS = {
+    'jupyter': JupyterPlugin,
+    'agent_skills': AgentSkillsPlugin,
+}
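
A sketch of how ALL_PLUGINS is meant to be consumed: mapping plugin names (as passed on the client's --plugins flag) to Plugin instances. load_plugins is a hypothetical helper, not part of this PR.

from opendevin.runtime.plugins import ALL_PLUGINS, Plugin

def load_plugins(names: list[str]) -> list[Plugin]:
    plugins: list[Plugin] = []
    for name in names:
        if name not in ALL_PLUGINS:
            raise ValueError(f'Plugin {name} not found')
        plugins.append(ALL_PLUGINS[name]())
    return plugins

# e.g. load_plugins(['jupyter', 'agent_skills'])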

+ 5 - 1
opendevin/runtime/plugins/agent_skills/__init__.py

@@ -2,7 +2,7 @@ import os
 from dataclasses import dataclass
 
 from opendevin.runtime.plugins.agent_skills.agentskills import DOCUMENTATION
-from opendevin.runtime.plugins.requirement import PluginRequirement
+from opendevin.runtime.plugins.requirement import Plugin, PluginRequirement
 
 
 @dataclass
@@ -14,3 +14,7 @@ class AgentSkillsRequirement(PluginRequirement):
     sandbox_dest: str = '/opendevin/plugins/agent_skills'
     bash_script_path: str = 'setup.sh'
     documentation: str = DOCUMENTATION
+
+
+class AgentSkillsPlugin(Plugin):
+    name: str = 'agent_skills'

+ 62 - 1
opendevin/runtime/plugins/jupyter/__init__.py

@@ -1,7 +1,14 @@
 import os
+import subprocess
+import time
 from dataclasses import dataclass
 
-from opendevin.runtime.plugins.requirement import PluginRequirement
+from opendevin.events.action import Action, IPythonRunCellAction
+from opendevin.events.observation import IPythonRunCellObservation, Observation
+from opendevin.runtime.plugins.requirement import Plugin, PluginRequirement
+from opendevin.runtime.utils import find_available_tcp_port
+
+from .execute_server import JupyterKernel
 
 
 @dataclass
@@ -12,3 +19,57 @@ class JupyterRequirement(PluginRequirement):
     )  # The directory of this file (opendevin/runtime/plugins/jupyter)
     sandbox_dest: str = '/opendevin/plugins/jupyter'
     bash_script_path: str = 'setup.sh'
+
+    # ================================================================
+    # Plugin methods, which will ONLY be used in the runtime client
+    # running inside docker
+    # ================================================================
+
+
+class JupyterPlugin(Plugin):
+    name: str = 'jupyter'
+
+    def initialize(self, kernel_id: str = 'opendevin-default'):
+        self.kernel_gateway_port = find_available_tcp_port()
+        self.kernel_id = kernel_id
+        self.gateway_process = subprocess.Popen(
+            [
+                '/opendevin/miniforge3/bin/mamba',
+                'run',
+                '-n',
+                'base',
+                'poetry',
+                'run',
+                'jupyter',
+                'kernelgateway',
+                '--KernelGatewayApp.ip=0.0.0.0',
+                f'--KernelGatewayApp.port={self.kernel_gateway_port}',
+            ],
+            stderr=subprocess.STDOUT,
+        )
+        # read stdout until the kernel gateway is ready
+        while True and self.gateway_process.stdout is not None:
+            line = self.gateway_process.stdout.readline().decode('utf-8')
+            if 'at' in line:
+                break
+            time.sleep(1)
+            print('Waiting for jupyter kernel gateway to start...')
+
+    async def run(self, action: Action) -> Observation:
+        if not isinstance(action, IPythonRunCellAction):
+            raise ValueError(
+                f'Jupyter plugin only supports IPythonRunCellAction, but got {action}'
+            )
+
+        if not hasattr(self, 'kernel'):
+            self.kernel = JupyterKernel(
+                f'localhost:{self.kernel_gateway_port}', self.kernel_id
+            )
+
+        if not self.kernel.initialized:
+            await self.kernel.initialize()
+        output = await self.kernel.execute(action.code)
+        return IPythonRunCellObservation(
+            content=output,
+            code=action.code,
+        )
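
A hedged usage sketch of the new JupyterPlugin, as the runtime client would drive it. This only works inside the sandbox image built by this PR, since initialize() spawns the kernel gateway through the baked-in mamba/poetry environment.

import asyncio

from opendevin.events.action import IPythonRunCellAction
from opendevin.runtime.plugins import JupyterPlugin

async def demo():
    plugin = JupyterPlugin()
    plugin.initialize()  # starts the Jupyter kernel gateway subprocess
    obs = await plugin.run(IPythonRunCellAction(code='print(1 + 1)'))
    print(obs.content)

asyncio.run(demo())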

+ 4 - 0
opendevin/runtime/plugins/jupyter/execute_server → opendevin/runtime/plugins/jupyter/execute_server.py

@@ -64,6 +64,7 @@ class JupyterKernel:
 
         self.heartbeat_interval = 10000  # 10 seconds
         self.heartbeat_callback = None
+        self.initialized = False
 
     async def initialize(self):
         await self.execute(r'%colors nocolor')
@@ -76,6 +77,7 @@ class JupyterKernel:
         for tool in self.tools_to_run:
             # logging.info(f'Tool initialized:\n{tool}')
             await self.execute(tool)
+        self.initialized = True
 
     async def _send_heartbeat(self):
         if not self.ws:
@@ -139,6 +141,7 @@ class JupyterKernel:
             await self._connect()
 
         msg_id = uuid4().hex
+        assert self.ws is not None
         self.ws.write_message(
             json_encode(
                 {
@@ -169,6 +172,7 @@ class JupyterKernel:
         async def wait_for_messages():
             execution_done = False
             while not execution_done:
+                assert self.ws is not None
                 msg = await self.ws.read_message()
                 msg = json_decode(msg)
                 msg_type = msg['msg_type']

+ 1 - 1
opendevin/runtime/plugins/jupyter/setup.sh

@@ -64,7 +64,7 @@ echo "JupyterKernelGateway started with PID: $JUPYTER_GATEWAY_PID"
 # Start the jupyter_server
 export JUPYTER_EXEC_SERVER_PORT=$(find_free_port 30000 40000)
 echo "export JUPYTER_EXEC_SERVER_PORT=$JUPYTER_EXEC_SERVER_PORT" >> ~/.bashrc
-$OPENDEVIN_PYTHON_INTERPRETER /opendevin/plugins/jupyter/execute_server > /opendevin/logs/jupyter_execute_server.log 2>&1 &
+$OPENDEVIN_PYTHON_INTERPRETER /opendevin/plugins/jupyter/execute_server.py > /opendevin/logs/jupyter_execute_server.log 2>&1 &
 export JUPYTER_EXEC_SERVER_PID=$!
 echo "export JUPYTER_EXEC_SERVER_PID=$JUPYTER_EXEC_SERVER_PID" >> ~/.bashrc
 echo "Execution server started with PID: $JUPYTER_EXEC_SERVER_PID"

+ 23 - 0
opendevin/runtime/plugins/requirement.py

@@ -1,5 +1,28 @@
+from abc import abstractmethod
 from dataclasses import dataclass
 
+from opendevin.events.action import Action
+from opendevin.events.observation import Observation
+
+
+class Plugin:
+    """Base class for a plugin.
+
+    This will be initialized by the runtime client, which will run inside docker.
+    """
+
+    name: str
+
+    @abstractmethod
+    def initialize(self):
+        """Initialize the plugin."""
+        pass
+
+    @abstractmethod
+    async def run(self, action: Action) -> Observation:
+        """Run the plugin for a given action."""
+        pass
+
 
 @dataclass
 class PluginRequirement:
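
A sketch of what a third-party Plugin implementation could look like against this base class; EchoPlugin is hypothetical and not part of the PR.

from opendevin.events.action import Action
from opendevin.events.observation import NullObservation, Observation
from opendevin.runtime.plugins.requirement import Plugin

class EchoPlugin(Plugin):
    name: str = 'echo'

    def initialize(self) -> None:
        pass  # nothing to set up for this toy plugin

    async def run(self, action: Action) -> Observation:
        # a real plugin would execute the action; this one just acknowledges it
        return NullObservation(f'echo: {action}')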

+ 137 - 20
opendevin/runtime/utils/image_agnostic.py

@@ -1,11 +1,14 @@
+import os
 import tempfile
 
 import docker
 
 from opendevin.core.logger import opendevin_logger as logger
 
+from .source import create_project_source_dist
 
-def generate_dockerfile_content(base_image: str) -> str:
+
+def generate_dockerfile(base_image: str) -> str:
     """
     Generate the Dockerfile content for the agnostic sandbox image based on user-provided base image.
 
@@ -32,28 +35,116 @@ def generate_dockerfile_content(base_image: str) -> str:
     return dockerfile_content
 
 
+def generate_dockerfile_for_eventstream_runtime(
+    base_image: str, temp_dir: str, skip_init: bool = False
+) -> str:
+    """
+    Generate the Dockerfile content for the eventstream runtime image based on user-provided base image.
+
+    NOTE: This is only tested on debian yet.
+    """
+    if skip_init:
+        dockerfile_content = f'FROM {base_image}\n'
+    else:
+        dockerfile_content = (
+            f'FROM {base_image}\n'
+            'RUN apt update && apt install -y wget sudo\n'
+            'RUN mkdir -p /opendevin && mkdir -p /opendevin/logs && chmod 777 /opendevin/logs\n'
+            'RUN echo "" > /opendevin/bash.bashrc\n'
+            'RUN if [ ! -d /opendevin/miniforge3 ]; then \\\n'
+            '        wget --progress=bar:force -O Miniforge3.sh "https://github.com/conda-forge/miniforge/releases/latest/download/Miniforge3-$(uname)-$(uname -m).sh" && \\\n'
+            '        bash Miniforge3.sh -b -p /opendevin/miniforge3 && \\\n'
+            '        rm Miniforge3.sh && \\\n'
+            '        chmod -R g+w /opendevin/miniforge3 && \\\n'
+            '        bash -c ". /opendevin/miniforge3/etc/profile.d/conda.sh && conda config --set changeps1 False && conda config --append channels conda-forge"; \\\n'
+            '    fi\n'
+            'RUN /opendevin/miniforge3/bin/mamba install python=3.11\n'
+            'RUN /opendevin/miniforge3/bin/mamba install conda-forge::poetry\n'
+        )
+
+    tarball_path = create_project_source_dist()
+    filename = os.path.basename(tarball_path)
+    filename = filename.removesuffix('.tar.gz')
+
+    # move the tarball to temp_dir
+    os.rename(tarball_path, os.path.join(temp_dir, 'project.tar.gz'))
+    logger.info(
+        f'Source distribution moved to {os.path.join(temp_dir, "project.tar.gz")}'
+    )
+
+    # Copy the project directory to the container
+    dockerfile_content += 'COPY project.tar.gz /opendevin\n'
+    # remove /opendevin/code if it exists
+    dockerfile_content += (
+        'RUN if [ -d /opendevin/code ]; then rm -rf /opendevin/code; fi\n'
+    )
+    # unzip the tarball to /opendevin/code
+    dockerfile_content += (
+        'RUN cd /opendevin && tar -xzvf project.tar.gz && rm project.tar.gz\n'
+    )
+    dockerfile_content += f'RUN mv /opendevin/{filename} /opendevin/code\n'
+    # install (or update) the dependencies
+    dockerfile_content += (
+        'RUN cd /opendevin/code && '
+        '/opendevin/miniforge3/bin/mamba run -n base poetry env use python3.11 && '
+        '/opendevin/miniforge3/bin/mamba run -n base poetry install\n'
+    )
+    return dockerfile_content
+
+
 def _build_sandbox_image(
-    base_image: str, target_image_name: str, docker_client: docker.DockerClient
+    base_image: str,
+    target_image_name: str,
+    docker_client: docker.DockerClient,
+    eventstream_runtime: bool = False,
+    skip_init: bool = False,
 ):
     try:
         with tempfile.TemporaryDirectory() as temp_dir:
-            dockerfile_content = generate_dockerfile_content(base_image)
-            logger.info(f'Building agnostic sandbox image: {target_image_name}')
-            logger.info(
-                (
-                    f'===== Dockerfile content =====\n'
-                    f'{dockerfile_content}\n'
-                    f'==============================='
+            if eventstream_runtime:
+                dockerfile_content = generate_dockerfile_for_eventstream_runtime(
+                    base_image, temp_dir, skip_init=skip_init
+                )
+            else:
+                dockerfile_content = generate_dockerfile(base_image)
+
+            if skip_init:
+                logger.info(
+                    f'Reusing existing od_sandbox image [{target_image_name}] but will update the source code in it.'
+                )
+                logger.info(
+                    (
+                        f'===== Dockerfile content =====\n'
+                        f'{dockerfile_content}\n'
+                        f'==============================='
+                    )
+                )
+            else:
+                logger.info(f'Building agnostic sandbox image: {target_image_name}')
+                logger.info(
+                    (
+                        f'===== Dockerfile content =====\n'
+                        f'{dockerfile_content}\n'
+                        f'==============================='
+                    )
                 )
-            )
             with open(f'{temp_dir}/Dockerfile', 'w') as file:
                 file.write(dockerfile_content)
 
             api_client = docker_client.api
             build_logs = api_client.build(
-                path=temp_dir, tag=target_image_name, rm=True, decode=True
+                path=temp_dir,
+                tag=target_image_name,
+                rm=True,
+                decode=True,
+                # do not use cache when skip_init is True (i.e., when we want to update the source code in the existing image)
+                nocache=skip_init,
             )
 
+            if skip_init:
+                logger.info(
+                    f'Rebuilding existing od_sandbox image [{target_image_name}] to update the source code.'
+                )
             for log in build_logs:
                 if 'stream' in log:
                     print(log['stream'].strip())
@@ -71,16 +162,23 @@ def _build_sandbox_image(
         raise e
 
 
-def _get_new_image_name(base_image: str) -> str:
+def _get_new_image_name(base_image: str, is_eventstream_runtime: bool) -> str:
+    prefix = 'od_sandbox'
+    if is_eventstream_runtime:
+        prefix = 'od_eventstream_runtime'
     if ':' not in base_image:
         base_image = base_image + ':latest'
 
     [repo, tag] = base_image.split(':')
     repo = repo.replace('/', '___')
-    return f'od_sandbox:{repo}__{tag}'
+    return f'{prefix}:{repo}__{tag}'
 
 
-def get_od_sandbox_image(base_image: str, docker_client: docker.DockerClient) -> str:
+def get_od_sandbox_image(
+    base_image: str,
+    docker_client: docker.DockerClient,
+    is_eventstream_runtime: bool = False,
+) -> str:
     """Return the sandbox image name based on user-provided base image.
 
     The returned sandbox image is assumed to contains all the required dependencies for OpenDevin.
@@ -90,18 +188,37 @@ def get_od_sandbox_image(base_image: str, docker_client: docker.DockerClient) ->
     if 'ghcr.io/opendevin/sandbox' in base_image:
         return base_image
 
-    new_image_name = _get_new_image_name(base_image)
+    new_image_name = _get_new_image_name(base_image, is_eventstream_runtime)
 
     # Detect if the sandbox image is built
+    image_exists = False
     images = docker_client.images.list()
     for image in images:
         if new_image_name in image.tags:
             logger.info('Found existing od_sandbox image, reuse:' + new_image_name)
+            image_exists = True
+            break
+
+    skip_init = False
+    if image_exists:
+        if is_eventstream_runtime:
+            skip_init = True
+            base_image = new_image_name
+            logger.info(
+                f'Reusing existing od_sandbox image [{new_image_name}] but will update the source code.'
+            )
+        else:
             return new_image_name
-
-    # If the sandbox image is not found, build it
-    logger.info(
-        f'od_sandbox image is not found for {base_image}, will build: {new_image_name}'
+    else:
+        # If the sandbox image is not found, build it
+        logger.info(
+            f'od_sandbox image is not found for {base_image}, will build: {new_image_name}'
+        )
+    _build_sandbox_image(
+        base_image,
+        new_image_name,
+        docker_client,
+        is_eventstream_runtime,
+        skip_init=skip_init,
     )
-    _build_sandbox_image(base_image, new_image_name, docker_client)
     return new_image_name
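
A short sketch of building (or refreshing) the eventstream-runtime image for a user-provided base image with the helper changed above; the printed tag follows the od_eventstream_runtime naming scheme.

import docker

from opendevin.runtime.utils.image_agnostic import get_od_sandbox_image

client = docker.from_env()
image = get_od_sandbox_image('ubuntu:22.04', client, is_eventstream_runtime=True)
print(image)  # expected: 'od_eventstream_runtime:ubuntu__22.04'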

+ 32 - 0
opendevin/runtime/utils/source.py

@@ -0,0 +1,32 @@
+import os
+import subprocess
+from importlib.metadata import version
+
+import opendevin
+from opendevin.core.logger import opendevin_logger as logger
+
+
+def create_project_source_dist():
+    """Create a source distribution of the project. Return the path to the tarball."""
+
+    # Copy the project directory to the container
+    # get the location of "opendevin" package
+    project_root = os.path.dirname(os.path.dirname(os.path.abspath(opendevin.__file__)))
+    logger.info(f'Using project root: {project_root}')
+
+    # run "python -m build -s" on project_root
+    result = subprocess.run(['python', '-m', 'build', '-s', project_root])
+    if result.returncode != 0:
+        logger.error(f'Build failed: {result}')
+        raise Exception(f'Build failed: {result}')
+    logger.info(f'Source distribution create result: {result}')
+
+    tarball_path = os.path.join(
+        project_root, 'dist', f'opendevin-{version("opendevin")}.tar.gz'
+    )
+    if not os.path.exists(tarball_path):
+        logger.error(f'Source distribution not found at {tarball_path}')
+        raise Exception(f'Source distribution not found at {tarball_path}')
+    logger.info(f'Source distribution created at {tarball_path}')
+
+    return tarball_path

+ 5 - 14
opendevin/server/session/agent.py

@@ -9,10 +9,7 @@ from opendevin.core.logger import opendevin_logger as logger
 from opendevin.core.schema import ConfigType
 from opendevin.events.stream import EventStream
 from opendevin.llm.llm import LLM
-from opendevin.runtime import DockerSSHBox
-from opendevin.runtime.e2b.runtime import E2BRuntime
-from opendevin.runtime.runtime import Runtime
-from opendevin.runtime.server.runtime import ServerRuntime
+from opendevin.runtime import DockerSSHBox, Runtime, get_runtime_cls
 
 
 class AgentSession:
@@ -60,16 +57,10 @@ class AgentSession:
     async def _create_runtime(self):
         if self.runtime is not None:
             raise Exception('Runtime already created')
-        if config.runtime == 'server':
-            logger.info('Using server runtime')
-            self.runtime = ServerRuntime(self.event_stream, self.sid)
-        elif config.runtime == 'e2b':
-            logger.info('Using E2B runtime')
-            self.runtime = E2BRuntime(self.event_stream, self.sid)
-        else:
-            raise Exception(
-                f'Runtime not defined in config, or is invalid: {config.runtime}'
-            )
+
+        logger.info(f'Using runtime: {config.runtime}')
+        runtime_cls = get_runtime_cls(config.runtime)
+        self.runtime = runtime_cls(self.event_stream, self.sid)
 
     async def _create_controller(self, start_event: dict):
         """Creates an AgentController instance.

The file diff is too large to display.
+ 804 - 5
poetry.lock


+ 13 - 7
pyproject.toml

@@ -6,6 +6,7 @@ authors = ["OpenDevin"]
 license = "MIT"
 readme = "README.md"
 repository = "https://github.com/OpenDevin/OpenDevin"
+include = ["poetry.lock"]
 
 [tool.poetry.dependencies]
 python = "^3.11"
@@ -56,13 +57,6 @@ pytest = "*"
 pytest-cov = "*"
 pytest-asyncio = "*"
 pytest-forked = "*"
-flake8 = "*"
-openai = "*"
-python-docx = "*"
-PyPDF2 = "*"
-pylatexenc = "*"
-python-pptx = "*"
-opencv-python = "*"
 pandas = "*"
 reportlab = "*"
 
@@ -77,6 +71,18 @@ retry = "*"
 evaluate = "*"
 swebench = { git = "https://github.com/OpenDevin/SWE-bench.git" }
 
+
+[tool.poetry.group.runtime.dependencies]
+jupyterlab = "*"
+notebook = "*"
+jupyter_kernel_gateway = "*"
+flake8 = "*"
+python-docx = "*"
+PyPDF2 = "*"
+python-pptx = "*"
+pylatexenc = "*"
+opencv-python = "*"
+
 [build-system]
 build-backend = "poetry.core.masonry.api"
 requires = [

+ 15 - 8
tests/unit/test_image_agnostic_util.py

@@ -2,14 +2,14 @@ from unittest.mock import MagicMock, patch
 
 from opendevin.runtime.utils.image_agnostic import (
     _get_new_image_name,
-    generate_dockerfile_content,
+    generate_dockerfile,
     get_od_sandbox_image,
 )
 
 
-def test_generate_dockerfile_content():
+def test_generate_dockerfile():
     base_image = 'debian:11'
-    dockerfile_content = generate_dockerfile_content(base_image)
+    dockerfile_content = generate_dockerfile(base_image)
     assert base_image in dockerfile_content
     assert (
         'RUN apt update && apt install -y openssh-server wget sudo'
@@ -17,17 +17,18 @@ def test_generate_dockerfile_content():
     )
 
 
-def test_get_new_image_name():
+def test_get_new_image_name_legacy():
+    # test non-eventstream runtime (sandbox-based)
     base_image = 'debian:11'
-    new_image_name = _get_new_image_name(base_image)
+    new_image_name = _get_new_image_name(base_image, is_eventstream_runtime=False)
     assert new_image_name == 'od_sandbox:debian__11'
 
     base_image = 'ubuntu:22.04'
-    new_image_name = _get_new_image_name(base_image)
+    new_image_name = _get_new_image_name(base_image, is_eventstream_runtime=False)
     assert new_image_name == 'od_sandbox:ubuntu__22.04'
 
     base_image = 'ubuntu'
-    new_image_name = _get_new_image_name(base_image)
+    new_image_name = _get_new_image_name(base_image, is_eventstream_runtime=False)
     assert new_image_name == 'od_sandbox:ubuntu__latest'
 
 
@@ -46,5 +47,11 @@ def test_get_od_sandbox_image(mock_docker_client, mock_build_sandbox_image):
     image_name = get_od_sandbox_image(base_image, mock_docker_client)
     assert image_name == 'od_sandbox:debian__11'
     mock_build_sandbox_image.assert_called_once_with(
-        base_image, 'od_sandbox:debian__11', mock_docker_client
+        base_image,
+        'od_sandbox:debian__11',
+        mock_docker_client,
+        # eventstream runtime specific arguments, not used for sandbox-based runtime
+        # is_eventstream_runtime=
+        False,
+        skip_init=False,
     )
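
A possible companion test for the eventstream-runtime naming scheme, following the pattern of the legacy test above (sketch only, not part of this diff).

from opendevin.runtime.utils.image_agnostic import _get_new_image_name

def test_get_new_image_name_eventstream():
    new_image_name = _get_new_image_name('ubuntu:22.04', is_eventstream_runtime=True)
    assert new_image_name == 'od_eventstream_runtime:ubuntu__22.04'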

Some files were not shown because too many files changed in this diff.