"""
Metrics module for Telegram bot monitoring with Prometheus.

Provides predefined metrics for bot commands, errors, performance, and user
activity, plus decorators / context managers that record them automatically.
"""

import asyncio  # noqa: F401 - kept: other code may import it from here
import inspect
import logging
import os
import time
from contextlib import asynccontextmanager
from functools import wraps
from typing import Any, Callable, Dict, Optional  # noqa: F401

# CONTENT_TYPE_LATEST is not used in this module; it is kept importable from
# here for whatever code serves the output of ``get_metrics()``.
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST  # noqa: F401
from prometheus_client.core import CollectorRegistry

logger = logging.getLogger(__name__)


# Rate limiter metrics are created inside BotMetrics (same registry).
class BotMetrics:
    """Central class for managing all bot metrics.

    Every metric is registered in a ``CollectorRegistry`` owned by this
    instance, and ``get_metrics()`` serialises exactly that registry.
    """

    def __init__(self):
        self.registry = CollectorRegistry()

        # Rate limiter metrics live in the same registry as everything else.
        self._create_rate_limit_metrics()

        # Bot commands counter
        self.bot_commands_total = Counter(
            'bot_commands_total',
            'Total number of bot commands processed',
            ['command', 'status', 'handler_type', 'user_type'],
            registry=self.registry
        )

        # Method execution time histogram
        self.method_duration_seconds = Histogram(
            'method_duration_seconds',
            'Time spent executing methods',
            ['method_name', 'handler_type', 'status'],
            # Buckets tuned for Telegram API calls (usually < 1 s)
            buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
            registry=self.registry
        )

        # Errors counter
        self.errors_total = Counter(
            'errors_total',
            'Total number of errors',
            ['error_type', 'handler_type', 'method_name'],
            registry=self.registry
        )

        # Active users gauge
        self.active_users = Gauge(
            'active_users',
            'Number of currently active users',
            ['user_type'],
            registry=self.registry
        )

        # Total users gauge (separate, unlabelled metric)
        self.total_users = Gauge(
            'total_users',
            'Total number of users in database',
            registry=self.registry
        )

        # Database query metrics
        self.db_query_duration_seconds = Histogram(
            'db_query_duration_seconds',
            'Time spent executing database queries',
            ['query_type', 'table_name', 'operation'],
            # Buckets tuned for SQLite/PostgreSQL
            buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5],
            registry=self.registry
        )

        # Database queries counter
        self.db_queries_total = Counter(
            'db_queries_total',
            'Total number of database queries executed',
            ['query_type', 'table_name', 'operation'],
            registry=self.registry
        )

        # Database errors counter
        self.db_errors_total = Counter(
            'db_errors_total',
            'Total number of database errors',
            ['error_type', 'query_type', 'table_name', 'operation'],
            registry=self.registry
        )

        # Message processing metrics
        self.messages_processed_total = Counter(
            'messages_processed_total',
            'Total number of messages processed',
            ['message_type', 'chat_type', 'handler_type'],
            registry=self.registry
        )

        # Middleware execution metrics
        self.middleware_duration_seconds = Histogram(
            'middleware_duration_seconds',
            'Time spent in middleware execution',
            ['middleware_name', 'status'],
            # Middleware is expected to be fast
            buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.25],
            registry=self.registry
        )

        # Rate limiting metrics
        # NOTE(review): a per-user_id label creates one series per user, i.e.
        # unbounded cardinality as the user base grows - confirm intended.
        self.rate_limit_hits_total = Counter(
            'rate_limit_hits_total',
            'Total number of rate limit hits',
            ['limit_type', 'user_id', 'action'],
            registry=self.registry
        )

        # User activity metrics
        self.user_activity_total = Counter(
            'user_activity_total',
            'Total user activity events',
            ['activity_type', 'user_type', 'chat_type'],
            registry=self.registry
        )

        # File download metrics
        self.file_downloads_total = Counter(
            'file_downloads_total',
            'Total number of file downloads',
            ['content_type', 'status'],
            registry=self.registry
        )
        self.file_download_duration_seconds = Histogram(
            'file_download_duration_seconds',
            'Time spent downloading files',
            ['content_type'],
            buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
            registry=self.registry
        )
        self.file_download_size_bytes = Histogram(
            'file_download_size_bytes',
            'Size of downloaded files in bytes',
            ['content_type'],
            buckets=[1024, 10240, 102400, 1048576, 10485760, 104857600, 1073741824],
            registry=self.registry
        )

        # Media processing metrics
        self.media_processing_total = Counter(
            'media_processing_total',
            'Total number of media processing operations',
            ['content_type', 'status'],
            registry=self.registry
        )
        self.media_processing_duration_seconds = Histogram(
            'media_processing_duration_seconds',
            'Time spent processing media',
            ['content_type'],
            buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0],
            registry=self.registry
        )

    def _create_rate_limit_metrics(self):
        """Create the rate limiter metrics in this instance's registry.

        Failures are logged and swallowed so the rest of ``__init__`` still
        runs; in that case the ``rate_limit_*`` attributes are missing and the
        rate-limit recording methods below log a warning instead of recording.

        NOTE(review): the per-``chat_id`` labels grow with the number of chats.
        """
        try:
            self.rate_limit_requests_total = Counter(
                'rate_limit_requests_total',
                'Total number of rate limited requests',
                ['chat_id', 'status', 'error_type'],
                registry=self.registry
            )
            self.rate_limit_errors_total = Counter(
                'rate_limit_errors_total',
                'Total number of rate limit errors',
                ['error_type', 'chat_id'],
                registry=self.registry
            )
            self.rate_limit_wait_duration_seconds = Histogram(
                'rate_limit_wait_duration_seconds',
                'Time spent waiting due to rate limiting',
                ['chat_id'],
                buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0],
                registry=self.registry
            )
            self.rate_limit_active_chats = Gauge(
                'rate_limit_active_chats',
                'Number of active chats with rate limiting',
                registry=self.registry
            )
            self.rate_limit_success_rate = Gauge(
                'rate_limit_success_rate',
                'Success rate of rate limited requests',
                ['chat_id'],
                registry=self.registry
            )
            self.rate_limit_requests_per_minute = Gauge(
                'rate_limit_requests_per_minute',
                'Requests per minute',
                ['chat_id'],
                registry=self.registry
            )
            self.rate_limit_total_requests = Gauge(
                'rate_limit_total_requests',
                'Total number of requests',
                ['chat_id'],
                registry=self.registry
            )
            self.rate_limit_total_errors = Gauge(
                'rate_limit_total_errors',
                'Total number of errors',
                ['chat_id', 'error_type'],
                registry=self.registry
            )
            self.rate_limit_avg_wait_time_seconds = Gauge(
                'rate_limit_avg_wait_time_seconds',
                'Average wait time in seconds',
                ['chat_id'],
                registry=self.registry
            )
        except Exception as e:
            # Log but do not abort initialisation of the remaining metrics.
            logger.warning("Failed to create rate limit metrics: %s", e)

    def record_command(self, command_type: str, handler_type: str = "unknown",
                       user_type: str = "unknown", status: str = "success"):
        """Increment ``bot_commands_total`` for one command execution."""
        self.bot_commands_total.labels(
            command=command_type,
            status=status,
            handler_type=handler_type,
            user_type=user_type
        ).inc()

    def record_error(self, error_type: str, handler_type: str = "unknown",
                     method_name: str = "unknown"):
        """Increment ``errors_total`` for one error occurrence."""
        self.errors_total.labels(
            error_type=error_type,
            handler_type=handler_type,
            method_name=method_name
        ).inc()

    def record_method_duration(self, method_name: str, duration: float,
                               handler_type: str = "unknown", status: str = "success"):
        """Observe *duration* (seconds) in ``method_duration_seconds``."""
        self.method_duration_seconds.labels(
            method_name=method_name,
            handler_type=handler_type,
            status=status
        ).observe(duration)

    def set_active_users(self, count: int, user_type: str = "daily"):
        """Set the ``active_users`` gauge for *user_type* to *count*."""
        self.active_users.labels(user_type=user_type).set(count)

    def set_total_users(self, count: int):
        """Set the ``total_users`` gauge to *count*."""
        self.total_users.set(count)

    def record_db_query(self, query_type: str, duration: float,
                        table_name: str = "unknown", operation: str = "unknown"):
        """Observe a query's *duration* (seconds) and increment the query counter."""
        self.db_query_duration_seconds.labels(
            query_type=query_type,
            table_name=table_name,
            operation=operation
        ).observe(duration)
        self.db_queries_total.labels(
            query_type=query_type,
            table_name=table_name,
            operation=operation
        ).inc()

    def record_message(self, message_type: str, chat_type: str = "unknown",
                       handler_type: str = "unknown"):
        """Increment ``messages_processed_total`` for one processed message."""
        self.messages_processed_total.labels(
            message_type=message_type,
            chat_type=chat_type,
            handler_type=handler_type
        ).inc()

    def record_middleware(self, middleware_name: str, duration: float, status: str = "success"):
        """Observe middleware execution *duration* (seconds)."""
        self.middleware_duration_seconds.labels(
            middleware_name=middleware_name,
            status=status
        ).observe(duration)

    def record_file_download(self, content_type: str, file_size: int, duration: float):
        """Record one successful download: count, duration (s) and size (bytes)."""
        self.file_downloads_total.labels(
            content_type=content_type,
            status="success"
        ).inc()
        self.file_download_duration_seconds.labels(
            content_type=content_type
        ).observe(duration)
        self.file_download_size_bytes.labels(
            content_type=content_type
        ).observe(file_size)

    def record_file_download_error(self, content_type: str, error_message: str):
        """Record one failed download.

        *error_message* is accepted for interface compatibility but is not
        used as a label (free-form text would explode label cardinality).
        """
        self.file_downloads_total.labels(
            content_type=content_type,
            status="error"
        ).inc()
        self.record_error("file_download_error", "media_processing", "download_file")

    def record_media_processing(self, content_type: str, duration: float, success: bool):
        """Record one media processing operation and, on failure, an error."""
        status = "success" if success else "error"
        self.media_processing_total.labels(
            content_type=content_type,
            status=status
        ).inc()
        self.media_processing_duration_seconds.labels(
            content_type=content_type
        ).observe(duration)
        if not success:
            self.record_error("media_processing_error", "media_processing", "add_in_db_media")

    def record_db_error(self, error_type: str, query_type: str = "unknown",
                        table_name: str = "unknown", operation: str = "unknown"):
        """Increment ``db_errors_total`` for one database error."""
        self.db_errors_total.labels(
            error_type=error_type,
            query_type=query_type,
            table_name=table_name,
            operation=operation
        ).inc()

    def record_rate_limit_request(self, chat_id: int, success: bool,
                                  wait_time: float = 0.0, error_type: Optional[str] = None):
        """Record one rate-limited request.

        Args:
            chat_id: Chat the request was sent to (stringified for the label).
            success: Whether the request ultimately succeeded.
            wait_time: Seconds spent waiting; observed only when > 0.
            error_type: Error label; ``"none"`` is used when not given.

        Failures (including missing rate-limit metrics) are logged, not raised.
        """
        try:
            chat_label = str(chat_id)
            status = "success" if success else "error"

            self.rate_limit_requests_total.labels(
                chat_id=chat_label,
                status=status,
                error_type=error_type or "none"
            ).inc()

            if wait_time > 0:
                self.rate_limit_wait_duration_seconds.labels(
                    chat_id=chat_label
                ).observe(wait_time)

            if not success and error_type:
                self.rate_limit_errors_total.labels(
                    error_type=error_type,
                    chat_id=chat_label
                ).inc()
        except Exception as e:
            logger.warning("Failed to record rate limit request: %s", e)

    def update_rate_limit_gauges(self):
        """Refresh rate-limit gauges from ``rate_limit_monitor.stats``.

        Per-chat gauges are cleared first so chats no longer present in the
        monitor stop being exported instead of keeping their last value.
        Failures are logged, not raised.
        """
        try:
            from .rate_limit_monitor import rate_limit_monitor

            # Snapshot so the loop is unaffected by concurrent mutation.
            chat_items = list(rate_limit_monitor.stats.items())

            self.rate_limit_active_chats.set(len(chat_items))

            for gauge in (
                self.rate_limit_success_rate,
                self.rate_limit_requests_per_minute,
                self.rate_limit_total_requests,
                self.rate_limit_avg_wait_time_seconds,
                self.rate_limit_total_errors,
            ):
                gauge.clear()

            for chat_id, chat_stats in chat_items:
                chat_label = str(chat_id)

                self.rate_limit_success_rate.labels(
                    chat_id=chat_label
                ).set(chat_stats.success_rate)
                self.rate_limit_requests_per_minute.labels(
                    chat_id=chat_label
                ).set(chat_stats.requests_per_minute)
                self.rate_limit_total_requests.labels(
                    chat_id=chat_label
                ).set(chat_stats.total_requests)
                self.rate_limit_avg_wait_time_seconds.labels(
                    chat_id=chat_label
                ).set(chat_stats.average_wait_time)

                # Error counts by type; only non-zero counts are exported.
                if chat_stats.retry_after_errors > 0:
                    self.rate_limit_total_errors.labels(
                        chat_id=chat_label,
                        error_type="RetryAfter"
                    ).set(chat_stats.retry_after_errors)
                if chat_stats.other_errors > 0:
                    self.rate_limit_total_errors.labels(
                        chat_id=chat_label,
                        error_type="Other"
                    ).set(chat_stats.other_errors)
        except Exception as e:
            logger.warning("Failed to update rate limit gauges: %s", e)

    def get_metrics(self) -> bytes:
        """Return this registry serialised in the Prometheus text format.

        Rate-limit gauges are refreshed first so the scrape sees current values.
        """
        self.update_rate_limit_gauges()
        return generate_latest(self.registry)


