"""
Metrics module for Telegram bot monitoring with Prometheus.

Provides predefined metrics for bot commands, errors, performance, and user
activity.
"""

import asyncio
import logging
import os
import time
from contextlib import asynccontextmanager
from functools import wraps
from typing import Any, Dict, Optional

from prometheus_client import (
    CONTENT_TYPE_LATEST,
    Counter,
    Gauge,
    Histogram,
    generate_latest,
)
from prometheus_client.core import CollectorRegistry

logger = logging.getLogger(__name__)


# Rate limiter metrics are created inside the main class (same registry).
class BotMetrics:
    """Central class for managing all bot metrics.

    Every metric is registered in a dedicated ``CollectorRegistry`` owned by
    the instance (``self.registry``), not in the process-wide default
    registry. ``get_metrics`` serialises that registry.
    """

    def __init__(self):
        self.registry = CollectorRegistry()

        # Rate limiter metrics are registered first, in the same registry.
        self._create_rate_limit_metrics()

        # Bot commands counter
        self.bot_commands_total = Counter(
            "bot_commands_total",
            "Total number of bot commands processed",
            ["command", "status", "handler_type", "user_type"],
            registry=self.registry,
        )

        # Method execution time histogram
        self.method_duration_seconds = Histogram(
            "method_duration_seconds",
            "Time spent executing methods",
            ["method_name", "handler_type", "status"],
            # Buckets tuned for Telegram API calls (usually < 1 s)
            buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
            registry=self.registry,
        )

        # Errors counter
        self.errors_total = Counter(
            "errors_total",
            "Total number of errors",
            ["error_type", "handler_type", "method_name"],
            registry=self.registry,
        )

        # Active users gauge
        self.active_users = Gauge(
            "active_users",
            "Number of currently active users",
            ["user_type"],
            registry=self.registry,
        )

        # Total users gauge (a separate, label-free metric)
        self.total_users = Gauge(
            "total_users", "Total number of users in database", registry=self.registry
        )

        # Database query metrics
        self.db_query_duration_seconds = Histogram(
            "db_query_duration_seconds",
            "Time spent executing database queries",
            ["query_type", "table_name", "operation"],
            # Buckets tuned for SQLite/PostgreSQL
            buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5],
            registry=self.registry,
        )

        # Database queries counter
        self.db_queries_total = Counter(
            "db_queries_total",
            "Total number of database queries executed",
            ["query_type", "table_name", "operation"],
            registry=self.registry,
        )

        # Database errors counter
        self.db_errors_total = Counter(
            "db_errors_total",
            "Total number of database errors",
            ["error_type", "query_type", "table_name", "operation"],
            registry=self.registry,
        )

        # Message processing metrics
        self.messages_processed_total = Counter(
            "messages_processed_total",
            "Total number of messages processed",
            ["message_type", "chat_type", "handler_type"],
            registry=self.registry,
        )

        # Middleware execution metrics
        self.middleware_duration_seconds = Histogram(
            "middleware_duration_seconds",
            "Time spent in middleware execution",
            ["middleware_name", "status"],
            # Middleware is expected to be fast, hence the small buckets
            buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.25],
            registry=self.registry,
        )

        # Rate limiting metrics
        # NOTE(review): a per-user ``user_id`` label creates one time series
        # per user; confirm this cardinality is acceptable for the deployment.
        self.rate_limit_hits_total = Counter(
            "rate_limit_hits_total",
            "Total number of rate limit hits",
            ["limit_type", "user_id", "action"],
            registry=self.registry,
        )

        # User activity metrics
        self.user_activity_total = Counter(
            "user_activity_total",
            "Total user activity events",
            ["activity_type", "user_type", "chat_type"],
            registry=self.registry,
        )

        # File download metrics
        self.file_downloads_total = Counter(
            "file_downloads_total",
            "Total number of file downloads",
            ["content_type", "status"],
            registry=self.registry,
        )

        self.file_download_duration_seconds = Histogram(
            "file_download_duration_seconds",
            "Time spent downloading files",
            ["content_type"],
            buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
            registry=self.registry,
        )

        self.file_download_size_bytes = Histogram(
            "file_download_size_bytes",
            "Size of downloaded files in bytes",
            ["content_type"],
            # 1 KiB, 10 KiB, 100 KiB, 1 MiB, 10 MiB, 100 MiB, 1 GiB
            buckets=[1024, 10240, 102400, 1048576, 10485760, 104857600, 1073741824],
            registry=self.registry,
        )

        # Media processing metrics
        self.media_processing_total = Counter(
            "media_processing_total",
            "Total number of media processing operations",
            ["content_type", "status"],
            registry=self.registry,
        )

        self.media_processing_duration_seconds = Histogram(
            "media_processing_duration_seconds",
            "Time spent processing media",
            ["content_type"],
            buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0],
            registry=self.registry,
        )

    def _create_rate_limit_metrics(self):
        """Create the rate limiter metrics in the instance registry.

        Best-effort: any exception is logged as a warning and swallowed so
        that ``__init__`` continues. If creation fails part-way, the
        remaining ``rate_limit_*`` attributes will not exist on the instance.
        """
        try:
            self.rate_limit_requests_total = Counter(
                "rate_limit_requests_total",
                "Total number of rate limited requests",
                ["chat_id", "status", "error_type"],
                registry=self.registry,
            )

            self.rate_limit_errors_total = Counter(
                "rate_limit_errors_total",
                "Total number of rate limit errors",
                ["error_type", "chat_id"],
                registry=self.registry,
            )

            self.rate_limit_wait_duration_seconds = Histogram(
                "rate_limit_wait_duration_seconds",
                "Time spent waiting due to rate limiting",
                ["chat_id"],
                buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0],
                registry=self.registry,
            )

            self.rate_limit_active_chats = Gauge(
                "rate_limit_active_chats",
                "Number of active chats with rate limiting",
                registry=self.registry,
            )

            self.rate_limit_success_rate = Gauge(
                "rate_limit_success_rate",
                "Success rate of rate limited requests",
                ["chat_id"],
                registry=self.registry,
            )

            self.rate_limit_requests_per_minute = Gauge(
                "rate_limit_requests_per_minute",
                "Requests per minute",
                ["chat_id"],
                registry=self.registry,
            )

            self.rate_limit_total_requests = Gauge(
                "rate_limit_total_requests",
                "Total number of requests",
                ["chat_id"],
                registry=self.registry,
            )

            self.rate_limit_total_errors = Gauge(
                "rate_limit_total_errors",
                "Total number of errors",
                ["chat_id", "error_type"],
                registry=self.registry,
            )

            self.rate_limit_avg_wait_time_seconds = Gauge(
                "rate_limit_avg_wait_time_seconds",
                "Average wait time in seconds",
                ["chat_id"],
                registry=self.registry,
            )
        except Exception as e:
            # Log the failure but do not abort initialisation.
            logger.warning("Failed to create rate limit metrics: %s", e)

    def record_command(
        self,
        command_type: str,
        handler_type: str = "unknown",
        user_type: str = "unknown",
        status: str = "success",
    ):
        """Record a bot command execution.

        Increments ``bot_commands_total``; ``command_type`` is exported as
        the ``command`` label.
        """
        self.bot_commands_total.labels(
            command=command_type,
            status=status,
            handler_type=handler_type,
            user_type=user_type,
        ).inc()

    def record_error(
        self,
        error_type: str,
        handler_type: str = "unknown",
        method_name: str = "unknown",
    ):
        """Record an error occurrence by incrementing ``errors_total``."""
        self.errors_total.labels(
            error_type=error_type, handler_type=handler_type, method_name=method_name
        ).inc()

    def record_method_duration(
        self,
        method_name: str,
        duration: float,
        handler_type: str = "unknown",
        status: str = "success",
    ):
        """Record method execution duration (seconds) in the histogram."""
        self.method_duration_seconds.labels(
            method_name=method_name, handler_type=handler_type, status=status
        ).observe(duration)

    def set_active_users(self, count: int, user_type: str = "daily"):
        """Set the number of active users for a specific type."""
        self.active_users.labels(user_type=user_type).set(count)

    def set_total_users(self, count: int):
        """Set the total number of users in database."""
        self.total_users.set(count)

    def record_db_query(
        self,
        query_type: str,
        duration: float,
        table_name: str = "unknown",
        operation: str = "unknown",
    ):
        """Record a database query.

        Observes ``duration`` in ``db_query_duration_seconds`` and increments
        ``db_queries_total`` with the same label set.
        """
        self.db_query_duration_seconds.labels(
            query_type=query_type, table_name=table_name, operation=operation
        ).observe(duration)
        self.db_queries_total.labels(
            query_type=query_type, table_name=table_name, operation=operation
        ).inc()

    def record_message(
        self,
        message_type: str,
        chat_type: str = "unknown",
        handler_type: str = "unknown",
    ):
        """Record a processed message."""
        self.messages_processed_total.labels(
            message_type=message_type, chat_type=chat_type, handler_type=handler_type
        ).inc()

    def record_middleware(
        self, middleware_name: str, duration: float, status: str = "success"
    ):
        """Record middleware execution duration."""
        self.middleware_duration_seconds.labels(
            middleware_name=middleware_name, status=status
        ).observe(duration)

    def record_file_download(self, content_type: str, file_size: int, duration: float):
        """Record a successful file download: count, duration and size."""
        self.file_downloads_total.labels(
            content_type=content_type, status="success"
        ).inc()
        self.file_download_duration_seconds.labels(content_type=content_type).observe(
            duration
        )
        self.file_download_size_bytes.labels(content_type=content_type).observe(
            file_size
        )

    def record_file_download_error(self, content_type: str, error_message: str):
        """Record a failed file download.

        Increments ``file_downloads_total`` with ``status="error"`` and
        ``errors_total`` with fixed labels. ``error_message`` is accepted for
        interface compatibility but is not exported to any metric.
        """
        self.file_downloads_total.labels(
            content_type=content_type, status="error"
        ).inc()
        self.errors_total.labels(
            error_type="file_download_error",
            handler_type="media_processing",
            method_name="download_file",
        ).inc()

    def record_media_processing(
        self, content_type: str, duration: float, success: bool
    ):
        """Record a media processing operation.

        The duration is observed for both outcomes; a failure additionally
        increments ``errors_total`` with fixed labels.
        """
        status = "success" if success else "error"
        self.media_processing_total.labels(
            content_type=content_type, status=status
        ).inc()
        self.media_processing_duration_seconds.labels(
            content_type=content_type
        ).observe(duration)

        if not success:
            self.errors_total.labels(
                error_type="media_processing_error",
                handler_type="media_processing",
                method_name="add_in_db_media",
            ).inc()

    def record_db_error(
        self,
        error_type: str,
        query_type: str = "unknown",
        table_name: str = "unknown",
        operation: str = "unknown",
    ):
        """Record database error occurrence."""
        self.db_errors_total.labels(
            error_type=error_type,
            query_type=query_type,
            table_name=table_name,
            operation=operation,
        ).inc()

    def record_rate_limit_request(
        self,
        chat_id: int,
        success: bool,
        wait_time: float = 0.0,
        error_type: Optional[str] = None,
    ):
        """Record rate limit request metrics.

        Best-effort: any exception raised while recording is logged as a
        warning and swallowed.

        Args:
            chat_id: Chat identifier; exported as a string label.
            success: Whether the request succeeded.
            wait_time: Seconds spent waiting; observed only when positive.
            error_type: Error class name; exported as ``"none"`` when falsy.
        """
        try:
            chat_label = str(chat_id)
            status = "success" if success else "error"

            # Request counter
            self.rate_limit_requests_total.labels(
                chat_id=chat_label, status=status, error_type=error_type or "none"
            ).inc()

            # Wait time (zero/negative waits are not observed)
            if wait_time > 0:
                self.rate_limit_wait_duration_seconds.labels(
                    chat_id=chat_label
                ).observe(wait_time)

            # Error counter, only for failures with a known error type
            if not success and error_type:
                self.rate_limit_errors_total.labels(
                    error_type=error_type, chat_id=chat_label
                ).inc()
        except Exception as e:
            logger.warning("Failed to record rate limit request: %s", e)

    def update_rate_limit_gauges(self):
        """Refresh rate limit gauges from ``rate_limit_monitor.stats``.

        Best-effort: any exception (including a failed import of the monitor)
        is logged as a warning and swallowed.
        """
        try:
            # Imported lazily inside the method; presumably to avoid a
            # circular import at module load time -- TODO confirm.
            from .rate_limit_monitor import rate_limit_monitor

            # Number of active chats
            self.rate_limit_active_chats.set(len(rate_limit_monitor.stats))

            # Per-chat gauges
            for chat_id, chat_stats in rate_limit_monitor.stats.items():
                chat_id_str = str(chat_id)

                # Success rate
                self.rate_limit_success_rate.labels(chat_id=chat_id_str).set(
                    chat_stats.success_rate
                )

                # Requests per minute
                self.rate_limit_requests_per_minute.labels(chat_id=chat_id_str).set(
                    chat_stats.requests_per_minute
                )

                # Total number of requests
                self.rate_limit_total_requests.labels(chat_id=chat_id_str).set(
                    chat_stats.total_requests
                )

                # Average wait time
                self.rate_limit_avg_wait_time_seconds.labels(chat_id=chat_id_str).set(
                    chat_stats.average_wait_time
                )

                # Error counts by type; series are only written for
                # non-zero counts.
                if chat_stats.retry_after_errors > 0:
                    self.rate_limit_total_errors.labels(
                        chat_id=chat_id_str, error_type="RetryAfter"
                    ).set(chat_stats.retry_after_errors)

                if chat_stats.other_errors > 0:
                    self.rate_limit_total_errors.labels(
                        chat_id=chat_id_str, error_type="Other"
                    ).set(chat_stats.other_errors)
        except Exception as e:
            logger.warning("Failed to update rate limit gauges: %s", e)

    def get_metrics(self) -> bytes:
        """Generate metrics in Prometheus format."""
        # Refresh the rate limiter gauges before serialising the registry.
        self.update_rate_limit_gauges()

        return generate_latest(self.registry)


