506 lines
19 KiB
Python
506 lines
19 KiB
Python
"""
|
||
Enhanced Metrics middleware for aiogram 3.x.
|
||
Automatically collects ALL available metrics for comprehensive monitoring.
|
||
"""
|
||
|
||
import asyncio
|
||
import logging
|
||
import time
|
||
from typing import Any, Awaitable, Callable, Dict, Optional, Union
|
||
|
||
from aiogram import BaseMiddleware
|
||
from aiogram.enums import ChatType
|
||
from aiogram.types import CallbackQuery, Message, TelegramObject
|
||
|
||
from ..utils.metrics import metrics
|
||
|
||
# Import button command mapping
|
||
# Import the button/command/callback label mappings used to name metrics.
#
# Each source module gets its own fallback: a missing or broken module only
# empties the mappings it was supposed to provide, instead of wiping every
# mapping (including the ones that imported fine).
try:
    from ..handlers.admin.constants import (
        ADMIN_BUTTON_COMMAND_MAPPING,
        ADMIN_COMMANDS,
    )
except ImportError:
    # Fallback if admin constants are not available
    ADMIN_BUTTON_COMMAND_MAPPING = {}
    ADMIN_COMMANDS = {}

try:
    from ..handlers.callback.constants import CALLBACK_COMMAND_MAPPING
except ImportError:
    # Fallback if callback constants are not available
    CALLBACK_COMMAND_MAPPING = {}

try:
    from ..handlers.private.constants import BUTTON_COMMAND_MAPPING
except ImportError:
    # Fallback if private-chat constants are not available
    BUTTON_COMMAND_MAPPING = {}

try:
    from ..handlers.voice.constants import (
        BUTTON_COMMAND_MAPPING as VOICE_BUTTON_COMMAND_MAPPING,
    )
    from ..handlers.voice.constants import (
        CALLBACK_COMMAND_MAPPING as VOICE_CALLBACK_COMMAND_MAPPING,
    )
    from ..handlers.voice.constants import (
        COMMAND_MAPPING as VOICE_COMMAND_MAPPING,
    )
except ImportError:
    # Fallback if voice constants are not available
    VOICE_BUTTON_COMMAND_MAPPING = {}
    VOICE_COMMAND_MAPPING = {}
    VOICE_CALLBACK_COMMAND_MAPPING = {}
