Browse Source

[eval] Use `imap_unorderd` for parallizing evaluation (#4040)

Xingyao Wang 1 year ago
parent
commit
1b1d8f0b02
1 changed files with 10 additions and 13 deletions
  1. 10 13
      evaluation/utils/shared.py

+ 10 - 13
evaluation/utils/shared.py

@@ -301,6 +301,11 @@ def _process_instance_wrapper(
             time.sleep(5)
 
 
+def _process_instance_wrapper_mp(args):
+    """Wrapper for multiprocessing, especially for imap_unordered."""
+    return _process_instance_wrapper(*args)
+
+
 def run_evaluation(
     dataset: pd.DataFrame,
     metadata: EvalMetadata | None,
@@ -328,21 +333,13 @@ def run_evaluation(
     try:
         if use_multiprocessing:
             with mp.Pool(num_workers) as pool:
-                results = [
-                    pool.apply_async(
-                        _process_instance_wrapper,
-                        args=(
-                            process_instance_func,
-                            instance,
-                            metadata,
-                            True,
-                            max_retries,
-                        ),
-                    )
+                args_iter = (
+                    (process_instance_func, instance, metadata, True, max_retries)
                     for _, instance in dataset.iterrows()
-                ]
+                )
+                results = pool.imap_unordered(_process_instance_wrapper_mp, args_iter)
                 for result in results:
-                    update_progress(result.get(), pbar, output_fp)
+                    update_progress(result, pbar, output_fp)
         else:
             for _, instance in dataset.iterrows():
                 result = _process_instance_wrapper(