| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225 |
- import asyncio
- from typing import (
- TYPE_CHECKING,
- Any,
- AsyncIterator,
- Awaitable,
- Dict,
- Iterable,
- Iterator,
- List,
- Literal,
- Mapping,
- NoReturn,
- Optional,
- Sequence,
- Union,
- )
- from redis.crc import key_slot
- from redis.exceptions import RedisClusterException, RedisError
- from redis.typing import (
- AnyKeyT,
- ClusterCommandsProtocol,
- EncodableT,
- KeysT,
- KeyT,
- PatternT,
- ResponseT,
- )
- from redis.utils import deprecated_function
- from .core import (
- ACLCommands,
- AsyncACLCommands,
- AsyncDataAccessCommands,
- AsyncFunctionCommands,
- AsyncManagementCommands,
- AsyncModuleCommands,
- AsyncScriptCommands,
- DataAccessCommands,
- FunctionCommands,
- HotkeysMetricsTypes,
- ManagementCommands,
- ModuleCommands,
- PubSubCommands,
- ScriptCommands,
- )
- from .helpers import list_or_args
- from .redismodules import AsyncRedisModuleCommands, RedisModuleCommands
- if TYPE_CHECKING:
- from redis.asyncio.cluster import TargetNodesT
# Commands treated as read-only by this module. The multi-key helpers below
# test membership here, together with ``read_from_replicas``, before asking
# the nodes manager for the node of a slot.
# Not complete, but covers the major ones
# https://redis.io/commands
READ_COMMANDS = frozenset(
    {
        # Bit Operations
        "BITCOUNT",
        "BITFIELD_RO",
        "BITPOS",
        # Scripting
        "EVAL_RO",
        "EVALSHA_RO",
        "FCALL_RO",
        # Key Operations
        "DBSIZE",
        "DIGEST",
        "DUMP",
        "EXISTS",
        "EXPIRETIME",
        "PEXPIRETIME",
        "KEYS",
        "SCAN",
        "PTTL",
        "RANDOMKEY",
        "TTL",
        "TYPE",
        # String Operations
        "GET",
        "GETBIT",
        "GETRANGE",
        "MGET",
        "STRLEN",
        "LCS",
        # Geo Operations
        "GEODIST",
        "GEOHASH",
        "GEOPOS",
        "GEOSEARCH",
        # Hash Operations
        "HEXISTS",
        "HGET",
        "HGETALL",
        "HKEYS",
        "HLEN",
        "HMGET",
        "HSTRLEN",
        "HVALS",
        "HRANDFIELD",
        "HEXPIRETIME",
        "HPEXPIRETIME",
        "HTTL",
        "HPTTL",
        "HSCAN",
        # List Operations
        "LINDEX",
        "LPOS",
        "LLEN",
        "LRANGE",
        # Set Operations
        "SCARD",
        "SDIFF",
        "SINTER",
        "SINTERCARD",
        "SISMEMBER",
        "SMISMEMBER",
        "SMEMBERS",
        "SRANDMEMBER",
        "SUNION",
        "SSCAN",
        # Sorted Set Operations
        "ZCARD",
        "ZCOUNT",
        "ZDIFF",
        "ZINTER",
        "ZINTERCARD",
        "ZLEXCOUNT",
        "ZMSCORE",
        "ZRANDMEMBER",
        "ZRANGE",
        "ZRANGEBYLEX",
        "ZRANGEBYSCORE",
        "ZRANK",
        "ZREVRANGE",
        "ZREVRANGEBYLEX",
        "ZREVRANGEBYSCORE",
        "ZREVRANK",
        "ZSCAN",
        "ZSCORE",
        "ZUNION",
        # Stream Operations
        "XLEN",
        "XPENDING",
        "XRANGE",
        "XREAD",
        "XREVRANGE",
        # JSON Module
        "JSON.ARRINDEX",
        "JSON.ARRLEN",
        "JSON.GET",
        "JSON.MGET",
        "JSON.OBJKEYS",
        "JSON.OBJLEN",
        "JSON.RESP",
        "JSON.STRLEN",
        "JSON.TYPE",
        # RediSearch Module
        "FT.EXPLAIN",
        "FT.INFO",
        "FT.PROFILE",
        "FT.SEARCH",
    }
)
class ClusterMultiKeyCommands(ClusterCommandsProtocol):
    """
    A class containing commands that handle more than one key.

    Each multi-key command is split by hash slot: keys are grouped per slot,
    one command per slot is queued on a pipeline, and the per-slot replies
    are merged back into a single result.
    """

    def _partition_keys_by_slot(self, keys: Iterable[KeyT]) -> Dict[int, List[KeyT]]:
        """Split keys into a dictionary that maps a slot to a list of keys."""
        slots_to_keys: Dict[int, List[KeyT]] = {}
        for key in keys:
            # The slot is computed from the encoded form of the key.
            slot = key_slot(self.encoder.encode(key))
            slots_to_keys.setdefault(slot, []).append(key)
        return slots_to_keys

    def _partition_pairs_by_slot(
        self, mapping: Mapping[AnyKeyT, EncodableT]
    ) -> Dict[int, List[EncodableT]]:
        """
        Split pairs into a dictionary that maps a slot to a list of pairs.

        Each list is flat (``[key1, value1, key2, value2, ...]``) so it can
        be unpacked directly as the arguments of one command.
        """
        slots_to_pairs: Dict[int, List[EncodableT]] = {}
        for key, value in mapping.items():
            slot = key_slot(self.encoder.encode(key))
            slots_to_pairs.setdefault(slot, []).extend((key, value))
        return slots_to_pairs

    def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and execute it.

        ``slots_to_args`` maps a slot to the arguments of the command sent
        for that slot. Returns whatever the pipeline's ``execute()`` returns.
        """
        # Replica lookups are only requested when the client has
        # read_from_replicas set and the command is listed in READ_COMMANDS.
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        for slot, slot_args in slots_to_args.items():
            node = self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
            pipe.execute_command(command, *slot_args, target_nodes=[node])
        return pipe.execute()

    def _reorder_keys_by_command(
        self,
        keys: Iterable[KeyT],
        slots_to_args: Mapping[int, Iterable[EncodableT]],
        responses: Iterable[Any],
    ) -> List[Any]:
        """
        Return the per-key values in the order of ``keys``.

        ``responses`` is paired positionally with ``slots_to_args.values()``,
        and each response is paired positionally with that slot's keys, so
        the two must be in the same order.
        """
        results = {
            k: v
            for slot_values, response in zip(slots_to_args.values(), responses)
            for k, v in zip(slot_values, response)
        }
        return [results[key] for key in keys]

    def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """
        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns the list of replies, one per slot-level MSET.

        For more information see https://redis.io/commands/mset
        """
        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """
        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(self._execute_pipeline_by_slot(command, slots_to_keys))

    def exists(self, *keys: KeyT) -> ResponseT:
        """
        Returns the number of ``keys`` that exist in the
        whole cluster. The keys are first split up into slots
        and then an EXISTS command is sent for every slot

        For more information see https://redis.io/commands/exists
        """
        return self._split_command_across_slots("EXISTS", *keys)

    def delete(self, *keys: KeyT) -> ResponseT:
        """
        Deletes the given keys in the cluster.
        The keys are first split up into slots
        and then a DEL command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were deleted.

        For more information see https://redis.io/commands/del
        """
        return self._split_command_across_slots("DEL", *keys)

    def touch(self, *keys: KeyT) -> ResponseT:
        """
        Updates the last access time of given keys across the
        cluster.

        The keys are first split up into slots
        and then a TOUCH command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were touched.

        For more information see https://redis.io/commands/touch
        """
        return self._split_command_across_slots("TOUCH", *keys)

    def unlink(self, *keys: KeyT) -> ResponseT:
        """
        Remove the specified keys in a different thread.

        The keys are first split up into slots
        and then an UNLINK command is sent for every slot

        Non-existent keys are ignored.
        Returns the number of keys that were unlinked.

        For more information see https://redis.io/commands/unlink
        """
        return self._split_command_across_slots("UNLINK", *keys)
