| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- """
- OpenTelemetry semantic convention attributes for Redis.
- This module provides constants and helper functions for building OTel attributes
- according to the semantic conventions for database clients.
- Reference: https://opentelemetry.io/docs/specs/semconv/database/redis/
- """
- from enum import Enum
- from typing import TYPE_CHECKING, Any, Dict, Optional, Union
- import redis
- if TYPE_CHECKING:
- from redis.asyncio.connection import ConnectionPool
- from redis.asyncio.multidb.database import AsyncDatabase
- from redis.connection import ConnectionPoolInterface
- from redis.multidb.database import SyncDatabase
- # Database semantic convention attributes
- DB_SYSTEM = "db.system"
- DB_NAMESPACE = "db.namespace"
- DB_OPERATION_NAME = "db.operation.name"
- DB_RESPONSE_STATUS_CODE = "db.response.status_code"
- DB_STORED_PROCEDURE_NAME = "db.stored_procedure.name"
- # Error attributes
- ERROR_TYPE = "error.type"
- # Network attributes
- NETWORK_PEER_ADDRESS = "network.peer.address"
- NETWORK_PEER_PORT = "network.peer.port"
- # Server attributes
- SERVER_ADDRESS = "server.address"
- SERVER_PORT = "server.port"
- # Connection pool attributes
- DB_CLIENT_CONNECTION_POOL_NAME = "db.client.connection.pool.name"
- DB_CLIENT_CONNECTION_STATE = "db.client.connection.state"
- DB_CLIENT_CONNECTION_NAME = "db.client.connection.name"
- # Geofailover attributes
- DB_CLIENT_GEOFAILOVER_FAIL_FROM = "db.client.geofailover.fail_from"
- DB_CLIENT_GEOFAILOVER_FAIL_TO = "db.client.geofailover.fail_to"
- DB_CLIENT_GEOFAILOVER_REASON = "db.client.geofailover.reason"
- # Redis-specific attributes
- REDIS_CLIENT_LIBRARY = "redis.client.library"
- REDIS_CLIENT_CONNECTION_PUBSUB = "redis.client.connection.pubsub"
- REDIS_CLIENT_CONNECTION_CLOSE_REASON = "redis.client.connection.close.reason"
- REDIS_CLIENT_CONNECTION_NOTIFICATION = "redis.client.connection.notification"
- REDIS_CLIENT_OPERATION_RETRY_ATTEMPTS = "redis.client.operation.retry_attempts"
- REDIS_CLIENT_OPERATION_BLOCKING = "redis.client.operation.blocking"
- REDIS_CLIENT_PUBSUB_MESSAGE_DIRECTION = "redis.client.pubsub.message.direction"
- REDIS_CLIENT_PUBSUB_CHANNEL = "redis.client.pubsub.channel"
- REDIS_CLIENT_PUBSUB_SHARDED = "redis.client.pubsub.sharded"
- REDIS_CLIENT_ERROR_INTERNAL = "redis.client.errors.internal"
- REDIS_CLIENT_ERROR_CATEGORY = "redis.client.errors.category"
- REDIS_CLIENT_STREAM_NAME = "redis.client.stream.name"
- REDIS_CLIENT_CONSUMER_GROUP = "redis.client.consumer_group"
- REDIS_CLIENT_CSC_RESULT = "redis.client.csc.result"
- REDIS_CLIENT_CSC_REASON = "redis.client.csc.reason"
- class ConnectionState(Enum):
- IDLE = "idle"
- USED = "used"
- class PubSubDirection(Enum):
- PUBLISH = "publish"
- RECEIVE = "receive"
- class CSCResult(Enum):
- HIT = "hit"
- MISS = "miss"
- class CSCReason(Enum):
- FULL = "full"
- INVALIDATION = "invalidation"
- class GeoFailoverReason(Enum):
- AUTOMATIC = "automatic"
- MANUAL = "manual"
- class AttributeBuilder:
- """
- Helper class to build OTel semantic convention attributes for Redis operations.
- """
- @staticmethod
- def build_base_attributes(
- server_address: Optional[str] = None,
- server_port: Optional[int] = None,
- db_namespace: Optional[int] = None,
- ) -> Dict[str, Any]:
- """
- Build base attributes common to all Redis operations.
- Args:
- server_address: Redis server address (FQDN or IP)
- server_port: Redis server port
- db_namespace: Redis database index
- Returns:
- Dictionary of base attributes
- """
- attrs: Dict[str, Any] = {
- DB_SYSTEM: "redis",
- REDIS_CLIENT_LIBRARY: f"redis-py:v{redis.__version__}",
- }
- if server_address is not None:
- attrs[SERVER_ADDRESS] = server_address
- if server_port is not None:
- attrs[SERVER_PORT] = server_port
- if db_namespace is not None:
- attrs[DB_NAMESPACE] = str(db_namespace)
- return attrs
- @staticmethod
- def build_operation_attributes(
- command_name: Optional[Union[str, bytes]] = None,
- batch_size: Optional[int] = None, # noqa
- network_peer_address: Optional[str] = None,
- network_peer_port: Optional[int] = None,
- stored_procedure_name: Optional[str] = None,
- retry_attempts: Optional[int] = None,
- is_blocking: Optional[bool] = None,
- ) -> Dict[str, Any]:
- """
- Build attributes for a Redis operation (command execution).
- Args:
- command_name: Redis command name (e.g., 'GET', 'SET', 'MULTI'), can be str or bytes
- batch_size: Number of commands in batch (for pipelines/transactions)
- network_peer_address: Resolved peer address
- network_peer_port: Peer port number
- stored_procedure_name: Lua script name or SHA1 digest
- retry_attempts: Number of retry attempts made
- is_blocking: Whether the operation is a blocking command
- Returns:
- Dictionary of operation attributes
- """
- attrs: Dict[str, Any] = {}
- if command_name is not None:
- # Ensure command_name is a string (it can be bytes from args[0])
- if isinstance(command_name, bytes):
- command_name = command_name.decode("utf-8", errors="replace")
- attrs[DB_OPERATION_NAME] = command_name.upper()
- if network_peer_address is not None:
- attrs[NETWORK_PEER_ADDRESS] = network_peer_address
- if network_peer_port is not None:
- attrs[NETWORK_PEER_PORT] = network_peer_port
- if stored_procedure_name is not None:
- attrs[DB_STORED_PROCEDURE_NAME] = stored_procedure_name
- if retry_attempts is not None and retry_attempts > 0:
- attrs[REDIS_CLIENT_OPERATION_RETRY_ATTEMPTS] = retry_attempts
- if is_blocking is not None:
- attrs[REDIS_CLIENT_OPERATION_BLOCKING] = is_blocking
- return attrs
- @staticmethod
- def build_connection_attributes(
- pool_name: Optional[str] = None,
- connection_state: Optional[ConnectionState] = None,
- connection_name: Optional[str] = None,
- is_pubsub: Optional[bool] = None,
- ) -> Dict[str, Any]:
- """
- Build attributes for connection pool metrics.
- Args:
- pool_name: Unique connection pool name
- connection_state: Connection state ('idle' or 'used')
- is_pubsub: Whether this is a PubSub connection
- connection_name: Unique connection name
- Returns:
- Dictionary of connection pool attributes
- """
- attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
- if pool_name is not None:
- attrs[DB_CLIENT_CONNECTION_POOL_NAME] = pool_name
- if connection_state is not None:
- attrs[DB_CLIENT_CONNECTION_STATE] = connection_state.value
- if is_pubsub is not None:
- attrs[REDIS_CLIENT_CONNECTION_PUBSUB] = is_pubsub
- if connection_name is not None:
- attrs[DB_CLIENT_CONNECTION_NAME] = connection_name
- return attrs
- @staticmethod
- def build_error_attributes(
- error_type: Optional[Exception] = None,
- is_internal: Optional[bool] = None,
- ) -> Dict[str, Any]:
- """
- Build error attributes.
- Args:
- is_internal: Whether the error is internal (e.g., timeout, network error)
- error_type: The exception that occurred
- Returns:
- Dictionary of error attributes
- """
- attrs: Dict[str, Any] = {}
- if error_type is not None:
- attrs[ERROR_TYPE] = error_type.__class__.__name__
- if (
- hasattr(error_type, "status_code")
- and error_type.status_code is not None
- ):
- attrs[DB_RESPONSE_STATUS_CODE] = error_type.status_code
- else:
- attrs[DB_RESPONSE_STATUS_CODE] = "error"
- if hasattr(error_type, "error_type") and error_type.error_type is not None:
- attrs[REDIS_CLIENT_ERROR_CATEGORY] = error_type.error_type.value
- else:
- attrs[REDIS_CLIENT_ERROR_CATEGORY] = "other"
- if is_internal is not None:
- attrs[REDIS_CLIENT_ERROR_INTERNAL] = is_internal
- return attrs
- @staticmethod
- def build_pubsub_message_attributes(
- direction: PubSubDirection,
- channel: Optional[str] = None,
- sharded: Optional[bool] = None,
- ) -> Dict[str, Any]:
- """
- Build attributes for a PubSub message.
- Args:
- direction: Message direction ('publish' or 'receive')
- channel: Pub/Sub channel name
- sharded: True if sharded Pub/Sub channel
- Returns:
- Dictionary of PubSub message attributes
- """
- attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
- attrs[REDIS_CLIENT_PUBSUB_MESSAGE_DIRECTION] = direction.value
- if channel is not None:
- attrs[REDIS_CLIENT_PUBSUB_CHANNEL] = channel
- if sharded is not None:
- attrs[REDIS_CLIENT_PUBSUB_SHARDED] = sharded
- return attrs
- @staticmethod
- def build_streaming_attributes(
- stream_name: Optional[str] = None,
- consumer_group: Optional[str] = None,
- consumer_name: Optional[str] = None, # noqa
- ) -> Dict[str, Any]:
- """
- Build attributes for a streaming operation.
- Args:
- stream_name: Name of the stream
- consumer_group: Name of the consumer group
- consumer_name: Name of the consumer
- Returns:
- Dictionary of streaming attributes
- """
- attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
- if stream_name is not None:
- attrs[REDIS_CLIENT_STREAM_NAME] = stream_name
- if consumer_group is not None:
- attrs[REDIS_CLIENT_CONSUMER_GROUP] = consumer_group
- return attrs
- @staticmethod
- def build_csc_attributes(
- pool_name: Optional[str] = None,
- result: Optional[CSCResult] = None,
- reason: Optional[CSCReason] = None,
- ) -> Dict[str, Any]:
- """
- Build attributes for a Client Side Caching (CSC) operation.
- Args:
- pool_name: Connection pool name (used only for csc_items metric)
- result: CSC result ('hit' or 'miss')
- reason: Reason for CSC eviction ('full' or 'invalidation')
- Returns:
- Dictionary of CSC attributes
- """
- attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
- if pool_name is not None:
- attrs[DB_CLIENT_CONNECTION_POOL_NAME] = pool_name
- if result is not None:
- attrs[REDIS_CLIENT_CSC_RESULT] = result.value
- if reason is not None:
- attrs[REDIS_CLIENT_CSC_REASON] = reason.value
- return attrs
- @staticmethod
- def build_geo_failover_attributes(
- fail_from: Union["SyncDatabase", "AsyncDatabase"],
- fail_to: Union["SyncDatabase", "AsyncDatabase"],
- reason: GeoFailoverReason,
- ) -> Dict[str, Any]:
- """
- Build attributes for a geo failover.
- Args:
- fail_from: Database failed from
- fail_to: Database failed to
- reason: Reason for the failover
- Returns:
- Dictionary of geo failover attributes
- """
- attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
- attrs[DB_CLIENT_GEOFAILOVER_FAIL_FROM] = get_db_name(fail_from)
- attrs[DB_CLIENT_GEOFAILOVER_FAIL_TO] = get_db_name(fail_to)
- attrs[DB_CLIENT_GEOFAILOVER_REASON] = reason.value
- return attrs
- @staticmethod
- def build_pool_name(
- server_address: str,
- server_port: int,
- db_namespace: int = 0,
- ) -> str:
- """
- Build a unique connection pool name.
- Args:
- server_address: Redis server address
- server_port: Redis server port
- db_namespace: Redis database index
- Returns:
- Unique pool name in format "address:port/db"
- """
- return f"{server_address}:{server_port}/{db_namespace}"
- def get_pool_name(pool: Union["ConnectionPoolInterface", "ConnectionPool"]) -> str:
- """
- Get a short string representation of a connection pool for observability.
- This provides a concise pool identifier suitable for use as a metric attribute,
- in the format: host:port_uniqueID (matching go-redis format)
- Args:
- pool: Connection pool instance
- Returns:
- Short pool name in format "host:port_uniqueID"
- Example:
- >>> pool = ConnectionPool(host='localhost', port=6379, db=0)
- >>> get_pool_name(pool)
- 'localhost:6379_a1b2c3d4'
- """
- host = pool.connection_kwargs.get("host", "unknown")
- port = pool.connection_kwargs.get("port", 6379)
- # Get unique pool ID if available (added for observability)
- pool_id = getattr(pool, "_pool_id", "")
- if pool_id:
- return f"{host}:{port}_{pool_id}"
- else:
- return f"{host}:{port}"
- def get_db_name(database: Union["SyncDatabase", "AsyncDatabase"]):
- """
- Get a short string representation of a database for observability.
- Args:
- database: Database instance
- Returns:
- Short database name in format "{host}:{port}/{weight}"
- """
- host = database.client.get_connection_kwargs()["host"]
- port = database.client.get_connection_kwargs()["port"]
- weight = database.weight
- return f"{host}:{port}/{weight}"
|