|
||
|
||
|
||
class MetricsMiddleware(BaseMiddleware):
    """Enhanced middleware for automatic collection of ALL available metrics.

    For every event passing through it, the middleware:

    * refreshes the user-count gauges, at most once per
      ``active_users_update_interval`` seconds;
    * records a message / callback / unknown event counter;
    * derives a command label for the event (slash command, button text,
      callback prefix, or a generic fallback);
    * times the downstream handler and records its duration, the command
      outcome and, on failure, the error type;
    * records its own timing together with the real outcome.
    """

    # Message attributes probed, in order, to label the message type.
    # "animation" must come before "document": per the Telegram Bot API the
    # ``document`` field is also set on animation messages, so checking
    # ``document`` first would make the "animation" label unreachable.
    _MEDIA_KINDS = (
        "photo",
        "video",
        "audio",
        "animation",
        "document",
        "voice",
        "sticker",
    )

    # Chat types that get their own label; anything else is reported as
    # "private".
    _CHAT_TYPES = (
        (ChatType.GROUP, "group"),
        (ChatType.SUPERGROUP, "supergroup"),
        (ChatType.CHANNEL, "channel"),
    )

    def __init__(self):
        super().__init__()
        self.logger = logging.getLogger(__name__)

        # Metrics update intervals.
        # ``last_active_users_update`` is a wall-clock timestamp
        # (``time.time()``); 0 forces a refresh on the first event.
        self.last_active_users_update = 0
        self.active_users_update_interval = 300  # 5 minutes

    async def __call__(
        self,
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: Dict[str, Any],
    ) -> Any:
        """Process event and collect comprehensive metrics.

        Args:
            handler: Next callable in the middleware chain.
            event: Incoming Telegram object.
            data: Context dictionary passed down the chain unchanged.

        Returns:
            Whatever ``handler`` returns.

        Raises:
            Exception: Any exception raised by ``handler`` is re-raised after
                the error metrics have been recorded.
        """
        # Update active users periodically
        current_time = time.time()
        if (
            current_time - self.last_active_users_update
            > self.active_users_update_interval
        ):
            # Stamp BEFORE awaiting: events processed while the database
            # queries are in flight must not pass this check again and start
            # duplicate refreshes.
            self.last_active_users_update = current_time
            await self._update_active_users_metric()

        event_metrics, command_info = await self._collect_event_metrics(event)

        if command_info:
            self.logger.info(f"📊 Command info extracted: {command_info}")
        else:
            self.logger.warning(
                f"📊 No command info extracted for event type: {type(event).__name__}"
            )

        # Execute handler with comprehensive timing and metrics
        handler_name = self._get_handler_name(handler)
        # Stays "error" unless the handler returns normally, so any other
        # exit (including non-``Exception`` exits such as task cancellation)
        # is not reported as a success.
        status = "error"
        # perf_counter() is monotonic; wall-clock adjustments cannot skew
        # the measured durations.
        start_time = time.perf_counter()
        try:
            result = await handler(event, data)
        except Exception as e:
            duration = time.perf_counter() - start_time
            error_type = type(e).__name__

            # Record error metrics
            metrics.record_method_duration(handler_name, duration, "handler", "error")
            metrics.record_error(error_type, "handler", handler_name)
            self._record_command_outcome(command_info, "error")
            await self._record_additional_error_metrics(
                event, event_metrics, handler_name, error_type
            )
            raise
        else:
            duration = time.perf_counter() - start_time
            status = "success"

            # Record successful execution metrics
            metrics.record_method_duration(handler_name, duration, "handler", "success")
            self._record_command_outcome(command_info, "success")
            await self._record_additional_success_metrics(
                event, event_metrics, handler_name
            )
            return result
        finally:
            # Record middleware execution time with the real outcome
            metrics.record_middleware(
                "MetricsMiddleware", time.perf_counter() - start_time, status
            )

    async def _collect_event_metrics(self, event: TelegramObject) -> tuple:
        """Record the per-event counter and derive the command label.

        Concrete ``Message`` / ``CallbackQuery`` instances are matched first.
        A ``CallbackQuery`` carries its own ``message`` attribute (the message
        the button was attached to), so testing ``event.message`` first would
        misclassify every callback as a message.

        Returns:
            ``(event_metrics, command_info)`` where ``command_info`` may be
            ``None``.
        """
        if isinstance(event, Message):
            event_metrics = await self._record_comprehensive_message_metrics(event)
            return event_metrics, self._extract_command_info_with_fallback(event)

        if isinstance(event, CallbackQuery):
            event_metrics = await self._record_comprehensive_callback_metrics(event)
            return (
                event_metrics,
                self._extract_callback_command_info_with_fallback(event),
            )

        # Container events exposing the payload as an attribute.
        inner_message = getattr(event, "message", None)
        if inner_message:
            event_metrics = await self._record_comprehensive_message_metrics(
                inner_message
            )
            return (
                event_metrics,
                self._extract_command_info_with_fallback(inner_message),
            )

        inner_callback = getattr(event, "callback_query", None)
        if inner_callback:
            event_metrics = await self._record_comprehensive_callback_metrics(
                inner_callback
            )
            return (
                event_metrics,
                self._extract_callback_command_info_with_fallback(inner_callback),
            )

        return await self._record_unknown_event_metrics(event), None

    def _record_command_outcome(
        self, command_info: Optional[Dict[str, str]], status: str
    ) -> None:
        """Record the command counter for ``status`` when a command was derived."""
        if command_info:
            metrics.record_command(
                command_info["command"],
                command_info["handler_type"],
                command_info["user_type"],
                status,
            )

    async def _update_active_users_metric(self):
        """Periodically update active users metric from database.

        Never raises for ``Exception`` subclasses: on failure the error is
        logged and both gauges are set to the fallback value 1.
        """
        try:
            # TODO: should connect to the database directly rather than go
            # through the global instance.
            from ..utils.base_dependency_factory import get_global_instance

            bdf = get_global_instance()
            bot_db = bdf.get_db()

            # Plain count of all users in the database.
            total_users_query = "SELECT COUNT(DISTINCT user_id) as total FROM our_users"
            total_users_result = await bot_db.fetch_one(total_users_query)
            total_users = total_users_result["total"] if total_users_result else 1

            # Count of users active during the last day.
            # NOTE(review): assumes date_changed holds a Unix timestamp, as
            # the original comment states - confirm against the schema.
            daily_users_query = "SELECT COUNT(DISTINCT user_id) as daily FROM our_users WHERE date_changed > (strftime('%s', 'now', '-1 day'))"
            daily_users_result = await bot_db.fetch_one(daily_users_query)
            daily_users = daily_users_result["daily"] if daily_users_result else 1

            # Publish the gauges.
            metrics.set_active_users(daily_users, "daily")
            metrics.set_total_users(total_users)
            self.logger.info(
                f"📊 Active users metric updated: {daily_users} (daily), {total_users} (total)"
            )

        except Exception as e:
            self.logger.error(f"❌ Failed to update users metric: {e}")
            # Use 1 as the fallback value.
            metrics.set_active_users(1, "daily")
            metrics.set_total_users(1)

    async def _record_comprehensive_message_metrics(
        self, message: Message
    ) -> Dict[str, Any]:
        """Record comprehensive message metrics.

        Returns:
            Dict with ``message_type``, ``chat_type``, ``user_id`` and
            ``is_bot`` describing the message.
        """
        # Determine message type; anything without a recognised media
        # attribute is reported as "text".
        message_type = "text"
        for kind in self._MEDIA_KINDS:
            if getattr(message, kind, None):
                message_type = kind
                break

        # Determine chat type
        chat_type = "private"
        for known_type, label in self._CHAT_TYPES:
            if message.chat.type == known_type:
                chat_type = label
                break

        # Record message processing
        metrics.record_message(message_type, chat_type, "message_handler")

        sender = message.from_user
        return {
            "message_type": message_type,
            "chat_type": chat_type,
            "user_id": sender.id if sender else None,
            "is_bot": sender.is_bot if sender else False,
        }

    async def _record_comprehensive_callback_metrics(
        self, callback: CallbackQuery
    ) -> Dict[str, Any]:
        """Record comprehensive callback metrics.

        Returns:
            Dict with ``callback_data``, ``user_id`` and ``is_bot``.
        """
        # Record callback message
        metrics.record_message("callback_query", "callback", "callback_handler")

        sender = callback.from_user
        return {
            "callback_data": callback.data,
            "user_id": sender.id if sender else None,
            "is_bot": sender.is_bot if sender else False,
        }

    async def _record_unknown_event_metrics(
        self, event: TelegramObject
    ) -> Dict[str, Any]:
        """Record metrics for unknown event types.

        Returns:
            Dict with the event class name and the first 100 characters of
            its string representation.
        """
        # Record unknown event
        metrics.record_message("unknown", "unknown", "unknown_handler")

        return {
            "event_type": type(event).__name__,
            # Every object defines __str__, so no guard is needed.
            "event_data": str(event)[:100],
        }

    def _extract_command_info_with_fallback(
        self, message: Message
    ) -> Optional[Dict[str, str]]:
        """Extract command information with fallback for unknown commands.

        Lookup order: slash command (admin, voice, then unknown), admin
        button text, regular button text, voice button text, generic "text".

        Returns:
            Dict with ``command``, ``user_type`` and ``handler_type``, or
            ``None`` when the message has no text or only whitespace.
        """
        text = message.text
        if not text:
            return None

        has_sender = bool(message.from_user)
        user_type = "user" if has_sender else "unknown"
        # NOTE(review): "admin" is inferred from the command/button name
        # only; no permission check is visible here.
        admin_type = "admin" if has_sender else "unknown"

        # Check if it's a slash command
        if text.startswith("/"):
            # "/cmd@BotName args" -> "cmd". Without stripping the bot
            # mention, commands addressed as "/cmd@BotName" would never
            # match the mappings.
            command_name = text.split()[0][1:].partition("@")[0]

            # Check if it's an admin command
            if command_name in ADMIN_COMMANDS:
                return {
                    "command": ADMIN_COMMANDS[command_name],
                    "user_type": admin_type,
                    "handler_type": "admin_handler",
                }
            # Check if it's a voice bot command
            if command_name in VOICE_COMMAND_MAPPING:
                return {
                    "command": VOICE_COMMAND_MAPPING[command_name],
                    "user_type": user_type,
                    "handler_type": "voice_command_handler",
                }
            # FALLBACK: Record unknown command
            return {
                "command": command_name,
                "user_type": user_type,
                "handler_type": "unknown_command_handler",
            }

        # Check if it's an admin button click
        if text in ADMIN_BUTTON_COMMAND_MAPPING:
            return {
                "command": ADMIN_BUTTON_COMMAND_MAPPING[text],
                "user_type": admin_type,
                "handler_type": "admin_button_handler",
            }

        # Check if it's a regular button click (text button)
        if text in BUTTON_COMMAND_MAPPING:
            return {
                "command": BUTTON_COMMAND_MAPPING[text],
                "user_type": user_type,
                "handler_type": "button_handler",
            }

        # Check if it's a voice bot button click
        if text in VOICE_BUTTON_COMMAND_MAPPING:
            return {
                "command": VOICE_BUTTON_COMMAND_MAPPING[text],
                "user_type": user_type,
                "handler_type": "voice_button_handler",
            }

        # FALLBACK: Record ANY non-blank text message as a command for metrics
        if text.strip():
            return {
                "command": "text",
                "user_type": user_type,
                "handler_type": "text_message_handler",
            }

        return None

    def _extract_callback_command_info_with_fallback(
        self, callback: CallbackQuery
    ) -> Optional[Dict[str, str]]:
        """Extract callback command information with fallback.

        The part of ``callback.data`` before the first ``":"`` is looked up
        in the callback mappings; unknown prefixes are grouped by pattern.

        Returns:
            Dict with ``command``, ``user_type`` and ``handler_type``, or
            ``None`` when the callback carries no data.
        """
        if not callback.data:
            return None

        user_type = "user" if callback.from_user else "unknown"

        # Extract command from callback data; split() always yields at least
        # one element, so the prefix is always available.
        prefix = callback.data.split(":", 1)[0]

        if prefix in CALLBACK_COMMAND_MAPPING:
            return {
                "command": CALLBACK_COMMAND_MAPPING[prefix],
                "user_type": user_type,
                "handler_type": "callback_handler",
            }

        # Check if it's a voice bot callback
        if prefix in VOICE_CALLBACK_COMMAND_MAPPING:
            return {
                "command": VOICE_CALLBACK_COMMAND_MAPPING[prefix],
                "user_type": user_type,
                "handler_type": "voice_callback_handler",
            }

        # FALLBACK: Record unknown callback, grouping similar callbacks by
        # pattern so numeric ids do not create one label each.
        if prefix.startswith("ban_") and prefix[4:].isdigit():
            # ban_123456 -> callback_ban
            command = "callback_ban"
        elif prefix.startswith("page_") and prefix[5:].isdigit():
            # page_2 -> callback_page
            command = "callback_page"
        else:
            # Other unknown callbacks are kept as-is (first 20 characters).
            command = f"callback_{prefix[:20]}"

        return {
            "command": command,
            "user_type": user_type,
            "handler_type": "unknown_callback_handler",
        }

    async def _record_additional_success_metrics(
        self, event: TelegramObject, event_metrics: Dict[str, Any], handler_name: str
    ):
        """Record additional success metrics.

        Currently a placeholder with no effect: it is the extension point for
        rate-limiting metrics (keyed on ``event.from_user``) and user
        activity tracking (keyed on ``event_metrics["user_id"]``).
        """
        return None

    async def _record_additional_error_metrics(
        self,
        event: TelegramObject,
        event_metrics: Dict[str, Any],
        handler_name: str,
        error_type: str,
    ):
        """Record additional error metrics.

        Currently a placeholder with no effect: it is the extension point for
        user-specific error tracking (keyed on ``event_metrics["user_id"]``).
        """
        return None

    def _get_handler_name(self, handler: Callable) -> str:
        """Extract handler name efficiently.

        Tries, in order: ``__name__``, ``__qualname__`` (both skipped when
        equal to ``"<lambda>"``), ``handler.callback.__name__``,
        ``handler.view.__name__``, and finally a ``function <name>`` match in
        ``str(handler)``.

        Returns:
            The first name found, or ``"unknown"``.
        """
        import re

        # Check various ways to get handler name
        if hasattr(handler, "__name__") and handler.__name__ != "<lambda>":
            return handler.__name__
        if hasattr(handler, "__qualname__") and handler.__qualname__ != "<lambda>":
            return handler.__qualname__
        if hasattr(handler, "callback") and hasattr(handler.callback, "__name__"):
            return handler.callback.__name__
        if hasattr(handler, "view") and hasattr(handler.view, "__name__"):
            return handler.view.__name__

        # Try to get the name from the string representation.
        handler_str = str(handler)
        if "function" in handler_str:
            # Pull the function name out of the repr string.
            match = re.search(r"function\s+(\w+)", handler_str)
            if match:
                return match.group(1)
        return "unknown"
