# helpers.py (3.1 KB)
  1. import copy
  2. import random
  3. import string
  4. from typing import Any, Iterable, List, Tuple
  5. import redis
  6. from redis.typing import KeysT, KeyT
  7. def list_or_args(keys: KeysT, args: Tuple[KeyT, ...]) -> List[KeyT]:
  8. # returns a single new list combining keys and args
  9. try:
  10. iter(keys)
  11. # a string or bytes instance can be iterated, but indicates
  12. # keys wasn't passed as a list
  13. if isinstance(keys, (bytes, str)):
  14. keys = [keys]
  15. else:
  16. keys = list(keys)
  17. except TypeError:
  18. keys = [keys]
  19. if args:
  20. keys.extend(args)
  21. return keys
  22. def nativestr(x):
  23. """Return the decoded binary string, or a string, depending on type."""
  24. r = x.decode("utf-8", "replace") if isinstance(x, bytes) else x
  25. if r == "null":
  26. return
  27. return r
  28. def delist(x):
  29. """Given a list of binaries, return the stringified version."""
  30. if x is None:
  31. return x
  32. return [nativestr(obj) for obj in x]
  33. def parse_to_list(response):
  34. """Optimistically parse the response to a list."""
  35. res = []
  36. special_values = {"infinity", "nan", "-infinity"}
  37. if response is None:
  38. return res
  39. for item in response:
  40. if item is None:
  41. res.append(None)
  42. continue
  43. try:
  44. item_str = nativestr(item)
  45. except TypeError:
  46. res.append(None)
  47. continue
  48. if isinstance(item_str, str) and item_str.lower() in special_values:
  49. res.append(item_str) # Keep as string
  50. else:
  51. try:
  52. res.append(int(item))
  53. except ValueError:
  54. try:
  55. res.append(float(item))
  56. except ValueError:
  57. res.append(item_str)
  58. return res
  59. def random_string(length=10):
  60. """
  61. Returns a random N character long string.
  62. """
  63. return "".join( # nosec
  64. random.choice(string.ascii_lowercase) for x in range(length)
  65. )
  66. def decode_dict_keys(obj):
  67. """Decode the keys of the given dictionary with utf-8."""
  68. newobj = copy.copy(obj)
  69. for k in obj.keys():
  70. if isinstance(k, bytes):
  71. newobj[k.decode("utf-8")] = newobj[k]
  72. newobj.pop(k)
  73. return newobj
  74. def get_protocol_version(client):
  75. if isinstance(client, redis.Redis) or isinstance(client, redis.asyncio.Redis):
  76. return client.connection_pool.connection_kwargs.get("protocol")
  77. elif isinstance(client, redis.cluster.AbstractRedisCluster):
  78. return client.nodes_manager.connection_kwargs.get("protocol")
  79. def at_most_one_value_set(iterable: Iterable[Any]):
  80. """
  81. Checks that at most one of the values in the iterable is truthy.
  82. Args:
  83. iterable: An iterable of values to check.
  84. Returns:
  85. True if at most one value is truthy, False otherwise.
  86. Raises:
  87. Might raise an error if the values in iterable are not boolean-compatible.
  88. For example if the type of the values implement
  89. __len__ or __bool__ methods and they raise an error.
  90. """
  91. values = (bool(x) for x in iterable)
  92. return sum(values) <= 1