# Global metrics instance
metrics = BotMetrics()


def _ignore_success(_result, _duration):
    """Success callback that records nothing."""


def _file_size_or_zero(result) -> int:
    """Return the size of the file at path ``result``, or 0 if unavailable.

    Only non-empty ``str`` results are treated as paths. ``os.path.getsize``
    is called directly instead of after an ``os.path.exists`` check, so a
    file removed in between cannot turn a successful call into a failure.
    """
    if not (result and isinstance(result, str)):
        return 0
    try:
        return os.path.getsize(result)
    except (OSError, ValueError):
        # OSError: missing/inaccessible path; ValueError: e.g. embedded NUL.
        return 0


def _instrument(func, on_success, on_error):
    """Wrap a sync or async ``func`` and report its outcome and elapsed time.

    ``on_success(result, duration)`` runs after ``func`` returns;
    ``on_error(exc, duration)`` runs when an ``Exception`` is raised, after
    which the exception is re-raised unchanged. Durations come from
    ``time.perf_counter()``, which is monotonic, so wall-clock adjustments
    cannot produce negative or inflated values.

    ``on_success`` is invoked inside the ``try`` block, so an exception it
    raises is also reported through ``on_error``.
    """
    if asyncio.iscoroutinefunction(func):

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            started = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
                on_success(result, time.perf_counter() - started)
                return result
            except Exception as exc:
                on_error(exc, time.perf_counter() - started)
                raise

        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        started = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            on_success(result, time.perf_counter() - started)
            return result
        except Exception as exc:
            on_error(exc, time.perf_counter() - started)
            raise

    return sync_wrapper


