| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- import asyncio
- import os
- import pandas as pd
- from datasets import load_dataset
- from evaluation.utils.shared import (
- EvalMetadata,
- EvalOutput,
- codeact_user_response,
- compatibility_for_eval_history_pairs,
- make_metadata,
- prepare_dataset,
- reset_logger_for_multiprocessing,
- run_evaluation,
- )
- from openhands.controller.state.state import State
- from openhands.core.config import (
- AppConfig,
- SandboxConfig,
- get_llm_config_arg,
- get_parser,
- )
- from openhands.core.logger import openhands_logger as logger
- from openhands.core.main import create_runtime, run_controller
- from openhands.events.action import (
- AgentFinishAction,
- CmdRunAction,
- IPythonRunCellAction,
- MessageAction,
- )
- from openhands.events.observation import CmdOutputObservation
- from openhands.runtime.base import Runtime
- from openhands.utils.async_utils import call_async_from_sync
# Agent class name -> callback supplied to run_controller as
# `fake_user_response_fn` when that agent is evaluated.
AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = dict(CodeActAgent=codeact_user_response)
# Agent class name -> text appended to the end of the task instruction.
AGENT_CLS_TO_INST_SUFFIX = dict(
    CodeActAgent='When you think you have solved the question, please first send your answer to user through message and then exit.\n',
)
def get_config(
    metadata: EvalMetadata,
) -> AppConfig:
    """Build the application config used for one evaluation run.

    The agent class, iteration cap and LLM settings come from ``metadata``;
    the sandbox image, runtime and workspace settings are fixed here.
    """
    sandbox_config = SandboxConfig(
        base_container_image='xingyaoww/od-eval-logic-reasoning:v1.0',
        enable_auto_lint=True,
        use_host_network=False,
        runtime_extra_deps='$OH_INTERPRETER_PATH -m pip install scitools-pyke',
    )
    app_config = AppConfig(
        default_agent=metadata.agent_class,
        run_as_openhands=False,
        runtime='eventstream',
        max_iterations=metadata.max_iterations,
        sandbox=sandbox_config,
        # do not mount workspace
        workspace_base=None,
        workspace_mount_path=None,
    )
    app_config.set_llm_config(metadata.llm_config)
    return app_config
def get_choice(answer_str):
    """Extract the multiple-choice letter that ``answer_str`` starts with.

    Recognised forms are a bare option letter ``A``-``H``, optionally followed
    by punctuation or whitespace (``'B'``, ``'B)'``, ``'B.'``, ``'B is right'``),
    and the same preceded by a colon (``': B.'``).

    The letter only counts when it is a standalone token. A plain prefix test
    would read ordinary words as answers (``'Based on ...'`` as ``B``,
    ``'After ...'`` as ``A``).

    Args:
        answer_str: Text expected to begin with the chosen option.

    Returns:
        The option letter, or ``None`` when the text does not start with one.
        For colon-prefixed text that holds no standalone letter, the text with
        colons and periods removed is returned.
    """
    option_letters = 'ABCDEFGH'

    def leading_letter(text):
        # Emptiness is checked first: '' is a substring of every string.
        if not text or text[0] not in option_letters:
            return None
        # An alphanumeric successor means the letter is part of a longer token.
        if text[1:2].isalnum():
            return None
        return text[0]

    letter = leading_letter(answer_str)
    if letter is not None:
        return letter

    if answer_str.startswith(':'):
        # e.g. the remainder of "the answer is: B) ..." after the indicator.
        letter = leading_letter(answer_str.lstrip(': \t\r\n'))
        if letter is not None:
            return letter
        return answer_str.replace(':', '').replace('.', '').strip()
    return None
