# metrics.py
"""
OpenTelemetry metrics collector for redis-py.

This module defines and manages all metric instruments according to
OTel semantic conventions for database clients.
"""
import logging
import time
from enum import Enum
from typing import TYPE_CHECKING, Callable, Optional, Union

if TYPE_CHECKING:
    from redis.asyncio.connection import ConnectionPool
    from redis.asyncio.multidb.database import AsyncDatabase
    from redis.connection import ConnectionPoolInterface
    from redis.multidb.database import SyncDatabase

from redis.observability.attributes import (
    REDIS_CLIENT_CONNECTION_CLOSE_REASON,
    REDIS_CLIENT_CONNECTION_NOTIFICATION,
    AttributeBuilder,
    ConnectionState,
    CSCReason,
    CSCResult,
    GeoFailoverReason,
    PubSubDirection,
    get_pool_name,
)
from redis.observability.config import MetricGroup, OTelConfig
from redis.utils import deprecated_args, deprecated_function

logger = logging.getLogger(__name__)

# Optional imports - OTel SDK may not be installed
try:
    from opentelemetry.metrics import Meter

    OTEL_AVAILABLE = True
except ImportError:
    OTEL_AVAILABLE = False
    # Placeholders so module-level references to these names stay importable
    # when the OpenTelemetry API is not installed.
    Counter = None
    Histogram = None
    Meter = None
    UpDownCounter = None
  39. class CloseReason(Enum):
  40. """
  41. Enum representing the reason why a Redis client connection was closed.
  42. Values:
  43. APPLICATION_CLOSE: The connection was closed intentionally by the application
  44. (for example, during normal shutdown or explicit cleanup).
  45. ERROR: The connection was closed due to an unexpected error
  46. (for example, network failure or protocol error).
  47. HEALTHCHECK_FAILED: The connection was closed because a health check
  48. or liveness check for the connection failed.
  49. """
  50. APPLICATION_CLOSE = "application_close"
  51. ERROR = "error"
  52. HEALTHCHECK_FAILED = "healthcheck_failed"
  53. class RedisMetricsCollector:
  54. """
  55. Collects and records OpenTelemetry metrics for Redis operations.
  56. This class manages all metric instruments and provides methods to record
  57. various Redis operations including connection pool events, command execution,
  58. and cluster-specific operations.
  59. Args:
  60. meter: OpenTelemetry Meter instance
  61. config: OTel configuration object
  62. """
  63. METER_NAME = "redis-py"
  64. METER_VERSION = "1.0.0"
  65. def __init__(self, meter: Meter, config: OTelConfig):
  66. if not OTEL_AVAILABLE:
  67. raise ImportError(
  68. "OpenTelemetry API is not installed. "
  69. "Install it with: pip install opentelemetry-api"
  70. )
  71. self.meter = meter
  72. self.config = config
  73. self.attr_builder = AttributeBuilder()
  74. # Initialize enabled metric instruments
  75. if MetricGroup.RESILIENCY in self.config.metric_groups:
  76. self._init_resiliency_metrics()
  77. if MetricGroup.COMMAND in self.config.metric_groups:
  78. self._init_command_metrics()
  79. if MetricGroup.CONNECTION_BASIC in self.config.metric_groups:
  80. self._init_connection_basic_metrics()
  81. if MetricGroup.CONNECTION_ADVANCED in self.config.metric_groups:
  82. self._init_connection_advanced_metrics()
  83. if MetricGroup.PUBSUB in self.config.metric_groups:
  84. self._init_pubsub_metrics()
  85. if MetricGroup.STREAMING in self.config.metric_groups:
  86. self._init_streaming_metrics()
  87. if MetricGroup.CSC in self.config.metric_groups:
  88. self._init_csc_metrics()
  89. logger.info("RedisMetricsCollector initialized")
  90. def _init_resiliency_metrics(self) -> None:
  91. """Initialize resiliency metrics."""
  92. self.client_errors = self.meter.create_counter(
  93. name="redis.client.errors",
  94. unit="{error}",
  95. description="A counter of all errors (both returned to the user and handled internally in the client library)",
  96. )
  97. self.maintenance_notifications = self.meter.create_counter(
  98. name="redis.client.maintenance.notifications",
  99. unit="{notification}",
  100. description="Tracks server-side maintenance notifications",
  101. )
  102. self.geo_failovers = self.meter.create_counter(
  103. name="redis.client.geofailover.failovers",
  104. unit="{geofailover}",
  105. description="Total count of failovers happened using MultiDbClient.",
  106. )
  107. def _init_connection_basic_metrics(self) -> None:
  108. """Initialize basic connection metrics."""
  109. self.connection_create_time = self.meter.create_histogram(
  110. name="db.client.connection.create_time",
  111. unit="s",
  112. description="Time to create a new connection",
  113. explicit_bucket_boundaries_advisory=self.config.buckets_connection_create_time,
  114. )
  115. self.connection_relaxed_timeout = self.meter.create_up_down_counter(
  116. name="redis.client.connection.relaxed_timeout",
  117. unit="{relaxation}",
  118. description="Counts up for relaxed timeout, counts down for unrelaxed timeout",
  119. )
  120. self.connection_handoff = self.meter.create_counter(
  121. name="redis.client.connection.handoff",
  122. unit="{handoff}",
  123. description="Connections that have been handed off (e.g., after a MOVING notification)",
  124. )
  125. # DEPRECATED: This attribute is kept for backward compatibility.
  126. # It requires manual initialization via init_connection_count() with a callback.
  127. # Use connection_count_updown instead for push-based tracking.
  128. # Will be removed in the next major version.
  129. self.connection_count = None
  130. # New push-based connection count tracking via UpDownCounter
  131. self.connection_count_updown = self.meter.create_up_down_counter(
  132. name="db.client.connection.count",
  133. unit="{connection}",
  134. description="Number of connections currently in the pool by state",
  135. )
  136. def _init_connection_advanced_metrics(self) -> None:
  137. """Initialize advanced connection metrics."""
  138. self.connection_timeouts = self.meter.create_counter(
  139. name="db.client.connection.timeouts",
  140. unit="{timeout}",
  141. description="The number of connection timeouts that have occurred trying to obtain a connection from the pool.",
  142. )
  143. self.connection_wait_time = self.meter.create_histogram(
  144. name="db.client.connection.wait_time",
  145. unit="s",
  146. description="Time to obtain an open connection from the pool",
  147. explicit_bucket_boundaries_advisory=self.config.buckets_connection_wait_time,
  148. )
  149. self.connection_closed = self.meter.create_counter(
  150. name="redis.client.connection.closed",
  151. unit="{connection}",
  152. description="Total number of closed connections",
  153. )
  154. def _init_command_metrics(self) -> None:
  155. """Initialize command execution metric instruments."""
  156. self.operation_duration = self.meter.create_histogram(
  157. name="db.client.operation.duration",
  158. unit="s",
  159. description="Command execution duration",
  160. explicit_bucket_boundaries_advisory=self.config.buckets_operation_duration,
  161. )
  162. def _init_pubsub_metrics(self) -> None:
  163. """Initialize PubSub metric instruments."""
  164. self.pubsub_messages = self.meter.create_counter(
  165. name="redis.client.pubsub.messages",
  166. unit="{message}",
  167. description="Tracks published and received messages",
  168. )
  169. def _init_streaming_metrics(self) -> None:
  170. """Initialize Streaming metric instruments."""
  171. self.stream_lag = self.meter.create_histogram(
  172. name="redis.client.stream.lag",
  173. unit="s",
  174. description="End-to-end lag per message, showing how stale are the messages when the application starts processing them.",
  175. explicit_bucket_boundaries_advisory=self.config.buckets_stream_processing_duration,
  176. )
  177. def _init_csc_metrics(self) -> None:
  178. """Initialize Client Side Caching (CSC) metric instruments."""
  179. self.csc_requests = self.meter.create_counter(
  180. name="redis.client.csc.requests",
  181. unit="{request}",
  182. description="The total number of requests to the cache",
  183. )
  184. self.csc_evictions = self.meter.create_counter(
  185. name="redis.client.csc.evictions",
  186. unit="{eviction}",
  187. description="The total number of cache evictions",
  188. )
  189. self.csc_network_saved = self.meter.create_counter(
  190. name="redis.client.csc.network_saved",
  191. unit="By",
  192. description="The total number of bytes saved by using CSC",
  193. )
  194. # Resiliency metric recording methods
  195. def record_error_count(
  196. self,
  197. server_address: Optional[str] = None,
  198. server_port: Optional[int] = None,
  199. network_peer_address: Optional[str] = None,
  200. network_peer_port: Optional[int] = None,
  201. error_type: Optional[Exception] = None,
  202. retry_attempts: Optional[int] = None,
  203. is_internal: Optional[bool] = None,
  204. ):
  205. """
  206. Record error count
  207. Args:
  208. server_address: Server address
  209. server_port: Server port
  210. network_peer_address: Network peer address
  211. network_peer_port: Network peer port
  212. error_type: Error type
  213. retry_attempts: Retry attempts
  214. is_internal: Whether the error is internal (e.g., timeout, network error)
  215. """
  216. if not hasattr(self, "client_errors"):
  217. return
  218. attrs = self.attr_builder.build_base_attributes(
  219. server_address=server_address,
  220. server_port=server_port,
  221. )
  222. attrs.update(
  223. self.attr_builder.build_operation_attributes(
  224. network_peer_address=network_peer_address,
  225. network_peer_port=network_peer_port,
  226. retry_attempts=retry_attempts,
  227. )
  228. )
  229. attrs.update(
  230. self.attr_builder.build_error_attributes(
  231. error_type=error_type,
  232. is_internal=is_internal,
  233. )
  234. )
  235. self.client_errors.add(1, attributes=attrs)
  236. def record_maint_notification_count(
  237. self,
  238. server_address: str,
  239. server_port: int,
  240. network_peer_address: str,
  241. network_peer_port: int,
  242. maint_notification: str,
  243. ):
  244. """
  245. Record maintenance notification count
  246. Args:
  247. server_address: Server address
  248. server_port: Server port
  249. network_peer_address: Network peer address
  250. network_peer_port: Network peer port
  251. maint_notification: Maintenance notification
  252. """
  253. if not hasattr(self, "maintenance_notifications"):
  254. return
  255. attrs = self.attr_builder.build_base_attributes(
  256. server_address=server_address,
  257. server_port=server_port,
  258. )
  259. attrs.update(
  260. self.attr_builder.build_operation_attributes(
  261. network_peer_address=network_peer_address,
  262. network_peer_port=network_peer_port,
  263. )
  264. )
  265. attrs[REDIS_CLIENT_CONNECTION_NOTIFICATION] = maint_notification
  266. self.maintenance_notifications.add(1, attributes=attrs)
  267. def record_geo_failover(
  268. self,
  269. fail_from: Union["SyncDatabase", "AsyncDatabase"],
  270. fail_to: Union["SyncDatabase", "AsyncDatabase"],
  271. reason: GeoFailoverReason,
  272. ):
  273. """
  274. Record geo failover
  275. Args:
  276. fail_from: Database failed from
  277. fail_to: Database failed to
  278. reason: Reason for the failover
  279. """
  280. if not hasattr(self, "geo_failovers"):
  281. return
  282. attrs = self.attr_builder.build_geo_failover_attributes(
  283. fail_from=fail_from,
  284. fail_to=fail_to,
  285. reason=reason,
  286. )
  287. return self.geo_failovers.add(1, attributes=attrs)
  288. def record_connection_count(
  289. self,
  290. pool_name: str,
  291. connection_state: ConnectionState,
  292. counter: int = 1,
  293. ) -> None:
  294. """
  295. Record a connection count change for a single state.
  296. Args:
  297. pool_name: Connection pool name
  298. connection_state: State to update (IDLE or USED)
  299. counter: Number to add (positive) or subtract (negative)
  300. """
  301. if not hasattr(self, "connection_count_updown"):
  302. return
  303. attrs = self.attr_builder.build_connection_attributes(
  304. pool_name=pool_name,
  305. connection_state=connection_state,
  306. )
  307. self.connection_count_updown.add(counter, attributes=attrs)
  308. @deprecated_function(
  309. reason="Connection count is now tracked via record_connection_count(). "
  310. "This functionality will be removed in the next major version",
  311. version="7.4.0",
  312. )
  313. def init_connection_count(
  314. self,
  315. callback: Callable,
  316. ) -> None:
  317. """
  318. Initialize observable gauge for connection count metric.
  319. Args:
  320. callback: Callback function to retrieve connection counts
  321. """
  322. if MetricGroup.CONNECTION_BASIC not in self.config.metric_groups:
  323. return
  324. # DEPRECATED: Create observable gauge for backward compatibility
  325. # This gauge uses a different metric name to avoid conflicts with
  326. # the new push-based connection_count_updown counter
  327. self.connection_count = self.meter.create_observable_gauge(
  328. name="db.client.connection.count.deprecated",
  329. unit="{connection}",
  330. description="The number of connections that are currently in state "
  331. "described by the state attribute (deprecated - use db.client.connection.count instead)",
  332. callbacks=[callback],
  333. )
  334. def init_csc_items(
  335. self,
  336. callback: Callable,
  337. ) -> None:
  338. """
  339. Initialize observable gauge for CSC items metric.
  340. Args:
  341. callback: Callback function to retrieve CSC items count
  342. """
  343. if MetricGroup.CSC not in self.config.metric_groups and not self.csc_items:
  344. return
  345. self.csc_items = self.meter.create_observable_gauge(
  346. name="redis.client.csc.items",
  347. unit="{item}",
  348. description="The total number of cached responses currently stored",
  349. callbacks=[callback],
  350. )
  351. def record_connection_timeout(self, pool_name: str) -> None:
  352. """
  353. Record a connection timeout event.
  354. Args:
  355. pool_name: Connection pool name
  356. """
  357. if not hasattr(self, "connection_timeouts"):
  358. return
  359. attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
  360. self.connection_timeouts.add(1, attributes=attrs)
  361. def record_connection_create_time(
  362. self,
  363. connection_pool: Union["ConnectionPoolInterface", "ConnectionPool"],
  364. duration_seconds: float,
  365. ) -> None:
  366. """
  367. Record time taken to create a new connection.
  368. Args:
  369. connection_pool: Connection pool implementation
  370. duration_seconds: Creation time in seconds
  371. """
  372. if not hasattr(self, "connection_create_time"):
  373. return
  374. attrs = self.attr_builder.build_connection_attributes(
  375. pool_name=get_pool_name(connection_pool)
  376. )
  377. self.connection_create_time.record(duration_seconds, attributes=attrs)
  378. def record_connection_wait_time(
  379. self,
  380. pool_name: str,
  381. duration_seconds: float,
  382. ) -> None:
  383. """
  384. Record time taken to obtain a connection from the pool.
  385. Args:
  386. pool_name: Connection pool name
  387. duration_seconds: Wait time in seconds
  388. """
  389. if not hasattr(self, "connection_wait_time"):
  390. return
  391. attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
  392. self.connection_wait_time.record(duration_seconds, attributes=attrs)
  393. # Command execution metric recording methods
  394. @deprecated_args(
  395. args_to_warn=["batch_size"],
  396. reason="The batch_size argument is no longer used and will be removed in the next major version.",
  397. version="7.2.1",
  398. )
  399. def record_operation_duration(
  400. self,
  401. command_name: str,
  402. duration_seconds: float,
  403. server_address: Optional[str] = None,
  404. server_port: Optional[int] = None,
  405. db_namespace: Optional[int] = None,
  406. batch_size: Optional[int] = None, # noqa
  407. error_type: Optional[Exception] = None,
  408. network_peer_address: Optional[str] = None,
  409. network_peer_port: Optional[int] = None,
  410. retry_attempts: Optional[int] = None,
  411. is_blocking: Optional[bool] = None,
  412. ) -> None:
  413. """
  414. Record command execution duration.
  415. Args:
  416. command_name: Redis command name (e.g., 'GET', 'SET', 'MULTI')
  417. duration_seconds: Execution time in seconds
  418. server_address: Redis server address
  419. server_port: Redis server port
  420. db_namespace: Redis database index
  421. batch_size: Number of commands in batch (for pipelines/transactions)
  422. error_type: Error type if operation failed
  423. network_peer_address: Resolved peer address
  424. network_peer_port: Peer port number
  425. retry_attempts: Number of retry attempts made
  426. is_blocking: Whether the operation is a blocking command
  427. """
  428. if not hasattr(self, "operation_duration"):
  429. return
  430. # Check if this command should be tracked
  431. if not self.config.should_track_command(command_name):
  432. return
  433. # Build attributes
  434. attrs = self.attr_builder.build_base_attributes(
  435. server_address=server_address,
  436. server_port=server_port,
  437. db_namespace=db_namespace,
  438. )
  439. attrs.update(
  440. self.attr_builder.build_operation_attributes(
  441. command_name=command_name,
  442. network_peer_address=network_peer_address,
  443. network_peer_port=network_peer_port,
  444. retry_attempts=retry_attempts,
  445. is_blocking=is_blocking,
  446. )
  447. )
  448. attrs.update(
  449. self.attr_builder.build_error_attributes(
  450. error_type=error_type,
  451. )
  452. )
  453. self.operation_duration.record(duration_seconds, attributes=attrs)
  454. def record_connection_closed(
  455. self,
  456. close_reason: Optional[CloseReason] = None,
  457. error_type: Optional[Exception] = None,
  458. ) -> None:
  459. """
  460. Record a connection closed event.
  461. Args:
  462. close_reason: Reason for closing (e.g. 'error', 'application_close')
  463. error_type: Error type if closed due to error
  464. """
  465. if not hasattr(self, "connection_closed"):
  466. return
  467. attrs = self.attr_builder.build_connection_attributes()
  468. if close_reason:
  469. attrs[REDIS_CLIENT_CONNECTION_CLOSE_REASON] = close_reason.value
  470. attrs.update(
  471. self.attr_builder.build_error_attributes(
  472. error_type=error_type,
  473. )
  474. )
  475. self.connection_closed.add(1, attributes=attrs)
  476. def record_connection_relaxed_timeout(
  477. self,
  478. connection_name: str,
  479. maint_notification: str,
  480. relaxed: bool,
  481. ) -> None:
  482. """
  483. Record a connection timeout relaxation event.
  484. Args:
  485. connection_name: Connection name
  486. maint_notification: Maintenance notification type
  487. relaxed: True to count up (relaxed), False to count down (unrelaxed)
  488. """
  489. if not hasattr(self, "connection_relaxed_timeout"):
  490. return
  491. attrs = self.attr_builder.build_connection_attributes(pool_name=connection_name)
  492. attrs[REDIS_CLIENT_CONNECTION_NOTIFICATION] = maint_notification
  493. self.connection_relaxed_timeout.add(1 if relaxed else -1, attributes=attrs)
  494. def record_connection_handoff(
  495. self,
  496. pool_name: str,
  497. ) -> None:
  498. """
  499. Record a connection handoff event (e.g., after MOVING notification).
  500. Args:
  501. pool_name: Connection pool name
  502. """
  503. if not hasattr(self, "connection_handoff"):
  504. return
  505. attrs = self.attr_builder.build_connection_attributes(pool_name=pool_name)
  506. self.connection_handoff.add(1, attributes=attrs)
  507. # PubSub metric recording methods
  508. def record_pubsub_message(
  509. self,
  510. direction: PubSubDirection,
  511. channel: Optional[str] = None,
  512. sharded: Optional[bool] = None,
  513. ) -> None:
  514. """
  515. Record a PubSub message (published or received).
  516. Args:
  517. direction: Message direction ('publish' or 'receive')
  518. channel: Pub/Sub channel name
  519. sharded: True if sharded Pub/Sub channel
  520. """
  521. if not hasattr(self, "pubsub_messages"):
  522. return
  523. attrs = self.attr_builder.build_pubsub_message_attributes(
  524. direction=direction,
  525. channel=channel,
  526. sharded=sharded,
  527. )
  528. self.pubsub_messages.add(1, attributes=attrs)
  529. # Streaming metric recording methods
  530. @deprecated_args(
  531. args_to_warn=["consumer_name"],
  532. reason="The consumer_name argument is no longer used and will be removed in the next major version.",
  533. version="7.2.1",
  534. )
  535. def record_streaming_lag(
  536. self,
  537. lag_seconds: float,
  538. stream_name: Optional[str] = None,
  539. consumer_group: Optional[str] = None,
  540. consumer_name: Optional[str] = None, # noqa
  541. ) -> None:
  542. """
  543. Record the lag of a streaming message.
  544. Args:
  545. lag_seconds: Lag in seconds
  546. stream_name: Stream name
  547. consumer_group: Consumer group name
  548. consumer_name: Consumer name
  549. """
  550. if not hasattr(self, "stream_lag"):
  551. return
  552. attrs = self.attr_builder.build_streaming_attributes(
  553. stream_name=stream_name,
  554. consumer_group=consumer_group,
  555. )
  556. self.stream_lag.record(lag_seconds, attributes=attrs)
  557. # CSC metric recording methods
  558. def record_csc_request(
  559. self,
  560. result: Optional[CSCResult] = None,
  561. ) -> None:
  562. """
  563. Record a Client Side Caching (CSC) request.
  564. Args:
  565. result: CSC result ('hit' or 'miss')
  566. """
  567. if not hasattr(self, "csc_requests"):
  568. return
  569. attrs = self.attr_builder.build_csc_attributes(result=result)
  570. self.csc_requests.add(1, attributes=attrs)
  571. def record_csc_eviction(
  572. self,
  573. count: int,
  574. reason: Optional[CSCReason] = None,
  575. ) -> None:
  576. """
  577. Record a Client Side Caching (CSC) eviction.
  578. Args:
  579. count: Number of evictions
  580. reason: Reason for eviction
  581. """
  582. if not hasattr(self, "csc_evictions"):
  583. return
  584. attrs = self.attr_builder.build_csc_attributes(reason=reason)
  585. self.csc_evictions.add(count, attributes=attrs)
  586. def record_csc_network_saved(
  587. self,
  588. bytes_saved: int,
  589. ) -> None:
  590. """
  591. Record the number of bytes saved by using Client Side Caching (CSC).
  592. Args:
  593. bytes_saved: Number of bytes saved
  594. """
  595. if not hasattr(self, "csc_network_saved"):
  596. return
  597. attrs = self.attr_builder.build_csc_attributes()
  598. self.csc_network_saved.add(bytes_saved, attributes=attrs)
  599. # Utility methods
  600. @staticmethod
  601. def monotonic_time() -> float:
  602. """
  603. Get monotonic time for duration measurements.
  604. Returns:
  605. Current monotonic time in seconds
  606. """
  607. return time.monotonic()
  608. def __repr__(self) -> str:
  609. return f"RedisMetricsCollector(meter={self.meter}, config={self.config})"