Просмотр исходного кода

CLI concurrency (#2695)

* add session id in cli, evals

* fix main sid
Engel Nyst 1 год назад
Родитель
Сommit
874b4c9075

+ 1 - 0
evaluation/EDA/run_infer.py

@@ -126,6 +126,7 @@ def process_instance(
         main(
             instruction,
             fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN.get(agent_class),
+            sid=instance['text'].strip(),
         )
     )
     # ======= Attempt to evaluate the agent's edits =======

+ 1 - 0
evaluation/agent_bench/run_infer.py

@@ -163,6 +163,7 @@ def process_instance(
             instruction,
             fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN.get(agent_class),
             sandbox=sandbox,
+            sid=inst_id,
         )
     )
 

+ 4 - 0
evaluation/biocoder/run_infer.py

@@ -213,12 +213,16 @@ def process_instance(
     # NOTE: You can actually set slightly different instruction for different agents
     instruction += AGENT_CLS_TO_INST_SUFFIX.get(agent_class, '')
 
+    # use a session id for concurrent evaluation
+    sid = instance.test_case_id.replace('/', '__')
+
     # Here's how you can run the agent (similar to the `main` function) and get the final task state
     state: State = asyncio.run(
         main(
             instruction,
             fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN.get(agent_class),
             sandbox=sandbox,
+            sid=sid,
         )
     )
 

+ 7 - 5
evaluation/bird/run_infer.py

@@ -153,13 +153,16 @@ def process_instance(
     # Set up the database path
     database_path = os.path.join(instance.db_id, f'{instance.db_id}.sqlite')
 
+    # use session id for concurrent evaluation
+    sid = instance.task_id.replace('/', '__')
+
     # Set up the logger properly, so you can run multi-processing to parallelize the evaluation
     if reset_logger:
         # Set up logger
         log_file = os.path.join(
             eval_output_dir,
             'logs',
-            f'instance_{instance.task_id.replace("/", "__")}.log',
+            f'instance_{sid}.log',
         )
         # Remove all existing handlers from logger
         for handler in logger.handlers[:]:
@@ -198,14 +201,12 @@ def process_instance(
         result = execute_sql(db_path, sql)
         print(result)
     """
-    path = os.path.join(
-        config.workspace_mount_path, f'{instance.task_id.replace("/", "__")}.py'
-    )
+    path = os.path.join(config.workspace_mount_path, f'{sid}.py')
     instruction = (
         f'You are a SQL expert and need to complete the following text-to-SQL tasks.'
         f'\n\n{instance.instruction}\n\n'
         'Please write the SQL in one line without line breaks.'
-        f'And write a new python file named {instance.task_id.replace("/", "__")}.py to call the SQL you wrote.'
+        f'And write a new python file named {sid}.py to call the SQL you wrote.'
         'You need to follow the code template below:'
         f'\n\n{statements}\n\n'
         'Environment has been set up for you to start working.'
@@ -222,6 +223,7 @@ def process_instance(
         main(
             instruction,
             fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN.get(agent_class),
+            sid=sid,
         )
     )
 

+ 1 - 0
evaluation/gaia/run_infer.py

@@ -145,6 +145,7 @@ def process_instance(instance, agent_class, metadata, reset_logger: bool = True)
                 fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN.get(
                     agent_class
                 ),
+                sid=instance['task_id'],
             )
         )
         # ======= Attempt to evaluate the agent's edits =======

+ 1 - 0
evaluation/gorilla/run_infer.py

@@ -117,6 +117,7 @@ def process_instance(
                 fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN.get(
                     agent_class
                 ),
+                sid=question_id,
             )
         )
         # ======= Attempt to evaluate the agent's edits =======

+ 1 - 0
evaluation/gpqa/run_infer.py

@@ -251,6 +251,7 @@ def process_instance(
                 fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN.get(
                     agent_class
                 ),
+                sid=instance.instance_id,
             )
         )
 

+ 6 - 4
evaluation/humanevalfix/run_infer.py

@@ -156,13 +156,16 @@ def process_instance(
         config.workspace_base = workspace_mount_path
         config.workspace_mount_path = workspace_mount_path
 
+        # use a session id for concurrent evaluation
+        sid = instance.task_id.replace('/', '__')
+
         # Setup the logger properly, so you can run multi-processing to parallelize the evaluation
         if reset_logger:
             # Set up logger
             log_file = os.path.join(
                 eval_output_dir,
                 'logs',
-                f'instance_{instance.task_id.replace("/", "__")}.log',
+                f'instance_{sid}.log',
             )
             # Remove all existing handlers from logger
             for handler in logger.handlers[:]:
@@ -189,9 +192,7 @@ def process_instance(
         problem_statement = (
             instance.declaration + instance.buggy_solution + '\n' + instance.test
         )
-        path = os.path.join(
-            workspace_mount_path, f'{instance.task_id.replace("/", "__")}.py'
-        )
+        path = os.path.join(workspace_mount_path, f'{sid}.py')
         with open(path, 'w') as f:
             f.write(problem_statement)
 
@@ -217,6 +218,7 @@ def process_instance(
                 fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN.get(
                     agent_class
                 ),
+                sid=sid,
             )
         )
 

+ 4 - 1
evaluation/logic_reasoning/run_infer.py

@@ -207,7 +207,9 @@ def process_instance(
         # NOTE: You can actually set slightly different instruction for different agents
         instruction += AGENT_CLS_TO_INST_SUFFIX.get(agent_class, '')
 
-        sandbox = DockerSSHBox()
+        # use a session id for concurrent evaluation
+        sid = instance['id'] + '_' + str(os.getpid())
+        sandbox = DockerSSHBox(sid=sid)
         exit_code, command_output = sandbox.execute('pip install scitools-pyke')
 
         # Here's how you can run the agent (similar to the `main` function) and get the final task state
@@ -218,6 +220,7 @@ def process_instance(
                     agent_class
                 ),
                 sandbox=sandbox,
+                sid=sid,
             )
         )
         # ======= Attempt to evaluate the agent's edits =======

+ 1 - 0
evaluation/miniwob/run_infer.py

@@ -65,6 +65,7 @@ def process_instance(
             'PLACEHOLDER_GOAL',
             runtime_tools_config=runtime_tools_config,
             sandbox=docker_sandbox,
+            sid=env_id,
         )
     )
 

+ 4 - 1
evaluation/mint/run_infer.py

@@ -116,7 +116,9 @@ def process_instance(
     if not skip_workspace_mount:
         logger.info(f'Process-specific workspace mounted at {workspace_mount_path}')
 
-    sandbox = DockerSSHBox()
+    # use a session id for concurrent processing
+    sid = instance.task_id + '_' + str(os.getpid())
+    sandbox = DockerSSHBox(sid=sid)
 
     requirements_host_src = 'evaluation/mint/requirements.txt'
     requirements_sandbox_dest = '/opendevin/plugins/mint/requirements.txt'
@@ -159,6 +161,7 @@ def process_instance(
             instruction,
             fake_user_response_fn=fake_user_response_fn,
             sandbox=sandbox,
+            sid=sid,
         )
     )
 

+ 4 - 2
evaluation/ml_bench/run_infer.py

@@ -143,8 +143,9 @@ def process_instance(
 
         logger.info(f'Process-specific workspace mounted at {workspace_mount_path}')
 
-        # Create a sandbox, using the instance ID as the session ID to avoid conflicts
-        sandbox = DockerSSHBox(sid=str(instance['id']) + '_' + str(os.getpid()))
+        # Create a sandbox, using the instance ID and PID as the session ID to avoid conflicts
+        sid = str(instance['id']) + '_' + str(os.getpid())
+        sandbox = DockerSSHBox(sid=sid)
 
         # Set up the task environment
         sandbox.execute(f'conda activate {ID2CONDA[instance["github_id"]]}')
@@ -186,6 +187,7 @@ def process_instance(
                     agent_class
                 ),
                 sandbox=sandbox,
+                sid=sid,
             )
         )
         metrics = state.metrics.get() if state.metrics else {}

+ 1 - 0
evaluation/swe_bench/run_infer.py

@@ -310,6 +310,7 @@ IMPORTANT TIPS:
             instruction,
             fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN.get(agent_class),
             sandbox=sandbox,
+            sid=instance.instance_id,
         )
     )
 

+ 1 - 1
evaluation/swe_bench/swe_env_box.py

@@ -38,7 +38,7 @@ class SWEBenchSSHBox(DockerSSHBox):
             container_image is not None
         ), 'container_image is required for SWEBenchSSHBox!'
         # Need to run as root to use SWEBench container
-        sid = f'swe_bench_{swe_instance_id}' + str(uuid.uuid4())
+        sid = f'swe_bench_{swe_instance_id}_' + str(uuid.uuid4())
         super().__init__(container_image, timeout, sid)
         self.init_plugins(sandbox_plugins)
 

+ 1 - 0
evaluation/toolqa/run_infer.py

@@ -108,6 +108,7 @@ def process_instance(task, agent_class, metadata, reset_logger: bool = True):
         main(
             instruction,
             fake_user_response_fn=AGENT_CLS_TO_FAKE_USER_RESPONSE_FN.get(agent_class),
+            sid=qid,
         )
     )
     # ======= Attempt to evaluate the agent's edits =======

+ 1 - 0
evaluation/webarena/run_infer.py

@@ -65,6 +65,7 @@ def process_instance(
             'PLACEHOLDER_GOAL',
             runtime_tools_config=runtime_tools_config,
             sandbox=docker_sandbox,
+            sid=env_id,
         )
     )
 

+ 2 - 1
opendevin/core/main.py

@@ -36,6 +36,7 @@ async def main(
     fake_user_response_fn: Optional[Callable[[Optional[State]], str]] = None,
     sandbox: Optional[Sandbox] = None,
     runtime_tools_config: Optional[dict] = None,
+    sid: str | None = None,
 ) -> Optional[State]:
     """Main coroutine to run the agent controller with task input flexibility.
     It's only used when you launch opendevin backend directly via cmdline.
@@ -84,7 +85,7 @@ async def main(
     AgentCls: Type[Agent] = Agent.get_cls(args.agent_cls)
     agent = AgentCls(llm=llm)
 
-    event_stream = EventStream('main')
+    event_stream = EventStream('main' + ('_' + sid if sid else ''))
     controller = AgentController(
         agent=agent,
         max_iterations=args.max_iterations,