# Decorators for easy metric collection
def track_time(method_name: Optional[str] = None, handler_type: str = "unknown"):
    """Decorator to track execution time of functions.

    Records the duration with status ``"success"`` or ``"error"``; on error
    it also records the exception class name via ``metrics.record_error``.
    When ``method_name`` is falsy, ``func.__name__`` is used as the label.
    """

    def decorator(func):
        def on_success(_result, duration):
            metrics.record_method_duration(
                method_name or func.__name__, duration, handler_type, "success"
            )

        def on_error(exc, duration):
            metrics.record_method_duration(
                method_name or func.__name__, duration, handler_type, "error"
            )
            metrics.record_error(
                type(exc).__name__, handler_type, method_name or func.__name__
            )

        return _instrument(func, on_success, on_error)

    return decorator


def track_errors(handler_type: str = "unknown", method_name: Optional[str] = None):
    """Decorator to track errors in functions.

    Records the exception class name via ``metrics.record_error`` and
    re-raises; successful calls record nothing.
    """

    def decorator(func):
        def on_error(exc, _duration):
            metrics.record_error(
                type(exc).__name__, handler_type, method_name or func.__name__
            )

        return _instrument(func, _ignore_success, on_error)

    return decorator


def db_query_time(
    query_type: str = "unknown", table_name: str = "unknown", operation: str = "unknown"
):
    """Decorator to track database query execution time.

    The query duration is recorded for both outcomes. On error it also
    records a database error and a general error with handler type
    ``"database"`` and the wrapped function's name.
    """

    def decorator(func):
        def on_success(_result, duration):
            metrics.record_db_query(query_type, duration, table_name, operation)

        def on_error(exc, duration):
            metrics.record_db_query(query_type, duration, table_name, operation)
            metrics.record_db_error(
                type(exc).__name__, query_type, table_name, operation
            )
            metrics.record_error(type(exc).__name__, "database", func.__name__)

        return _instrument(func, on_success, on_error)

    return decorator


