lock.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. import logging
  2. import threading
  3. import time as mod_time
  4. import uuid
  5. from types import SimpleNamespace, TracebackType
  6. from typing import Optional, Type
  7. from redis.exceptions import LockError, LockNotOwnedError
  8. from redis.typing import Number
# Module-level logger, named after this module; used by Lock.__exit__ to
# report a suppressed release failure.
logger = logging.getLogger(__name__)
  10. class Lock:
  11. """
  12. A shared, distributed Lock. Using Redis for locking allows the Lock
  13. to be shared across processes and/or machines.
  14. It's left to the user to resolve deadlock issues and make sure
  15. multiple clients play nicely together.
  16. """
  17. lua_release = None
  18. lua_extend = None
  19. lua_reacquire = None
  20. # KEYS[1] - lock name
  21. # ARGV[1] - token
  22. # return 1 if the lock was released, otherwise 0
  23. LUA_RELEASE_SCRIPT = """
  24. local token = redis.call('get', KEYS[1])
  25. if not token or token ~= ARGV[1] then
  26. return 0
  27. end
  28. redis.call('del', KEYS[1])
  29. return 1
  30. """
  31. # KEYS[1] - lock name
  32. # ARGV[1] - token
  33. # ARGV[2] - additional milliseconds
  34. # ARGV[3] - "0" if the additional time should be added to the lock's
  35. # existing ttl or "1" if the existing ttl should be replaced
  36. # return 1 if the locks time was extended, otherwise 0
  37. LUA_EXTEND_SCRIPT = """
  38. local token = redis.call('get', KEYS[1])
  39. if not token or token ~= ARGV[1] then
  40. return 0
  41. end
  42. local expiration = redis.call('pttl', KEYS[1])
  43. if not expiration then
  44. expiration = 0
  45. end
  46. if expiration < 0 then
  47. return 0
  48. end
  49. local newttl = ARGV[2]
  50. if ARGV[3] == "0" then
  51. newttl = ARGV[2] + expiration
  52. end
  53. redis.call('pexpire', KEYS[1], newttl)
  54. return 1
  55. """
  56. # KEYS[1] - lock name
  57. # ARGV[1] - token
  58. # ARGV[2] - milliseconds
  59. # return 1 if the locks time was reacquired, otherwise 0
  60. LUA_REACQUIRE_SCRIPT = """
  61. local token = redis.call('get', KEYS[1])
  62. if not token or token ~= ARGV[1] then
  63. return 0
  64. end
  65. redis.call('pexpire', KEYS[1], ARGV[2])
  66. return 1
  67. """
  68. def __init__(
  69. self,
  70. redis,
  71. name: str,
  72. timeout: Optional[Number] = None,
  73. sleep: Number = 0.1,
  74. blocking: bool = True,
  75. blocking_timeout: Optional[Number] = None,
  76. thread_local: bool = True,
  77. raise_on_release_error: bool = True,
  78. ):
  79. """
  80. Create a new Lock instance named ``name`` using the Redis client
  81. supplied by ``redis``.
  82. ``timeout`` indicates a maximum life for the lock in seconds.
  83. By default, it will remain locked until release() is called.
  84. ``timeout`` can be specified as a float or integer, both representing
  85. the number of seconds to wait.
  86. ``sleep`` indicates the amount of time to sleep in seconds per loop
  87. iteration when the lock is in blocking mode and another client is
  88. currently holding the lock.
  89. ``blocking`` indicates whether calling ``acquire`` should block until
  90. the lock has been acquired or to fail immediately, causing ``acquire``
  91. to return False and the lock not being acquired. Defaults to True.
  92. Note this value can be overridden by passing a ``blocking``
  93. argument to ``acquire``.
  94. ``blocking_timeout`` indicates the maximum amount of time in seconds to
  95. spend trying to acquire the lock. A value of ``None`` indicates
  96. continue trying forever. ``blocking_timeout`` can be specified as a
  97. float or integer, both representing the number of seconds to wait.
  98. ``thread_local`` indicates whether the lock token is placed in
  99. thread-local storage. By default, the token is placed in thread local
  100. storage so that a thread only sees its token, not a token set by
  101. another thread. Consider the following timeline:
  102. time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
  103. thread-1 sets the token to "abc"
  104. time: 1, thread-2 blocks trying to acquire `my-lock` using the
  105. Lock instance.
  106. time: 5, thread-1 has not yet completed. redis expires the lock
  107. key.
  108. time: 5, thread-2 acquired `my-lock` now that it's available.
  109. thread-2 sets the token to "xyz"
  110. time: 6, thread-1 finishes its work and calls release(). if the
  111. token is *not* stored in thread local storage, then
  112. thread-1 would see the token value as "xyz" and would be
  113. able to successfully release the thread-2's lock.
  114. ``raise_on_release_error`` indicates whether to raise an exception when
  115. the lock is no longer owned when exiting the context manager. By default,
  116. this is True, meaning an exception will be raised. If False, the warning
  117. will be logged and the exception will be suppressed.
  118. In some use cases it's necessary to disable thread local storage. For
  119. example, if you have code where one thread acquires a lock and passes
  120. that lock instance to a worker thread to release later. If thread
  121. local storage isn't disabled in this case, the worker thread won't see
  122. the token set by the thread that acquired the lock. Our assumption
  123. is that these cases aren't common and as such default to using
  124. thread local storage.
  125. """
  126. self.redis = redis
  127. self.name = name
  128. self.timeout = timeout
  129. self.sleep = sleep
  130. self.blocking = blocking
  131. self.blocking_timeout = blocking_timeout
  132. self.thread_local = bool(thread_local)
  133. self.raise_on_release_error = raise_on_release_error
  134. self.local = threading.local() if self.thread_local else SimpleNamespace()
  135. self.local.token = None
  136. self.register_scripts()
  137. def register_scripts(self) -> None:
  138. cls = self.__class__
  139. client = self.redis
  140. if cls.lua_release is None:
  141. cls.lua_release = client.register_script(cls.LUA_RELEASE_SCRIPT)
  142. if cls.lua_extend is None:
  143. cls.lua_extend = client.register_script(cls.LUA_EXTEND_SCRIPT)
  144. if cls.lua_reacquire is None:
  145. cls.lua_reacquire = client.register_script(cls.LUA_REACQUIRE_SCRIPT)
  146. def __enter__(self) -> "Lock":
  147. if self.acquire():
  148. return self
  149. raise LockError(
  150. "Unable to acquire lock within the time specified",
  151. lock_name=self.name,
  152. )
  153. def __exit__(
  154. self,
  155. exc_type: Optional[Type[BaseException]],
  156. exc_value: Optional[BaseException],
  157. traceback: Optional[TracebackType],
  158. ) -> None:
  159. try:
  160. self.release()
  161. except LockError:
  162. if self.raise_on_release_error:
  163. raise
  164. logger.warning(
  165. "Lock was unlocked or no longer owned when exiting context manager."
  166. )
  167. def acquire(
  168. self,
  169. sleep: Optional[Number] = None,
  170. blocking: Optional[bool] = None,
  171. blocking_timeout: Optional[Number] = None,
  172. token: Optional[str] = None,
  173. ):
  174. """
  175. Use Redis to hold a shared, distributed lock named ``name``.
  176. Returns True once the lock is acquired.
  177. If ``blocking`` is False, always return immediately. If the lock
  178. was acquired, return True, otherwise return False.
  179. ``blocking_timeout`` specifies the maximum number of seconds to
  180. wait trying to acquire the lock.
  181. ``token`` specifies the token value to be used. If provided, token
  182. must be a bytes object or a string that can be encoded to a bytes
  183. object with the default encoding. If a token isn't specified, a UUID
  184. will be generated.
  185. """
  186. if sleep is None:
  187. sleep = self.sleep
  188. if token is None:
  189. token = uuid.uuid1().hex.encode()
  190. else:
  191. encoder = self.redis.get_encoder()
  192. token = encoder.encode(token)
  193. if blocking is None:
  194. blocking = self.blocking
  195. if blocking_timeout is None:
  196. blocking_timeout = self.blocking_timeout
  197. stop_trying_at = None
  198. if blocking_timeout is not None:
  199. stop_trying_at = mod_time.monotonic() + blocking_timeout
  200. while True:
  201. if self.do_acquire(token):
  202. self.local.token = token
  203. return True
  204. if not blocking:
  205. return False
  206. next_try_at = mod_time.monotonic() + sleep
  207. if stop_trying_at is not None and next_try_at > stop_trying_at:
  208. return False
  209. mod_time.sleep(sleep)
  210. def do_acquire(self, token: str) -> bool:
  211. if self.timeout:
  212. # convert to milliseconds
  213. timeout = int(self.timeout * 1000)
  214. else:
  215. timeout = None
  216. if self.redis.set(self.name, token, nx=True, px=timeout):
  217. return True
  218. return False
  219. def locked(self) -> bool:
  220. """
  221. Returns True if this key is locked by any process, otherwise False.
  222. """
  223. return self.redis.get(self.name) is not None
  224. def owned(self) -> bool:
  225. """
  226. Returns True if this key is locked by this lock, otherwise False.
  227. """
  228. stored_token = self.redis.get(self.name)
  229. # need to always compare bytes to bytes
  230. # TODO: this can be simplified when the context manager is finished
  231. if stored_token and not isinstance(stored_token, bytes):
  232. encoder = self.redis.get_encoder()
  233. stored_token = encoder.encode(stored_token)
  234. return self.local.token is not None and stored_token == self.local.token
  235. def release(self) -> None:
  236. """
  237. Releases the already acquired lock
  238. """
  239. expected_token = self.local.token
  240. if expected_token is None:
  241. raise LockError(
  242. "Cannot release a lock that's not owned or is already unlocked.",
  243. lock_name=self.name,
  244. )
  245. self.local.token = None
  246. self.do_release(expected_token)
  247. def do_release(self, expected_token: str) -> None:
  248. if not bool(
  249. self.lua_release(keys=[self.name], args=[expected_token], client=self.redis)
  250. ):
  251. raise LockNotOwnedError(
  252. "Cannot release a lock that's no longer owned",
  253. lock_name=self.name,
  254. )
  255. def extend(self, additional_time: Number, replace_ttl: bool = False) -> bool:
  256. """
  257. Adds more time to an already acquired lock.
  258. ``additional_time`` can be specified as an integer or a float, both
  259. representing the number of seconds to add.
  260. ``replace_ttl`` if False (the default), add `additional_time` to
  261. the lock's existing ttl. If True, replace the lock's ttl with
  262. `additional_time`.
  263. """
  264. if self.local.token is None:
  265. raise LockError("Cannot extend an unlocked lock", lock_name=self.name)
  266. if self.timeout is None:
  267. raise LockError("Cannot extend a lock with no timeout", lock_name=self.name)
  268. return self.do_extend(additional_time, replace_ttl)
  269. def do_extend(self, additional_time: Number, replace_ttl: bool) -> bool:
  270. additional_time = int(additional_time * 1000)
  271. if not bool(
  272. self.lua_extend(
  273. keys=[self.name],
  274. args=[self.local.token, additional_time, "1" if replace_ttl else "0"],
  275. client=self.redis,
  276. )
  277. ):
  278. raise LockNotOwnedError(
  279. "Cannot extend a lock that's no longer owned",
  280. lock_name=self.name,
  281. )
  282. return True
  283. def reacquire(self) -> bool:
  284. """
  285. Resets a TTL of an already acquired lock back to a timeout value.
  286. """
  287. if self.local.token is None:
  288. raise LockError("Cannot reacquire an unlocked lock", lock_name=self.name)
  289. if self.timeout is None:
  290. raise LockError(
  291. "Cannot reacquire a lock with no timeout",
  292. lock_name=self.name,
  293. )
  294. return self.do_reacquire()
  295. def do_reacquire(self) -> bool:
  296. timeout = int(self.timeout * 1000)
  297. if not bool(
  298. self.lua_reacquire(
  299. keys=[self.name], args=[self.local.token, timeout], client=self.redis
  300. )
  301. ):
  302. raise LockNotOwnedError(
  303. "Cannot reacquire a lock that's no longer owned",
  304. lock_name=self.name,
  305. )
  306. return True