sandbox.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import json
  2. import os
  3. from abc import ABC, abstractmethod
  4. from opendevin.core.config import config
  5. from opendevin.core.schema import CancellableStream
  6. from opendevin.runtime.docker.process import Process
  7. from opendevin.runtime.plugins.mixin import PluginMixin
  8. class Sandbox(ABC, PluginMixin):
  9. background_commands: dict[int, Process] = {}
  10. _env: dict[str, str] = {}
  11. def __init__(self, **kwargs):
  12. for key in os.environ:
  13. if key.startswith('SANDBOX_ENV_'):
  14. sandbox_key = key.removeprefix('SANDBOX_ENV_')
  15. self.add_to_env(sandbox_key, os.environ[key])
  16. if config.enable_auto_lint:
  17. self.add_to_env('ENABLE_AUTO_LINT', 'true')
  18. def add_to_env(self, key: str, value: str):
  19. self._env[key] = value
  20. # Note: json.dumps gives us nice escaping for free
  21. self.execute(f'export {key}={json.dumps(value)}')
  22. @abstractmethod
  23. def execute(
  24. self, cmd: str, stream: bool = False, timeout: int | None = None
  25. ) -> tuple[int, str | CancellableStream]:
  26. pass
  27. @abstractmethod
  28. def execute_in_background(self, cmd: str) -> Process:
  29. pass
  30. @abstractmethod
  31. def kill_background(self, id: int) -> Process:
  32. pass
  33. @abstractmethod
  34. def read_logs(self, id: int) -> str:
  35. pass
  36. @abstractmethod
  37. def close(self):
  38. pass
  39. @abstractmethod
  40. def copy_to(self, host_src: str, sandbox_dest: str, recursive: bool = False):
  41. pass
  42. @abstractmethod
  43. def get_working_directory(self):
  44. pass