providers.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
"""
OpenTelemetry provider management for redis-py.

This module handles initialization and lifecycle management of OTel SDK components
including MeterProvider, TracerProvider (future), and LoggerProvider (future).

Uses a singleton pattern - initialize once globally, and all Redis clients use it
automatically.

Redis-py uses the global MeterProvider set by your application. Set it up before
initializing observability:

    from opentelemetry import metrics
    from opentelemetry.sdk.metrics import MeterProvider

    provider = MeterProvider(...)
    metrics.set_meter_provider(provider)

    # Then initialize redis-py observability
    otel = get_observability_instance()
    otel.init(OTelConfig(enable_metrics=True))
"""
import logging
from typing import Optional

from redis.observability.config import OTelConfig

# Module-level logger shared by every class and function in this module.
logger = logging.getLogger(__name__)

# Optional imports - OTel SDK may not be installed.
# OTEL_AVAILABLE records whether the SDK import succeeded.
try:
    from opentelemetry.sdk.metrics import MeterProvider

    OTEL_AVAILABLE = True
except ImportError:
    OTEL_AVAILABLE = False
    # Keep the name bound so annotations such as Optional[MeterProvider]
    # in this module still evaluate when the SDK is missing.
    MeterProvider = None

# Global singleton instance
# NOTE(review): nothing in the visible part of this module reads or writes this
# name (the manager is held by ObservabilityInstance) - confirm whether it is
# still needed by external callers before removing it.
_global_provider_manager: Optional["OTelProviderManager"] = None
  29. class OTelProviderManager:
  30. """
  31. Manages OpenTelemetry SDK providers and their lifecycle.
  32. This class handles:
  33. - Getting the global MeterProvider set by the application
  34. - Configuring histogram bucket boundaries via Views
  35. - Graceful shutdown
  36. Args:
  37. config: OTel configuration object
  38. """
  39. def __init__(self, config: OTelConfig):
  40. self.config = config
  41. self._meter_provider: Optional[MeterProvider] = None
  42. def get_meter_provider(self) -> Optional[MeterProvider]:
  43. """
  44. Get the global MeterProvider set by the application.
  45. Returns:
  46. MeterProvider instance or None if metrics are disabled
  47. Raises:
  48. ImportError: If OpenTelemetry is not installed
  49. RuntimeError: If metrics are enabled but no global MeterProvider is set
  50. """
  51. if not self.config.is_enabled():
  52. return None
  53. # Lazy import - only import OTel when metrics are enabled
  54. try:
  55. from opentelemetry import metrics
  56. from opentelemetry.metrics import NoOpMeterProvider
  57. except ImportError:
  58. raise ImportError(
  59. "OpenTelemetry is not installed. Install it with:\n"
  60. " pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-http"
  61. )
  62. # Get the global MeterProvider
  63. if self._meter_provider is None:
  64. self._meter_provider = metrics.get_meter_provider()
  65. # Check if it's a real provider (not NoOp)
  66. if isinstance(self._meter_provider, NoOpMeterProvider):
  67. raise RuntimeError(
  68. "Metrics are enabled but no global MeterProvider is configured.\n"
  69. "\n"
  70. "Set up OpenTelemetry before initializing redis-py observability:\n"
  71. "\n"
  72. " from opentelemetry import metrics\n"
  73. " from opentelemetry.sdk.metrics import MeterProvider\n"
  74. " from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader\n"
  75. " from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter\n"
  76. "\n"
  77. " # Create exporter\n"
  78. " exporter = OTLPMetricExporter(\n"
  79. " endpoint='http://localhost:4318/v1/metrics'\n"
  80. " )\n"
  81. "\n"
  82. " # Create reader\n"
  83. " reader = PeriodicExportingMetricReader(\n"
  84. " exporter=exporter,\n"
  85. " export_interval_millis=10000\n"
  86. " )\n"
  87. "\n"
  88. " # Create and set global provider\n"
  89. " provider = MeterProvider(metric_readers=[reader])\n"
  90. " metrics.set_meter_provider(provider)\n"
  91. "\n"
  92. " # Now initialize redis-py observability\n"
  93. " from redis.observability import get_observability_instance, OTelConfig\n"
  94. " otel = get_observability_instance()\n"
  95. " otel.init(OTelConfig(enable_metrics=True))\n"
  96. )
  97. logger.info("Using global MeterProvider from application")
  98. return self._meter_provider
  99. def shutdown(self, timeout_millis: int = 30000) -> bool:
  100. """
  101. Shutdown observability and flush any pending metrics.
  102. Note: We don't shutdown the global MeterProvider since it's owned by the application.
  103. We only force flush pending metrics.
  104. Args:
  105. timeout_millis: Maximum time to wait for flush
  106. Returns:
  107. True if flush was successful, False otherwise
  108. """
  109. logger.debug(
  110. "Flushing metrics before shutdown (not shutting down global MeterProvider)"
  111. )
  112. return self.force_flush(timeout_millis=timeout_millis)
  113. def force_flush(self, timeout_millis: int = 30000) -> bool:
  114. """
  115. Force flush any pending metrics from the global MeterProvider.
  116. Args:
  117. timeout_millis: Maximum time to wait for flush
  118. Returns:
  119. True if flush was successful, False otherwise
  120. """
  121. if self._meter_provider is None:
  122. return True
  123. # NoOpMeterProvider doesn't have force_flush method
  124. if not hasattr(self._meter_provider, "force_flush"):
  125. logger.debug("MeterProvider does not support force_flush, skipping")
  126. return True
  127. try:
  128. logger.debug("Force flushing metrics from global MeterProvider")
  129. self._meter_provider.force_flush(timeout_millis=timeout_millis)
  130. return True
  131. except Exception as e:
  132. logger.error(f"Error flushing metrics: {e}")
  133. return False
  134. def __enter__(self):
  135. """Context manager entry."""
  136. return self
  137. def __exit__(self, _exc_type, _exc_val, _exc_tb):
  138. """Context manager exit - shutdown provider."""
  139. self.shutdown()
  140. def __repr__(self) -> str:
  141. return f"OTelProviderManager(config={self.config})"
  142. # Singleton instance class
  143. class ObservabilityInstance:
  144. """
  145. Singleton instance for managing OpenTelemetry observability.
  146. This class follows the singleton pattern similar to Glide's GetOtelInstance().
  147. Use GetObservabilityInstance() to get the singleton instance, then call init()
  148. to initialize observability.
  149. Example:
  150. >>> from redis.observability.config import OTelConfig
  151. >>>
  152. >>> # Get singleton instance
  153. >>> otel = get_observability_instance()
  154. >>>
  155. >>> # Initialize once at app startup
  156. >>> otel.init(OTelConfig())
  157. >>>
  158. >>> # All Redis clients now automatically collect metrics
  159. >>> import redis
  160. >>> r = redis.Redis(host='localhost', port=6379)
  161. >>> r.set('key', 'value') # Metrics collected automatically
  162. """
  163. def __init__(self):
  164. self._provider_manager: Optional[OTelProviderManager] = None
  165. def init(self, config: OTelConfig) -> "ObservabilityInstance":
  166. """
  167. Initialize OpenTelemetry observability globally for all Redis clients.
  168. This should be called once at application startup. After initialization,
  169. all Redis clients will automatically collect and export metrics without
  170. needing any additional configuration.
  171. Safe to call multiple times - will shutdown previous instance before
  172. initializing a new one.
  173. Args:
  174. config: OTel configuration object
  175. Returns:
  176. Self for method chaining
  177. Example:
  178. >>> otel = get_observability_instance()
  179. >>> otel.init(OTelConfig())
  180. """
  181. if self._provider_manager is not None:
  182. logger.warning(
  183. "Observability already initialized. Shutting down previous instance."
  184. )
  185. self._provider_manager.shutdown()
  186. self._provider_manager = OTelProviderManager(config)
  187. logger.info("Observability initialized")
  188. return self
  189. def is_enabled(self) -> bool:
  190. """
  191. Check if observability is enabled.
  192. Returns:
  193. True if observability is initialized and metrics are enabled
  194. Example:
  195. >>> otel = get_observability_instance()
  196. >>> if otel.is_enabled():
  197. ... print("Metrics are being collected")
  198. """
  199. return (
  200. self._provider_manager is not None
  201. and self._provider_manager.config.is_enabled()
  202. )
  203. def get_provider_manager(self) -> Optional[OTelProviderManager]:
  204. """
  205. Get the provider manager instance.
  206. Returns:
  207. The provider manager, or None if not initialized
  208. Example:
  209. >>> otel = get_observability_instance()
  210. >>> manager = otel.get_provider_manager()
  211. >>> if manager is not None:
  212. ... print(f"Observability enabled: {manager.config.is_enabled()}")
  213. """
  214. return self._provider_manager
  215. def shutdown(self, timeout_millis: int = 30000) -> bool:
  216. """
  217. Shutdown observability and flush any pending metrics.
  218. This should be called at application shutdown to ensure all metrics
  219. are exported before the application exits.
  220. Args:
  221. timeout_millis: Maximum time to wait for shutdown
  222. Returns:
  223. True if shutdown was successful
  224. Example:
  225. >>> otel = get_observability_instance()
  226. >>> # At application shutdown
  227. >>> otel.shutdown()
  228. """
  229. if self._provider_manager is None:
  230. logger.debug("Observability not initialized, nothing to shutdown")
  231. return True
  232. success = self._provider_manager.shutdown(timeout_millis)
  233. self._provider_manager = None
  234. logger.info("Observability shutdown")
  235. return success
  236. def force_flush(self, timeout_millis: int = 30000) -> bool:
  237. """
  238. Force flush all pending metrics immediately.
  239. Useful for testing or when you want to ensure metrics are exported
  240. before a specific point in your application.
  241. Args:
  242. timeout_millis: Maximum time to wait for flush
  243. Returns:
  244. True if flush was successful
  245. Example:
  246. >>> otel = get_observability_instance()
  247. >>> # Execute some Redis commands
  248. >>> r.set('key', 'value')
  249. >>> # Force flush metrics immediately
  250. >>> otel.force_flush()
  251. """
  252. if self._provider_manager is None:
  253. logger.debug("Observability not initialized, nothing to flush")
  254. return True
  255. return self._provider_manager.force_flush(timeout_millis)
  256. # Global singleton instance
  257. _observability_instance: Optional[ObservabilityInstance] = None
  258. def get_observability_instance() -> ObservabilityInstance:
  259. """
  260. Get the global observability singleton instance.
  261. This is the Pythonic way to get the singleton instance.
  262. Returns:
  263. The global ObservabilityInstance singleton
  264. Example:
  265. >>>
  266. >>> otel = get_observability_instance()
  267. >>> otel.init(OTelConfig())
  268. """
  269. global _observability_instance
  270. if _observability_instance is None:
  271. _observability_instance = ObservabilityInstance()
  272. return _observability_instance
  273. def reset_observability_instance() -> None:
  274. """
  275. Reset the global observability singleton instance.
  276. This is primarily used for testing and benchmarking to ensure
  277. a clean state between test runs.
  278. Warning:
  279. This will shutdown any active provider manager and reset
  280. the global state. Use with caution in production code.
  281. """
  282. global _observability_instance
  283. if _observability_instance is not None:
  284. _observability_instance.shutdown()
  285. _observability_instance = None