files.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. import os
  2. from pathlib import Path
  3. from opendevin.core.config import config
  4. from opendevin.events.observation import (
  5. ErrorObservation,
  6. FileReadObservation,
  7. FileWriteObservation,
  8. Observation,
  9. )
  10. def resolve_path(file_path, working_directory):
  11. path_in_sandbox = Path(file_path)
  12. # Apply working directory
  13. if not path_in_sandbox.is_absolute():
  14. path_in_sandbox = Path(working_directory) / path_in_sandbox
  15. # Sanitize the path with respect to the root of the full sandbox
  16. # (deny any .. path traversal to parent directories of the sandbox)
  17. abs_path_in_sandbox = path_in_sandbox.resolve()
  18. # If the path is outside the workspace, deny it
  19. if not abs_path_in_sandbox.is_relative_to(config.workspace_mount_path_in_sandbox):
  20. raise PermissionError(f'File access not permitted: {file_path}')
  21. # Get path relative to the root of the workspace inside the sandbox
  22. path_in_workspace = abs_path_in_sandbox.relative_to(
  23. Path(config.workspace_mount_path_in_sandbox)
  24. )
  25. # Get path relative to host
  26. path_in_host_workspace = Path(config.workspace_base) / path_in_workspace
  27. return path_in_host_workspace
  28. def read_lines(all_lines: list[str], start=0, end=-1):
  29. start = max(start, 0)
  30. start = min(start, len(all_lines))
  31. end = -1 if end == -1 else max(end, 0)
  32. end = min(end, len(all_lines))
  33. if end == -1:
  34. if start == 0:
  35. return all_lines
  36. else:
  37. return all_lines[start:]
  38. else:
  39. num_lines = len(all_lines)
  40. begin = max(0, min(start, num_lines - 2))
  41. end = -1 if end > num_lines else max(begin + 1, end)
  42. return all_lines[begin:end]
  43. async def read_file(path, workdir, start=0, end=-1) -> Observation:
  44. try:
  45. whole_path = resolve_path(path, workdir)
  46. except PermissionError:
  47. return ErrorObservation(f'Malformed paths not permitted: {path}')
  48. try:
  49. with open(whole_path, 'r', encoding='utf-8') as file:
  50. lines = read_lines(file.readlines(), start, end)
  51. except FileNotFoundError:
  52. return ErrorObservation(f'File not found: {path}')
  53. except UnicodeDecodeError:
  54. return ErrorObservation(f'File could not be decoded as utf-8: {path}')
  55. except IsADirectoryError:
  56. return ErrorObservation(f'Path is a directory: {path}. You can only read files')
  57. code_view = ''.join(lines)
  58. return FileReadObservation(path=path, content=code_view)
  59. def insert_lines(
  60. to_insert: list[str], original: list[str], start: int = 0, end: int = -1
  61. ):
  62. """
  63. Insert the new content to the original content based on start and end
  64. """
  65. new_lines = [''] if start == 0 else original[:start]
  66. new_lines += [i + '\n' for i in to_insert]
  67. new_lines += [''] if end == -1 else original[end:]
  68. return new_lines
  69. async def write_file(path, workdir, content, start=0, end=-1) -> Observation:
  70. insert = content.split('\n')
  71. try:
  72. whole_path = resolve_path(path, workdir)
  73. if not os.path.exists(os.path.dirname(whole_path)):
  74. os.makedirs(os.path.dirname(whole_path))
  75. mode = 'w' if not os.path.exists(whole_path) else 'r+'
  76. try:
  77. with open(whole_path, mode, encoding='utf-8') as file:
  78. if mode != 'w':
  79. all_lines = file.readlines()
  80. new_file = insert_lines(insert, all_lines, start, end)
  81. else:
  82. new_file = [i + '\n' for i in insert]
  83. file.seek(0)
  84. file.writelines(new_file)
  85. file.truncate()
  86. except FileNotFoundError:
  87. return ErrorObservation(f'File not found: {path}')
  88. except IsADirectoryError:
  89. return ErrorObservation(
  90. f'Path is a directory: {path}. You can only write to files'
  91. )
  92. except UnicodeDecodeError:
  93. return ErrorObservation(f'File could not be decoded as utf-8: {path}')
  94. except PermissionError:
  95. return ErrorObservation(f'Malformed paths not permitted: {path}')
  96. return FileWriteObservation(content='', path=path)