cluster.py 112 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
7277827792780278127822783278427852786278727882789279027912792279327942795279627972798279928002801280228032804280528062807280828092810281128122813281428152816281728182819282028212822282328242825282628272828282928302831283228332834283528362837283828392840284128422843284428452846284728482849285028512852285328542855285628572858285928602861286228632864286528662867286828692870287128722873287428752876287728782879288028812882288328842885288628872888288928902891289228932894289528962897289828992900290129022903290429052906290729082909291029112912291329142915291629172918291929202921292229232924292529262927292829292930293129322933293429352936293729382939294029412942294329442945294629472948294929502951295229532954295529562957
  1. import asyncio
  2. import collections
  3. import random
  4. import socket
  5. import threading
  6. import time
  7. import warnings
  8. from abc import ABC, abstractmethod
  9. from copy import copy
  10. from itertools import chain
  11. from typing import (
  12. Any,
  13. Callable,
  14. Coroutine,
  15. Deque,
  16. Dict,
  17. Generator,
  18. List,
  19. Mapping,
  20. Optional,
  21. Set,
  22. Tuple,
  23. Type,
  24. TypeVar,
  25. Union,
  26. )
  27. from redis._parsers import AsyncCommandsParser, Encoder
  28. from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
  29. from redis._parsers.helpers import (
  30. _RedisCallbacks,
  31. _RedisCallbacksRESP2,
  32. _RedisCallbacksRESP3,
  33. )
  34. from redis.asyncio.client import ResponseCallbackT
  35. from redis.asyncio.connection import Connection, SSLConnection, parse_url
  36. from redis.asyncio.lock import Lock
  37. from redis.asyncio.observability.recorder import (
  38. record_error_count,
  39. record_operation_duration,
  40. )
  41. from redis.asyncio.retry import Retry
  42. from redis.auth.token import TokenInterface
  43. from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
  44. from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
  45. from redis.cluster import (
  46. PIPELINE_BLOCKED_COMMANDS,
  47. PRIMARY,
  48. REPLICA,
  49. SLOT_ID,
  50. AbstractRedisCluster,
  51. LoadBalancer,
  52. LoadBalancingStrategy,
  53. block_pipeline_command,
  54. get_node_name,
  55. parse_cluster_slots,
  56. )
  57. from redis.commands import READ_COMMANDS, AsyncRedisClusterCommands
  58. from redis.commands.policies import AsyncPolicyResolver, AsyncStaticPolicyResolver
  59. from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
  60. from redis.credentials import CredentialProvider
  61. from redis.event import AfterAsyncClusterInstantiationEvent, EventDispatcher
  62. from redis.exceptions import (
  63. AskError,
  64. BusyLoadingError,
  65. ClusterDownError,
  66. ClusterError,
  67. ConnectionError,
  68. CrossSlotTransactionError,
  69. DataError,
  70. ExecAbortError,
  71. InvalidPipelineStack,
  72. MaxConnectionsError,
  73. MovedError,
  74. RedisClusterException,
  75. RedisError,
  76. ResponseError,
  77. SlotNotCoveredError,
  78. TimeoutError,
  79. TryAgainError,
  80. WatchError,
  81. )
  82. from redis.typing import AnyKeyT, EncodableT, KeyT
  83. from redis.utils import (
  84. SSL_AVAILABLE,
  85. deprecated_args,
  86. deprecated_function,
  87. get_lib_version,
  88. safe_str,
  89. str_if_bytes,
  90. truncate_text,
  91. )
  92. if SSL_AVAILABLE:
  93. from ssl import TLSVersion, VerifyFlags, VerifyMode
  94. else:
  95. TLSVersion = None
  96. VerifyMode = None
  97. VerifyFlags = None
