# background.py (17 KB)
import asyncio
import concurrent.futures
import logging
import threading
from typing import Any, Callable, Coroutine
  5. class BackgroundScheduler:
  6. """
  7. Schedules background tasks execution either in separate thread or in the running event loop.
  8. """
  9. def __init__(self):
  10. self._next_timer = None
  11. self._event_loops = []
  12. self._lock = threading.Lock()
  13. self._stopped = False
  14. # Dedicated loop for health checks - ensures all health checks use the same loop
  15. self._health_check_loop: asyncio.AbstractEventLoop | None = None
  16. self._health_check_thread: threading.Thread | None = None
  17. # Event to signal when health check loop is ready
  18. self._health_check_loop_ready = threading.Event()
  19. def __del__(self):
  20. self.stop()
  21. def stop(self):
  22. """
  23. Stop all scheduled tasks and clean up resources.
  24. """
  25. with self._lock:
  26. if self._stopped:
  27. return
  28. self._stopped = True
  29. if self._next_timer:
  30. self._next_timer.cancel()
  31. self._next_timer = None
  32. # Stop all event loops
  33. for loop in self._event_loops:
  34. if loop.is_running():
  35. loop.call_soon_threadsafe(loop.stop)
  36. self._event_loops.clear()
  37. def run_once(self, delay: float, callback: Callable, *args):
  38. """
  39. Runs callable task once after certain delay in seconds.
  40. """
  41. with self._lock:
  42. if self._stopped:
  43. return
  44. # Run loop in a separate thread to unblock main thread.
  45. loop = asyncio.new_event_loop()
  46. with self._lock:
  47. self._event_loops.append(loop)
  48. thread = threading.Thread(
  49. target=_start_event_loop_in_thread,
  50. args=(loop, self._call_later, delay, callback, *args),
  51. daemon=True,
  52. )
  53. thread.start()
  54. def run_recurring(self, interval: float, callback: Callable, *args):
  55. """
  56. Runs recurring callable task with given interval in seconds.
  57. """
  58. with self._lock:
  59. if self._stopped:
  60. return
  61. # Run loop in a separate thread to unblock main thread.
  62. loop = asyncio.new_event_loop()
  63. with self._lock:
  64. self._event_loops.append(loop)
  65. thread = threading.Thread(
  66. target=_start_event_loop_in_thread,
  67. args=(loop, self._call_later_recurring, interval, callback, *args),
  68. daemon=True,
  69. )
  70. thread.start()
  71. def run_recurring_coro(
  72. self, interval: float, coro: Callable[..., Coroutine[Any, Any, Any]], *args
  73. ):
  74. """
  75. Runs recurring coroutine with given interval in seconds in a background thread.
  76. Uses a shared event loop to ensure connection pools remain valid across calls.
  77. This is useful for sync code that needs to run async health checks.
  78. """
  79. with self._lock:
  80. if self._stopped:
  81. return
  82. # Use the shared health check loop, creating it if needed
  83. self._ensure_health_check_loop()
  84. with self._lock:
  85. loop = self._health_check_loop
  86. # Schedule recurring execution in the shared loop
  87. loop.call_soon_threadsafe(
  88. self._call_later_recurring_coro, loop, interval, coro, *args
  89. )
  90. def run_coro_sync(
  91. self,
  92. coro: Callable[..., Coroutine[Any, Any, Any]],
  93. *args,
  94. timeout: float | None = 10.0,
  95. ) -> Any:
  96. """
  97. Runs a coroutine synchronously and returns its result.
  98. Uses the shared health check event loop to ensure connection pools
  99. created here remain valid for subsequent recurring health checks.
  100. This is useful for running the initial health check before starting
  101. recurring checks.
  102. Args:
  103. coro: Coroutine function to execute
  104. *args: Arguments to pass to the coroutine
  105. timeout: Maximum seconds to wait for the result. None means wait
  106. forever. Default is 10 seconds to avoid blocking indefinitely
  107. if the event loop is busy with long-running health checks.
  108. Returns:
  109. The result of the coroutine
  110. Raises:
  111. TimeoutError: If the coroutine doesn't complete within timeout
  112. Any exception raised by the coroutine
  113. """
  114. with self._lock:
  115. if self._stopped:
  116. raise RuntimeError("Scheduler is stopped")
  117. # Ensure the shared loop exists
  118. self._ensure_health_check_loop()
  119. with self._lock:
  120. loop = self._health_check_loop
  121. # Submit the coroutine to the shared loop and wait for result
  122. future = asyncio.run_coroutine_threadsafe(coro(*args), loop)
  123. try:
  124. return future.result(timeout=timeout)
  125. except TimeoutError:
  126. # Cancel the future to avoid leaving orphaned tasks
  127. future.cancel()
  128. raise
  129. def run_coro_fire_and_forget(
  130. self, coro: Callable[..., Coroutine[Any, Any, Any]], *args
  131. ) -> None:
  132. """
  133. Schedule a coroutine for execution on the shared health check loop
  134. without waiting for the result. Exceptions are logged but not raised.
  135. This is useful for HALF_OPEN recovery health checks that need to run
  136. on the same event loop where connection pools were created.
  137. Args:
  138. coro: Coroutine function to execute
  139. *args: Arguments to pass to the coroutine
  140. """
  141. with self._lock:
  142. if self._stopped:
  143. return
  144. # Ensure the shared loop exists
  145. self._ensure_health_check_loop()
  146. with self._lock:
  147. loop = self._health_check_loop
  148. def on_complete(future: asyncio.Future):
  149. """Log any exceptions from the coroutine."""
  150. if future.cancelled():
  151. logging.getLogger(__name__).debug("Fire-and-forget coroutine cancelled")
  152. elif future.exception() is not None:
  153. logging.getLogger(__name__).debug(
  154. "Fire-and-forget coroutine raised exception",
  155. exc_info=future.exception(),
  156. )
  157. # Schedule on the shared loop without waiting
  158. future = asyncio.run_coroutine_threadsafe(coro(*args), loop)
  159. future.add_done_callback(on_complete)
  160. def _ensure_health_check_loop(self, timeout: float = 5.0):
  161. """
  162. Ensure the shared health check loop and thread are running.
  163. Args:
  164. timeout: Maximum seconds to wait for the loop to start.
  165. Raises:
  166. RuntimeError: If the loop fails to start within the timeout.
  167. """
  168. # Fast path: if loop is already running, return immediately
  169. if self._health_check_loop_ready.is_set():
  170. with self._lock:
  171. if (
  172. self._health_check_loop is not None
  173. and self._health_check_loop.is_running()
  174. ):
  175. return
  176. with self._lock:
  177. # Double-check after acquiring the lock
  178. if (
  179. self._health_check_loop is not None
  180. and self._health_check_loop.is_running()
  181. ):
  182. return
  183. # Clear the event - we're about to start a new loop
  184. self._health_check_loop_ready.clear()
  185. # Create a new event loop for health checks
  186. self._health_check_loop = asyncio.new_event_loop()
  187. self._event_loops.append(self._health_check_loop)
  188. # Start the loop in a background thread
  189. self._health_check_thread = threading.Thread(
  190. target=self._run_health_check_loop,
  191. daemon=True,
  192. )
  193. self._health_check_thread.start()
  194. # Wait for loop to be running INSIDE the lock with a timeout.
  195. # This prevents other threads from trying to create another loop
  196. # before this one is fully started, while avoiding permanent deadlock
  197. # if the background thread fails to start the loop.
  198. if not self._health_check_loop_ready.wait(timeout=timeout):
  199. # Timeout expired - the loop failed to start
  200. # Clean up the failed loop to allow retry
  201. failed_loop = self._health_check_loop
  202. self._health_check_loop = None
  203. if failed_loop in self._event_loops:
  204. self._event_loops.remove(failed_loop)
  205. try:
  206. failed_loop.close()
  207. except Exception:
  208. pass
  209. raise RuntimeError(
  210. f"Health check event loop failed to start within {timeout} seconds"
  211. )
  212. def _run_health_check_loop(self):
  213. """Run the shared health check event loop."""
  214. asyncio.set_event_loop(self._health_check_loop)
  215. # Signal that the loop is ready before running
  216. # Use call_soon to signal after run_forever starts processing
  217. self._health_check_loop.call_soon(self._health_check_loop_ready.set)
  218. try:
  219. self._health_check_loop.run_forever()
  220. finally:
  221. try:
  222. pending = asyncio.all_tasks(self._health_check_loop)
  223. for task in pending:
  224. task.cancel()
  225. self._health_check_loop.run_until_complete(
  226. asyncio.gather(*pending, return_exceptions=True)
  227. )
  228. except Exception:
  229. pass
  230. finally:
  231. self._health_check_loop.close()
  232. def _call_later_recurring_coro(
  233. self,
  234. loop: asyncio.AbstractEventLoop,
  235. interval: float,
  236. coro: Callable[..., Coroutine[Any, Any, Any]],
  237. *args,
  238. ):
  239. """Schedule first execution of recurring coroutine."""
  240. with self._lock:
  241. if self._stopped:
  242. return
  243. self._call_later(
  244. loop, interval, self._execute_recurring_coro, loop, interval, coro, *args
  245. )
  246. def _execute_recurring_coro(
  247. self,
  248. loop: asyncio.AbstractEventLoop,
  249. interval: float,
  250. coro: Callable[..., Coroutine[Any, Any, Any]],
  251. *args,
  252. ):
  253. """
  254. Executes recurring coroutine with given interval in seconds.
  255. Schedules next execution only after current one completes to prevent overlap.
  256. """
  257. with self._lock:
  258. if self._stopped:
  259. return
  260. def on_complete(task: asyncio.Task):
  261. """Callback when coroutine completes - schedule next execution."""
  262. # Log any exceptions (prevents "Task exception was never retrieved")
  263. if task.cancelled():
  264. pass # Task was cancelled, ignore
  265. elif task.exception() is not None:
  266. # Log the exception but don't crash the scheduler
  267. logging.getLogger(__name__).debug(
  268. "Background coroutine raised exception",
  269. exc_info=task.exception(),
  270. )
  271. # Schedule next execution after completion
  272. with self._lock:
  273. if self._stopped:
  274. return
  275. self._call_later(
  276. loop,
  277. interval,
  278. self._execute_recurring_coro,
  279. loop,
  280. interval,
  281. coro,
  282. *args,
  283. )
  284. try:
  285. task = asyncio.ensure_future(coro(*args))
  286. # Add callback to handle completion and schedule next run
  287. task.add_done_callback(on_complete)
  288. except Exception:
  289. # If scheduling fails (e.g., during shutdown), try to schedule next run anyway
  290. with self._lock:
  291. if self._stopped:
  292. return
  293. self._call_later(
  294. loop,
  295. interval,
  296. self._execute_recurring_coro,
  297. loop,
  298. interval,
  299. coro,
  300. *args,
  301. )
  302. async def run_recurring_async(
  303. self, interval: float, coro: Callable[..., Coroutine[Any, Any, Any]], *args
  304. ):
  305. """
  306. Runs recurring coroutine with given interval in seconds in the current event loop.
  307. To be used only from an async context. No additional threads are created.
  308. Prevents overlapping executions by scheduling the next run only after
  309. the current one completes.
  310. Raises:
  311. RuntimeError: If called without a running event loop (programming error)
  312. """
  313. with self._lock:
  314. if self._stopped:
  315. return
  316. # This is an async method - it must be awaited in a running event loop.
  317. # If get_running_loop() raises RuntimeError, let it propagate as that
  318. # indicates a programming error (calling async method outside async context).
  319. loop = asyncio.get_running_loop()
  320. def schedule_next():
  321. """Schedule the next execution after the current one completes."""
  322. with self._lock:
  323. if self._stopped:
  324. return
  325. self._next_timer = loop.call_later(interval, execute_and_reschedule)
  326. def execute_and_reschedule():
  327. """Execute the coroutine and schedule next run after completion."""
  328. with self._lock:
  329. if self._stopped:
  330. return
  331. def on_complete(task: asyncio.Task):
  332. """Callback when coroutine completes - schedule next execution."""
  333. # Log any exceptions (prevents "Task exception was never retrieved")
  334. if task.cancelled():
  335. pass
  336. elif task.exception() is not None:
  337. logging.getLogger(__name__).debug(
  338. "Recurring async coroutine raised exception",
  339. exc_info=task.exception(),
  340. )
  341. # Schedule next execution AFTER this one completes
  342. schedule_next()
  343. try:
  344. task = asyncio.ensure_future(coro(*args))
  345. task.add_done_callback(on_complete)
  346. except Exception:
  347. # If scheduling fails, still try to schedule next run
  348. logging.getLogger(__name__).debug(
  349. "Failed to schedule recurring async coroutine", exc_info=True
  350. )
  351. schedule_next()
  352. # Schedule first execution
  353. self._next_timer = loop.call_later(interval, execute_and_reschedule)
  354. def _call_later(
  355. self, loop: asyncio.AbstractEventLoop, delay: float, callback: Callable, *args
  356. ):
  357. with self._lock:
  358. if self._stopped:
  359. return
  360. self._next_timer = loop.call_later(delay, callback, *args)
  361. def _call_later_recurring(
  362. self,
  363. loop: asyncio.AbstractEventLoop,
  364. interval: float,
  365. callback: Callable,
  366. *args,
  367. ):
  368. with self._lock:
  369. if self._stopped:
  370. return
  371. self._call_later(
  372. loop, interval, self._execute_recurring, loop, interval, callback, *args
  373. )
  374. def _execute_recurring(
  375. self,
  376. loop: asyncio.AbstractEventLoop,
  377. interval: float,
  378. callback: Callable,
  379. *args,
  380. ):
  381. """
  382. Executes recurring callable task with given interval in seconds.
  383. """
  384. with self._lock:
  385. if self._stopped:
  386. return
  387. try:
  388. callback(*args)
  389. except Exception:
  390. # Silently ignore exceptions during shutdown
  391. pass
  392. with self._lock:
  393. if self._stopped:
  394. return
  395. self._call_later(
  396. loop, interval, self._execute_recurring, loop, interval, callback, *args
  397. )
  398. def _start_event_loop_in_thread(
  399. event_loop: asyncio.AbstractEventLoop, call_soon_cb: Callable, *args
  400. ):
  401. """
  402. Starts event loop in a thread and schedule callback as soon as event loop is ready.
  403. Used to be able to schedule tasks using loop.call_later.
  404. :param event_loop:
  405. :return:
  406. """
  407. asyncio.set_event_loop(event_loop)
  408. event_loop.call_soon(call_soon_cb, event_loop, *args)
  409. try:
  410. event_loop.run_forever()
  411. finally:
  412. try:
  413. # Clean up pending tasks
  414. pending = asyncio.all_tasks(event_loop)
  415. for task in pending:
  416. task.cancel()
  417. # Run loop once more to process cancellations
  418. event_loop.run_until_complete(
  419. asyncio.gather(*pending, return_exceptions=True)
  420. )
  421. except Exception:
  422. pass
  423. finally:
  424. event_loop.close()