request.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. from typing import Any
  2. import requests
  3. from requests.exceptions import (
  4. ChunkedEncodingError,
  5. ConnectionError,
  6. )
  7. from urllib3.exceptions import IncompleteRead
  8. def is_server_error(exception):
  9. return (
  10. isinstance(exception, requests.HTTPError)
  11. and exception.response.status_code >= 500
  12. )
  13. def is_404_error(exception):
  14. return (
  15. isinstance(exception, requests.HTTPError)
  16. and exception.response.status_code == 404
  17. )
  18. def is_429_error(exception):
  19. return (
  20. isinstance(exception, requests.HTTPError)
  21. and exception.response.status_code == 429
  22. )
  23. def is_503_error(exception):
  24. return (
  25. isinstance(exception, requests.HTTPError)
  26. and exception.response.status_code == 503
  27. )
  28. def is_502_error(exception):
  29. return (
  30. isinstance(exception, requests.HTTPError)
  31. and exception.response.status_code == 502
  32. )
  33. DEFAULT_RETRY_EXCEPTIONS = [
  34. ConnectionError,
  35. IncompleteRead,
  36. ChunkedEncodingError,
  37. ]
  38. def send_request(
  39. session: requests.Session,
  40. method: str,
  41. url: str,
  42. timeout: int = 10,
  43. **kwargs: Any,
  44. ) -> requests.Response:
  45. response = session.request(method, url, **kwargs)
  46. response.raise_for_status()
  47. return response