job_runner.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. from __future__ import print_function
  2. from multiprocessing import Pool
  3. import argparse
  4. from tqdm import tqdm
  5. import math
  6. class MultiProcessRunner:
  7. def __init__(self, fn):
  8. self.args = None
  9. self.process = fn
  10. def run(self):
  11. parser = argparse.ArgumentParser("")
  12. # Task-independent options
  13. parser.add_argument("--nj", type=int, default=16)
  14. parser.add_argument("--debug", action="store_true", default=False)
  15. parser.add_argument("--no_pbar", action="store_true", default=False)
  16. parser.add_argument("--verbose", action="store_ture", default=False)
  17. task_list, args = self.prepare(parser)
  18. result_list = self.pool_run(task_list, args)
  19. self.post(result_list, args)
  20. def prepare(self, parser):
  21. raise NotImplementedError("Please implement the prepare function.")
  22. def post(self, result_list, args):
  23. raise NotImplementedError("Please implement the post function.")
  24. def pool_run(self, tasks, args):
  25. results = []
  26. if args.debug:
  27. one_result = self.process(tasks[0])
  28. results.append(one_result)
  29. else:
  30. pool = Pool(args.nj)
  31. for one_result in tqdm(pool.imap(self.process, tasks), total=len(tasks), ascii=True, disable=args.no_pbar):
  32. results.append(one_result)
  33. pool.close()
  34. return results
  35. class MultiProcessRunnerV2:
  36. def __init__(self, fn):
  37. self.args = None
  38. self.process = fn
  39. def run(self):
  40. parser = argparse.ArgumentParser("")
  41. # Task-independent options
  42. parser.add_argument("--nj", type=int, default=16)
  43. parser.add_argument("--debug", action="store_true", default=False)
  44. parser.add_argument("--no_pbar", action="store_true", default=False)
  45. parser.add_argument("--verbose", action="store_true", default=False)
  46. task_list, args = self.prepare(parser)
  47. chunk_size = int(math.ceil(float(len(task_list)) / args.nj))
  48. if args.verbose:
  49. print("Split {} tasks into {} sub-tasks with chunk_size {}".format(len(task_list), args.nj, chunk_size))
  50. subtask_list = [task_list[i*chunk_size: (i+1)*chunk_size] for i in range(args.nj)]
  51. result_list = self.pool_run(subtask_list, args)
  52. self.post(result_list, args)
  53. def prepare(self, parser):
  54. raise NotImplementedError("Please implement the prepare function.")
  55. def post(self, result_list, args):
  56. raise NotImplementedError("Please implement the post function.")
  57. def pool_run(self, tasks, args):
  58. results = []
  59. if args.debug:
  60. one_result = self.process(tasks[0])
  61. results.append(one_result)
  62. else:
  63. pool = Pool(args.nj)
  64. for one_result in tqdm(pool.imap(self.process, tasks), total=len(tasks), ascii=True, disable=args.no_pbar):
  65. results.append(one_result)
  66. pool.close()
  67. return results
  68. class MultiProcessRunnerV3(MultiProcessRunnerV2):
  69. def run(self):
  70. parser = argparse.ArgumentParser("")
  71. # Task-independent options
  72. parser.add_argument("--nj", type=int, default=16)
  73. parser.add_argument("--debug", action="store_true", default=False)
  74. parser.add_argument("--no_pbar", action="store_true", default=False)
  75. parser.add_argument("--verbose", action="store_true", default=False)
  76. parser.add_argument("--sr", type=int, default=16000)
  77. task_list, shared_param, args = self.prepare(parser)
  78. chunk_size = int(math.ceil(float(len(task_list)) / args.nj))
  79. if args.verbose:
  80. print("Split {} tasks into {} sub-tasks with chunk_size {}".format(len(task_list), args.nj, chunk_size))
  81. subtask_list = [(i, task_list[i * chunk_size: (i + 1) * chunk_size], shared_param, args)
  82. for i in range(args.nj)]
  83. result_list = self.pool_run(subtask_list, args)
  84. self.post(result_list, args)