""" Metrics module for Telegram bot monitoring with Prometheus. Provides predefined metrics for bot commands, errors, performance, and user activity. """ from typing import Dict, Any, Optional from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST from prometheus_client.core import CollectorRegistry import time from functools import wraps import asyncio from contextlib import asynccontextmanager class BotMetrics: """Central class for managing all bot metrics.""" def __init__(self): self.registry = CollectorRegistry() # Bot commands counter self.bot_commands_total = Counter( 'bot_commands_total', 'Total number of bot commands processed', ['command_type', 'handler_type', 'user_type'], registry=self.registry ) # Method execution time histogram self.method_duration_seconds = Histogram( 'method_duration_seconds', 'Time spent executing methods', ['method_name', 'handler_type', 'status'], # Оптимизированные buckets для Telegram API (обычно < 1 сек) buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0], registry=self.registry ) # Errors counter self.errors_total = Counter( 'errors_total', 'Total number of errors', ['error_type', 'handler_type', 'method_name'], registry=self.registry ) # Active users gauge self.active_users = Gauge( 'active_users', 'Number of currently active users', ['user_type'], registry=self.registry ) # Database query metrics self.db_query_duration_seconds = Histogram( 'db_query_duration_seconds', 'Time spent executing database queries', ['query_type', 'table_name', 'operation'], # Оптимизированные buckets для SQLite/PostgreSQL buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5], registry=self.registry ) # Message processing metrics self.messages_processed_total = Counter( 'messages_processed_total', 'Total number of messages processed', ['message_type', 'chat_type', 'handler_type'], registry=self.registry ) # Middleware execution metrics self.middleware_duration_seconds = Histogram( 'middleware_duration_seconds', 'Time spent in middleware execution', ['middleware_name', 'status'], # Middleware должен быть быстрым buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.25], registry=self.registry ) # Rate limiting metrics self.rate_limit_hits_total = Counter( 'rate_limit_hits_total', 'Total number of rate limit hits', ['limit_type', 'handler_type'], registry=self.registry ) def record_command(self, command_type: str, handler_type: str = "unknown", user_type: str = "unknown"): """Record a bot command execution.""" self.bot_commands_total.labels( command_type=command_type, handler_type=handler_type, user_type=user_type ).inc() def record_error(self, error_type: str, handler_type: str = "unknown", method_name: str = "unknown"): """Record an error occurrence.""" self.errors_total.labels( error_type=error_type, handler_type=handler_type, method_name=method_name ).inc() def record_method_duration(self, method_name: str, duration: float, handler_type: str = "unknown", status: str = "success"): """Record method execution duration.""" self.method_duration_seconds.labels( method_name=method_name, handler_type=handler_type, status=status ).observe(duration) def set_active_users(self, count: int, user_type: str = "total"): """Set the number of active users.""" self.active_users.labels(user_type=user_type).set(count) def record_db_query(self, query_type: str, duration: float, table_name: str = "unknown", operation: str = "unknown"): """Record database query duration.""" self.db_query_duration_seconds.labels( query_type=query_type, table_name=table_name, operation=operation ).observe(duration) def record_message(self, message_type: str, chat_type: str = "unknown", handler_type: str = "unknown"): """Record a processed message.""" self.messages_processed_total.labels( message_type=message_type, chat_type=chat_type, handler_type=handler_type ).inc() def record_middleware(self, middleware_name: str, duration: float, status: str = "success"): """Record middleware execution duration.""" self.middleware_duration_seconds.labels( middleware_name=middleware_name, status=status ).observe(duration) def get_metrics(self) -> bytes: """Generate metrics in Prometheus format.""" return generate_latest(self.registry) # Global metrics instance metrics = BotMetrics() # Decorators for easy metric collection def track_time(method_name: str = None, handler_type: str = "unknown"): """Decorator to track execution time of functions.""" def decorator(func): @wraps(func) async def async_wrapper(*args, **kwargs): start_time = time.time() try: result = await func(*args, **kwargs) duration = time.time() - start_time metrics.record_method_duration( method_name or func.__name__, duration, handler_type, "success" ) return result except Exception as e: duration = time.time() - start_time metrics.record_method_duration( method_name or func.__name__, duration, handler_type, "error" ) metrics.record_error( type(e).__name__, handler_type, method_name or func.__name__ ) raise @wraps(func) def sync_wrapper(*args, **kwargs): start_time = time.time() try: result = func(*args, **kwargs) duration = time.time() - start_time metrics.record_method_duration( method_name or func.__name__, duration, handler_type, "success" ) return result except Exception as e: duration = time.time() - start_time metrics.record_method_duration( method_name or func.__name__, duration, handler_type, "error" ) metrics.record_error( type(e).__name__, handler_type, method_name or func.__name__ ) raise if asyncio.iscoroutinefunction(func): return async_wrapper return sync_wrapper return decorator def track_errors(handler_type: str = "unknown", method_name: str = None): """Decorator to track errors in functions.""" def decorator(func): @wraps(func) async def async_wrapper(*args, **kwargs): try: return await func(*args, **kwargs) except Exception as e: metrics.record_error( type(e).__name__, handler_type, method_name or func.__name__ ) raise @wraps(func) def sync_wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: metrics.record_error( type(e).__name__, handler_type, method_name or func.__name__ ) raise if asyncio.iscoroutinefunction(func): return async_wrapper return sync_wrapper return decorator def db_query_time(query_type: str = "unknown", table_name: str = "unknown", operation: str = "unknown"): """Decorator to track database query execution time.""" def decorator(func): @wraps(func) async def async_wrapper(*args, **kwargs): start_time = time.time() try: result = await func(*args, **kwargs) duration = time.time() - start_time metrics.record_db_query(query_type, duration, table_name, operation) return result except Exception as e: duration = time.time() - start_time metrics.record_db_query(query_type, duration, table_name, operation) metrics.record_error( type(e).__name__, "database", func.__name__ ) raise @wraps(func) def sync_wrapper(*args, **kwargs): start_time = time.time() try: result = func(*args, **kwargs) duration = time.time() - start_time metrics.record_db_query(query_type, duration, table_name, operation) return result except Exception as e: duration = time.time() - start_time metrics.record_db_query(query_type, duration, table_name, operation) metrics.record_error( type(e).__name__, "database", func.__name__ ) raise if asyncio.iscoroutinefunction(func): return async_wrapper return sync_wrapper return decorator @asynccontextmanager async def track_middleware(middleware_name: str): """Context manager to track middleware execution time.""" start_time = time.time() try: yield duration = time.time() - start_time metrics.record_middleware(middleware_name, duration, "success") except Exception as e: duration = time.time() - start_time metrics.record_middleware(middleware_name, duration, "error") metrics.record_error( type(e).__name__, "middleware", middleware_name ) raise