"""Redis Cluster client implementation."""
  1. import logging
  2. import random
  3. import socket
  4. import sys
  5. import threading
  6. import time
  7. from abc import ABC, abstractmethod
  8. from collections import OrderedDict
  9. from copy import copy
  10. from enum import Enum
  11. from itertools import chain
  12. from typing import (
  13. Any,
  14. Callable,
  15. Dict,
  16. List,
  17. Literal,
  18. Optional,
  19. Set,
  20. Tuple,
  21. Union,
  22. )
  23. from redis._parsers import CommandsParser, Encoder
  24. from redis._parsers.commands import CommandPolicies, RequestPolicy, ResponsePolicy
  25. from redis._parsers.helpers import parse_scan
  26. from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
  27. from redis.cache import CacheConfig, CacheFactory, CacheFactoryInterface, CacheInterface
  28. from redis.client import EMPTY_RESPONSE, CaseInsensitiveDict, PubSub, Redis
  29. from redis.commands import READ_COMMANDS, RedisClusterCommands
  30. from redis.commands.helpers import list_or_args
  31. from redis.commands.policies import PolicyResolver, StaticPolicyResolver
  32. from redis.connection import (
  33. Connection,
  34. ConnectionPool,
  35. parse_url,
  36. )
  37. from redis.crc import REDIS_CLUSTER_HASH_SLOTS, key_slot
  38. from redis.event import (
  39. AfterPooledConnectionsInstantiationEvent,
  40. AfterPubSubConnectionInstantiationEvent,
  41. ClientType,
  42. EventDispatcher,
  43. )
  44. from redis.exceptions import (
  45. AskError,
  46. AuthenticationError,
  47. ClusterDownError,
  48. ClusterError,
  49. ConnectionError,
  50. CrossSlotTransactionError,
  51. DataError,
  52. ExecAbortError,
  53. InvalidPipelineStack,
  54. MaxConnectionsError,
  55. MovedError,
  56. RedisClusterException,
  57. RedisError,
  58. ResponseError,
  59. SlotNotCoveredError,
  60. TimeoutError,
  61. TryAgainError,
  62. WatchError,
  63. )
  64. from redis.lock import Lock
  65. from redis.maint_notifications import (
  66. MaintNotificationsConfig,
  67. OSSMaintNotificationsHandler,
  68. )
  69. from redis.observability.recorder import (
  70. record_error_count,
  71. record_operation_duration,
  72. )
  73. from redis.retry import Retry
  74. from redis.utils import (
  75. check_protocol_version,
  76. deprecated_args,
  77. deprecated_function,
  78. dict_merge,
  79. list_keys_to_dict,
  80. merge_result,
  81. safe_str,
  82. str_if_bytes,
  83. truncate_text,
  84. )
  85. logger = logging.getLogger(__name__)
  86. def is_debug_log_enabled():
  87. return logger.isEnabledFor(logging.DEBUG)
  88. def get_node_name(host: str, port: Union[str, int]) -> str:
  89. return f"{host}:{port}"
  90. @deprecated_args(
  91. allowed_args=["redis_node"],
  92. reason="Use get_connection(redis_node) instead",
  93. version="5.3.0",
  94. )
  95. def get_connection(redis_node: Redis, *args, **options) -> Connection:
  96. return redis_node.connection or redis_node.connection_pool.get_connection()
  97. def parse_scan_result(command, res, **options):
  98. cursors = {}
  99. ret = []
  100. for node_name, response in res.items():
  101. cursor, r = parse_scan(response, **options)
  102. cursors[node_name] = cursor
  103. ret += r
  104. return cursors, ret
  105. def parse_pubsub_numsub(command, res, **options):
  106. numsub_d = OrderedDict()
  107. for numsub_tups in res.values():
  108. for channel, numsubbed in numsub_tups:
  109. try:
  110. numsub_d[channel] += numsubbed
  111. except KeyError:
  112. numsub_d[channel] = numsubbed
  113. ret_numsub = [(channel, numsub) for channel, numsub in numsub_d.items()]
  114. return ret_numsub
  115. def parse_cluster_slots(
  116. resp: Any, **options: Any
  117. ) -> Dict[Tuple[int, int], Dict[str, Any]]:
  118. current_host = options.get("current_host", "")
  119. def fix_server(*args: Any) -> Tuple[str, Any]:
  120. return str_if_bytes(args[0]) or current_host, args[1]
  121. slots = {}
  122. for slot in resp:
  123. start, end, primary = slot[:3]
  124. replicas = slot[3:]
  125. slots[start, end] = {
  126. "primary": fix_server(*primary),
  127. "replicas": [fix_server(*replica) for replica in replicas],
  128. }
  129. return slots
  130. def parse_cluster_shards(resp, **options):
  131. """
  132. Parse CLUSTER SHARDS response.
  133. """
  134. if isinstance(resp[0], dict):
  135. return resp
  136. shards = []
  137. for x in resp:
  138. shard = {"slots": [], "nodes": []}
  139. for i in range(0, len(x[1]), 2):
  140. shard["slots"].append((x[1][i], (x[1][i + 1])))
  141. nodes = x[3]
  142. for node in nodes:
  143. dict_node = {}
  144. for i in range(0, len(node), 2):
  145. dict_node[node[i]] = node[i + 1]
  146. shard["nodes"].append(dict_node)
  147. shards.append(shard)
  148. return shards
  149. def parse_cluster_myshardid(resp, **options):
  150. """
  151. Parse CLUSTER MYSHARDID response.
  152. """
  153. return resp.decode("utf-8")
  154. PRIMARY = "primary"
  155. REPLICA = "replica"
  156. SLOT_ID = "slot-id"
  157. REDIS_ALLOWED_KEYS = (
  158. "connection_class",
  159. "connection_pool",
  160. "connection_pool_class",
  161. "client_name",
  162. "credential_provider",
  163. "db",
  164. "decode_responses",
  165. "encoding",
  166. "encoding_errors",
  167. "host",
  168. "lib_name",
  169. "lib_version",
  170. "max_connections",
  171. "nodes_flag",
  172. "redis_connect_func",
  173. "password",
  174. "port",
  175. "timeout",
  176. "queue_class",
  177. "retry",
  178. "retry_on_timeout",
  179. "protocol",
  180. "socket_connect_timeout",
  181. "socket_keepalive",
  182. "socket_keepalive_options",
  183. "socket_timeout",
  184. "ssl",
  185. "ssl_ca_certs",
  186. "ssl_ca_data",
  187. "ssl_ca_path",
  188. "ssl_certfile",
  189. "ssl_cert_reqs",
  190. "ssl_include_verify_flags",
  191. "ssl_exclude_verify_flags",
  192. "ssl_keyfile",
  193. "ssl_password",
  194. "ssl_check_hostname",
  195. "unix_socket_path",
  196. "username",
  197. "cache",
  198. "cache_config",
  199. "maint_notifications_config",
  200. )
  201. KWARGS_DISABLED_KEYS = ("host", "port", "retry")
  202. def cleanup_kwargs(**kwargs):
  203. """
  204. Remove unsupported or disabled keys from kwargs
  205. """
  206. connection_kwargs = {
  207. k: v
  208. for k, v in kwargs.items()
  209. if k in REDIS_ALLOWED_KEYS and k not in KWARGS_DISABLED_KEYS
  210. }
  211. return connection_kwargs
