retry.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. import abc
  2. import socket
  3. from time import sleep
  4. from typing import (
  5. TYPE_CHECKING,
  6. Any,
  7. Callable,
  8. Generic,
  9. Iterable,
  10. Optional,
  11. Tuple,
  12. Type,
  13. TypeVar,
  14. Union,
  15. )
  16. from redis.exceptions import ConnectionError, TimeoutError
  17. T = TypeVar("T")
  18. E = TypeVar("E", bound=Exception, covariant=True)
  19. if TYPE_CHECKING:
  20. from redis.backoff import AbstractBackoff
  21. class AbstractRetry(Generic[E], abc.ABC):
  22. """Retry a specific number of times after a failure"""
  23. _supported_errors: Tuple[Type[E], ...]
  24. def __init__(
  25. self,
  26. backoff: "AbstractBackoff",
  27. retries: int,
  28. supported_errors: Tuple[Type[E], ...],
  29. ):
  30. """
  31. Initialize a `Retry` object with a `Backoff` object
  32. that retries a maximum of `retries` times.
  33. `retries` can be negative to retry forever.
  34. You can specify the types of supported errors which trigger
  35. a retry with the `supported_errors` parameter.
  36. """
  37. self._backoff = backoff
  38. self._retries = retries
  39. self._supported_errors = supported_errors
  40. @abc.abstractmethod
  41. def __eq__(self, other: Any) -> bool:
  42. return NotImplemented
  43. def __hash__(self) -> int:
  44. return hash((self._backoff, self._retries, frozenset(self._supported_errors)))
  45. def update_supported_errors(self, specified_errors: Iterable[Type[E]]) -> None:
  46. """
  47. Updates the supported errors with the specified error types
  48. """
  49. self._supported_errors = tuple(
  50. set(self._supported_errors + tuple(specified_errors))
  51. )
  52. def get_retries(self) -> int:
  53. """
  54. Get the number of retries.
  55. """
  56. return self._retries
  57. def update_retries(self, value: int) -> None:
  58. """
  59. Set the number of retries.
  60. """
  61. self._retries = value
  62. class Retry(AbstractRetry[Exception]):
  63. __hash__ = AbstractRetry.__hash__
  64. def __init__(
  65. self,
  66. backoff: "AbstractBackoff",
  67. retries: int,
  68. supported_errors: Tuple[Type[Exception], ...] = (
  69. ConnectionError,
  70. TimeoutError,
  71. socket.timeout,
  72. ),
  73. ):
  74. super().__init__(backoff, retries, supported_errors)
  75. def __eq__(self, other: Any) -> bool:
  76. if not isinstance(other, Retry):
  77. return NotImplemented
  78. return (
  79. self._backoff == other._backoff
  80. and self._retries == other._retries
  81. and set(self._supported_errors) == set(other._supported_errors)
  82. )
  83. def call_with_retry(
  84. self,
  85. do: Callable[[], T],
  86. fail: Union[Callable[[Exception], Any], Callable[[Exception, int], Any]],
  87. is_retryable: Optional[Callable[[Exception], bool]] = None,
  88. with_failure_count: bool = False,
  89. ) -> T:
  90. """
  91. Execute an operation that might fail and returns its result, or
  92. raise the exception that was thrown depending on the `Backoff` object.
  93. `do`: the operation to call. Expects no argument.
  94. `fail`: the failure handler, expects the last error that was thrown
  95. ``is_retryable``: optional function to determine if an error is retryable
  96. ``with_failure_count``: if True, the failure count is passed to the failure handler
  97. """
  98. self._backoff.reset()
  99. failures = 0
  100. while True:
  101. try:
  102. return do()
  103. except self._supported_errors as error:
  104. if is_retryable and not is_retryable(error):
  105. raise
  106. failures += 1
  107. if with_failure_count:
  108. fail(error, failures)
  109. else:
  110. fail(error)
  111. if self._retries >= 0 and failures > self._retries:
  112. raise error
  113. backoff = self._backoff.compute(failures)
  114. if backoff > 0:
  115. sleep(backoff)