cache.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. from abc import ABC, abstractmethod
  2. from collections import OrderedDict
  3. from dataclasses import dataclass
  4. from enum import Enum
  5. from typing import Any, List, Optional, Union
  6. from redis.observability.attributes import CSCReason
  7. class CacheEntryStatus(Enum):
  8. VALID = "VALID"
  9. IN_PROGRESS = "IN_PROGRESS"
  10. class EvictionPolicyType(Enum):
  11. time_based = "time_based"
  12. frequency_based = "frequency_based"
  13. @dataclass(frozen=True)
  14. class CacheKey:
  15. """
  16. Represents a unique key for a cache entry.
  17. Attributes:
  18. command (str): The Redis command being cached.
  19. redis_keys (tuple): The Redis keys involved in the command.
  20. redis_args (tuple): Additional arguments for the Redis command.
  21. This field is included in the cache key to ensure uniqueness
  22. when commands have the same keys but different arguments.
  23. Changing this field will affect cache key uniqueness.
  24. """
  25. command: str
  26. redis_keys: tuple
  27. redis_args: tuple = () # Additional arguments for the Redis command; affects cache key uniqueness.
  28. class CacheEntry:
  29. def __init__(
  30. self,
  31. cache_key: CacheKey,
  32. cache_value: bytes,
  33. status: CacheEntryStatus,
  34. connection_ref,
  35. ):
  36. self.cache_key = cache_key
  37. self.cache_value = cache_value
  38. self.status = status
  39. self.connection_ref = connection_ref
  40. def __hash__(self):
  41. return hash(
  42. (self.cache_key, self.cache_value, self.status, self.connection_ref)
  43. )
  44. def __eq__(self, other):
  45. return hash(self) == hash(other)
  46. class EvictionPolicyInterface(ABC):
  47. @property
  48. @abstractmethod
  49. def cache(self):
  50. pass
  51. @cache.setter
  52. @abstractmethod
  53. def cache(self, value):
  54. pass
  55. @property
  56. @abstractmethod
  57. def type(self) -> EvictionPolicyType:
  58. pass
  59. @abstractmethod
  60. def evict_next(self) -> CacheKey:
  61. pass
  62. @abstractmethod
  63. def evict_many(self, count: int) -> List[CacheKey]:
  64. pass
  65. @abstractmethod
  66. def touch(self, cache_key: CacheKey) -> None:
  67. pass
  68. class CacheConfigurationInterface(ABC):
  69. @abstractmethod
  70. def get_cache_class(self):
  71. pass
  72. @abstractmethod
  73. def get_max_size(self) -> int:
  74. pass
  75. @abstractmethod
  76. def get_eviction_policy(self):
  77. pass
  78. @abstractmethod
  79. def is_exceeds_max_size(self, count: int) -> bool:
  80. pass
  81. @abstractmethod
  82. def is_allowed_to_cache(self, command: str) -> bool:
  83. pass
  84. class CacheInterface(ABC):
  85. @property
  86. @abstractmethod
  87. def collection(self) -> OrderedDict:
  88. pass
  89. @property
  90. @abstractmethod
  91. def config(self) -> CacheConfigurationInterface:
  92. pass
  93. @property
  94. @abstractmethod
  95. def eviction_policy(self) -> EvictionPolicyInterface:
  96. pass
  97. @property
  98. @abstractmethod
  99. def size(self) -> int:
  100. pass
  101. @abstractmethod
  102. def get(self, key: CacheKey) -> Union[CacheEntry, None]:
  103. pass
  104. @abstractmethod
  105. def set(self, entry: CacheEntry) -> bool:
  106. pass
  107. @abstractmethod
  108. def delete_by_cache_keys(self, cache_keys: List[CacheKey]) -> List[bool]:
  109. pass
  110. @abstractmethod
  111. def delete_by_redis_keys(self, redis_keys: List[bytes]) -> List[bool]:
  112. pass
  113. @abstractmethod
  114. def flush(self) -> int:
  115. pass
  116. @abstractmethod
  117. def is_cachable(self, key: CacheKey) -> bool:
  118. pass
  119. class DefaultCache(CacheInterface):
  120. def __init__(
  121. self,
  122. cache_config: CacheConfigurationInterface,
  123. ) -> None:
  124. self._cache = OrderedDict()
  125. self._cache_config = cache_config
  126. self._eviction_policy = self._cache_config.get_eviction_policy().value()
  127. self._eviction_policy.cache = self
  128. @property
  129. def collection(self) -> OrderedDict:
  130. return self._cache
  131. @property
  132. def config(self) -> CacheConfigurationInterface:
  133. return self._cache_config
  134. @property
  135. def eviction_policy(self) -> EvictionPolicyInterface:
  136. return self._eviction_policy
  137. @property
  138. def size(self) -> int:
  139. return len(self._cache)
  140. def set(self, entry: CacheEntry) -> bool:
  141. if not self.is_cachable(entry.cache_key):
  142. return False
  143. self._cache[entry.cache_key] = entry
  144. self._eviction_policy.touch(entry.cache_key)
  145. return True
  146. def get(self, key: CacheKey) -> Union[CacheEntry, None]:
  147. entry = self._cache.get(key, None)
  148. if entry is None:
  149. return None
  150. self._eviction_policy.touch(key)
  151. return entry
  152. def delete_by_cache_keys(self, cache_keys: List[CacheKey]) -> List[bool]:
  153. response = []
  154. for key in cache_keys:
  155. if self.get(key) is not None:
  156. self._cache.pop(key)
  157. response.append(True)
  158. else:
  159. response.append(False)
  160. return response
  161. def delete_by_redis_keys(
  162. self, redis_keys: Union[List[bytes], List[str]]
  163. ) -> List[bool]:
  164. response = []
  165. keys_to_delete = []
  166. for redis_key in redis_keys:
  167. # Prepare both versions for lookup
  168. candidates = [redis_key]
  169. if isinstance(redis_key, str):
  170. candidates.append(redis_key.encode("utf-8"))
  171. elif isinstance(redis_key, bytes):
  172. try:
  173. candidates.append(redis_key.decode("utf-8"))
  174. except UnicodeDecodeError:
  175. pass # Non-UTF-8 bytes, skip str version
  176. for cache_key in self._cache:
  177. if any(candidate in cache_key.redis_keys for candidate in candidates):
  178. keys_to_delete.append(cache_key)
  179. response.append(True)
  180. for key in keys_to_delete:
  181. self._cache.pop(key)
  182. return response
  183. def flush(self) -> int:
  184. elem_count = len(self._cache)
  185. self._cache.clear()
  186. return elem_count
  187. def is_cachable(self, key: CacheKey) -> bool:
  188. return self._cache_config.is_allowed_to_cache(key.command)
  189. class CacheProxy(CacheInterface):
  190. """
  191. Proxy object that wraps cache implementations to enable additional logic on top
  192. """
  193. def __init__(self, cache: CacheInterface):
  194. self._cache = cache
  195. @property
  196. def collection(self) -> OrderedDict:
  197. return self._cache.collection
  198. @property
  199. def config(self) -> CacheConfigurationInterface:
  200. return self._cache.config
  201. @property
  202. def eviction_policy(self) -> EvictionPolicyInterface:
  203. return self._cache.eviction_policy
  204. @property
  205. def size(self) -> int:
  206. return self._cache.size
  207. def get(self, key: CacheKey) -> Union[CacheEntry, None]:
  208. return self._cache.get(key)
  209. def set(self, entry: CacheEntry) -> bool:
  210. is_set = self._cache.set(entry)
  211. if self.config.is_exceeds_max_size(self.size):
  212. # Lazy import to avoid circular dependency
  213. from redis.observability.recorder import record_csc_eviction
  214. record_csc_eviction(
  215. count=1,
  216. reason=CSCReason.FULL,
  217. )
  218. self.eviction_policy.evict_next()
  219. return is_set
  220. def delete_by_cache_keys(self, cache_keys: List[CacheKey]) -> List[bool]:
  221. return self._cache.delete_by_cache_keys(cache_keys)
  222. def delete_by_redis_keys(self, redis_keys: List[bytes]) -> List[bool]:
  223. return self._cache.delete_by_redis_keys(redis_keys)
  224. def flush(self) -> int:
  225. return self._cache.flush()
  226. def is_cachable(self, key: CacheKey) -> bool:
  227. return self._cache.is_cachable(key)
  228. class LRUPolicy(EvictionPolicyInterface):
  229. def __init__(self):
  230. self.cache = None
  231. @property
  232. def cache(self):
  233. return self._cache
  234. @cache.setter
  235. def cache(self, cache: CacheInterface):
  236. self._cache = cache
  237. @property
  238. def type(self) -> EvictionPolicyType:
  239. return EvictionPolicyType.time_based
  240. def evict_next(self) -> CacheKey:
  241. self._assert_cache()
  242. popped_entry = self._cache.collection.popitem(last=False)
  243. return popped_entry[0]
  244. def evict_many(self, count: int) -> List[CacheKey]:
  245. self._assert_cache()
  246. if count > len(self._cache.collection):
  247. raise ValueError("Evictions count is above cache size")
  248. popped_keys = []
  249. for _ in range(count):
  250. popped_entry = self._cache.collection.popitem(last=False)
  251. popped_keys.append(popped_entry[0])
  252. return popped_keys
  253. def touch(self, cache_key: CacheKey) -> None:
  254. self._assert_cache()
  255. if self._cache.collection.get(cache_key) is None:
  256. raise ValueError("Given entry does not belong to the cache")
  257. self._cache.collection.move_to_end(cache_key)
  258. def _assert_cache(self):
  259. if self.cache is None or not isinstance(self.cache, CacheInterface):
  260. raise ValueError("Eviction policy should be associated with valid cache.")
  261. class EvictionPolicy(Enum):
  262. LRU = LRUPolicy
  263. class CacheConfig(CacheConfigurationInterface):
  264. DEFAULT_CACHE_CLASS = DefaultCache
  265. DEFAULT_EVICTION_POLICY = EvictionPolicy.LRU
  266. DEFAULT_MAX_SIZE = 10000
  267. DEFAULT_ALLOW_LIST = [
  268. "BITCOUNT",
  269. "BITFIELD_RO",
  270. "BITPOS",
  271. "EXISTS",
  272. "GEODIST",
  273. "GEOHASH",
  274. "GEOPOS",
  275. "GEORADIUSBYMEMBER_RO",
  276. "GEORADIUS_RO",
  277. "GEOSEARCH",
  278. "GET",
  279. "GETBIT",
  280. "GETRANGE",
  281. "HEXISTS",
  282. "HGET",
  283. "HGETALL",
  284. "HKEYS",
  285. "HLEN",
  286. "HMGET",
  287. "HSTRLEN",
  288. "HVALS",
  289. "JSON.ARRINDEX",
  290. "JSON.ARRLEN",
  291. "JSON.GET",
  292. "JSON.MGET",
  293. "JSON.OBJKEYS",
  294. "JSON.OBJLEN",
  295. "JSON.RESP",
  296. "JSON.STRLEN",
  297. "JSON.TYPE",
  298. "LCS",
  299. "LINDEX",
  300. "LLEN",
  301. "LPOS",
  302. "LRANGE",
  303. "MGET",
  304. "SCARD",
  305. "SDIFF",
  306. "SINTER",
  307. "SINTERCARD",
  308. "SISMEMBER",
  309. "SMEMBERS",
  310. "SMISMEMBER",
  311. "SORT_RO",
  312. "STRLEN",
  313. "SUBSTR",
  314. "SUNION",
  315. "TS.GET",
  316. "TS.INFO",
  317. "TS.RANGE",
  318. "TS.REVRANGE",
  319. "TYPE",
  320. "XLEN",
  321. "XPENDING",
  322. "XRANGE",
  323. "XREAD",
  324. "XREVRANGE",
  325. "ZCARD",
  326. "ZCOUNT",
  327. "ZDIFF",
  328. "ZINTER",
  329. "ZINTERCARD",
  330. "ZLEXCOUNT",
  331. "ZMSCORE",
  332. "ZRANGE",
  333. "ZRANGEBYLEX",
  334. "ZRANGEBYSCORE",
  335. "ZRANK",
  336. "ZREVRANGE",
  337. "ZREVRANGEBYLEX",
  338. "ZREVRANGEBYSCORE",
  339. "ZREVRANK",
  340. "ZSCORE",
  341. "ZUNION",
  342. ]
  343. def __init__(
  344. self,
  345. max_size: int = DEFAULT_MAX_SIZE,
  346. cache_class: Any = DEFAULT_CACHE_CLASS,
  347. eviction_policy: EvictionPolicy = DEFAULT_EVICTION_POLICY,
  348. ):
  349. self._cache_class = cache_class
  350. self._max_size = max_size
  351. self._eviction_policy = eviction_policy
  352. def get_cache_class(self):
  353. return self._cache_class
  354. def get_max_size(self) -> int:
  355. return self._max_size
  356. def get_eviction_policy(self) -> EvictionPolicy:
  357. return self._eviction_policy
  358. def is_exceeds_max_size(self, count: int) -> bool:
  359. return count > self._max_size
  360. def is_allowed_to_cache(self, command: str) -> bool:
  361. return command in self.DEFAULT_ALLOW_LIST
  362. class CacheFactoryInterface(ABC):
  363. @abstractmethod
  364. def get_cache(self) -> CacheInterface:
  365. pass
  366. class CacheFactory(CacheFactoryInterface):
  367. def __init__(self, cache_config: Optional[CacheConfig] = None):
  368. self._config = cache_config
  369. if self._config is None:
  370. self._config = CacheConfig()
  371. def get_cache(self) -> CacheInterface:
  372. cache_class = self._config.get_cache_class()
  373. return CacheProxy(cache_class(cache_config=self._config))