event.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. from typing import List
  2. from redis.client import Redis
  3. from redis.event import EventListenerInterface, OnCommandsFailEvent
  4. from redis.multidb.database import SyncDatabase
  5. from redis.multidb.failure_detector import FailureDetector
  6. class ActiveDatabaseChanged:
  7. """
  8. Event fired when an active database has been changed.
  9. """
  10. def __init__(
  11. self,
  12. old_database: SyncDatabase,
  13. new_database: SyncDatabase,
  14. command_executor,
  15. **kwargs,
  16. ):
  17. self._old_database = old_database
  18. self._new_database = new_database
  19. self._command_executor = command_executor
  20. self._kwargs = kwargs
  21. @property
  22. def old_database(self) -> SyncDatabase:
  23. return self._old_database
  24. @property
  25. def new_database(self) -> SyncDatabase:
  26. return self._new_database
  27. @property
  28. def command_executor(self):
  29. return self._command_executor
  30. @property
  31. def kwargs(self):
  32. return self._kwargs
  33. class ResubscribeOnActiveDatabaseChanged(EventListenerInterface):
  34. """
  35. Re-subscribe the currently active pub / sub to a new active database.
  36. """
  37. def listen(self, event: ActiveDatabaseChanged):
  38. old_pubsub = event.command_executor.active_pubsub
  39. if old_pubsub is not None:
  40. # Re-assign old channels and patterns so they will be automatically subscribed on connection.
  41. new_pubsub = event.new_database.client.pubsub(**event.kwargs)
  42. new_pubsub.channels = old_pubsub.channels
  43. new_pubsub.patterns = old_pubsub.patterns
  44. new_pubsub.shard_channels = old_pubsub.shard_channels
  45. new_pubsub.on_connect(None)
  46. event.command_executor.active_pubsub = new_pubsub
  47. old_pubsub.close()
  48. class CloseConnectionOnActiveDatabaseChanged(EventListenerInterface):
  49. """
  50. Close connection to the old active database.
  51. """
  52. def listen(self, event: ActiveDatabaseChanged):
  53. event.old_database.client.close()
  54. if isinstance(event.old_database.client, Redis):
  55. event.old_database.client.connection_pool.update_active_connections_for_reconnect()
  56. event.old_database.client.connection_pool.disconnect()
  57. else:
  58. for node in event.old_database.client.nodes_manager.nodes_cache.values():
  59. node.redis_connection.connection_pool.update_active_connections_for_reconnect()
  60. node.redis_connection.connection_pool.disconnect()
  61. class RegisterCommandFailure(EventListenerInterface):
  62. """
  63. Event listener that registers command failures and passing it to the failure detectors.
  64. """
  65. def __init__(self, failure_detectors: List[FailureDetector]):
  66. self._failure_detectors = failure_detectors
  67. def listen(self, event: OnCommandsFailEvent) -> None:
  68. for failure_detector in self._failure_detectors:
  69. failure_detector.register_failure(event.exception, event.commands)