class AsyncClusterMultiKeyCommands(ClusterMultiKeyCommands):
    """
    A class containing commands that handle more than one key.

    Coroutine counterparts of the multi-key commands in
    ``ClusterMultiKeyCommands``; the slot-partitioning helpers are inherited
    unchanged.
    """

    async def mget_nonatomic(self, keys: KeysT, *args: KeyT) -> List[Optional[Any]]:
        """
        Splits the keys into different slots and then calls MGET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns a list of values ordered identically to ``keys``

        For more information see https://redis.io/commands/mget
        """
        # Concatenate all keys into a list
        keys = list_or_args(keys, args)

        # Split keys into slots
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Execute commands using a pipeline
        res = await self._execute_pipeline_by_slot("MGET", slots_to_keys)

        # Reorder keys in the order the user provided & return
        return self._reorder_keys_by_command(keys, slots_to_keys, res)

    async def mset_nonatomic(self, mapping: Mapping[AnyKeyT, EncodableT]) -> List[bool]:
        """
        Sets key/values based on a mapping. Mapping is a dictionary of
        key/value pairs. Both keys and values should be strings or types that
        can be cast to a string via str().

        Splits the keys into different slots and then calls MSET
        for the keys of every slot. This operation will not be atomic
        if keys belong to more than one slot.

        Returns the list of replies, one per slot-level MSET.

        For more information see https://redis.io/commands/mset
        """
        # Partition the keys by slot
        slots_to_pairs = self._partition_pairs_by_slot(mapping)

        # Execute commands using a pipeline & return list of replies
        return await self._execute_pipeline_by_slot("MSET", slots_to_pairs)

    async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
        """
        Runs the given command once for the keys
        of each slot. Returns the sum of the return values.
        """
        # Partition the keys by slot
        slots_to_keys = self._partition_keys_by_slot(keys)

        # Sum up the reply from each command
        return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))

    async def _execute_pipeline_by_slot(
        self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
    ) -> List[Any]:
        """
        Queue ``command`` once per slot on a pipeline and await its execution.

        ``slots_to_args`` maps a slot to the arguments of the command sent
        for that slot. Returns whatever awaiting the pipeline's ``execute()``
        returns.
        """
        # Run the client's initialization first while the flag is still set.
        if self._initialize:
            await self.initialize()

        # Replica lookups are only requested when the client has
        # read_from_replicas set and the command is listed in READ_COMMANDS.
        read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
        pipe = self.pipeline()
        for slot, slot_args in slots_to_args.items():
            node = self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
            # Queuing is not awaited here; only execute() below is.
            pipe.execute_command(command, *slot_args, target_nodes=[node])
        return await pipe.execute()