def get_test_result(
    model_answer: str,
    ground_truth: str,
) -> dict[str, bool]:
    """Score the model's final answer against the gold option.

    The answer is first parsed directly with ``get_choice``. If that yields
    nothing, the text following the first matching indicator phrase (such as
    "the correct answer is") is parsed instead.

    Args:
        model_answer: Final message produced by the agent; ``None`` is treated
            as an empty answer.
        ground_truth: Gold option; parentheses and surrounding whitespace are
            removed before comparison.

    Returns:
        ``{'result': True}`` when the parsed prediction equals the gold option,
        otherwise ``{'result': False}``.
    """
    import re  # local import: `re` is not among this module's top-level imports

    gold_answer = ground_truth.replace('(', '').replace(')', '').strip()
    # Leading whitespace/newlines would otherwise defeat the prefix parse.
    answer_str = model_answer.strip() if model_answer is not None else ''
    prediction = get_choice(answer_str)

    if prediction is None:
        # Tried in this order; matched case-insensitively so capitalised
        # variants ("The answer is") need no separate entries.
        indicators = [
            'the correct option is',
            'the correct answer is',
            'the answer is',
        ]
        for indicator in indicators:
            found = re.search(re.escape(indicator), answer_str, flags=re.IGNORECASE)
            if found is not None:
                prediction = get_choice(answer_str[found.end():].strip())
                break

    return {'result': prediction == gold_answer}
# Directory that holds this script; files shipped next to it
# (logic_inference.py, instruction.txt) are resolved against it.
CUR_EVAL_DIR = os.path.split(__file__)[0]
def initialize_runtime(
    runtime: Runtime,
    instance: pd.Series,  # this argument is not required
):
    """Prepare the sandbox before the agent is started.

    Creates /workspace, copies logic_inference.py into it, exports the
    DATASET_NAME environment variable, creates the program cache directory
    and runs ``%pip install scitools-pyke`` in the IPython kernel.
    """
    logger.info(f"{'-' * 50} BEGIN Runtime Initialization Fn {'-' * 50}")

    def run_logged_command(command: str) -> CmdOutputObservation:
        # Log the shell action, execute it, and require a zero exit status.
        shell_action = CmdRunAction(command=command)
        logger.info(shell_action, extra={'msg_type': 'ACTION'})
        result = runtime.run_action(shell_action)
        assert result.exit_code == 0
        return result

    # Create the working directory and switch into it.
    run_logged_command('mkdir -p /workspace')
    run_logged_command('cd /workspace')

    # Copy logic_inference.py to /workspace, then confirm it is there
    # (this check is not logged).
    runtime.copy_to(os.path.join(CUR_EVAL_DIR, 'logic_inference.py'), '/workspace')
    listing = runtime.run_action(CmdRunAction(command='ls /workspace'))
    assert listing.exit_code == 0
    assert 'logic_inference.py' in listing.content

    # NOTE(review): `metadata` is not a parameter of this function; it
    # resolves to the module-level name assigned in the __main__ block.
    runtime.add_env_vars({'DATASET_NAME': metadata.dataset})

    run_logged_command('mkdir -p /workspace/.cache_program')

    # The install result is logged but its outcome is not checked.
    install_action = IPythonRunCellAction(code='%pip install scitools-pyke')
    logger.info(install_action, extra={'msg_type': 'ACTION'})
    install_obs = runtime.run_action(install_action)
    logger.info(install_obs, extra={'msg_type': 'OBSERVATION'})

    logger.info(f"{'-' * 50} END Runtime Initialization Fn {'-' * 50}")
# Read the instruction template once, when the module is loaded.
_instruction_path = os.path.join(CUR_EVAL_DIR, 'instruction.txt')
with open(_instruction_path, 'r') as template_file:
    INSTRUCTION_TEMPLATE = template_file.read()
