707 lines
24 KiB
Python
707 lines
24 KiB
Python
"""
|
|
Metrics module for Telegram bot monitoring with Prometheus.
|
|
Provides predefined metrics for bot commands, errors, performance, and user activity.
|
|
"""
|
|
|
|
import asyncio
|
|
import os
|
|
import time
|
|
from contextlib import asynccontextmanager
|
|
from functools import wraps
|
|
from typing import Any, Dict, Optional
|
|
|
|
from prometheus_client import (CONTENT_TYPE_LATEST, Counter, Gauge, Histogram,
|
|
generate_latest)
|
|
from prometheus_client.core import CollectorRegistry
|
|
|
|
# Rate limiter metrics are now created inside the main class (BotMetrics)
|
|
|
|
|
|
class BotMetrics:
    """Central class for managing all bot metrics.

    Every collector is registered on a dedicated ``CollectorRegistry`` stored
    in ``self.registry`` and ``get_metrics`` serialises exactly that registry.
    The ``record_*`` / ``set_*`` methods are thin helpers that apply the label
    values and update the matching collector.
    """

    def __init__(self):
        """Create the registry and register every collector on it."""
        self.registry = CollectorRegistry()

        # Rate limiter metrics are created in the same registry.
        self._create_rate_limit_metrics()

        # Bot commands counter
        self.bot_commands_total = Counter(
            "bot_commands_total",
            "Total number of bot commands processed",
            ["command", "status", "handler_type", "user_type"],
            registry=self.registry,
        )

        # Method execution time histogram
        self.method_duration_seconds = Histogram(
            "method_duration_seconds",
            "Time spent executing methods",
            ["method_name", "handler_type", "status"],
            # Buckets tuned for Telegram API calls (usually < 1 s)
            buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
            registry=self.registry,
        )

        # Errors counter
        self.errors_total = Counter(
            "errors_total",
            "Total number of errors",
            ["error_type", "handler_type", "method_name"],
            registry=self.registry,
        )

        # Active users gauge
        self.active_users = Gauge(
            "active_users",
            "Number of currently active users",
            ["user_type"],
            registry=self.registry,
        )

        # Total users gauge (a separate, label-free metric)
        self.total_users = Gauge(
            "total_users", "Total number of users in database", registry=self.registry
        )

        # Database query metrics
        self.db_query_duration_seconds = Histogram(
            "db_query_duration_seconds",
            "Time spent executing database queries",
            ["query_type", "table_name", "operation"],
            # Buckets tuned for SQLite/PostgreSQL
            buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5],
            registry=self.registry,
        )

        # Database queries counter
        self.db_queries_total = Counter(
            "db_queries_total",
            "Total number of database queries executed",
            ["query_type", "table_name", "operation"],
            registry=self.registry,
        )

        # Database errors counter
        self.db_errors_total = Counter(
            "db_errors_total",
            "Total number of database errors",
            ["error_type", "query_type", "table_name", "operation"],
            registry=self.registry,
        )

        # Message processing metrics
        self.messages_processed_total = Counter(
            "messages_processed_total",
            "Total number of messages processed",
            ["message_type", "chat_type", "handler_type"],
            registry=self.registry,
        )

        # Middleware execution metrics
        self.middleware_duration_seconds = Histogram(
            "middleware_duration_seconds",
            "Time spent in middleware execution",
            ["middleware_name", "status"],
            # Middleware is expected to be fast
            buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.25],
            registry=self.registry,
        )

        # Rate limiting metrics
        self.rate_limit_hits_total = Counter(
            "rate_limit_hits_total",
            "Total number of rate limit hits",
            ["limit_type", "user_id", "action"],
            registry=self.registry,
        )

        # User activity metrics
        self.user_activity_total = Counter(
            "user_activity_total",
            "Total user activity events",
            ["activity_type", "user_type", "chat_type"],
            registry=self.registry,
        )

        # File download metrics
        self.file_downloads_total = Counter(
            "file_downloads_total",
            "Total number of file downloads",
            ["content_type", "status"],
            registry=self.registry,
        )

        self.file_download_duration_seconds = Histogram(
            "file_download_duration_seconds",
            "Time spent downloading files",
            ["content_type"],
            buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
            registry=self.registry,
        )

        self.file_download_size_bytes = Histogram(
            "file_download_size_bytes",
            "Size of downloaded files in bytes",
            ["content_type"],
            buckets=[1024, 10240, 102400, 1048576, 10485760, 104857600, 1073741824],
            registry=self.registry,
        )

        # Media processing metrics
        self.media_processing_total = Counter(
            "media_processing_total",
            "Total number of media processing operations",
            ["content_type", "status"],
            registry=self.registry,
        )

        self.media_processing_duration_seconds = Histogram(
            "media_processing_duration_seconds",
            "Time spent processing media",
            ["content_type"],
            buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0],
            registry=self.registry,
        )

    def _create_rate_limit_metrics(self):
        """Create the rate limiter collectors in the main registry.

        Best effort: any exception raised while registering is logged as a
        warning and swallowed so that ``__init__`` can continue. Collectors
        that were not created before the failure are then simply absent.
        """
        try:
            self.rate_limit_requests_total = Counter(
                "rate_limit_requests_total",
                "Total number of rate limited requests",
                ["chat_id", "status", "error_type"],
                registry=self.registry,
            )

            self.rate_limit_errors_total = Counter(
                "rate_limit_errors_total",
                "Total number of rate limit errors",
                ["error_type", "chat_id"],
                registry=self.registry,
            )

            self.rate_limit_wait_duration_seconds = Histogram(
                "rate_limit_wait_duration_seconds",
                "Time spent waiting due to rate limiting",
                ["chat_id"],
                buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0],
                registry=self.registry,
            )

            self.rate_limit_active_chats = Gauge(
                "rate_limit_active_chats",
                "Number of active chats with rate limiting",
                registry=self.registry,
            )

            self.rate_limit_success_rate = Gauge(
                "rate_limit_success_rate",
                "Success rate of rate limited requests",
                ["chat_id"],
                registry=self.registry,
            )

            self.rate_limit_requests_per_minute = Gauge(
                "rate_limit_requests_per_minute",
                "Requests per minute",
                ["chat_id"],
                registry=self.registry,
            )

            self.rate_limit_total_requests = Gauge(
                "rate_limit_total_requests",
                "Total number of requests",
                ["chat_id"],
                registry=self.registry,
            )

            self.rate_limit_total_errors = Gauge(
                "rate_limit_total_errors",
                "Total number of errors",
                ["chat_id", "error_type"],
                registry=self.registry,
            )

            self.rate_limit_avg_wait_time_seconds = Gauge(
                "rate_limit_avg_wait_time_seconds",
                "Average wait time in seconds",
                ["chat_id"],
                registry=self.registry,
            )
        except Exception as e:
            # Log the problem but do not abort initialisation.
            import logging

            logging.warning("Failed to create rate limit metrics: %s", e)

    def record_command(
        self,
        command_type: str,
        handler_type: str = "unknown",
        user_type: str = "unknown",
        status: str = "success",
    ):
        """Increment ``bot_commands_total`` for one command execution.

        ``command_type`` is stored under the ``command`` label.
        """
        self.bot_commands_total.labels(
            command=command_type,
            status=status,
            handler_type=handler_type,
            user_type=user_type,
        ).inc()

    def record_error(
        self,
        error_type: str,
        handler_type: str = "unknown",
        method_name: str = "unknown",
    ):
        """Increment ``errors_total`` for one error occurrence."""
        self.errors_total.labels(
            error_type=error_type, handler_type=handler_type, method_name=method_name
        ).inc()

    def record_method_duration(
        self,
        method_name: str,
        duration: float,
        handler_type: str = "unknown",
        status: str = "success",
    ):
        """Observe ``duration`` in the ``method_duration_seconds`` histogram."""
        self.method_duration_seconds.labels(
            method_name=method_name, handler_type=handler_type, status=status
        ).observe(duration)

    def set_active_users(self, count: int, user_type: str = "daily"):
        """Set the ``active_users`` gauge for the given ``user_type``."""
        self.active_users.labels(user_type=user_type).set(count)

    def set_total_users(self, count: int):
        """Set the ``total_users`` gauge."""
        self.total_users.set(count)

    def record_db_query(
        self,
        query_type: str,
        duration: float,
        table_name: str = "unknown",
        operation: str = "unknown",
    ):
        """Record one database query: observe its duration and count it."""
        self.db_query_duration_seconds.labels(
            query_type=query_type, table_name=table_name, operation=operation
        ).observe(duration)
        self.db_queries_total.labels(
            query_type=query_type, table_name=table_name, operation=operation
        ).inc()

    def record_message(
        self,
        message_type: str,
        chat_type: str = "unknown",
        handler_type: str = "unknown",
    ):
        """Increment ``messages_processed_total`` for one processed message."""
        self.messages_processed_total.labels(
            message_type=message_type, chat_type=chat_type, handler_type=handler_type
        ).inc()

    def record_middleware(
        self, middleware_name: str, duration: float, status: str = "success"
    ):
        """Observe ``duration`` in ``middleware_duration_seconds``."""
        self.middleware_duration_seconds.labels(
            middleware_name=middleware_name, status=status
        ).observe(duration)

    def record_file_download(self, content_type: str, file_size: int, duration: float):
        """Record a successful download: count it, then observe time and size."""
        self.file_downloads_total.labels(
            content_type=content_type, status="success"
        ).inc()

        self.file_download_duration_seconds.labels(content_type=content_type).observe(
            duration
        )

        self.file_download_size_bytes.labels(content_type=content_type).observe(
            file_size
        )

    def record_file_download_error(self, content_type: str, error_message: str):
        """Record a failed download.

        Increments ``file_downloads_total`` with ``status="error"`` and
        ``errors_total`` with fixed labels. ``error_message`` is accepted for
        interface compatibility but is not used by this method.
        """
        self.file_downloads_total.labels(
            content_type=content_type, status="error"
        ).inc()

        self.errors_total.labels(
            error_type="file_download_error",
            handler_type="media_processing",
            method_name="download_file",
        ).inc()

    def record_media_processing(
        self, content_type: str, duration: float, success: bool
    ):
        """Record one media processing operation.

        Counts the operation under ``status`` ``"success"``/``"error"``,
        observes its duration and, on failure, also increments
        ``errors_total`` with fixed labels.
        """
        status = "success" if success else "error"

        self.media_processing_total.labels(
            content_type=content_type, status=status
        ).inc()

        self.media_processing_duration_seconds.labels(
            content_type=content_type
        ).observe(duration)

        if not success:
            self.errors_total.labels(
                error_type="media_processing_error",
                handler_type="media_processing",
                method_name="add_in_db_media",
            ).inc()

    def record_db_error(
        self,
        error_type: str,
        query_type: str = "unknown",
        table_name: str = "unknown",
        operation: str = "unknown",
    ):
        """Increment ``db_errors_total`` for one database error."""
        self.db_errors_total.labels(
            error_type=error_type,
            query_type=query_type,
            table_name=table_name,
            operation=operation,
        ).inc()

    def record_rate_limit_request(
        self,
        chat_id: int,
        success: bool,
        wait_time: float = 0.0,
        error_type: Optional[str] = None,
    ):
        """Record one rate limited request.

        Best effort: any exception raised while recording is logged as a
        warning and swallowed.

        Args:
            chat_id: Chat identifier; stored as a string label.
            success: Whether the request succeeded.
            wait_time: Seconds spent waiting; observed only when positive.
            error_type: Error label; ``"none"`` is used for the request
                counter when it is empty, and the error counter is only
                incremented for a failed request with a non-empty value.
        """
        try:
            status = "success" if success else "error"
            chat_label = str(chat_id)

            # Request counter
            self.rate_limit_requests_total.labels(
                chat_id=chat_label, status=status, error_type=error_type or "none"
            ).inc()

            # Wait time
            if wait_time > 0:
                self.rate_limit_wait_duration_seconds.labels(
                    chat_id=chat_label
                ).observe(wait_time)

            # Errors
            if not success and error_type:
                self.rate_limit_errors_total.labels(
                    error_type=error_type, chat_id=chat_label
                ).inc()
        except Exception as e:
            import logging

            logging.warning("Failed to record rate limit request: %s", e)

    def update_rate_limit_gauges(self):
        """Refresh the rate limit gauges from ``rate_limit_monitor.stats``.

        ``rate_limit_monitor`` is imported from the sibling module on every
        call (NOTE(review): presumably to avoid a circular import - confirm).
        Best effort: any exception, including a failed import, is logged as a
        warning and swallowed.
        """
        try:
            from .rate_limit_monitor import rate_limit_monitor

            # Number of active chats
            self.rate_limit_active_chats.set(len(rate_limit_monitor.stats))

            # Per-chat gauges
            for chat_id, chat_stats in rate_limit_monitor.stats.items():
                chat_id_str = str(chat_id)

                # Success rate
                self.rate_limit_success_rate.labels(chat_id=chat_id_str).set(
                    chat_stats.success_rate
                )

                # Requests per minute
                self.rate_limit_requests_per_minute.labels(chat_id=chat_id_str).set(
                    chat_stats.requests_per_minute
                )

                # Total number of requests
                self.rate_limit_total_requests.labels(chat_id=chat_id_str).set(
                    chat_stats.total_requests
                )

                # Average wait time
                self.rate_limit_avg_wait_time_seconds.labels(chat_id=chat_id_str).set(
                    chat_stats.average_wait_time
                )

                # Error counts by type; a series is only written once its
                # count is positive.
                if chat_stats.retry_after_errors > 0:
                    self.rate_limit_total_errors.labels(
                        chat_id=chat_id_str, error_type="RetryAfter"
                    ).set(chat_stats.retry_after_errors)

                if chat_stats.other_errors > 0:
                    self.rate_limit_total_errors.labels(
                        chat_id=chat_id_str, error_type="Other"
                    ).set(chat_stats.other_errors)
        except Exception as e:
            import logging

            logging.warning("Failed to update rate limit gauges: %s", e)

    def get_metrics(self) -> bytes:
        """Return the registry contents as produced by ``generate_latest``.

        The rate limiter gauges are refreshed first so the output reflects
        the monitor's current statistics.
        """
        self.update_rate_limit_gauges()

        return generate_latest(self.registry)
