cluster.py 40 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225
  1. import asyncio
  2. from typing import (
  3. TYPE_CHECKING,
  4. Any,
  5. AsyncIterator,
  6. Awaitable,
  7. Dict,
  8. Iterable,
  9. Iterator,
  10. List,
  11. Literal,
  12. Mapping,
  13. NoReturn,
  14. Optional,
  15. Sequence,
  16. Union,
  17. )
  18. from redis.crc import key_slot
  19. from redis.exceptions import RedisClusterException, RedisError
  20. from redis.typing import (
  21. AnyKeyT,
  22. ClusterCommandsProtocol,
  23. EncodableT,
  24. KeysT,
  25. KeyT,
  26. PatternT,
  27. ResponseT,
  28. )
  29. from redis.utils import deprecated_function
  30. from .core import (
  31. ACLCommands,
  32. AsyncACLCommands,
  33. AsyncDataAccessCommands,
  34. AsyncFunctionCommands,
  35. AsyncManagementCommands,
  36. AsyncModuleCommands,
  37. AsyncScriptCommands,
  38. DataAccessCommands,
  39. FunctionCommands,
  40. HotkeysMetricsTypes,
  41. ManagementCommands,
  42. ModuleCommands,
  43. PubSubCommands,
  44. ScriptCommands,
  45. )
  46. from .helpers import list_or_args
  47. from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
  48. if TYPE_CHECKING:
  49. from redis.asyncio.cluster import TargetNodesT
  50. # Not complete, but covers the major ones
  51. # https://redis.io/commands
  52. READ_COMMANDS = frozenset(
  53. [
  54. # Bit Operations
  55. "BITCOUNT",
  56. "BITFIELD_RO",
  57. "BITPOS",
  58. # Scripting
  59. "EVAL_RO",
  60. "EVALSHA_RO",
  61. "FCALL_RO",
  62. # Key Operations
  63. "DBSIZE",
  64. "DIGEST",
  65. "DUMP",
  66. "EXISTS",
  67. "EXPIRETIME",
  68. "PEXPIRETIME",
  69. "KEYS",
  70. "SCAN",
  71. "PTTL",
  72. "RANDOMKEY",
  73. "TTL",
  74. "TYPE",
  75. # String Operations
  76. "GET",
  77. "GETBIT",
  78. "GETRANGE",
  79. "MGET",
  80. "STRLEN",
  81. "LCS",
  82. # Geo Operations
  83. "GEODIST",
  84. "GEOHASH",
  85. "GEOPOS",
  86. "GEOSEARCH",
  87. # Hash Operations
  88. "HEXISTS",
  89. "HGET",
  90. "HGETALL",
  91. "HKEYS",
  92. "HLEN",
  93. "HMGET",
  94. "HSTRLEN",
  95. "HVALS",
  96. "HRANDFIELD",
  97. "HEXPIRETIME",
  98. "HPEXPIRETIME",
  99. "HTTL",
  100. "HPTTL",
  101. "HSCAN",
  102. # List Operations
  103. "LINDEX",
  104. "LPOS",
  105. "LLEN",
  106. "LRANGE",
  107. # Set Operations
  108. "SCARD",
  109. "SDIFF",
  110. "SINTER",
  111. "SINTERCARD",
  112. "SISMEMBER",
  113. "SMISMEMBER",
  114. "SMEMBERS",
  115. "SRANDMEMBER",
  116. "SUNION",
  117. "SSCAN",
  118. # Sorted Set Operations
  119. "ZCARD",
  120. "ZCOUNT",
  121. "ZDIFF",
  122. "ZINTER",
  123. "ZINTERCARD",
  124. "ZLEXCOUNT",
  125. "ZMSCORE",
  126. "ZRANDMEMBER",
  127. "ZRANGE",
  128. "ZRANGEBYLEX",
  129. "ZRANGEBYSCORE",
  130. "ZRANK",
  131. "ZREVRANGE",
  132. "ZREVRANGEBYLEX",
  133. "ZREVRANGEBYSCORE",
  134. "ZREVRANK",
  135. "ZSCAN",
  136. "ZSCORE",
  137. "ZUNION",
  138. # Stream Operations
  139. "XLEN",
  140. "XPENDING",
  141. "XRANGE",
  142. "XREAD",
  143. "XREVRANGE",
  144. # JSON Module
  145. "JSON.ARRINDEX",
  146. "JSON.ARRLEN",
  147. "JSON.GET",
  148. "JSON.MGET",
  149. "JSON.OBJKEYS",
  150. "JSON.OBJLEN",
  151. "JSON.RESP",
  152. "JSON.STRLEN",
  153. "JSON.TYPE",
  154. # RediSearch Module
  155. "FT.EXPLAIN",
  156. "FT.INFO",
  157. "FT.PROFILE",
  158. "FT.SEARCH",
  159. ]
  160. )
  161. class ClusterMultiKeyCommands(ClusterCommandsProtocol):
  162. """
  163. A class containing commands that handle more than one key
  164. """
  165. def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
  166. """Split keys into a dictionary that maps a slot to a list of keys."""
  167. slots_to_keys = {}
  168. for key in keys:
  169. slot = key_slot(self.encoder.encode(key))
  170. slots_to_keys.setdefault(slot, []).append(key)
  171. return slots_to_keys
  172. def _partition_pairs_by_slot(
  173. self, mapping: Mapping[AnyKeyT, EncodableT]
  174. ) -> Dict[int, List[EncodableT]]:
  175. """Split pairs into a dictionary that maps a slot to a list of pairs."""
  176. slots_to_pairs = {}
  177. for pair in mapping.items():
  178. slot = key_slot(self.encoder.encode(pair[0]))
  179. slots_to_pairs.setdefault(slot, []).extend(pair)
  180. return slots_to_pairs
  181. def _execute_pipeline_by_slot(
  182. self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
  183. ) -> List[Any]:
  184. read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
  185. pipe = self.pipeline()
  186. [
  187. pipe.execute_command(
  188. command,
  189. *slot_args,
  190. target_nodes=[
  191. self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
  192. ],
  193. )
  194. for slot, slot_args in slots_to_args.items()
  195. ]
  196. return pipe.execute()
  197. def _reorder_keys_by_command(
  198. self,
  199. keys: Iterable[KeyT],
  200. slots_to_args: Mapping[int, Iterable[EncodableT]],
  201. responses: Iterable[Any],
  202. ) -> List[Any]:
  203. results = {
  204. k: v
  205. for slot_values, response in zip(slots_to_args.values(), responses)
  206. for k, v in zip(slot_values, response)
  207. }
  208. return [results[key] for key in keys]
  209. def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
  210. """
  211. Splits the keys into different slots and then calls MGET
  212. for the keys of every slot. This operation will not be atomic
  213. if keys belong to more than one slot.
  214. Returns a list of values ordered identically to ``keys``
  215. For more information see https://redis.io/commands/mget
  216. """
  217. # Concatenate all keys into a list
  218. keys = list_or_args(keys, args)
  219. # Split keys into slots
  220. slots_to_keys = self._partition_keys_by_slot(keys)
  221. # Execute commands using a pipeline
  222. res = self._execute_pipeline_by_slot("MGET", slots_to_keys)
  223. # Reorder keys in the order the user provided & return
  224. return self._reorder_keys_by_command(keys, slots_to_keys, res)
  225. def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
  226. """
  227. Sets key/values based on a mapping. Mapping is a dictionary of
  228. key/value pairs. Both keys and values should be strings or types that
  229. can be cast to a string via str().
  230. Splits the keys into different slots and then calls MSET
  231. for the keys of every slot. This operation will not be atomic
  232. if keys belong to more than one slot.
  233. For more information see https://redis.io/commands/mset
  234. """
  235. # Partition the keys by slot
  236. slots_to_pairs = self._partition_pairs_by_slot(mapping)
  237. # Execute commands using a pipeline & return list of replies
  238. return self._execute_pipeline_by_slot("MSET", slots_to_pairs)
  239. def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
  240. """
  241. Runs the given command once for the keys
  242. of each slot. Returns the sum of the return values.
  243. """
  244. # Partition the keys by slot
  245. slots_to_keys = self._partition_keys_by_slot(keys)
  246. # Sum up the reply from each command
  247. return sum(self._execute_pipeline_by_slot(command, slots_to_keys))
  248. def exists(self, *keys: KeyT) -> ResponseT:
  249. """
  250. Returns the number of ``names`` that exist in the
  251. whole cluster. The keys are first split up into slots
  252. and then an EXISTS command is sent for every slot
  253. For more information see https://redis.io/commands/exists
  254. """
  255. return self._split_command_across_slots("EXISTS", *keys)
  256. def delete(self, *keys: KeyT) -> ResponseT:
  257. """
  258. Deletes the given keys in the cluster.
  259. The keys are first split up into slots
  260. and then an DEL command is sent for every slot
  261. Non-existent keys are ignored.
  262. Returns the number of keys that were deleted.
  263. For more information see https://redis.io/commands/del
  264. """
  265. return self._split_command_across_slots("DEL", *keys)
  266. def touch(self, *keys: KeyT) -> ResponseT:
  267. """
  268. Updates the last access time of given keys across the
  269. cluster.
  270. The keys are first split up into slots
  271. and then an TOUCH command is sent for every slot
  272. Non-existent keys are ignored.
  273. Returns the number of keys that were touched.
  274. For more information see https://redis.io/commands/touch
  275. """
  276. return self._split_command_across_slots("TOUCH", *keys)
  277. def unlink(self, *keys: KeyT) -> ResponseT:
  278. """
  279. Remove the specified keys in a different thread.
  280. The keys are first split up into slots
  281. and then an TOUCH command is sent for every slot
  282. Non-existent keys are ignored.
  283. Returns the number of keys that were unlinked.
  284. For more information see https://redis.io/commands/unlink
  285. """
  286. return self._split_command_across_slots("UNLINK", *keys)
  287. class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
  288. """
  289. A class containing commands that handle more than one key
  290. """
  291. async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
  292. """
  293. Splits the keys into different slots and then calls MGET
  294. for the keys of every slot. This operation will not be atomic
  295. if keys belong to more than one slot.
  296. Returns a list of values ordered identically to ``keys``
  297. For more information see https://redis.io/commands/mget
  298. """
  299. # Concatenate all keys into a list
  300. keys = list_or_args(keys, args)
  301. # Split keys into slots
  302. slots_to_keys = self._partition_keys_by_slot(keys)
  303. # Execute commands using a pipeline
  304. res = await self._execute_pipeline_by_slot("MGET", slots_to_keys)
  305. # Reorder keys in the order the user provided & return
  306. return self._reorder_keys_by_command(keys, slots_to_keys, res)
  307. async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
  308. """
  309. Sets key/values based on a mapping. Mapping is a dictionary of
  310. key/value pairs. Both keys and values should be strings or types that
  311. can be cast to a string via str().
  312. Splits the keys into different slots and then calls MSET
  313. for the keys of every slot. This operation will not be atomic
  314. if keys belong to more than one slot.
  315. For more information see https://redis.io/commands/mset
  316. """
  317. # Partition the keys by slot
  318. slots_to_pairs = self._partition_pairs_by_slot(mapping)
  319. # Execute commands using a pipeline & return list of replies
  320. return await self._execute_pipeline_by_slot("MSET", slots_to_pairs)
  321. async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
  322. """
  323. Runs the given command once for the keys
  324. of each slot. Returns the sum of the return values.
  325. """
  326. # Partition the keys by slot
  327. slots_to_keys = self._partition_keys_by_slot(keys)
  328. # Sum up the reply from each command
  329. return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))
  330. async def _execute_pipeline_by_slot(
  331. self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
  332. ) -> List[Any]:
  333. if self._initialize:
  334. await self.initialize()
  335. read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
  336. pipe = self.pipeline()
  337. [
  338. pipe.execute_command(
  339. command,
  340. *slot_args,
  341. target_nodes=[
  342. self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
  343. ],
  344. )
  345. for slot, slot_args in slots_to_args.items()
  346. ]
  347. return await pipe.execute()
  348. class ClusterManagementCommands(ManagementCommands):
  349. """
  350. A class for Redis Cluster management commands
  351. The class inherits from Redis's core ManagementCommands class and do the
  352. required adjustments to work with cluster mode
  353. """
  354. def slaveof(self, *args, **kwargs) -> NoReturn:
  355. """
  356. Make the server a replica of another instance, or promote it as master.
  357. For more information see https://redis.io/commands/slaveof
  358. """
  359. raise RedisClusterException("SLAVEOF is not supported in cluster mode")
  360. def replicaof(self, *args, **kwargs) -> NoReturn:
  361. """
  362. Make the server a replica of another instance, or promote it as master.
  363. For more information see https://redis.io/commands/replicaof
  364. """
  365. raise RedisClusterException("REPLICAOF is not supported in cluster mode")
  366. def swapdb(self, *args, **kwargs) -> NoReturn:
  367. """
  368. Swaps two Redis databases.
  369. For more information see https://redis.io/commands/swapdb
  370. """
  371. raise RedisClusterException("SWAPDB is not supported in cluster mode")
  372. def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
  373. """
  374. Returns the node's id.
  375. :target_node: 'ClusterNode'
  376. The node to execute the command on
  377. For more information check https://redis.io/commands/cluster-myid/
  378. """
  379. return self.execute_command("CLUSTER MYID", target_nodes=target_node)
  380. def cluster_addslots(
  381. self, target_node: "TargetNodesT", *slots: EncodableT
  382. ) -> ResponseT:
  383. """
  384. Assign new hash slots to receiving node. Sends to specified node.
  385. :target_node: 'ClusterNode'
  386. The node to execute the command on
  387. For more information see https://redis.io/commands/cluster-addslots
  388. """
  389. return self.execute_command(
  390. "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
  391. )
  392. def cluster_addslotsrange(
  393. self, target_node: "TargetNodesT", *slots: EncodableT
  394. ) -> ResponseT:
  395. """
  396. Similar to the CLUSTER ADDSLOTS command.
  397. The difference between the two commands is that ADDSLOTS takes a list of slots
  398. to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
  399. (specified by start and end slots) to assign to the node.
  400. :target_node: 'ClusterNode'
  401. The node to execute the command on
  402. For more information see https://redis.io/commands/cluster-addslotsrange
  403. """
  404. return self.execute_command(
  405. "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
  406. )
  407. def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
  408. """
  409. Return the number of local keys in the specified hash slot
  410. Send to node based on specified slot_id
  411. For more information see https://redis.io/commands/cluster-countkeysinslot
  412. """
  413. return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)
  414. def cluster_count_failure_report(self, node_id: str) -> ResponseT:
  415. """
  416. Return the number of failure reports active for a given node
  417. Sends to a random node
  418. For more information see https://redis.io/commands/cluster-count-failure-reports
  419. """
  420. return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)
  421. def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
  422. """
  423. Set hash slots as unbound in the cluster.
  424. It determines by it self what node the slot is in and sends it there
  425. Returns a list of the results for each processed slot.
  426. For more information see https://redis.io/commands/cluster-delslots
  427. """
  428. return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]
  429. def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
  430. """
  431. Similar to the CLUSTER DELSLOTS command.
  432. The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
  433. from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
  434. from the node.
  435. For more information see https://redis.io/commands/cluster-delslotsrange
  436. """
  437. return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)
  438. def cluster_failover(
  439. self, target_node: "TargetNodesT", option: Optional[str] = None
  440. ) -> ResponseT:
  441. """
  442. Forces a slave to perform a manual failover of its master
  443. Sends to specified node
  444. :target_node: 'ClusterNode'
  445. The node to execute the command on
  446. For more information see https://redis.io/commands/cluster-failover
  447. """
  448. if option:
  449. if option.upper() not in ["FORCE", "TAKEOVER"]:
  450. raise RedisError(
  451. f"Invalid option for CLUSTER FAILOVER command: {option}"
  452. )
  453. else:
  454. return self.execute_command(
  455. "CLUSTER FAILOVER", option, target_nodes=target_node
  456. )
  457. else:
  458. return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)
  459. def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
  460. """
  461. Provides info about Redis Cluster node state.
  462. The command will be sent to a random node in the cluster if no target
  463. node is specified.
  464. For more information see https://redis.io/commands/cluster-info
  465. """
  466. return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)
  467. def cluster_keyslot(self, key: str) -> ResponseT:
  468. """
  469. Returns the hash slot of the specified key
  470. Sends to random node in the cluster
  471. For more information see https://redis.io/commands/cluster-keyslot
  472. """
  473. return self.execute_command("CLUSTER KEYSLOT", key)
  474. def cluster_meet(
  475. self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
  476. ) -> ResponseT:
  477. """
  478. Force a node cluster to handshake with another node.
  479. Sends to specified node.
  480. For more information see https://redis.io/commands/cluster-meet
  481. """
  482. return self.execute_command(
  483. "CLUSTER MEET", host, port, target_nodes=target_nodes
  484. )
  485. def cluster_nodes(self) -> ResponseT:
  486. """
  487. Get Cluster config for the node.
  488. Sends to random node in the cluster
  489. For more information see https://redis.io/commands/cluster-nodes
  490. """
  491. return self.execute_command("CLUSTER NODES")
  492. def cluster_replicate(
  493. self, target_nodes: "TargetNodesT", node_id: str
  494. ) -> ResponseT:
  495. """
  496. Reconfigure a node as a slave of the specified master node
  497. For more information see https://redis.io/commands/cluster-replicate
  498. """
  499. return self.execute_command(
  500. "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
  501. )
  502. def cluster_reset(
  503. self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
  504. ) -> ResponseT:
  505. """
  506. Reset a Redis Cluster node
  507. If 'soft' is True then it will send 'SOFT' argument
  508. If 'soft' is False then it will send 'HARD' argument
  509. For more information see https://redis.io/commands/cluster-reset
  510. """
  511. return self.execute_command(
  512. "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes
  513. )
  514. def cluster_save_config(
  515. self, target_nodes: Optional["TargetNodesT"] = None
  516. ) -> ResponseT:
  517. """
  518. Forces the node to save cluster state on disk
  519. For more information see https://redis.io/commands/cluster-saveconfig
  520. """
  521. return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)
  522. def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
  523. """
  524. Returns the number of keys in the specified cluster slot
  525. For more information see https://redis.io/commands/cluster-getkeysinslot
  526. """
  527. return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)
  528. def cluster_set_config_epoch(
  529. self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
  530. ) -> ResponseT:
  531. """
  532. Set the configuration epoch in a new node
  533. For more information see https://redis.io/commands/cluster-set-config-epoch
  534. """
  535. return self.execute_command(
  536. "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
  537. )
  538. def cluster_setslot(
  539. self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
  540. ) -> ResponseT:
  541. """
  542. Bind an hash slot to a specific node
  543. :target_node: 'ClusterNode'
  544. The node to execute the command on
  545. For more information see https://redis.io/commands/cluster-setslot
  546. """
  547. if state.upper() in ("IMPORTING", "NODE", "MIGRATING"):
  548. return self.execute_command(
  549. "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
  550. )
  551. elif state.upper() == "STABLE":
  552. raise RedisError('For "stable" state please use cluster_setslot_stable')
  553. else:
  554. raise RedisError(f"Invalid slot state: {state}")
  555. def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
  556. """
  557. Clears migrating / importing state from the slot.
  558. It determines by it self what node the slot is in and sends it there.
  559. For more information see https://redis.io/commands/cluster-setslot
  560. """
  561. return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")
  562. def cluster_replicas(
  563. self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
  564. ) -> ResponseT:
  565. """
  566. Provides a list of replica nodes replicating from the specified primary
  567. target node.
  568. For more information see https://redis.io/commands/cluster-replicas
  569. """
  570. return self.execute_command(
  571. "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
  572. )
  573. def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
  574. """
  575. Get array of Cluster slot to node mappings
  576. For more information see https://redis.io/commands/cluster-slots
  577. """
  578. return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)
  579. def cluster_shards(self, target_nodes=None):
  580. """
  581. Returns details about the shards of the cluster.
  582. For more information see https://redis.io/commands/cluster-shards
  583. """
  584. return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)
  585. def cluster_myshardid(self, target_nodes=None):
  586. """
  587. Returns the shard ID of the node.
  588. For more information see https://redis.io/commands/cluster-myshardid/
  589. """
  590. return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes)
  591. def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
  592. """
  593. Each node in a Redis Cluster maintains a pair of long-lived TCP link with each
  594. peer in the cluster: One for sending outbound messages towards the peer and one
  595. for receiving inbound messages from the peer.
  596. This command outputs information of all such peer links as an array.
  597. For more information see https://redis.io/commands/cluster-links
  598. """
  599. return self.execute_command("CLUSTER LINKS", target_nodes=target_node)
  600. def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
  601. raise NotImplementedError(
  602. "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
  603. )
  604. def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
  605. raise NotImplementedError(
  606. "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
  607. )
  608. def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
  609. """
  610. Enables read queries.
  611. The command will be sent to the default cluster node if target_nodes is
  612. not specified.
  613. For more information see https://redis.io/commands/readonly
  614. """
  615. if target_nodes == "replicas" or target_nodes == "all":
  616. # read_from_replicas will only be enabled if the READONLY command
  617. # is sent to all replicas
  618. self.read_from_replicas = True
  619. return self.execute_command("READONLY", target_nodes=target_nodes)
  620. def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
  621. """
  622. Disables read queries.
  623. The command will be sent to the default cluster node if target_nodes is
  624. not specified.
  625. For more information see https://redis.io/commands/readwrite
  626. """
  627. # Reset read from replicas flag
  628. self.read_from_replicas = False
  629. return self.execute_command("READWRITE", target_nodes=target_nodes)
  630. @deprecated_function(
  631. version="7.2.0",
  632. reason="Use client-side caching feature instead.",
  633. )
  634. def client_tracking_on(
  635. self,
  636. clientid: Optional[int] = None,
  637. prefix: Sequence[KeyT] = [],
  638. bcast: bool = False,
  639. optin: bool = False,
  640. optout: bool = False,
  641. noloop: bool = False,
  642. target_nodes: Optional["TargetNodesT"] = "all",
  643. ) -> ResponseT:
  644. """
  645. Enables the tracking feature of the Redis server, that is used
  646. for server assisted client side caching.
  647. When clientid is provided - in target_nodes only the node that owns the
  648. connection with this id should be provided.
  649. When clientid is not provided - target_nodes can be any node.
  650. For more information see https://redis.io/commands/client-tracking
  651. """
  652. return self.client_tracking(
  653. True,
  654. clientid,
  655. prefix,
  656. bcast,
  657. optin,
  658. optout,
  659. noloop,
  660. target_nodes=target_nodes,
  661. )
  662. @deprecated_function(
  663. version="7.2.0",
  664. reason="Use client-side caching feature instead.",
  665. )
  666. def client_tracking_off(
  667. self,
  668. clientid: Optional[int] = None,
  669. prefix: Sequence[KeyT] = [],
  670. bcast: bool = False,
  671. optin: bool = False,
  672. optout: bool = False,
  673. noloop: bool = False,
  674. target_nodes: Optional["TargetNodesT"] = "all",
  675. ) -> ResponseT:
  676. """
  677. Disables the tracking feature of the Redis server, that is used
  678. for server assisted client side caching.
  679. When clientid is provided - in target_nodes only the node that owns the
  680. connection with this id should be provided.
  681. When clientid is not provided - target_nodes can be any node.
  682. For more information see https://redis.io/commands/client-tracking
  683. """
  684. return self.client_tracking(
  685. False,
  686. clientid,
  687. prefix,
  688. bcast,
  689. optin,
  690. optout,
  691. noloop,
  692. target_nodes=target_nodes,
  693. )
  694. def hotkeys_start(
  695. self,
  696. metrics: List[HotkeysMetricsTypes],
  697. count: Optional[int] = None,
  698. duration: Optional[int] = None,
  699. sample_ratio: Optional[int] = None,
  700. slots: Optional[List[int]] = None,
  701. **kwargs,
  702. ) -> Union[str, bytes]:
  703. """
  704. Cluster client does not support hotkeys command. Please use the non-cluster client.
  705. For more information see https://redis.io/commands/hotkeys-start
  706. """
  707. raise NotImplementedError(
  708. "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
  709. )
  710. def hotkeys_stop(self, **kwargs) -> Union[str, bytes]:
  711. """
  712. Cluster client does not support hotkeys command. Please use the non-cluster client.
  713. For more information see https://redis.io/commands/hotkeys-stop
  714. """
  715. raise NotImplementedError(
  716. "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
  717. )
  718. def hotkeys_reset(self, **kwargs) -> Union[str, bytes]:
  719. """
  720. Cluster client does not support hotkeys command. Please use the non-cluster client.
  721. For more information see https://redis.io/commands/hotkeys-reset
  722. """
  723. raise NotImplementedError(
  724. "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
  725. )
  726. def hotkeys_get(self, **kwargs) -> list[dict[Union[str, bytes], Any]]:
  727. """
  728. Cluster client does not support hotkeys command. Please use the non-cluster client.
  729. For more information see https://redis.io/commands/hotkeys-get
  730. """
  731. raise NotImplementedError(
  732. "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
  733. )
  734. class AsyncClusterManagementCommands(
  735. ClusterManagementCommands, AsyncManagementCommands
  736. ):
  737. """
  738. A class for Redis Cluster management commands
  739. The class inherits from Redis's core ManagementCommands class and do the
  740. required adjustments to work with cluster mode
  741. """
  742. async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
  743. """
  744. Set hash slots as unbound in the cluster.
  745. It determines by it self what node the slot is in and sends it there
  746. Returns a list of the results for each processed slot.
  747. For more information see https://redis.io/commands/cluster-delslots
  748. """
  749. return await asyncio.gather(
  750. *(
  751. asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot))
  752. for slot in slots
  753. )
  754. )
  755. @deprecated_function(
  756. version="7.2.0",
  757. reason="Use client-side caching feature instead.",
  758. )
  759. async def client_tracking_on(
  760. self,
  761. clientid: Optional[int] = None,
  762. prefix: Sequence[KeyT] = [],
  763. bcast: bool = False,
  764. optin: bool = False,
  765. optout: bool = False,
  766. noloop: bool = False,
  767. target_nodes: Optional["TargetNodesT"] = "all",
  768. ) -> ResponseT:
  769. """
  770. Enables the tracking feature of the Redis server, that is used
  771. for server assisted client side caching.
  772. When clientid is provided - in target_nodes only the node that owns the
  773. connection with this id should be provided.
  774. When clientid is not provided - target_nodes can be any node.
  775. For more information see https://redis.io/commands/client-tracking
  776. """
  777. return await self.client_tracking(
  778. True,
  779. clientid,
  780. prefix,
  781. bcast,
  782. optin,
  783. optout,
  784. noloop,
  785. target_nodes=target_nodes,
  786. )
  787. @deprecated_function(
  788. version="7.2.0",
  789. reason="Use client-side caching feature instead.",
  790. )
  791. async def client_tracking_off(
  792. self,
  793. clientid: Optional[int] = None,
  794. prefix: Sequence[KeyT] = [],
  795. bcast: bool = False,
  796. optin: bool = False,
  797. optout: bool = False,
  798. noloop: bool = False,
  799. target_nodes: Optional["TargetNodesT"] = "all",
  800. ) -> ResponseT:
  801. """
  802. Disables the tracking feature of the Redis server, that is used
  803. for server assisted client side caching.
  804. When clientid is provided - in target_nodes only the node that owns the
  805. connection with this id should be provided.
  806. When clientid is not provided - target_nodes can be any node.
  807. For more information see https://redis.io/commands/client-tracking
  808. """
  809. return await self.client_tracking(
  810. False,
  811. clientid,
  812. prefix,
  813. bcast,
  814. optin,
  815. optout,
  816. noloop,
  817. target_nodes=target_nodes,
  818. )
  819. async def hotkeys_start(
  820. self,
  821. metrics: List[HotkeysMetricsTypes],
  822. count: Optional[int] = None,
  823. duration: Optional[int] = None,
  824. sample_ratio: Optional[int] = None,
  825. slots: Optional[List[int]] = None,
  826. **kwargs,
  827. ) -> Awaitable[Union[str, bytes]]:
  828. """
  829. Cluster client does not support hotkeys command. Please use the non-cluster client.
  830. For more information see https://redis.io/commands/hotkeys-start
  831. """
  832. raise NotImplementedError(
  833. "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
  834. )
  835. async def hotkeys_stop(self, **kwargs) -> Awaitable[Union[str, bytes]]:
  836. """
  837. Cluster client does not support hotkeys command. Please use the non-cluster client.
  838. For more information see https://redis.io/commands/hotkeys-stop
  839. """
  840. raise NotImplementedError(
  841. "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
  842. )
  843. async def hotkeys_reset(self, **kwargs) -> Awaitable[Union[str, bytes]]:
  844. """
  845. Cluster client does not support hotkeys command. Please use the non-cluster client.
  846. For more information see https://redis.io/commands/hotkeys-reset
  847. """
  848. raise NotImplementedError(
  849. "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
  850. )
  851. async def hotkeys_get(
  852. self, **kwargs
  853. ) -> Awaitable[list[dict[Union[str, bytes], Any]]]:
  854. """
  855. Cluster client does not support hotkeys command. Please use the non-cluster client.
  856. For more information see https://redis.io/commands/hotkeys-get
  857. """
  858. raise NotImplementedError(
  859. "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
  860. )
  861. class ClusterDataAccessCommands(DataAccessCommands):
  862. """
  863. A class for Redis Cluster Data Access Commands
  864. The class inherits from Redis's core DataAccessCommand class and do the
  865. required adjustments to work with cluster mode
  866. """
  867. def stralgo(
  868. self,
  869. algo: Literal["LCS"],
  870. value1: KeyT,
  871. value2: KeyT,
  872. specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
  873. len: bool = False,
  874. idx: bool = False,
  875. minmatchlen: Optional[int] = None,
  876. withmatchlen: bool = False,
  877. **kwargs,
  878. ) -> ResponseT:
  879. """
  880. Implements complex algorithms that operate on strings.
  881. Right now the only algorithm implemented is the LCS algorithm
  882. (longest common substring). However new algorithms could be
  883. implemented in the future.
  884. ``algo`` Right now must be LCS
  885. ``value1`` and ``value2`` Can be two strings or two keys
  886. ``specific_argument`` Specifying if the arguments to the algorithm
  887. will be keys or strings. strings is the default.
  888. ``len`` Returns just the len of the match.
  889. ``idx`` Returns the match positions in each string.
  890. ``minmatchlen`` Restrict the list of matches to the ones of a given
  891. minimal length. Can be provided only when ``idx`` set to True.
  892. ``withmatchlen`` Returns the matches with the len of the match.
  893. Can be provided only when ``idx`` set to True.
  894. For more information see https://redis.io/commands/stralgo
  895. """
  896. target_nodes = kwargs.pop("target_nodes", None)
  897. if specific_argument == "strings" and target_nodes is None:
  898. target_nodes = "default-node"
  899. kwargs.update({"target_nodes": target_nodes})
  900. return super().stralgo(
  901. algo,
  902. value1,
  903. value2,
  904. specific_argument,
  905. len,
  906. idx,
  907. minmatchlen,
  908. withmatchlen,
  909. **kwargs,
  910. )
  911. def scan_iter(
  912. self,
  913. match: Optional[PatternT] = None,
  914. count: Optional[int] = None,
  915. _type: Optional[str] = None,
  916. **kwargs,
  917. ) -> Iterator:
  918. # Do the first query with cursor=0 for all nodes
  919. cursors, data = self.scan(match=match, count=count, _type=_type, **kwargs)
  920. yield from data
  921. cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
  922. if cursors:
  923. # Get nodes by name
  924. nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
  925. # Iterate over each node till its cursor is 0
  926. kwargs.pop("target_nodes", None)
  927. while cursors:
  928. for name, cursor in cursors.items():
  929. cur, data = self.scan(
  930. cursor=cursor,
  931. match=match,
  932. count=count,
  933. _type=_type,
  934. target_nodes=nodes[name],
  935. **kwargs,
  936. )
  937. yield from data
  938. cursors[name] = cur[name]
  939. cursors = {
  940. name: cursor for name, cursor in cursors.items() if cursor != 0
  941. }
  942. class AsyncClusterDataAccessCommands(
  943. ClusterDataAccessCommands, AsyncDataAccessCommands
  944. ):
  945. """
  946. A class for Redis Cluster Data Access Commands
  947. The class inherits from Redis's core DataAccessCommand class and do the
  948. required adjustments to work with cluster mode
  949. """
  950. async def scan_iter(
  951. self,
  952. match: Optional[PatternT] = None,
  953. count: Optional[int] = None,
  954. _type: Optional[str] = None,
  955. **kwargs,
  956. ) -> AsyncIterator:
  957. # Do the first query with cursor=0 for all nodes
  958. cursors, data = await self.scan(match=match, count=count, _type=_type, **kwargs)
  959. for value in data:
  960. yield value
  961. cursors = {name: cursor for name, cursor in cursors.items() if cursor != 0}
  962. if cursors:
  963. # Get nodes by name
  964. nodes = {name: self.get_node(node_name=name) for name in cursors.keys()}
  965. # Iterate over each node till its cursor is 0
  966. kwargs.pop("target_nodes", None)
  967. while cursors:
  968. for name, cursor in cursors.items():
  969. cur, data = await self.scan(
  970. cursor=cursor,
  971. match=match,
  972. count=count,
  973. _type=_type,
  974. target_nodes=nodes[name],
  975. **kwargs,
  976. )
  977. for value in data:
  978. yield value
  979. cursors[name] = cur[name]
  980. cursors = {
  981. name: cursor for name, cursor in cursors.items() if cursor != 0
  982. }
  983. class RedisClusterCommands(
  984. ClusterMultiKeyCommands,
  985. ClusterManagementCommands,
  986. ACLCommands,
  987. PubSubCommands,
  988. ClusterDataAccessCommands,
  989. ScriptCommands,
  990. FunctionCommands,
  991. ModuleCommands,
  992. RedisModuleCommands,
  993. ):
  994. """
  995. A class for all Redis Cluster commands
  996. For key-based commands, the target node(s) will be internally determined
  997. by the keys' hash slot.
  998. Non-key-based commands can be executed with the 'target_nodes' argument to
  999. target specific nodes. By default, if target_nodes is not specified, the
  1000. command will be executed on the default cluster node.
  1001. :param :target_nodes: type can be one of the followings:
  1002. - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
  1003. - 'ClusterNode'
  1004. - 'list(ClusterNodes)'
  1005. - 'dict(any:clusterNodes)'
  1006. for example:
  1007. r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
  1008. """
  1009. class AsyncRedisClusterCommands(
  1010. AsyncClusterMultiKeyCommands,
  1011. AsyncClusterManagementCommands,
  1012. AsyncACLCommands,
  1013. AsyncClusterDataAccessCommands,
  1014. AsyncScriptCommands,
  1015. AsyncFunctionCommands,
  1016. AsyncModuleCommands,
  1017. AsyncRedisModuleCommands,
  1018. ):
  1019. """
  1020. A class for all Redis Cluster commands
  1021. For key-based commands, the target node(s) will be internally determined
  1022. by the keys' hash slot.
  1023. Non-key-based commands can be executed with the 'target_nodes' argument to
  1024. target specific nodes. By default, if target_nodes is not specified, the
  1025. command will be executed on the default cluster node.
  1026. :param :target_nodes: type can be one of the followings:
  1027. - nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
  1028. - 'ClusterNode'
  1029. - 'list(ClusterNodes)'
  1030. - 'dict(any:clusterNodes)'
  1031. for example:
  1032. r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
  1033. """