sentinel.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. import random
  2. import weakref
  3. from typing import Optional
  4. from redis.client import Redis
  5. from redis.commands import SentinelCommands
  6. from redis.connection import Connection, ConnectionPool, SSLConnection
  7. from redis.exceptions import (
  8. ConnectionError,
  9. ReadOnlyError,
  10. ResponseError,
  11. TimeoutError,
  12. )
  13. class MasterNotFoundError(ConnectionError):
  14. pass
  15. class SlaveNotFoundError(ConnectionError):
  16. pass
  17. class SentinelManagedConnection(Connection):
  18. def __init__(self, **kwargs):
  19. self.connection_pool = kwargs.pop("connection_pool")
  20. super().__init__(**kwargs)
  21. def __repr__(self):
  22. pool = self.connection_pool
  23. s = (
  24. f"<{type(self).__module__}.{type(self).__name__}"
  25. f"(service={pool.service_name}%s)>"
  26. )
  27. if self.host:
  28. host_info = f",host={self.host},port={self.port}"
  29. s = s % host_info
  30. return s
  31. def connect_to(self, address):
  32. self.host, self.port = address
  33. self.connect_check_health(
  34. check_health=self.connection_pool.check_connection,
  35. retry_socket_connect=False,
  36. )
  37. def _connect_retry(self):
  38. if self._sock:
  39. return # already connected
  40. if self.connection_pool.is_master:
  41. self.connect_to(self.connection_pool.get_master_address())
  42. else:
  43. for slave in self.connection_pool.rotate_slaves():
  44. try:
  45. return self.connect_to(slave)
  46. except ConnectionError:
  47. continue
  48. raise SlaveNotFoundError # Never be here
  49. def connect(self):
  50. return self.retry.call_with_retry(self._connect_retry, lambda error: None)
  51. def read_response(
  52. self,
  53. disable_decoding=False,
  54. *,
  55. disconnect_on_error: Optional[bool] = False,
  56. push_request: Optional[bool] = False,
  57. ):
  58. try:
  59. return super().read_response(
  60. disable_decoding=disable_decoding,
  61. disconnect_on_error=disconnect_on_error,
  62. push_request=push_request,
  63. )
  64. except ReadOnlyError:
  65. if self.connection_pool.is_master:
  66. # When talking to a master, a ReadOnlyError when likely
  67. # indicates that the previous master that we're still connected
  68. # to has been demoted to a slave and there's a new master.
  69. # calling disconnect will force the connection to re-query
  70. # sentinel during the next connect() attempt.
  71. self.disconnect()
  72. raise ConnectionError("The previous master is now a slave")
  73. raise
  74. class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection):
  75. pass
  76. class SentinelConnectionPoolProxy:
  77. def __init__(
  78. self,
  79. connection_pool,
  80. is_master,
  81. check_connection,
  82. service_name,
  83. sentinel_manager,
  84. ):
  85. self.connection_pool_ref = weakref.ref(connection_pool)
  86. self.is_master = is_master
  87. self.check_connection = check_connection
  88. self.service_name = service_name
  89. self.sentinel_manager = sentinel_manager
  90. self.reset()
  91. def reset(self):
  92. self.master_address = None
  93. self.slave_rr_counter = None
  94. def get_master_address(self):
  95. master_address = self.sentinel_manager.discover_master(self.service_name)
  96. if self.is_master and self.master_address != master_address:
  97. self.master_address = master_address
  98. # disconnect any idle connections so that they reconnect
  99. # to the new master the next time that they are used.
  100. connection_pool = self.connection_pool_ref()
  101. if connection_pool is not None:
  102. connection_pool.disconnect(inuse_connections=False)
  103. return master_address
  104. def rotate_slaves(self):
  105. slaves = self.sentinel_manager.discover_slaves(self.service_name)
  106. if slaves:
  107. if self.slave_rr_counter is None:
  108. self.slave_rr_counter = random.randint(0, len(slaves) - 1)
  109. for _ in range(len(slaves)):
  110. self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves)
  111. slave = slaves[self.slave_rr_counter]
  112. yield slave
  113. # Fallback to the master connection
  114. try:
  115. yield self.get_master_address()
  116. except MasterNotFoundError:
  117. pass
  118. raise SlaveNotFoundError(f"No slave found for {self.service_name!r}")
  119. class SentinelConnectionPool(ConnectionPool):
  120. """
  121. Sentinel backed connection pool.
  122. If ``check_connection`` flag is set to True, SentinelManagedConnection
  123. sends a PING command right after establishing the connection.
  124. """
  125. def __init__(self, service_name, sentinel_manager, **kwargs):
  126. kwargs["connection_class"] = kwargs.get(
  127. "connection_class",
  128. (
  129. SentinelManagedSSLConnection
  130. if kwargs.pop("ssl", False)
  131. else SentinelManagedConnection
  132. ),
  133. )
  134. self.is_master = kwargs.pop("is_master", True)
  135. self.check_connection = kwargs.pop("check_connection", False)
  136. self.proxy = SentinelConnectionPoolProxy(
  137. connection_pool=self,
  138. is_master=self.is_master,
  139. check_connection=self.check_connection,
  140. service_name=service_name,
  141. sentinel_manager=sentinel_manager,
  142. )
  143. super().__init__(**kwargs)
  144. self.connection_kwargs["connection_pool"] = self.proxy
  145. self.service_name = service_name
  146. self.sentinel_manager = sentinel_manager
  147. def __repr__(self):
  148. role = "master" if self.is_master else "slave"
  149. return (
  150. f"<{type(self).__module__}.{type(self).__name__}"
  151. f"(service={self.service_name}({role}))>"
  152. )
  153. def reset(self):
  154. super().reset()
  155. self.proxy.reset()
  156. @property
  157. def master_address(self):
  158. return self.proxy.master_address
  159. def owns_connection(self, connection):
  160. check = not self.is_master or (
  161. self.is_master and self.master_address == (connection.host, connection.port)
  162. )
  163. parent = super()
  164. return check and parent.owns_connection(connection)
  165. def get_master_address(self):
  166. return self.proxy.get_master_address()
  167. def rotate_slaves(self):
  168. "Round-robin slave balancer"
  169. return self.proxy.rotate_slaves()
  170. class Sentinel(SentinelCommands):
  171. """
  172. Redis Sentinel cluster client
  173. >>> from redis.sentinel import Sentinel
  174. >>> sentinel = Sentinel([('localhost', 26379)], socket_timeout=0.1)
  175. >>> master = sentinel.master_for('mymaster', socket_timeout=0.1)
  176. >>> master.set('foo', 'bar')
  177. >>> slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
  178. >>> slave.get('foo')
  179. b'bar'
  180. ``sentinels`` is a list of sentinel nodes. Each node is represented by
  181. a pair (hostname, port).
  182. ``min_other_sentinels`` defined a minimum number of peers for a sentinel.
  183. When querying a sentinel, if it doesn't meet this threshold, responses
  184. from that sentinel won't be considered valid.
  185. ``sentinel_kwargs`` is a dictionary of connection arguments used when
  186. connecting to sentinel instances. Any argument that can be passed to
  187. a normal Redis connection can be specified here. If ``sentinel_kwargs`` is
  188. not specified, any socket_timeout and socket_keepalive options specified
  189. in ``connection_kwargs`` will be used.
  190. ``connection_kwargs`` are keyword arguments that will be used when
  191. establishing a connection to a Redis server.
  192. """
  193. def __init__(
  194. self,
  195. sentinels,
  196. min_other_sentinels=0,
  197. sentinel_kwargs=None,
  198. force_master_ip=None,
  199. **connection_kwargs,
  200. ):
  201. # if sentinel_kwargs isn't defined, use the socket_* options from
  202. # connection_kwargs
  203. if sentinel_kwargs is None:
  204. sentinel_kwargs = {
  205. k: v for k, v in connection_kwargs.items() if k.startswith("socket_")
  206. }
  207. self.sentinel_kwargs = sentinel_kwargs
  208. self.sentinels = [
  209. Redis(hostname, port, **self.sentinel_kwargs)
  210. for hostname, port in sentinels
  211. ]
  212. self.min_other_sentinels = min_other_sentinels
  213. self.connection_kwargs = connection_kwargs
  214. self._force_master_ip = force_master_ip
  215. def execute_command(self, *args, **kwargs):
  216. """
  217. Execute Sentinel command in sentinel nodes.
  218. once - If set to True, then execute the resulting command on a single
  219. node at random, rather than across the entire sentinel cluster.
  220. """
  221. once = bool(kwargs.pop("once", False))
  222. # Check if command is supposed to return the original
  223. # responses instead of boolean value.
  224. return_responses = bool(kwargs.pop("return_responses", False))
  225. if once:
  226. response = random.choice(self.sentinels).execute_command(*args, **kwargs)
  227. if return_responses:
  228. return [response]
  229. else:
  230. return True if response else False
  231. responses = []
  232. for sentinel in self.sentinels:
  233. responses.append(sentinel.execute_command(*args, **kwargs))
  234. if return_responses:
  235. return responses
  236. return all(responses)
  237. def __repr__(self):
  238. sentinel_addresses = []
  239. for sentinel in self.sentinels:
  240. sentinel_addresses.append(
  241. "{host}:{port}".format_map(sentinel.connection_pool.connection_kwargs)
  242. )
  243. return (
  244. f"<{type(self).__module__}.{type(self).__name__}"
  245. f"(sentinels=[{','.join(sentinel_addresses)}])>"
  246. )
  247. def check_master_state(self, state, service_name):
  248. if not state["is_master"] or state["is_sdown"] or state["is_odown"]:
  249. return False
  250. # Check if our sentinel doesn't see other nodes
  251. if state["num-other-sentinels"] < self.min_other_sentinels:
  252. return False
  253. return True
  254. def discover_master(self, service_name):
  255. """
  256. Asks sentinel servers for the Redis master's address corresponding
  257. to the service labeled ``service_name``.
  258. Returns a pair (address, port) or raises MasterNotFoundError if no
  259. master is found.
  260. """
  261. collected_errors = list()
  262. for sentinel_no, sentinel in enumerate(self.sentinels):
  263. try:
  264. masters = sentinel.sentinel_masters()
  265. except (ConnectionError, TimeoutError) as e:
  266. collected_errors.append(f"{sentinel} - {e!r}")
  267. continue
  268. state = masters.get(service_name)
  269. if state and self.check_master_state(state, service_name):
  270. # Put this sentinel at the top of the list
  271. self.sentinels[0], self.sentinels[sentinel_no] = (
  272. sentinel,
  273. self.sentinels[0],
  274. )
  275. ip = (
  276. self._force_master_ip
  277. if self._force_master_ip is not None
  278. else state["ip"]
  279. )
  280. return ip, state["port"]
  281. error_info = ""
  282. if len(collected_errors) > 0:
  283. error_info = f" : {', '.join(collected_errors)}"
  284. raise MasterNotFoundError(f"No master found for {service_name!r}{error_info}")
  285. def filter_slaves(self, slaves):
  286. "Remove slaves that are in an ODOWN or SDOWN state"
  287. slaves_alive = []
  288. for slave in slaves:
  289. if slave["is_odown"] or slave["is_sdown"]:
  290. continue
  291. slaves_alive.append((slave["ip"], slave["port"]))
  292. return slaves_alive
  293. def discover_slaves(self, service_name):
  294. "Returns a list of alive slaves for service ``service_name``"
  295. for sentinel in self.sentinels:
  296. try:
  297. slaves = sentinel.sentinel_slaves(service_name)
  298. except (ConnectionError, ResponseError, TimeoutError):
  299. continue
  300. slaves = self.filter_slaves(slaves)
  301. if slaves:
  302. return slaves
  303. return []
  304. def master_for(
  305. self,
  306. service_name,
  307. redis_class=Redis,
  308. connection_pool_class=SentinelConnectionPool,
  309. **kwargs,
  310. ):
  311. """
  312. Returns a redis client instance for the ``service_name`` master.
  313. Sentinel client will detect failover and reconnect Redis clients
  314. automatically.
  315. A :py:class:`~redis.sentinel.SentinelConnectionPool` class is
  316. used to retrieve the master's address before establishing a new
  317. connection.
  318. NOTE: If the master's address has changed, any cached connections to
  319. the old master are closed.
  320. By default clients will be a :py:class:`~redis.Redis` instance.
  321. Specify a different class to the ``redis_class`` argument if you
  322. desire something different.
  323. The ``connection_pool_class`` specifies the connection pool to
  324. use. The :py:class:`~redis.sentinel.SentinelConnectionPool`
  325. will be used by default.
  326. All other keyword arguments are merged with any connection_kwargs
  327. passed to this class and passed to the connection pool as keyword
  328. arguments to be used to initialize Redis connections.
  329. """
  330. kwargs["is_master"] = True
  331. connection_kwargs = dict(self.connection_kwargs)
  332. connection_kwargs.update(kwargs)
  333. return redis_class.from_pool(
  334. connection_pool_class(service_name, self, **connection_kwargs)
  335. )
  336. def slave_for(
  337. self,
  338. service_name,
  339. redis_class=Redis,
  340. connection_pool_class=SentinelConnectionPool,
  341. **kwargs,
  342. ):
  343. """
  344. Returns redis client instance for the ``service_name`` slave(s).
  345. A SentinelConnectionPool class is used to retrieve the slave's
  346. address before establishing a new connection.
  347. By default clients will be a :py:class:`~redis.Redis` instance.
  348. Specify a different class to the ``redis_class`` argument if you
  349. desire something different.
  350. The ``connection_pool_class`` specifies the connection pool to use.
  351. The SentinelConnectionPool will be used by default.
  352. All other keyword arguments are merged with any connection_kwargs
  353. passed to this class and passed to the connection pool as keyword
  354. arguments to be used to initialize Redis connections.
  355. """
  356. kwargs["is_master"] = False
  357. connection_kwargs = dict(self.connection_kwargs)
  358. connection_kwargs.update(kwargs)
  359. return redis_class.from_pool(
  360. connection_pool_class(service_name, self, **connection_kwargs)
  361. )