database.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. from abc import ABC, abstractmethod
  2. from typing import Optional, Union
  3. import redis
  4. from redis import RedisCluster
  5. from redis.data_structure import WeightedList
  6. from redis.multidb.circuit import CircuitBreaker
  7. from redis.typing import Number
  8. class AbstractDatabase(ABC):
  9. @property
  10. @abstractmethod
  11. def weight(self) -> float:
  12. """The weight of this database in compare to others. Used to determine the database failover to."""
  13. pass
  14. @weight.setter
  15. @abstractmethod
  16. def weight(self, weight: float):
  17. """Set the weight of this database in compare to others."""
  18. pass
  19. @property
  20. @abstractmethod
  21. def health_check_url(self) -> Optional[str]:
  22. """Health check URL associated with the current database."""
  23. pass
  24. @health_check_url.setter
  25. @abstractmethod
  26. def health_check_url(self, health_check_url: Optional[str]):
  27. """Set the health check URL associated with the current database."""
  28. pass
  29. class BaseDatabase(AbstractDatabase):
  30. def __init__(
  31. self,
  32. weight: float,
  33. health_check_url: Optional[str] = None,
  34. ):
  35. self._weight = weight
  36. self._health_check_url = health_check_url
  37. @property
  38. def weight(self) -> float:
  39. return self._weight
  40. @weight.setter
  41. def weight(self, weight: float):
  42. self._weight = weight
  43. @property
  44. def health_check_url(self) -> Optional[str]:
  45. return self._health_check_url
  46. @health_check_url.setter
  47. def health_check_url(self, health_check_url: Optional[str]):
  48. self._health_check_url = health_check_url
  49. class SyncDatabase(AbstractDatabase):
  50. """Database with an underlying synchronous redis client."""
  51. @property
  52. @abstractmethod
  53. def client(self) -> Union[redis.Redis, RedisCluster]:
  54. """The underlying redis client."""
  55. pass
  56. @client.setter
  57. @abstractmethod
  58. def client(self, client: Union[redis.Redis, RedisCluster]):
  59. """Set the underlying redis client."""
  60. pass
  61. @property
  62. @abstractmethod
  63. def circuit(self) -> CircuitBreaker:
  64. """Circuit breaker for the current database."""
  65. pass
  66. @circuit.setter
  67. @abstractmethod
  68. def circuit(self, circuit: CircuitBreaker):
  69. """Set the circuit breaker for the current database."""
  70. pass
  71. Databases = WeightedList[tuple[SyncDatabase, Number]]
  72. class Database(BaseDatabase, SyncDatabase):
  73. def __init__(
  74. self,
  75. client: Union[redis.Redis, RedisCluster],
  76. circuit: CircuitBreaker,
  77. weight: float,
  78. health_check_url: Optional[str] = None,
  79. ):
  80. """
  81. Initialize a new Database instance.
  82. Args:
  83. client: Underlying Redis client instance for database operations
  84. circuit: Circuit breaker for handling database failures
  85. weight: Weight value used for database failover prioritization
  86. health_check_url: Health check URL associated with the current database
  87. """
  88. self._client = client
  89. self._cb = circuit
  90. self._cb.database = self
  91. super().__init__(weight, health_check_url)
  92. @property
  93. def client(self) -> Union[redis.Redis, RedisCluster]:
  94. return self._client
  95. @client.setter
  96. def client(self, client: Union[redis.Redis, RedisCluster]):
  97. self._client = client
  98. @property
  99. def circuit(self) -> CircuitBreaker:
  100. return self._cb
  101. @circuit.setter
  102. def circuit(self, circuit: CircuitBreaker):
  103. self._cb = circuit
  104. def __repr__(self):
  105. return f"Database(client={self.client}, weight={self.weight})"