attributes.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428
  1. """
  2. OpenTelemetry semantic convention attributes for Redis.
  3. This module provides constants and helper functions for building OTel attributes
  4. according to the semantic conventions for database clients.
  5. Reference: https://opentelemetry.io/docs/specs/semconv/database/redis/
  6. """
  7. from enum import Enum
  8. from typing import TYPE_CHECKING, Any, Dict, Optional, Union
  9. import redis
  10. if TYPE_CHECKING:
  11. from redis.asyncio.connection import ConnectionPool
  12. from redis.asyncio.multidb.database import AsyncDatabase
  13. from redis.connection import ConnectionPoolInterface
  14. from redis.multidb.database import SyncDatabase
  15. # Database semantic convention attributes
  16. DB_SYSTEM = "db.system"
  17. DB_NAMESPACE = "db.namespace"
  18. DB_OPERATION_NAME = "db.operation.name"
  19. DB_RESPONSE_STATUS_CODE = "db.response.status_code"
  20. DB_STORED_PROCEDURE_NAME = "db.stored_procedure.name"
  21. # Error attributes
  22. ERROR_TYPE = "error.type"
  23. # Network attributes
  24. NETWORK_PEER_ADDRESS = "network.peer.address"
  25. NETWORK_PEER_PORT = "network.peer.port"
  26. # Server attributes
  27. SERVER_ADDRESS = "server.address"
  28. SERVER_PORT = "server.port"
  29. # Connection pool attributes
  30. DB_CLIENT_CONNECTION_POOL_NAME = "db.client.connection.pool.name"
  31. DB_CLIENT_CONNECTION_STATE = "db.client.connection.state"
  32. DB_CLIENT_CONNECTION_NAME = "db.client.connection.name"
  33. # Geofailover attributes
  34. DB_CLIENT_GEOFAILOVER_FAIL_FROM = "db.client.geofailover.fail_from"
  35. DB_CLIENT_GEOFAILOVER_FAIL_TO = "db.client.geofailover.fail_to"
  36. DB_CLIENT_GEOFAILOVER_REASON = "db.client.geofailover.reason"
  37. # Redis-specific attributes
  38. REDIS_CLIENT_LIBRARY = "redis.client.library"
  39. REDIS_CLIENT_CONNECTION_PUBSUB = "redis.client.connection.pubsub"
  40. REDIS_CLIENT_CONNECTION_CLOSE_REASON = "redis.client.connection.close.reason"
  41. REDIS_CLIENT_CONNECTION_NOTIFICATION = "redis.client.connection.notification"
  42. REDIS_CLIENT_OPERATION_RETRY_ATTEMPTS = "redis.client.operation.retry_attempts"
  43. REDIS_CLIENT_OPERATION_BLOCKING = "redis.client.operation.blocking"
  44. REDIS_CLIENT_PUBSUB_MESSAGE_DIRECTION = "redis.client.pubsub.message.direction"
  45. REDIS_CLIENT_PUBSUB_CHANNEL = "redis.client.pubsub.channel"
  46. REDIS_CLIENT_PUBSUB_SHARDED = "redis.client.pubsub.sharded"
  47. REDIS_CLIENT_ERROR_INTERNAL = "redis.client.errors.internal"
  48. REDIS_CLIENT_ERROR_CATEGORY = "redis.client.errors.category"
  49. REDIS_CLIENT_STREAM_NAME = "redis.client.stream.name"
  50. REDIS_CLIENT_CONSUMER_GROUP = "redis.client.consumer_group"
  51. REDIS_CLIENT_CSC_RESULT = "redis.client.csc.result"
  52. REDIS_CLIENT_CSC_REASON = "redis.client.csc.reason"
  53. class ConnectionState(Enum):
  54. IDLE = "idle"
  55. USED = "used"
  56. class PubSubDirection(Enum):
  57. PUBLISH = "publish"
  58. RECEIVE = "receive"
  59. class CSCResult(Enum):
  60. HIT = "hit"
  61. MISS = "miss"
  62. class CSCReason(Enum):
  63. FULL = "full"
  64. INVALIDATION = "invalidation"
  65. class GeoFailoverReason(Enum):
  66. AUTOMATIC = "automatic"
  67. MANUAL = "manual"
  68. class AttributeBuilder:
  69. """
  70. Helper class to build OTel semantic convention attributes for Redis operations.
  71. """
  72. @staticmethod
  73. def build_base_attributes(
  74. server_address: Optional[str] = None,
  75. server_port: Optional[int] = None,
  76. db_namespace: Optional[int] = None,
  77. ) -> Dict[str, Any]:
  78. """
  79. Build base attributes common to all Redis operations.
  80. Args:
  81. server_address: Redis server address (FQDN or IP)
  82. server_port: Redis server port
  83. db_namespace: Redis database index
  84. Returns:
  85. Dictionary of base attributes
  86. """
  87. attrs: Dict[str, Any] = {
  88. DB_SYSTEM: "redis",
  89. REDIS_CLIENT_LIBRARY: f"redis-py:v{redis.__version__}",
  90. }
  91. if server_address is not None:
  92. attrs[SERVER_ADDRESS] = server_address
  93. if server_port is not None:
  94. attrs[SERVER_PORT] = server_port
  95. if db_namespace is not None:
  96. attrs[DB_NAMESPACE] = str(db_namespace)
  97. return attrs
  98. @staticmethod
  99. def build_operation_attributes(
  100. command_name: Optional[Union[str, bytes]] = None,
  101. batch_size: Optional[int] = None, # noqa
  102. network_peer_address: Optional[str] = None,
  103. network_peer_port: Optional[int] = None,
  104. stored_procedure_name: Optional[str] = None,
  105. retry_attempts: Optional[int] = None,
  106. is_blocking: Optional[bool] = None,
  107. ) -> Dict[str, Any]:
  108. """
  109. Build attributes for a Redis operation (command execution).
  110. Args:
  111. command_name: Redis command name (e.g., 'GET', 'SET', 'MULTI'), can be str or bytes
  112. batch_size: Number of commands in batch (for pipelines/transactions)
  113. network_peer_address: Resolved peer address
  114. network_peer_port: Peer port number
  115. stored_procedure_name: Lua script name or SHA1 digest
  116. retry_attempts: Number of retry attempts made
  117. is_blocking: Whether the operation is a blocking command
  118. Returns:
  119. Dictionary of operation attributes
  120. """
  121. attrs: Dict[str, Any] = {}
  122. if command_name is not None:
  123. # Ensure command_name is a string (it can be bytes from args[0])
  124. if isinstance(command_name, bytes):
  125. command_name = command_name.decode("utf-8", errors="replace")
  126. attrs[DB_OPERATION_NAME] = command_name.upper()
  127. if network_peer_address is not None:
  128. attrs[NETWORK_PEER_ADDRESS] = network_peer_address
  129. if network_peer_port is not None:
  130. attrs[NETWORK_PEER_PORT] = network_peer_port
  131. if stored_procedure_name is not None:
  132. attrs[DB_STORED_PROCEDURE_NAME] = stored_procedure_name
  133. if retry_attempts is not None and retry_attempts > 0:
  134. attrs[REDIS_CLIENT_OPERATION_RETRY_ATTEMPTS] = retry_attempts
  135. if is_blocking is not None:
  136. attrs[REDIS_CLIENT_OPERATION_BLOCKING] = is_blocking
  137. return attrs
  138. @staticmethod
  139. def build_connection_attributes(
  140. pool_name: Optional[str] = None,
  141. connection_state: Optional[ConnectionState] = None,
  142. connection_name: Optional[str] = None,
  143. is_pubsub: Optional[bool] = None,
  144. ) -> Dict[str, Any]:
  145. """
  146. Build attributes for connection pool metrics.
  147. Args:
  148. pool_name: Unique connection pool name
  149. connection_state: Connection state ('idle' or 'used')
  150. is_pubsub: Whether this is a PubSub connection
  151. connection_name: Unique connection name
  152. Returns:
  153. Dictionary of connection pool attributes
  154. """
  155. attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
  156. if pool_name is not None:
  157. attrs[DB_CLIENT_CONNECTION_POOL_NAME] = pool_name
  158. if connection_state is not None:
  159. attrs[DB_CLIENT_CONNECTION_STATE] = connection_state.value
  160. if is_pubsub is not None:
  161. attrs[REDIS_CLIENT_CONNECTION_PUBSUB] = is_pubsub
  162. if connection_name is not None:
  163. attrs[DB_CLIENT_CONNECTION_NAME] = connection_name
  164. return attrs
  165. @staticmethod
  166. def build_error_attributes(
  167. error_type: Optional[Exception] = None,
  168. is_internal: Optional[bool] = None,
  169. ) -> Dict[str, Any]:
  170. """
  171. Build error attributes.
  172. Args:
  173. is_internal: Whether the error is internal (e.g., timeout, network error)
  174. error_type: The exception that occurred
  175. Returns:
  176. Dictionary of error attributes
  177. """
  178. attrs: Dict[str, Any] = {}
  179. if error_type is not None:
  180. attrs[ERROR_TYPE] = error_type.__class__.__name__
  181. if (
  182. hasattr(error_type, "status_code")
  183. and error_type.status_code is not None
  184. ):
  185. attrs[DB_RESPONSE_STATUS_CODE] = error_type.status_code
  186. else:
  187. attrs[DB_RESPONSE_STATUS_CODE] = "error"
  188. if hasattr(error_type, "error_type") and error_type.error_type is not None:
  189. attrs[REDIS_CLIENT_ERROR_CATEGORY] = error_type.error_type.value
  190. else:
  191. attrs[REDIS_CLIENT_ERROR_CATEGORY] = "other"
  192. if is_internal is not None:
  193. attrs[REDIS_CLIENT_ERROR_INTERNAL] = is_internal
  194. return attrs
  195. @staticmethod
  196. def build_pubsub_message_attributes(
  197. direction: PubSubDirection,
  198. channel: Optional[str] = None,
  199. sharded: Optional[bool] = None,
  200. ) -> Dict[str, Any]:
  201. """
  202. Build attributes for a PubSub message.
  203. Args:
  204. direction: Message direction ('publish' or 'receive')
  205. channel: Pub/Sub channel name
  206. sharded: True if sharded Pub/Sub channel
  207. Returns:
  208. Dictionary of PubSub message attributes
  209. """
  210. attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
  211. attrs[REDIS_CLIENT_PUBSUB_MESSAGE_DIRECTION] = direction.value
  212. if channel is not None:
  213. attrs[REDIS_CLIENT_PUBSUB_CHANNEL] = channel
  214. if sharded is not None:
  215. attrs[REDIS_CLIENT_PUBSUB_SHARDED] = sharded
  216. return attrs
  217. @staticmethod
  218. def build_streaming_attributes(
  219. stream_name: Optional[str] = None,
  220. consumer_group: Optional[str] = None,
  221. consumer_name: Optional[str] = None, # noqa
  222. ) -> Dict[str, Any]:
  223. """
  224. Build attributes for a streaming operation.
  225. Args:
  226. stream_name: Name of the stream
  227. consumer_group: Name of the consumer group
  228. consumer_name: Name of the consumer
  229. Returns:
  230. Dictionary of streaming attributes
  231. """
  232. attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
  233. if stream_name is not None:
  234. attrs[REDIS_CLIENT_STREAM_NAME] = stream_name
  235. if consumer_group is not None:
  236. attrs[REDIS_CLIENT_CONSUMER_GROUP] = consumer_group
  237. return attrs
  238. @staticmethod
  239. def build_csc_attributes(
  240. pool_name: Optional[str] = None,
  241. result: Optional[CSCResult] = None,
  242. reason: Optional[CSCReason] = None,
  243. ) -> Dict[str, Any]:
  244. """
  245. Build attributes for a Client Side Caching (CSC) operation.
  246. Args:
  247. pool_name: Connection pool name (used only for csc_items metric)
  248. result: CSC result ('hit' or 'miss')
  249. reason: Reason for CSC eviction ('full' or 'invalidation')
  250. Returns:
  251. Dictionary of CSC attributes
  252. """
  253. attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
  254. if pool_name is not None:
  255. attrs[DB_CLIENT_CONNECTION_POOL_NAME] = pool_name
  256. if result is not None:
  257. attrs[REDIS_CLIENT_CSC_RESULT] = result.value
  258. if reason is not None:
  259. attrs[REDIS_CLIENT_CSC_REASON] = reason.value
  260. return attrs
  261. @staticmethod
  262. def build_geo_failover_attributes(
  263. fail_from: Union["SyncDatabase", "AsyncDatabase"],
  264. fail_to: Union["SyncDatabase", "AsyncDatabase"],
  265. reason: GeoFailoverReason,
  266. ) -> Dict[str, Any]:
  267. """
  268. Build attributes for a geo failover.
  269. Args:
  270. fail_from: Database failed from
  271. fail_to: Database failed to
  272. reason: Reason for the failover
  273. Returns:
  274. Dictionary of geo failover attributes
  275. """
  276. attrs: Dict[str, Any] = AttributeBuilder.build_base_attributes()
  277. attrs[DB_CLIENT_GEOFAILOVER_FAIL_FROM] = get_db_name(fail_from)
  278. attrs[DB_CLIENT_GEOFAILOVER_FAIL_TO] = get_db_name(fail_to)
  279. attrs[DB_CLIENT_GEOFAILOVER_REASON] = reason.value
  280. return attrs
  281. @staticmethod
  282. def build_pool_name(
  283. server_address: str,
  284. server_port: int,
  285. db_namespace: int = 0,
  286. ) -> str:
  287. """
  288. Build a unique connection pool name.
  289. Args:
  290. server_address: Redis server address
  291. server_port: Redis server port
  292. db_namespace: Redis database index
  293. Returns:
  294. Unique pool name in format "address:port/db"
  295. """
  296. return f"{server_address}:{server_port}/{db_namespace}"
  297. def get_pool_name(pool: Union["ConnectionPoolInterface", "ConnectionPool"]) -> str:
  298. """
  299. Get a short string representation of a connection pool for observability.
  300. This provides a concise pool identifier suitable for use as a metric attribute,
  301. in the format: host:port_uniqueID (matching go-redis format)
  302. Args:
  303. pool: Connection pool instance
  304. Returns:
  305. Short pool name in format "host:port_uniqueID"
  306. Example:
  307. >>> pool = ConnectionPool(host='localhost', port=6379, db=0)
  308. >>> get_pool_name(pool)
  309. 'localhost:6379_a1b2c3d4'
  310. """
  311. host = pool.connection_kwargs.get("host", "unknown")
  312. port = pool.connection_kwargs.get("port", 6379)
  313. # Get unique pool ID if available (added for observability)
  314. pool_id = getattr(pool, "_pool_id", "")
  315. if pool_id:
  316. return f"{host}:{port}_{pool_id}"
  317. else:
  318. return f"{host}:{port}"
  319. def get_db_name(database: Union["SyncDatabase", "AsyncDatabase"]):
  320. """
  321. Get a short string representation of a database for observability.
  322. Args:
  323. database: Database instance
  324. Returns:
  325. Short database name in format "{host}:{port}/{weight}"
  326. """
  327. host = database.client.get_connection_kwargs()["host"]
  328. port = database.client.get_connection_kwargs()["port"]
  329. weight = database.weight
  330. return f"{host}:{port}/{weight}"