sentinel.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404
  1. import asyncio
  2. import random
  3. import weakref
  4. from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type
  5. from redis.asyncio.client import Redis
  6. from redis.asyncio.connection import (
  7. Connection,
  8. ConnectionPool,
  9. EncodableT,
  10. SSLConnection,
  11. )
  12. from redis.commands import AsyncSentinelCommands
  13. from redis.exceptions import (
  14. ConnectionError,
  15. ReadOnlyError,
  16. ResponseError,
  17. TimeoutError,
  18. )
  19. class MasterNotFoundError(ConnectionError):
  20. pass
  21. class SlaveNotFoundError(ConnectionError):
  22. pass
  23. class SentinelManagedConnection(Connection):
  24. def __init__(self, **kwargs):
  25. self.connection_pool = kwargs.pop("connection_pool")
  26. super().__init__(**kwargs)
  27. def __repr__(self):
  28. s = f"<{self.__class__.__module__}.{self.__class__.__name__}"
  29. if self.host:
  30. host_info = f",host={self.host},port={self.port}"
  31. s += host_info
  32. return s + ")>"
  33. async def connect_to(self, address):
  34. self.host, self.port = address
  35. await self.connect_check_health(
  36. check_health=self.connection_pool.check_connection,
  37. retry_socket_connect=False,
  38. )
  39. async def _connect_retry(self):
  40. if self._reader:
  41. return # already connected
  42. if self.connection_pool.is_master:
  43. await self.connect_to(await self.connection_pool.get_master_address())
  44. else:
  45. async for slave in self.connection_pool.rotate_slaves():
  46. try:
  47. return await self.connect_to(slave)
  48. except ConnectionError:
  49. continue
  50. raise SlaveNotFoundError # Never be here
  51. async def connect(self):
  52. return await self.retry.call_with_retry(
  53. self._connect_retry,
  54. lambda error: asyncio.sleep(0),
  55. )
  56. async def read_response(
  57. self,
  58. disable_decoding: bool = False,
  59. timeout: Optional[float] = None,
  60. *,
  61. disconnect_on_error: Optional[float] = True,
  62. push_request: Optional[bool] = False,
  63. ):
  64. try:
  65. return await super().read_response(
  66. disable_decoding=disable_decoding,
  67. timeout=timeout,
  68. disconnect_on_error=disconnect_on_error,
  69. push_request=push_request,
  70. )
  71. except ReadOnlyError:
  72. if self.connection_pool.is_master:
  73. # When talking to a master, a ReadOnlyError when likely
  74. # indicates that the previous master that we're still connected
  75. # to has been demoted to a slave and there's a new master.
  76. # calling disconnect will force the connection to re-query
  77. # sentinel during the next connect() attempt.
  78. await self.disconnect()
  79. raise ConnectionError("The previous master is now a slave")
  80. raise
  81. class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
  82. pass
  83. class SentinelConnectionPool(ConnectionPool):
  84. """
  85. Sentinel backed connection pool.
  86. If ``check_connection`` flag is set to True, SentinelManagedConnection
  87. sends a PING command right after establishing the connection.
  88. """
  89. def __init__(self, service_name, sentinel_manager, **kwargs):
  90. kwargs["connection_class"] = kwargs.get(
  91. "connection_class",
  92. (
  93. SentinelManagedSSLConnection
  94. if kwargs.pop("ssl", False)
  95. else SentinelManagedConnection
  96. ),
  97. )
  98. self.is_master = kwargs.pop("is_master", True)
  99. self.check_connection = kwargs.pop("check_connection", False)
  100. super().__init__(**kwargs)
  101. self.connection_kwargs["connection_pool"] = weakref.proxy(self)
  102. self.service_name = service_name
  103. self.sentinel_manager = sentinel_manager
  104. self.master_address = None
  105. self.slave_rr_counter = None
  106. def __repr__(self):
  107. return (
  108. f"<{self.__class__.__module__}.{self.__class__.__name__}"
  109. f"(service={self.service_name}({self.is_master and 'master' or 'slave'}))>"
  110. )
  111. def reset(self):
  112. super().reset()
  113. self.master_address = None
  114. self.slave_rr_counter = None
  115. def owns_connection(self, connection: Connection):
  116. check = not self.is_master or (
  117. self.is_master and self.master_address == (connection.host, connection.port)
  118. )
  119. return check and super().owns_connection(connection)
  120. async def get_master_address(self):
  121. master_address = await self.sentinel_manager.discover_master(self.service_name)
  122. if self.is_master:
  123. if self.master_address != master_address:
  124. self.master_address = master_address
  125. # disconnect any idle connections so that they reconnect
  126. # to the new master the next time that they are used.
  127. await self.disconnect(inuse_connections=False)
  128. return master_address
  129. async def rotate_slaves(self) -> AsyncIterator:
  130. """Round-robin slave balancer"""
  131. slaves = await self.sentinel_manager.discover_slaves(self.service_name)
  132. if slaves:
  133. if self.slave_rr_counter is None:
  134. self.slave_rr_counter = random.randint(0, len(slaves) - 1)
  135. for _ in range(len(slaves)):
  136. self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
  137. slave = slaves[self.slave_rr_counter]
  138. yield slave
  139. # Fallback to the master connection
  140. try:
  141. yield await self.get_master_address()
  142. except MasterNotFoundError:
  143. pass
  144. raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
  145. class Sentinel(AsyncSentinelCommands):
  146. """
  147. Redis Sentinel cluster client
  148. >>> from redis.sentinel import Sentinel
  149. >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
  150. >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
  151. >>> await master.set('foo', 'bar')
  152. >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
  153. >>> await slave.get('foo')
  154. b'bar'
  155. ``sentinels`` is a list of sentinel nodes. Each node is represented by
  156. a pair (hostname, port).
  157. ``min_other_sentinels`` defined a minimum number of peers for a sentinel.
  158. When querying a sentinel, if it doesn't meet this threshold, responses
  159. from that sentinel won't be considered valid.
  160. ``sentinel_kwargs`` is a dictionary of connection arguments used when
  161. connecting to sentinel instances. Any argument that can be passed to
  162. a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
  163. not specified, any socket_timeout and socket_keepalive options specified
  164. in ``connection_kwargs`` will be used.
  165. ``connection_kwargs`` are keyword arguments that will be used when
  166. establishing a connection to a Redis server.
  167. """
  168. def __init__(
  169. self,
  170. sentinels,
  171. min_other_sentinels=0,
  172. sentinel_kwargs=None,
  173. force_master_ip=None,
  174. **connection_kwargs,
  175. ):
  176. # if sentinel_kwargs isn't defined, use the socket_* options from
  177. # connection_kwargs
  178. if sentinel_kwargs is None:
  179. sentinel_kwargs = {
  180. k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
  181. }
  182. self.sentinel_kwargs = sentinel_kwargs
  183. self.sentinels = [
  184. Redis(host=hostname, port=port, **self.sentinel_kwargs)
  185. for hostname, port in sentinels
  186. ]
  187. self.min_other_sentinels = min_other_sentinels
  188. self.connection_kwargs = connection_kwargs
  189. self._force_master_ip = force_master_ip
  190. async def execute_command(self, *args, **kwargs):
  191. """
  192. Execute Sentinel command in sentinel nodes.
  193. once - If set to True, then execute the resulting command on a single
  194. node at random, rather than across the entire sentinel cluster.
  195. """
  196. once = bool(kwargs.pop("once", False))
  197. # Check if command is supposed to return the original
  198. # responses instead of boolean value.
  199. return_responses = bool(kwargs.pop("return_responses", False))
  200. if once:
  201. response = await random.choice(self.sentinels).execute_command(
  202. *args, **kwargs
  203. )
  204. if return_responses:
  205. return [response]
  206. else:
  207. return True if response else False
  208. tasks = [
  209. asyncio.Task(sentinel.execute_command(*args, **kwargs))
  210. for sentinel in self.sentinels
  211. ]
  212. responses = await asyncio.gather(*tasks)
  213. if return_responses:
  214. return responses
  215. return all(responses)
  216. def __repr__(self):
  217. sentinel_addresses = []
  218. for sentinel in self.sentinels:
  219. sentinel_addresses.append(
  220. f"{sentinel.connection_pool.connection_kwargs['host']}:"
  221. f"{sentinel.connection_pool.connection_kwargs['port']}"
  222. )
  223. return (
  224. f"<{self.__class__}.{self.__class__.__name__}"
  225. f"(sentinels=[{','.join(sentinel_addresses)}])>"
  226. )
  227. def check_master_state(self, state: dict, service_name: str) -> bool:
  228. if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
  229. return False
  230. # Check if our sentinel doesn't see other nodes
  231. if state["num-other-sentinels"] < self.min_other_sentinels:
  232. return False
  233. return True
  234. async def discover_master(self, service_name: str):
  235. """
  236. Asks sentinel servers for the Redis master's address corresponding
  237. to the service labeled ``service_name``.
  238. Returns a pair (address, port) or raises MasterNotFoundError if no
  239. master is found.
  240. """
  241. collected_errors = list()
  242. for sentinel_no, sentinel in enumerate(self.sentinels):
  243. try:
  244. masters = await sentinel.sentinel_masters()
  245. except (ConnectionError, TimeoutError) as e:
  246. collected_errors.append(f"{sentinel} - {e!r}")
  247. continue
  248. state = masters.get(service_name)
  249. if state and self.check_master_state(state, service_name):
  250. # Put this sentinel at the top of the list
  251. self.sentinels[0], self.sentinels[sentinel_no] = (
  252. sentinel,
  253. self.sentinels[0],
  254. )
  255. ip = (
  256. self._force_master_ip
  257. if self._force_master_ip is not None
  258. else state["ip"]
  259. )
  260. return ip, state["port"]
  261. error_info = ""
  262. if len(collected_errors) > 0:
  263. error_info = f" : {', '.join(collected_errors)}"
  264. raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")
  265. def filter_slaves(
  266. self, slaves: Iterable[Mapping]
  267. ) -> Sequence[Tuple[EncodableT, EncodableT]]:
  268. """Remove slaves that are in an ODOWN or SDOWN state"""
  269. slaves_alive = []
  270. for slave in slaves:
  271. if slave["is_odown"] or slave["is_sdown"]:
  272. continue
  273. slaves_alive.append((slave["ip"], slave["port"]))
  274. return slaves_alive
  275. async def discover_slaves(
  276. self, service_name: str
  277. ) -> Sequence[Tuple[EncodableT, EncodableT]]:
  278. """Returns a list of alive slaves for service ``service_name``"""
  279. for sentinel in self.sentinels:
  280. try:
  281. slaves = await sentinel.sentinel_slaves(service_name)
  282. except (ConnectionError, ResponseError, TimeoutError):
  283. continue
  284. slaves = self.filter_slaves(slaves)
  285. if slaves:
  286. return slaves
  287. return []
  288. def master_for(
  289. self,
  290. service_name: str,
  291. redis_class: Type[Redis] = Redis,
  292. connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
  293. **kwargs,
  294. ):
  295. """
  296. Returns a redis client instance for the ``service_name`` master.
  297. Sentinel client will detect failover and reconnect Redis clients
  298. automatically.
  299. A :py:class:`~redis.sentinel.SentinelConnectionPool` class is
  300. used to retrieve the master's address before establishing a new
  301. connection.
  302. NOTE: If the master's address has changed, any cached connections to
  303. the old master are closed.
  304. By default clients will be a :py:class:`~redis.Redis` instance.
  305. Specify a different class to the ``redis_class`` argument if you
  306. desire something different.
  307. The ``connection_pool_class`` specifies the connection pool to
  308. use. The :py:class:`~redis.sentinel.SentinelConnectionPool`
  309. will be used by default.
  310. All other keyword arguments are merged with any connection_kwargs
  311. passed to this class and passed to the connection pool as keyword
  312. arguments to be used to initialize Redis connections.
  313. """
  314. kwargs["is_master"] = True
  315. connection_kwargs = dict(self.connection_kwargs)
  316. connection_kwargs.update(kwargs)
  317. connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
  318. # The Redis object "owns" the pool
  319. return redis_class.from_pool(connection_pool)
  320. def slave_for(
  321. self,
  322. service_name: str,
  323. redis_class: Type[Redis] = Redis,
  324. connection_pool_class: Type[SentinelConnectionPool] = SentinelConnectionPool,
  325. **kwargs,
  326. ):
  327. """
  328. Returns redis client instance for the ``service_name`` slave(s).
  329. A SentinelConnectionPool class is used to retrieve the slave's
  330. address before establishing a new connection.
  331. By default clients will be a :py:class:`~redis.Redis` instance.
  332. Specify a different class to the ``redis_class`` argument if you
  333. desire something different.
  334. The ``connection_pool_class`` specifies the connection pool to use.
  335. The SentinelConnectionPool will be used by default.
  336. All other keyword arguments are merged with any connection_kwargs
  337. passed to this class and passed to the connection pool as keyword
  338. arguments to be used to initialize Redis connections.
  339. """
  340. kwargs["is_master"] = False
  341. connection_kwargs = dict(self.connection_kwargs)
  342. connection_kwargs.update(kwargs)
  343. connection_pool = connection_pool_class(service_name, self, **connection_kwargs)
  344. # The Redis object "owns" the pool
  345. return redis_class.from_pool(connection_pool)