# Accepted forms for the ``target_nodes`` argument of cluster commands: a node
# flag (str), a single ClusterNode, a list of ClusterNodes, or a mapping whose
# values are ClusterNodes.
TargetNodesT = TypeVar(
    "TargetNodesT", str, "ClusterNode", List["ClusterNode"], Dict[Any, "ClusterNode"]
)
  101. class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands):
  102. """
  103. Create a new RedisCluster client.
  104. Pass one of parameters:
  105. - `host` & `port`
  106. - `startup_nodes`
  107. | Use ``await`` :meth:`initialize` to find cluster nodes & create connections.
  108. | Use ``await`` :meth:`close` to disconnect connections & close client.
  109. Many commands support the target_nodes kwarg. It can be one of the
  110. :attr:`NODE_FLAGS`:
  111. - :attr:`PRIMARIES`
  112. - :attr:`REPLICAS`
  113. - :attr:`ALL_NODES`
  114. - :attr:`RANDOM`
  115. - :attr:`DEFAULT_NODE`
  116. Note: This client is not thread/process/fork safe.
  117. :param host:
  118. | Can be used to point to a startup node
  119. :param port:
  120. | Port used if **host** is provided
  121. :param startup_nodes:
  122. | :class:`~.ClusterNode` to be used as a startup node
  123. :param require_full_coverage:
  124. | When set to ``False``: the client will not require a full coverage of
  125. the slots. However, if not all slots are covered, and at least one node
  126. has ``cluster-require-full-coverage`` set to ``yes``, the server will throw
  127. a :class:`~.ClusterDownError` for some key-based commands.
  128. | When set to ``True``: all slots must be covered to construct the cluster
  129. client. If not all slots are covered, :class:`~.RedisClusterException` will be
  130. thrown.
  131. | See:
  132. https://redis.io/docs/manual/scaling/#redis-cluster-configuration-parameters
  133. :param read_from_replicas:
  134. | @deprecated - please use load_balancing_strategy instead
  135. | Enable read from replicas in READONLY mode.
  136. When set to true, read commands will be assigned between the primary and
  137. its replicas in a Round-Robin manner.
  138. The data read from replicas is eventually consistent with the data in primary nodes.
  139. :param load_balancing_strategy:
  140. | Enable read from replicas in READONLY mode and defines the load balancing
  141. strategy that will be used for cluster node selection.
  142. The data read from replicas is eventually consistent with the data in primary nodes.
  143. :param dynamic_startup_nodes:
  144. | Set the RedisCluster's startup nodes to all the discovered nodes.
  145. If true (default value), the cluster's discovered nodes will be used to
  146. determine the cluster nodes-slots mapping in the next topology refresh.
  147. It will remove the initial passed startup nodes if their endpoints aren't
  148. listed in the CLUSTER SLOTS output.
  149. If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
  150. specific IP addresses, it is best to set it to false.
  151. :param reinitialize_steps:
  152. | Specifies the number of MOVED errors that need to occur before reinitializing
  153. the whole cluster topology. If a MOVED error occurs and the cluster does not
  154. need to be reinitialized on this current error handling, only the MOVED slot
  155. will be patched with the redirected node.
  156. To reinitialize the cluster on every MOVED error, set reinitialize_steps to 1.
  157. To avoid reinitializing the cluster on moved errors, set reinitialize_steps to
  158. 0.
  159. :param cluster_error_retry_attempts:
  160. | @deprecated - Please configure the 'retry' object instead
  161. In case 'retry' object is set - this argument is ignored!
  162. Number of times to retry before raising an error when :class:`~.TimeoutError`,
  163. :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError`
  164. or :class:`~.ClusterDownError` are encountered
  165. :param retry:
  166. | A retry object that defines the retry strategy and the number of
  167. retries for the cluster client.
  168. In current implementation for the cluster client (starting from redis-py version 6.0.0)
  169. the retry object is not yet fully utilized, instead it is used just to determine
  170. the number of retries for the cluster client.
  171. In the future releases the retry object will be used to handle the cluster client retries!
  172. :param max_connections:
  173. | Maximum number of connections per node. If there are no free connections & the
  174. maximum number of connections are already created, a
  175. :class:`~.MaxConnectionsError` is raised.
  176. :param address_remap:
  177. | An optional callable which, when provided with an internal network
  178. address of a node, e.g. a `(host, port)` tuple, will return the address
  179. where the node is reachable. This can be used to map the addresses at
  180. which the nodes _think_ they are, to addresses at which a client may
  181. reach them, such as when they sit behind a proxy.
  182. | Rest of the arguments will be passed to the
  183. :class:`~redis.asyncio.connection.Connection` instances when created
  184. :raises RedisClusterException:
  185. if any arguments are invalid or unknown. Eg:
  186. - `db` != 0 or None
  187. - `path` argument for unix socket connection
  188. - none of the `host`/`port` & `startup_nodes` were provided
  189. """
  190. @classmethod
  191. def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
  192. """
  193. Return a Redis client object configured from the given URL.
  194. For example::
  195. redis://[[username]:[password]]@localhost:6379/0
  196. rediss://[[username]:[password]]@localhost:6379/0
  197. Three URL schemes are supported:
  198. - `redis://` creates a TCP socket connection. See more at:
  199. <https://www.iana.org/assignments/uri-schemes/prov/redis>
  200. - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
  201. <https://www.iana.org/assignments/uri-schemes/prov/rediss>
  202. The username, password, hostname, path and all querystring values are passed
  203. through ``urllib.parse.unquote`` in order to replace any percent-encoded values
  204. with their corresponding characters.
  205. All querystring options are cast to their appropriate Python types. Boolean
  206. arguments can be specified with string values "True"/"False" or "Yes"/"No".
  207. Values that cannot be properly cast cause a ``ValueError`` to be raised. Once
  208. parsed, the querystring arguments and keyword arguments are passed to
  209. :class:`~redis.asyncio.connection.Connection` when created.
  210. In the case of conflicting arguments, querystring arguments are used.
  211. """
  212. kwargs.update(parse_url(url))
  213. if kwargs.pop("connection_class", None) is SSLConnection:
  214. kwargs["ssl"] = True
  215. return cls(**kwargs)
    # Declared slots for the client's core attributes.
    # NOTE(review): __init__ also assigns attributes that are not listed here
    # (e.g. _event_dispatcher, _usage_counter, _usage_lock, startup_nodes,
    # load_balancing_strategy) — this only works if a base class supplies a
    # __dict__; confirm before relying on __slots__ for memory savings.
    __slots__ = (
        "_initialize",
        "_lock",
        "retry",
        "command_flags",
        "commands_parser",
        "connection_kwargs",
        "encoder",
        "node_flags",
        "nodes_manager",
        "read_from_replicas",
        "reinitialize_counter",
        "reinitialize_steps",
        "response_callbacks",
        "result_callbacks",
    )
    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: Union[str, int] = 6379,
        # Cluster related kwargs
        startup_nodes: Optional[List["ClusterNode"]] = None,
        require_full_coverage: bool = True,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        dynamic_startup_nodes: bool = True,
        reinitialize_steps: int = 5,
        cluster_error_retry_attempts: int = 3,
        max_connections: int = 2**31,
        retry: Optional["Retry"] = None,
        retry_on_error: Optional[List[Type[Exception]]] = None,
        # Client related kwargs
        db: Union[str, int] = 0,
        path: Optional[str] = None,
        credential_provider: Optional[CredentialProvider] = None,
        username: Optional[str] = None,
        password: Optional[str] = None,
        client_name: Optional[str] = None,
        lib_name: Optional[str] = "redis-py",
        lib_version: Optional[str] = get_lib_version(),
        # Encoding related kwargs
        encoding: str = "utf-8",
        encoding_errors: str = "strict",
        decode_responses: bool = False,
        # Connection related kwargs
        health_check_interval: float = 0,
        socket_connect_timeout: Optional[float] = None,
        socket_keepalive: bool = False,
        socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
        socket_timeout: Optional[float] = None,
        # SSL related kwargs
        ssl: bool = False,
        ssl_ca_certs: Optional[str] = None,
        ssl_ca_data: Optional[str] = None,
        ssl_cert_reqs: Union[str, VerifyMode] = "required",
        ssl_include_verify_flags: Optional[List[VerifyFlags]] = None,
        ssl_exclude_verify_flags: Optional[List[VerifyFlags]] = None,
        ssl_certfile: Optional[str] = None,
        ssl_check_hostname: bool = True,
        ssl_keyfile: Optional[str] = None,
        ssl_min_version: Optional[TLSVersion] = None,
        ssl_ciphers: Optional[str] = None,
        protocol: Optional[int] = 2,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        # NOTE(review): default evaluated once at import time, so every client
        # built with the default shares one resolver instance — confirm
        # AsyncStaticPolicyResolver is stateless before relying on this.
        policy_resolver: AsyncPolicyResolver = AsyncStaticPolicyResolver(),
    ) -> None:
        """
        Initialize the cluster client.

        Validates cluster-incompatible arguments (``db``/``path``), assembles
        the per-connection keyword arguments, configures the retry object and
        RESP2/RESP3 response callbacks, and builds the :class:`NodesManager`
        from the given startup node(s). See the class docstring for parameter
        documentation.

        :raises RedisClusterException: if ``db`` is nonzero, ``path`` is set,
            or neither ``host``/``port`` nor ``startup_nodes`` are provided.
        """
        # Redis Cluster only supports database 0 and TCP connections.
        if db:
            raise RedisClusterException(
                "Argument 'db' must be 0 or None in cluster mode"
            )
        if path:
            raise RedisClusterException(
                "Unix domain socket is not supported in cluster mode"
            )
        # At least one contact point is required for topology discovery.
        if (not host or not port) and not startup_nodes:
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the cluster.\n"
                "Please provide one of the following or use RedisCluster.from_url:\n"
                ' - host and port: RedisCluster(host="localhost", port=6379)\n'
                " - startup_nodes: RedisCluster(startup_nodes=["
                'ClusterNode("localhost", 6379), ClusterNode("localhost", 6380)])'
            )
        # These kwargs are forwarded to every per-node Connection.
        kwargs: Dict[str, Any] = {
            "max_connections": max_connections,
            "connection_class": Connection,
            # Client related kwargs
            "credential_provider": credential_provider,
            "username": username,
            "password": password,
            "client_name": client_name,
            "lib_name": lib_name,
            "lib_version": lib_version,
            # Encoding related kwargs
            "encoding": encoding,
            "encoding_errors": encoding_errors,
            "decode_responses": decode_responses,
            # Connection related kwargs
            "health_check_interval": health_check_interval,
            "socket_connect_timeout": socket_connect_timeout,
            "socket_keepalive": socket_keepalive,
            "socket_keepalive_options": socket_keepalive_options,
            "socket_timeout": socket_timeout,
            "protocol": protocol,
        }
        if ssl:
            # SSL related kwargs; also swaps the connection class.
            kwargs.update(
                {
                    "connection_class": SSLConnection,
                    "ssl_ca_certs": ssl_ca_certs,
                    "ssl_ca_data": ssl_ca_data,
                    "ssl_cert_reqs": ssl_cert_reqs,
                    "ssl_include_verify_flags": ssl_include_verify_flags,
                    "ssl_exclude_verify_flags": ssl_exclude_verify_flags,
                    "ssl_certfile": ssl_certfile,
                    "ssl_check_hostname": ssl_check_hostname,
                    "ssl_keyfile": ssl_keyfile,
                    "ssl_min_version": ssl_min_version,
                    "ssl_ciphers": ssl_ciphers,
                }
            )
        if read_from_replicas or load_balancing_strategy:
            # Call our on_connect function to configure READONLY mode
            kwargs["redis_connect_func"] = self.on_connect
        if retry:
            self.retry = retry
        else:
            # Fallback retry strategy; count comes from the deprecated
            # cluster_error_retry_attempts argument.
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )
        if retry_on_error:
            self.retry.update_supported_errors(retry_on_error)
        # Pick the response callbacks matching the RESP protocol version.
        kwargs["response_callbacks"] = _RedisCallbacks.copy()
        if kwargs.get("protocol") in ["3", 3]:
            kwargs["response_callbacks"].update(_RedisCallbacksRESP3)
        else:
            kwargs["response_callbacks"].update(_RedisCallbacksRESP2)
        self.connection_kwargs = kwargs
        # Re-wrap caller-provided startup nodes so they carry this client's
        # connection kwargs; an explicit host/port is appended as well.
        if startup_nodes:
            passed_nodes = []
            for node in startup_nodes:
                passed_nodes.append(
                    ClusterNode(node.host, node.port, **self.connection_kwargs)
                )
            startup_nodes = passed_nodes
        else:
            startup_nodes = []
        if host and port:
            startup_nodes.append(ClusterNode(host, port, **self.connection_kwargs))
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.startup_nodes = startup_nodes
        self.nodes_manager = NodesManager(
            startup_nodes,
            require_full_coverage,
            kwargs,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            event_dispatcher=self._event_dispatcher,
        )
        self.encoder = Encoder(encoding, encoding_errors, decode_responses)
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_steps = reinitialize_steps
        self.reinitialize_counter = 0
        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }
        # Maps each request/response policy to the callable that yields the
        # target node(s) (or passes the response through unchanged).
        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: self.get_nodes_from_slot,
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }
        self._policy_resolver = policy_resolver
        self.commands_parser = AsyncCommandsParser()
        self._aggregate_nodes = None
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.response_callbacks = kwargs["response_callbacks"]
        self.result_callbacks = self.__class__.RESULT_CALLBACKS.copy()
        # CLUSTER SLOTS replies arrive keyed by node; parse only the first
        # node's payload.
        self.result_callbacks["CLUSTER SLOTS"] = (
            lambda cmd, res, **kwargs: parse_cluster_slots(
                list(res.values())[0], **kwargs
            )
        )
        # Deferred initialization: topology discovery happens in initialize().
        self._initialize = True
        self._lock: Optional[asyncio.Lock] = None
        # When used as an async context manager, we need to increment and decrement
        # a usage counter so that we can close the connection pool when no one is
        # using the client.
        self._usage_counter = 0
        self._usage_lock = asyncio.Lock()
    async def initialize(self) -> "RedisCluster":
        """Get all nodes from startup nodes & creates connections if not initialized."""
        if self._initialize:
            # The lock is created lazily so the constructor can run outside a
            # running event loop.
            if not self._lock:
                self._lock = asyncio.Lock()
            async with self._lock:
                # Re-check under the lock: a concurrent caller may have
                # completed initialization while we were waiting.
                if self._initialize:
                    try:
                        await self.nodes_manager.initialize()
                        await self.commands_parser.initialize(
                            self.nodes_manager.default_node
                        )
                        self._initialize = False
                    except BaseException:
                        # Roll back any partially-created connections — both
                        # discovered nodes and startup nodes — before
                        # re-raising.
                        await self.nodes_manager.aclose()
                        await self.nodes_manager.aclose("startup_nodes")
                        raise
        return self
    async def aclose(self) -> None:
        """Close all connections & client if initialized."""
        # Mirror of initialize(): only acts when the client is initialized,
        # and re-checks under the lazily-created lock.
        if not self._initialize:
            if not self._lock:
                self._lock = asyncio.Lock()
            async with self._lock:
                if not self._initialize:
                    # Flip the flag first so concurrent callers see the client
                    # as closed while teardown proceeds.
                    self._initialize = True
                    await self.nodes_manager.aclose()
                    await self.nodes_manager.aclose("startup_nodes")
    # Deprecated thin wrapper kept so pre-5.0 call sites continue to work.
    @deprecated_function(version="5.0.0", reason="Use aclose() instead", name="close")
    async def close(self) -> None:
        """alias for aclose() for backwards compatibility"""
        await self.aclose()
  472. async def __aenter__(self) -> "RedisCluster":
  473. """
  474. Async context manager entry. Increments a usage counter so that the
  475. connection pool is only closed (via aclose()) when no context is using
  476. the client.
  477. """
  478. await self._increment_usage()
  479. try:
  480. # Initialize the client (i.e. establish connection, etc.)
  481. return await self.initialize()
  482. except Exception:
  483. # If initialization fails, decrement the counter to keep it in sync
  484. await self._decrement_usage()
  485. raise
  486. async def _increment_usage(self) -> int:
  487. """
  488. Helper coroutine to increment the usage counter while holding the lock.
  489. Returns the new value of the usage counter.
  490. """
  491. async with self._usage_lock:
  492. self._usage_counter += 1
  493. return self._usage_counter
  494. async def _decrement_usage(self) -> int:
  495. """
  496. Helper coroutine to decrement the usage counter while holding the lock.
  497. Returns the new value of the usage counter.
  498. """
  499. async with self._usage_lock:
  500. self._usage_counter -= 1
  501. return self._usage_counter
  502. async def __aexit__(self, exc_type, exc_value, traceback):
  503. """
  504. Async context manager exit. Decrements a usage counter. If this is the
  505. last exit (counter becomes zero), the client closes its connection pool.
  506. """
  507. current_usage = await asyncio.shield(self._decrement_usage())
  508. if current_usage == 0:
  509. # This was the last active context, so disconnect the pool.
  510. await asyncio.shield(self.aclose())
  511. def __await__(self) -> Generator[Any, None, "RedisCluster"]:
  512. return self.initialize().__await__()
  513. _DEL_MESSAGE = "Unclosed RedisCluster client"
  514. def __del__(
  515. self,
  516. _warn: Any = warnings.warn,
  517. _grl: Any = asyncio.get_running_loop,
  518. ) -> None:
  519. if hasattr(self, "_initialize") and not self._initialize:
  520. _warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self)
  521. try:
  522. context = {"client": self, "message": self._DEL_MESSAGE}
  523. _grl().call_exception_handler(context)
  524. except RuntimeError:
  525. pass
  526. async def on_connect(self, connection: Connection) -> None:
  527. await connection.on_connect()
  528. # Sending READONLY command to server to configure connection as
  529. # readonly. Since each cluster node may change its server type due
  530. # to a failover, we should establish a READONLY connection
  531. # regardless of the server type. If this is a primary connection,
  532. # READONLY would not affect executing write commands.
  533. await connection.send_command("READONLY")
  534. if str_if_bytes(await connection.read_response()) != "OK":
  535. raise ConnectionError("READONLY command failed")
  536. def get_nodes(self) -> List["ClusterNode"]:
  537. """Get all nodes of the cluster."""
  538. return list(self.nodes_manager.nodes_cache.values())
  539. def get_primaries(self) -> List["ClusterNode"]:
  540. """Get the primary nodes of the cluster."""
  541. return self.nodes_manager.get_nodes_by_server_type(PRIMARY)
  542. def get_replicas(self) -> List["ClusterNode"]:
  543. """Get the replica nodes of the cluster."""
  544. return self.nodes_manager.get_nodes_by_server_type(REPLICA)
  545. def get_random_node(self) -> "ClusterNode":
  546. """Get a random node of the cluster."""
  547. return random.choice(list(self.nodes_manager.nodes_cache.values()))
  548. def get_default_node(self) -> "ClusterNode":
  549. """Get the default node of the client."""
  550. return self.nodes_manager.default_node
  551. def set_default_node(self, node: "ClusterNode") -> None:
  552. """
  553. Set the default node of the client.
  554. :raises DataError: if None is passed or node does not exist in cluster.
  555. """
  556. if not node or not self.get_node(node_name=node.name):
  557. raise DataError("The requested node does not exist in the cluster.")
  558. self.nodes_manager.default_node = node
  559. def get_node(
  560. self,
  561. host: Optional[str] = None,
  562. port: Optional[int] = None,
  563. node_name: Optional[str] = None,
  564. ) -> Optional["ClusterNode"]:
  565. """Get node by (host, port) or node_name."""
  566. return self.nodes_manager.get_node(host, port, node_name)
  567. def get_node_from_key(
  568. self, key: str, replica: bool = False
  569. ) -> Optional["ClusterNode"]:
  570. """
  571. Get the cluster node corresponding to the provided key.
  572. :param key:
  573. :param replica:
  574. | Indicates if a replica should be returned
  575. |
  576. None will returned if no replica holds this key
  577. :raises SlotNotCoveredError: if the key is not covered by any slot.
  578. """
  579. slot = self.keyslot(key)
  580. slot_cache = self.nodes_manager.slots_cache.get(slot)
  581. if not slot_cache:
  582. raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
  583. if replica:
  584. if len(self.nodes_manager.slots_cache[slot]) < 2:
  585. return None
  586. node_idx = 1
  587. else:
  588. node_idx = 0
  589. return slot_cache[node_idx]
  590. def get_random_primary_or_all_nodes(self, command_name):
  591. """
  592. Returns random primary or all nodes depends on READONLY mode.
  593. """
  594. if self.read_from_replicas and command_name in READ_COMMANDS:
  595. return self.get_random_node()
  596. return self.get_random_primary_node()
  597. def get_random_primary_node(self) -> "ClusterNode":
  598. """
  599. Returns a random primary node
  600. """
  601. return random.choice(self.get_primaries())
  602. async def get_nodes_from_slot(self, command: str, *args):
  603. """
  604. Returns a list of nodes that hold the specified keys' slots.
  605. """
  606. # get the node that holds the key's slot
  607. return [
  608. self.nodes_manager.get_node_from_slot(
  609. await self._determine_slot(command, *args),
  610. self.read_from_replicas and command in READ_COMMANDS,
  611. self.load_balancing_strategy if command in READ_COMMANDS else None,
  612. )
  613. ]
  614. def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
  615. """
  616. Returns a list of nodes for commands with a special policy.
  617. """
  618. if not self._aggregate_nodes:
  619. raise RedisClusterException(
  620. "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
  621. )
  622. return self._aggregate_nodes
  623. def keyslot(self, key: EncodableT) -> int:
  624. """
  625. Find the keyslot for a given key.
  626. See: https://redis.io/docs/manual/scaling/#redis-cluster-data-sharding
  627. """
  628. return key_slot(self.encoder.encode(key))
  629. def get_encoder(self) -> Encoder:
  630. """Get the encoder object of the client."""
  631. return self.encoder
  632. def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
  633. """Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
  634. return self.connection_kwargs
  635. def set_retry(self, retry: Retry) -> None:
  636. self.retry = retry
  637. def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
  638. """Set a custom response callback."""
  639. self.response_callbacks[command] = callback
  640. async def _determine_nodes(
  641. self,
  642. command: str,
  643. *args: Any,
  644. request_policy: RequestPolicy,
  645. node_flag: Optional[str] = None,
  646. ) -> List["ClusterNode"]:
  647. # Determine which nodes should be executed the command on.
  648. # Returns a list of target nodes.
  649. if not node_flag:
  650. # get the nodes group for this command if it was predefined
  651. node_flag = self.command_flags.get(command)
  652. if node_flag in self._command_flags_mapping:
  653. request_policy = self._command_flags_mapping[node_flag]
  654. policy_callback = self._policies_callback_mapping[request_policy]
  655. if request_policy == RequestPolicy.DEFAULT_KEYED:
  656. nodes = await policy_callback(command, *args)
  657. elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
  658. nodes = policy_callback(command)
  659. else:
  660. nodes = policy_callback()
  661. if command.lower() == "ft.aggregate":
  662. self._aggregate_nodes = nodes
  663. return nodes
    async def _determine_slot(self, command: str, *args: Any) -> int:
        """Map a command and its arguments to the hash slot it must run on.

        :raises RedisClusterException: if the command has no keys (and is not
            one of the keyless special cases) or its keys span multiple slots.
        """
        if self.command_flags.get(command) == SLOT_ID:
            # The command contains the slot ID
            return int(args[0])

        # Get the keys in the command

        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So, we special case
        # EVAL/EVALSHA.
        # - issue: https://github.com/redis/redis/issues/9493
        # - fix: https://github.com/redis/redis/pull/9733
        if command.upper() in ("EVAL", "EVALSHA"):
            # command syntax: EVAL "script body" num_keys ...
            if len(args) < 2:
                raise RedisClusterException(
                    f"Invalid args in command: {command, *args}"
                )
            keys = args[2 : 2 + int(args[1])]
            # if there are 0 keys, that means the script can be run on any node
            # so we can just return a random slot
            if not keys:
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
        else:
            # Ask the command parser (backed by COMMAND info) for the keys.
            keys = await self.commands_parser.get_keys(command, *args)
            if not keys:
                # FCALL can call a function with 0 keys, that means the function
                # can be run on any node so we can just return a random slot
                if command.upper() in ("FCALL", "FCALL_RO"):
                    return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
                raise RedisClusterException(
                    "No way to dispatch this command to Redis Cluster. "
                    "Missing key.\nYou can execute the command by specifying "
                    f"target nodes.\nCommand: {args}"
                )

        # single key command
        if len(keys) == 1:
            return self.keyslot(keys[0])

        # multi-key command; we need to make sure all keys are mapped to
        # the same slot
        slots = {self.keyslot(key) for key in keys}
        if len(slots) != 1:
            raise RedisClusterException(
                f"{command} - all keys must map to the same key slot"
            )

        return slots.pop()
  709. def _is_node_flag(self, target_nodes: Any) -> bool:
  710. return isinstance(target_nodes, str) and target_nodes in self.node_flags
  711. def _parse_target_nodes(self, target_nodes: Any) -> List["ClusterNode"]:
  712. if isinstance(target_nodes, list):
  713. nodes = target_nodes
  714. elif isinstance(target_nodes, ClusterNode):
  715. # Supports passing a single ClusterNode as a variable
  716. nodes = [target_nodes]
  717. elif isinstance(target_nodes, dict):
  718. # Supports dictionaries of the format {node_name: node}.
  719. # It enables to execute commands with multi nodes as follows:
  720. # rc.cluster_save_config(rc.get_primaries())
  721. nodes = list(target_nodes.values())
  722. else:
  723. raise TypeError(
  724. "target_nodes type can be one of the following: "
  725. "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
  726. "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
  727. f"The passed type is {type(target_nodes)}"
  728. )
  729. return nodes
  730. async def _record_error_metric(
  731. self,
  732. error: Exception,
  733. connection: Union[Connection, "ClusterNode"],
  734. is_internal: bool = True,
  735. retry_attempts: Optional[int] = None,
  736. ):
  737. """
  738. Records error count metric directly.
  739. Accepts either a Connection or ClusterNode object.
  740. """
  741. await record_error_count(
  742. server_address=connection.host,
  743. server_port=connection.port,
  744. network_peer_address=connection.host,
  745. network_peer_port=connection.port,
  746. error_type=error,
  747. retry_attempts=retry_attempts if retry_attempts is not None else 0,
  748. is_internal=is_internal,
  749. )
  750. async def _record_command_metric(
  751. self,
  752. command_name: str,
  753. duration_seconds: float,
  754. connection: Union[Connection, "ClusterNode"],
  755. error: Optional[Exception] = None,
  756. ):
  757. """
  758. Records operation duration metric directly.
  759. Accepts either a Connection or ClusterNode object.
  760. """
  761. # Connection has db attribute, ClusterNode has connection_kwargs
  762. if hasattr(connection, "db"):
  763. db = connection.db
  764. else:
  765. db = connection.connection_kwargs.get("db", 0)
  766. await record_operation_duration(
  767. command_name=command_name,
  768. duration_seconds=duration_seconds,
  769. server_address=connection.host,
  770. server_port=connection.port,
  771. db_namespace=str(db) if db is not None else None,
  772. error=error,
  773. )
    async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
        """
        Execute a raw command on the appropriate cluster node or target_nodes.

        It will retry the command as specified by the retries property of
        the :attr:`retry` & then raise an exception.

        :param args:
            | Raw command args
        :param kwargs:

            - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode`
              or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`]
            - Rest of the kwargs are passed to the Redis connection

        :raises RedisClusterException: if target_nodes is not provided & the command
            can't be mapped to a slot
        """
        command = args[0]
        target_nodes = []
        target_nodes_specified = False
        retry_attempts = self.retry.get_retries()
        passed_targets = kwargs.pop("target_nodes", None)
        if passed_targets and not self._is_node_flag(passed_targets):
            # Explicit node objects were given: route there and disable retries.
            target_nodes = self._parse_target_nodes(passed_targets)
            target_nodes_specified = True
            retry_attempts = 0
        # Resolve the server-advertised request/response policies for this
        # command; fall back to command flags or slot-based defaults below.
        command_policies = await self._policy_resolver.resolve(args[0].lower())
        if not command_policies and not target_nodes_specified:
            command_flag = self.command_flags.get(command)
            if not command_flag:
                # Fallback to default policy
                if not self.get_default_node():
                    slot = None
                else:
                    slot = await self._determine_slot(*args)
                if slot is None:
                    command_policies = CommandPolicies()
                else:
                    # The command maps to a slot: use key-based routing.
                    command_policies = CommandPolicies(
                        request_policy=RequestPolicy.DEFAULT_KEYED,
                        response_policy=ResponsePolicy.DEFAULT_KEYED,
                    )
            else:
                if command_flag in self._command_flags_mapping:
                    command_policies = CommandPolicies(
                        request_policy=self._command_flags_mapping[command_flag]
                    )
                else:
                    command_policies = CommandPolicies()
        elif not command_policies and target_nodes_specified:
            command_policies = CommandPolicies()
        # Add one for the first execution
        execute_attempts = 1 + retry_attempts
        failure_count = 0
        # Start timing for observability
        start_time = time.monotonic()
        for _ in range(execute_attempts):
            if self._initialize:
                # Topology changed (or first use): rebuild slot/node caches.
                await self.initialize()
                if (
                    len(target_nodes) == 1
                    and target_nodes[0] == self.get_default_node()
                ):
                    # Replace the default cluster node
                    self.replace_default_node()
            try:
                if not target_nodes_specified:
                    # Determine the nodes to execute the command on
                    target_nodes = await self._determine_nodes(
                        *args,
                        request_policy=command_policies.request_policy,
                        node_flag=passed_targets,
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {args} command on"
                        )
                if len(target_nodes) == 1:
                    # Return the processed result
                    ret = await self._execute_command(target_nodes[0], *args, **kwargs)
                    if command in self.result_callbacks:
                        ret = self.result_callbacks[command](
                            command, {target_nodes[0].name: ret}, **kwargs
                        )
                    return self._policies_callback_mapping[
                        command_policies.response_policy
                    ](ret)
                else:
                    # Fan out to all target nodes concurrently, then aggregate
                    # the per-node replies via the response policy.
                    keys = [node.name for node in target_nodes]
                    values = await asyncio.gather(
                        *(
                            asyncio.create_task(
                                self._execute_command(node, *args, **kwargs)
                            )
                            for node in target_nodes
                        )
                    )
                    if command in self.result_callbacks:
                        return self.result_callbacks[command](
                            command, dict(zip(keys, values)), **kwargs
                        )
                    return self._policies_callback_mapping[
                        command_policies.response_policy
                    ](dict(zip(keys, values)))
            except Exception as e:
                if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                    # The nodes and slots cache should be reinitialized.
                    # Try again with the new cluster setup.
                    retry_attempts -= 1
                    failure_count += 1
                    if hasattr(e, "connection"):
                        await self._record_command_metric(
                            command_name=command,
                            duration_seconds=time.monotonic() - start_time,
                            connection=e.connection,
                            error=e,
                        )
                        await self._record_error_metric(
                            error=e,
                            connection=e.connection,
                            retry_attempts=failure_count,
                        )
                    continue
                else:
                    # Out of retries, or a non-retryable error: re-raise.
                    if hasattr(e, "connection"):
                        await self._record_error_metric(
                            error=e,
                            connection=e.connection,
                            retry_attempts=failure_count,
                            is_internal=False,
                        )
                    raise e
    async def _execute_command(
        self, target_node: "ClusterNode", *args: Union[KeyT, EncodableT], **kwargs: Any
    ) -> Any:
        """Send one command to one node, following cluster redirections.

        Handles MOVED/ASK redirections, TRYAGAIN back-off, and connection
        failures for up to :attr:`RedisClusterRequestTTL` hops; every
        attempt and error is recorded for observability.

        :raises ClusterError: when the hop budget ("TTL") is exhausted.
        """
        asking = moved = False
        redirect_addr = None
        ttl = self.RedisClusterRequestTTL
        command = args[0]
        start_time = time.monotonic()
        while ttl > 0:
            ttl -= 1
            try:
                if asking:
                    # ASK redirection: send ASKING to the redirected node
                    # before retrying the command there.
                    target_node = self.get_node(node_name=redirect_addr)
                    await target_node.execute_command("ASKING")
                    asking = False
                elif moved:
                    # MOVED occurred and the slots cache was updated,
                    # refresh the target node
                    slot = await self._determine_slot(*args)
                    target_node = self.nodes_manager.get_node_from_slot(
                        slot,
                        self.read_from_replicas and args[0] in READ_COMMANDS,
                        self.load_balancing_strategy
                        if args[0] in READ_COMMANDS
                        else None,
                    )
                    moved = False
                response = await target_node.execute_command(*args, **kwargs)
                await self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=target_node,
                )
                return response
            except BusyLoadingError as e:
                # Node is still loading its dataset; not recoverable here.
                e.connection = target_node
                await self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=target_node,
                    error=e,
                )
                raise
            except MaxConnectionsError as e:
                # MaxConnectionsError indicates client-side resource exhaustion
                # (too many connections in the pool), not a node failure.
                # Don't treat this as a node failure - just re-raise the error
                # without reinitializing the cluster.
                e.connection = target_node
                await self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=target_node,
                    error=e,
                )
                raise
            except (ConnectionError, TimeoutError) as e:
                # Connection retries are being handled in the node's
                # Retry object.
                # Mark active connections for reconnect and disconnect free ones
                # This handles connection state (like READONLY) that may be stale
                target_node.update_active_connections_for_reconnect()
                await target_node.disconnect_free_connections()
                # Move the failed node to the end of the cached nodes list
                # so it's tried last during reinitialization
                self.nodes_manager.move_node_to_end_of_cached_nodes(target_node.name)
                # Signal that reinitialization is needed
                # The retry loop will handle initialize() AND replace_default_node()
                self._initialize = True
                e.connection = target_node
                await self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=target_node,
                    error=e,
                )
                raise
            except (ClusterDownError, SlotNotCoveredError) as e:
                # ClusterDownError can occur during a failover and to get
                # self-healed, we will try to reinitialize the cluster layout
                # and retry executing the command
                # SlotNotCoveredError can occur when the cluster is not fully
                # initialized or can be temporary issue.
                # We will try to reinitialize the cluster topology
                # and retry executing the command
                await self.aclose()
                await asyncio.sleep(0.25)
                e.connection = target_node
                await self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=target_node,
                    error=e,
                )
                raise
            except MovedError as e:
                # First, we will try to patch the slots/nodes cache with the
                # redirected node output and try again. If MovedError exceeds
                # 'reinitialize_steps' number of times, we will force
                # reinitializing the tables, and then try again.
                # 'reinitialize_steps' counter will increase faster when
                # the same client object is shared between multiple threads. To
                # reduce the frequency you can set this variable in the
                # RedisCluster constructor.
                self.reinitialize_counter += 1
                if (
                    self.reinitialize_steps
                    and self.reinitialize_counter % self.reinitialize_steps == 0
                ):
                    await self.aclose()
                    # Reset the counter
                    self.reinitialize_counter = 0
                else:
                    self.nodes_manager.move_slot(e)
                    moved = True
                await self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=target_node,
                    error=e,
                )
                await self._record_error_metric(
                    error=e,
                    connection=target_node,
                )
            except AskError as e:
                # Next loop iteration takes the `asking` branch above.
                redirect_addr = get_node_name(host=e.host, port=e.port)
                asking = True
                await self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=target_node,
                    error=e,
                )
                await self._record_error_metric(
                    error=e,
                    connection=target_node,
                )
            except TryAgainError as e:
                # Back off slightly once half of the hop budget is spent.
                if ttl < self.RedisClusterRequestTTL / 2:
                    await asyncio.sleep(0.05)
                await self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=target_node,
                    error=e,
                )
                await self._record_error_metric(
                    error=e,
                    connection=target_node,
                )
            except ResponseError as e:
                e.connection = target_node
                await self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=target_node,
                    error=e,
                )
                raise
            except Exception as e:
                e.connection = target_node
                await self._record_command_metric(
                    command_name=command,
                    duration_seconds=time.monotonic() - start_time,
                    connection=target_node,
                    error=e,
                )
                raise
        # All redirection hops were consumed without a successful reply.
        e = ClusterError("TTL exhausted.")
        e.connection = target_node
        await self._record_command_metric(
            command_name=command,
            duration_seconds=time.monotonic() - start_time,
            connection=target_node,
            error=e,
        )
        raise e
  1082. def pipeline(
  1083. self, transaction: Optional[Any] = None, shard_hint: Optional[Any] = None
  1084. ) -> "ClusterPipeline":
  1085. """
  1086. Create & return a new :class:`~.ClusterPipeline` object.
  1087. Cluster implementation of pipeline does not support transaction or shard_hint.
  1088. :raises RedisClusterException: if transaction or shard_hint are truthy values
  1089. """
  1090. if shard_hint:
  1091. raise RedisClusterException("shard_hint is deprecated in cluster mode")
  1092. return ClusterPipeline(self, transaction)
  1093. def lock(
  1094. self,
  1095. name: KeyT,
  1096. timeout: Optional[float] = None,
  1097. sleep: float = 0.1,
  1098. blocking: bool = True,
  1099. blocking_timeout: Optional[float] = None,
  1100. lock_class: Optional[Type[Lock]] = None,
  1101. thread_local: bool = True,
  1102. raise_on_release_error: bool = True,
  1103. ) -> Lock:
  1104. """
  1105. Return a new Lock object using key ``name`` that mimics
  1106. the behavior of threading.Lock.
  1107. If specified, ``timeout`` indicates a maximum life for the lock.
  1108. By default, it will remain locked until release() is called.
  1109. ``sleep`` indicates the amount of time to sleep per loop iteration
  1110. when the lock is in blocking mode and another client is currently
  1111. holding the lock.
  1112. ``blocking`` indicates whether calling ``acquire`` should block until
  1113. the lock has been acquired or to fail immediately, causing ``acquire``
  1114. to return False and the lock not being acquired. Defaults to True.
  1115. Note this value can be overridden by passing a ``blocking``
  1116. argument to ``acquire``.
  1117. ``blocking_timeout`` indicates the maximum amount of time in seconds to
  1118. spend trying to acquire the lock. A value of ``None`` indicates
  1119. continue trying forever. ``blocking_timeout`` can be specified as a
  1120. float or integer, both representing the number of seconds to wait.
  1121. ``lock_class`` forces the specified lock implementation. Note that as
  1122. of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
  1123. a Lua-based lock). So, it's unlikely you'll need this parameter, unless
  1124. you have created your own custom lock class.
  1125. ``thread_local`` indicates whether the lock token is placed in
  1126. thread-local storage. By default, the token is placed in thread local
  1127. storage so that a thread only sees its token, not a token set by
  1128. another thread. Consider the following timeline:
  1129. time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
  1130. thread-1 sets the token to "abc"
  1131. time: 1, thread-2 blocks trying to acquire `my-lock` using the
  1132. Lock instance.
  1133. time: 5, thread-1 has not yet completed. redis expires the lock
  1134. key.
  1135. time: 5, thread-2 acquired `my-lock` now that it's available.
  1136. thread-2 sets the token to "xyz"
  1137. time: 6, thread-1 finishes its work and calls release(). if the
  1138. token is *not* stored in thread local storage, then
  1139. thread-1 would see the token value as "xyz" and would be
  1140. able to successfully release the thread-2's lock.
  1141. ``raise_on_release_error`` indicates whether to raise an exception when
  1142. the lock is no longer owned when exiting the context manager. By default,
  1143. this is True, meaning an exception will be raised. If False, the warning
  1144. will be logged and the exception will be suppressed.
  1145. In some use cases it's necessary to disable thread local storage. For
  1146. example, if you have code where one thread acquires a lock and passes
  1147. that lock instance to a worker thread to release later. If thread
  1148. local storage isn't disabled in this case, the worker thread won't see
  1149. the token set by the thread that acquired the lock. Our assumption
  1150. is that these cases aren't common and as such default to using
  1151. thread local storage."""
  1152. if lock_class is None:
  1153. lock_class = Lock
  1154. return lock_class(
  1155. self,
  1156. name,
  1157. timeout=timeout,
  1158. sleep=sleep,
  1159. blocking=blocking,
  1160. blocking_timeout=blocking_timeout,
  1161. thread_local=thread_local,
  1162. raise_on_release_error=raise_on_release_error,
  1163. )
  1164. async def transaction(
  1165. self, func: Coroutine[None, "ClusterPipeline", Any], *watches, **kwargs
  1166. ):
  1167. """
  1168. Convenience method for executing the callable `func` as a transaction
  1169. while watching all keys specified in `watches`. The 'func' callable
  1170. should expect a single argument which is a Pipeline object.
  1171. """
  1172. shard_hint = kwargs.pop("shard_hint", None)
  1173. value_from_callable = kwargs.pop("value_from_callable", False)
  1174. watch_delay = kwargs.pop("watch_delay", None)
  1175. async with self.pipeline(True, shard_hint) as pipe:
  1176. while True:
  1177. try:
  1178. if watches:
  1179. await pipe.watch(*watches)
  1180. func_value = await func(pipe)
  1181. exec_value = await pipe.execute()
  1182. return func_value if value_from_callable else exec_value
  1183. except WatchError:
  1184. if watch_delay is not None and watch_delay > 0:
  1185. time.sleep(watch_delay)
  1186. continue
class ClusterNode:
    """
    Create a new ClusterNode.

    Each ClusterNode manages multiple :class:`~redis.asyncio.connection.Connection`
    objects for the (host, port).
    """

    # __slots__ keeps per-node memory small; a cluster client can hold many
    # ClusterNode instances (one per known cluster member).
    __slots__ = (
        "_connections",
        "_free",
        "_lock",
        "_event_dispatcher",
        "connection_class",
        "connection_kwargs",
        "host",
        "max_connections",
        "name",
        "port",
        "response_callbacks",
        "server_type",
    )
  1207. def __init__(
  1208. self,
  1209. host: str,
  1210. port: Union[str, int],
  1211. server_type: Optional[str] = None,
  1212. *,
  1213. max_connections: int = 2**31,
  1214. connection_class: Type[Connection] = Connection,
  1215. **connection_kwargs: Any,
  1216. ) -> None:
  1217. if host == "localhost":
  1218. host = socket.gethostbyname(host)
  1219. connection_kwargs["host"] = host
  1220. connection_kwargs["port"] = port
  1221. self.host = host
  1222. self.port = port
  1223. self.name = get_node_name(host, port)
  1224. self.server_type = server_type
  1225. self.max_connections = max_connections
  1226. self.connection_class = connection_class
  1227. self.connection_kwargs = connection_kwargs
  1228. self.response_callbacks = connection_kwargs.pop("response_callbacks", {})
  1229. self._connections: List[Connection] = []
  1230. self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)
  1231. self._event_dispatcher = self.connection_kwargs.get("event_dispatcher", None)
  1232. if self._event_dispatcher is None:
  1233. self._event_dispatcher = EventDispatcher()
  1234. def __repr__(self) -> str:
  1235. return (
  1236. f"[host={self.host}, port={self.port}, "
  1237. f"name={self.name}, server_type={self.server_type}]"
  1238. )
  1239. def __eq__(self, obj: Any) -> bool:
  1240. return isinstance(obj, ClusterNode) and obj.name == self.name
    def __hash__(self) -> int:
        # Hash on the host:port name, consistent with __eq__.
        return hash(self.name)
  1243. _DEL_MESSAGE = "Unclosed ClusterNode object"
  1244. def __del__(
  1245. self,
  1246. _warn: Any = warnings.warn,
  1247. _grl: Any = asyncio.get_running_loop,
  1248. ) -> None:
  1249. for connection in self._connections:
  1250. if connection.is_connected:
  1251. _warn(f"{self._DEL_MESSAGE} {self!r}", ResourceWarning, source=self)
  1252. try:
  1253. context = {"client": self, "message": self._DEL_MESSAGE}
  1254. _grl().call_exception_handler(context)
  1255. except RuntimeError:
  1256. pass
  1257. break
  1258. async def disconnect(self) -> None:
  1259. ret = await asyncio.gather(
  1260. *(
  1261. asyncio.create_task(connection.disconnect())
  1262. for connection in self._connections
  1263. ),
  1264. return_exceptions=True,
  1265. )
  1266. exc = next((res for res in ret if isinstance(res, Exception)), None)
  1267. if exc:
  1268. raise exc
  1269. def acquire_connection(self) -> Connection:
  1270. try:
  1271. return self._free.popleft()
  1272. except IndexError:
  1273. if len(self._connections) < self.max_connections:
  1274. # We are configuring the connection pool not to retry
  1275. # connections on lower level clients to avoid retrying
  1276. # connections to nodes that are not reachable
  1277. # and to avoid blocking the connection pool.
  1278. # The only error that will have some handling in the lower
  1279. # level clients is ConnectionError which will trigger disconnection
  1280. # of the socket.
  1281. # The retries will be handled on cluster client level
  1282. # where we will have proper handling of the cluster topology
  1283. retry = Retry(
  1284. backoff=NoBackoff(),
  1285. retries=0,
  1286. supported_errors=(ConnectionError,),
  1287. )
  1288. connection_kwargs = self.connection_kwargs.copy()
  1289. connection_kwargs["retry"] = retry
  1290. connection = self.connection_class(**connection_kwargs)
  1291. self._connections.append(connection)
  1292. return connection
  1293. raise MaxConnectionsError()
  1294. async def disconnect_if_needed(self, connection: Connection) -> None:
  1295. """
  1296. Disconnect a connection if it's marked for reconnect.
  1297. This implements lazy disconnection to avoid race conditions.
  1298. The connection will auto-reconnect on next use.
  1299. """
  1300. if connection.should_reconnect():
  1301. await connection.disconnect()
  1302. def release(self, connection: Connection) -> None:
  1303. """
  1304. Release connection back to free queue.
  1305. If the connection is marked for reconnect, it will be disconnected
  1306. lazily when next acquired via disconnect_if_needed().
  1307. """
  1308. self._free.append(connection)
  1309. def update_active_connections_for_reconnect(self) -> None:
  1310. """
  1311. Mark all in-use (active) connections for reconnect.
  1312. In-use connections are those in _connections but not currently in _free.
  1313. They will be disconnected when released back to the pool.
  1314. """
  1315. free_set = set(self._free)
  1316. for connection in self._connections:
  1317. if connection not in free_set:
  1318. connection.mark_for_reconnect()
  1319. async def disconnect_free_connections(self) -> None:
  1320. """
  1321. Disconnect all free/idle connections in the pool.
  1322. This is useful after topology changes (e.g., failover) to clear
  1323. stale connection state like READONLY mode.
  1324. The connections remain in the pool and will reconnect on next use.
  1325. """
  1326. if self._free:
  1327. # Take a snapshot to avoid issues if _free changes during await
  1328. await asyncio.gather(
  1329. *(connection.disconnect() for connection in tuple(self._free)),
  1330. return_exceptions=True,
  1331. )
    async def parse_response(
        self, connection: Connection, command: str, **kwargs: Any
    ) -> Any:
        """Read one reply for *command* from *connection* and post-process it.

        Special kwargs:
          - ``NEVER_DECODE``: read the raw (undecoded) reply.
          - ``EMPTY_RESPONSE``: value to return if the server replies with a
            ResponseError; otherwise the error is re-raised.

        Remaining kwargs are forwarded to the command's response callback,
        if one is registered.
        """
        try:
            if NEVER_DECODE in kwargs:
                response = await connection.read_response(disable_decoding=True)
                kwargs.pop(NEVER_DECODE)
            else:
                response = await connection.read_response()
        except ResponseError:
            if EMPTY_RESPONSE in kwargs:
                return kwargs[EMPTY_RESPONSE]
            raise
        if EMPTY_RESPONSE in kwargs:
            kwargs.pop(EMPTY_RESPONSE)
        # Remove the "keys" entry; it is only needed for client-side caching.
        kwargs.pop("keys", None)
        # Return response
        if command in self.response_callbacks:
            return self.response_callbacks[command](response, **kwargs)
        return response
    async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
        """Send a single command on a pooled connection and return its reply.

        The connection is released back to the free queue in the ``finally``
        block, after any lazy disconnect it was flagged for.
        """
        # Acquire connection
        connection = self.acquire_connection()
        # Handle lazy disconnect for connections marked for reconnect
        await self.disconnect_if_needed(connection)
        # Execute command
        # NOTE(review): if send_packed_command raises here, the connection is
        # never appended back to _free and stays checked out — confirm intended.
        await connection.send_packed_command(connection.pack_command(*args), False)
        # Read response
        try:
            return await self.parse_response(connection, args[0], **kwargs)
        finally:
            await self.disconnect_if_needed(connection)
            # Release connection
            self._free.append(connection)
    async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
        """Send *commands* over one connection and collect each reply in place.

        Each command's reply — or the exception it produced — is stored on
        ``cmd.result``. Returns True if at least one command failed.
        """
        # Acquire connection
        connection = self.acquire_connection()
        # Handle lazy disconnect for connections marked for reconnect
        await self.disconnect_if_needed(connection)
        # Execute command
        # NOTE(review): if send_packed_command raises, the release below is
        # skipped and the connection stays checked out — confirm intended.
        await connection.send_packed_command(
            connection.pack_commands(cmd.args for cmd in commands), False
        )
        # Read responses
        ret = False
        for cmd in commands:
            try:
                cmd.result = await self.parse_response(
                    connection, cmd.args[0], **cmd.kwargs
                )
            except Exception as e:
                # Per-command failures are captured, not raised, so the
                # remaining replies can still be drained from the socket.
                cmd.result = e
                ret = True
        # Release connection
        await self.disconnect_if_needed(connection)
        self._free.append(connection)
        return ret
    async def re_auth_callback(self, token: TokenInterface):
        """Re-AUTH every currently-free connection with *token*.

        Connections are drained from the free queue one at a time,
        re-authenticated, and parked in a temporary queue so none of them can
        be handed out mid-AUTH; afterwards they are all returned to the pool.
        The lambdas close over ``conn`` but are awaited within the same loop
        iteration, so late binding is not an issue here.
        """
        tmp_queue = collections.deque()
        while self._free:
            conn = self._free.popleft()
            await conn.retry.call_with_retry(
                lambda: conn.send_command(
                    "AUTH", token.try_get("oid"), token.get_value()
                ),
                lambda error: self._mock(error),
            )
            await conn.retry.call_with_retry(
                lambda: conn.read_response(), lambda error: self._mock(error)
            )
            tmp_queue.append(conn)
        while tmp_queue:
            conn = tmp_queue.popleft()
            self._free.append(conn)
    async def _mock(self, error: RedisError):
        """
        Dummy function; needs to be passed as the error callback to the retry
        object (which requires one) while doing nothing with the error.

        :param error: the exception the retry object caught (ignored)
        :return: None
        """
        pass
class NodesManager:
    """Tracks the cluster topology for the async cluster client: the node
    cache, the slot -> nodes mapping, and the startup nodes used to
    (re)discover the cluster."""

    __slots__ = (
        "_dynamic_startup_nodes",
        "_event_dispatcher",
        "_background_tasks",
        "connection_kwargs",
        "default_node",
        "nodes_cache",
        "_epoch",
        "read_load_balancer",
        "_initialize_lock",
        "require_full_coverage",
        "slots_cache",
        "startup_nodes",
        "address_remap",
    )

    def __init__(
        self,
        startup_nodes: List["ClusterNode"],
        require_full_coverage: bool,
        connection_kwargs: Dict[str, Any],
        dynamic_startup_nodes: bool = True,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
    ) -> None:
        self.startup_nodes = {node.name: node for node in startup_nodes}
        self.require_full_coverage = require_full_coverage
        self.connection_kwargs = connection_kwargs
        self.address_remap = address_remap
        self.default_node: "ClusterNode" = None
        self.nodes_cache: Dict[str, "ClusterNode"] = {}
        self.slots_cache: Dict[int, List["ClusterNode"]] = {}
        # Bumped on every successful initialize(); lets a waiter on the
        # initialize lock detect that another call already reinitialized.
        self._epoch: int = 0
        self.read_load_balancer = LoadBalancer()
        self._initialize_lock: asyncio.Lock = asyncio.Lock()
        # Strong references to fire-and-forget disconnect tasks so they are
        # not garbage-collected before completing.
        self._background_tasks: Set[asyncio.Task] = set()
        self._dynamic_startup_nodes: bool = dynamic_startup_nodes
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

    def get_node(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        node_name: Optional[str] = None,
    ) -> Optional["ClusterNode"]:
        """Look up a cached node by host+port or by node name.

        :raises DataError: if neither a node name nor host and port is given.
        """
        if host and port:
            # the user passed host and port
            if host == "localhost":
                # NOTE(review): gethostbyname is a blocking call executed on
                # the event loop — confirm acceptable here.
                host = socket.gethostbyname(host)
            return self.nodes_cache.get(get_node_name(host=host, port=port))
        elif node_name:
            return self.nodes_cache.get(node_name)
        else:
            raise DataError(
                "get_node requires one of the following: 1. node name 2. host and port"
            )

    def set_nodes(
        self,
        old: Dict[str, "ClusterNode"],
        new: Dict[str, "ClusterNode"],
        remove_old: bool = False,
    ) -> None:
        """Merge *new* nodes into *old* in place.

        With ``remove_old=True``, nodes absent from *new* are dropped from
        *old* and their connections torn down: free connections in a
        background task, in-use connections lazily on release.
        """
        if remove_old:
            for name in list(old.keys()):
                if name not in new:
                    # Node is removed from cache before disconnect starts,
                    # so it won't be found in lookups during disconnect.
                    # Mark active connections for reconnect so they get
                    # disconnected after the current command completes, and
                    # disconnect free connections in the background; since the
                    # node is already out of the cache it won't be used, so it
                    # is safe not to await the disconnects.
                    removed_node = old.pop(name)
                    removed_node.update_active_connections_for_reconnect()
                    task = asyncio.create_task(
                        removed_node.disconnect_free_connections()
                    )
                    self._background_tasks.add(task)
                    task.add_done_callback(self._background_tasks.discard)
        for name, node in new.items():
            if name in old:
                # Preserve the existing node but mark connections for reconnect.
                # This method is sync so we can't call disconnect_free_connections()
                # which is async. Instead, we mark free connections for reconnect
                # and they will be lazily disconnected when acquired via
                # disconnect_if_needed() to avoid race conditions.
                # TODO: Make this method async in the next major release to allow
                # immediate disconnection of free connections.
                existing_node = old[name]
                existing_node.update_active_connections_for_reconnect()
                for conn in existing_node._free:
                    conn.mark_for_reconnect()
                continue
            # New node is detected and should be added to the pool
            old[name] = node

    def move_node_to_end_of_cached_nodes(self, node_name: str) -> None:
        """
        Move a failing node to the end of startup_nodes and nodes_cache so it's
        tried last during reinitialization and when selecting the default node.
        If the node is not in the respective list, nothing is done.
        """
        # Move in startup_nodes
        if node_name in self.startup_nodes and len(self.startup_nodes) > 1:
            node = self.startup_nodes.pop(node_name)
            self.startup_nodes[node_name] = node  # Re-insert at end
        # Move in nodes_cache - this affects get_nodes_by_server_type ordering
        # which is used to select the default_node during initialize()
        if node_name in self.nodes_cache and len(self.nodes_cache) > 1:
            node = self.nodes_cache.pop(node_name)
            self.nodes_cache[node_name] = node  # Re-insert at end

    def move_slot(self, e: AskError | MovedError):
        """Update slots_cache/nodes_cache in response to an ASK/MOVED redirect
        pointing slot ``e.slot_id`` at ``e.host:e.port``."""
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node:
            # The node already exists
            if redirected_node.server_type != PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node, we will add it to the nodes cache
            redirected_node = ClusterNode(
                e.host, e.port, PRIMARY, **self.connection_kwargs
            )
            self.set_nodes(self.nodes_cache, {redirected_node.name: redirected_node})
        slot_nodes = self.slots_cache[e.slot_id]
        if redirected_node not in slot_nodes:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replications) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
        elif redirected_node is not slot_nodes[0]:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = slot_nodes[0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            slot_nodes.append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            slot_nodes.remove(redirected_node)
            # Override the old primary with the new one
            slot_nodes[0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
        # else: circular MOVED to current primary -> no-op

    def get_node_from_slot(
        self,
        slot: int,
        read_from_replicas: bool = False,
        load_balancing_strategy=None,
    ) -> "ClusterNode":
        """Return the node serving *slot*, optionally load-balancing reads
        across the slot's replicas.

        :raises SlotNotCoveredError: when the slot entry is malformed.
            NOTE(review): a slot missing entirely from slots_cache raises
            KeyError (not caught below) — confirm intended.
        """
        if read_from_replicas is True and load_balancing_strategy is None:
            load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
        try:
            if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
                # get the server index using the strategy defined in load_balancing_strategy
                primary_name = self.slots_cache[slot][0].name
                node_idx = self.read_load_balancer.get_server_index(
                    primary_name, len(self.slots_cache[slot]), load_balancing_strategy
                )
                return self.slots_cache[slot][node_idx]
            return self.slots_cache[slot][0]
        except (IndexError, TypeError):
            raise SlotNotCoveredError(
                f'Slot "{slot}" not covered by the cluster. '
                f'"require_full_coverage={self.require_full_coverage}"'
            )

    def get_nodes_by_server_type(self, server_type: str) -> List["ClusterNode"]:
        """Return all cached nodes with the given server_type, in
        cache-insertion order."""
        return [
            node
            for node in self.nodes_cache.values()
            if node.server_type == server_type
        ]

    async def initialize(self) -> None:
        """(Re)build nodes_cache and slots_cache from CLUSTER SLOTS.

        Startup nodes are tried in order until one yields a slots map; only
        one initialization runs at a time, and a caller that waited on the
        lock returns immediately if another call already bumped the epoch.

        :raises RedisClusterException: if no startup node is reachable, or if
            coverage is incomplete while require_full_coverage is set.
        """
        self.read_load_balancer.reset()
        tmp_nodes_cache: Dict[str, "ClusterNode"] = {}
        tmp_slots: Dict[int, List["ClusterNode"]] = {}
        disagreements = []
        startup_nodes_reachable = False
        fully_covered = False
        exception = None
        epoch = self._epoch
        async with self._initialize_lock:
            if self._epoch != epoch:
                # another initialize call has already reinitialized the
                # nodes since we started waiting for the lock;
                # we don't need to do it again.
                return
            # Convert to tuple to prevent RuntimeError if self.startup_nodes
            # is modified during iteration
            for startup_node in tuple(self.startup_nodes.values()):
                try:
                    # Make sure cluster mode is enabled on this node
                    try:
                        self._event_dispatcher.dispatch(
                            AfterAsyncClusterInstantiationEvent(
                                self.nodes_cache,
                                self.connection_kwargs.get("credential_provider", None),
                            )
                        )
                        cluster_slots = await startup_node.execute_command(
                            "CLUSTER SLOTS"
                        )
                    except ResponseError:
                        raise RedisClusterException(
                            "Cluster mode is not enabled on this node"
                        )
                    startup_nodes_reachable = True
                except Exception as e:
                    # Try the next startup node.
                    # The exception is saved and raised only if we have no more nodes.
                    exception = e
                    continue
                # CLUSTER SLOTS command results in the following output:
                # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
                # where each node contains the following list: [IP, port, node_id]
                # Therefore, cluster_slots[0][2][0] will be the IP address of the
                # primary node of the first slot section.
                # If there's only one server in the cluster, its ``host`` is ''
                # Fix it to the host in startup_nodes
                if (
                    len(cluster_slots) == 1
                    and not cluster_slots[0][2][0]
                    and len(self.startup_nodes) == 1
                ):
                    cluster_slots[0][2][0] = startup_node.host
                for slot in cluster_slots:
                    for i in range(2, len(slot)):
                        slot[i] = [str_if_bytes(val) for val in slot[i]]
                    primary_node = slot[2]
                    host = primary_node[0]
                    if host == "":
                        host = startup_node.host
                    port = int(primary_node[1])
                    host, port = self.remap_host_port(host, port)
                    nodes_for_slot = []
                    target_node = tmp_nodes_cache.get(get_node_name(host, port))
                    if not target_node:
                        target_node = ClusterNode(
                            host, port, PRIMARY, **self.connection_kwargs
                        )
                    # add this node to the nodes cache
                    tmp_nodes_cache[target_node.name] = target_node
                    nodes_for_slot.append(target_node)
                    replica_nodes = slot[3:]
                    for replica_node in replica_nodes:
                        host = replica_node[0]
                        port = replica_node[1]
                        host, port = self.remap_host_port(host, port)
                        target_replica_node = tmp_nodes_cache.get(
                            get_node_name(host, port)
                        )
                        if not target_replica_node:
                            target_replica_node = ClusterNode(
                                host, port, REPLICA, **self.connection_kwargs
                            )
                        # add this node to the nodes cache
                        tmp_nodes_cache[target_replica_node.name] = target_replica_node
                        nodes_for_slot.append(target_replica_node)
                    for i in range(int(slot[0]), int(slot[1]) + 1):
                        if i not in tmp_slots:
                            tmp_slots[i] = nodes_for_slot
                        else:
                            # Validate that 2 nodes want to use the same slot cache
                            # setup
                            tmp_slot = tmp_slots[i][0]
                            if tmp_slot.name != target_node.name:
                                disagreements.append(
                                    f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
                                )
                                if len(disagreements) > 5:
                                    raise RedisClusterException(
                                        f"startup_nodes could not agree on a valid "
                                        f"slots cache: {', '.join(disagreements)}"
                                    )
                # Validate if all slots are covered or if we should try next startup node
                fully_covered = True
                for i in range(REDIS_CLUSTER_HASH_SLOTS):
                    if i not in tmp_slots:
                        fully_covered = False
                        break
                if fully_covered:
                    break
            if not startup_nodes_reachable:
                raise RedisClusterException(
                    f"Redis Cluster cannot be connected. Please provide at least "
                    f"one reachable node: {str(exception)}"
                ) from exception
            # Check if the slots are not fully covered
            if not fully_covered and self.require_full_coverage:
                # Despite the requirement that the slots be covered, there
                # isn't a full coverage
                raise RedisClusterException(
                    f"All slots are not covered after query all startup_nodes. "
                    f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
                    f"covered..."
                )
            # Set the tmp variables to the real variables
            self.slots_cache = tmp_slots
            self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
            if self._dynamic_startup_nodes:
                # Populate the startup nodes with all discovered nodes
                self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
            # Set the default node
            self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
            self._epoch += 1

    async def aclose(self, attr: str = "nodes_cache") -> None:
        """Disconnect every node stored in ``self.<attr>`` concurrently and
        clear the default node."""
        self.default_node = None
        await asyncio.gather(
            *(
                asyncio.create_task(node.disconnect())
                for node in getattr(self, attr).values()
            )
        )

    def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
        """
        Remap the host and port returned from the cluster to a different
        internal value. Useful if the client is not connecting directly
        to the cluster.
        """
        if self.address_remap:
            return self.address_remap((host, port))
        return host, port
class ClusterPipeline(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommands):
    """
    Create a new ClusterPipeline object.

    Usage::

        result = await (
            rc.pipeline()
            .set("A", 1)
            .get("A")
            .hset("K", "F", "V")
            .hgetall("K")
            .mset_nonatomic({"A": 2, "B": 3})
            .get("A")
            .get("B")
            .delete("A", "B", "K")
            .execute()
        )
        # result = [True, "1", 1, {"F": "V"}, True, True, "2", "3", 1, 1, 1]

    Note: For commands `DELETE`, `EXISTS`, `TOUCH`, `UNLINK`, `mset_nonatomic`, which
    are split across multiple nodes, you'll get multiple results for them in the array.

    Retryable errors:
    - :class:`~.ClusterDownError`
    - :class:`~.ConnectionError`
    - :class:`~.TimeoutError`

    Redirection errors:
    - :class:`~.TryAgainError`
    - :class:`~.MovedError`
    - :class:`~.AskError`

    :param client:
        | Existing :class:`~.RedisCluster` client
    """

    __slots__ = ("cluster_client", "_transaction", "_execution_strategy")

    def __init__(
        self, client: RedisCluster, transaction: Optional[bool] = None
    ) -> None:
        self.cluster_client = client
        self._transaction = transaction
        # transaction=None or False selects plain pipelining; only an
        # explicit truthy value selects the MULTI/EXEC strategy.
        self._execution_strategy: ExecutionStrategy = (
            PipelineStrategy(self)
            if not self._transaction
            else TransactionStrategy(self)
        )

    @property
    def nodes_manager(self) -> "NodesManager":
        """Get the nodes manager from the cluster client."""
        return self.cluster_client.nodes_manager

    def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
        """Set a custom response callback on the cluster client."""
        self.cluster_client.set_response_callback(command, callback)

    async def initialize(self) -> "ClusterPipeline":
        """Initialize the execution strategy and return this pipeline."""
        await self._execution_strategy.initialize()
        return self

    async def __aenter__(self) -> "ClusterPipeline":
        return await self.initialize()

    async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> None:
        await self.reset()

    def __await__(self) -> Generator[Any, None, "ClusterPipeline"]:
        # Allows `pipe = await rc.pipeline()`.
        return self.initialize().__await__()

    def __bool__(self) -> bool:
        "Pipeline instances should always evaluate to True on Python 3+"
        return True

    def __len__(self) -> int:
        # Number of commands currently buffered in the strategy.
        return len(self._execution_strategy)

    def execute_command(
        self, *args: Union[KeyT, EncodableT], **kwargs: Any
    ) -> "ClusterPipeline":
        """
        Append a raw command to the pipeline.

        :param args:
            | Raw command args
        :param kwargs:
            - target_nodes: :attr:`NODE_FLAGS` or :class:`~.ClusterNode`
              or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`]
            - Rest of the kwargs are passed to the Redis connection
        """
        return self._execution_strategy.execute_command(*args, **kwargs)

    async def execute(
        self, raise_on_error: bool = True, allow_redirections: bool = True
    ) -> List[Any]:
        """
        Execute the pipeline.

        It will retry the commands as specified by retries specified in :attr:`retry`
        & then raise an exception.

        :param raise_on_error:
            | Raise the first error if there are any errors
        :param allow_redirections:
            | Whether to retry each failed command individually in case of redirection
              errors

        :raises RedisClusterException: if target_nodes is not provided & the command
            can't be mapped to a slot
        """
        try:
            return await self._execution_strategy.execute(
                raise_on_error, allow_redirections
            )
        finally:
            # The pipeline is always emptied after execute(), success or not.
            await self.reset()

    def _split_command_across_slots(
        self, command: str, *keys: KeyT
    ) -> "ClusterPipeline":
        # Queue one instance of *command* per hash slot spanned by *keys*.
        for slot_keys in self.cluster_client._partition_keys_by_slot(keys).values():
            self.execute_command(command, *slot_keys)
        return self

    async def reset(self):
        """
        Reset back to empty pipeline.
        """
        await self._execution_strategy.reset()

    def multi(self):
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        self._execution_strategy.multi()

    async def discard(self):
        """Delegates to the active execution strategy's ``discard()``."""
        await self._execution_strategy.discard()

    async def watch(self, *names):
        """Watches the values at keys ``names``"""
        await self._execution_strategy.watch(*names)

    async def unwatch(self):
        """Unwatches all previously specified keys"""
        await self._execution_strategy.unwatch()

    async def unlink(self, *names):
        """Delegates ``UNLINK`` of keys ``names`` to the execution strategy."""
        await self._execution_strategy.unlink(*names)

    def mset_nonatomic(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> "ClusterPipeline":
        """Queue MSETs for *mapping*, split per hash slot (non-atomic overall)."""
        return self._execution_strategy.mset_nonatomic(mapping)
# Replace every command that cannot run inside a cluster pipeline with a stub
# produced by block_pipeline_command(). mset_nonatomic is exempt because
# ClusterPipeline provides a real implementation for it.
for command in PIPELINE_BLOCKED_COMMANDS:
    command = command.replace(" ", "_").lower()
    if command == "mset_nonatomic":
        continue
    setattr(ClusterPipeline, command, block_pipeline_command(command))
  1872. class PipelineCommand:
  1873. def __init__(self, position: int, *args: Any, **kwargs: Any) -> None:
  1874. self.args = args
  1875. self.kwargs = kwargs
  1876. self.position = position
  1877. self.result: Union[Any, Exception] = None
  1878. self.command_policies: Optional[CommandPolicies] = None
  1879. def __repr__(self) -> str:
  1880. return f"[{self.position}] {self.args} ({self.kwargs})"
class ExecutionStrategy(ABC):
    """Interface implemented by the pipeline execution strategies that
    ClusterPipeline delegates to (plain pipelining vs. transactional)."""

    @abstractmethod
    async def initialize(self) -> "ClusterPipeline":
        """
        Initialize the execution strategy.

        See ClusterPipeline.initialize()
        """
        pass

    @abstractmethod
    def execute_command(
        self, *args: Union[KeyT, EncodableT], **kwargs: Any
    ) -> "ClusterPipeline":
        """
        Append a raw command to the pipeline.

        See ClusterPipeline.execute_command()
        """
        pass

    @abstractmethod
    async def execute(
        self, raise_on_error: bool = True, allow_redirections: bool = True
    ) -> List[Any]:
        """
        Execute the pipeline.

        It will retry the commands as specified by retries specified in :attr:`retry`
        & then raise an exception.

        See ClusterPipeline.execute()
        """
        pass

    @abstractmethod
    def mset_nonatomic(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> "ClusterPipeline":
        """
        Executes multiple MSET commands according to the provided slot/pairs mapping.

        See ClusterPipeline.mset_nonatomic()
        """
        pass

    @abstractmethod
    async def reset(self):
        """
        Resets current execution strategy.

        See: ClusterPipeline.reset()
        """
        pass

    @abstractmethod
    def multi(self):
        """
        Starts transactional context.

        See: ClusterPipeline.multi()
        """
        pass

    @abstractmethod
    async def watch(self, *names):
        """
        Watch given keys.

        See: ClusterPipeline.watch()
        """
        pass

    @abstractmethod
    async def unwatch(self):
        """
        Unwatches all previously specified keys

        See: ClusterPipeline.unwatch()
        """
        pass

    @abstractmethod
    async def discard(self):
        """
        See: ClusterPipeline.discard()
        """
        pass

    @abstractmethod
    async def unlink(self, *names):
        """
        "Unlink a key specified by ``names``"

        See: ClusterPipeline.unlink()
        """
        pass

    @abstractmethod
    def __len__(self) -> int:
        """Number of commands currently buffered."""
        pass
class AbstractStrategy(ExecutionStrategy):
    """Shared plumbing for pipeline strategies: holds the owning pipeline
    and the queue of buffered PipelineCommand objects."""

    def __init__(self, pipe: ClusterPipeline) -> None:
        # The owning pipeline and its buffered commands, in queue order.
        self._pipe: ClusterPipeline = pipe
        self._command_queue: List["PipelineCommand"] = []

    async def initialize(self) -> "ClusterPipeline":
        """Ensure the cluster client is initialized and start from an empty
        command queue."""
        if self._pipe.cluster_client._initialize:
            await self._pipe.cluster_client.initialize()
        self._command_queue = []
        return self._pipe

    def execute_command(
        self, *args: Union[KeyT, EncodableT], **kwargs: Any
    ) -> "ClusterPipeline":
        """Buffer a raw command; it is sent when execute() runs. The queue
        index becomes the command's position for error reporting."""
        self._command_queue.append(
            PipelineCommand(len(self._command_queue), *args, **kwargs)
        )
        return self._pipe

    def _annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled
        """
        cmd = " ".join(map(safe_str, command))
        msg = (
            f"Command # {number} ({truncate_text(cmd)}) of pipeline "
            f"caused error: {exception.args[0]}"
        )
        exception.args = (msg,) + exception.args[1:]

    @abstractmethod
    def mset_nonatomic(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> "ClusterPipeline":
        pass

    @abstractmethod
    async def execute(
        self, raise_on_error: bool = True, allow_redirections: bool = True
    ) -> List[Any]:
        pass

    @abstractmethod
    async def reset(self):
        pass

    @abstractmethod
    def multi(self):
        pass

    @abstractmethod
    async def watch(self, *names):
        pass

    @abstractmethod
    async def unwatch(self):
        pass

    @abstractmethod
    async def discard(self):
        pass

    @abstractmethod
    async def unlink(self, *names):
        pass

    def __len__(self) -> int:
        # Number of buffered commands.
        return len(self._command_queue)
  2015. class PipelineStrategy(AbstractStrategy):
    def __init__(self, pipe: ClusterPipeline) -> None:
        # No state beyond AbstractStrategy; presumably kept for symmetry with
        # TransactionStrategy — TODO confirm before removing.
        super().__init__(pipe)
  2018. def mset_nonatomic(
  2019. self, mapping: Mapping[AnyKeyT, EncodableT]
  2020. ) -> "ClusterPipeline":
  2021. encoder = self._pipe.cluster_client.encoder
  2022. slots_pairs = {}
  2023. for pair in mapping.items():
  2024. slot = key_slot(encoder.encode(pair[0]))
  2025. slots_pairs.setdefault(slot, []).extend(pair)
  2026. for pairs in slots_pairs.values():
  2027. self.execute_command("MSET", *pairs)
  2028. return self._pipe
    async def execute(
        self, raise_on_error: bool = True, allow_redirections: bool = True
    ) -> List[Any]:
        """Run the buffered commands, retrying on cluster-level errors.

        On a retryable error the cluster client is closed (forcing topology
        rediscovery on the next attempt) and we back off briefly. The queue
        is always reset afterwards, success or failure.
        """
        if not self._command_queue:
            return []
        try:
            retry_attempts = self._pipe.cluster_client.retry.get_retries()
            while True:
                try:
                    if self._pipe.cluster_client._initialize:
                        await self._pipe.cluster_client.initialize()
                    return await self._execute(
                        self._pipe.cluster_client,
                        self._command_queue,
                        raise_on_error=raise_on_error,
                        allow_redirections=allow_redirections,
                    )
                except RedisCluster.ERRORS_ALLOW_RETRY as e:
                    if retry_attempts > 0:
                        # Retry against the refreshed cluster topology.
                        retry_attempts -= 1
                        await self._pipe.cluster_client.aclose()
                        await asyncio.sleep(0.25)
                    else:
                        # All other errors should be raised.
                        raise e
        finally:
            await self.reset()
  2058. async def _execute(
  2059. self,
  2060. client: "RedisCluster",
  2061. stack: List["PipelineCommand"],
  2062. raise_on_error: bool = True,
  2063. allow_redirections: bool = True,
  2064. ) -> List[Any]:
  2065. todo = [
  2066. cmd for cmd in stack if not cmd.result or isinstance(cmd.result, Exception)
  2067. ]
  2068. nodes = {}
  2069. for cmd in todo:
  2070. passed_targets = cmd.kwargs.pop("target_nodes", None)
  2071. command_policies = await client._policy_resolver.resolve(
  2072. cmd.args[0].lower()
  2073. )
  2074. if passed_targets and not client._is_node_flag(passed_targets):
  2075. target_nodes = client._parse_target_nodes(passed_targets)
  2076. if not command_policies:
  2077. command_policies = CommandPolicies()
  2078. else:
  2079. if not command_policies:
  2080. command_flag = client.command_flags.get(cmd.args[0])
  2081. if not command_flag:
  2082. # Fallback to default policy
  2083. if not client.get_default_node():
  2084. slot = None
  2085. else:
  2086. slot = await client._determine_slot(*cmd.args)
  2087. if slot is None:
  2088. command_policies = CommandPolicies()
  2089. else:
  2090. command_policies = CommandPolicies(
  2091. request_policy=RequestPolicy.DEFAULT_KEYED,
  2092. response_policy=ResponsePolicy.DEFAULT_KEYED,
  2093. )
  2094. else:
  2095. if command_flag in client._command_flags_mapping:
  2096. command_policies = CommandPolicies(
  2097. request_policy=client._command_flags_mapping[
  2098. command_flag
  2099. ]
  2100. )
  2101. else:
  2102. command_policies = CommandPolicies()
  2103. target_nodes = await client._determine_nodes(
  2104. *cmd.args,
  2105. request_policy=command_policies.request_policy,
  2106. node_flag=passed_targets,
  2107. )
  2108. if not target_nodes:
  2109. raise RedisClusterException(
  2110. f"No targets were found to execute {cmd.args} command on"
  2111. )
  2112. cmd.command_policies = command_policies
  2113. if len(target_nodes) > 1:
  2114. raise RedisClusterException(f"Too many targets for command {cmd.args}")
  2115. node = target_nodes[0]
  2116. if node.name not in nodes:
  2117. nodes[node.name] = (node, [])
  2118. nodes[node.name][1].append(cmd)
  2119. # Start timing for observability
  2120. start_time = time.monotonic()
  2121. errors = await asyncio.gather(
  2122. *(
  2123. asyncio.create_task(node[0].execute_pipeline(node[1]))
  2124. for node in nodes.values()
  2125. )
  2126. )
  2127. # Record operation duration for each node
  2128. for node_name, (node, commands) in nodes.items():
  2129. # Find the first error in this node's commands, if any
  2130. node_error = None
  2131. for cmd in commands:
  2132. if isinstance(cmd.result, Exception):
  2133. node_error = cmd.result
  2134. break
  2135. db = node.connection_kwargs.get("db", 0)
  2136. await record_operation_duration(
  2137. command_name="PIPELINE",
  2138. duration_seconds=time.monotonic() - start_time,
  2139. server_address=node.host,
  2140. server_port=node.port,
  2141. db_namespace=str(db) if db is not None else None,
  2142. error=node_error,
  2143. )
  2144. if any(errors):
  2145. if allow_redirections:
  2146. # send each errored command individually
  2147. for cmd in todo:
  2148. if isinstance(cmd.result, (TryAgainError, MovedError, AskError)):
  2149. try:
  2150. cmd.result = client._policies_callback_mapping[
  2151. cmd.command_policies.response_policy
  2152. ](await client.execute_command(*cmd.args, **cmd.kwargs))
  2153. except Exception as e:
  2154. cmd.result = e
  2155. if raise_on_error:
  2156. for cmd in todo:
  2157. result = cmd.result
  2158. if isinstance(result, Exception):
  2159. command = " ".join(map(safe_str, cmd.args))
  2160. msg = (
  2161. f"Command # {cmd.position + 1} "
  2162. f"({truncate_text(command)}) "
  2163. f"of pipeline caused error: {result.args}"
  2164. )
  2165. result.args = (msg,) + result.args[1:]
  2166. raise result
  2167. default_cluster_node = client.get_default_node()
  2168. # Check whether the default node was used. In some cases,
  2169. # 'client.get_default_node()' may return None. The check below
  2170. # prevents a potential AttributeError.
  2171. if default_cluster_node is not None:
  2172. default_node = nodes.get(default_cluster_node.name)
  2173. if default_node is not None:
  2174. # This pipeline execution used the default node, check if we need
  2175. # to replace it.
  2176. # Note: when the error is raised we'll reset the default node in the
  2177. # caller function.
  2178. for cmd in default_node[1]:
  2179. # Check if it has a command that failed with a relevant
  2180. # exception
  2181. if type(cmd.result) in RedisCluster.ERRORS_ALLOW_RETRY:
  2182. client.replace_default_node()
  2183. break
  2184. return [cmd.result for cmd in stack]
  2185. async def reset(self):
  2186. """
  2187. Reset back to empty pipeline.
  2188. """
  2189. self._command_queue = []
  2190. def multi(self):
  2191. raise RedisClusterException(
  2192. "method multi() is not supported outside of transactional context"
  2193. )
  2194. async def watch(self, *names):
  2195. raise RedisClusterException(
  2196. "method watch() is not supported outside of transactional context"
  2197. )
  2198. async def unwatch(self):
  2199. raise RedisClusterException(
  2200. "method unwatch() is not supported outside of transactional context"
  2201. )
  2202. async def discard(self):
  2203. raise RedisClusterException(
  2204. "method discard() is not supported outside of transactional context"
  2205. )
  2206. async def unlink(self, *names):
  2207. if len(names) != 1:
  2208. raise RedisClusterException(
  2209. "unlinking multiple keys is not implemented in pipeline command"
  2210. )
  2211. return self.execute_command("UNLINK", names[0])
class TransactionStrategy(AbstractStrategy):
    """
    Pipeline strategy that runs the queued commands atomically on a single
    cluster node via MULTI/EXEC, with WATCH/UNWATCH support.
    """

    # Commands for which no slot lookup is attempted (see _execute_command).
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands sent to the server right away instead of being queued,
    # when issued outside an explicit MULTI block.
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    # Commands whose completion clears the client-side watching flag
    # (see _send_command_parse_response).
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
    # MOVED/ASK redirection errors raised when a slot is rebalanced.
    SLOT_REDIRECT_ERRORS = (AskError, MovedError)
    # Errors handled as connection-level failures (re-raised after tagging
    # the offending connection onto the exception).
    CONNECTION_ERRORS = (
        ConnectionError,
        OSError,
        ClusterDownError,
        SlotNotCoveredError,
    )
  2223. def __init__(self, pipe: ClusterPipeline) -> None:
  2224. super().__init__(pipe)
  2225. self._explicit_transaction = False
  2226. self._watching = False
  2227. self._pipeline_slots: Set[int] = set()
  2228. self._transaction_node: Optional[ClusterNode] = None
  2229. self._transaction_connection: Optional[Connection] = None
  2230. self._executing = False
  2231. self._retry = copy(self._pipe.cluster_client.retry)
  2232. self._retry.update_supported_errors(
  2233. RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
  2234. )
  2235. def _get_client_and_connection_for_transaction(
  2236. self,
  2237. ) -> Tuple[ClusterNode, Connection]:
  2238. """
  2239. Find a connection for a pipeline transaction.
  2240. For running an atomic transaction, watch keys ensure that contents have not been
  2241. altered as long as the watch commands for those keys were sent over the same
  2242. connection. So once we start watching a key, we fetch a connection to the
  2243. node that owns that slot and reuse it.
  2244. """
  2245. if not self._pipeline_slots:
  2246. raise RedisClusterException(
  2247. "At least a command with a key is needed to identify a node"
  2248. )
  2249. node: ClusterNode = self._pipe.cluster_client.nodes_manager.get_node_from_slot(
  2250. list(self._pipeline_slots)[0], False
  2251. )
  2252. self._transaction_node = node
  2253. if not self._transaction_connection:
  2254. connection: Connection = self._transaction_node.acquire_connection()
  2255. self._transaction_connection = connection
  2256. return self._transaction_node, self._transaction_connection
  2257. def execute_command(self, *args: Union[KeyT, EncodableT], **kwargs: Any) -> "Any":
  2258. # Given the limitation of ClusterPipeline sync API, we have to run it in thread.
  2259. response = None
  2260. error = None
  2261. def runner():
  2262. nonlocal response
  2263. nonlocal error
  2264. try:
  2265. response = asyncio.run(self._execute_command(*args, **kwargs))
  2266. except Exception as e:
  2267. error = e
  2268. thread = threading.Thread(target=runner)
  2269. thread.start()
  2270. thread.join()
  2271. if error:
  2272. raise error
  2273. return response
  2274. async def _execute_command(
  2275. self, *args: Union[KeyT, EncodableT], **kwargs: Any
  2276. ) -> Any:
  2277. if self._pipe.cluster_client._initialize:
  2278. await self._pipe.cluster_client.initialize()
  2279. slot_number: Optional[int] = None
  2280. if args[0] not in self.NO_SLOTS_COMMANDS:
  2281. slot_number = await self._pipe.cluster_client._determine_slot(*args)
  2282. if (
  2283. self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
  2284. ) and not self._explicit_transaction:
  2285. if args[0] == "WATCH":
  2286. self._validate_watch()
  2287. if slot_number is not None:
  2288. if self._pipeline_slots and slot_number not in self._pipeline_slots:
  2289. raise CrossSlotTransactionError(
  2290. "Cannot watch or send commands on different slots"
  2291. )
  2292. self._pipeline_slots.add(slot_number)
  2293. elif args[0] not in self.NO_SLOTS_COMMANDS:
  2294. raise RedisClusterException(
  2295. f"Cannot identify slot number for command: {args[0]},"
  2296. "it cannot be triggered in a transaction"
  2297. )
  2298. return self._immediate_execute_command(*args, **kwargs)
  2299. else:
  2300. if slot_number is not None:
  2301. self._pipeline_slots.add(slot_number)
  2302. return super().execute_command(*args, **kwargs)
  2303. def _validate_watch(self):
  2304. if self._explicit_transaction:
  2305. raise RedisError("Cannot issue a WATCH after a MULTI")
  2306. self._watching = True
  2307. async def _immediate_execute_command(self, *args, **options):
  2308. return await self._retry.call_with_retry(
  2309. lambda: self._get_connection_and_send_command(*args, **options),
  2310. self._reinitialize_on_error,
  2311. with_failure_count=True,
  2312. )
  2313. async def _get_connection_and_send_command(self, *args, **options):
  2314. redis_node, connection = self._get_client_and_connection_for_transaction()
  2315. # Only disconnect if not watching - disconnecting would lose WATCH state
  2316. if not self._watching:
  2317. await redis_node.disconnect_if_needed(connection)
  2318. # Start timing for observability
  2319. start_time = time.monotonic()
  2320. try:
  2321. response = await self._send_command_parse_response(
  2322. connection, redis_node, args[0], *args, **options
  2323. )
  2324. await record_operation_duration(
  2325. command_name=args[0],
  2326. duration_seconds=time.monotonic() - start_time,
  2327. server_address=connection.host,
  2328. server_port=connection.port,
  2329. db_namespace=str(connection.db),
  2330. )
  2331. return response
  2332. except Exception as e:
  2333. e.connection = connection
  2334. await record_operation_duration(
  2335. command_name=args[0],
  2336. duration_seconds=time.monotonic() - start_time,
  2337. server_address=connection.host,
  2338. server_port=connection.port,
  2339. db_namespace=str(connection.db),
  2340. error=e,
  2341. )
  2342. raise
  2343. async def _send_command_parse_response(
  2344. self,
  2345. connection: Connection,
  2346. redis_node: ClusterNode,
  2347. command_name,
  2348. *args,
  2349. **options,
  2350. ):
  2351. """
  2352. Send a command and parse the response
  2353. """
  2354. await connection.send_command(*args)
  2355. output = await redis_node.parse_response(connection, command_name, **options)
  2356. if command_name in self.UNWATCH_COMMANDS:
  2357. self._watching = False
  2358. return output
    async def _reinitialize_on_error(self, error, failure_count):
        """
        Retry hook: report the failure, repair cluster topology state, and
        decide whether the error is fatal for an in-flight WATCH.
        """
        # Errors raised by _get_connection_and_send_command carry the
        # connection they occurred on; use it for error metrics.
        if hasattr(error, "connection"):
            await record_error_count(
                server_address=error.connection.host,
                server_port=error.connection.port,
                network_peer_address=error.connection.host,
                network_peer_port=error.connection.port,
                error_type=error,
                retry_attempts=failure_count,
                is_internal=True,
            )
        # A slot redirect while a transaction with WATCHes is executing means
        # the watched keys may have moved: surface it as a WatchError.
        if self._watching:
            if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
                raise WatchError("Slot rebalancing occurred while watching keys")

        if (
            type(error) in self.SLOT_REDIRECT_ERRORS
            or type(error) in self.CONNECTION_ERRORS
        ):
            if self._transaction_connection and self._transaction_node:
                # Disconnect and release back to pool
                await self._transaction_connection.disconnect()
                self._transaction_node.release(self._transaction_connection)
                self._transaction_connection = None

            # Periodically refresh the full topology instead of moving a
            # single slot, driven by the client's reinitialize_steps.
            self._pipe.cluster_client.reinitialize_counter += 1
            if (
                self._pipe.cluster_client.reinitialize_steps
                and self._pipe.cluster_client.reinitialize_counter
                % self._pipe.cluster_client.reinitialize_steps
                == 0
            ):
                await self._pipe.cluster_client.nodes_manager.initialize()
                # NOTE(review): this sets a new attribute on the strategy
                # object; the counter incremented above lives on
                # self._pipe.cluster_client and is never reset here — looks
                # like it should be
                # self._pipe.cluster_client.reinitialize_counter = 0; confirm.
                self.reinitialize_counter = 0
            else:
                if isinstance(error, AskError):
                    self._pipe.cluster_client.nodes_manager.move_slot(error)

        self._executing = False
  2395. async def _raise_first_error(self, responses, stack, start_time):
  2396. """
  2397. Raise the first exception on the stack
  2398. """
  2399. for r, cmd in zip(responses, stack):
  2400. if isinstance(r, Exception):
  2401. self._annotate_exception(r, cmd.position + 1, cmd.args)
  2402. await record_operation_duration(
  2403. command_name="TRANSACTION",
  2404. duration_seconds=time.monotonic() - start_time,
  2405. server_address=self._transaction_connection.host,
  2406. server_port=self._transaction_connection.port,
  2407. db_namespace=str(self._transaction_connection.db),
  2408. error=r,
  2409. )
  2410. raise r
  2411. def mset_nonatomic(
  2412. self, mapping: Mapping[AnyKeyT, EncodableT]
  2413. ) -> "ClusterPipeline":
  2414. raise NotImplementedError("Method is not supported in transactional context.")
  2415. async def execute(
  2416. self, raise_on_error: bool = True, allow_redirections: bool = True
  2417. ) -> List[Any]:
  2418. stack = self._command_queue
  2419. if not stack and (not self._watching or not self._pipeline_slots):
  2420. return []
  2421. return await self._execute_transaction_with_retries(stack, raise_on_error)
  2422. async def _execute_transaction_with_retries(
  2423. self, stack: List["PipelineCommand"], raise_on_error: bool
  2424. ):
  2425. return await self._retry.call_with_retry(
  2426. lambda: self._execute_transaction(stack, raise_on_error),
  2427. lambda error, failure_count: self._reinitialize_on_error(
  2428. error, failure_count
  2429. ),
  2430. with_failure_count=True,
  2431. )
    async def _execute_transaction(
        self, stack: List["PipelineCommand"], raise_on_error: bool
    ):
        """
        Send MULTI + queued commands + EXEC over the pinned connection, parse
        every reply off the socket, and return the post-callback results.

        Raises:
            CrossSlotTransactionError: queued keys span more than one slot.
            WatchError: EXEC returned nil because a watched key changed.
            InvalidPipelineStack: reply count does not match the queue.
        """
        if len(self._pipeline_slots) > 1:
            raise CrossSlotTransactionError(
                "All keys involved in a cluster transaction must map to the same slot"
            )

        self._executing = True

        redis_node, connection = self._get_client_and_connection_for_transaction()

        # Only disconnect if not watching - disconnecting would lose WATCH state
        if not self._watching:
            await redis_node.disconnect_if_needed(connection)

        # Wrap the user's commands in MULTI ... EXEC.
        stack = chain(
            [PipelineCommand(0, "MULTI")],
            stack,
            [PipelineCommand(0, "EXEC")],
        )
        # Commands flagged EMPTY_RESPONSE are never sent to the server.
        commands = [c.args for c in stack if EMPTY_RESPONSE not in c.kwargs]
        packed_commands = connection.pack_commands(commands)

        # Start timing for observability
        start_time = time.monotonic()
        await connection.send_packed_command(packed_commands)
        errors = []

        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            await redis_node.parse_response(connection, "MULTI")
        except ResponseError as e:
            self._annotate_exception(e, 0, "MULTI")
            errors.append(e)
        except self.CONNECTION_ERRORS as cluster_error:
            self._annotate_exception(cluster_error, 0, "MULTI")
            cluster_error.connection = connection
            raise

        # and all the other commands
        for i, command in enumerate(self._command_queue):
            if EMPTY_RESPONSE in command.kwargs:
                # Unsent command: remember its index and canned response.
                errors.append((i, command.kwargs[EMPTY_RESPONSE]))
            else:
                try:
                    _ = await redis_node.parse_response(connection, "_")
                except self.SLOT_REDIRECT_ERRORS as slot_error:
                    self._annotate_exception(slot_error, i + 1, command.args)
                    errors.append(slot_error)
                except self.CONNECTION_ERRORS as cluster_error:
                    self._annotate_exception(cluster_error, i + 1, command.args)
                    cluster_error.connection = connection
                    raise
                except ResponseError as e:
                    self._annotate_exception(e, i + 1, command.args)
                    errors.append(e)

        response = None
        # parse the EXEC.
        try:
            response = await redis_node.parse_response(connection, "EXEC")
        except ExecAbortError:
            # Queue-time errors collected above abort the whole EXEC; raise
            # the earliest one so the caller sees the root cause.
            if errors:
                raise errors[0]
            raise

        self._executing = False

        # EXEC clears any watched keys
        self._watching = False

        if response is None:
            raise WatchError("Watched variable changed.")

        # put any parse errors into the response
        # NOTE(review): this unpacks (index, value) pairs — assumes any bare
        # exceptions appended above already surfaced via ExecAbortError, so
        # only the EMPTY_RESPONSE tuples remain here; confirm.
        for i, e in errors:
            response.insert(i, e)

        if len(response) != len(self._command_queue):
            raise InvalidPipelineStack(
                "Unexpected response length for cluster pipeline EXEC."
                " Command stack was {} but response had length {}".format(
                    [c.args[0] for c in self._command_queue], len(response)
                )
            )

        # find any errors in the response and raise if necessary
        if raise_on_error or len(errors) > 0:
            await self._raise_first_error(
                response,
                self._command_queue,
                start_time,
            )

        # We have to run response callbacks manually
        data = []
        for r, cmd in zip(response, self._command_queue):
            if not isinstance(r, Exception):
                command_name = cmd.args[0]
                if command_name in self._pipe.cluster_client.response_callbacks:
                    r = self._pipe.cluster_client.response_callbacks[command_name](
                        r, **cmd.kwargs
                    )
            data.append(r)

        await record_operation_duration(
            command_name="TRANSACTION",
            duration_seconds=time.monotonic() - start_time,
            server_address=connection.host,
            server_port=connection.port,
            db_namespace=str(connection.db),
        )
        return data
    async def reset(self):
        """
        Clear queued commands and release the pinned transaction connection,
        making sure any server-side WATCH is removed first.
        """
        self._command_queue = []

        # make sure to reset the connection state in the event that we were
        # watching something
        if self._transaction_connection:
            try:
                if self._watching:
                    # call this manually since our unwatch or
                    # immediate_execute_command methods can call reset()
                    await self._transaction_connection.send_command("UNWATCH")
                    await self._transaction_connection.read_response()
                # we can safely return the connection to the pool here since we're
                # sure we're no longer WATCHing anything
                self._transaction_node.release(self._transaction_connection)
                self._transaction_connection = None
            except self.CONNECTION_ERRORS:
                # disconnect will also remove any previous WATCHes
                if self._transaction_connection and self._transaction_node:
                    await self._transaction_connection.disconnect()
                    self._transaction_node.release(self._transaction_connection)
                    self._transaction_connection = None

        # clean up the other instance attributes
        self._transaction_node = None
        self._watching = False
        self._explicit_transaction = False
        self._pipeline_slots = set()
        self._executing = False
  2560. def multi(self):
  2561. if self._explicit_transaction:
  2562. raise RedisError("Cannot issue nested calls to MULTI")
  2563. if self._command_queue:
  2564. raise RedisError(
  2565. "Commands without an initial WATCH have already been issued"
  2566. )
  2567. self._explicit_transaction = True
  2568. async def watch(self, *names):
  2569. if self._explicit_transaction:
  2570. raise RedisError("Cannot issue a WATCH after a MULTI")
  2571. return await self.execute_command("WATCH", *names)
  2572. async def unwatch(self):
  2573. if self._watching:
  2574. return await self.execute_command("UNWATCH")
  2575. return True
  2576. async def discard(self):
  2577. await self.reset()
  2578. async def unlink(self, *names):
  2579. return self.execute_command("UNLINK", *names)