import asyncio
import json
import logging
import multiprocessing as mp
import os
import pathlib
import subprocess
import time
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm
from utils import download_data, download_tools, encode_question, eval_answer, get_data
from opendevin.controller.state.state import State
from opendevin.core.config import config, get_llm_config_arg, get_parser
from opendevin.core.logger import get_console_handler
from opendevin.core.logger import opendevin_logger as logger
from opendevin.core.main import main
from opendevin.events.action import MessageAction
from opendevin.events.serialization.event import event_to_dict
def cleanup():
print('Cleaning up child processes...')
for process in mp.active_children():
print(f'Terminating child process: {process.name}')
process.terminate()
process.join()
def codeact_user_response(state: State) -> str:
msg = (
'Please continue working on the task on whatever approach you think is suitable.\n'
'When you think you finished the task, respond with `Finish[answer]` where you include your answer in `[]`\n'
'IMPORTANT: YOU SHOULD NEVER ASK FOR HUMAN HELP OR USE THE INTERNET TO SOLVE THIS TASK.\n'
)
if state.history:
user_msgs = [
action
for action, _ in state.history
if isinstance(action, MessageAction) and action.source == 'user'
]
if len(user_msgs) >= 2:
# let the agent know that it can give up when it has tried 3 times
return (
msg
+ 'If you want to give up, run: exit .\n'
)
return msg
def monologue_user_response(state: State) -> str:
raise NotImplementedError('MonologueAgent should never ask for user responses.')
AGENT_CLS_TO_FAKE_USER_RESPONSE_FN = {
'CodeActAgent': codeact_user_response,
'MonologueAgent': monologue_user_response,
}
AGENT_CLS_TO_INST_SUFFIX = {
'CodeActAgent': 'When you think you have completed the request, please run the following command: exit .\n'
}
def process_instance(task, agent_class, metadata, reset_logger: bool = True):
# create process-specific workspace dir
# we will create a workspace directory for EACH process
# so that different agent don't interfere with each other.
workspace_mount_path = config.workspace_mount_path
pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True)
# Setup the logger properly, so you can run multi-processing to parallelize the evaluation
eval_output_dir = metadata['eval_output_dir']
qid = task['qid']
question = task['question']
answer = task['answer']
if reset_logger:
# Set up logger
log_file = os.path.join(eval_output_dir, 'logs', f'instance_{qid}.log')
# Remove all existing handlers from logger
for handler in logger.handlers[:]:
logger.removeHandler(handler)
# add back the console handler to print ONE line
logger.addHandler(get_console_handler())
logger.info(
f'Starting evaluation for instance {qid}.\nHint: run "tail -f {log_file}" to see live logs in a separate shell'
)
# Remove all existing handlers from logger
for handler in logger.handlers[:]:
logger.removeHandler(handler)
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(
logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
)
logger.addHandler(file_handler)
logger.info(f'Process-specific workspace mounted at {workspace_mount_path}')
# Prepare instruction
instruction = encode_question(question)
instruction += 'IMPORTANT: You should ONLY interact with the environment provided to you AND NEVER ASK FOR HUMAN HELP.\n'
# NOTE: You can actually set slightly different instruction for different agents
instruction += AGENT_CLS_TO_INST_SUFFIX.get(agent_class, '')
# logger.info(f'Instruction:\n{instruction}', extra={'msg_type': 'OBSERVATION'})
# Here's how you can run the agent (similar to the `main` function) and get the final task state
state: State = asyncio.run(
main(
instruction,
fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN.get(agent_class),
)
)
# ======= Attempt to evaluate the agent's edits =======
# If you are working on simpler benchmark that only evaluates the final model output (e.g., in a MessageAction)
# You can simply get the LAST `MessageAction` from the returned `state.history` and parse it for evaluation.
if state is None:
raise ValueError('State should not be None.')
model_answer_raw = ''
for act, _ in reversed(state.history):
if isinstance(act, MessageAction) and act.source == 'agent':
model_answer_raw = act.content
break
# attempt to parse model_answer
correct = eval_answer(str(model_answer_raw), str(answer))
metrics = state.metrics.get() if state.metrics else None
logger.info(f'Final message: {model_answer_raw} | Correctness: {correct}')
# Save the output
output = {
'qid': qid,
'text': model_answer_raw,
'correct': correct,
'answer_id': 'None',
'model_id': metadata['model_name'],
'metadata': metadata,
'history': [
(event_to_dict(action), event_to_dict(obs)) for action, obs in state.history
],
'metrics': metrics,
'error': state.error if state and state.error else None,
}
return output
if __name__ == '__main__':
parser = get_parser()
parser.add_argument(
'--dataset',
type=str,
help='Which dataset to evaluate from ToolQA. ToolQA contains 8 datasets, namely agenda, airbnb, coffee, dblp, flight, gsm8k, scirex, yelp. For example, the default is --dataset flight.',
default='flight',
)
parser.add_argument(
'--hardness',
type=str,
help='Which level of difficulty to evaluate from ToolQA. ToolQA contains 2 levels of hardness, namely easy and hard. For example, the default is --hardness easy.',
default='easy',
)
parser.add_argument(
'--wolfram_alpha_appid',
type=str,
help='wolfram alpha appid to use for wolfram alpha related tests',
default='YOUR_WOLFRAMALPHA_APPID',
)
args, _ = parser.parse_known_args()
if args.directory:
config.workspace_base = os.path.abspath(args.directory)
print(f'Setting workspace base to {config.workspace_base}')
# Check https://github.com/OpenDevin/OpenDevin/blob/main/evaluation/swe_bench/README.md#configure-opendevin-and-your-llm
# for details of how to set `llm_config`
if args.llm_config:
specified_llm_config = get_llm_config_arg(args.llm_config)
if specified_llm_config:
config.llm = specified_llm_config
logger.info(f'Config for evaluation: {config}')
agent_class = args.agent_cls
assert (
agent_class in AGENT_CLS_TO_FAKE_USER_RESPONSE_FN
), f'Unsupported agent class: {agent_class}'
model_name = config.llm.model.split('/')[-1]
max_iterations = args.max_iterations
eval_note = ''
if args.eval_note is not None:
eval_note += '_N_' + args.eval_note
eval_output_dir = os.path.join(
args.eval_output_dir,
'toolqa',
agent_class,
model_name + '_maxiter_' + str(max_iterations) + eval_note,
)
pathlib.Path(eval_output_dir).mkdir(parents=True, exist_ok=True)
pathlib.Path(os.path.join(eval_output_dir, 'logs')).mkdir(
parents=True, exist_ok=True
)
logger.info(f'Using evaluation output directory: {eval_output_dir}')
dataset = ''
hardness = ''
dataset_choices = [
'agenda',
'airbnb',
'coffee',
'dblp',
'flight',
'gsm8k',
'scirex',
'yelp',
'genda',
]
if args.dataset in dataset_choices:
dataset = args.dataset
else:
raise ValueError(
'Please choose from agenda, airbnb, coffee, dblp, flight, gsm8k, scirex, yelp for dataset.'
)
if args.hardness == 'easy':
hardness = 'easy'
elif args.hardness == 'hard':
hardness = 'hard'
else:
raise ValueError('Please choose from easy and hard for hardness.')
logger.info(f'Evaluating ToolQA {dataset} {hardness} test')
# workspace_mount_path = os.path.join(config.workspace_mount_path, '_eval_workspace')
workspace_mount_path = config.workspace_mount_path
pathlib.Path(workspace_mount_path).mkdir(parents=True, exist_ok=True)
toolqa_test = get_data(dataset, hardness)
toolqa_data_path = download_data(workspace_mount_path)
toolqa_tool_path = download_tools(workspace_mount_path, args.wolfram_alpha_appid)
# TEST METADATA
metadata = {
'dataset': dataset,
'hardness': hardness,
'agent_class': agent_class,
'model_name': model_name,
'max_iterations': max_iterations,
'eval_output_dir': eval_output_dir,
'start_time': time.strftime('%Y-%m-%d %H:%M:%S'),
# get the commit id of current repo for reproduciblity
'git_commit': subprocess.check_output(['git', 'rev-parse', 'HEAD'])
.decode('utf-8')
.strip(),
}
logger.info(f'Metadata: {metadata}')
with open(
os.path.join(eval_output_dir, f'metadata_{dataset}_{hardness}.json'), 'w'
) as f:
json.dump(metadata, f)
# LIMIT EVALUATION
eval_n_limit = args.eval_n_limit
if eval_n_limit:
toolqa_test = toolqa_test[:eval_n_limit]
logger.info(
f'Limiting evaluation to a total of first {eval_n_limit} instances.'
)
output_file = os.path.join(
eval_output_dir, f'output_{model_name}_{dataset}_{hardness}.jsonl'
)
logger.info(f'Writing evaluation output to {output_file}')
finished_task_ids = set()
if os.path.exists(output_file):
with open(output_file, 'r') as f:
for line in f:
task = json.loads(line)
finished_task_ids.add(task['qid'])
logger.warning(
f'Output file {output_file} already exists. Loaded {len(finished_task_ids)} finished instances.'
)
output_fp = open(output_file, 'a')
logger.info(
f'Evaluation started with Agent {agent_class}, model {model_name}, max iterations {max_iterations}.'
)
# =============================================
# filter out finished instances
new_toolqa_test = []
for task in toolqa_test:
qid = task['qid']
if qid in finished_task_ids:
logger.info(f'Skipping instance {qid} as it is already finished.')
continue
new_toolqa_test.append(task)
finished_task_number = len(finished_task_ids)
toolqa_test = new_toolqa_test
logger.info(
f'Finished instances: {finished_task_number}, Remaining instances: {len(toolqa_test)}'
)
# =============================================
pbar = tqdm(total=len(toolqa_test))
# This function tracks the progress AND write the output to a JSONL file
def update_progress(future):
pbar.update(1)
output = future.result()
pbar.set_description(f'Instance {output["qid"]}')
pbar.set_postfix_str(f'Test Result: {output["correct"]}')
logger.info(
f'Finished evaluation for instance {output["qid"]}: {output["correct"]}'
)
output_fp.write(json.dumps(output) + '\n')
output_fp.flush()
finished_task_ids.add(output['qid'])
# This sets the multi-processing
num_workers = args.eval_num_workers
logger.info(f'Using {num_workers} workers for evaluation.')
try:
with ProcessPoolExecutor(num_workers) as executor:
futures = []
# This is how we perform multi-processing
for task in toolqa_test:
try:
future = executor.submit(
process_instance,
task,
agent_class,
metadata,
reset_logger=bool(num_workers > 1),
)
future.add_done_callback(update_progress)
futures.append(future)
except Exception:
continue
# Wait for all futures to complete
for future in futures:
try:
future.result()
except Exception:
continue
except KeyboardInterrupt:
logger.info('KeyboardInterrupt received. Cleaning up...')
cleanup()
output_fp.close()
total_correct = 0
output = []
with open(output_file, 'r') as f:
for line in f:
data = json.loads(line)
output.append(data)
if data['qid'] in finished_task_ids:
if str(data['correct']).lower() == 'true':
total_correct += 1
# sort all output by question_id
output = sorted(output, key=lambda x: x['qid'])
with open(output_file, 'w') as f:
for dat in output:
f.write(json.dumps(dat) + '\n')
f.flush()
logger.info(
f'Evaluation finished for {dataset}-{hardness}. Total: {len(toolqa_test)+finished_task_number}; Correct: {total_correct}; Accuracy: {total_correct / (len(toolqa_test)+finished_task_number)}'
)