import json
import logging
import multiprocessing as mp
import os
import pathlib
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Callable

import pandas as pd
from pydantic import BaseModel
from tqdm import tqdm

from opendevin.controller.state.state import State
from opendevin.core.config import LLMConfig
from opendevin.events.action import Action
from opendevin.events.action.message import MessageAction

logger = logging.getLogger(__name__)
class EvalMetadata(BaseModel):
    """Describes a single evaluation run: agent, model, limits, and output location."""

    agent_class: str
    llm_config: LLMConfig
    max_iterations: int
    eval_output_dir: str
    start_time: str
    git_commit: str
    dataset: str | None = None
    data_split: str | None = None
    details: dict[str, Any] | None = None
def codeact_user_response(
    state: State,
    encapsulate_solution: bool = False,
    try_parse: Callable[[Action], str] | None = None,
) -> str:
    """Simulate a user reply that keeps a CodeAct agent working on its own."""
    encaps_str = (
        (
            'Please encapsulate your final answer (answer ONLY) within <solution> and </solution>.\n'
            'For example: The answer to the question is <solution> 42 </solution>.\n'
        )
        if encapsulate_solution
        else ''
    )
    msg = (
        'Please continue working on the task using whatever approach you think is suitable.\n'
        'If you think you have solved the task, please first send your answer to the user through a message and then <execute_bash> exit </execute_bash>.\n'
        f'{encaps_str}'
        'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP.\n'
    )
    if state.history:
        # If a parser was supplied and the last action already contains a
        # parsable answer, tell the agent loop to exit immediately.
        if try_parse is not None:
            last_action, _ = state.history[-1]
            ans = try_parse(last_action)
            if ans is not None:
                return '/exit'
        # Count how many times the simulated user has already replied.
        user_msgs = [
            action
            for action, _ in state.history
            if isinstance(action, MessageAction) and action.source == 'user'
        ]
        if len(user_msgs) >= 2:
            # After two nudges (i.e. on the agent's third attempt), let it
            # know that it is allowed to give up.
            return (
                msg
                + 'If you want to give up, run: <execute_bash> exit </execute_bash>.\n'
            )
    return msg
def monologue_user_response(state: State) -> str:
    raise NotImplementedError('MonologueAgent should never ask for user responses.')
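

# A sketch of how callers typically select a fake-user responder per agent
# class. The mapping name and the lambda enabling `encapsulate_solution` are
# illustrative assumptions, not a fixed API of this module.
AGENT_CLS_TO_FAKE_USER_RESPONSE_FN: dict[str, Callable[[State], str]] = {
    'CodeActAgent': lambda state: codeact_user_response(
        state, encapsulate_solution=True
    ),
    'MonologueAgent': monologue_user_response,
}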
def cleanup() -> None:
    """Terminate any child processes that are still alive (e.g. after Ctrl-C)."""
    print('Cleaning up child processes...')
    for process in mp.active_children():
        print(f'Terminating child process: {process.name}')
        process.terminate()
        process.join()
def make_metadata(
    llm_config: LLMConfig,
    dataset_name: str,
    agent_class: str,
    max_iterations: int,
    eval_note: str | None,
    eval_output_dir: str,
    data_split: str | None = None,
    details: dict[str, Any] | None = None,
) -> EvalMetadata:
    """Build the EvalMetadata for this run and persist it as metadata.json."""
    model_name = llm_config.model.split('/')[-1]
    eval_note = f'_N_{eval_note}' if eval_note else ''
    eval_output_path = os.path.join(
        eval_output_dir,
        dataset_name,
        agent_class,
        f'{model_name}_maxiter_{max_iterations}{eval_note}',
    )
    pathlib.Path(eval_output_path).mkdir(parents=True, exist_ok=True)
    pathlib.Path(os.path.join(eval_output_path, 'logs')).mkdir(
        parents=True, exist_ok=True
    )
    logger.info(f'Using evaluation output directory: {eval_output_path}')

    metadata = EvalMetadata(
        agent_class=agent_class,
        llm_config=llm_config,
        max_iterations=max_iterations,
        eval_output_dir=eval_output_path,
        start_time=time.strftime('%Y-%m-%d %H:%M:%S'),
        git_commit=subprocess.check_output(['git', 'rev-parse', 'HEAD'])
        .decode('utf-8')
        .strip(),
        dataset=dataset_name,
        data_split=data_split,
        details=details,
    )
    metadata_json = metadata.model_dump_json()
    logger.info(f'Metadata: {metadata_json}')
    with open(os.path.join(eval_output_path, 'metadata.json'), 'w') as f:
        f.write(metadata_json)
    return metadata
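

# Example (illustrative values; assumes this runs inside a git checkout, since
# make_metadata shells out to `git rev-parse HEAD`, and that LLMConfig accepts
# a bare model name):
#
#   metadata = make_metadata(
#       llm_config=LLMConfig(model='gpt-4o'),
#       dataset_name='swe-bench-lite',
#       agent_class='CodeActAgent',
#       max_iterations=30,
#       eval_note='run1',
#       eval_output_dir='evaluation/outputs',
#   )
#   # -> writes evaluation/outputs/swe-bench-lite/CodeActAgent/
#   #        gpt-4o_maxiter_30_N_run1/metadata.json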
def prepare_dataset(
    dataset: pd.DataFrame, output_file: str, eval_n_limit: int, id_column: str
) -> pd.DataFrame:
    """Drop instances already recorded in output_file so runs can be resumed."""
    logger.info(f'Writing evaluation output to {output_file}')
    finished_ids = set()
    if os.path.exists(output_file):
        with open(output_file, 'r') as f:
            for line in f:
                data = json.loads(line)
                finished_ids.add(data[id_column])
        logger.warning(
            f'Output file {output_file} already exists. Loaded {len(finished_ids)} finished instances.'
        )
    if eval_n_limit:
        dataset = dataset.head(eval_n_limit)
        logger.info(f'Limiting evaluation to first {eval_n_limit} instances.')
    new_dataset = [
        instance
        for _, instance in dataset.iterrows()
        if instance[id_column] not in finished_ids
    ]
    logger.info(
        f'Finished instances: {len(finished_ids)}, Remaining instances: {len(new_dataset)}'
    )
    return pd.DataFrame(new_dataset)
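

# Each line of output_file is one self-contained JSON record. prepare_dataset
# resumes by skipping ids it has already seen, and run_evaluation (below)
# appends records that must at least contain the id column and a nested test
# result; the exact field names here are illustrative:
#
#   {"instance_id": "...", "test_result": {"result": ...}, ...}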
def run_evaluation(
    dataset: pd.DataFrame,
    metadata: EvalMetadata,
    output_file: str,
    num_workers: int,
    process_instance_func: Callable[[pd.Series, EvalMetadata, bool], Any],
    id_column: str,
):
    """Run process_instance_func over the dataset in a process pool, streaming results to output_file."""
    logger.info(
        f'Evaluation started with Agent {metadata.agent_class}, '
        f'model {metadata.llm_config.model}, max iterations {metadata.max_iterations}.'
    )
    pbar = tqdm(total=len(dataset))
    output_fp = open(output_file, 'a')

    def update_progress(future):
        # Done-callback: runs in the parent process when a worker finishes.
        pbar.update(1)
        output = future.result()
        pbar.set_description(f'Instance {output[id_column]}')
        pbar.set_postfix_str(f'Test Result: {output["test_result"]["result"]}')
        logger.info(
            f'Finished evaluation for instance {output[id_column]}: '
            f'{output["test_result"]["result"]}'
        )
        output_fp.write(json.dumps(output) + '\n')
        output_fp.flush()

    try:
        with ProcessPoolExecutor(num_workers) as executor:
            futures = []
            for _, instance in dataset.iterrows():
                future = executor.submit(
                    process_instance_func,
                    instance,
                    metadata,
                    num_workers > 1,
                )
                future.add_done_callback(update_progress)
                futures.append(future)
            # Re-raise any exception that occurred inside a worker.
            for future in futures:
                future.result()
    except KeyboardInterrupt:
        print('KeyboardInterrupt received. Cleaning up...')
        cleanup()
    finally:
        pbar.close()
        output_fp.close()
    logger.info('Evaluation finished.')
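

def _demo_process_instance(
    instance: pd.Series, metadata: EvalMetadata, reset_logger: bool
) -> dict[str, Any]:
    # Stand-in for a real benchmark's process_instance implementation. It is
    # defined at module level so ProcessPoolExecutor can pickle it; the record
    # shape matches what update_progress above expects.
    return {
        'instance_id': instance['instance_id'],
        'test_result': {'result': 'not_implemented'},
    }


if __name__ == '__main__':
    # Minimal, illustrative driver showing how the pieces above fit together.
    # The dataset contents, the id column name, the model name, and the output
    # directory are all assumptions for demonstration; real benchmarks supply
    # their own dataset and process_instance implementation.
    metadata = make_metadata(
        llm_config=LLMConfig(model='gpt-4o'),
        dataset_name='demo-dataset',
        agent_class='CodeActAgent',
        max_iterations=10,
        eval_note=None,
        eval_output_dir='evaluation/outputs',
    )
    output_file = os.path.join(metadata.eval_output_dir, 'output.jsonl')
    dataset = pd.DataFrame([{'instance_id': f'demo-{i}'} for i in range(3)])
    remaining = prepare_dataset(
        dataset, output_file, eval_n_limit=0, id_column='instance_id'
    )
    run_evaluation(
        remaining,
        metadata,
        output_file,
        num_workers=1,
        process_instance_func=_demo_process_instance,
        id_column='instance_id',
    )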
|