Files
telegram-helper-bot/helper_bot/middlewares/metrics_middleware.py
Andrey fe06008930 Enhance metrics handling and logging in bot
- Integrated metrics scheduler start and stop functionality in `run_helper.py` for better resource management.
- Improved logging for metrics server operations in `server_prometheus.py`, ensuring clearer error reporting and status updates.
- Updated metrics middleware to collect comprehensive metrics for all event types, enhancing monitoring capabilities.
- Added active user metrics tracking in `admin_handlers.py` to provide insights on user engagement.
- Refactored command and callback handling in `metrics_middleware.py` to improve clarity and error handling.
2025-09-03 16:16:14 +03:00

578 lines
24 KiB
Python

"""
Enhanced Metrics middleware for aiogram 3.x.
Automatically collects ALL available metrics for comprehensive monitoring.
"""
from typing import Any, Awaitable, Callable, Dict, Union, Optional
from aiogram import BaseMiddleware
from aiogram.types import TelegramObject, Message, CallbackQuery
from aiogram.enums import ChatType
import time
import logging
import asyncio
from ..utils.metrics import metrics
# Import button command mapping
try:
from ..handlers.private.constants import BUTTON_COMMAND_MAPPING
from ..handlers.callback.constants import CALLBACK_COMMAND_MAPPING
from ..handlers.admin.constants import ADMIN_BUTTON_COMMAND_MAPPING, ADMIN_COMMANDS
from ..handlers.voice.constants import (
BUTTON_COMMAND_MAPPING as VOICE_BUTTON_COMMAND_MAPPING,
COMMAND_MAPPING as VOICE_COMMAND_MAPPING,
CALLBACK_COMMAND_MAPPING as VOICE_CALLBACK_COMMAND_MAPPING
)
print(f"✅ Constants imported successfully: ADMIN_COMMANDS={len(ADMIN_COMMANDS)}, BUTTON_COMMAND_MAPPING={len(BUTTON_COMMAND_MAPPING)}")
print(f"✅ ADMIN_COMMANDS: {list(ADMIN_COMMANDS.keys())}")
print(f"✅ BUTTON_COMMAND_MAPPING: {list(BUTTON_COMMAND_MAPPING.keys())}")
except ImportError as e:
print(f"❌ Failed to import constants: {e}")
# Fallback if constants not available
BUTTON_COMMAND_MAPPING = {}
CALLBACK_COMMAND_MAPPING = {}
ADMIN_BUTTON_COMMAND_MAPPING = {}
ADMIN_COMMANDS = {}
VOICE_BUTTON_COMMAND_MAPPING = {}
VOICE_COMMAND_MAPPING = {}
VOICE_CALLBACK_COMMAND_MAPPING = {}
print("⚠️ Using empty constants fallback")
class MetricsMiddleware(BaseMiddleware):
"""Enhanced middleware for automatic collection of ALL available metrics."""
def __init__(self):
super().__init__()
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG) # Enable debug logging for metrics tracking
# Metrics update intervals
self.last_active_users_update = 0
self.active_users_update_interval = 300 # 5 minutes
self.last_middleware_metrics_update = 0
self.middleware_metrics_interval = 60 # 1 minute
# Middleware performance tracking
self.middleware_start_time = None
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Any:
"""Process event and collect comprehensive metrics."""
# Simple print to verify middleware is called
print("=" * 50)
print("🔍 MetricsMiddleware CALLED!")
print(f"🔍 Event type: {type(event).__name__}")
print(f"🔍 Event: {event}")
print(f"🔍 Handler: {handler}")
print("=" * 50)
# Start middleware timing
middleware_start = time.time()
# Update active users periodically
current_time = time.time()
if current_time - self.last_active_users_update > self.active_users_update_interval:
await self._update_active_users_metric()
self.last_active_users_update = current_time
# Extract comprehensive command and event info
command_info = None
event_metrics = {}
# Debug: Log event type and structure
self.logger.info(f"📊 Event type: {type(event).__name__}")
self.logger.info(f"📊 Event: {event}")
# Correct order for aiogram 3.x: first check Update structure, then fallback to direct types
if hasattr(event, 'message') and event.message:
# Handle aiogram 3.x Update.message
self.logger.info(f"📊 Processing Update.message event: {event.message.text[:50] if event.message.text else 'No text'}")
event_metrics = await self._record_comprehensive_message_metrics(event.message)
command_info = self._extract_command_info_with_fallback(event.message)
elif hasattr(event, 'callback_query') and event.callback_query:
# Handle aiogram 3.x Update.callback_query
self.logger.info(f"📊 Processing Update.callback_query event: {event.callback_query.data[:50] if event.callback_query.data else 'No data'}")
event_metrics = await self._record_comprehensive_callback_metrics(event.callback_query)
command_info = self._extract_callback_command_info_with_fallback(event.callback_query)
elif isinstance(event, Message):
# Fallback for direct Message objects (if they exist)
self.logger.info(f"📊 Processing direct Message event: {event.text[:50] if event.text else 'No text'}")
event_metrics = await self._record_comprehensive_message_metrics(event)
command_info = self._extract_command_info_with_fallback(event)
elif isinstance(event, CallbackQuery):
# Fallback for direct CallbackQuery objects (if they exist)
self.logger.info(f"📊 Processing direct CallbackQuery event: {event.data[:50] if event.data else 'No data'}")
event_metrics = await self._record_comprehensive_callback_metrics(event)
command_info = self._extract_callback_command_info_with_fallback(event)
else:
# Unknown event type - log for debugging
self.logger.info(f"📊 Processing unknown event type: {type(event).__name__}")
self.logger.info(f"📊 Event attributes: {[attr for attr in dir(event) if not attr.startswith('_')]}")
event_metrics = await self._record_unknown_event_metrics(event)
# Execute handler with comprehensive timing and metrics
start_time = time.time()
try:
result = await handler(event, data)
duration = time.time() - start_time
# Record comprehensive successful execution metrics
handler_name = self._get_handler_name(handler)
self.logger.debug(f"📊 Recording successful execution: {handler_name} ({duration:.3f}s)")
# Record method duration
metrics.record_method_duration(
handler_name,
duration,
"handler",
"success"
)
# Record command metrics if available
if command_info:
metrics.record_command(
command_info['command'],
command_info['handler_type'],
command_info['user_type'],
"success"
)
# Record additional event metrics
await self._record_additional_success_metrics(event, event_metrics, handler_name)
return result
except Exception as e:
duration = time.time() - start_time
# Record comprehensive error metrics
handler_name = self._get_handler_name(handler)
error_type = type(e).__name__
self.logger.debug(f"📊 Recording error execution: {handler_name}, error: {error_type} ({duration:.3f}s)")
# Record method duration with error
metrics.record_method_duration(
handler_name,
duration,
"handler",
"error"
)
# Record error
metrics.record_error(
error_type,
"handler",
handler_name
)
# Record command with error status if applicable
if command_info:
metrics.record_command(
command_info['command'],
command_info['handler_type'],
command_info['user_type'],
"error"
)
# Record additional error metrics
await self._record_additional_error_metrics(event, event_metrics, handler_name, error_type)
raise
finally:
# Record middleware execution time
middleware_duration = time.time() - middleware_start
metrics.record_middleware("MetricsMiddleware", middleware_duration, "success")
async def _update_active_users_metric(self):
"""Periodically update active users metric from database."""
try:
self.logger.debug("📊 Updating active users metric...")
# Get database instance
from ..utils.base_dependency_factory import get_global_instance
bdf = get_global_instance()
bot_db = bdf.get_db()
# Count active users (last 24 hours) - date_changed is INTEGER (UNIX timestamp)
import time
current_timestamp = int(time.time())
one_day_ago = current_timestamp - (24 * 60 * 60)
# First, let's check if table exists and has data
check_table_query = """
SELECT name FROM sqlite_master
WHERE type='table' AND name='our_users'
"""
await bot_db.connect()
await bot_db.cursor.execute(check_table_query)
table_exists = await bot_db.cursor.fetchone()
if not table_exists:
self.logger.warning("⚠️ Table 'our_users' does not exist")
metrics.set_active_users(1, "daily") # Fallback to 1
metrics.set_active_users(1, "total")
await bot_db.close()
return
# Check total users first
total_users_query = "SELECT COUNT(*) FROM our_users"
await bot_db.cursor.execute(total_users_query)
total_users_result = await bot_db.cursor.fetchone()
total_users = total_users_result[0] if total_users_result else 0
self.logger.debug(f"📊 Total users in database: {total_users}")
if total_users == 0:
self.logger.warning("⚠️ No users in database")
metrics.set_active_users(1, "daily") # Fallback to 1
metrics.set_active_users(1, "total")
await bot_db.close()
return
# Now count active users (last 24 hours)
active_users_query = """
SELECT COUNT(DISTINCT user_id) as active_users
FROM our_users
WHERE date_changed > ?
"""
await bot_db.cursor.execute(active_users_query, (one_day_ago,))
result = await bot_db.cursor.fetchone()
active_users = result[0] if result else 0
# If no active users in last 24h, use total users as fallback
if active_users == 0:
self.logger.warning("⚠️ No active users in last 24h, using total users as fallback")
active_users = total_users
await bot_db.close()
# Update metrics
metrics.set_active_users(active_users, "daily")
metrics.set_active_users(total_users, "total")
self.logger.debug(f"📊 Active users updated: daily={active_users}, total={total_users}")
except Exception as e:
self.logger.error(f"❌ Failed to update active users metric: {e}")
# Set fallback values
metrics.set_active_users(1, "daily")
metrics.set_active_users(1, "total")
async def _record_comprehensive_message_metrics(self, message: Message) -> Dict[str, Any]:
"""Record comprehensive message metrics."""
metrics_data = {}
# Determine message type
message_type = "text"
if message.photo:
message_type = "photo"
elif message.video:
message_type = "video"
elif message.audio:
message_type = "audio"
elif message.document:
message_type = "document"
elif message.voice:
message_type = "voice"
elif message.sticker:
message_type = "sticker"
elif message.animation:
message_type = "animation"
# Determine chat type
chat_type = "private"
if message.chat.type == ChatType.GROUP:
chat_type = "group"
elif message.chat.type == ChatType.SUPERGROUP:
chat_type = "supergroup"
elif message.chat.type == ChatType.CHANNEL:
chat_type = "channel"
# Record message processing
metrics.record_message(message_type, chat_type, "message_handler")
# Store metrics data for later use
metrics_data.update({
'message_type': message_type,
'chat_type': chat_type,
'user_id': message.from_user.id if message.from_user else None,
'is_bot': message.from_user.is_bot if message.from_user else False
})
return metrics_data
async def _record_comprehensive_callback_metrics(self, callback: CallbackQuery) -> Dict[str, Any]:
"""Record comprehensive callback metrics."""
metrics_data = {}
# Record callback message
metrics.record_message("callback_query", "callback", "callback_handler")
# Store metrics data for later use
metrics_data.update({
'callback_data': callback.data,
'user_id': callback.from_user.id if callback.from_user else None,
'is_bot': callback.from_user.is_bot if callback.from_user else False
})
return metrics_data
async def _record_unknown_event_metrics(self, event: TelegramObject) -> Dict[str, Any]:
"""Record metrics for unknown event types."""
metrics_data = {}
# Record unknown event
metrics.record_message("unknown", "unknown", "unknown_handler")
# Store event type for later use
metrics_data.update({
'event_type': type(event).__name__,
'event_data': str(event)[:100] if hasattr(event, '__str__') else "unknown"
})
return metrics_data
def _extract_command_info_with_fallback(self, message: Message) -> Optional[Dict[str, str]]:
"""Extract command information with fallback for unknown commands."""
if not message.text:
return None
# Check if it's a slash command
if message.text.startswith('/'):
command_name = message.text.split()[0][1:] # Remove '/' and get command name
# Check if it's an admin command
if command_name in ADMIN_COMMANDS:
return {
'command': ADMIN_COMMANDS[command_name],
'user_type': "admin" if message.from_user else "unknown",
'handler_type': "admin_handler"
}
# Check if it's a voice bot command
elif command_name in VOICE_COMMAND_MAPPING:
return {
'command': VOICE_COMMAND_MAPPING[command_name],
'user_type': "user" if message.from_user else "unknown",
'handler_type': "voice_command_handler"
}
else:
# FALLBACK: Record unknown command
self.logger.debug(f"📊 Unknown command detected: /{command_name}")
return {
'command': command_name,
'user_type': "user" if message.from_user else "unknown",
'handler_type': "unknown_command_handler"
}
# Check if it's an admin button click
if message.text in ADMIN_BUTTON_COMMAND_MAPPING:
return {
'command': ADMIN_BUTTON_COMMAND_MAPPING[message.text],
'user_type': "admin" if message.from_user else "unknown",
'handler_type': "admin_button_handler"
}
# Check if it's a regular button click (text button)
if message.text in BUTTON_COMMAND_MAPPING:
return {
'command': BUTTON_COMMAND_MAPPING[message.text],
'user_type': "user" if message.from_user else "unknown",
'handler_type': "button_handler"
}
# Check if it's a voice bot button click
if message.text in VOICE_BUTTON_COMMAND_MAPPING:
return {
'command': VOICE_BUTTON_COMMAND_MAPPING[message.text],
'user_type': "user" if message.from_user else "unknown",
'handler_type': "voice_button_handler"
}
# FALLBACK: Record ANY text message as a command for metrics
if message.text and len(message.text.strip()) > 0:
# Clean text for command name (remove special chars, limit length)
clean_text = message.text.strip()[:30].replace(' ', '_').replace('\n', '_')
clean_text = ''.join(c for c in clean_text if c.isalnum() or c in '_')
self.logger.debug(f"📊 Text message recorded as command: {clean_text}")
return {
'command': f"text_{clean_text}",
'user_type': "user" if message.from_user else "unknown",
'handler_type': "text_message_handler"
}
return None
def _extract_callback_command_info_with_fallback(self, callback: CallbackQuery) -> Optional[Dict[str, str]]:
"""Extract callback command information with fallback."""
if not callback.data:
return None
# Extract command from callback data
parts = callback.data.split(':', 1)
if parts and parts[0] in CALLBACK_COMMAND_MAPPING:
return {
'command': CALLBACK_COMMAND_MAPPING[parts[0]],
'user_type': "user" if callback.from_user else "unknown",
'handler_type': "callback_handler"
}
# Check if it's a voice bot callback
if parts and parts[0] in VOICE_CALLBACK_COMMAND_MAPPING:
return {
'command': VOICE_CALLBACK_COMMAND_MAPPING[parts[0]],
'user_type': "user" if callback.from_user else "unknown",
'handler_type': "voice_callback_handler"
}
# FALLBACK: Record unknown callback
if parts:
self.logger.debug(f"📊 Unknown callback detected: {parts[0]}")
return {
'command': f"callback_{parts[0][:20]}",
'user_type': "user" if callback.from_user else "unknown",
'handler_type': "unknown_callback_handler"
}
return None
async def _record_additional_success_metrics(self, event: TelegramObject, event_metrics: Dict[str, Any], handler_name: str):
"""Record additional success metrics."""
try:
# Record rate limiting metrics (if applicable)
if hasattr(event, 'from_user') and event.from_user:
# You can add rate limiting logic here
pass
# Record user activity metrics
if event_metrics.get('user_id'):
# This could trigger additional user activity tracking
pass
except Exception as e:
self.logger.error(f"❌ Error recording additional success metrics: {e}")
async def _record_additional_error_metrics(self, event: TelegramObject, event_metrics: Dict[str, Any], handler_name: str, error_type: str):
"""Record additional error metrics."""
try:
# Record specific error context
if event_metrics.get('user_id'):
# You can add user-specific error tracking here
pass
except Exception as e:
self.logger.error(f"❌ Error recording additional error metrics: {e}")
def _get_handler_name(self, handler: Callable) -> str:
"""Extract handler name efficiently."""
# Проверяем различные способы получения имени хендлера
if hasattr(handler, '__name__') and handler.__name__ != '<lambda>':
return handler.__name__
elif hasattr(handler, '__qualname__') and handler.__qualname__ != '<lambda>':
return handler.__qualname__
elif hasattr(handler, 'callback') and hasattr(handler.callback, '__name__'):
return handler.callback.__name__
elif hasattr(handler, 'view') and hasattr(handler.view, '__name__'):
return handler.view.__name__
else:
# Пытаемся получить имя из строкового представления
handler_str = str(handler)
if 'function' in handler_str:
# Извлекаем имя функции из строки
import re
match = re.search(r'function\s+(\w+)', handler_str)
if match:
return match.group(1)
return "unknown"
class DatabaseMetricsMiddleware(BaseMiddleware):
"""Enhanced middleware for database operation metrics."""
def __init__(self):
super().__init__()
self.logger = logging.getLogger(__name__)
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Any:
"""Process event and collect database metrics."""
# Check if this handler involves database operations
handler_name = handler.__name__ if hasattr(handler, '__name__') else "unknown"
# Record middleware start
start_time = time.time()
try:
result = await handler(event, data)
# Record successful database operation
duration = time.time() - start_time
metrics.record_middleware("DatabaseMetricsMiddleware", duration, "success")
return result
except Exception as e:
# Record failed database operation
duration = time.time() - start_time
metrics.record_middleware("DatabaseMetricsMiddleware", duration, "error")
metrics.record_error(
type(e).__name__,
"database_middleware",
handler_name
)
raise
class ErrorMetricsMiddleware(BaseMiddleware):
"""Enhanced middleware for error tracking and metrics."""
def __init__(self):
super().__init__()
self.logger = logging.getLogger(__name__)
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: TelegramObject,
data: Dict[str, Any]
) -> Any:
"""Process event and collect error metrics."""
# Record middleware start
start_time = time.time()
try:
result = await handler(event, data)
# Record successful error handling
duration = time.time() - start_time
metrics.record_middleware("ErrorMetricsMiddleware", duration, "success")
return result
except Exception as e:
# Record error metrics
duration = time.time() - start_time
handler_name = handler.__name__ if hasattr(handler, '__name__') else "unknown"
metrics.record_middleware("ErrorMetricsMiddleware", duration, "error")
metrics.record_error(
type(e).__name__,
"error_middleware",
handler_name
)
raise