|
|
@@ -1,7 +1,6 @@
|
|
|
import os
|
|
|
from pathlib import Path
|
|
|
|
|
|
-from opendevin.core.config import config
|
|
|
from opendevin.events.observation import (
|
|
|
ErrorObservation,
|
|
|
FileReadObservation,
|
|
|
@@ -10,7 +9,23 @@ from opendevin.events.observation import (
|
|
|
)
|
|
|
|
|
|
|
|
|
-def resolve_path(file_path, working_directory):
|
|
|
+def resolve_path(
|
|
|
+ file_path: str,
|
|
|
+ working_directory: str,
|
|
|
+ workspace_base: str,
|
|
|
+ workspace_mount_path_in_sandbox: str,
|
|
|
+):
|
|
|
+ """Resolve a file path to a path on the host filesystem.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ file_path: The path to resolve.
|
|
|
+ working_directory: The working directory of the agent.
|
|
|
+ workspace_mount_path_in_sandbox: The path to the workspace inside the sandbox.
|
|
|
+ workspace_base: The base path of the workspace on the host filesystem.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ The resolved path on the host filesystem.
|
|
|
+ """
|
|
|
path_in_sandbox = Path(file_path)
|
|
|
|
|
|
# Apply working directory
|
|
|
@@ -22,16 +37,16 @@ def resolve_path(file_path, working_directory):
|
|
|
abs_path_in_sandbox = path_in_sandbox.resolve()
|
|
|
|
|
|
# If the path is outside the workspace, deny it
|
|
|
- if not abs_path_in_sandbox.is_relative_to(config.workspace_mount_path_in_sandbox):
|
|
|
+ if not abs_path_in_sandbox.is_relative_to(workspace_mount_path_in_sandbox):
|
|
|
raise PermissionError(f'File access not permitted: {file_path}')
|
|
|
|
|
|
# Get path relative to the root of the workspace inside the sandbox
|
|
|
path_in_workspace = abs_path_in_sandbox.relative_to(
|
|
|
- Path(config.workspace_mount_path_in_sandbox)
|
|
|
+ Path(workspace_mount_path_in_sandbox)
|
|
|
)
|
|
|
|
|
|
# Get path relative to host
|
|
|
- path_in_host_workspace = Path(config.workspace_base) / path_in_workspace
|
|
|
+ path_in_host_workspace = Path(workspace_base) / path_in_workspace
|
|
|
|
|
|
return path_in_host_workspace
|
|
|
|
|
|
@@ -53,9 +68,13 @@ def read_lines(all_lines: list[str], start=0, end=-1):
|
|
|
return all_lines[begin:end]
|
|
|
|
|
|
|
|
|
-async def read_file(path, workdir, start=0, end=-1) -> Observation:
|
|
|
+async def read_file(
|
|
|
+ path, workdir, workspace_base, workspace_mount_path_in_sandbox, start=0, end=-1
|
|
|
+) -> Observation:
|
|
|
try:
|
|
|
- whole_path = resolve_path(path, workdir)
|
|
|
+ whole_path = resolve_path(
|
|
|
+ path, workdir, workspace_base, workspace_mount_path_in_sandbox
|
|
|
+ )
|
|
|
except PermissionError:
|
|
|
return ErrorObservation(
|
|
|
f"You're not allowed to access this path: {path}. You can only access paths inside the workspace."
|
|
|
@@ -84,11 +103,21 @@ def insert_lines(
|
|
|
return new_lines
|
|
|
|
|
|
|
|
|
-async def write_file(path, workdir, content, start=0, end=-1) -> Observation:
|
|
|
+async def write_file(
|
|
|
+ path,
|
|
|
+ workdir,
|
|
|
+ workspace_base,
|
|
|
+ workspace_mount_path_in_sandbox,
|
|
|
+ content,
|
|
|
+ start=0,
|
|
|
+ end=-1,
|
|
|
+) -> Observation:
|
|
|
insert = content.split('\n')
|
|
|
|
|
|
try:
|
|
|
- whole_path = resolve_path(path, workdir)
|
|
|
+ whole_path = resolve_path(
|
|
|
+ path, workdir, workspace_base, workspace_mount_path_in_sandbox
|
|
|
+ )
|
|
|
if not os.path.exists(os.path.dirname(whole_path)):
|
|
|
os.makedirs(os.path.dirname(whole_path))
|
|
|
mode = 'w' if not os.path.exists(whole_path) else 'r+'
|