class ClusterManagementCommands(ManagementCommands):
    """
    A class for Redis Cluster management commands

    The class inherits from Redis's core ManagementCommands class and does
    the required adjustments to work with cluster mode
    """

    def slaveof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Not supported in cluster mode: always raises RedisClusterException.

        For more information see https://redis.io/commands/slaveof
        """
        raise RedisClusterException("SLAVEOF is not supported in cluster mode")

    def replicaof(self, *args, **kwargs) -> NoReturn:
        """
        Make the server a replica of another instance, or promote it as master.

        Not supported in cluster mode: always raises RedisClusterException.

        For more information see https://redis.io/commands/replicaof
        """
        raise RedisClusterException("REPLICAOF is not supported in cluster mode")

    def swapdb(self, *args, **kwargs) -> NoReturn:
        """
        Swaps two Redis databases.

        Not supported in cluster mode: always raises RedisClusterException.

        For more information see https://redis.io/commands/swapdb
        """
        raise RedisClusterException("SWAPDB is not supported in cluster mode")

    def cluster_myid(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Returns the node's id.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information check https://redis.io/commands/cluster-myid/
        """
        return self.execute_command("CLUSTER MYID", target_nodes=target_node)

    def cluster_addslots(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Assign new hash slots to receiving node. Sends to specified node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslots
        """
        return self.execute_command(
            "CLUSTER ADDSLOTS", *slots, target_nodes=target_node
        )

    def cluster_addslotsrange(
        self, target_node: "TargetNodesT", *slots: EncodableT
    ) -> ResponseT:
        """
        Similar to the CLUSTER ADDSLOTS command.
        The difference between the two commands is that ADDSLOTS takes a list of slots
        to assign to the node, while ADDSLOTSRANGE takes a list of slot ranges
        (specified by start and end slots) to assign to the node.

        :target_node: 'ClusterNode'
            The node to execute the command on

        For more information see https://redis.io/commands/cluster-addslotsrange
        """
        return self.execute_command(
            "CLUSTER ADDSLOTSRANGE", *slots, target_nodes=target_node
        )

    def cluster_countkeysinslot(self, slot_id: int) -> ResponseT:
        """
        Return the number of local keys in the specified hash slot
        Send to node based on specified slot_id

        For more information see https://redis.io/commands/cluster-countkeysinslot
        """
        return self.execute_command("CLUSTER COUNTKEYSINSLOT", slot_id)

    def cluster_count_failure_report(self, node_id: str) -> ResponseT:
        """
        Return the number of failure reports active for a given node
        Sends to a random node

        For more information see https://redis.io/commands/cluster-count-failure-reports
        """
        return self.execute_command("CLUSTER COUNT-FAILURE-REPORTS", node_id)

    def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
        """
        Set hash slots as unbound in the cluster.
        It determines by itself what node the slot is in and sends it there

        One CLUSTER DELSLOTS command is issued per slot.
        Returns a list of the results for each processed slot.

        For more information see https://redis.io/commands/cluster-delslots
        """
        return [self.execute_command("CLUSTER DELSLOTS", slot) for slot in slots]

    def cluster_delslotsrange(self, *slots: EncodableT) -> ResponseT:
        """
        Similar to the CLUSTER DELSLOTS command.
        The difference is that CLUSTER DELSLOTS takes a list of hash slots to remove
        from the node, while CLUSTER DELSLOTSRANGE takes a list of slot ranges to remove
        from the node.

        For more information see https://redis.io/commands/cluster-delslotsrange
        """
        return self.execute_command("CLUSTER DELSLOTSRANGE", *slots)

    def cluster_failover(
        self, target_node: "TargetNodesT", option: Optional[str] = None
    ) -> ResponseT:
        """
        Forces a slave to perform a manual failover of its master
        Sends to specified node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :option: Optional "FORCE" or "TAKEOVER" (case-insensitive). Any other
            non-empty value raises RedisError.

        For more information see https://redis.io/commands/cluster-failover
        """
        # No option (None or empty string): plain failover.
        if not option:
            return self.execute_command("CLUSTER FAILOVER", target_nodes=target_node)

        if option.upper() not in ("FORCE", "TAKEOVER"):
            raise RedisError(f"Invalid option for CLUSTER FAILOVER command: {option}")

        # The option is forwarded exactly as the caller spelled it.
        return self.execute_command(
            "CLUSTER FAILOVER", option, target_nodes=target_node
        )

    def cluster_info(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Provides info about Redis Cluster node state.
        The command will be sent to a random node in the cluster if no target
        node is specified.

        For more information see https://redis.io/commands/cluster-info
        """
        return self.execute_command("CLUSTER INFO", target_nodes=target_nodes)

    def cluster_keyslot(self, key: str) -> ResponseT:
        """
        Returns the hash slot of the specified key
        Sends to random node in the cluster

        For more information see https://redis.io/commands/cluster-keyslot
        """
        return self.execute_command("CLUSTER KEYSLOT", key)

    def cluster_meet(
        self, host: str, port: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Force a node cluster to handshake with another node.
        Sends to specified node.

        For more information see https://redis.io/commands/cluster-meet
        """
        return self.execute_command(
            "CLUSTER MEET", host, port, target_nodes=target_nodes
        )

    def cluster_nodes(self) -> ResponseT:
        """
        Get Cluster config for the node.
        Sends to random node in the cluster

        For more information see https://redis.io/commands/cluster-nodes
        """
        return self.execute_command("CLUSTER NODES")

    def cluster_replicate(
        self, target_nodes: "TargetNodesT", node_id: str
    ) -> ResponseT:
        """
        Reconfigure a node as a slave of the specified master node

        For more information see https://redis.io/commands/cluster-replicate
        """
        return self.execute_command(
            "CLUSTER REPLICATE", node_id, target_nodes=target_nodes
        )

    def cluster_reset(
        self, soft: bool = True, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Reset a Redis Cluster node

        If 'soft' is True then it will send 'SOFT' argument
        If 'soft' is False then it will send 'HARD' argument

        For more information see https://redis.io/commands/cluster-reset
        """
        return self.execute_command(
            "CLUSTER RESET", b"SOFT" if soft else b"HARD", target_nodes=target_nodes
        )

    def cluster_save_config(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Forces the node to save cluster state on disk

        For more information see https://redis.io/commands/cluster-saveconfig
        """
        return self.execute_command("CLUSTER SAVECONFIG", target_nodes=target_nodes)

    def cluster_get_keys_in_slot(self, slot: int, num_keys: int) -> ResponseT:
        """
        Returns up to ``num_keys`` key names stored in the specified cluster slot

        For more information see https://redis.io/commands/cluster-getkeysinslot
        """
        return self.execute_command("CLUSTER GETKEYSINSLOT", slot, num_keys)

    def cluster_set_config_epoch(
        self, epoch: int, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Set the configuration epoch in a new node

        For more information see https://redis.io/commands/cluster-set-config-epoch
        """
        return self.execute_command(
            "CLUSTER SET-CONFIG-EPOCH", epoch, target_nodes=target_nodes
        )

    def cluster_setslot(
        self, target_node: "TargetNodesT", node_id: str, slot_id: int, state: str
    ) -> ResponseT:
        """
        Bind a hash slot to a specific node

        :target_node: 'ClusterNode'
            The node to execute the command on
        :state: One of "IMPORTING", "NODE" or "MIGRATING" (case-insensitive).
            "STABLE" and any other value raise RedisError.

        For more information see https://redis.io/commands/cluster-setslot
        """
        normalized_state = state.upper()
        if normalized_state in ("IMPORTING", "NODE", "MIGRATING"):
            # The state is forwarded exactly as the caller spelled it.
            return self.execute_command(
                "CLUSTER SETSLOT", slot_id, state, node_id, target_nodes=target_node
            )
        if normalized_state == "STABLE":
            raise RedisError('For "stable" state please use cluster_setslot_stable')
        raise RedisError(f"Invalid slot state: {state}")

    def cluster_setslot_stable(self, slot_id: int) -> ResponseT:
        """
        Clears migrating / importing state from the slot.
        It determines by itself what node the slot is in and sends it there.

        For more information see https://redis.io/commands/cluster-setslot
        """
        return self.execute_command("CLUSTER SETSLOT", slot_id, "STABLE")

    def cluster_replicas(
        self, node_id: str, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Provides a list of replica nodes replicating from the specified primary
        target node.

        For more information see https://redis.io/commands/cluster-replicas
        """
        return self.execute_command(
            "CLUSTER REPLICAS", node_id, target_nodes=target_nodes
        )

    def cluster_slots(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Get array of Cluster slot to node mappings

        For more information see https://redis.io/commands/cluster-slots
        """
        return self.execute_command("CLUSTER SLOTS", target_nodes=target_nodes)

    def cluster_shards(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns details about the shards of the cluster.

        For more information see https://redis.io/commands/cluster-shards
        """
        return self.execute_command("CLUSTER SHARDS", target_nodes=target_nodes)

    def cluster_myshardid(
        self, target_nodes: Optional["TargetNodesT"] = None
    ) -> ResponseT:
        """
        Returns the shard ID of the node.

        For more information see https://redis.io/commands/cluster-myshardid/
        """
        return self.execute_command("CLUSTER MYSHARDID", target_nodes=target_nodes)

    def cluster_links(self, target_node: "TargetNodesT") -> ResponseT:
        """
        Each node in a Redis Cluster maintains a pair of long-lived TCP link with each
        peer in the cluster: One for sending outbound messages towards the peer and one
        for receiving inbound messages from the peer.

        This command outputs information of all such peer links as an array.

        For more information see https://redis.io/commands/cluster-links
        """
        return self.execute_command("CLUSTER LINKS", target_nodes=target_node)

    def cluster_flushslots(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
        """
        CLUSTER FLUSHSLOTS is intentionally not implemented in the client:
        always raises NotImplementedError.
        """
        raise NotImplementedError(
            "CLUSTER FLUSHSLOTS is intentionally not implemented in the client."
        )

    def cluster_bumpepoch(self, target_nodes: Optional["TargetNodesT"] = None) -> None:
        """
        CLUSTER BUMPEPOCH is intentionally not implemented in the client:
        always raises NotImplementedError.
        """
        raise NotImplementedError(
            "CLUSTER BUMPEPOCH is intentionally not implemented in the client."
        )

    def readonly(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Enables read queries.
        The command will be sent to the default cluster node if target_nodes is
        not specified.

        For more information see https://redis.io/commands/readonly
        """
        if target_nodes in ("replicas", "all"):
            # read_from_replicas will only be enabled if the READONLY command
            # is sent to all replicas
            self.read_from_replicas = True
        return self.execute_command("READONLY", target_nodes=target_nodes)

    def readwrite(self, target_nodes: Optional["TargetNodesT"] = None) -> ResponseT:
        """
        Disables read queries.
        The command will be sent to the default cluster node if target_nodes is
        not specified.

        For more information see https://redis.io/commands/readwrite
        """
        # Reset read from replicas flag
        self.read_from_replicas = False
        return self.execute_command("READWRITE", target_nodes=target_nodes)

    @deprecated_function(
        version="7.2.0",
        reason="Use client-side caching feature instead.",
    )
    def client_tracking_on(
        self,
        clientid: Optional[int] = None,
        prefix: Sequence[KeyT] = [],
        bcast: bool = False,
        optin: bool = False,
        optout: bool = False,
        noloop: bool = False,
        target_nodes: Optional["TargetNodesT"] = "all",
    ) -> ResponseT:
        """
        Enables the tracking feature of the Redis server, that is used
        for server assisted client side caching.

        When clientid is provided - in target_nodes only the node that owns the
        connection with this id should be provided.
        When clientid is not provided - target_nodes can be any node.

        For more information see https://redis.io/commands/client-tracking
        """
        # First positional argument is the on/off switch of client_tracking.
        return self.client_tracking(
            True,
            clientid,
            prefix,
            bcast,
            optin,
            optout,
            noloop,
            target_nodes=target_nodes,
        )

    @deprecated_function(
        version="7.2.0",
        reason="Use client-side caching feature instead.",
    )
    def client_tracking_off(
        self,
        clientid: Optional[int] = None,
        prefix: Sequence[KeyT] = [],
        bcast: bool = False,
        optin: bool = False,
        optout: bool = False,
        noloop: bool = False,
        target_nodes: Optional["TargetNodesT"] = "all",
    ) -> ResponseT:
        """
        Disables the tracking feature of the Redis server, that is used
        for server assisted client side caching.

        When clientid is provided - in target_nodes only the node that owns the
        connection with this id should be provided.
        When clientid is not provided - target_nodes can be any node.

        For more information see https://redis.io/commands/client-tracking
        """
        # First positional argument is the on/off switch of client_tracking.
        return self.client_tracking(
            False,
            clientid,
            prefix,
            bcast,
            optin,
            optout,
            noloop,
            target_nodes=target_nodes,
        )

    def hotkeys_start(
        self,
        metrics: List[HotkeysMetricsTypes],
        count: Optional[int] = None,
        duration: Optional[int] = None,
        sample_ratio: Optional[int] = None,
        slots: Optional[List[int]] = None,
        **kwargs,
    ) -> Union[str, bytes]:
        """
        Cluster client does not support hotkeys command. Please use the non-cluster client.

        Always raises NotImplementedError; the arguments are ignored.

        For more information see https://redis.io/commands/hotkeys-start
        """
        raise NotImplementedError(
            "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
        )

    def hotkeys_stop(self, **kwargs) -> Union[str, bytes]:
        """
        Cluster client does not support hotkeys command. Please use the non-cluster client.

        Always raises NotImplementedError.

        For more information see https://redis.io/commands/hotkeys-stop
        """
        raise NotImplementedError(
            "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
        )

    def hotkeys_reset(self, **kwargs) -> Union[str, bytes]:
        """
        Cluster client does not support hotkeys command. Please use the non-cluster client.

        Always raises NotImplementedError.

        For more information see https://redis.io/commands/hotkeys-reset
        """
        raise NotImplementedError(
            "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
        )

    def hotkeys_get(self, **kwargs) -> list[dict[Union[str, bytes], Any]]:
        """
        Cluster client does not support hotkeys command. Please use the non-cluster client.

        Always raises NotImplementedError.

        For more information see https://redis.io/commands/hotkeys-get
        """
        raise NotImplementedError(
            "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
        )
- class AsyncClusterManagementCommands(
- ClusterManagementCommands, AsyncManagementCommands
- ):
- """
- A class for Redis Cluster management commands
- The class inherits from Redis's core ManagementCommands class and do the
- required adjustments to work with cluster mode
- """
- async def cluster_delslots(self, *slots: EncodableT) -> List[bool]:
- """
- Set hash slots as unbound in the cluster.
- It determines by it self what node the slot is in and sends it there
- Returns a list of the results for each processed slot.
- For more information see https://redis.io/commands/cluster-delslots
- """
- return await asyncio.gather(
- *(
- asyncio.create_task(self.execute_command("CLUSTER DELSLOTS", slot))
- for slot in slots
- )
- )
- @deprecated_function(
- version="7.2.0",
- reason="Use client-side caching feature instead.",
- )
- async def client_tracking_on(
- self,
- clientid: Optional[int] = None,
- prefix: Sequence[KeyT] = [],
- bcast: bool = False,
- optin: bool = False,
- optout: bool = False,
- noloop: bool = False,
- target_nodes: Optional["TargetNodesT"] = "all",
- ) -> ResponseT:
- """
- Enables the tracking feature of the Redis server, that is used
- for server assisted client side caching.
- When clientid is provided - in target_nodes only the node that owns the
- connection with this id should be provided.
- When clientid is not provided - target_nodes can be any node.
- For more information see https://redis.io/commands/client-tracking
- """
- return await self.client_tracking(
- True,
- clientid,
- prefix,
- bcast,
- optin,
- optout,
- noloop,
- target_nodes=target_nodes,
- )
- @deprecated_function(
- version="7.2.0",
- reason="Use client-side caching feature instead.",
- )
- async def client_tracking_off(
- self,
- clientid: Optional[int] = None,
- prefix: Sequence[KeyT] = [],
- bcast: bool = False,
- optin: bool = False,
- optout: bool = False,
- noloop: bool = False,
- target_nodes: Optional["TargetNodesT"] = "all",
- ) -> ResponseT:
- """
- Disables the tracking feature of the Redis server, that is used
- for server assisted client side caching.
- When clientid is provided - in target_nodes only the node that owns the
- connection with this id should be provided.
- When clientid is not provided - target_nodes can be any node.
- For more information see https://redis.io/commands/client-tracking
- """
- return await self.client_tracking(
- False,
- clientid,
- prefix,
- bcast,
- optin,
- optout,
- noloop,
- target_nodes=target_nodes,
- )
- async def hotkeys_start(
- self,
- metrics: List[HotkeysMetricsTypes],
- count: Optional[int] = None,
- duration: Optional[int] = None,
- sample_ratio: Optional[int] = None,
- slots: Optional[List[int]] = None,
- **kwargs,
- ) -> Awaitable[Union[str, bytes]]:
- """
- Cluster client does not support hotkeys command. Please use the non-cluster client.
- For more information see https://redis.io/commands/hotkeys-start
- """
- raise NotImplementedError(
- "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
- )
- async def hotkeys_stop(self, **kwargs) -> Awaitable[Union[str, bytes]]:
- """
- Cluster client does not support hotkeys command. Please use the non-cluster client.
- For more information see https://redis.io/commands/hotkeys-stop
- """
- raise NotImplementedError(
- "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
- )
async def hotkeys_reset(self, **kwargs) -> Awaitable[Union[str, bytes]]:
    """
    Unsupported on the cluster client: awaiting this always raises.

    Keyword arguments are accepted and ignored. Use the non-cluster client
    to run the HOTKEYS commands.

    :raises NotImplementedError: unconditionally.

    For more information see https://redis.io/commands/hotkeys-reset
    """
    unsupported_message = "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
    raise NotImplementedError(unsupported_message)
async def hotkeys_get(
    self, **kwargs
) -> Awaitable[list[dict[Union[str, bytes], Any]]]:
    """
    Unsupported on the cluster client: awaiting this always raises.

    Keyword arguments are accepted and ignored. Use the non-cluster client
    to run the HOTKEYS commands.

    :raises NotImplementedError: unconditionally.

    For more information see https://redis.io/commands/hotkeys-get
    """
    unsupported_message = "HOTKEYS commands are not supported in cluster mode. Please use the non-cluster client."
    raise NotImplementedError(unsupported_message)
class ClusterDataAccessCommands(DataAccessCommands):
    """
    Data access commands adapted for Redis Cluster.

    Builds on the core DataAccessCommands class and overrides the commands
    that need adjusting to work in cluster mode.
    """

    def stralgo(
        self,
        algo: Literal["LCS"],
        value1: KeyT,
        value2: KeyT,
        specific_argument: Union[Literal["strings"], Literal["keys"]] = "strings",
        len: bool = False,
        idx: bool = False,
        minmatchlen: Optional[int] = None,
        withmatchlen: bool = False,
        **kwargs,
    ) -> ResponseT:
        """
        Run a complex string algorithm on the server.

        LCS (longest common substring) is the only algorithm implemented
        right now; others could be added in the future.

        ``algo`` Must currently be LCS.

        ``value1`` and ``value2`` Two strings or two keys.

        ``specific_argument`` Whether the values are "strings" (the default)
        or "keys".

        ``len`` Return only the length of the match.

        ``idx`` Return the match positions in each string.

        ``minmatchlen`` Limit the matches to those of at least this length.
        May be given only when ``idx`` is True.

        ``withmatchlen`` Return each match together with its length.
        May be given only when ``idx`` is True.

        ``target_nodes`` (keyword, optional) is always forwarded to the parent
        implementation. When it is absent or None and the values are plain
        strings, it is set to "default-node".

        For more information see https://redis.io/commands/stralgo
        """
        node_target = kwargs.pop("target_nodes", None)
        if node_target is None and specific_argument == "strings":
            # Plain strings with no explicit target: fall back to the
            # default node (presumably because there is no key to route by).
            node_target = "default-node"
        kwargs["target_nodes"] = node_target

        return super().stralgo(
            algo,
            value1,
            value2,
            specific_argument,
            len,
            idx,
            minmatchlen,
            withmatchlen,
            **kwargs,
        )

    def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> Iterator:
        """
        Yield every item produced by repeated ``scan`` calls.

        The opening ``scan`` is issued without a cursor and with the caller's
        keyword arguments. It returns a mapping of node name to cursor
        together with the first batch of data. Each node whose cursor is
        non-zero is then scanned again, individually, until its cursor
        reaches 0.

        ``match``, ``count`` and ``_type`` are passed through to every
        ``scan`` call.
        """
        # Opening pass, using whatever targeting the caller supplied.
        cursors, batch = self.scan(match=match, count=count, _type=_type, **kwargs)
        yield from batch

        pending = {
            node_name: position
            for node_name, position in cursors.items()
            if position != 0
        }
        if not pending:
            return

        # Resolve node objects once so follow-up scans can target them.
        nodes_by_name = {
            node_name: self.get_node(node_name=node_name) for node_name in pending
        }

        # Follow-up scans pass their own target_nodes; drop the caller's so
        # it is not supplied twice.
        kwargs.pop("target_nodes", None)

        while pending:
            unfinished = {}
            for node_name, position in pending.items():
                reply_cursors, batch = self.scan(
                    cursor=position,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=nodes_by_name[node_name],
                    **kwargs,
                )
                yield from batch
                next_position = reply_cursors[node_name]
                # A cursor of 0 means this node has been fully scanned.
                if next_position != 0:
                    unfinished[node_name] = next_position
            pending = unfinished
class AsyncClusterDataAccessCommands(
    ClusterDataAccessCommands, AsyncDataAccessCommands
):
    """
    Asynchronous data access commands adapted for Redis Cluster.

    Builds on the core DataAccessCommands classes and overrides the commands
    that need adjusting to work in cluster mode.
    """

    async def scan_iter(
        self,
        match: Optional[PatternT] = None,
        count: Optional[int] = None,
        _type: Optional[str] = None,
        **kwargs,
    ) -> AsyncIterator:
        """
        Asynchronously yield every item produced by repeated ``scan`` calls.

        The opening ``scan`` is awaited without a cursor and with the
        caller's keyword arguments. It returns a mapping of node name to
        cursor together with the first batch of data. Each node whose cursor
        is non-zero is then scanned again, individually, until its cursor
        reaches 0.

        ``match``, ``count`` and ``_type`` are passed through to every
        ``scan`` call.
        """
        # Opening pass, using whatever targeting the caller supplied.
        cursors, batch = await self.scan(
            match=match, count=count, _type=_type, **kwargs
        )
        for item in batch:
            yield item

        pending = {
            node_name: position
            for node_name, position in cursors.items()
            if position != 0
        }
        if not pending:
            return

        # Resolve node objects once so follow-up scans can target them.
        nodes_by_name = {
            node_name: self.get_node(node_name=node_name) for node_name in pending
        }

        # Follow-up scans pass their own target_nodes; drop the caller's so
        # it is not supplied twice.
        kwargs.pop("target_nodes", None)

        while pending:
            unfinished = {}
            for node_name, position in pending.items():
                reply_cursors, batch = await self.scan(
                    cursor=position,
                    match=match,
                    count=count,
                    _type=_type,
                    target_nodes=nodes_by_name[node_name],
                    **kwargs,
                )
                for item in batch:
                    yield item
                next_position = reply_cursors[node_name]
                # A cursor of 0 means this node has been fully scanned.
                if next_position != 0:
                    unfinished[node_name] = next_position
            pending = unfinished
class RedisClusterCommands(
    ClusterMultiKeyCommands,
    ClusterManagementCommands,
    ACLCommands,
    PubSubCommands,
    ClusterDataAccessCommands,
    ScriptCommands,
    FunctionCommands,
    ModuleCommands,
    RedisModuleCommands,
):
    """
    Aggregate of all Redis Cluster commands.

    Key-based commands are routed automatically: the target node(s) are
    determined internally from the hash slot of the keys.

    Non-key-based commands accept a ``target_nodes`` argument that selects
    the node(s) to run on. If ``target_nodes`` is not specified, the command
    is executed on the default cluster node.

    :param target_nodes: one of the following:
        - a nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    For example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """
class AsyncRedisClusterCommands(
    AsyncClusterMultiKeyCommands,
    AsyncClusterManagementCommands,
    AsyncACLCommands,
    AsyncClusterDataAccessCommands,
    AsyncScriptCommands,
    AsyncFunctionCommands,
    AsyncModuleCommands,
    AsyncRedisModuleCommands,
):
    """
    Aggregate of all Redis Cluster commands, built from the async command
    classes.

    Key-based commands are routed automatically: the target node(s) are
    determined internally from the hash slot of the keys.

    Non-key-based commands accept a ``target_nodes`` argument that selects
    the node(s) to run on. If ``target_nodes`` is not specified, the command
    is executed on the default cluster node.

    :param target_nodes: one of the following:
        - a nodes flag: ALL_NODES, PRIMARIES, REPLICAS, RANDOM
        - 'ClusterNode'
        - 'list(ClusterNodes)'
        - 'dict(any:clusterNodes)'

    For example:
        r.cluster_info(target_nodes=RedisCluster.ALL_NODES)
    """
|