class MaintNotificationsAbstractRedisCluster:
    """
    Abstract class for handling maintenance notifications logic.
    This class is expected to be used as base class together with RedisCluster.
    This class is intended to be used with multiple inheritance!
    All logic related to maintenance notifications is encapsulated in this class.
    """

    def __init__(
        self,
        maint_notifications_config: Optional[MaintNotificationsConfig],
        **kwargs,
    ):
        # Initialize maintenance notifications
        # NOTE(review): relies on the sibling base class having already set up
        # ``self.nodes_manager`` / ``self.get_nodes()`` — this __init__ must run
        # after the RedisCluster initialization in the MRO.
        is_protocol_supported = check_protocol_version(kwargs.get("protocol"), 3)
        if (
            maint_notifications_config
            and maint_notifications_config.enabled
            and not is_protocol_supported
        ):
            # Explicitly enabled config on RESP2 is a hard error.
            raise RedisError(
                "Maintenance notifications handlers on connection are only supported with RESP version 3"
            )
        # No explicit config + RESP3: fall back to the default configuration.
        if maint_notifications_config is None and is_protocol_supported:
            maint_notifications_config = MaintNotificationsConfig()
        self.maint_notifications_config = maint_notifications_config
        if self.maint_notifications_config and self.maint_notifications_config.enabled:
            self._oss_cluster_maint_notifications_handler = (
                OSSMaintNotificationsHandler(self, self.maint_notifications_config)
            )
            # Update connection kwargs for all future nodes connections
            self._update_connection_kwargs_for_maint_notifications(
                self._oss_cluster_maint_notifications_handler
            )
            # Update existing nodes connections - they are created as part of the RedisCluster constructor
            for node in self.get_nodes():
                if node.redis_connection is None:
                    continue
                node.redis_connection.connection_pool.update_maint_notifications_config(
                    self.maint_notifications_config,
                    oss_cluster_maint_notifications_handler=self._oss_cluster_maint_notifications_handler,
                )
        else:
            # Disabled (or unsupported protocol with no config): no handler.
            self._oss_cluster_maint_notifications_handler = None

    def _update_connection_kwargs_for_maint_notifications(
        self, oss_cluster_maint_notifications_handler: OSSMaintNotificationsHandler
    ):
        """
        Update the connection kwargs for all future connections.
        """
        # Every connection created from now on will carry the handler.
        self.nodes_manager.connection_kwargs.update(
            {
                "oss_cluster_maint_notifications_handler": oss_cluster_maint_notifications_handler,
            }
        )
  266. class AbstractRedisCluster:
  267. RedisClusterRequestTTL = 16
  268. PRIMARIES = "primaries"
  269. REPLICAS = "replicas"
  270. ALL_NODES = "all"
  271. RANDOM = "random"
  272. DEFAULT_NODE = "default-node"
  273. NODE_FLAGS = {PRIMARIES, REPLICAS, ALL_NODES, RANDOM, DEFAULT_NODE}
  274. COMMAND_FLAGS = dict_merge(
  275. list_keys_to_dict(
  276. [
  277. "ACL CAT",
  278. "ACL DELUSER",
  279. "ACL DRYRUN",
  280. "ACL GENPASS",
  281. "ACL GETUSER",
  282. "ACL HELP",
  283. "ACL LIST",
  284. "ACL LOG",
  285. "ACL LOAD",
  286. "ACL SAVE",
  287. "ACL SETUSER",
  288. "ACL USERS",
  289. "ACL WHOAMI",
  290. "AUTH",
  291. "CLIENT LIST",
  292. "CLIENT SETINFO",
  293. "CLIENT SETNAME",
  294. "CLIENT GETNAME",
  295. "CONFIG SET",
  296. "CONFIG REWRITE",
  297. "CONFIG RESETSTAT",
  298. "TIME",
  299. "PUBSUB CHANNELS",
  300. "PUBSUB NUMPAT",
  301. "PUBSUB NUMSUB",
  302. "PUBSUB SHARDCHANNELS",
  303. "PUBSUB SHARDNUMSUB",
  304. "PING",
  305. "INFO",
  306. "SHUTDOWN",
  307. "KEYS",
  308. "DBSIZE",
  309. "BGSAVE",
  310. "SLOWLOG GET",
  311. "SLOWLOG LEN",
  312. "SLOWLOG RESET",
  313. "WAIT",
  314. "WAITAOF",
  315. "SAVE",
  316. "MEMORY PURGE",
  317. "MEMORY MALLOC-STATS",
  318. "MEMORY STATS",
  319. "LASTSAVE",
  320. "CLIENT TRACKINGINFO",
  321. "CLIENT PAUSE",
  322. "CLIENT UNPAUSE",
  323. "CLIENT UNBLOCK",
  324. "CLIENT ID",
  325. "CLIENT REPLY",
  326. "CLIENT GETREDIR",
  327. "CLIENT INFO",
  328. "CLIENT KILL",
  329. "READONLY",
  330. "CLUSTER INFO",
  331. "CLUSTER MEET",
  332. "CLUSTER MYSHARDID",
  333. "CLUSTER NODES",
  334. "CLUSTER REPLICAS",
  335. "CLUSTER RESET",
  336. "CLUSTER SET-CONFIG-EPOCH",
  337. "CLUSTER SLOTS",
  338. "CLUSTER SHARDS",
  339. "CLUSTER COUNT-FAILURE-REPORTS",
  340. "CLUSTER KEYSLOT",
  341. "COMMAND",
  342. "COMMAND COUNT",
  343. "COMMAND LIST",
  344. "COMMAND GETKEYS",
  345. "CONFIG GET",
  346. "DEBUG",
  347. "RANDOMKEY",
  348. "READONLY",
  349. "READWRITE",
  350. "TIME",
  351. "TFUNCTION LOAD",
  352. "TFUNCTION DELETE",
  353. "TFUNCTION LIST",
  354. "TFCALL",
  355. "TFCALLASYNC",
  356. "LATENCY HISTORY",
  357. "LATENCY LATEST",
  358. "LATENCY RESET",
  359. "MODULE LIST",
  360. "MODULE LOAD",
  361. "MODULE UNLOAD",
  362. "MODULE LOADEX",
  363. ],
  364. DEFAULT_NODE,
  365. ),
  366. list_keys_to_dict(
  367. [
  368. "FLUSHALL",
  369. "FLUSHDB",
  370. "FUNCTION DELETE",
  371. "FUNCTION FLUSH",
  372. "FUNCTION LIST",
  373. "FUNCTION LOAD",
  374. "FUNCTION RESTORE",
  375. "SCAN",
  376. "SCRIPT EXISTS",
  377. "SCRIPT FLUSH",
  378. "SCRIPT LOAD",
  379. ],
  380. PRIMARIES,
  381. ),
  382. list_keys_to_dict(["FUNCTION DUMP"], RANDOM),
  383. list_keys_to_dict(
  384. [
  385. "CLUSTER COUNTKEYSINSLOT",
  386. "CLUSTER DELSLOTS",
  387. "CLUSTER DELSLOTSRANGE",
  388. "CLUSTER GETKEYSINSLOT",
  389. "CLUSTER SETSLOT",
  390. ],
  391. SLOT_ID,
  392. ),
  393. )
  394. SEARCH_COMMANDS = (
  395. [
  396. "FT.CREATE",
  397. "FT.SEARCH",
  398. "FT.AGGREGATE",
  399. "FT.EXPLAIN",
  400. "FT.EXPLAINCLI",
  401. "FT,PROFILE",
  402. "FT.ALTER",
  403. "FT.DROPINDEX",
  404. "FT.ALIASADD",
  405. "FT.ALIASUPDATE",
  406. "FT.ALIASDEL",
  407. "FT.TAGVALS",
  408. "FT.SUGADD",
  409. "FT.SUGGET",
  410. "FT.SUGDEL",
  411. "FT.SUGLEN",
  412. "FT.SYNUPDATE",
  413. "FT.SYNDUMP",
  414. "FT.SPELLCHECK",
  415. "FT.DICTADD",
  416. "FT.DICTDEL",
  417. "FT.DICTDUMP",
  418. "FT.INFO",
  419. "FT._LIST",
  420. "FT.CONFIG",
  421. "FT.ADD",
  422. "FT.DEL",
  423. "FT.DROP",
  424. "FT.GET",
  425. "FT.MGET",
  426. "FT.SYNADD",
  427. ],
  428. )
  429. CLUSTER_COMMANDS_RESPONSE_CALLBACKS = {
  430. "CLUSTER SLOTS": parse_cluster_slots,
  431. "CLUSTER SHARDS": parse_cluster_shards,
  432. "CLUSTER MYSHARDID": parse_cluster_myshardid,
  433. }
  434. RESULT_CALLBACKS = dict_merge(
  435. list_keys_to_dict(["PUBSUB NUMSUB", "PUBSUB SHARDNUMSUB"], parse_pubsub_numsub),
  436. list_keys_to_dict(
  437. ["PUBSUB NUMPAT"], lambda command, res: sum(list(res.values()))
  438. ),
  439. list_keys_to_dict(
  440. ["KEYS", "PUBSUB CHANNELS", "PUBSUB SHARDCHANNELS"], merge_result
  441. ),
  442. list_keys_to_dict(
  443. [
  444. "PING",
  445. "CONFIG SET",
  446. "CONFIG REWRITE",
  447. "CONFIG RESETSTAT",
  448. "CLIENT SETNAME",
  449. "BGSAVE",
  450. "SLOWLOG RESET",
  451. "SAVE",
  452. "MEMORY PURGE",
  453. "CLIENT PAUSE",
  454. "CLIENT UNPAUSE",
  455. ],
  456. lambda command, res: all(res.values()) if isinstance(res, dict) else res,
  457. ),
  458. list_keys_to_dict(
  459. ["DBSIZE", "WAIT"],
  460. lambda command, res: sum(res.values()) if isinstance(res, dict) else res,
  461. ),
  462. list_keys_to_dict(
  463. ["CLIENT UNBLOCK"], lambda command, res: 1 if sum(res.values()) > 0 else 0
  464. ),
  465. list_keys_to_dict(["SCAN"], parse_scan_result),
  466. list_keys_to_dict(
  467. ["SCRIPT LOAD"], lambda command, res: list(res.values()).pop()
  468. ),
  469. list_keys_to_dict(
  470. ["SCRIPT EXISTS"], lambda command, res: [all(k) for k in zip(*res.values())]
  471. ),
  472. list_keys_to_dict(["SCRIPT FLUSH"], lambda command, res: all(res.values())),
  473. )
  474. ERRORS_ALLOW_RETRY = (
  475. ConnectionError,
  476. TimeoutError,
  477. ClusterDownError,
  478. SlotNotCoveredError,
  479. )
  480. def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
  481. """Replace the default cluster node.
  482. A random cluster node will be chosen if target_node isn't passed, and primaries
  483. will be prioritized. The default node will not be changed if there are no other
  484. nodes in the cluster.
  485. Args:
  486. target_node (ClusterNode, optional): Target node to replace the default
  487. node. Defaults to None.
  488. """
  489. if target_node:
  490. self.nodes_manager.default_node = target_node
  491. else:
  492. curr_node = self.get_default_node()
  493. primaries = [node for node in self.get_primaries() if node != curr_node]
  494. if primaries:
  495. # Choose a primary if the cluster contains different primaries
  496. self.nodes_manager.default_node = random.choice(primaries)
  497. else:
  498. # Otherwise, choose a primary if the cluster contains different primaries
  499. replicas = [node for node in self.get_replicas() if node != curr_node]
  500. if replicas:
  501. self.nodes_manager.default_node = random.choice(replicas)
  502. class RedisCluster(
  503. AbstractRedisCluster, MaintNotificationsAbstractRedisCluster, RedisClusterCommands
  504. ):
    @classmethod
    def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
        """
        Return a RedisCluster client object configured from the given URL.

        For example::

            redis://[[username]:[password]]@localhost:6379/0
            rediss://[[username]:[password]]@localhost:6379/0
            unix://[username@]/path/to/socket.sock?db=0[&password=password]

        Three URL schemes are supported:

        - `redis://` creates a TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/redis>
        - `rediss://` creates a SSL wrapped TCP socket connection. See more at:
          <https://www.iana.org/assignments/uri-schemes/prov/rediss>
        - ``unix://``: creates a Unix Domain Socket connection.

        The username, password, hostname, path and all querystring values
        are passed through urllib.parse.unquote in order to replace any
        percent-encoded values with their corresponding characters.

        There are several ways to specify a database number. The first value
        found will be used:

        1. A ``db`` querystring option, e.g. redis://localhost?db=0
        2. If using the redis:// or rediss:// schemes, the path argument
           of the url, e.g. redis://localhost/0
        3. A ``db`` keyword argument to this function.

        If none of these options are specified, the default db=0 is used.

        All querystring options are cast to their appropriate Python types.
        Boolean arguments can be specified with string values "True"/"False"
        or "Yes"/"No". Values that cannot be properly cast cause a
        ``ValueError`` to be raised. Once parsed, the querystring arguments
        and keyword arguments are passed to the ``ConnectionPool``'s
        class initializer. In the case of conflicting arguments, querystring
        arguments always win.
        """
        # All URL parsing/validation happens inside __init__ (via the `url`
        # keyword); this is a thin alternate-constructor wrapper.
        return cls(url=url, **kwargs)
    @deprecated_args(
        args_to_warn=["read_from_replicas"],
        reason="Please configure the 'load_balancing_strategy' instead",
        version="5.3.0",
    )
    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        host: Optional[str] = None,
        port: int = 6379,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        cluster_error_retry_attempts: int = 3,
        retry: Optional["Retry"] = None,
        require_full_coverage: bool = True,
        reinitialize_steps: int = 5,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional["LoadBalancingStrategy"] = None,
        dynamic_startup_nodes: bool = True,
        url: Optional[str] = None,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        # NOTE(review): a single StaticPolicyResolver instance is shared by all
        # clients created with the default — fine only if it is stateless; confirm.
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
        **kwargs,
    ):
        """
        Initialize a new RedisCluster client.

        :param startup_nodes:
            List of nodes from which initial bootstrapping can be done
        :param host:
            Can be used to point to a startup node
        :param port:
            Can be used to point to a startup node
        :param require_full_coverage:
            When set to False: the client will not require a
            full coverage of the slots. However, if not all slots are covered,
            and at least one node has 'cluster-require-full-coverage' set to
            'yes,' the server will throw a ClusterDownError for some key-based
            commands. See -
            https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
            When set to True (default value): all slots must be covered to
            construct the cluster client. If not all slots are covered,
            RedisClusterException will be thrown.
        :param read_from_replicas:
            @deprecated - please use load_balancing_strategy instead
            Enable read from replicas in READONLY mode. You can read possibly
            stale data.
            When set to true, read commands will be assigned between the
            primary and its replications in a Round-Robin manner.
        :param load_balancing_strategy:
            Enable read from replicas in READONLY mode and defines the load balancing
            strategy that will be used for cluster node selection.
            The data read from replicas is eventually consistent with the data in primary nodes.
        :param dynamic_startup_nodes:
            Set the RedisCluster's startup nodes to all of the discovered nodes.
            If true (default value), the cluster's discovered nodes will be used to
            determine the cluster nodes-slots mapping in the next topology refresh.
            It will remove the initial passed startup nodes if their endpoints aren't
            listed in the CLUSTER SLOTS output.
            If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
            specific IP addresses, it is best to set it to false.
        :param cluster_error_retry_attempts:
            @deprecated - Please configure the 'retry' object instead
            In case 'retry' object is set - this argument is ignored!
            Number of times to retry before raising an error when
            :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
            :class:`~.ClusterDownError` are encountered
        :param retry:
            A retry object that defines the retry strategy and the number of
            retries for the cluster client.
            In current implementation for the cluster client (starting form redis-py version 6.0.0)
            the retry object is not yet fully utilized, instead it is used just to determine
            the number of retries for the cluster client.
            In the future releases the retry object will be used to handle the cluster client retries!
        :param reinitialize_steps:
            Specifies the number of MOVED errors that need to occur before
            reinitializing the whole cluster topology. If a MOVED error occurs
            and the cluster does not need to be reinitialized on this current
            error handling, only the MOVED slot will be patched with the
            redirected node.
            To reinitialize the cluster on every MOVED error, set
            reinitialize_steps to 1.
            To avoid reinitializing the cluster on moved errors, set
            reinitialize_steps to 0.
        :param address_remap:
            An optional callable which, when provided with an internal network
            address of a node, e.g. a `(host, port)` tuple, will return the address
            where the node is reachable. This can be used to map the addresses at
            which the nodes _think_ they are, to addresses at which a client may
            reach them, such as when they sit behind a proxy.
        :param maint_notifications_config:
            Configures the nodes connections to support maintenance notifications - see
            `redis.maint_notifications.MaintNotificationsConfig` for details.
            Only supported with RESP3.
            If not provided and protocol is RESP3, the maintenance notifications
            will be enabled by default (logic is included in the NodesManager
            initialization).
        :**kwargs:
            Extra arguments that will be sent into Redis instance when created
            (See Official redis-py doc for supported kwargs - the only limitation
            is that you can't provide 'retry' object as part of kwargs.
            [https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
            Some kwargs are not supported and will raise a
            RedisClusterException:
            - db (Redis do not support database SELECT in cluster mode)
        """
        if startup_nodes is None:
            startup_nodes = []

        if "db" in kwargs:
            # Argument 'db' is not possible to use in cluster mode
            raise RedisClusterException(
                "Argument 'db' is not possible to use in cluster mode"
            )
        if "retry" in kwargs:
            # Argument 'retry' is not possible to be used in kwargs when in cluster mode
            # the kwargs are set to the lower level connections to the cluster nodes
            # and there we provide retry configuration without retries allowed.
            # The retries should be handled on cluster client level.
            raise RedisClusterException(
                "The 'retry' argument cannot be used in kwargs when running in cluster mode."
            )

        # Get the startup node/s
        from_url = False
        if url is not None:
            from_url = True
            url_options = parse_url(url)
            if "path" in url_options:
                raise RedisClusterException(
                    "RedisCluster does not currently support Unix Domain "
                    "Socket connections"
                )
            if "db" in url_options and url_options["db"] != 0:
                # Argument 'db' is not possible to use in cluster mode
                raise RedisClusterException(
                    "A ``db`` querystring option can only be 0 in cluster mode"
                )
            kwargs.update(url_options)
            host = kwargs.get("host")
            port = kwargs.get("port", port)
            startup_nodes.append(ClusterNode(host, port))
        elif host is not None and port is not None:
            startup_nodes.append(ClusterNode(host, port))
        elif len(startup_nodes) == 0:
            # No startup node was provided
            raise RedisClusterException(
                "RedisCluster requires at least one node to discover the "
                "cluster. Please provide one of the followings:\n"
                "1. host and port, for example:\n"
                " RedisCluster(host='localhost', port=6379)\n"
                "2. list of startup nodes, for example:\n"
                " RedisCluster(startup_nodes=[ClusterNode('localhost', 6379),"
                " ClusterNode('localhost', 6378)])"
            )

        # Update the connection arguments
        # Whenever a new connection is established, RedisCluster's on_connect
        # method should be run
        # If the user passed on_connect function we'll save it and run it
        # inside the RedisCluster.on_connect() function
        self.user_on_connect_func = kwargs.pop("redis_connect_func", None)
        kwargs.update({"redis_connect_func": self.on_connect})
        kwargs = cleanup_kwargs(**kwargs)
        if retry:
            self.retry = retry
        else:
            # Default cluster-level retry policy (used when no 'retry' given).
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )

        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        protocol = kwargs.get("protocol", None)
        # Client-side caching and maintenance notifications both require RESP3.
        if (cache_config or cache) and not check_protocol_version(protocol, 3):
            raise RedisError("Client caching is only supported with RESP version 3")
        if maint_notifications_config and not check_protocol_version(protocol, 3):
            raise RedisError(
                "Maintenance notifications are only supported with RESP version 3"
            )
        # On RESP3, maintenance notifications default to enabled.
        if check_protocol_version(protocol, 3) and maint_notifications_config is None:
            maint_notifications_config = MaintNotificationsConfig()

        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.node_flags = self.__class__.NODE_FLAGS.copy()
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self.startup_nodes = startup_nodes
        self.nodes_manager = NodesManager(
            startup_nodes=startup_nodes,
            from_url=from_url,
            require_full_coverage=require_full_coverage,
            dynamic_startup_nodes=dynamic_startup_nodes,
            address_remap=address_remap,
            cache=cache,
            cache_config=cache_config,
            event_dispatcher=self._event_dispatcher,
            maint_notifications_config=maint_notifications_config,
            **kwargs,
        )

        self.cluster_response_callbacks = CaseInsensitiveDict(
            self.__class__.CLUSTER_COMMANDS_RESPONSE_CALLBACKS
        )
        self.result_callbacks = CaseInsensitiveDict(self.__class__.RESULT_CALLBACKS)
        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }
        # Maps each request/response policy to the callable that resolves the
        # target node(s) (or post-processes the response) for that policy.
        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command,
            *args: self.get_nodes_from_slot(command, *args),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args,
            **kwargs: self._split_multi_shard_command(*args, **kwargs),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }
        self._policy_resolver = policy_resolver
        self.commands_parser = CommandsParser(self)
        # Node where FT.AGGREGATE command is executed.
        self._aggregate_nodes = None
        self._lock = threading.RLock()
        MaintNotificationsAbstractRedisCluster.__init__(
            self, maint_notifications_config, **kwargs
        )
    def __enter__(self):
        """Support ``with RedisCluster(...) as rc:`` — returns the client itself."""
        return self
    def __exit__(self, exc_type, exc_value, traceback):
        """Close the client on context-manager exit (exceptions propagate)."""
        self.close()
    def __del__(self):
        """Best-effort cleanup at garbage collection; never raises."""
        try:
            self.close()
        except Exception:
            # Suppress all errors: raising from __del__ is never useful.
            pass
  799. def disconnect_connection_pools(self):
  800. for node in self.get_nodes():
  801. if node.redis_connection:
  802. try:
  803. node.redis_connection.connection_pool.disconnect()
  804. except OSError:
  805. # Client was already disconnected. do nothing
  806. pass
    def on_connect(self, connection):
        """
        Initialize the connection, authenticate and select a database and send
        READONLY if it is set during object initialization.

        Installed as the ``redis_connect_func`` of every node connection; after
        the standard handshake it optionally runs the user-supplied callback.
        """
        connection.on_connect()
        if self.read_from_replicas or self.load_balancing_strategy:
            # Sending READONLY command to server to configure connection as
            # readonly. Since each cluster node may change its server type due
            # to a failover, we should establish a READONLY connection
            # regardless of the server type. If this is a primary connection,
            # READONLY would not affect executing write commands.
            connection.send_command("READONLY")
            if str_if_bytes(connection.read_response()) != "OK":
                raise ConnectionError("READONLY command failed")

        if self.user_on_connect_func is not None:
            # Chain the callback the user passed as 'redis_connect_func'.
            self.user_on_connect_func(connection)
    def get_redis_connection(self, node: "ClusterNode") -> Redis:
        """
        Return the node's Redis client, lazily creating it on first use.

        Uses double-checked locking: the unlocked check avoids taking the lock
        on the hot path, and the re-check under the lock prevents two threads
        from both creating a connection for the same node.
        """
        if not node.redis_connection:
            with self._lock:
                if not node.redis_connection:
                    self.nodes_manager.create_redis_connections([node])
        return node.redis_connection
    def get_node(self, host=None, port=None, node_name=None):
        """Look up a cluster node by host/port or by node name (delegates to NodesManager)."""
        return self.nodes_manager.get_node(host, port, node_name)
    def get_primaries(self):
        """Return all known primary nodes."""
        return self.nodes_manager.get_nodes_by_server_type(PRIMARY)
    def get_replicas(self):
        """Return all known replica nodes."""
        return self.nodes_manager.get_nodes_by_server_type(REPLICA)
  836. def get_random_node(self):
  837. return random.choice(list(self.nodes_manager.nodes_cache.values()))
  838. def get_random_primary_or_all_nodes(self, command_name):
  839. """
  840. Returns random primary or all nodes depends on READONLY mode.
  841. """
  842. if self.read_from_replicas and command_name in READ_COMMANDS:
  843. return self.get_random_node()
  844. return self.get_random_primary_node()
    def get_nodes(self):
        """Return a list of every node currently in the topology cache."""
        return list(self.nodes_manager.nodes_cache.values())
  847. def get_node_from_key(self, key, replica=False):
  848. """
  849. Get the node that holds the key's slot.
  850. If replica set to True but the slot doesn't have any replicas, None is
  851. returned.
  852. """
  853. slot = self.keyslot(key)
  854. slot_cache = self.nodes_manager.slots_cache.get(slot)
  855. if slot_cache is None or len(slot_cache) == 0:
  856. raise SlotNotCoveredError(f'Slot "{slot}" is not covered by the cluster.')
  857. if replica and len(self.nodes_manager.slots_cache[slot]) < 2:
  858. return None
  859. elif replica:
  860. node_idx = 1
  861. else:
  862. # primary
  863. node_idx = 0
  864. return slot_cache[node_idx]
    def get_default_node(self):
        """
        Get the cluster's default node
        """
        return self.nodes_manager.default_node
    def get_nodes_from_slot(self, command: str, *args):
        """
        Returns a list of nodes that hold the specified keys' slots.

        For read commands, replica selection honors read_from_replicas /
        load_balancing_strategy; write commands always target the primary.
        """
        # get the node that holds the key's slot
        slot = self.determine_slot(*args)
        node = self.nodes_manager.get_node_from_slot(
            slot,
            self.read_from_replicas and command in READ_COMMANDS,
            self.load_balancing_strategy if command in READ_COMMANDS else None,
        )
        return [node]
  882. def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
  883. """
  884. Splits the command with Multi-Shard policy, to the multiple commands
  885. """
  886. keys = self._get_command_keys(*args)
  887. commands = []
  888. for key in keys:
  889. commands.append(
  890. {
  891. "args": (args[0], key),
  892. "kwargs": kwargs,
  893. }
  894. )
  895. return commands
    def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
        """
        Returns a list of nodes for commands with a special policy.

        Currently this is the node set where the last FT.AGGREGATE ran, so a
        follow-up FT.CURSOR is routed to the same node(s).
        """
        if not self._aggregate_nodes:
            raise RedisClusterException(
                "Cannot execute FT.CURSOR commands without FT.AGGREGATE"
            )

        return self._aggregate_nodes
    def get_random_primary_node(self) -> "ClusterNode":
        """
        Returns a random primary node
        """
        return random.choice(self.get_primaries())
    def _evaluate_all_succeeded(self, res):
        """
        Evaluate the result of a command with ResponsePolicy.ALL_SUCCEEDED

        Returns the first truthy per-node response if every node responded
        with a truthy value; as soon as a falsy response is seen, a falsy
        result of the matching shape is returned instead.
        """
        first_successful_response = None
        if isinstance(res, dict):
            # Responses keyed by node name.
            for key, value in res.items():
                if value:
                    if first_successful_response is None:
                        first_successful_response = {key: value}
                else:
                    # One node failed -> report failure for that node.
                    return {key: False}
        else:
            # Responses as a flat sequence.
            for response in res:
                if response:
                    if first_successful_response is None:
                        # Dynamically resolve type
                        first_successful_response = type(response)(response)
                else:
                    return type(response)(False)
        return first_successful_response
  931. def set_default_node(self, node):
  932. """
  933. Set the default node of the cluster.
  934. :param node: 'ClusterNode'
  935. :return True if the default node was set, else False
  936. """
  937. if node is None or self.get_node(node_name=node.name) is None:
  938. return False
  939. self.nodes_manager.default_node = node
  940. return True
    def set_retry(self, retry: Retry) -> None:
        """Replace the cluster-level retry policy."""
        self.retry = retry
    def monitor(self, target_node=None):
        """
        Returns a Monitor object for the specified target node.
        The default cluster node will be selected if no target node was
        specified.
        Monitor is useful for handling the MONITOR command to the redis server.
        next_command() method returns one command from monitor
        listen() method yields commands from monitor.
        """
        if target_node is None:
            target_node = self.get_default_node()
        # MONITOR needs an established client on the node.
        if target_node.redis_connection is None:
            raise RedisClusterException(
                f"Cluster Node {target_node.name} has no redis_connection"
            )
        return target_node.redis_connection.monitor()
    def pubsub(self, node=None, host=None, port=None, **kwargs):
        """
        Allows passing a ClusterNode, or host&port, to get a pubsub instance
        connected to the specified node
        """
        return ClusterPubSub(self, node=node, host=host, port=port, **kwargs)
    def pipeline(self, transaction=None, shard_hint=None):
        """
        Cluster impl:
        Pipelines do not work in cluster mode the same way they
        do in normal mode. Create a clone of this object so
        that simulating pipelines will work correctly. Each
        command will be called directly when used and
        when calling execute() will only return the result stack.
        """
        if shard_hint:
            # shard_hint has no meaning when the cluster decides routing.
            raise RedisClusterException("shard_hint is deprecated in cluster mode")

        # The pipeline shares this client's managers, callbacks and lock so it
        # observes the same topology and retry behavior.
        return ClusterPipeline(
            nodes_manager=self.nodes_manager,
            commands_parser=self.commands_parser,
            startup_nodes=self.nodes_manager.startup_nodes,
            result_callbacks=self.result_callbacks,
            cluster_response_callbacks=self.cluster_response_callbacks,
            read_from_replicas=self.read_from_replicas,
            load_balancing_strategy=self.load_balancing_strategy,
            reinitialize_steps=self.reinitialize_steps,
            retry=self.retry,
            lock=self._lock,
            transaction=transaction,
            event_dispatcher=self._event_dispatcher,
        )
    def lock(
        self,
        name,
        timeout=None,
        sleep=0.1,
        blocking=True,
        blocking_timeout=None,
        lock_class=None,
        thread_local=True,
        raise_on_release_error: bool = True,
    ):
        """
        Return a new Lock object using key ``name`` that mimics
        the behavior of threading.Lock.

        If specified, ``timeout`` indicates a maximum life for the lock.
        By default, it will remain locked until release() is called.

        ``sleep`` indicates the amount of time to sleep per loop iteration
        when the lock is in blocking mode and another client is currently
        holding the lock.

        ``blocking`` indicates whether calling ``acquire`` should block until
        the lock has been acquired or to fail immediately, causing ``acquire``
        to return False and the lock not being acquired. Defaults to True.
        Note this value can be overridden by passing a ``blocking``
        argument to ``acquire``.

        ``blocking_timeout`` indicates the maximum amount of time in seconds to
        spend trying to acquire the lock. A value of ``None`` indicates
        continue trying forever. ``blocking_timeout`` can be specified as a
        float or integer, both representing the number of seconds to wait.

        ``lock_class`` forces the specified lock implementation. Note that as
        of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
        a Lua-based lock). So, it's unlikely you'll need this parameter, unless
        you have created your own custom lock class.

        ``thread_local`` indicates whether the lock token is placed in
        thread-local storage. By default, the token is placed in thread local
        storage so that a thread only sees its token, not a token set by
        another thread. Consider the following timeline:

            time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
                     thread-1 sets the token to "abc"
            time: 1, thread-2 blocks trying to acquire `my-lock` using the
                     Lock instance.
            time: 5, thread-1 has not yet completed. redis expires the lock
                     key.
            time: 5, thread-2 acquired `my-lock` now that it's available.
                     thread-2 sets the token to "xyz"
            time: 6, thread-1 finishes its work and calls release(). if the
                     token is *not* stored in thread local storage, then
                     thread-1 would see the token value as "xyz" and would be
                     able to successfully release the thread-2's lock.

        ``raise_on_release_error`` indicates whether to raise an exception when
        the lock is no longer owned when exiting the context manager. By default,
        this is True, meaning an exception will be raised. If False, the warning
        will be logged and the exception will be suppressed.

        In some use cases it's necessary to disable thread local storage. For
        example, if you have code where one thread acquires a lock and passes
        that lock instance to a worker thread to release later. If thread
        local storage isn't disabled in this case, the worker thread won't see
        the token set by the thread that acquired the lock. Our assumption
        is that these cases aren't common and as such default to using
        thread local storage.
        """
        if lock_class is None:
            lock_class = Lock
        return lock_class(
            self,
            name,
            timeout=timeout,
            sleep=sleep,
            blocking=blocking,
            blocking_timeout=blocking_timeout,
            thread_local=thread_local,
            raise_on_release_error=raise_on_release_error,
        )
    def set_response_callback(self, command, callback):
        """Set a custom Response Callback"""
        self.cluster_response_callbacks[command] = callback
    def _determine_nodes(
        self, *args, request_policy: RequestPolicy, **kwargs
    ) -> List["ClusterNode"]:
        """
        Determine the nodes the command should be executed on.

        A user-supplied ``nodes_flag`` (or a pre-defined command flag) can
        override the given ``request_policy``; the resolved policy's callback
        then produces the node list.
        """
        command = args[0].upper()
        # Two-word commands (e.g. "CLUSTER INFO") may have their own flag.
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()

        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self.command_flags.get(command)

        # Legacy flags are translated to the new request-policy model.
        if command_flag in self._command_flags_mapping:
            request_policy = self._command_flags_mapping[command_flag]

        policy_callback = self._policies_callback_mapping[request_policy]

        # Each policy's callback has a different call signature.
        if request_policy == RequestPolicy.DEFAULT_KEYED:
            nodes = policy_callback(command, *args)
        elif request_policy == RequestPolicy.MULTI_SHARD:
            nodes = policy_callback(*args, **kwargs)
        elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
            nodes = policy_callback(args[0])
        else:
            nodes = policy_callback()

        # Remember where FT.AGGREGATE ran so FT.CURSOR can target the same node.
        if args[0].lower() == "ft.aggregate":
            self._aggregate_nodes = nodes

        return nodes
  1094. def _should_reinitialized(self):
  1095. # To reinitialize the cluster on every MOVED error,
  1096. # set reinitialize_steps to 1.
  1097. # To avoid reinitializing the cluster on moved errors, set
  1098. # reinitialize_steps to 0.
  1099. if self.reinitialize_steps == 0:
  1100. return False
  1101. else:
  1102. return self.reinitialize_counter % self.reinitialize_steps == 0
  1103. def keyslot(self, key):
  1104. """
  1105. Calculate keyslot for a given key.
  1106. See Keys distribution model in https://redis.io/topics/cluster-spec
  1107. """
  1108. k = self.encoder.encode(key)
  1109. return key_slot(k)
    def _get_command_keys(self, *args):
        """
        Get the keys in the command. If the command has no keys in in, None is
        returned.

        NOTE: Due to a bug in redis<7.0, this function does not work properly
        for EVAL or EVALSHA when the `numkeys` arg is 0.
         - issue: https://github.com/redis/redis/issues/9493
         - fix: https://github.com/redis/redis/pull/9733

        So, don't use this function with EVAL or EVALSHA.
        """
        # COMMAND GETKEYS is issued through the default node's client.
        redis_conn = self.get_default_node().redis_connection
        return self.commands_parser.get_keys(redis_conn, *args)
    def determine_slot(self, *args) -> Optional[int]:
        """
        Figure out what slot to use based on args.

        Raises a RedisClusterException if there's a missing key and we can't
        determine what slots to map the command to; or, if the keys don't
        all map to the same key slot.
        """
        command = args[0]
        if self.command_flags.get(command) == SLOT_ID:
            # The command contains the slot ID
            return args[1]

        # Get the keys in the command

        # CLIENT TRACKING is a special case.
        # It doesn't have any keys, it needs to be sent to the provided nodes
        # By default it will be sent to all nodes.
        if command.upper() == "CLIENT TRACKING":
            return None

        # EVAL and EVALSHA are common enough that it's wasteful to go to the
        # redis server to parse the keys. Besides, there is a bug in redis<7.0
        # where `self._get_command_keys()` fails anyway. So, we special case
        # EVAL/EVALSHA.
        if command.upper() in ("EVAL", "EVALSHA"):
            # command syntax: EVAL "script body" num_keys ...
            if len(args) <= 2:
                raise RedisClusterException(f"Invalid args in command: {args}")
            num_actual_keys = int(args[2])
            eval_keys = args[3 : 3 + num_actual_keys]
            # if there are 0 keys, that means the script can be run on any node
            # so we can just return a random slot
            if len(eval_keys) == 0:
                return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
            keys = eval_keys
        else:
            keys = self._get_command_keys(*args)
            if keys is None or len(keys) == 0:
                # FCALL can call a function with 0 keys, that means the function
                # can be run on any node so we can just return a random slot
                if command.upper() in ("FCALL", "FCALL_RO"):
                    return random.randrange(0, REDIS_CLUSTER_HASH_SLOTS)
                raise RedisClusterException(
                    "No way to dispatch this command to Redis Cluster. "
                    "Missing key.\nYou can execute the command by specifying "
                    f"target nodes.\nCommand: {args}"
                )

        # single key command
        if len(keys) == 1:
            return self.keyslot(keys[0])

        # multi-key command; we need to make sure all keys are mapped to
        # the same slot
        slots = {self.keyslot(key) for key in keys}
        if len(slots) != 1:
            raise RedisClusterException(
                f"{command} - all keys must map to the same key slot"
            )

        return slots.pop()
    def get_encoder(self):
        """
        Get the connections' encoder
        """
        return self.encoder
    def get_connection_kwargs(self):
        """
        Get the connections' key-word arguments
        """
        return self.nodes_manager.connection_kwargs
    def _is_nodes_flag(self, target_nodes):
        # True when target_nodes is a recognized string flag (e.g. PRIMARIES).
        return isinstance(target_nodes, str) and target_nodes in self.node_flags
  1189. def _parse_target_nodes(self, target_nodes):
  1190. if isinstance(target_nodes, list):
  1191. nodes = target_nodes
  1192. elif isinstance(target_nodes, ClusterNode):
  1193. # Supports passing a single ClusterNode as a variable
  1194. nodes = [target_nodes]
  1195. elif isinstance(target_nodes, dict):
  1196. # Supports dictionaries of the format {node_name: node}.
  1197. # It enables to execute commands with multi nodes as follows:
  1198. # rc.cluster_save_config(rc.get_primaries())
  1199. nodes = target_nodes.values()
  1200. else:
  1201. raise TypeError(
  1202. "target_nodes type can be one of the following: "
  1203. "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
  1204. "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
  1205. f"The passed type is {type(target_nodes)}"
  1206. )
  1207. return nodes
    def execute_command(self, *args, **kwargs):
        """Public entry point for command execution; delegates to the retry-aware internal path."""
        return self._internal_execute_command(*args, **kwargs)