# Global metrics instance
metrics = BotMetrics()


def _instrument(
    func: Callable[..., Any],
    on_success: Optional[Callable[[Any, float], None]] = None,
    on_error: Optional[Callable[[Exception, float], None]] = None,
) -> Callable[..., Any]:
    """Wrap *func* (sync or ``async def``) so metric callbacks fire per call.

    ``on_success(result, elapsed)`` runs after a normal return;
    ``on_error(exc, elapsed)`` runs when an ``Exception`` escapes, after which
    the exception is re-raised unchanged. ``elapsed`` is in seconds, measured
    with the monotonic ``time.perf_counter()`` so wall-clock adjustments
    cannot distort durations.
    """
    if inspect.iscoroutinefunction(func):
        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                if on_error is not None:
                    on_error(exc, time.perf_counter() - start)
                raise
            if on_success is not None:
                on_success(result, time.perf_counter() - start)
            return result

        return async_wrapper

    @wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            if on_error is not None:
                on_error(exc, time.perf_counter() - start)
            raise
        if on_success is not None:
            on_success(result, time.perf_counter() - start)
        return result

    return sync_wrapper


def _result_file_size(result: Any) -> int:
    """Return the size in bytes of the file named by *result*, or 0.

    *result* counts as a path only when it is a non-empty string. Any
    failure to stat it yields 0 rather than an exception, so metric
    collection can never turn a successful call into a failed one.
    """
    if not (result and isinstance(result, str)):
        return 0
    try:
        return os.path.getsize(result)
    except (OSError, ValueError):
        return 0


