failover.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. import time
  2. from abc import ABC, abstractmethod
  3. from redis.data_structure import WeightedList
  4. from redis.multidb.circuit import State as CBState
  5. from redis.multidb.database import Databases, SyncDatabase
  6. from redis.multidb.exception import (
  7. NoValidDatabaseException,
  8. TemporaryUnavailableException,
  9. )
# Default for the ``failover_attempts`` argument of
# DefaultFailoverStrategyExecutor: how many failover attempts are counted
# before the executor stops reporting a temporary condition.
DEFAULT_FAILOVER_ATTEMPTS = 10
# Default for the ``failover_delay`` argument of
# DefaultFailoverStrategyExecutor. The value is added to ``time.time()``,
# so it is expressed in seconds.
DEFAULT_FAILOVER_DELAY = 12
  12. class FailoverStrategy(ABC):
  13. @abstractmethod
  14. def database(self) -> SyncDatabase:
  15. """Select the database according to the strategy."""
  16. pass
  17. @abstractmethod
  18. def set_databases(self, databases: Databases) -> None:
  19. """Set the database strategy operates on."""
  20. pass
  21. class FailoverStrategyExecutor(ABC):
  22. @property
  23. @abstractmethod
  24. def failover_attempts(self) -> int:
  25. """The number of failover attempts."""
  26. pass
  27. @property
  28. @abstractmethod
  29. def failover_delay(self) -> float:
  30. """The delay between failover attempts."""
  31. pass
  32. @property
  33. @abstractmethod
  34. def strategy(self) -> FailoverStrategy:
  35. """The strategy to execute."""
  36. pass
  37. @abstractmethod
  38. def execute(self) -> SyncDatabase:
  39. """Execute the failover strategy."""
  40. pass
  41. class WeightBasedFailoverStrategy(FailoverStrategy):
  42. """
  43. Failover strategy based on database weights.
  44. """
  45. def __init__(self) -> None:
  46. self._databases = WeightedList()
  47. def database(self) -> SyncDatabase:
  48. for database, _ in self._databases:
  49. if database.circuit.state == CBState.CLOSED:
  50. return database
  51. raise NoValidDatabaseException("No valid database available for communication")
  52. def set_databases(self, databases: Databases) -> None:
  53. self._databases = databases
  54. class DefaultFailoverStrategyExecutor(FailoverStrategyExecutor):
  55. """
  56. Executes given failover strategy.
  57. """
  58. def __init__(
  59. self,
  60. strategy: FailoverStrategy,
  61. failover_attempts: int = DEFAULT_FAILOVER_ATTEMPTS,
  62. failover_delay: float = DEFAULT_FAILOVER_DELAY,
  63. ):
  64. self._strategy = strategy
  65. self._failover_attempts = failover_attempts
  66. self._failover_delay = failover_delay
  67. self._next_attempt_ts: int = 0
  68. self._failover_counter: int = 0
  69. @property
  70. def failover_attempts(self) -> int:
  71. return self._failover_attempts
  72. @property
  73. def failover_delay(self) -> float:
  74. return self._failover_delay
  75. @property
  76. def strategy(self) -> FailoverStrategy:
  77. return self._strategy
  78. def execute(self) -> SyncDatabase:
  79. try:
  80. database = self._strategy.database()
  81. self._reset()
  82. return database
  83. except NoValidDatabaseException as e:
  84. if self._next_attempt_ts == 0:
  85. self._next_attempt_ts = time.time() + self._failover_delay
  86. self._failover_counter += 1
  87. elif time.time() >= self._next_attempt_ts:
  88. self._next_attempt_ts += self._failover_delay
  89. self._failover_counter += 1
  90. if self._failover_counter > self._failover_attempts:
  91. self._reset()
  92. raise e
  93. else:
  94. raise TemporaryUnavailableException(
  95. "No database connections currently available. "
  96. "This is a temporary condition - please retry the operation."
  97. )
  98. def _reset(self) -> None:
  99. self._next_attempt_ts = 0
  100. self._failover_counter = 0