|
||
|
||
|
||
class DatabaseMetricsMiddleware(BaseMiddleware):
    """Enhanced middleware for database operation metrics.

    The visible code times the downstream handler call and reports it under
    the ``"DatabaseMetricsMiddleware"`` label; on failure it also records the
    exception type under the ``"database_middleware"`` component.
    """

    def __init__(self):
        super().__init__()
        self.logger = logging.getLogger(__name__)

    async def __call__(
        self,
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: Dict[str, Any],
    ) -> Any:
        """Process event and collect database metrics.

        Args:
            handler: Next callable in the middleware chain.
            event: Incoming Telegram object, passed through untouched.
            data: Context dictionary, passed through untouched.

        Returns:
            The value returned by ``handler``.

        Raises:
            Exception: Whatever ``handler`` raised, re-raised after the error
                has been recorded.
        """
        # Label used for error reporting; "unknown" when the callable has
        # no __name__.
        name = getattr(handler, "__name__", "unknown")

        began = time.time()

        try:
            outcome = await handler(event, data)

            # Success path: report elapsed time
            elapsed = time.time() - began
            metrics.record_middleware("DatabaseMetricsMiddleware", elapsed, "success")

            return outcome

        except Exception as exc:
            # Failure path: report elapsed time and the error type
            elapsed = time.time() - began
            metrics.record_middleware("DatabaseMetricsMiddleware", elapsed, "error")
            metrics.record_error(type(exc).__name__, "database_middleware", name)
            raise
