utils.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. import json
  2. import logging
  3. import multiprocessing as mp
  4. import os
  5. from typing import Callable
  6. import pandas as pd
  7. from openhands.controller.state.state import State
  8. from openhands.core.logger import get_console_handler
  9. from openhands.core.logger import openhands_logger as logger
  10. from openhands.events.action import Action
  11. from openhands.events.action.message import MessageAction
  12. def codeact_user_response(
  13. state: State,
  14. encapsulate_solution: bool = False,
  15. try_parse: Callable[[Action | None], str] | None = None,
  16. ) -> str:
  17. encaps_str = (
  18. (
  19. 'Please encapsulate your final answer (answer ONLY) within <solution> and </solution>.\n'
  20. 'For example: The answer to the question is <solution> 42 </solution>.\n'
  21. )
  22. if encapsulate_solution
  23. else ''
  24. )
  25. msg = (
  26. 'Please continue working on the task on whatever approach you think is suitable.\n'
  27. 'If you think you have solved the task, please first send your answer to user through message and then finish the interaction.\n'
  28. f'{encaps_str}'
  29. 'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP.\n'
  30. )
  31. if state.history:
  32. # check if the last action has an answer, if so, early exit
  33. if try_parse is not None:
  34. last_action = next(
  35. (
  36. event
  37. for event in reversed(state.history)
  38. if isinstance(event, Action)
  39. ),
  40. None,
  41. )
  42. ans = try_parse(last_action)
  43. if ans is not None:
  44. return '/exit'
  45. # check if the agent has tried to talk to the user 3 times, if so, let the agent know it can give up
  46. user_msgs = [
  47. event
  48. for event in state.history
  49. if isinstance(event, MessageAction) and event.source == 'user'
  50. ]
  51. if len(user_msgs) >= 2:
  52. # let the agent know that it can give up when it has tried 3 times
  53. return (
  54. msg
  55. + 'If you want to give up, run: <execute_bash> exit </execute_bash>.\n'
  56. )
  57. return msg
  58. def cleanup():
  59. print('Cleaning up child processes...')
  60. for process in mp.active_children():
  61. print(f'Terminating child process: {process.name}')
  62. process.terminate()
  63. process.join()
  64. def prepare_dataset(dataset: pd.DataFrame, output_file: str, eval_n_limit: int):
  65. assert 'instance_id' in dataset.columns, (
  66. "Expected 'instance_id' column in the dataset. You should define your own "
  67. "unique identifier for each instance and use it as the 'instance_id' column."
  68. )
  69. id_column = 'instance_id'
  70. logger.info(f'Writing evaluation output to {output_file}')
  71. finished_ids = set()
  72. if os.path.exists(output_file):
  73. with open(output_file, 'r') as f:
  74. for line in f:
  75. data = json.loads(line)
  76. finished_ids.add(data[id_column])
  77. logger.warning(
  78. f'Output file {output_file} already exists. Loaded '
  79. f'{len(finished_ids)} finished instances.'
  80. )
  81. if eval_n_limit:
  82. dataset = dataset.head(eval_n_limit)
  83. logger.info(f'Limiting evaluation to first {eval_n_limit} instances.')
  84. new_dataset = [
  85. instance
  86. for _, instance in dataset.iterrows()
  87. if instance[id_column] not in finished_ids
  88. ]
  89. logger.info(
  90. f'Finished instances: {len(finished_ids)}, '
  91. f'Remaining instances: {len(new_dataset)}'
  92. )
  93. return pd.DataFrame(new_dataset)
  94. def reset_logger_for_multiprocessing(
  95. logger: logging.Logger, instance_id: str, log_dir: str
  96. ):
  97. """Reset the logger for multiprocessing.
  98. Save logs to a separate file for each process, instead of trying to write to the
  99. same file/console from multiple processes.
  100. """
  101. # Set up logger
  102. log_file = os.path.join(
  103. log_dir,
  104. f'instance_{instance_id}.log',
  105. )
  106. # Remove all existing handlers from logger
  107. for handler in logger.handlers[:]:
  108. logger.removeHandler(handler)
  109. # add back the console handler to print ONE line
  110. logger.addHandler(get_console_handler())
  111. logger.info(
  112. f'Starting resolver for instance {instance_id}.\n'
  113. f'Hint: run "tail -f {log_file}" to see live logs in a separate shell'
  114. )
  115. # Remove all existing handlers from logger
  116. for handler in logger.handlers[:]:
  117. logger.removeHandler(handler)
  118. os.makedirs(os.path.dirname(log_file), exist_ok=True)
  119. file_handler = logging.FileHandler(log_file)
  120. file_handler.setFormatter(
  121. logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
  122. )
  123. logger.addHandler(file_handler)