| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- import asyncio
- from concurrent import futures
- from concurrent.futures import ThreadPoolExecutor
- from typing import Callable, Coroutine, Iterable, List
- GENERAL_TIMEOUT: int = 15
- EXECUTOR = ThreadPoolExecutor()
- async def call_sync_from_async(fn: Callable, *args, **kwargs):
- """
- Shorthand for running a function in the default background thread pool executor
- and awaiting the result. The nature of synchronous code is that the future
- returned by this function is not cancellable
- """
- loop = asyncio.get_event_loop()
- coro = loop.run_in_executor(None, lambda: fn(*args, **kwargs))
- result = await coro
- return result
- def call_async_from_sync(
- corofn: Callable, timeout: float = GENERAL_TIMEOUT, *args, **kwargs
- ):
- """
- Shorthand for running a coroutine in the default background thread pool executor
- and awaiting the result
- """
- if corofn is None:
- raise ValueError('corofn is None')
- if not asyncio.iscoroutinefunction(corofn):
- raise ValueError('corofn is not a coroutine function')
- async def arun():
- coro = corofn(*args, **kwargs)
- result = await coro
- return result
- def run():
- loop_for_thread = asyncio.new_event_loop()
- try:
- asyncio.set_event_loop(loop_for_thread)
- return asyncio.run(arun())
- finally:
- loop_for_thread.close()
- future = EXECUTOR.submit(run)
- futures.wait([future], timeout=timeout or None)
- result = future.result()
- return result
- async def call_coro_in_bg_thread(
- corofn: Callable, timeout: float = GENERAL_TIMEOUT, *args, **kwargs
- ):
- """Function for running a coroutine in a background thread."""
- await call_sync_from_async(call_async_from_sync, corofn, timeout, *args, **kwargs)
- async def wait_all(
- iterable: Iterable[Coroutine], timeout: int = GENERAL_TIMEOUT
- ) -> List:
- """
- Shorthand for waiting for all the coroutines in the iterable given in parallel. Creates
- a task for each coroutine.
- Returns a list of results in the original order. If any single task raised an exception, this is raised.
- If multiple tasks raised exceptions, an AsyncException is raised containing all exceptions.
- """
- tasks = [asyncio.create_task(c) for c in iterable]
- if not tasks:
- return []
- _, pending = await asyncio.wait(tasks, timeout=timeout)
- if pending:
- for task in pending:
- task.cancel()
- raise asyncio.TimeoutError()
- results = []
- errors = []
- for task in tasks:
- try:
- results.append(task.result())
- except Exception as e:
- errors.append(e)
- if errors:
- if len(errors) == 1:
- raise errors[0]
- raise AsyncException(errors)
- return [task.result() for task in tasks]
- class AsyncException(Exception):
- def __init__(self, exceptions):
- self.exceptions = exceptions
- def __str__(self):
- return '\n'.join(str(e) for e in self.exceptions)
|