linter.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. import os
  2. from collections import defaultdict
  3. from difflib import SequenceMatcher
  4. from openhands.linter.base import BaseLinter, LinterException, LintResult
  5. from openhands.linter.languages.python import PythonLinter
  6. from openhands.linter.languages.treesitter import TreesitterBasicLinter
  7. class DefaultLinter(BaseLinter):
  8. def __init__(self):
  9. self.linters: dict[str, list[BaseLinter]] = defaultdict(list)
  10. self.linters['.py'] = [PythonLinter()]
  11. # Add treesitter linter as a fallback for all linters
  12. self.basic_linter = TreesitterBasicLinter()
  13. for extension in self.basic_linter.supported_extensions:
  14. self.linters[extension].append(self.basic_linter)
  15. self._supported_extensions = list(self.linters.keys())
  16. @property
  17. def supported_extensions(self) -> list[str]:
  18. return self._supported_extensions
  19. def lint(self, file_path: str) -> list[LintResult]:
  20. if not os.path.isabs(file_path):
  21. raise LinterException(f'File path {file_path} is not an absolute path')
  22. file_extension = os.path.splitext(file_path)[1]
  23. linters: list[BaseLinter] = self.linters.get(file_extension, [])
  24. for linter in linters:
  25. res = linter.lint(file_path)
  26. # We always return the first linter's result (higher priority)
  27. if res:
  28. return res
  29. return []
  30. def lint_file_diff(
  31. self, original_file_path: str, updated_file_path: str
  32. ) -> list[LintResult]:
  33. """Only return lint errors that are introduced by the diff.
  34. Args:
  35. original_file_path: The original file path.
  36. updated_file_path: The updated file path.
  37. Returns:
  38. A list of lint errors that are introduced by the diff.
  39. """
  40. # 1. Lint the original and updated file
  41. original_lint_errors: list[LintResult] = self.lint(original_file_path)
  42. updated_lint_errors: list[LintResult] = self.lint(updated_file_path)
  43. # 2. Load the original and updated file content
  44. with open(original_file_path, 'r') as f:
  45. old_lines = f.readlines()
  46. with open(updated_file_path, 'r') as f:
  47. new_lines = f.readlines()
  48. # 3. Get line numbers that are changed & unchanged
  49. # Map the line number of the original file to the updated file
  50. # NOTE: this only works for lines that are not changed (i.e., equal)
  51. old_to_new_line_no_mapping: dict[int, int] = {}
  52. replace_or_inserted_lines: list[int] = []
  53. for (
  54. tag,
  55. old_idx_start,
  56. old_idx_end,
  57. new_idx_start,
  58. new_idx_end,
  59. ) in SequenceMatcher(
  60. isjunk=None,
  61. a=old_lines,
  62. b=new_lines,
  63. ).get_opcodes():
  64. if tag == 'equal':
  65. for idx, _ in enumerate(old_lines[old_idx_start:old_idx_end]):
  66. old_to_new_line_no_mapping[old_idx_start + idx + 1] = (
  67. new_idx_start + idx + 1
  68. )
  69. elif tag == 'replace' or tag == 'insert':
  70. for idx, _ in enumerate(old_lines[old_idx_start:old_idx_end]):
  71. replace_or_inserted_lines.append(new_idx_start + idx + 1)
  72. else:
  73. # omit the case of delete
  74. pass
  75. # 4. Get pre-existing errors in unchanged lines
  76. # increased error elsewhere introduced by the newlines
  77. # i.e., we omit errors that are already in original files and report new one
  78. new_line_no_to_original_errors: dict[int, list[LintResult]] = defaultdict(list)
  79. for error in original_lint_errors:
  80. if error.line in old_to_new_line_no_mapping:
  81. new_line_no_to_original_errors[
  82. old_to_new_line_no_mapping[error.line]
  83. ].append(error)
  84. # 5. Select errors from lint results in new file to report
  85. selected_errors = []
  86. for error in updated_lint_errors:
  87. # 5.1. Error introduced by replace/insert
  88. if error.line in replace_or_inserted_lines:
  89. selected_errors.append(error)
  90. # 5.2. Error introduced by modified lines that impacted
  91. # the unchanged lines that HAVE pre-existing errors
  92. elif error.line in new_line_no_to_original_errors:
  93. # skip if the error is already reported
  94. # or add if the error is new
  95. if not any(
  96. original_error.message == error.message
  97. and original_error.column == error.column
  98. for original_error in new_line_no_to_original_errors[error.line]
  99. ):
  100. selected_errors.append(error)
  101. # 5.3. Error introduced by modified lines that impacted
  102. # the unchanged lines that have NO pre-existing errors
  103. else:
  104. selected_errors.append(error)
  105. # 6. Sort errors by line and column
  106. selected_errors.sort(key=lambda x: (x.line, x.column))
  107. return selected_errors