files.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import os
  2. from pathlib import Path
  3. from opendevin.core.config import config
  4. from opendevin.events.observation import (
  5. ErrorObservation,
  6. FileReadObservation,
  7. FileWriteObservation,
  8. Observation,
  9. )
  10. def resolve_path(file_path, working_directory):
  11. path_in_sandbox = Path(file_path)
  12. # Apply working directory
  13. if not path_in_sandbox.is_absolute():
  14. path_in_sandbox = Path(working_directory) / path_in_sandbox
  15. # Sanitize the path with respect to the root of the full sandbox
  16. # (deny any .. path traversal to parent directories of the sandbox)
  17. abs_path_in_sandbox = path_in_sandbox.resolve()
  18. # If the path is outside the workspace, deny it
  19. if not abs_path_in_sandbox.is_relative_to(config.workspace_mount_path_in_sandbox):
  20. raise PermissionError(f'File access not permitted: {file_path}')
  21. # Get path relative to the root of the workspace inside the sandbox
  22. path_in_workspace = abs_path_in_sandbox.relative_to(
  23. Path(config.workspace_mount_path_in_sandbox)
  24. )
  25. # Get path relative to host
  26. path_in_host_workspace = Path(config.workspace_base) / path_in_workspace
  27. return path_in_host_workspace
  28. def read_lines(all_lines: list[str], start=0, end=-1):
  29. start = max(start, 0)
  30. start = min(start, len(all_lines))
  31. end = -1 if end == -1 else max(end, 0)
  32. end = min(end, len(all_lines))
  33. if end == -1:
  34. if start == 0:
  35. return all_lines
  36. else:
  37. return all_lines[start:]
  38. else:
  39. num_lines = len(all_lines)
  40. begin = max(0, min(start, num_lines - 2))
  41. end = -1 if end > num_lines else max(begin + 1, end)
  42. return all_lines[begin:end]
  43. async def read_file(path, workdir, start=0, end=-1) -> Observation:
  44. try:
  45. whole_path = resolve_path(path, workdir)
  46. except PermissionError:
  47. return ErrorObservation(
  48. f"You're not allowed to access this path: {path}. You can only access paths inside the workspace."
  49. )
  50. try:
  51. with open(whole_path, 'r', encoding='utf-8') as file:
  52. lines = read_lines(file.readlines(), start, end)
  53. except FileNotFoundError:
  54. return ErrorObservation(f'File not found: {path}')
  55. except UnicodeDecodeError:
  56. return ErrorObservation(f'File could not be decoded as utf-8: {path}')
  57. except IsADirectoryError:
  58. return ErrorObservation(f'Path is a directory: {path}. You can only read files')
  59. code_view = ''.join(lines)
  60. return FileReadObservation(path=path, content=code_view)
  61. def insert_lines(
  62. to_insert: list[str], original: list[str], start: int = 0, end: int = -1
  63. ):
  64. """
  65. Insert the new content to the original content based on start and end
  66. """
  67. new_lines = [''] if start == 0 else original[:start]
  68. new_lines += [i + '\n' for i in to_insert]
  69. new_lines += [''] if end == -1 else original[end:]
  70. return new_lines
  71. async def write_file(path, workdir, content, start=0, end=-1) -> Observation:
  72. insert = content.split('\n')
  73. try:
  74. whole_path = resolve_path(path, workdir)
  75. if not os.path.exists(os.path.dirname(whole_path)):
  76. os.makedirs(os.path.dirname(whole_path))
  77. mode = 'w' if not os.path.exists(whole_path) else 'r+'
  78. try:
  79. with open(whole_path, mode, encoding='utf-8') as file:
  80. if mode != 'w':
  81. all_lines = file.readlines()
  82. new_file = insert_lines(insert, all_lines, start, end)
  83. else:
  84. new_file = [i + '\n' for i in insert]
  85. file.seek(0)
  86. file.writelines(new_file)
  87. file.truncate()
  88. except FileNotFoundError:
  89. return ErrorObservation(f'File not found: {path}')
  90. except IsADirectoryError:
  91. return ErrorObservation(
  92. f'Path is a directory: {path}. You can only write to files'
  93. )
  94. except UnicodeDecodeError:
  95. return ErrorObservation(f'File could not be decoded as utf-8: {path}')
  96. except PermissionError:
  97. return ErrorObservation(f'Malformed paths not permitted: {path}')
  98. return FileWriteObservation(content='', path=path)