# Decorators for easy metric collection
def track_time(method_name: Optional[str] = None, handler_type: str = "unknown"):
    """Decorator recording execution time (and errors) of a sync or async function.

    The metric ``method_name`` label defaults to the wrapped function's name.
    """
    def decorator(func):
        name = method_name or func.__name__

        def on_success(_result, duration):
            metrics.record_method_duration(name, duration, handler_type, "success")

        def on_error(exc, duration):
            metrics.record_method_duration(name, duration, handler_type, "error")
            metrics.record_error(type(exc).__name__, handler_type, name)

        return _instrument(func, on_success, on_error)
    return decorator


def track_errors(handler_type: str = "unknown", method_name: Optional[str] = None):
    """Decorator counting exceptions raised by a sync or async function."""
    def decorator(func):
        name = method_name or func.__name__

        def on_error(exc, _duration):
            metrics.record_error(type(exc).__name__, handler_type, name)

        return _instrument(func, on_error=on_error)
    return decorator


def db_query_time(query_type: str = "unknown", table_name: str = "unknown",
                  operation: str = "unknown"):
    """Decorator recording database query duration and failures.

    Duration is recorded for both successful and failed calls; failures are
    additionally counted in ``db_errors_total`` and ``errors_total``.
    """
    def decorator(func):
        def on_success(_result, duration):
            metrics.record_db_query(query_type, duration, table_name, operation)

        def on_error(exc, duration):
            error_name = type(exc).__name__
            metrics.record_db_query(query_type, duration, table_name, operation)
            metrics.record_db_error(error_name, query_type, table_name, operation)
            metrics.record_error(error_name, "database", func.__name__)

        return _instrument(func, on_success, on_error)
    return decorator


