fileop.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import os
  2. from dataclasses import dataclass
  3. from pathlib import Path
  4. from opendevin.observation import (
  5. Observation,
  6. FileReadObservation,
  7. FileWriteObservation,
  8. AgentErrorObservation,
  9. )
  10. from opendevin.schema import ActionType
  11. from opendevin.sandbox import E2BBox
  12. from opendevin import config
  13. from opendevin.schema.config import ConfigType
  14. from .base import ExecutableAction
# Root of the workspace as seen from inside the sandbox. resolve_path() joins
# requested paths onto this prefix and rejects anything that resolves outside it.
SANDBOX_PATH_PREFIX = '/workspace/'
  16. def resolve_path(file_path):
  17. # Sanitize the path with respect to the root of the full sandbox
  18. # (deny any .. path traversal to parent directories of this)
  19. abs_path_in_sandbox = (Path(SANDBOX_PATH_PREFIX) / Path(file_path)).resolve()
  20. # If the path is outside the workspace, deny it
  21. if not abs_path_in_sandbox.is_relative_to(SANDBOX_PATH_PREFIX):
  22. raise PermissionError(f'File access not permitted: {file_path}')
  23. # Get path relative to the root of the workspace inside the sandbox
  24. path_in_workspace = abs_path_in_sandbox.relative_to(Path(SANDBOX_PATH_PREFIX))
  25. # Get path relative to host
  26. path_in_host_workspace = Path(config.get(ConfigType.WORKSPACE_BASE)) / path_in_workspace
  27. return path_in_host_workspace
  28. @dataclass
  29. class FileReadAction(ExecutableAction):
  30. """
  31. Reads a file from a given path.
  32. Can be set to read specific lines using start and end
  33. Default lines 0:-1 (whole file)
  34. """
  35. path: str
  36. start: int = 0
  37. end: int = -1
  38. thoughts: str = ''
  39. action: str = ActionType.READ
  40. def _read_lines(self, all_lines: list[str]):
  41. if self.end == -1:
  42. if self.start == 0:
  43. return all_lines
  44. else:
  45. return all_lines[self.start:]
  46. else:
  47. num_lines = len(all_lines)
  48. begin = max(0, min(self.start, num_lines - 2))
  49. end = -1 if self.end > num_lines else max(begin + 1, self.end)
  50. return all_lines[begin:end]
  51. async def run(self, controller) -> Observation:
  52. if isinstance(controller.action_manager.sandbox, E2BBox):
  53. content = controller.action_manager.sandbox.filesystem.read(
  54. self.path)
  55. read_lines = self._read_lines(content.split('\n'))
  56. code_view = ''.join(read_lines)
  57. else:
  58. try:
  59. whole_path = resolve_path(self.path)
  60. self.start = max(self.start, 0)
  61. try:
  62. with open(whole_path, 'r', encoding='utf-8') as file:
  63. read_lines = self._read_lines(file.readlines())
  64. code_view = ''.join(read_lines)
  65. except FileNotFoundError:
  66. return AgentErrorObservation(f'File not found: {self.path}')
  67. except IsADirectoryError:
  68. return AgentErrorObservation(f'Path is a directory: {self.path}. You can only read files')
  69. except PermissionError:
  70. return AgentErrorObservation(f'Malformed paths not permitted: {self.path}')
  71. return FileReadObservation(path=self.path, content=code_view)
  72. @property
  73. def message(self) -> str:
  74. return f'Reading file: {self.path}'
  75. @dataclass
  76. class FileWriteAction(ExecutableAction):
  77. path: str
  78. content: str
  79. start: int = 0
  80. end: int = -1
  81. thoughts: str = ''
  82. action: str = ActionType.WRITE
  83. def _insert_lines(self, to_insert: list[str], original: list[str]):
  84. """
  85. Insert the new content to the original content based on self.start and self.end
  86. """
  87. new_lines = [''] if self.start == 0 else original[:self.start]
  88. new_lines += [i + '\n' for i in to_insert]
  89. new_lines += [''] if self.end == -1 else original[self.end:]
  90. return new_lines
  91. async def run(self, controller) -> Observation:
  92. insert = self.content.split('\n')
  93. if isinstance(controller.action_manager.sandbox, E2BBox):
  94. files = controller.action_manager.sandbox.filesystem.list(self.path)
  95. if self.path in files:
  96. all_lines = controller.action_manager.sandbox.filesystem.read(self.path)
  97. new_file = self._insert_lines(self.content.split('\n'), all_lines)
  98. controller.action_manager.sandbox.filesystem.write(self.path, ''.join(new_file))
  99. else:
  100. return AgentErrorObservation(f'File not found: {self.path}')
  101. else:
  102. try:
  103. whole_path = resolve_path(self.path)
  104. mode = 'w' if not os.path.exists(whole_path) else 'r+'
  105. try:
  106. with open(whole_path, mode, encoding='utf-8') as file:
  107. if mode != 'w':
  108. all_lines = file.readlines()
  109. new_file = self._insert_lines(insert, all_lines)
  110. else:
  111. new_file = [i + '\n' for i in insert]
  112. file.seek(0)
  113. file.writelines(new_file)
  114. file.truncate()
  115. except FileNotFoundError:
  116. return AgentErrorObservation(f'File not found: {self.path}')
  117. except IsADirectoryError:
  118. return AgentErrorObservation(f'Path is a directory: {self.path}. You can only write to files')
  119. except PermissionError:
  120. return AgentErrorObservation(f'Malformed paths not permitted: {self.path}')
  121. return FileWriteObservation(content='', path=self.path)
  122. @property
  123. def message(self) -> str:
  124. return f'Writing file: {self.path}'