def _internal_execute_command(self, *args, **kwargs):
    """
    Wrapper for ERRORS_ALLOW_RETRY error handling.

    It will try the number of times specified by the retries property from
    config option "self.retry" which defaults to 3 unless manually
    configured.

    If it reaches the number of times, the command will raise the exception.

    Key argument :target_nodes: can be passed with the following types:
        nodes_flag: PRIMARIES, REPLICAS, ALL_NODES, RANDOM
        ClusterNode
        list<ClusterNode>
        dict<Any, ClusterNode>
    """
    target_nodes_specified = False
    is_default_node = False
    target_nodes = None
    passed_targets = kwargs.pop("target_nodes", None)
    # Resolve per-command request/response policies by command name.
    command_policies = self._policy_resolver.resolve(args[0].lower())
    if passed_targets is not None and not self._is_nodes_flag(passed_targets):
        target_nodes = self._parse_target_nodes(passed_targets)
        target_nodes_specified = True
    if not command_policies and not target_nodes_specified:
        command = args[0].upper()
        # Two-token commands (e.g. subcommands) are looked up by both tokens.
        if len(args) >= 2 and f"{args[0]} {args[1]}".upper() in self.command_flags:
            command = f"{args[0]} {args[1]}".upper()
        # We only could resolve key properties if command is not
        # in a list of pre-defined request policies
        command_flag = self.command_flags.get(command)
        if not command_flag:
            # Fallback to default policy
            if not self.get_default_node():
                slot = None
            else:
                slot = self.determine_slot(*args)
            if slot is None:
                command_policies = CommandPolicies()
            else:
                command_policies = CommandPolicies(
                    request_policy=RequestPolicy.DEFAULT_KEYED,
                    response_policy=ResponsePolicy.DEFAULT_KEYED,
                )
        else:
            if command_flag in self._command_flags_mapping:
                command_policies = CommandPolicies(
                    request_policy=self._command_flags_mapping[command_flag]
                )
            else:
                command_policies = CommandPolicies()
    elif not command_policies and target_nodes_specified:
        command_policies = CommandPolicies()
    # If an error that allows retrying was thrown, the nodes and slots
    # cache were reinitialized. We will retry executing the command with
    # the updated cluster setup only when the target nodes can be
    # determined again with the new cache tables. Therefore, when target
    # nodes were passed to this function, we cannot retry the command
    # execution since the nodes may not be valid anymore after the tables
    # were reinitialized. So in case of passed target nodes,
    # retry_attempts will be set to 0.
    retry_attempts = 0 if target_nodes_specified else self.retry.get_retries()
    # Add one for the first execution
    execute_attempts = 1 + retry_attempts
    failure_count = 0
    # Start timing for observability
    start_time = time.monotonic()
    for _ in range(execute_attempts):
        try:
            res = {}
            if not target_nodes_specified:
                # Determine the nodes to execute the command on
                target_nodes = self._determine_nodes(
                    *args,
                    request_policy=command_policies.request_policy,
                    nodes_flag=passed_targets,
                )
                if not target_nodes:
                    raise RedisClusterException(
                        f"No targets were found to execute {args} command on"
                    )
            if (
                len(target_nodes) == 1
                and target_nodes[0] == self.get_default_node()
            ):
                is_default_node = True
            for node in target_nodes:
                res[node.name] = self._execute_command(node, *args, **kwargs)
                if command_policies.response_policy == ResponsePolicy.ONE_SUCCEEDED:
                    # First success wins; skip the remaining nodes.
                    break
            # Return the processed result
            return self._process_result(
                args[0],
                res,
                response_policy=command_policies.response_policy,
                **kwargs,
            )
        except Exception as e:
            if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
                if is_default_node:
                    # Replace the default cluster node
                    self.replace_default_node()
                # The nodes and slots cache were reinitialized.
                # Try again with the new cluster setup.
                retry_attempts -= 1
                failure_count += 1
                # e.connection (when present) carries host/port for metrics.
                if hasattr(e, "connection"):
                    self._record_command_metric(
                        command_name=args[0],
                        duration_seconds=time.monotonic() - start_time,
                        connection=e.connection,
                        error=e,
                    )
                    self._record_error_metric(
                        error=e,
                        connection=e.connection,
                        retry_attempts=failure_count,
                    )
                continue
            else:
                # raise the exception
                if hasattr(e, "connection"):
                    self._record_error_metric(
                        error=e,
                        connection=e.connection,
                        retry_attempts=failure_count,
                        is_internal=False,
                    )
                raise e
