""" Metrics middleware for aiogram 3.x. Automatically collects metrics for message processing, command execution, and errors. """ from typing import Any, Awaitable, Callable, Dict, Union, Optional from aiogram import BaseMiddleware from aiogram.types import TelegramObject, Message, CallbackQuery from aiogram.enums import ChatType import time import logging from ..utils.metrics import metrics # Import button command mapping try: from ..handlers.private.constants import BUTTON_COMMAND_MAPPING from ..handlers.callback.constants import CALLBACK_COMMAND_MAPPING from ..handlers.admin.constants import ADMIN_BUTTON_COMMAND_MAPPING, ADMIN_COMMANDS from ..handlers.voice.constants import ( BUTTON_COMMAND_MAPPING as VOICE_BUTTON_COMMAND_MAPPING, COMMAND_MAPPING as VOICE_COMMAND_MAPPING, CALLBACK_COMMAND_MAPPING as VOICE_CALLBACK_COMMAND_MAPPING ) except ImportError: # Fallback if constants not available BUTTON_COMMAND_MAPPING = {} CALLBACK_COMMAND_MAPPING = {} ADMIN_BUTTON_COMMAND_MAPPING = {} ADMIN_COMMANDS = {} VOICE_BUTTON_COMMAND_MAPPING = {} VOICE_COMMAND_MAPPING = {} VOICE_CALLBACK_COMMAND_MAPPING = {} class MetricsMiddleware(BaseMiddleware): """Middleware for automatic metrics collection in aiogram handlers.""" def __init__(self): super().__init__() self.logger = logging.getLogger(__name__) async def __call__( self, handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], event: TelegramObject, data: Dict[str, Any] ) -> Any: """Process event and collect metrics.""" # Extract command info before execution command_info = None if isinstance(event, Message): self.logger.debug(f"📊 Processing Message event") await self._record_message_metrics(event) command_info = self._extract_command_info(event) elif isinstance(event, CallbackQuery): self.logger.debug(f"📊 Processing CallbackQuery event") await self._record_callback_metrics(event) command_info = self._extract_callback_command_info(event) else: self.logger.debug(f"📊 Processing unknown event type: {type(event).__name__}") # Execute handler with timing start_time = time.time() try: result = await handler(event, data) duration = time.time() - start_time # Record successful execution handler_name = self._get_handler_name(handler) self.logger.debug(f"📊 Recording successful execution: {handler_name}") metrics.record_method_duration( handler_name, duration, "handler", "success" ) # Record command with success status if applicable if command_info: metrics.record_command( command_info['command'], command_info['handler_type'], command_info['user_type'], "success" ) return result except Exception as e: duration = time.time() - start_time # Record error and timing handler_name = self._get_handler_name(handler) self.logger.debug(f"📊 Recording error execution: {handler_name}, error: {type(e).__name__}") metrics.record_method_duration( handler_name, duration, "handler", "error" ) metrics.record_error( type(e).__name__, "handler", handler_name ) # Record command with error status if applicable if command_info: metrics.record_command( command_info['command'], command_info['handler_type'], command_info['user_type'], "error" ) raise def _get_handler_name(self, handler: Callable) -> str: """Extract handler name efficiently.""" # Проверяем различные способы получения имени хендлера if hasattr(handler, '__name__') and handler.__name__ != '': return handler.__name__ elif hasattr(handler, '__qualname__') and handler.__qualname__ != '': return handler.__qualname__ elif hasattr(handler, 'callback') and hasattr(handler.callback, '__name__'): return handler.callback.__name__ elif hasattr(handler, 'view') and hasattr(handler.view, '__name__'): return handler.view.__name__ else: # Пытаемся получить имя из строкового представления handler_str = str(handler) if 'function' in handler_str: # Извлекаем имя функции из строки import re match = re.search(r'function\s+(\w+)', handler_str) if match: return match.group(1) return "unknown" async def _record_message_metrics(self, message: Message): """Record message metrics efficiently.""" # Determine message type message_type = "text" if message.photo: message_type = "photo" elif message.video: message_type = "video" elif message.audio: message_type = "audio" elif message.document: message_type = "document" elif message.voice: message_type = "voice" elif message.sticker: message_type = "sticker" elif message.animation: message_type = "animation" # Determine chat type chat_type = "private" if message.chat.type == ChatType.GROUP: chat_type = "group" elif message.chat.type == ChatType.SUPERGROUP: chat_type = "supergroup" elif message.chat.type == ChatType.CHANNEL: chat_type = "channel" # Record message processing metrics.record_message(message_type, chat_type, "message_handler") async def _record_callback_metrics(self, callback: CallbackQuery): """Record callback metrics efficiently.""" metrics.record_message("callback_query", "callback", "callback_handler") def _extract_command_info(self, message: Message) -> Optional[Dict[str, str]]: """Extract command information from message (commands or button clicks).""" if not message.text: return None # Check if it's a slash command if message.text.startswith('/'): command_name = message.text.split()[0][1:] # Remove '/' and get command name # Check if it's an admin command if command_name in ADMIN_COMMANDS: return { 'command': ADMIN_COMMANDS[command_name], 'user_type': "admin" if message.from_user else "unknown", 'handler_type': "admin_handler" } # Check if it's a voice bot command elif command_name in VOICE_COMMAND_MAPPING: return { 'command': VOICE_COMMAND_MAPPING[command_name], 'user_type': "user" if message.from_user else "unknown", 'handler_type': "voice_command_handler" } else: return { 'command': command_name, 'user_type': "user" if message.from_user else "unknown", 'handler_type': "message_handler" } # Check if it's an admin button click if message.text in ADMIN_BUTTON_COMMAND_MAPPING: return { 'command': ADMIN_BUTTON_COMMAND_MAPPING[message.text], 'user_type': "admin" if message.from_user else "unknown", 'handler_type': "admin_button_handler" } # Check if it's a regular button click (text button) if message.text in BUTTON_COMMAND_MAPPING: return { 'command': BUTTON_COMMAND_MAPPING[message.text], 'user_type': "user" if message.from_user else "unknown", 'handler_type': "button_handler" } # Check if it's a voice bot button click if message.text in VOICE_BUTTON_COMMAND_MAPPING: return { 'command': VOICE_BUTTON_COMMAND_MAPPING[message.text], 'user_type': "user" if message.from_user else "unknown", 'handler_type': "voice_button_handler" } return None def _extract_callback_command_info(self, callback: CallbackQuery) -> Optional[Dict[str, str]]: """Extract command information from callback query.""" if not callback.data: return None # Extract command from callback data parts = callback.data.split(':', 1) if parts and parts[0] in CALLBACK_COMMAND_MAPPING: return { 'command': CALLBACK_COMMAND_MAPPING[parts[0]], 'user_type': "user" if callback.from_user else "unknown", 'handler_type': "callback_handler" } # Check if it's a voice bot callback if parts and parts[0] in VOICE_CALLBACK_COMMAND_MAPPING: return { 'command': VOICE_CALLBACK_COMMAND_MAPPING[parts[0]], 'user_type': "user" if callback.from_user else "unknown", 'handler_type': "voice_callback_handler" } return None class DatabaseMetricsMiddleware(BaseMiddleware): """Middleware for database operation metrics.""" async def __call__( self, handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], event: TelegramObject, data: Dict[str, Any] ) -> Any: """Process event and collect database metrics.""" # Check if this handler involves database operations handler_name = handler.__name__ if hasattr(handler, '__name__') else "unknown" # You can add specific database operation detection logic here # For now, we'll just pass through and let individual decorators handle it return await handler(event, data) class ErrorMetricsMiddleware(BaseMiddleware): """Middleware for error tracking and metrics.""" async def __call__( self, handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], event: TelegramObject, data: Dict[str, Any] ) -> Any: """Process event and collect error metrics.""" try: return await handler(event, data) except Exception as e: # Record error metrics handler_name = handler.__name__ if hasattr(handler, '__name__') else "unknown" metrics.record_error( type(e).__name__, "handler", handler_name ) raise