files.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. import os
  2. from pathlib import Path
  3. from openhands.events.observation import (
  4. ErrorObservation,
  5. FileReadObservation,
  6. FileWriteObservation,
  7. Observation,
  8. )
  9. def resolve_path(
  10. file_path: str,
  11. working_directory: str,
  12. workspace_base: str,
  13. workspace_mount_path_in_sandbox: str,
  14. ):
  15. """Resolve a file path to a path on the host filesystem.
  16. Args:
  17. file_path: The path to resolve.
  18. working_directory: The working directory of the agent.
  19. workspace_mount_path_in_sandbox: The path to the workspace inside the sandbox.
  20. workspace_base: The base path of the workspace on the host filesystem.
  21. Returns:
  22. The resolved path on the host filesystem.
  23. """
  24. path_in_sandbox = Path(file_path)
  25. # Apply working directory
  26. if not path_in_sandbox.is_absolute():
  27. path_in_sandbox = Path(working_directory) / path_in_sandbox
  28. # Sanitize the path with respect to the root of the full sandbox
  29. # (deny any .. path traversal to parent directories of the sandbox)
  30. abs_path_in_sandbox = path_in_sandbox.resolve()
  31. # If the path is outside the workspace, deny it
  32. if not abs_path_in_sandbox.is_relative_to(workspace_mount_path_in_sandbox):
  33. raise PermissionError(f'File access not permitted: {file_path}')
  34. # Get path relative to the root of the workspace inside the sandbox
  35. path_in_workspace = abs_path_in_sandbox.relative_to(
  36. Path(workspace_mount_path_in_sandbox)
  37. )
  38. # Get path relative to host
  39. path_in_host_workspace = Path(workspace_base) / path_in_workspace
  40. return path_in_host_workspace
  41. def read_lines(all_lines: list[str], start=0, end=-1):
  42. start = max(start, 0)
  43. start = min(start, len(all_lines))
  44. end = -1 if end == -1 else max(end, 0)
  45. end = min(end, len(all_lines))
  46. if end == -1:
  47. if start == 0:
  48. return all_lines
  49. else:
  50. return all_lines[start:]
  51. else:
  52. num_lines = len(all_lines)
  53. begin = max(0, min(start, num_lines - 2))
  54. end = -1 if end > num_lines else max(begin + 1, end)
  55. return all_lines[begin:end]
  56. async def read_file(
  57. path, workdir, workspace_base, workspace_mount_path_in_sandbox, start=0, end=-1
  58. ) -> Observation:
  59. try:
  60. whole_path = resolve_path(
  61. path, workdir, workspace_base, workspace_mount_path_in_sandbox
  62. )
  63. except PermissionError:
  64. return ErrorObservation(
  65. f"You're not allowed to access this path: {path}. You can only access paths inside the workspace."
  66. )
  67. try:
  68. with open(whole_path, 'r', encoding='utf-8') as file:
  69. lines = read_lines(file.readlines(), start, end)
  70. except FileNotFoundError:
  71. return ErrorObservation(f'File not found: {path}')
  72. except UnicodeDecodeError:
  73. return ErrorObservation(f'File could not be decoded as utf-8: {path}')
  74. except IsADirectoryError:
  75. return ErrorObservation(f'Path is a directory: {path}. You can only read files')
  76. code_view = ''.join(lines)
  77. return FileReadObservation(path=path, content=code_view)
  78. def insert_lines(
  79. to_insert: list[str], original: list[str], start: int = 0, end: int = -1
  80. ):
  81. """Insert the new content to the original content based on start and end"""
  82. new_lines = [''] if start == 0 else original[:start]
  83. new_lines += [i + '\n' for i in to_insert]
  84. new_lines += [''] if end == -1 else original[end:]
  85. return new_lines
  86. async def write_file(
  87. path,
  88. workdir,
  89. workspace_base,
  90. workspace_mount_path_in_sandbox,
  91. content,
  92. start=0,
  93. end=-1,
  94. ) -> Observation:
  95. insert = content.split('\n')
  96. try:
  97. whole_path = resolve_path(
  98. path, workdir, workspace_base, workspace_mount_path_in_sandbox
  99. )
  100. if not os.path.exists(os.path.dirname(whole_path)):
  101. os.makedirs(os.path.dirname(whole_path))
  102. mode = 'w' if not os.path.exists(whole_path) else 'r+'
  103. try:
  104. with open(whole_path, mode, encoding='utf-8') as file:
  105. if mode != 'w':
  106. all_lines = file.readlines()
  107. new_file = insert_lines(insert, all_lines, start, end)
  108. else:
  109. new_file = [i + '\n' for i in insert]
  110. file.seek(0)
  111. file.writelines(new_file)
  112. file.truncate()
  113. except FileNotFoundError:
  114. return ErrorObservation(f'File not found: {path}')
  115. except IsADirectoryError:
  116. return ErrorObservation(
  117. f'Path is a directory: {path}. You can only write to files'
  118. )
  119. except UnicodeDecodeError:
  120. return ErrorObservation(f'File could not be decoded as utf-8: {path}')
  121. except PermissionError:
  122. return ErrorObservation(f'Malformed paths not permitted: {path}')
  123. return FileWriteObservation(content='', path=path)