def _execute_command(self, target_node, *args, **kwargs):
    """
    Send a command to a node in the cluster.

    Runs a redirect-following loop bounded by ``RedisClusterRequestTTL``:
    MOVED/ASK/TRYAGAIN responses are handled in-loop, while connection,
    authentication, cluster-down and generic errors are re-raised after
    recording metrics. ``target_node`` may be re-targeted when a
    redirection occurs.

    :raises ClusterError: when the redirect TTL is exhausted.
    """
    command = args[0]
    redis_node = None
    connection = None
    redirect_addr = None
    asking = False
    moved = False
    ttl = int(self.RedisClusterRequestTTL)
    # Start timing for observability
    start_time = time.monotonic()
    while ttl > 0:
        ttl -= 1
        try:
            if asking:
                # Follow the ASK redirection recorded by a previous iteration.
                target_node = self.get_node(node_name=redirect_addr)
            elif moved:
                # MOVED occurred and the slots cache was updated,
                # refresh the target node
                slot = self.determine_slot(*args)
                target_node = self.nodes_manager.get_node_from_slot(
                    slot,
                    self.read_from_replicas and command in READ_COMMANDS,
                    self.load_balancing_strategy
                    if command in READ_COMMANDS
                    else None,
                )
                moved = False
            redis_node = self.get_redis_connection(target_node)
            connection = get_connection(redis_node)
            if asking:
                connection.send_command("ASKING")
                redis_node.parse_response(connection, "ASKING", **kwargs)
                asking = False
            connection.send_command(*args, **kwargs)
            response = redis_node.parse_response(connection, command, **kwargs)
            # Remove keys entry, it needs only for cache.
            kwargs.pop("keys", None)
            if command in self.cluster_response_callbacks:
                response = self.cluster_response_callbacks[command](
                    response, **kwargs
                )
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
            )
            return response
        except AuthenticationError as e:
            # Attach host/port carrier so metric recording can resolve them.
            e.connection = connection if connection is not None else target_node
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=e.connection,
                error=e,
            )
            raise
        except MaxConnectionsError as e:
            # MaxConnectionsError indicates client-side resource exhaustion
            # (too many connections in the pool), not a node failure.
            # Don't treat this as a node failure - just re-raise the error
            # without reinitializing the cluster.
            # The connection in the error is used to report the metrics based on host and port info
            # so we use the target node object which contains the host and port info
            # because we did not get the connection yet
            e.connection = target_node
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=e.connection,
                error=e,
            )
            raise
        except (ConnectionError, TimeoutError) as e:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"{type(e).__name__} received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                )
            # this is used to report the metrics based on host and port info
            e.connection = connection if connection else target_node
            # ConnectionError can also be raised if we couldn't get a
            # connection from the pool before timing out, so check that
            # this is an actual connection before attempting to disconnect.
            if connection is not None:
                connection.disconnect()
            # Instead of setting to None, properly handle the pool
            # Get the pool safely - redis_connection could be set to None
            # by another thread between the check and access
            redis_conn = target_node.redis_connection
            if redis_conn is not None:
                pool = redis_conn.connection_pool
                if pool is not None:
                    with pool._lock:
                        # take care for the active connections in the pool
                        pool.update_active_connections_for_reconnect()
                        # disconnect all free connections
                        pool.disconnect_free_connections()
            # Move the failed node to the end of the cached nodes list
            self.nodes_manager.move_node_to_end_of_cached_nodes(target_node.name)
            # DON'T set redis_connection = None - keep the pool for reuse
            self.nodes_manager.initialize()
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=e.connection,
                error=e,
            )
            raise e
        except MovedError as e:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"MOVED error received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                )
            # First, we will try to patch the slots/nodes cache with the
            # redirected node output and try again. If MovedError exceeds
            # 'reinitialize_steps' number of times, we will force
            # reinitializing the tables, and then try again.
            # 'reinitialize_steps' counter will increase faster when
            # the same client object is shared between multiple threads. To
            # reduce the frequency you can set this variable in the
            # RedisCluster constructor.
            self.reinitialize_counter += 1
            if self._should_reinitialized():
                # during this call all connections are closed or marked for disconnect,
                # so we don't need to disconnect the changed node's connections
                self.nodes_manager.initialize(
                    additional_startup_nodes_info=[(e.host, e.port)]
                )
                # Reset the counter
                self.reinitialize_counter = 0
            else:
                self.nodes_manager.move_slot(e)
            moved = True
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
                error=e,
            )
            self._record_error_metric(
                error=e,
                connection=connection,
            )
        except TryAgainError as e:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"TRYAGAIN error received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}"
                )
            # Back off briefly once half of the TTL budget is spent.
            if ttl < self.RedisClusterRequestTTL / 2:
                time.sleep(0.05)
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
                error=e,
            )
            self._record_error_metric(
                error=e,
                connection=connection,
            )
        except AskError as e:
            if is_debug_log_enabled():
                socket_address = self._extracts_socket_address(connection)
                args_log_str = truncate_text(" ".join(map(safe_str, args)))
                logger.debug(
                    f"ASK error received for command {args_log_str}, on node {target_node.name}, "
                    f"and connection: {connection} using local socket address: {socket_address}, error: {e}"
                )
            # Next iteration must send ASKING to the redirected node.
            redirect_addr = get_node_name(host=e.host, port=e.port)
            asking = True
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=connection,
                error=e,
            )
            self._record_error_metric(
                error=e,
                connection=connection,
            )
        except (ClusterDownError, SlotNotCoveredError) as e:
            # ClusterDownError can occur during a failover and to get
            # self-healed, we will try to reinitialize the cluster layout
            # and retry executing the command
            # SlotNotCoveredError can occur when the cluster is not fully
            # initialized or can be temporary issue.
            # We will try to reinitialize the cluster topology
            # and retry executing the command
            time.sleep(0.25)
            self.nodes_manager.initialize()
            # if we have a connection, use it, otherwise use the target node
            # object which contains the host and port info
            # this is used to report the metrics based on host and port info
            e.connection = connection if connection else target_node
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=e.connection,
                error=e,
            )
            raise
        except ResponseError as e:
            # this is used to report the metrics based on host and port info
            # ResponseError typically happens after get_connection() succeeds,
            # so connection should be available
            e.connection = connection if connection else target_node
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=e.connection,
                error=e,
            )
            raise
        except Exception as e:
            # Unknown failure: drop the connection defensively before re-raising.
            if connection:
                connection.disconnect()
            # if we have a connection, use it, otherwise use the target node
            # object which contains the host and port info
            # this is used to report the metrics based on host and port info
            e.connection = connection if connection else target_node
            self._record_command_metric(
                command_name=command,
                duration_seconds=time.monotonic() - start_time,
                connection=e.connection,
                error=e,
            )
            raise e
        finally:
            if connection is not None:
                redis_node.connection_pool.release(connection)
    e = ClusterError("TTL exhausted.")
    # In this case we should have an active connection.
    # If we are here, we have received many MOVED or ASK errors and finally exhausted the TTL.
    # This means that we used an active connection to read from the socket.
    # This is used to report metrics based on the host and port information.
    e.connection = connection
    self._record_command_metric(
        command_name=command,
        duration_seconds=time.monotonic() - start_time,
        connection=connection,
        error=e,
    )
    raise e
  1590. def _record_command_metric(
  1591. self,
  1592. command_name: str,
  1593. duration_seconds: float,
  1594. connection: Connection,
  1595. error=None,
  1596. ):
  1597. """
  1598. Records operation duration metric directly.
  1599. """
  1600. host = connection.host if connection else "unknown"
  1601. port = connection.port if connection else 0
  1602. db = str(connection.db) if connection and hasattr(connection, "db") else "0"
  1603. record_operation_duration(
  1604. command_name=command_name,
  1605. duration_seconds=duration_seconds,
  1606. server_address=host,
  1607. server_port=port,
  1608. db_namespace=db,
  1609. error=error,
  1610. )
  1611. def _record_error_metric(
  1612. self,
  1613. error: Exception,
  1614. connection: Connection,
  1615. is_internal: bool = True,
  1616. retry_attempts: Optional[int] = None,
  1617. ):
  1618. """
  1619. Records error count metric directly.
  1620. """
  1621. record_error_count(
  1622. server_address=connection.host,
  1623. server_port=connection.port,
  1624. network_peer_address=connection.host,
  1625. network_peer_port=connection.port,
  1626. error_type=error,
  1627. retry_attempts=retry_attempts if retry_attempts is not None else 0,
  1628. is_internal=is_internal,
  1629. )
  1630. def _extracts_socket_address(
  1631. self, connection: Optional[Connection]
  1632. ) -> Optional[int]:
  1633. if connection is None:
  1634. return None
  1635. try:
  1636. socket_address = (
  1637. connection._sock.getsockname() if connection._sock else None
  1638. )
  1639. socket_address = socket_address[1] if socket_address else None
  1640. except (AttributeError, OSError):
  1641. pass
  1642. return socket_address
  1643. def close(self) -> None:
  1644. try:
  1645. with self._lock:
  1646. if self.nodes_manager:
  1647. self.nodes_manager.close()
  1648. except AttributeError:
  1649. # RedisCluster's __init__ can fail before nodes_manager is set
  1650. pass
  1651. def _process_result(self, command, res, response_policy: ResponsePolicy, **kwargs):
  1652. """
  1653. Process the result of the executed command.
  1654. The function would return a dict or a single value.
  1655. :type command: str
  1656. :type res: dict
  1657. `res` should be in the following format:
  1658. Dict<node_name, command_result>
  1659. """
  1660. if command in self.result_callbacks:
  1661. res = self.result_callbacks[command](command, res, **kwargs)
  1662. elif len(res) == 1:
  1663. # When we execute the command on a single node, we can
  1664. # remove the dictionary and return a single response
  1665. res = list(res.values())[0]
  1666. return self._policies_callback_mapping[response_policy](res)
def load_external_module(self, funcname, func):
    """
    This function can be used to add externally defined redis modules,
    and their namespaces to the redis client.

    ``funcname`` - A string containing the name of the function to create
    ``func`` - The function, being added to this class.

    Note: the function is stored as a plain instance attribute, so it is
    NOT bound — it will not receive ``self`` automatically when called.
    """
    setattr(self, funcname, func)
  1675. def transaction(self, func, *watches, **kwargs):
  1676. """
  1677. Convenience method for executing the callable `func` as a transaction
  1678. while watching all keys specified in `watches`. The 'func' callable
  1679. should expect a single argument which is a Pipeline object.
  1680. """
  1681. shard_hint = kwargs.pop("shard_hint", None)
  1682. value_from_callable = kwargs.pop("value_from_callable", False)
  1683. watch_delay = kwargs.pop("watch_delay", None)
  1684. with self.pipeline(True, shard_hint) as pipe:
  1685. while True:
  1686. try:
  1687. if watches:
  1688. pipe.watch(*watches)
  1689. func_value = func(pipe)
  1690. exec_value = pipe.execute()
  1691. return func_value if value_from_callable else exec_value
  1692. except WatchError:
  1693. if watch_delay is not None and watch_delay > 0:
  1694. time.sleep(watch_delay)
  1695. continue
  1696. class ClusterNode:
  1697. def __init__(self, host, port, server_type=None, redis_connection=None):
  1698. if host == "localhost":
  1699. host = socket.gethostbyname(host)
  1700. self.host = host
  1701. self.port = port
  1702. self.name = get_node_name(host, port)
  1703. self.server_type = server_type
  1704. self.redis_connection = redis_connection
  1705. def __repr__(self):
  1706. return (
  1707. f"[host={self.host},"
  1708. f"port={self.port},"
  1709. f"name={self.name},"
  1710. f"server_type={self.server_type},"
  1711. f"redis_connection={self.redis_connection}]"
  1712. )
  1713. def __eq__(self, obj):
  1714. return isinstance(obj, ClusterNode) and obj.name == self.name
  1715. def __hash__(self):
  1716. return hash(self.name)
  1717. class LoadBalancingStrategy(Enum):
  1718. ROUND_ROBIN = "round_robin"
  1719. ROUND_ROBIN_REPLICAS = "round_robin_replicas"
  1720. RANDOM_REPLICA = "random_replica"
  1721. class LoadBalancer:
  1722. """
  1723. Round-Robin Load Balancing
  1724. """
  1725. def __init__(self, start_index: int = 0) -> None:
  1726. self.primary_to_idx: dict[str, int] = {}
  1727. self.start_index: int = start_index
  1728. self._lock: threading.Lock = threading.Lock()
  1729. def get_server_index(
  1730. self,
  1731. primary: str,
  1732. list_size: int,
  1733. load_balancing_strategy: LoadBalancingStrategy = LoadBalancingStrategy.ROUND_ROBIN,
  1734. ) -> int:
  1735. if load_balancing_strategy == LoadBalancingStrategy.RANDOM_REPLICA:
  1736. return self._get_random_replica_index(list_size)
  1737. else:
  1738. return self._get_round_robin_index(
  1739. primary,
  1740. list_size,
  1741. load_balancing_strategy == LoadBalancingStrategy.ROUND_ROBIN_REPLICAS,
  1742. )
  1743. def reset(self) -> None:
  1744. with self._lock:
  1745. self.primary_to_idx.clear()
  1746. def _get_random_replica_index(self, list_size: int) -> int:
  1747. return random.randint(1, list_size - 1)
  1748. def _get_round_robin_index(
  1749. self, primary: str, list_size: int, replicas_only: bool
  1750. ) -> int:
  1751. with self._lock:
  1752. server_index = self.primary_to_idx.setdefault(primary, self.start_index)
  1753. if replicas_only and server_index == 0:
  1754. # skip the primary node index
  1755. server_index = 1
  1756. # Update the index for the next round
  1757. self.primary_to_idx[primary] = (server_index + 1) % list_size
  1758. return server_index
class NodesManager:
    """
    Maintains the cluster topology state: the node and slot caches, the
    startup nodes, the default node and the per-node Redis connections.
    """

    def __init__(
        self,
        startup_nodes: list[ClusterNode],
        from_url=False,
        require_full_coverage=False,
        lock: Optional[threading.RLock] = None,
        dynamic_startup_nodes=True,
        connection_pool_class=ConnectionPool,
        address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
        cache: Optional[CacheInterface] = None,
        cache_config: Optional[CacheConfig] = None,
        cache_factory: Optional[CacheFactoryInterface] = None,
        event_dispatcher: Optional[EventDispatcher] = None,
        maint_notifications_config: Optional[MaintNotificationsConfig] = None,
        **kwargs,
    ):
        """
        :param startup_nodes: seed nodes used to discover the cluster topology
        :param from_url: when True, node clients are created around a custom
            connection pool built from ``connection_pool_class``
        :param lock: optional shared RLock protecting the topology caches;
            a new one is created when omitted
        :param kwargs: remaining options are kept as per-node connection kwargs

        Note: ``initialize()`` is called at the end of construction, so all
        attributes below must be assigned before it runs.
        """
        self.nodes_cache: dict[str, ClusterNode] = {}
        self.slots_cache: dict[int, list[ClusterNode]] = {}
        self.startup_nodes: dict[str, ClusterNode] = {n.name: n for n in startup_nodes}
        self.default_node: Optional[ClusterNode] = None
        # Topology epoch counter, read under _lock via _get_epoch().
        self._epoch: int = 0
        self.from_url = from_url
        self._require_full_coverage = require_full_coverage
        self._dynamic_startup_nodes = dynamic_startup_nodes
        self.connection_pool_class = connection_pool_class
        self.address_remap = address_remap
        # Client-side cache selection: an explicit cache instance wins over a
        # factory, which wins over a plain config.
        self._cache: Optional[CacheInterface] = None
        if cache:
            self._cache = cache
        elif cache_factory is not None:
            self._cache = cache_factory.get_cache()
        elif cache_config is not None:
            self._cache = CacheFactory(cache_config).get_cache()
        self.connection_kwargs = kwargs
        self.read_load_balancer = LoadBalancer()
        # nodes_cache / slots_cache / startup_nodes / default_node are protected by _lock
        if lock is None:
            self._lock = threading.RLock()
        else:
            self._lock = lock
        # initialize holds _initialization_lock to dedup multiple calls to reinitialize;
        # note that if we hold both _lock and _initialization_lock, we _must_ acquire
        # _initialization_lock first (ie: to have a consistent order) to avoid deadlock.
        self._initialization_lock: threading.RLock = threading.RLock()
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        self._credential_provider = self.connection_kwargs.get(
            "credential_provider", None
        )
        self.maint_notifications_config = maint_notifications_config
        self.initialize()
  1813. def get_node(
  1814. self,
  1815. host: Optional[str] = None,
  1816. port: Optional[int] = None,
  1817. node_name: Optional[str] = None,
  1818. ) -> Optional[ClusterNode]:
  1819. """
  1820. Get the requested node from the cluster's nodes.
  1821. nodes.
  1822. :return: ClusterNode if the node exists, else None
  1823. """
  1824. if host and port:
  1825. # the user passed host and port
  1826. if host == "localhost":
  1827. host = socket.gethostbyname(host)
  1828. with self._lock:
  1829. return self.nodes_cache.get(get_node_name(host=host, port=port))
  1830. elif node_name:
  1831. with self._lock:
  1832. return self.nodes_cache.get(node_name)
  1833. else:
  1834. return None
