""" Metrics middleware for aiogram 3.x. Automatically collects metrics for message processing, command execution, and errors. """ from typing import Any, Awaitable, Callable, Dict from aiogram import BaseMiddleware from aiogram.types import TelegramObject, Message, CallbackQuery from aiogram.enums import ChatType import time import logging from ..utils.metrics import metrics class MetricsMiddleware(BaseMiddleware): """Middleware for automatic metrics collection in aiogram handlers.""" def __init__(self): super().__init__() self.logger = logging.getLogger(__name__) async def __call__( self, handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], event: TelegramObject, data: Dict[str, Any] ) -> Any: """Process event and collect metrics.""" # Добавляем логирование для диагностики self.logger.info(f"📊 MetricsMiddleware called for event type: {type(event).__name__}") # Extract command info before execution command_info = None if isinstance(event, Message): self.logger.info(f"📊 Processing Message event") await self._record_message_metrics(event) if event.text and event.text.startswith('/'): command_info = { 'command': event.text.split()[0][1:], # Remove '/' and get command name 'user_type': "user" if event.from_user else "unknown", 'handler_type': "message_handler" } elif isinstance(event, CallbackQuery): self.logger.info(f"📊 Processing CallbackQuery event") await self._record_callback_metrics(event) if event.data: parts = event.data.split(':', 1) if parts: command_info = { 'command': parts[0], 'user_type': "user" if event.from_user else "unknown", 'handler_type': "callback_handler" } else: self.logger.info(f"📊 Processing unknown event type: {type(event).__name__}") # Execute handler with timing start_time = time.time() try: result = await handler(event, data) duration = time.time() - start_time # Record successful execution handler_name = self._get_handler_name(handler) self.logger.info(f"📊 Recording successful execution: {handler_name}") metrics.record_method_duration( handler_name, duration, "handler", "success" ) # Record command with success status if applicable if command_info: metrics.record_command( command_info['command'], command_info['handler_type'], command_info['user_type'], "success" ) return result except Exception as e: duration = time.time() - start_time # Record error and timing handler_name = self._get_handler_name(handler) self.logger.error(f"📊 Recording error execution: {handler_name}, error: {type(e).__name__}") metrics.record_method_duration( handler_name, duration, "handler", "error" ) metrics.record_error( type(e).__name__, "handler", handler_name ) # Record command with error status if applicable if command_info: metrics.record_command( command_info['command'], command_info['handler_type'], command_info['user_type'], "error" ) raise def _get_handler_name(self, handler: Callable) -> str: """Extract handler name efficiently.""" # Проверяем различные способы получения имени хендлера if hasattr(handler, '__name__') and handler.__name__ != '': return handler.__name__ elif hasattr(handler, '__qualname__') and handler.__qualname__ != '': return handler.__qualname__ elif hasattr(handler, 'callback') and hasattr(handler.callback, '__name__'): return handler.callback.__name__ elif hasattr(handler, 'view') and hasattr(handler.view, '__name__'): return handler.view.__name__ else: # Пытаемся получить имя из строкового представления handler_str = str(handler) if 'function' in handler_str: # Извлекаем имя функции из строки import re match = re.search(r'function\s+(\w+)', handler_str) if match: return match.group(1) return "unknown" async def _record_message_metrics(self, message: Message): """Record message metrics efficiently.""" # Determine message type message_type = "text" if message.photo: message_type = "photo" elif message.video: message_type = "video" elif message.audio: message_type = "audio" elif message.document: message_type = "document" elif message.voice: message_type = "voice" elif message.sticker: message_type = "sticker" elif message.animation: message_type = "animation" # Determine chat type chat_type = "private" if message.chat.type == ChatType.GROUP: chat_type = "group" elif message.chat.type == ChatType.SUPERGROUP: chat_type = "supergroup" elif message.chat.type == ChatType.CHANNEL: chat_type = "channel" # Record message processing metrics.record_message(message_type, chat_type, "message_handler") async def _record_callback_metrics(self, callback: CallbackQuery): """Record callback metrics efficiently.""" metrics.record_message("callback_query", "callback", "callback_handler") class DatabaseMetricsMiddleware(BaseMiddleware): """Middleware for database operation metrics.""" async def __call__( self, handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], event: TelegramObject, data: Dict[str, Any] ) -> Any: """Process event and collect database metrics.""" # Check if this handler involves database operations handler_name = handler.__name__ if hasattr(handler, '__name__') else "unknown" # You can add specific database operation detection logic here # For now, we'll just pass through and let individual decorators handle it return await handler(event, data) class ErrorMetricsMiddleware(BaseMiddleware): """Middleware for error tracking and metrics.""" async def __call__( self, handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], event: TelegramObject, data: Dict[str, Any] ) -> Any: """Process event and collect error metrics.""" try: return await handler(event, data) except Exception as e: # Record error metrics handler_name = handler.__name__ if hasattr(handler, '__name__') else "unknown" metrics.record_error( type(e).__name__, "handler", handler_name ) raise