def process_instance(
    instance: pd.Series,
    metadata: EvalMetadata,
    reset_logger: bool = True,
):
    """Run the agent on one dataset instance and score its final answer.

    Args:
        instance: Dataset row; the 'instance_id', 'raw_logic_programs' and
            'answer' fields are read.
        metadata: Evaluation settings (agent class, LLM config, output
            directory, dataset name).
        reset_logger: When true, the logger is passed to
            ``reset_logger_for_multiprocessing`` with
            ``<eval_output_dir>/infer_logs`` as the log directory; otherwise a
            single start message is logged.

    Returns:
        An ``EvalOutput`` holding the instruction, event history, metrics,
        last error (if any) and the test result.

    Raises:
        ValueError: If the controller returns no final state.
    """
    config = get_config(metadata)

    # Setup the logger properly, so you can run multi-processing to parallelize the evaluation
    if reset_logger:
        log_dir = os.path.join(metadata.eval_output_dir, 'infer_logs')
        reset_logger_for_multiprocessing(logger, instance['instance_id'], log_dir)
    else:
        logger.info(f'Starting evaluation for instance {instance["instance_id"]}.')

    instance_logic_programs = instance['raw_logic_programs'][0].strip()
    # The dataset name is taken from `metadata` rather than from a
    # module-level `dataset_name`, which only exists when this file is run as
    # a script and is therefore undefined for importers.
    instruction = (
        INSTRUCTION_TEMPLATE.replace('[[dataset_name]]', metadata.dataset)
        .replace('[[logic_programs]]', instance_logic_programs)
        .replace('[[logic_inference_path.py]]', '/workspace/logic_inference.py')
    )

    # NOTE: You can actually set slightly different instruction for different agents
    instruction += AGENT_CLS_TO_INST_SUFFIX[metadata.agent_class]

    runtime = create_runtime(config)
    call_async_from_sync(runtime.connect)
    initialize_runtime(runtime, instance)

    # Run the agent (similar to the `main` function) and get the final task state.
    state: State | None = asyncio.run(
        run_controller(
            config=config,
            initial_user_action=MessageAction(content=instruction),
            runtime=runtime,
            fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN.get(
                metadata.agent_class
            ),
        )
    )

    # ======= Attempt to evaluate the agent's edits =======
    # This benchmark only evaluates the final model output, so the most recent
    # finish/message event in `state.history` is parsed for the answer.
    if state is None:
        raise ValueError('State should not be None.')

    final_message = ''
    for event in reversed(state.history):
        if isinstance(event, AgentFinishAction):
            final_message = event.thought
            break
        elif isinstance(event, MessageAction):
            final_message = event.content
            break

    # Drop single quotes wrapped around the message.
    final_message = final_message.strip("'")
    logger.info(
        f'Predicted answer: {final_message}, Ground truth: {instance["answer"]}'
    )

    test_result = get_test_result(
        model_answer=final_message, ground_truth=instance['answer']
    )
    test_result['final_message'] = final_message

    metrics = state.metrics.get() if state.metrics else None

    # history is now available as a stream of events, rather than list of pairs of (Action, Observation)
    # for compatibility with the existing output format, we can remake the pairs here
    # remove when it becomes unnecessary
    histories = compatibility_for_eval_history_pairs(state.history)

    # Save the output
    output = EvalOutput(
        instance_id=instance['instance_id'],
        instruction=instruction,
        metadata=metadata,
        history=histories,
        metrics=metrics,
        # `state` is known to be non-None here; falsy errors become None.
        error=state.last_error or None,
        test_result=test_result,
    )
    return output
if __name__ == '__main__':
    # Extend the shared evaluation CLI with this benchmark's own options.
    parser = get_parser()
    extra_options = (
        (
            '--dataset',
            'the logic reasoning dataset to evaluate on {ProntoQA, ProofWriter}',
            'ProofWriter',
        ),
        # right now we only support validation split
        ('--data_split', 'data split to evaluate on {validation}', 'validation'),
    )
    for flag, help_text, default_value in extra_options:
        parser.add_argument(flag, type=str, help=help_text, default=default_value)
    args, _ = parser.parse_known_args()

    # These module-level names are read by functions defined in this file.
    dataset_name, data_split = args.dataset, args.data_split

    # Load the requested split and expose its `id` column as `instance_id`.
    dataset = load_dataset(f'renma/{dataset_name}')
    dataset_df = dataset[data_split].to_pandas()
    dataset_df.rename(columns={'id': 'instance_id'}, inplace=True)

    # An LLM config is mandatory: a missing or unresolvable one aborts the run.
    llm_config = get_llm_config_arg(args.llm_config) if args.llm_config else None
    if llm_config is None:
        raise ValueError(f'Could not find LLM config: --llm_config {args.llm_config}')

    metadata = make_metadata(
        llm_config,
        dataset_name,
        args.agent_cls,
        args.max_iterations,
        args.eval_note,
        args.eval_output_dir,
    )

    output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl')
    instances = prepare_dataset(dataset_df, output_file, args.eval_n_limit)
    run_evaluation(
        instances, metadata, output_file, args.eval_num_workers, process_instance
    )
|