def move_slot(self, e: Union[AskError, MovedError]):
    """
    Update the slot's node with the redirected one.

    Patches nodes_cache/slots_cache in place from a redirection error
    instead of re-fetching the whole topology:
    - unknown redirect target -> registered as a new PRIMARY node
    - target outside the slot's current shard -> the slot list is replaced
      with just the target
    - target is one of the slot's replicas (failover) -> promoted to index
      0; the old primary is demoted to a replica at the end of the list
    - target already the slot's primary -> no-op
    """
    with self._lock:
        # _lock is an RLock, so the nested lock acquisition inside
        # get_node() is safe.
        redirected_node = self.get_node(host=e.host, port=e.port)
        if redirected_node is not None:
            # The node already exists
            if redirected_node.server_type is not PRIMARY:
                # Update the node's server type
                redirected_node.server_type = PRIMARY
        else:
            # This is a new node, we will add it to the nodes cache
            redirected_node = ClusterNode(e.host, e.port, PRIMARY)
            self.nodes_cache[redirected_node.name] = redirected_node
        slot_nodes = self.slots_cache[e.slot_id]
        if redirected_node not in slot_nodes:
            # The new slot owner is a new server, or a server from a different
            # shard. We need to remove all current nodes from the slot's list
            # (including replications) and add just the new node.
            self.slots_cache[e.slot_id] = [redirected_node]
        elif redirected_node is not slot_nodes[0]:
            # The MOVED error resulted from a failover, and the new slot owner
            # had previously been a replica.
            old_primary = slot_nodes[0]
            # Update the old primary to be a replica and add it to the end of
            # the slot's node list
            old_primary.server_type = REPLICA
            slot_nodes.append(old_primary)
            # Remove the old replica, which is now a primary, from the slot's
            # node list
            slot_nodes.remove(redirected_node)
            # Override the old primary with the new one
            slot_nodes[0] = redirected_node
            if self.default_node == old_primary:
                # Update the default node with the new primary
                self.default_node = redirected_node
        # else: circular MOVED to current primary -> no-op
  1873. @deprecated_args(
  1874. args_to_warn=["server_type"],
  1875. reason=(
  1876. "In case you need select some load balancing strategy "
  1877. "that will use replicas, please set it through 'load_balancing_strategy'"
  1878. ),
  1879. version="5.3.0",
  1880. )
  1881. def get_node_from_slot(
  1882. self,
  1883. slot: int,
  1884. read_from_replicas: bool = False,
  1885. load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
  1886. server_type: Optional[Literal["primary", "replica"]] = None,
  1887. ) -> ClusterNode:
  1888. """
  1889. Gets a node that servers this hash slot
  1890. """
  1891. if read_from_replicas is True and load_balancing_strategy is None:
  1892. load_balancing_strategy = LoadBalancingStrategy.ROUND_ROBIN
  1893. with self._lock:
  1894. if self.slots_cache.get(slot) is None or len(self.slots_cache[slot]) == 0:
  1895. raise SlotNotCoveredError(
  1896. f'Slot "{slot}" not covered by the cluster. '
  1897. + f'"require_full_coverage={self._require_full_coverage}"'
  1898. )
  1899. if len(self.slots_cache[slot]) > 1 and load_balancing_strategy:
  1900. # get the server index using the strategy defined in load_balancing_strategy
  1901. primary_name = self.slots_cache[slot][0].name
  1902. node_idx = self.read_load_balancer.get_server_index(
  1903. primary_name, len(self.slots_cache[slot]), load_balancing_strategy
  1904. )
  1905. elif (
  1906. server_type is None
  1907. or server_type == PRIMARY
  1908. or len(self.slots_cache[slot]) == 1
  1909. ):
  1910. # return a primary
  1911. node_idx = 0
  1912. else:
  1913. # return a replica
  1914. # randomly choose one of the replicas
  1915. node_idx = random.randint(1, len(self.slots_cache[slot]) - 1)
  1916. return self.slots_cache[slot][node_idx]
  1917. def get_nodes_by_server_type(self, server_type: Literal["primary", "replica"]):
  1918. """
  1919. Get all nodes with the specified server type
  1920. :param server_type: 'primary' or 'replica'
  1921. :return: list of ClusterNode
  1922. """
  1923. with self._lock:
  1924. return [
  1925. node
  1926. for node in self.nodes_cache.values()
  1927. if node.server_type == server_type
  1928. ]
  1929. @deprecated_function(
  1930. reason="This method is not used anymore internally. The startup nodes are populated automatically.",
  1931. version="7.0.2",
  1932. )
  1933. def populate_startup_nodes(self, nodes):
  1934. """
  1935. Populate all startup nodes and filters out any duplicates
  1936. """
  1937. with self._lock:
  1938. for n in nodes:
  1939. self.startup_nodes[n.name] = n
  1940. def move_node_to_end_of_cached_nodes(self, node_name: str) -> None:
  1941. """
  1942. Move a failing node to the end of startup_nodes and nodes_cache so it's
  1943. tried last during reinitialization and when selecting the default node.
  1944. If the node is not in the respective list, nothing is done.
  1945. """
  1946. # Move in startup_nodes
  1947. if node_name in self.startup_nodes and len(self.startup_nodes) > 1:
  1948. node = self.startup_nodes.pop(node_name)
  1949. self.startup_nodes[node_name] = node # Re-insert at end
  1950. # Move in nodes_cache - this affects get_nodes_by_server_type ordering
  1951. # which is used to select the default_node during initialize()
  1952. if node_name in self.nodes_cache and len(self.nodes_cache) > 1:
  1953. node = self.nodes_cache.pop(node_name)
  1954. self.nodes_cache[node_name] = node # Re-insert at end
  1955. def check_slots_coverage(self, slots_cache):
  1956. # Validate if all slots are covered or if we should try next
  1957. # startup node
  1958. for i in range(0, REDIS_CLUSTER_HASH_SLOTS):
  1959. if i not in slots_cache:
  1960. return False
  1961. return True
  1962. def create_redis_connections(self, nodes):
  1963. """
  1964. This function will create a redis connection to all nodes in :nodes:
  1965. """
  1966. connection_pools = []
  1967. for node in nodes:
  1968. if node.redis_connection is None:
  1969. node.redis_connection = self.create_redis_node(
  1970. host=node.host,
  1971. port=node.port,
  1972. maint_notifications_config=self.maint_notifications_config,
  1973. **self.connection_kwargs,
  1974. )
  1975. connection_pools.append(node.redis_connection.connection_pool)
  1976. self._event_dispatcher.dispatch(
  1977. AfterPooledConnectionsInstantiationEvent(
  1978. connection_pools, ClientType.SYNC, self._credential_provider
  1979. )
  1980. )
  1981. def create_redis_node(
  1982. self,
  1983. host,
  1984. port,
  1985. **kwargs,
  1986. ):
  1987. # We are configuring the connection pool not to retry
  1988. # connections on lower level clients to avoid retrying
  1989. # connections to nodes that are not reachable
  1990. # and to avoid blocking the connection pool.
  1991. # The only error that will have some handling in the lower
  1992. # level clients is ConnectionError which will trigger disconnection
  1993. # of the socket.
  1994. # The retries will be handled on cluster client level
  1995. # where we will have proper handling of the cluster topology
  1996. node_retry_config = Retry(
  1997. backoff=NoBackoff(), retries=0, supported_errors=(ConnectionError,)
  1998. )
  1999. if self.from_url:
  2000. # Create a redis node with a custom connection pool
  2001. kwargs.update({"host": host})
  2002. kwargs.update({"port": port})
  2003. kwargs.update({"cache": self._cache})
  2004. kwargs.update({"retry": node_retry_config})
  2005. r = Redis(connection_pool=self.connection_pool_class(**kwargs))
  2006. else:
  2007. r = Redis(
  2008. host=host,
  2009. port=port,
  2010. cache=self._cache,
  2011. retry=node_retry_config,
  2012. **kwargs,
  2013. )
  2014. return r
  2015. def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
  2016. node_name = get_node_name(host, port)
  2017. # check if we already have this node in the tmp_nodes_cache
  2018. target_node = tmp_nodes_cache.get(node_name)
  2019. if target_node is None:
  2020. # before creating a new cluster node, check if the cluster node already
  2021. # exists in the current nodes cache and has a valid connection so we can
  2022. # reuse it
  2023. redis_connection: Optional[Redis] = None
  2024. with self._lock:
  2025. previous_node = self.nodes_cache.get(node_name)
  2026. if previous_node:
  2027. redis_connection = previous_node.redis_connection
  2028. # don't update the old ClusterNode, so we don't update its role
  2029. # outside of the lock
  2030. target_node = ClusterNode(host, port, role, redis_connection)
  2031. # add this node to the nodes cache
  2032. tmp_nodes_cache[target_node.name] = target_node
  2033. return target_node
  2034. def _get_epoch(self) -> int:
  2035. """
  2036. Get the current epoch value. This method exists primarily to allow
  2037. tests to mock the epoch fetch and control race condition timing.
  2038. """
  2039. with self._lock:
  2040. return self._epoch
def initialize(
    self,
    additional_startup_nodes_info: Optional[List[Tuple[str, int]]] = None,
    disconnect_startup_nodes_pools: bool = True,
):
    """
    Initializes the nodes cache, slots cache and redis connections.

    :startup_nodes:
        Responsible for discovering other nodes in the cluster
    :disconnect_startup_nodes_pools:
        Whether to disconnect the connection pool of the startup nodes
        after the initialization is complete. This is useful when the
        startup nodes are not part of the cluster and we want to avoid
        keeping the connection open.
    :additional_startup_nodes_info:
        Additional nodes to add temporarily to the startup nodes.
        The additional nodes will be used just in the process of
        extraction of the slots and nodes information from the cluster.
        This is useful when we want to add new nodes to the cluster and
        initialize the client with them.
        The format of the list is a list of tuples, where each tuple
        contains the host and port of the node.
    """
    self.reset()
    tmp_nodes_cache = {}
    tmp_slots = {}
    disagreements = []
    startup_nodes_reachable = False
    fully_covered = False
    kwargs = self.connection_kwargs
    exception = None
    # Snapshot the epoch before taking the initialization lock; if it has
    # moved by the time we get the lock, another thread beat us to it.
    epoch = self._get_epoch()
    if additional_startup_nodes_info is None:
        additional_startup_nodes_info = []
    with self._initialization_lock:
        with self._lock:
            if epoch != self._epoch:
                # another thread has already re-initialized the nodes; don't
                # bother running again
                return
        with self._lock:
            startup_nodes = tuple(self.startup_nodes.values())
        additional_startup_nodes = [
            ClusterNode(host, port) for host, port in additional_startup_nodes_info
        ]
        if is_debug_log_enabled():
            logger.debug(
                f"Topology refresh: using additional nodes: {[node.name for node in additional_startup_nodes]}; "
                f"and startup nodes: {[node.name for node in startup_nodes]}"
            )
        # Query each candidate node until one yields a usable slots map.
        for startup_node in (*startup_nodes, *additional_startup_nodes):
            try:
                if startup_node.redis_connection:
                    r = startup_node.redis_connection
                else:
                    # Create a new Redis connection
                    if is_debug_log_enabled():
                        socket_timeout = kwargs.get("socket_timeout", "not set")
                        socket_connect_timeout = kwargs.get(
                            "socket_connect_timeout", "not set"
                        )
                        maint_enabled = (
                            self.maint_notifications_config.enabled
                            if self.maint_notifications_config
                            else False
                        )
                        logger.debug(
                            "Topology refresh: Creating new Redis connection to "
                            f"{startup_node.host}:{startup_node.port}; "
                            f"with socket_timeout: {socket_timeout}, and "
                            f"socket_connect_timeout: {socket_connect_timeout}, "
                            "and maint_notifications enabled: "
                            f"{maint_enabled}"
                        )
                    r = self.create_redis_node(
                        startup_node.host,
                        startup_node.port,
                        maint_notifications_config=self.maint_notifications_config,
                        **kwargs,
                    )
                    # Store the connection on the known startup node when
                    # possible so it can be reused by later refreshes.
                    if startup_node in self.startup_nodes.values():
                        self.startup_nodes[startup_node.name].redis_connection = r
                    else:
                        startup_node.redis_connection = r
                try:
                    # Make sure cluster mode is enabled on this node
                    cluster_slots = str_if_bytes(r.execute_command("CLUSTER SLOTS"))
                    if disconnect_startup_nodes_pools:
                        with r.connection_pool._lock:
                            # take care to clear connections before we move on:
                            # mark all active connections for reconnect - they
                            # will be reconnected on next use, but will allow
                            # current in-flight commands to complete first
                            r.connection_pool.update_active_connections_for_reconnect()
                            # Needed to clear READONLY state when it is no
                            # longer applicable
                            r.connection_pool.disconnect_free_connections()
                except ResponseError:
                    raise RedisClusterException(
                        "Cluster mode is not enabled on this node"
                    )
                startup_nodes_reachable = True
            except Exception as e:
                # Try the next startup node.
                # The exception is saved and raised only if we have no more nodes.
                exception = e
                continue
            # CLUSTER SLOTS command results in the following output:
            # [[slot_section[from_slot,to_slot,master,replica1,...,replicaN]]]
            # where each node contains the following list: [IP, port, node_id]
            # Therefore, cluster_slots[0][2][0] will be the IP address of the
            # primary node of the first slot section.
            # If there's only one server in the cluster, its ``host`` is ''
            # Fix it to the host in startup_nodes
            if (
                len(cluster_slots) == 1
                and len(cluster_slots[0][2][0]) == 0
                and len(self.startup_nodes) == 1
            ):
                cluster_slots[0][2][0] = startup_node.host
            for slot in cluster_slots:
                primary_node = slot[2]
                host = str_if_bytes(primary_node[0])
                if host == "":
                    host = startup_node.host
                port = int(primary_node[1])
                host, port = self.remap_host_port(host, port)
                # First entry in nodes_for_slot is always the primary,
                # followed by the replicas.
                nodes_for_slot = []
                target_node = self._get_or_create_cluster_node(
                    host, port, PRIMARY, tmp_nodes_cache
                )
                nodes_for_slot.append(target_node)
                replica_nodes = slot[3:]
                for replica_node in replica_nodes:
                    host = str_if_bytes(replica_node[0])
                    port = int(replica_node[1])
                    host, port = self.remap_host_port(host, port)
                    target_replica_node = self._get_or_create_cluster_node(
                        host, port, REPLICA, tmp_nodes_cache
                    )
                    nodes_for_slot.append(target_replica_node)
                for i in range(int(slot[0]), int(slot[1]) + 1):
                    if i not in tmp_slots:
                        tmp_slots[i] = nodes_for_slot
                    else:
                        # Validate that 2 nodes want to use the same slot cache
                        # setup
                        tmp_slot = tmp_slots[i][0]
                        if tmp_slot.name != target_node.name:
                            disagreements.append(
                                f"{tmp_slot.name} vs {target_node.name} on slot: {i}"
                            )
                            if len(disagreements) > 5:
                                raise RedisClusterException(
                                    f"startup_nodes could not agree on a valid "
                                    f"slots cache: {', '.join(disagreements)}"
                                )
            fully_covered = self.check_slots_coverage(tmp_slots)
            if fully_covered:
                # Don't need to continue to the next startup node if all
                # slots are covered
                break
        if not startup_nodes_reachable:
            raise RedisClusterException(
                f"Redis Cluster cannot be connected. Please provide at least "
                f"one reachable node: {str(exception)}"
            ) from exception
        # Create Redis connections to all nodes
        self.create_redis_connections(list(tmp_nodes_cache.values()))
        # Check if the slots are not fully covered
        if not fully_covered and self._require_full_coverage:
            # Despite the requirement that the slots be covered, there
            # isn't a full coverage
            raise RedisClusterException(
                f"All slots are not covered after query all startup_nodes. "
                f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
                f"covered..."
            )
        # Set the tmp variables to the real variables
        with self._lock:
            self.nodes_cache = tmp_nodes_cache
            self.slots_cache = tmp_slots
            # Set the default node
            self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]
            if self._dynamic_startup_nodes:
                # Populate the startup nodes with all discovered nodes
                self.startup_nodes = tmp_nodes_cache
            # Increment the epoch to signal that initialization has completed
            self._epoch += 1
  2229. def close(self) -> None:
  2230. with self._lock:
  2231. self.default_node = None
  2232. nodes = tuple(self.nodes_cache.values())
  2233. for node in nodes:
  2234. if node.redis_connection:
  2235. node.redis_connection.close()
  2236. def reset(self):
  2237. try:
  2238. self.read_load_balancer.reset()
  2239. except TypeError:
  2240. # The read_load_balancer is None, do nothing
  2241. pass
  2242. def remap_host_port(self, host: str, port: int) -> Tuple[str, int]:
  2243. """
  2244. Remap the host and port returned from the cluster to a different
  2245. internal value. Useful if the client is not connecting directly
  2246. to the cluster.
  2247. """
  2248. if self.address_remap:
  2249. return self.address_remap((host, port))
  2250. return host, port
  2251. def find_connection_owner(self, connection: Connection) -> Optional[ClusterNode]:
  2252. node_name = get_node_name(connection.host, connection.port)
  2253. with self._lock:
  2254. for node in tuple(self.nodes_cache.values()):
  2255. if node.redis_connection:
  2256. conn_args = node.redis_connection.connection_pool.connection_kwargs
  2257. if node_name == get_node_name(
  2258. conn_args.get("host"), conn_args.get("port")
  2259. ):
  2260. return node
  2261. return None