|
||
|
||
|
||
class ErrorMetricsMiddleware(BaseMiddleware):
    """Enhanced middleware for error tracking and metrics.

    Times the downstream handler call and reports it under the
    ``"ErrorMetricsMiddleware"`` label; on failure it also records the
    exception type under the ``"error_middleware"`` component.
    """

    def __init__(self):
        super().__init__()
        self.logger = logging.getLogger(__name__)

    async def __call__(
        self,
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: Dict[str, Any],
    ) -> Any:
        """Process event and collect error metrics.

        Args:
            handler: Next callable in the middleware chain.
            event: Incoming Telegram object, passed through untouched.
            data: Context dictionary, passed through untouched.

        Returns:
            The value returned by ``handler``.

        Raises:
            Exception: Whatever ``handler`` raised, re-raised after the error
                has been recorded.
        """
        began = time.time()

        try:
            outcome = await handler(event, data)

            # Success path: report elapsed time
            elapsed = time.time() - began
            metrics.record_middleware("ErrorMetricsMiddleware", elapsed, "success")

            return outcome

        except Exception as exc:
            # Failure path: report elapsed time and the error type
            elapsed = time.time() - began
            # The handler label is only needed when an error is reported.
            name = getattr(handler, "__name__", "unknown")

            metrics.record_middleware("ErrorMetricsMiddleware", elapsed, "error")
            metrics.record_error(type(exc).__name__, "error_middleware", name)

            raise
|