@asynccontextmanager
async def track_middleware(middleware_name: str):
    """Context manager to track middleware execution time.

    Records the elapsed time of the ``async with`` body with status
    ``"success"`` or ``"error"``; on error it also records the exception
    class name and re-raises.
    """
    started = time.perf_counter()
    try:
        yield
        metrics.record_middleware(
            middleware_name, time.perf_counter() - started, "success"
        )
    except Exception as exc:
        metrics.record_middleware(
            middleware_name, time.perf_counter() - started, "error"
        )
        metrics.record_error(type(exc).__name__, "middleware", middleware_name)
        raise


def track_media_processing(content_type: str = "unknown"):
    """Decorator to track media processing operations."""

    def decorator(func):
        def on_success(_result, duration):
            metrics.record_media_processing(content_type, duration, True)

        def on_error(_exc, duration):
            metrics.record_media_processing(content_type, duration, False)

        return _instrument(func, on_success, on_error)

    return decorator


def track_file_operations(content_type: str = "unknown"):
    """Decorator to track file download/upload operations.

    On success, when the wrapped function returns a string naming an
    existing path, that file's size is recorded; otherwise the recorded
    size is 0. On error, a download error is recorded and the exception is
    re-raised.
    """

    def decorator(func):
        def on_success(result, duration):
            # The file size is derived from the returned path, if any.
            metrics.record_file_download(
                content_type, _file_size_or_zero(result), duration
            )

        def on_error(exc, _duration):
            metrics.record_file_download_error(content_type, str(exc))

        return _instrument(func, on_success, on_error)

    return decorator