class ClusterPubSub(PubSub):
    """
    Wrapper for PubSub class.

    IMPORTANT: before using ClusterPubSub, read about the known limitations
    with pubsub in Cluster mode and learn how to workaround them:
    https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html
    """

    def __init__(
        self,
        redis_cluster,
        node=None,
        host=None,
        port=None,
        push_handler_func=None,
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """
        When a pubsub instance is created without specifying a node, a single
        node will be transparently chosen for the pubsub connection on the
        first command execution. The node will be determined by:
         1. Hashing the channel name in the request to find its keyslot
         2. Selecting a node that handles the keyslot: If read_from_replicas is
            set to true or load_balancing_strategy is set, a replica can be
            selected.

        :type redis_cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        self.node = None
        self.set_pubsub_node(redis_cluster, node, host, port)
        # If a node was resolved up-front, bind to its connection pool now;
        # otherwise the pool is chosen lazily in execute_command().
        connection_pool = (
            None
            if self.node is None
            else redis_cluster.get_redis_connection(self.node).connection_pool
        )
        self.cluster = redis_cluster
        # Per-node PubSub instances used for sharded pubsub (ssubscribe),
        # keyed by node name.
        self.node_pubsub_mapping = {}
        # NOTE: this rebinds the instance attribute to the running generator
        # object, shadowing the _pubsubs_generator method on this instance.
        self._pubsubs_generator = self._pubsubs_generator()
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher
        super().__init__(
            connection_pool=connection_pool,
            encoder=redis_cluster.encoder,
            push_handler_func=push_handler_func,
            event_dispatcher=self._event_dispatcher,
            **kwargs,
        )

    def set_pubsub_node(self, cluster, node=None, host=None, port=None):
        """
        The pubsub node will be set according to the passed node, host and port.

        When none of the node, host, or port are specified - the node is set
        to None and will be determined by the keyslot of the channel in the
        first command to be executed.
        RedisClusterException will be thrown if the passed node does not exist
        in the cluster.
        If host is passed without port, or vice versa, a DataError will be
        thrown.

        :type cluster: RedisCluster
        :type node: ClusterNode
        :type host: str
        :type port: int
        """
        if node is not None:
            # node is passed by the user
            self._raise_on_invalid_node(cluster, node, node.host, node.port)
            pubsub_node = node
        elif host is not None and port is not None:
            # host and port passed by the user
            node = cluster.get_node(host=host, port=port)
            self._raise_on_invalid_node(cluster, node, host, port)
            pubsub_node = node
        elif any([host, port]) is True:
            # only 'host' or 'port' passed
            raise DataError("Passing a host requires passing a port, and vice versa")
        else:
            # nothing passed by the user. set node to None
            pubsub_node = None
        self.node = pubsub_node

    def get_pubsub_node(self):
        """
        Get the node that is being used as the pubsub connection
        """
        return self.node

    def _raise_on_invalid_node(self, redis_cluster, node, host, port):
        """
        Raise a RedisClusterException if the node is None or doesn't exist in
        the cluster.
        """
        if node is None or redis_cluster.get_node(node_name=node.name) is None:
            raise RedisClusterException(
                f"Node {host}:{port} doesn't exist in the cluster"
            )

    def execute_command(self, *args):
        """
        Execute a subscribe/unsubscribe command.

        Taken code from redis-py and tweak to make it work within a cluster.
        """
        # NOTE: don't parse the response in this function -- it could pull a
        # legitimate message off the stack if the connection is already
        # subscribed to one or more channels
        if self.connection is None:
            if self.connection_pool is None:
                if len(args) > 1:
                    # Hash the first channel and get one of the nodes holding
                    # this slot
                    channel = args[1]
                    slot = self.cluster.keyslot(channel)
                    node = self.cluster.nodes_manager.get_node_from_slot(
                        slot,
                        self.cluster.read_from_replicas,
                        self.cluster.load_balancing_strategy,
                    )
                else:
                    # Get a random node
                    node = self.cluster.get_random_node()
                self.node = node
                redis_connection = self.cluster.get_redis_connection(node)
                self.connection_pool = redis_connection.connection_pool
            self.connection = self.connection_pool.get_connection()
            # register a callback that re-subscribes to any channels we
            # were listening to when we were disconnected
            self.connection.register_connect_callback(self.on_connect)
            if self.push_handler_func is not None:
                self.connection._parser.set_pubsub_push_handler(self.push_handler_func)
            self._event_dispatcher.dispatch(
                AfterPubSubConnectionInstantiationEvent(
                    self.connection, self.connection_pool, ClientType.SYNC, self._lock
                )
            )
        connection = self.connection
        self._execute(connection, connection.send_command, *args)

    def _get_node_pubsub(self, node):
        # Return the cached per-node PubSub for sharded pubsub, creating and
        # registering it on first use.
        try:
            return self.node_pubsub_mapping[node.name]
        except KeyError:
            pubsub = node.redis_connection.pubsub(
                push_handler_func=self.push_handler_func
            )
            self.node_pubsub_mapping[node.name] = pubsub
            return pubsub

    def _sharded_message_generator(self):
        # Poll each mapped node's pubsub at most once per call and return the
        # first available message, or None if no node has one.
        for _ in range(len(self.node_pubsub_mapping)):
            pubsub = next(self._pubsubs_generator)
            message = pubsub.get_message()
            if message is not None:
                return message
        return None

    def _pubsubs_generator(self):
        # Endless round-robin over the currently mapped per-node pubsubs; the
        # mapping is re-read on each pass so added/removed nodes are honored.
        while True:
            current_nodes = list(self.node_pubsub_mapping.values())
            yield from current_nodes

    def get_sharded_message(
        self, ignore_subscribe_messages=False, timeout=0.0, target_node=None
    ):
        # Fetch one sharded-pubsub message, either from a specific node or by
        # round-robining all mapped nodes. Also maintains local subscription
        # bookkeeping on 'sunsubscribe' confirmations.
        if target_node:
            message = self.node_pubsub_mapping[target_node.name].get_message(
                ignore_subscribe_messages=ignore_subscribe_messages, timeout=timeout
            )
        else:
            message = self._sharded_message_generator()
        if message is None:
            return None
        elif str_if_bytes(message["type"]) == "sunsubscribe":
            if message["channel"] in self.pending_unsubscribe_shard_channels:
                self.pending_unsubscribe_shard_channels.remove(message["channel"])
                self.shard_channels.pop(message["channel"], None)
                node = self.cluster.get_node_from_key(message["channel"])
                # Drop the per-node pubsub once it has no subscriptions left.
                if self.node_pubsub_mapping[node.name].subscribed is False:
                    self.node_pubsub_mapping.pop(node.name)
        if not self.channels and not self.patterns and not self.shard_channels:
            # There are no subscriptions anymore, set subscribed_event flag
            # to false
            self.subscribed_event.clear()
        if self.ignore_subscribe_messages or ignore_subscribe_messages:
            return None
        return message

    def ssubscribe(self, *args, **kwargs):
        # Subscribe to shard channels, routing each channel to the node that
        # owns its keyslot. Positional args are channel names; keyword args
        # map channel name -> message handler.
        if args:
            args = list_or_args(args[0], args[1:])
        s_channels = dict.fromkeys(args)
        s_channels.update(kwargs)
        for s_channel, handler in s_channels.items():
            node = self.cluster.get_node_from_key(s_channel)
            pubsub = self._get_node_pubsub(node)
            if handler:
                pubsub.ssubscribe(**{s_channel: handler})
            else:
                pubsub.ssubscribe(s_channel)
            # Mirror the per-node subscription state on this wrapper.
            self.shard_channels.update(pubsub.shard_channels)
            self.pending_unsubscribe_shard_channels.difference_update(
                self._normalize_keys({s_channel: None})
            )
            if pubsub.subscribed and not self.subscribed:
                self.subscribed_event.set()
                self.health_check_response_counter = 0

    def sunsubscribe(self, *args):
        # Unsubscribe from the given shard channels (or all currently
        # subscribed ones when called without arguments), routing each
        # channel to its owning node.
        if args:
            args = list_or_args(args[0], args[1:])
        else:
            args = self.shard_channels
        for s_channel in args:
            node = self.cluster.get_node_from_key(s_channel)
            p = self._get_node_pubsub(node)
            p.sunsubscribe(s_channel)
            self.pending_unsubscribe_shard_channels.update(
                p.pending_unsubscribe_shard_channels
            )

    def get_redis_connection(self):
        """
        Get the Redis connection of the pubsub connected node.
        """
        if self.node is not None:
            return self.node.redis_connection

    def disconnect(self):
        """
        Disconnect the pubsub connection.
        """
        if self.connection:
            self.connection.disconnect()
        # NOTE(review): assumes each mapped pubsub has an established
        # connection; a never-used pubsub's connection may be None — confirm.
        for pubsub in self.node_pubsub_mapping.values():
            pubsub.connection.disconnect()
class ClusterPipeline(RedisCluster):
    """
    Support for Redis pipeline
    in cluster mode
    """

    # Errors on which a queued batch may be retried (possibly after a
    # topology refresh / redirection).
    ERRORS_ALLOW_RETRY = (
        ConnectionError,
        TimeoutError,
        MovedError,
        AskError,
        TryAgainError,
    )

    # Commands that are not routed by keyslot.
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands executed immediately rather than queued in the pipeline.
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    # Commands that implicitly clear WATCH state.
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}

    @deprecated_args(
        args_to_warn=[
            "cluster_error_retry_attempts",
        ],
        reason="Please configure the 'retry' object instead",
        version="6.0.0",
    )
    def __init__(
        self,
        nodes_manager: "NodesManager",
        commands_parser: "CommandsParser",
        result_callbacks: Optional[Dict[str, Callable]] = None,
        cluster_response_callbacks: Optional[Dict[str, Callable]] = None,
        startup_nodes: Optional[List["ClusterNode"]] = None,
        read_from_replicas: bool = False,
        load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
        cluster_error_retry_attempts: int = 3,
        reinitialize_steps: int = 5,
        retry: Optional[Retry] = None,
        lock=None,
        transaction=False,
        policy_resolver: PolicyResolver = StaticPolicyResolver(),
        event_dispatcher: Optional["EventDispatcher"] = None,
        **kwargs,
    ):
        """
        Create a cluster pipeline bound to an existing cluster client's
        nodes manager and commands parser. When ``transaction`` is true a
        MULTI/EXEC-based strategy is used; otherwise commands are batched
        per node and sent without transactional guarantees.
        """
        self.command_stack = []
        self.nodes_manager = nodes_manager
        self.commands_parser = commands_parser
        self.refresh_table_asap = False
        self.result_callbacks = (
            result_callbacks or self.__class__.RESULT_CALLBACKS.copy()
        )
        self.startup_nodes = startup_nodes if startup_nodes else []
        self.read_from_replicas = read_from_replicas
        self.load_balancing_strategy = load_balancing_strategy
        self.command_flags = self.__class__.COMMAND_FLAGS.copy()
        self.cluster_response_callbacks = cluster_response_callbacks
        self.reinitialize_counter = 0
        self.reinitialize_steps = reinitialize_steps
        # Explicit retry object wins; otherwise fall back to the (deprecated)
        # cluster_error_retry_attempts count with jittered backoff.
        if retry is not None:
            self.retry = retry
        else:
            self.retry = Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=cluster_error_retry_attempts,
            )
        self.encoder = Encoder(
            kwargs.get("encoding", "utf-8"),
            kwargs.get("encoding_errors", "strict"),
            kwargs.get("decode_responses", False),
        )
        if lock is None:
            lock = threading.RLock()
        self._lock = lock
        # Keep a handle to RedisCluster.execute_command for strategies that
        # need to execute a command immediately (e.g. WATCH).
        self.parent_execute_command = super().execute_command
        self._execution_strategy: ExecutionStrategy = (
            PipelineStrategy(self) if not transaction else TransactionStrategy(self)
        )
        # For backward compatibility, mapping from existing policies to new one
        self._command_flags_mapping: dict[str, Union[RequestPolicy, ResponsePolicy]] = {
            self.__class__.RANDOM: RequestPolicy.DEFAULT_KEYLESS,
            self.__class__.PRIMARIES: RequestPolicy.ALL_SHARDS,
            self.__class__.ALL_NODES: RequestPolicy.ALL_NODES,
            self.__class__.REPLICAS: RequestPolicy.ALL_REPLICAS,
            self.__class__.DEFAULT_NODE: RequestPolicy.DEFAULT_NODE,
            SLOT_ID: RequestPolicy.DEFAULT_KEYED,
        }
        # Maps each request/response policy to the callable that resolves the
        # target node(s) (request policies) or post-processes the result
        # (response policies).
        self._policies_callback_mapping: dict[
            Union[RequestPolicy, ResponsePolicy], Callable
        ] = {
            RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
                self.get_random_primary_or_all_nodes(command_name)
            ],
            RequestPolicy.DEFAULT_KEYED: lambda command, *args: self.get_nodes_from_slot(
                command, *args
            ),
            RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
            RequestPolicy.ALL_SHARDS: self.get_primaries,
            RequestPolicy.ALL_NODES: self.get_nodes,
            RequestPolicy.ALL_REPLICAS: self.get_replicas,
            RequestPolicy.MULTI_SHARD: lambda *args, **kwargs: self._split_multi_shard_command(
                *args, **kwargs
            ),
            RequestPolicy.SPECIAL: self.get_special_nodes,
            ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
            ResponsePolicy.DEFAULT_KEYED: lambda res: res,
        }
        self._policy_resolver = policy_resolver
        if event_dispatcher is None:
            self._event_dispatcher = EventDispatcher()
        else:
            self._event_dispatcher = event_dispatcher

    def __repr__(self):
        """Return the class name as the representation."""
        return f"{type(self).__name__}"

    def __enter__(self):
        """Enter the context manager; returns self."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Reset the pipeline on context exit."""
        self.reset()

    def __del__(self):
        # Best-effort reset on garbage collection; errors are irrelevant here.
        try:
            self.reset()
        except Exception:
            pass

    def __len__(self):
        """Number of commands currently queued."""
        return len(self._execution_strategy.command_queue)

    def __bool__(self):
        "Pipeline instances should always evaluate to True on Python 3+"
        return True

    def execute_command(self, *args, **kwargs):
        """
        Wrapper function for pipeline_execute_command
        """
        return self._execution_strategy.execute_command(*args, **kwargs)

    def pipeline_execute_command(self, *args, **options):
        """
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        """
        return self._execution_strategy.execute_command(*args, **options)

    def annotate_exception(self, exception, number, command):
        """
        Provides extra context to the exception prior to it being handled
        """
        self._execution_strategy.annotate_exception(exception, number, command)

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Execute all the commands in the current pipeline
        """
        try:
            return self._execution_strategy.execute(raise_on_error)
        finally:
            # Always leave the pipeline empty, even when execution raised.
            self.reset()

    def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._execution_strategy.reset()

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """Delegate batched command sending to the execution strategy."""
        return self._execution_strategy.send_cluster_commands(
            stack, raise_on_error=raise_on_error, allow_redirections=allow_redirections
        )

    def exists(self, *keys):
        """Queue an EXISTS check for the given keys."""
        return self._execution_strategy.exists(*keys)

    def eval(self):
        """Delegate EVAL handling to the execution strategy."""
        return self._execution_strategy.eval()

    def multi(self):
        """
        Start a transactional block of the pipeline after WATCH commands
        are issued. End the transactional block with `execute`.
        """
        self._execution_strategy.multi()

    def load_scripts(self):
        """Delegate script loading to the execution strategy."""
        self._execution_strategy.load_scripts()

    def discard(self):
        """Discard the queued transaction via the execution strategy."""
        self._execution_strategy.discard()

    def watch(self, *names):
        """Watches the values at keys ``names``"""
        self._execution_strategy.watch(*names)

    def unwatch(self):
        """Unwatches all previously specified keys"""
        self._execution_strategy.unwatch()

    def script_load_for_pipeline(self, *args, **kwargs):
        """Delegate SCRIPT LOAD handling to the execution strategy."""
        self._execution_strategy.script_load_for_pipeline(*args, **kwargs)

    def delete(self, *names):
        """Queue a DELETE for the given keys."""
        self._execution_strategy.delete(*names)

    def unlink(self, *names):
        """Queue an UNLINK (async delete) for the given keys."""
        self._execution_strategy.unlink(*names)
def block_pipeline_command(name: str) -> Callable[..., Any]:
    """
    Return a stub for command ``name`` that raises RedisClusterException,
    because some pipelined commands must be blocked when running in
    cluster mode.
    """

    def inner(*args, **kwargs):
        raise RedisClusterException(
            f"ERROR: Calling pipelined function {name} is blocked "
            f"when running redis in cluster mode..."
        )

    return inner
# Blocked pipeline commands
# Commands that cannot be meaningfully queued in a cluster pipeline
# (server-level, multi-key-cross-slot, or sentinel commands); each one is
# replaced below with a stub that raises RedisClusterException.
PIPELINE_BLOCKED_COMMANDS = (
    "BGREWRITEAOF",
    "BGSAVE",
    "BITOP",
    "BRPOPLPUSH",
    "CLIENT GETNAME",
    "CLIENT KILL",
    "CLIENT LIST",
    "CLIENT SETNAME",
    "CLIENT",
    "CONFIG GET",
    "CONFIG RESETSTAT",
    "CONFIG REWRITE",
    "CONFIG SET",
    "CONFIG",
    "DBSIZE",
    "ECHO",
    "EVALSHA",
    "FLUSHALL",
    "FLUSHDB",
    "INFO",
    "KEYS",
    "LASTSAVE",
    "MGET",
    "MGET NONATOMIC",
    "MOVE",
    "MSET",
    "MSETEX",
    "MSET NONATOMIC",
    "MSETNX",
    "PFCOUNT",
    "PFMERGE",
    "PING",
    "PUBLISH",
    "RANDOMKEY",
    "READONLY",
    "READWRITE",
    "RENAME",
    "RENAMENX",
    "RPOPLPUSH",
    "SAVE",
    "SCAN",
    "SCRIPT EXISTS",
    "SCRIPT FLUSH",
    "SCRIPT KILL",
    "SCRIPT LOAD",
    "SCRIPT",
    "SDIFF",
    "SDIFFSTORE",
    "SENTINEL GET MASTER ADDR BY NAME",
    "SENTINEL MASTER",
    "SENTINEL MASTERS",
    "SENTINEL MONITOR",
    "SENTINEL REMOVE",
    "SENTINEL SENTINELS",
    "SENTINEL SET",
    "SENTINEL SLAVES",
    "SENTINEL",
    "SHUTDOWN",
    "SINTER",
    "SINTERSTORE",
    "SLAVEOF",
    "SLOWLOG GET",
    "SLOWLOG LEN",
    "SLOWLOG RESET",
    "SLOWLOG",
    "SMOVE",
    "SORT",
    "SUNION",
    "SUNIONSTORE",
    "TIME",
)

# Overwrite each blocked command's method on ClusterPipeline with a raising
# stub, converting the command name to its method form first
# (e.g. "CONFIG SET" -> config_set).
for command in PIPELINE_BLOCKED_COMMANDS:
    command = command.replace(" ", "_").lower()

    setattr(ClusterPipeline, command, block_pipeline_command(command))
  2767. class PipelineCommand:
  2768. """ """
  2769. def __init__(self, args, options=None, position=None):
  2770. self.args = args
  2771. if options is None:
  2772. options = {}
  2773. self.options = options
  2774. self.position = position
  2775. self.result = None
  2776. self.node = None
  2777. self.asking = False
  2778. self.command_policies: Optional[CommandPolicies] = None
  2779. class NodeCommands:
  2780. """ """
  2781. def __init__(
  2782. self, parse_response, connection_pool: ConnectionPool, connection: Connection
  2783. ):
  2784. """ """
  2785. self.parse_response = parse_response
  2786. self.connection_pool = connection_pool
  2787. self.connection = connection
  2788. self.commands = []
  2789. def append(self, c):
  2790. """ """
  2791. self.commands.append(c)
  2792. def write(self):
  2793. """
  2794. Code borrowed from Redis so it can be fixed
  2795. """
  2796. connection = self.connection
  2797. commands = self.commands
  2798. # We are going to clobber the commands with the write, so go ahead
  2799. # and ensure that nothing is sitting there from a previous run.
  2800. for c in commands:
  2801. c.result = None
  2802. # build up all commands into a single request to increase network perf
  2803. # send all the commands and catch connection and timeout errors.
  2804. try:
  2805. connection.send_packed_command(
  2806. connection.pack_commands([c.args for c in commands])
  2807. )
  2808. except (ConnectionError, TimeoutError) as e:
  2809. for c in commands:
  2810. c.result = e
  2811. def read(self):
  2812. """ """
  2813. connection = self.connection
  2814. for c in self.commands:
  2815. # if there is a result on this command,
  2816. # it means we ran into an exception
  2817. # like a connection error. Trying to parse
  2818. # a response on a connection that
  2819. # is no longer open will result in a
  2820. # connection error raised by redis-py.
  2821. # but redis-py doesn't check in parse_response
  2822. # that the sock object is
  2823. # still set and if you try to
  2824. # read from a closed connection, it will
  2825. # result in an AttributeError because
  2826. # it will do a readline() call on None.
  2827. # This can have all kinds of nasty side-effects.
  2828. # Treating this case as a connection error
  2829. # is fine because it will dump
  2830. # the connection object back into the
  2831. # pool and on the next write, it will
  2832. # explicitly open the connection and all will be well.
  2833. if c.result is None:
  2834. try:
  2835. c.result = self.parse_response(connection, c.args[0], **c.options)
  2836. except (ConnectionError, TimeoutError) as e:
  2837. for c in self.commands:
  2838. c.result = e
  2839. return
  2840. except RedisError:
  2841. c.result = sys.exc_info()[1]
class ExecutionStrategy(ABC):
    """Interface for ClusterPipeline execution strategies (pipelined vs.
    transactional); each abstract method mirrors the corresponding
    ClusterPipeline method it backs."""

    @property
    @abstractmethod
    def command_queue(self):
        # Buffered commands awaiting execution.
        pass

    @abstractmethod
    def execute_command(self, *args, **kwargs):
        """
        Execution flow for current execution strategy.

        See: ClusterPipeline.execute_command()
        """
        pass

    @abstractmethod
    def annotate_exception(self, exception, number, command):
        """
        Annotate exception according to current execution strategy.

        See: ClusterPipeline.annotate_exception()
        """
        pass

    @abstractmethod
    def pipeline_execute_command(self, *args, **options):
        """
        Pipeline execution flow for current execution strategy.

        See: ClusterPipeline.pipeline_execute_command()
        """
        pass

    @abstractmethod
    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """
        Executes current execution strategy.

        See: ClusterPipeline.execute()
        """
        pass

    @abstractmethod
    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Sends commands according to current execution strategy.

        See: ClusterPipeline.send_cluster_commands()
        """
        pass

    @abstractmethod
    def reset(self):
        """
        Resets current execution strategy.

        See: ClusterPipeline.reset()
        """
        pass

    @abstractmethod
    def exists(self, *keys):
        # EXISTS check for one or more keys.
        pass

    @abstractmethod
    def eval(self):
        # Script evaluation; not supported by all strategies.
        pass

    @abstractmethod
    def multi(self):
        """
        Starts transactional context.

        See: ClusterPipeline.multi()
        """
        pass

    @abstractmethod
    def load_scripts(self):
        # Script preloading; not supported by all strategies.
        pass

    @abstractmethod
    def watch(self, *names):
        # WATCH keys for optimistic locking; transactional strategies only.
        pass

    @abstractmethod
    def unwatch(self):
        """
        Unwatches all previously specified keys

        See: ClusterPipeline.unwatch()
        """
        pass

    @abstractmethod
    def script_load_for_pipeline(self, *args, **kwargs):
        # SCRIPT LOAD within a pipeline; not supported by all strategies.
        pass

    @abstractmethod
    def delete(self, *names):
        """
        "Delete a key specified by ``names``"

        See: ClusterPipeline.delete()
        """
        pass

    @abstractmethod
    def unlink(self, *names):
        """
        "Unlink a key specified by ``names``"

        See: ClusterPipeline.unlink()
        """
        pass

    @abstractmethod
    def discard(self):
        # Discard queued commands; transactional strategies only.
        pass
  2937. class AbstractStrategy(ExecutionStrategy):
  2938. def __init__(
  2939. self,
  2940. pipe: ClusterPipeline,
  2941. ):
  2942. self._command_queue: List[PipelineCommand] = []
  2943. self._pipe = pipe
  2944. self._nodes_manager = self._pipe.nodes_manager
  2945. @property
  2946. def command_queue(self):
  2947. return self._command_queue
  2948. @command_queue.setter
  2949. def command_queue(self, queue: List[PipelineCommand]):
  2950. self._command_queue = queue
  2951. @abstractmethod
  2952. def execute_command(self, *args, **kwargs):
  2953. pass
  2954. def pipeline_execute_command(self, *args, **options):
  2955. self._command_queue.append(
  2956. PipelineCommand(args, options, len(self._command_queue))
  2957. )
  2958. return self._pipe
  2959. @abstractmethod
  2960. def execute(self, raise_on_error: bool = True) -> List[Any]:
  2961. pass
  2962. @abstractmethod
  2963. def send_cluster_commands(
  2964. self, stack, raise_on_error=True, allow_redirections=True
  2965. ):
  2966. pass
  2967. @abstractmethod
  2968. def reset(self):
  2969. pass
  2970. def exists(self, *keys):
  2971. return self.execute_command("EXISTS", *keys)
  2972. def eval(self):
  2973. """ """
  2974. raise RedisClusterException("method eval() is not implemented")
  2975. def load_scripts(self):
  2976. """ """
  2977. raise RedisClusterException("method load_scripts() is not implemented")
  2978. def script_load_for_pipeline(self, *args, **kwargs):
  2979. """ """
  2980. raise RedisClusterException(
  2981. "method script_load_for_pipeline() is not implemented"
  2982. )
  2983. def annotate_exception(self, exception, number, command):
  2984. """
  2985. Provides extra context to the exception prior to it being handled
  2986. """
  2987. cmd = " ".join(map(safe_str, command))
  2988. msg = (
  2989. f"Command # {number} ({truncate_text(cmd)}) of pipeline "
  2990. f"caused error: {exception.args[0]}"
  2991. )
  2992. exception.args = (msg,) + exception.args[1:]
class PipelineStrategy(AbstractStrategy):
    """
    Non-transactional execution strategy for ClusterPipeline.

    Commands are buffered, grouped by target node, written to each node as a
    single packed request, and read back; retryable failures (MOVED/ASK,
    connection errors) are re-sent individually through the main client.
    """

    def __init__(self, pipe: ClusterPipeline):
        super().__init__(pipe)
        # Local alias of the pipeline's command -> node-flag table.
        self.command_flags = pipe.command_flags

    def execute_command(self, *args, **kwargs):
        """Buffer a command; actual execution is deferred to execute()."""
        return self.pipeline_execute_command(*args, **kwargs)

    def _raise_first_error(self, stack, start_time):
        """
        Raise the first exception on the stack
        """
        for c in stack:
            r = c.result
            if isinstance(r, Exception):
                self.annotate_exception(r, c.position + 1, c.args)
                # Record the failed pipeline duration before propagating.
                record_operation_duration(
                    command_name="PIPELINE",
                    duration_seconds=time.monotonic() - start_time,
                    error=r,
                )
                raise r

    def execute(self, raise_on_error: bool = True) -> List[Any]:
        """Execute all buffered commands and return their results in order.

        The queue is always cleared afterwards, even on error.
        """
        stack = self._command_queue
        if not stack:
            return []
        try:
            return self.send_cluster_commands(stack, raise_on_error)
        finally:
            self.reset()

    def reset(self):
        """
        Reset back to empty pipeline.
        """
        self._command_queue = []

    def send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Wrapper for RedisCluster.ERRORS_ALLOW_RETRY errors handling.

        If one of the retryable exceptions has been thrown we assume that:
        - connection_pool was disconnected
        - connection_pool was reset
        - refresh_table_asap set to True

        It will try the number of times specified by
        the retries in config option "self.retry"
        which defaults to 3 unless manually configured.

        If it reaches the number of times, the command will
        raises ClusterDownException.
        """
        if not stack:
            return []
        retry_attempts = self._pipe.retry.get_retries()
        while True:
            try:
                return self._send_cluster_commands(
                    stack,
                    raise_on_error=raise_on_error,
                    allow_redirections=allow_redirections,
                )
            except RedisCluster.ERRORS_ALLOW_RETRY as e:
                if retry_attempts > 0:
                    # Try again with the new cluster setup. All other errors
                    # should be raised.
                    retry_attempts -= 1
                    pass
                else:
                    raise e

    def _send_cluster_commands(
        self, stack, raise_on_error=True, allow_redirections=True
    ):
        """
        Send a bunch of cluster commands to the redis cluster.

        `allow_redirections` If the pipeline should follow
        `ASK` & `MOVED` responses automatically. If set
        to false it will raise RedisClusterException.
        """
        # the first time sending the commands we send all of
        # the commands that were queued up.
        # if we have to run through it again, we only retry
        # the commands that failed.
        attempt = sorted(stack, key=lambda x: x.position)
        is_default_node = False
        # build a list of node objects based on node names we need to
        nodes: dict[str, NodeCommands] = {}
        # Counters used by the finally-block below to tell which connections
        # are "dirty" (written to but not yet read from).
        nodes_written = 0
        nodes_read = 0
        try:
            # as we move through each command that still needs to be processed,
            # we figure out the slot number that command maps to, then from
            # the slot determine the node.
            for c in attempt:
                command_policies = self._pipe._policy_resolver.resolve(
                    c.args[0].lower()
                )
                # refer to our internal node -> slot table that
                # tells us where a given command should route to.
                # (it might be possible we have a cached node that no longer
                # exists in the cluster, which is why we do this in a loop)
                passed_targets = c.options.pop("target_nodes", None)
                if passed_targets and not self._is_nodes_flag(passed_targets):
                    target_nodes = self._parse_target_nodes(passed_targets)
                    if not command_policies:
                        command_policies = CommandPolicies()
                else:
                    if not command_policies:
                        # Some commands are keyed on their first two words
                        # (e.g. "XINFO STREAM"); prefer that form if mapped.
                        command = c.args[0].upper()
                        if (
                            len(c.args) >= 2
                            and f"{c.args[0]} {c.args[1]}".upper()
                            in self._pipe.command_flags
                        ):
                            command = f"{c.args[0]} {c.args[1]}".upper()
                        # We only could resolve key properties if command is not
                        # in a list of pre-defined request policies
                        command_flag = self.command_flags.get(command)
                        if not command_flag:
                            # Fallback to default policy
                            # NOTE(review): keys are skipped entirely when no
                            # default node exists — confirm this fallback.
                            if not self._pipe.get_default_node():
                                keys = None
                            else:
                                keys = self._pipe._get_command_keys(*c.args)
                            if not keys or len(keys) == 0:
                                command_policies = CommandPolicies()
                            else:
                                command_policies = CommandPolicies(
                                    request_policy=RequestPolicy.DEFAULT_KEYED,
                                    response_policy=ResponsePolicy.DEFAULT_KEYED,
                                )
                        else:
                            if command_flag in self._pipe._command_flags_mapping:
                                command_policies = CommandPolicies(
                                    request_policy=self._pipe._command_flags_mapping[
                                        command_flag
                                    ]
                                )
                            else:
                                command_policies = CommandPolicies()
                    target_nodes = self._determine_nodes(
                        *c.args,
                        request_policy=command_policies.request_policy,
                        node_flag=passed_targets,
                    )
                    if not target_nodes:
                        raise RedisClusterException(
                            f"No targets were found to execute {c.args} command on"
                        )
                c.command_policies = command_policies
                # A pipelined command must resolve to exactly one node.
                if len(target_nodes) > 1:
                    raise RedisClusterException(
                        f"Too many targets for command {c.args}"
                    )
                node = target_nodes[0]
                if node == self._pipe.get_default_node():
                    is_default_node = True
                # now that we know the name of the node
                # ( it's just a string in the form of host:port )
                # we can build a list of commands for each node.
                node_name = node.name
                if node_name not in nodes:
                    redis_node = self._pipe.get_redis_connection(node)
                    try:
                        connection = get_connection(redis_node)
                    except (ConnectionError, TimeoutError):
                        # Release any connections we've already acquired before clearing nodes
                        for n in nodes.values():
                            n.connection_pool.release(n.connection)
                        # Connection retries are being handled in the node's
                        # Retry object. Reinitialize the node -> slot table.
                        self._nodes_manager.initialize()
                        if is_default_node:
                            self._pipe.replace_default_node()
                        nodes = {}
                        raise
                    nodes[node_name] = NodeCommands(
                        redis_node.parse_response,
                        redis_node.connection_pool,
                        connection,
                    )
                nodes[node_name].append(c)
            # send the commands in sequence.
            # we write to all the open sockets for each node first,
            # before reading anything
            # this allows us to flush all the requests out across the
            # network
            # so that we can read them from different sockets as they come back.
            # we don't multiplex on the sockets as they come available,
            # but that shouldn't make too much difference.
            # Start timing for observability
            start_time = time.monotonic()
            node_commands = nodes.values()
            for n in node_commands:
                nodes_written += 1
                n.write()
            for n in node_commands:
                n.read()
                # Find the first error in this node's commands, if any
                node_error = None
                for cmd in n.commands:
                    if isinstance(cmd.result, Exception):
                        node_error = cmd.result
                        break
                record_operation_duration(
                    command_name="PIPELINE",
                    duration_seconds=time.monotonic() - start_time,
                    server_address=n.connection.host,
                    server_port=n.connection.port,
                    db_namespace=str(n.connection.db),
                    error=node_error,
                )
                nodes_read += 1
        finally:
            # release all the redis connections we allocated earlier
            # back into the connection pool.
            # if the connection is dirty (that is: we've written
            # commands to it, but haven't read the responses), we need
            # to close the connection before returning it to the pool.
            # otherwise, the next caller to use this connection will
            # read the response from _this_ request, not its own request.
            # disconnecting discards the dirty state & forces the next
            # caller to reconnect.
            # NOTE: dicts have a consistent ordering; we're iterating
            # through nodes.values() in the same order as we are when
            # reading / writing to the connections above, which is critical
            # for how we're using the nodes_written/nodes_read offsets.
            for i, n in enumerate(nodes.values()):
                if i < nodes_written and i >= nodes_read:
                    n.connection.disconnect()
                n.connection_pool.release(n.connection)
        # if the response isn't an exception it is a
        # valid response from the node
        # we're all done with that command, YAY!
        # if we have more commands to attempt, we've run into problems.
        # collect all the commands we are allowed to retry.
        # (MOVED, ASK, or connection errors or timeout errors)
        attempt = sorted(
            (
                c
                for c in attempt
                if isinstance(c.result, ClusterPipeline.ERRORS_ALLOW_RETRY)
            ),
            key=lambda x: x.position,
        )
        if attempt and allow_redirections:
            # RETRY MAGIC HAPPENS HERE!
            # send these remaining commands one at a time using `execute_command`
            # in the main client. This keeps our retry logic
            # in one place mostly,
            # and allows us to be more confident in correctness of behavior.
            # at this point any speed gains from pipelining have been lost
            # anyway, so we might as well make the best
            # attempt to get the correct behavior.
            #
            # The client command will handle retries for each
            # individual command sequentially as we pass each
            # one into `execute_command`. Any exceptions
            # that bubble out should only appear once all
            # retries have been exhausted.
            #
            # If a lot of commands have failed, we'll be setting the
            # flag to rebuild the slots table from scratch.
            # So MOVED errors should correct themselves fairly quickly.
            self._pipe.reinitialize_counter += 1
            if self._pipe._should_reinitialized():
                self._nodes_manager.initialize()
                if is_default_node:
                    self._pipe.replace_default_node()
            for c in attempt:
                try:
                    # send each command individually like we
                    # do in the main client.
                    c.result = self._pipe.parent_execute_command(*c.args, **c.options)
                except RedisError as e:
                    c.result = e
        # turn the response back into a simple flat array that corresponds
        # to the sequence of commands issued in the stack in pipeline.execute()
        response = []
        for c in sorted(stack, key=lambda x: x.position):
            if c.args[0] in self._pipe.cluster_response_callbacks:
                # Remove keys entry, it needs only for cache.
                c.options.pop("keys", None)
                # NOTE(review): the response-policy callback is applied even
                # when c.result is an exception that survived retries —
                # confirm the callbacks tolerate exception inputs.
                c.result = self._pipe._policies_callback_mapping[
                    c.command_policies.response_policy
                ](
                    self._pipe.cluster_response_callbacks[c.args[0]](
                        c.result, **c.options
                    )
                )
            response.append(c.result)
        if raise_on_error:
            self._raise_first_error(stack, start_time)
        return response

    def _is_nodes_flag(self, target_nodes):
        """True when ``target_nodes`` is one of the recognised node-flag strings."""
        return isinstance(target_nodes, str) and target_nodes in self._pipe.node_flags

    def _parse_target_nodes(self, target_nodes):
        """Normalize an explicit ``target_nodes`` argument into a list/collection
        of ClusterNode objects, raising TypeError for unsupported shapes."""
        if isinstance(target_nodes, list):
            nodes = target_nodes
        elif isinstance(target_nodes, ClusterNode):
            # Supports passing a single ClusterNode as a variable
            nodes = [target_nodes]
        elif isinstance(target_nodes, dict):
            # Supports dictionaries of the format {node_name: node}.
            # It enables to execute commands with multi nodes as follows:
            # rc.cluster_save_config(rc.get_primaries())
            nodes = target_nodes.values()
        else:
            raise TypeError(
                "target_nodes type can be one of the following: "
                "node_flag (PRIMARIES, REPLICAS, RANDOM, ALL_NODES),"
                "ClusterNode, list<ClusterNode>, or dict<any, ClusterNode>. "
                f"The passed type is {type(target_nodes)}"
            )
        return nodes

    def _determine_nodes(
        self, *args, request_policy: RequestPolicy, **kwargs
    ) -> List["ClusterNode"]:
        # Determine which nodes should be executed the command on.
        # Returns a list of target nodes.
        command = args[0].upper()
        if (
            len(args) >= 2
            and f"{args[0]} {args[1]}".upper() in self._pipe.command_flags
        ):
            command = f"{args[0]} {args[1]}".upper()
        nodes_flag = kwargs.pop("nodes_flag", None)
        if nodes_flag is not None:
            # nodes flag passed by the user
            command_flag = nodes_flag
        else:
            # get the nodes group for this command if it was predefined
            command_flag = self._pipe.command_flags.get(command)
        # A predefined command flag overrides the caller-supplied policy.
        if command_flag in self._pipe._command_flags_mapping:
            request_policy = self._pipe._command_flags_mapping[command_flag]
        policy_callback = self._pipe._policies_callback_mapping[request_policy]
        # Each policy callback has a different calling convention.
        if request_policy == RequestPolicy.DEFAULT_KEYED:
            nodes = policy_callback(command, *args)
        elif request_policy == RequestPolicy.MULTI_SHARD:
            nodes = policy_callback(*args, **kwargs)
        elif request_policy == RequestPolicy.DEFAULT_KEYLESS:
            nodes = policy_callback(args[0])
        else:
            nodes = policy_callback()
        if args[0].lower() == "ft.aggregate":
            # NOTE(review): _aggregate_nodes is not initialised in __init__;
            # presumably consumed by FT.CURSOR handling elsewhere — confirm.
            self._aggregate_nodes = nodes
        return nodes

    def multi(self):
        """Unsupported: MULTI requires the transactional strategy."""
        raise RedisClusterException(
            "method multi() is not supported outside of transactional context"
        )

    def discard(self):
        """Unsupported: DISCARD requires the transactional strategy."""
        raise RedisClusterException(
            "method discard() is not supported outside of transactional context"
        )

    def watch(self, *names):
        """Unsupported: WATCH requires the transactional strategy."""
        raise RedisClusterException(
            "method watch() is not supported outside of transactional context"
        )

    def unwatch(self, *names):
        """Unsupported: UNWATCH requires the transactional strategy."""
        raise RedisClusterException(
            "method unwatch() is not supported outside of transactional context"
        )

    def delete(self, *names):
        """Queue a DEL for a single key; multi-key DEL is cross-slot and
        therefore rejected in a pipeline."""
        if len(names) != 1:
            raise RedisClusterException(
                "deleting multiple keys is not implemented in pipeline command"
            )
        return self.execute_command("DEL", names[0])

    def unlink(self, *names):
        """Queue an UNLINK for a single key; multi-key UNLINK is cross-slot
        and therefore rejected in a pipeline."""
        if len(names) != 1:
            raise RedisClusterException(
                "unlinking multiple keys is not implemented in pipeline command"
            )
        return self.execute_command("UNLINK", names[0])
class TransactionStrategy(AbstractStrategy):
    """Executes a ClusterPipeline as a MULTI/EXEC transaction pinned to the
    single node owning the (unique) hash slot of all queued keys."""

    # Commands that never map to a hash slot.
    NO_SLOTS_COMMANDS = {"UNWATCH"}
    # Commands run immediately instead of being queued for EXEC.
    IMMEDIATE_EXECUTE_COMMANDS = {"WATCH", "UNWATCH"}
    # Commands whose completion implicitly clears the WATCHed-keys state.
    UNWATCH_COMMANDS = {"DISCARD", "EXEC", "UNWATCH"}
    # Redirections indicating the slot moved (or is migrating).
    SLOT_REDIRECT_ERRORS = (AskError, MovedError)
    # Errors that invalidate the pinned transaction connection.
    CONNECTION_ERRORS = (
        ConnectionError,
        OSError,
        ClusterDownError,
        SlotNotCoveredError,
    )
    def __init__(self, pipe: ClusterPipeline):
        """Bind the strategy to ``pipe`` and initialise fresh transaction state."""
        super().__init__(pipe)
        self._explicit_transaction = False  # True once MULTI has been issued
        self._watching = False  # True while WATCH is active on the connection
        self._pipeline_slots: Set[int] = set()  # slots touched by queued commands
        self._transaction_connection: Optional[Connection] = None
        self._executing = False
        # Use a private copy of the pipeline's retry policy so the extra
        # redirect errors added below do not leak into the shared object.
        self._retry = copy(self._pipe.retry)
        self._retry.update_supported_errors(
            RedisCluster.ERRORS_ALLOW_RETRY + self.SLOT_REDIRECT_ERRORS
        )
    def _get_client_and_connection_for_transaction(self) -> Tuple[Redis, Connection]:
        """
        Find a connection for a pipeline transaction.

        For running an atomic transaction, watch keys ensure that contents have not been
        altered as long as the watch commands for those keys were sent over the same
        connection. So once we start watching a key, we fetch a connection to the
        node that owns that slot and reuse it.
        """
        if not self._pipeline_slots:
            raise RedisClusterException(
                "At least a command with a key is needed to identify a node"
            )
        # All slots in the set must be identical for a valid transaction, so
        # any member identifies the owning node.
        node: ClusterNode = self._nodes_manager.get_node_from_slot(
            list(self._pipeline_slots)[0], False
        )
        redis_node: Redis = self._pipe.get_redis_connection(node)
        if self._transaction_connection:
            if not redis_node.connection_pool.owns_connection(
                self._transaction_connection
            ):
                # The slot moved to a different node since we cached this
                # connection: hand it back to the pool that owns it.
                previous_node = self._nodes_manager.find_connection_owner(
                    self._transaction_connection
                )
                previous_node.connection_pool.release(self._transaction_connection)
                self._transaction_connection = None
        if not self._transaction_connection:
            self._transaction_connection = get_connection(redis_node)
        return redis_node, self._transaction_connection
    def execute_command(self, *args, **kwargs):
        """
        Queue a command — or, for WATCH/UNWATCH outside an explicit MULTI,
        run it immediately.

        Tracks the hash slot of every keyed command so the transaction stays
        pinned to one node, rejecting cross-slot usage while watching.
        """
        slot_number: Optional[int] = None
        # NOTE(review): this check consults ClusterPipeline.NO_SLOTS_COMMANDS
        # while the elif below uses self.NO_SLOTS_COMMANDS — confirm the two
        # sets are intentionally distinct.
        if args[0] not in ClusterPipeline.NO_SLOTS_COMMANDS:
            slot_number = self._pipe.determine_slot(*args)
        if (
            self._watching or args[0] in self.IMMEDIATE_EXECUTE_COMMANDS
        ) and not self._explicit_transaction:
            if args[0] == "WATCH":
                self._validate_watch()
            if slot_number is not None:
                if self._pipeline_slots and slot_number not in self._pipeline_slots:
                    raise CrossSlotTransactionError(
                        "Cannot watch or send commands on different slots"
                    )
                self._pipeline_slots.add(slot_number)
            elif args[0] not in self.NO_SLOTS_COMMANDS:
                raise RedisClusterException(
                    f"Cannot identify slot number for command: {args[0]},"
                    "it cannot be triggered in a transaction"
                )
            return self._immediate_execute_command(*args, **kwargs)
        else:
            if slot_number is not None:
                self._pipeline_slots.add(slot_number)
            return self.pipeline_execute_command(*args, **kwargs)
    def _validate_watch(self):
        """Ensure WATCH is legal here (not after MULTI) and mark us watching."""
        if self._explicit_transaction:
            raise RedisError("Cannot issue a WATCH after a MULTI")
        self._watching = True
  3443. def _immediate_execute_command(self, *args, **options):
  3444. return self._retry.call_with_retry(
  3445. lambda: self._get_connection_and_send_command(*args, **options),
  3446. self._reinitialize_on_error,
  3447. with_failure_count=True,
  3448. )
    def _get_connection_and_send_command(self, *args, **options):
        """Send one command on the pinned transaction connection, recording
        operation-duration metrics on both the success and failure paths."""
        redis_node, connection = self._get_client_and_connection_for_transaction()
        # Start timing for observability
        start_time = time.monotonic()
        try:
            response = self._send_command_parse_response(
                connection, redis_node, args[0], *args, **options
            )
            record_operation_duration(
                command_name=args[0],
                duration_seconds=time.monotonic() - start_time,
                server_address=connection.host,
                server_port=connection.port,
                db_namespace=str(connection.db),
            )
            return response
        except Exception as e:
            if connection:
                # this is used to report the metrics based on host and port info
                e.connection = connection
            record_operation_duration(
                command_name=args[0],
                duration_seconds=time.monotonic() - start_time,
                server_address=connection.host,
                server_port=connection.port,
                db_namespace=str(connection.db),
                error=e,
            )
            raise
  3478. def _send_command_parse_response(
  3479. self, conn, redis_node: Redis, command_name, *args, **options
  3480. ):
  3481. """
  3482. Send a command and parse the response
  3483. """
  3484. conn.send_command(*args)
  3485. output = redis_node.parse_response(conn, command_name, **options)
  3486. if command_name in self.UNWATCH_COMMANDS:
  3487. self._watching = False
  3488. return output
    def _reinitialize_on_error(self, error, failure_count):
        """
        Failure hook for the retry policy: report the error, drop the pinned
        connection on redirect/connection errors, and refresh cluster state.

        Raises WatchError when a slot redirect happens mid-transaction while
        keys are being watched, since the watch guarantee is then void.
        """
        if hasattr(error, "connection"):
            record_error_count(
                server_address=error.connection.host,
                server_port=error.connection.port,
                network_peer_address=error.connection.host,
                network_peer_port=error.connection.port,
                error_type=error,
                retry_attempts=failure_count,
                is_internal=True,
            )
        if self._watching:
            if type(error) in self.SLOT_REDIRECT_ERRORS and self._executing:
                raise WatchError("Slot rebalancing occurred while watching keys")
        if (
            type(error) in self.SLOT_REDIRECT_ERRORS
            or type(error) in self.CONNECTION_ERRORS
        ):
            if self._transaction_connection:
                # Disconnect and release back to pool
                self._transaction_connection.disconnect()
                node = self._nodes_manager.find_connection_owner(
                    self._transaction_connection
                )
                if node and node.redis_connection:
                    node.redis_connection.connection_pool.release(
                        self._transaction_connection
                    )
                self._transaction_connection = None
            self._pipe.reinitialize_counter += 1
            if self._pipe._should_reinitialized():
                self._nodes_manager.initialize()
                # NOTE(review): this resets a counter attribute on the
                # strategy object, not self._pipe.reinitialize_counter —
                # confirm which counter was intended.
                self.reinitialize_counter = 0
            else:
                if isinstance(error, AskError):
                    self._nodes_manager.move_slot(error)
        self._executing = False
  3526. def _raise_first_error(self, responses, stack, start_time):
  3527. """
  3528. Raise the first exception on the stack
  3529. """
  3530. for r, cmd in zip(responses, stack):
  3531. if isinstance(r, Exception):
  3532. self.annotate_exception(r, cmd.position + 1, cmd.args)
  3533. record_operation_duration(
  3534. command_name="TRANSACTION",
  3535. duration_seconds=time.monotonic() - start_time,
  3536. server_address=self._transaction_connection.host,
  3537. server_port=self._transaction_connection.port,
  3538. db_namespace=str(self._transaction_connection.db),
  3539. )
  3540. raise r
  3541. def execute(self, raise_on_error: bool = True) -> List[Any]:
  3542. stack = self._command_queue
  3543. if not stack and (not self._watching or not self._pipeline_slots):
  3544. return []
  3545. return self._execute_transaction_with_retries(stack, raise_on_error)
  3546. def _execute_transaction_with_retries(
  3547. self, stack: List["PipelineCommand"], raise_on_error: bool
  3548. ):
  3549. return self._retry.call_with_retry(
  3550. lambda: self._execute_transaction(stack, raise_on_error),
  3551. lambda error, failure_count: self._reinitialize_on_error(
  3552. error, failure_count
  3553. ),
  3554. with_failure_count=True,
  3555. )
    def _execute_transaction(
        self, stack: List["PipelineCommand"], raise_on_error: bool
    ):
        """
        Run the queued commands as MULTI ... EXEC on the pinned node and
        return the per-command results with response callbacks applied.
        """
        if len(self._pipeline_slots) > 1:
            raise CrossSlotTransactionError(
                "All keys involved in a cluster transaction must map to the same slot"
            )
        self._executing = True
        redis_node, connection = self._get_client_and_connection_for_transaction()
        # Bracket the user's commands with MULTI/EXEC.
        stack = chain(
            [PipelineCommand(("MULTI",))],
            stack,
            [PipelineCommand(("EXEC",))],
        )
        commands = [c.args for c in stack if EMPTY_RESPONSE not in c.options]
        packed_commands = connection.pack_commands(commands)
        # Start timing for observability
        start_time = time.monotonic()
        connection.send_packed_command(packed_commands)
        errors = []
        # parse off the response for MULTI
        # NOTE: we need to handle ResponseErrors here and continue
        # so that we read all the additional command messages from
        # the socket
        try:
            redis_node.parse_response(connection, "MULTI")
        except ResponseError as e:
            self.annotate_exception(e, 0, "MULTI")
            errors.append(e)
        except self.CONNECTION_ERRORS as cluster_error:
            self.annotate_exception(cluster_error, 0, "MULTI")
            raise
        # and all the other commands
        for i, command in enumerate(self._command_queue):
            if EMPTY_RESPONSE in command.options:
                errors.append((i, command.options[EMPTY_RESPONSE]))
            else:
                try:
                    _ = redis_node.parse_response(connection, "_")
                except self.SLOT_REDIRECT_ERRORS as slot_error:
                    self.annotate_exception(slot_error, i + 1, command.args)
                    errors.append(slot_error)
                except self.CONNECTION_ERRORS as cluster_error:
                    self.annotate_exception(cluster_error, i + 1, command.args)
                    raise
                except ResponseError as e:
                    self.annotate_exception(e, i + 1, command.args)
                    errors.append(e)
        response = None
        # parse the EXEC.
        try:
            response = redis_node.parse_response(connection, "EXEC")
        except ExecAbortError:
            if errors:
                raise errors[0]
            raise
        self._executing = False
        record_operation_duration(
            command_name="TRANSACTION",
            duration_seconds=time.monotonic() - start_time,
            server_address=connection.host,
            server_port=connection.port,
            db_namespace=str(connection.db),
        )
        # EXEC clears any watched keys
        self._watching = False
        if response is None:
            raise WatchError("Watched variable changed.")
        # put any parse errors into the response
        # NOTE(review): entries appended above for MULTI/queued-command
        # errors are bare exceptions while EMPTY_RESPONSE entries are
        # (index, error) tuples; this unpack assumes only tuples remain,
        # i.e. that a bare error always aborts EXEC — confirm.
        for i, e in errors:
            response.insert(i, e)
        if len(response) != len(self._command_queue):
            raise InvalidPipelineStack(
                "Unexpected response length for cluster pipeline EXEC."
                " Command stack was {} but response had length {}".format(
                    [c.args[0] for c in self._command_queue], len(response)
                )
            )
        # find any errors in the response and raise if necessary
        if raise_on_error or len(errors) > 0:
            self._raise_first_error(
                response,
                self._command_queue,
                start_time,
            )
        # We have to run response callbacks manually
        data = []
        for r, cmd in zip(response, self._command_queue):
            if not isinstance(r, Exception):
                command_name = cmd.args[0]
                if command_name in self._pipe.cluster_response_callbacks:
                    r = self._pipe.cluster_response_callbacks[command_name](
                        r, **cmd.options
                    )
            data.append(r)
        return data
    def reset(self):
        """
        Clear the queue and release the pinned connection, issuing UNWATCH
        first when needed so the connection is safe to return to its pool.
        """
        self._command_queue = []
        # make sure to reset the connection state in the event that we were
        # watching something
        if self._transaction_connection:
            try:
                if self._watching:
                    # call this manually since our unwatch or
                    # immediate_execute_command methods can call reset()
                    self._transaction_connection.send_command("UNWATCH")
                    self._transaction_connection.read_response()
                # we can safely return the connection to the pool here since we're
                # sure we're no longer WATCHing anything
                node = self._nodes_manager.find_connection_owner(
                    self._transaction_connection
                )
                if node and node.redis_connection:
                    node.redis_connection.connection_pool.release(
                        self._transaction_connection
                    )
                self._transaction_connection = None
            except self.CONNECTION_ERRORS:
                # disconnect will also remove any previous WATCHes
                if self._transaction_connection:
                    self._transaction_connection.disconnect()
                    node = self._nodes_manager.find_connection_owner(
                        self._transaction_connection
                    )
                    if node and node.redis_connection:
                        node.redis_connection.connection_pool.release(
                            self._transaction_connection
                        )
                    self._transaction_connection = None
        # clean up the other instance attributes
        self._watching = False
        self._explicit_transaction = False
        self._pipeline_slots = set()
        self._executing = False
  3690. def send_cluster_commands(
  3691. self, stack, raise_on_error=True, allow_redirections=True
  3692. ):
  3693. raise NotImplementedError(
  3694. "send_cluster_commands cannot be executed in transactional context."
  3695. )
  3696. def multi(self):
  3697. if self._explicit_transaction:
  3698. raise RedisError("Cannot issue nested calls to MULTI")
  3699. if self._command_queue:
  3700. raise RedisError(
  3701. "Commands without an initial WATCH have already been issued"
  3702. )
  3703. self._explicit_transaction = True
  3704. def watch(self, *names):
  3705. if self._explicit_transaction:
  3706. raise RedisError("Cannot issue a WATCH after a MULTI")
  3707. return self.execute_command("WATCH", *names)
  3708. def unwatch(self):
  3709. if self._watching:
  3710. return self.execute_command("UNWATCH")
  3711. return True
  3712. def discard(self):
  3713. self.reset()
  3714. def delete(self, *names):
  3715. return self.execute_command("DEL", *names)
  3716. def unlink(self, *names):
  3717. return self.execute_command("UNLINK", *names)