@asynccontextmanager
async def track_middleware(middleware_name: str):
    """Async context manager recording middleware execution time.

    Exceptions raised inside the ``async with`` body are recorded with status
    ``"error"`` and counted in ``errors_total``, then re-raised.
    """
    start_time = time.perf_counter()
    try:
        yield
    except Exception as exc:
        metrics.record_middleware(middleware_name, time.perf_counter() - start_time, "error")
        metrics.record_error(type(exc).__name__, "middleware", middleware_name)
        raise
    metrics.record_middleware(middleware_name, time.perf_counter() - start_time, "success")


def track_media_processing(content_type: str = "unknown"):
    """Decorator recording media processing count, duration and failures."""
    def decorator(func):
        def on_success(_result, duration):
            metrics.record_media_processing(content_type, duration, True)

        def on_error(_exc, duration):
            metrics.record_media_processing(content_type, duration, False)

        return _instrument(func, on_success, on_error)
    return decorator


def track_file_operations(content_type: str = "unknown"):
    """Decorator recording file download metrics.

    If the wrapped function returns a path string to an existing file, its
    size is recorded; otherwise size 0 is recorded.
    """
    def decorator(func):
        def on_success(result, duration):
            metrics.record_file_download(content_type, _result_file_size(result), duration)

        def on_error(exc, _duration):
            metrics.record_file_download_error(content_type, str(exc))

        return _instrument(func, on_success, on_error)
    return decorator