|
|
|
|
|
|
# Global metrics instance
|
|
# Shared instance: the decorators and the context manager defined in this
# module record through it. Instantiating it here creates its registry and
# collectors at import time.
metrics = BotMetrics()
|
|
|
|
|
|
# Decorators for easy metric collection
|
|
def track_time(method_name: Optional[str] = None, handler_type: str = "unknown"):
    """Decorator factory that records how long the wrapped callable runs.

    Works for both plain functions and coroutine functions. On success the
    duration is recorded with status ``"success"``. When the wrapped call
    raises an ``Exception`` the duration is recorded with status ``"error"``,
    the error is counted via ``metrics.record_error`` and the exception is
    re-raised unchanged.

    Args:
        method_name: Label for the metric; the wrapped function's
            ``__name__`` is used when it is empty.
        handler_type: Value of the ``handler_type`` label.

    Returns:
        A decorator producing a wrapper of the same kind (sync or async) as
        the function it is applied to.
    """

    def decorator(func):
        def _observe(started: float, status: str) -> None:
            # perf_counter is monotonic, so the elapsed time cannot be skewed
            # by wall-clock adjustments the way time.time() differences can.
            metrics.record_method_duration(
                method_name or func.__name__,
                time.perf_counter() - started,
                handler_type,
                status,
            )

        def _observe_failure(started: float, exc: Exception) -> None:
            _observe(started, "error")
            metrics.record_error(
                type(exc).__name__, handler_type, method_name or func.__name__
            )

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            started = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                _observe_failure(started, exc)
                raise
            # Recorded outside the try block so that a failure of the metrics
            # call itself is not misreported as a failure of ``func``.
            _observe(started, "success")
            return result

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            started = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                _observe_failure(started, exc)
                raise
            _observe(started, "success")
            return result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator
|
|
|
|
|
|
def track_errors(handler_type: str = "unknown", method_name: str = None):
    """Decorator factory that counts exceptions raised by the wrapped callable.

    Any ``Exception`` raised by the wrapped function (sync or coroutine) is
    reported through ``metrics.record_error`` using the exception's class
    name, ``handler_type`` and ``method_name`` (or the function's
    ``__name__`` when ``method_name`` is empty), and is then re-raised.
    """

    def decorator(func):
        def _report(exc):
            # The label is resolved only when an error is actually reported.
            label = method_name or func.__name__
            metrics.record_error(type(exc).__name__, handler_type, label)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                _report(exc)
                raise

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as exc:
                _report(exc)
                raise

        # Pick the wrapper matching the kind of callable being decorated.
        return async_wrapper if asyncio.iscoroutinefunction(func) else sync_wrapper

    return decorator
|
|
|
|
|
|
def db_query_time(
    query_type: str = "unknown", table_name: str = "unknown", operation: str = "unknown"
):
    """Decorator factory that records database query duration and errors.

    Works for both plain functions and coroutine functions. The query is
    always recorded through ``metrics.record_db_query``. When the wrapped
    call raises an ``Exception`` it is additionally counted through
    ``metrics.record_db_error`` and ``metrics.record_error`` (handler type
    ``"database"``, method name taken from the wrapped function) before being
    re-raised unchanged.

    Args:
        query_type: Value of the ``query_type`` label.
        table_name: Value of the ``table_name`` label.
        operation: Value of the ``operation`` label.
    """

    def decorator(func):
        def _observe(started: float) -> None:
            # perf_counter is monotonic, unlike wall-clock time.time().
            metrics.record_db_query(
                query_type, time.perf_counter() - started, table_name, operation
            )

        def _observe_failure(started: float, exc: Exception) -> None:
            _observe(started)
            error_name = type(exc).__name__
            metrics.record_db_error(error_name, query_type, table_name, operation)
            metrics.record_error(error_name, "database", func.__name__)

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            started = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                _observe_failure(started, exc)
                raise
            # Recorded outside the try block so that a failure of the metrics
            # call itself is not counted as a database error.
            _observe(started)
            return result

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            started = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                _observe_failure(started, exc)
                raise
            _observe(started)
            return result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator
|
|
|
|
|
|
@asynccontextmanager
|
|
async def track_middleware(middleware_name: str):
|
|
"""Context manager to track middleware execution time."""
|
|
start_time = time.time()
|
|
try:
|
|
yield
|
|
duration = time.time() - start_time
|
|
metrics.record_middleware(middleware_name, duration, "success")
|
|
except Exception as e:
|
|
duration = time.time() - start_time
|
|
metrics.record_middleware(middleware_name, duration, "error")
|
|
metrics.record_error(type(e).__name__, "middleware", middleware_name)
|
|
raise
|
|
|
|
|
|
def track_media_processing(content_type: str = "unknown"):
    """Decorator factory that records media processing operations.

    Works for both plain functions and coroutine functions. The elapsed time
    is passed to ``metrics.record_media_processing`` together with a success
    flag: ``True`` when the wrapped call returns, ``False`` when it raises an
    ``Exception`` (which is then re-raised unchanged).

    Args:
        content_type: Value of the ``content_type`` label.
    """

    def decorator(func):
        def _observe(started: float, success: bool) -> None:
            # perf_counter is monotonic, unlike wall-clock time.time().
            metrics.record_media_processing(
                content_type, time.perf_counter() - started, success
            )

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            started = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception:
                _observe(started, False)
                raise
            # Recorded outside the try block so that a failure of the metrics
            # call itself is not also recorded as a failed operation.
            _observe(started, True)
            return result

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            started = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception:
                _observe(started, False)
                raise
            _observe(started, True)
            return result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator
|
|
|
|
|
|
def track_file_operations(content_type: str = "unknown"):
    """Decorator factory that records file download/upload operations.

    Works for both plain functions and coroutine functions. When the wrapped
    call returns, ``metrics.record_file_download`` is called with the elapsed
    time and a file size: if the result is a non-empty ``str`` naming an
    existing path, that path's size is used, otherwise ``0``. When the
    wrapped call raises an ``Exception``,
    ``metrics.record_file_download_error`` is called with the exception text
    and the exception is re-raised unchanged.

    Args:
        content_type: Value of the ``content_type`` label.
    """

    def decorator(func):
        def _result_size(result) -> int:
            # Only a non-empty string result is treated as a path.
            if not (result and isinstance(result, str)):
                return 0
            try:
                return os.path.getsize(result)
            except (OSError, ValueError):
                # Missing or unreadable path (or an invalid path string):
                # report size 0 rather than failing a successful operation.
                return 0

        def _observe_success(started: float, result) -> None:
            # perf_counter is monotonic, unlike wall-clock time.time().
            duration = time.perf_counter() - started
            metrics.record_file_download(content_type, _result_size(result), duration)

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            started = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception as exc:
                metrics.record_file_download_error(content_type, str(exc))
                raise
            # Bookkeeping happens outside the try block so that a failure
            # here is not recorded as a failed download.
            _observe_success(started, result)
            return result

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            started = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                metrics.record_file_download_error(content_type, str(exc))
                raise
            _observe_success(started, result)
            return result

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator
|