瀏覽代碼

fix: delete the docker exec output prefix(8 bytes). (#247)

iFurySt 1 年之前
父節點
當前提交
1c5c84faea
共有 1 個文件被更改,包括 70 次插入27 次删除
  1. 70 27
      opendevin/sandbox/sandbox.py

+ 70 - 27
opendevin/sandbox/sandbox.py

@@ -11,7 +11,9 @@ import atexit
 InputType = namedtuple("InputType", ["content"])
 OutputType = namedtuple("OutputType", ["content"])
 
-DIRECTORY_REWRITE = os.getenv("DIRECTORY_REWRITE", "") # helpful for docker-in-docker scenarios
+DIRECTORY_REWRITE = os.getenv(
+    "DIRECTORY_REWRITE", ""
+)  # helpful for docker-in-docker scenarios
 CONTAINER_IMAGE = os.getenv("SANDBOX_CONTAINER_IMAGE", "opendevin/sandbox:v0.1")
 # FIXME: On some containers, the devin user doesn't have enough permission, e.g. to install packages
 # How do we make this more flexible?
@@ -22,50 +24,80 @@ if os.getenv("SANDBOX_USER_ID") is not None:
 elif hasattr(os, "getuid"):
     USER_ID = os.getuid()
 
+
 class BackgroundCommand:
     def __init__(self, id: int, command: str, result):
         self.id = id
         self.command = command
         self.result = result
 
+    def parse_docker_exec_output(self, logs: bytes) -> Tuple[bytes, bytes]:
+        res = b""
+        tail = b""
+        i = 0
+        while i < len(logs):
+            prefix = logs[i : i + 8]
+            if len(prefix) < 8:
+                msg_type = prefix[0:1]
+                if msg_type in [b"\x00", b"\x01", b"\x02", b"\x03"]:
+                    tail = prefix
+                break
+
+            msg_type = prefix[0:1]
+            padding = prefix[1:4]
+            if (
+                msg_type in [b"\x00", b"\x01", b"\x02", b"\x03"]
+                and padding == b"\x00\x00\x00"
+            ):
+                msg_length = int.from_bytes(prefix[4:8])  # , byteorder='big'
+                res += logs[i + 8 : i + 8 + msg_length]
+                i += 8 + msg_length
+            else:
+                res += logs[i : i + 1]
+                i += 1
+        return res, tail
+
     def read_logs(self) -> str:
         # TODO: get an exit code if process is exited
-        logs = ""
+        logs = b""
+        last_remains = b""
         while True:
-            ready_to_read, _, _ = select.select([self.result.output], [], [], .1) # type: ignore[has-type]
+            ready_to_read, _, _ = select.select([self.result.output], [], [], 0.1)  # type: ignore[has-type]
             if ready_to_read:
-                data = self.result.output.read(4096) # type: ignore[has-type]
+                data = self.result.output.read(4096)  # type: ignore[has-type]
                 if not data:
                     break
-                # FIXME: we're occasionally seeing some escape characters like `\x02` and `\x00` in the logs...
-                chunk = data.decode('utf-8')
+                chunk, last_remains = self.parse_docker_exec_output(last_remains + data)
                 logs += chunk
             else:
                 break
-        return logs
+        return (logs + last_remains).decode("utf-8")
 
     def kill(self):
         # FIXME: this doesn't actually kill the process!
         self.result.output.close()
 
+
 class DockerInteractive:
     closed = False
     cur_background_id = 0
-    background_commands : Dict[int, BackgroundCommand] = {}
+    background_commands: Dict[int, BackgroundCommand] = {}
 
     def __init__(
         self,
         workspace_dir: str | None = None,
         container_image: str | None = None,
         timeout: int = 120,
-        id: str | None = None
+        id: str | None = None,
     ):
         if id is not None:
             self.instance_id = id
         else:
             self.instance_id = str(uuid.uuid4())
         if workspace_dir is not None:
-            assert os.path.exists(workspace_dir), f"Directory {workspace_dir} does not exist."
+            assert os.path.exists(
+                workspace_dir
+            ), f"Directory {workspace_dir} does not exist."
             # expand to absolute path
             self.workspace_dir = os.path.abspath(workspace_dir)
         else:
@@ -95,18 +127,20 @@ class DockerInteractive:
         atexit.register(self.cleanup)
 
     def setup_devin_user(self):
-        exit_code, logs = self.container.exec_run([
-            '/bin/bash', '-c',
-            f'useradd --shell /bin/bash -u {USER_ID} -o -c \"\" -m devin'
+        exit_code, logs = self.container.exec_run(
+            [
+                "/bin/bash",
+                "-c",
+                f'useradd --shell /bin/bash -u {USER_ID} -o -c "" -m devin',
             ],
-            workdir="/workspace"
+            workdir="/workspace",
         )
 
     def get_exec_cmd(self, cmd: str) -> List[str]:
         if RUN_AS_DEVIN:
-            return ['su', 'devin', '-c', cmd]
+            return ["su", "devin", "-c", cmd]
         else:
-            return ['/bin/bash', '-c', cmd]
+            return ["/bin/bash", "-c", cmd]
 
     def read_logs(self, id) -> str:
         if id not in self.background_commands:
@@ -116,11 +150,15 @@ class DockerInteractive:
 
     def execute(self, cmd: str) -> Tuple[int, str]:
         # TODO: each execute is not stateful! We need to keep track of the current working directory
-        exit_code, logs = self.container.exec_run(self.get_exec_cmd(cmd), workdir="/workspace")
-        return exit_code, logs.decode('utf-8')
+        exit_code, logs = self.container.exec_run(
+            self.get_exec_cmd(cmd), workdir="/workspace"
+        )
+        return exit_code, logs.decode("utf-8")
 
     def execute_in_background(self, cmd: str) -> BackgroundCommand:
-        result = self.container.exec_run(self.get_exec_cmd(cmd), socket=True, workdir="/workspace")
+        result = self.container.exec_run(
+            self.get_exec_cmd(cmd), socket=True, workdir="/workspace"
+        )
         result.output._sock.setblocking(0)
         bg_cmd = BackgroundCommand(self.cur_background_id, cmd, result)
         self.background_commands[bg_cmd.id] = bg_cmd
@@ -160,13 +198,14 @@ class DockerInteractive:
         docker_client = docker.from_env()
         try:
             self.container = docker_client.containers.run(
-                    self.container_image,
-                    command="tail -f /dev/null",
-                    network_mode='host',
-                    working_dir="/workspace",
-                    name=self.container_name,
-                    detach=True,
-                    volumes={self.workspace_dir: {"bind": "/workspace", "mode": "rw"}})
+                self.container_image,
+                command="tail -f /dev/null",
+                network_mode="host",
+                working_dir="/workspace",
+                name=self.container_name,
+                detach=True,
+                volumes={self.workspace_dir: {"bind": "/workspace", "mode": "rw"}},
+            )
         except Exception as e:
             print(f"Failed to start container: {e}")
             raise e
@@ -193,8 +232,10 @@ class DockerInteractive:
             return
         self.container.remove(force=True)
 
+
 if __name__ == "__main__":
     import argparse
+
     parser = argparse.ArgumentParser(description="Interactive Docker container")
     parser.add_argument(
         "-d",
@@ -210,7 +251,9 @@ if __name__ == "__main__":
     )
     print("Interactive Docker container started. Type 'exit' or use Ctrl+C to exit.")
 
-    bg_cmd = docker_interactive.execute_in_background("while true; do echo 'dot ' && sleep 1; done")
+    bg_cmd = docker_interactive.execute_in_background(
+        "while true; do echo 'dot ' && sleep 1; done"
+    )
 
     sys.stdout.flush()
     try: