style: isort + black
This commit is contained in:
@@ -4,6 +4,7 @@ from typing import Optional
|
||||
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
|
||||
from helper_bot.utils.base_dependency_factory import get_global_instance
|
||||
from logs.custom_logger import logger
|
||||
|
||||
|
||||
@@ -2,23 +2,26 @@ import os
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
from database.async_db import AsyncBotDB
|
||||
from dotenv import load_dotenv
|
||||
|
||||
from database.async_db import AsyncBotDB
|
||||
from helper_bot.utils.s3_storage import S3StorageService
|
||||
from logs.custom_logger import logger
|
||||
|
||||
|
||||
class BaseDependencyFactory:
|
||||
def __init__(self):
|
||||
project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
env_path = os.path.join(project_dir, '.env')
|
||||
project_dir = os.path.dirname(
|
||||
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
)
|
||||
env_path = os.path.join(project_dir, ".env")
|
||||
if os.path.exists(env_path):
|
||||
load_dotenv(env_path)
|
||||
|
||||
self.settings = {}
|
||||
self._project_dir = project_dir
|
||||
|
||||
database_path = os.getenv('DATABASE_PATH', 'database/tg-bot-database.db')
|
||||
database_path = os.getenv("DATABASE_PATH", "database/tg-bot-database.db")
|
||||
if not os.path.isabs(database_path):
|
||||
database_path = os.path.join(project_dir, database_path)
|
||||
|
||||
@@ -26,78 +29,87 @@ class BaseDependencyFactory:
|
||||
|
||||
self._load_settings_from_env()
|
||||
self._init_s3_storage()
|
||||
|
||||
|
||||
# ScoringManager инициализируется лениво
|
||||
self._scoring_manager = None
|
||||
|
||||
def _load_settings_from_env(self):
|
||||
"""Загружает настройки из переменных окружения."""
|
||||
self.settings['Telegram'] = {
|
||||
'bot_token': os.getenv('BOT_TOKEN', ''),
|
||||
'listen_bot_token': os.getenv('LISTEN_BOT_TOKEN', ''),
|
||||
'test_bot_token': os.getenv('TEST_BOT_TOKEN', ''),
|
||||
'preview_link': self._parse_bool(os.getenv('PREVIEW_LINK', 'false')),
|
||||
'main_public': os.getenv('MAIN_PUBLIC', ''),
|
||||
'group_for_posts': self._parse_int(os.getenv('GROUP_FOR_POSTS', '0')),
|
||||
'group_for_message': self._parse_int(os.getenv('GROUP_FOR_MESSAGE', '0')),
|
||||
'group_for_logs': self._parse_int(os.getenv('GROUP_FOR_LOGS', '0')),
|
||||
'important_logs': self._parse_int(os.getenv('IMPORTANT_LOGS', '0')),
|
||||
'archive': self._parse_int(os.getenv('ARCHIVE', '0')),
|
||||
'test_group': self._parse_int(os.getenv('TEST_GROUP', '0'))
|
||||
self.settings["Telegram"] = {
|
||||
"bot_token": os.getenv("BOT_TOKEN", ""),
|
||||
"listen_bot_token": os.getenv("LISTEN_BOT_TOKEN", ""),
|
||||
"test_bot_token": os.getenv("TEST_BOT_TOKEN", ""),
|
||||
"preview_link": self._parse_bool(os.getenv("PREVIEW_LINK", "false")),
|
||||
"main_public": os.getenv("MAIN_PUBLIC", ""),
|
||||
"group_for_posts": self._parse_int(os.getenv("GROUP_FOR_POSTS", "0")),
|
||||
"group_for_message": self._parse_int(os.getenv("GROUP_FOR_MESSAGE", "0")),
|
||||
"group_for_logs": self._parse_int(os.getenv("GROUP_FOR_LOGS", "0")),
|
||||
"important_logs": self._parse_int(os.getenv("IMPORTANT_LOGS", "0")),
|
||||
"archive": self._parse_int(os.getenv("ARCHIVE", "0")),
|
||||
"test_group": self._parse_int(os.getenv("TEST_GROUP", "0")),
|
||||
}
|
||||
|
||||
self.settings['Settings'] = {
|
||||
'logs': self._parse_bool(os.getenv('LOGS', 'false')),
|
||||
'test': self._parse_bool(os.getenv('TEST', 'false'))
|
||||
self.settings["Settings"] = {
|
||||
"logs": self._parse_bool(os.getenv("LOGS", "false")),
|
||||
"test": self._parse_bool(os.getenv("TEST", "false")),
|
||||
}
|
||||
|
||||
self.settings['Metrics'] = {
|
||||
'host': os.getenv('METRICS_HOST', '0.0.0.0'),
|
||||
'port': self._parse_int(os.getenv('METRICS_PORT', '8080'))
|
||||
self.settings["Metrics"] = {
|
||||
"host": os.getenv("METRICS_HOST", "0.0.0.0"),
|
||||
"port": self._parse_int(os.getenv("METRICS_PORT", "8080")),
|
||||
}
|
||||
|
||||
self.settings['S3'] = {
|
||||
'enabled': self._parse_bool(os.getenv('S3_ENABLED', 'false')),
|
||||
'endpoint_url': os.getenv('S3_ENDPOINT_URL', ''),
|
||||
'access_key': os.getenv('S3_ACCESS_KEY', ''),
|
||||
'secret_key': os.getenv('S3_SECRET_KEY', ''),
|
||||
'bucket_name': os.getenv('S3_BUCKET_NAME', ''),
|
||||
'region': os.getenv('S3_REGION', 'us-east-1')
|
||||
self.settings["S3"] = {
|
||||
"enabled": self._parse_bool(os.getenv("S3_ENABLED", "false")),
|
||||
"endpoint_url": os.getenv("S3_ENDPOINT_URL", ""),
|
||||
"access_key": os.getenv("S3_ACCESS_KEY", ""),
|
||||
"secret_key": os.getenv("S3_SECRET_KEY", ""),
|
||||
"bucket_name": os.getenv("S3_BUCKET_NAME", ""),
|
||||
"region": os.getenv("S3_REGION", "us-east-1"),
|
||||
}
|
||||
|
||||
|
||||
# Настройки ML-скоринга
|
||||
self.settings['Scoring'] = {
|
||||
self.settings["Scoring"] = {
|
||||
# RAG API
|
||||
'rag_enabled': self._parse_bool(os.getenv('RAG_ENABLED', 'false')),
|
||||
'rag_api_url': os.getenv('RAG_API_URL', ''),
|
||||
'rag_api_key': os.getenv('RAG_API_KEY', ''),
|
||||
'rag_api_timeout': self._parse_int(os.getenv('RAG_API_TIMEOUT', '30')),
|
||||
'rag_test_mode': self._parse_bool(os.getenv('RAG_TEST_MODE', 'false')),
|
||||
"rag_enabled": self._parse_bool(os.getenv("RAG_ENABLED", "false")),
|
||||
"rag_api_url": os.getenv("RAG_API_URL", ""),
|
||||
"rag_api_key": os.getenv("RAG_API_KEY", ""),
|
||||
"rag_api_timeout": self._parse_int(os.getenv("RAG_API_TIMEOUT", "30")),
|
||||
"rag_test_mode": self._parse_bool(os.getenv("RAG_TEST_MODE", "false")),
|
||||
# DeepSeek
|
||||
'deepseek_enabled': self._parse_bool(os.getenv('DEEPSEEK_ENABLED', 'false')),
|
||||
'deepseek_api_key': os.getenv('DEEPSEEK_API_KEY', ''),
|
||||
'deepseek_api_url': os.getenv('DEEPSEEK_API_URL', 'https://api.deepseek.com/v1/chat/completions'),
|
||||
'deepseek_model': os.getenv('DEEPSEEK_MODEL', 'deepseek-chat'),
|
||||
'deepseek_timeout': self._parse_int(os.getenv('DEEPSEEK_TIMEOUT', '30')),
|
||||
"deepseek_enabled": self._parse_bool(
|
||||
os.getenv("DEEPSEEK_ENABLED", "false")
|
||||
),
|
||||
"deepseek_api_key": os.getenv("DEEPSEEK_API_KEY", ""),
|
||||
"deepseek_api_url": os.getenv(
|
||||
"DEEPSEEK_API_URL", "https://api.deepseek.com/v1/chat/completions"
|
||||
),
|
||||
"deepseek_model": os.getenv("DEEPSEEK_MODEL", "deepseek-chat"),
|
||||
"deepseek_timeout": self._parse_int(os.getenv("DEEPSEEK_TIMEOUT", "30")),
|
||||
}
|
||||
|
||||
|
||||
def _init_s3_storage(self):
|
||||
"""Инициализирует S3StorageService если S3 включен."""
|
||||
self.s3_storage = None
|
||||
if self.settings['S3']['enabled']:
|
||||
s3_config = self.settings['S3']
|
||||
if s3_config['endpoint_url'] and s3_config['access_key'] and s3_config['secret_key'] and s3_config['bucket_name']:
|
||||
if self.settings["S3"]["enabled"]:
|
||||
s3_config = self.settings["S3"]
|
||||
if (
|
||||
s3_config["endpoint_url"]
|
||||
and s3_config["access_key"]
|
||||
and s3_config["secret_key"]
|
||||
and s3_config["bucket_name"]
|
||||
):
|
||||
self.s3_storage = S3StorageService(
|
||||
endpoint_url=s3_config['endpoint_url'],
|
||||
access_key=s3_config['access_key'],
|
||||
secret_key=s3_config['secret_key'],
|
||||
bucket_name=s3_config['bucket_name'],
|
||||
region=s3_config['region']
|
||||
endpoint_url=s3_config["endpoint_url"],
|
||||
access_key=s3_config["access_key"],
|
||||
secret_key=s3_config["secret_key"],
|
||||
bucket_name=s3_config["bucket_name"],
|
||||
region=s3_config["region"],
|
||||
)
|
||||
|
||||
def _parse_bool(self, value: str) -> bool:
|
||||
"""Парсит строковое значение в boolean."""
|
||||
return value.lower() in ('true', '1', 'yes', 'on')
|
||||
return value.lower() in ("true", "1", "yes", "on")
|
||||
|
||||
def _parse_int(self, value: str) -> int:
|
||||
"""Парсит строковое значение в integer."""
|
||||
@@ -105,7 +117,7 @@ class BaseDependencyFactory:
|
||||
return int(value)
|
||||
except (ValueError, TypeError):
|
||||
return 0
|
||||
|
||||
|
||||
def _parse_float(self, value: str) -> float:
|
||||
"""Парсит строковое значение в float."""
|
||||
try:
|
||||
@@ -119,87 +131,95 @@ class BaseDependencyFactory:
|
||||
def get_db(self) -> AsyncBotDB:
|
||||
"""Возвращает подключение к базе данных."""
|
||||
return self.database
|
||||
|
||||
|
||||
def get_s3_storage(self) -> Optional[S3StorageService]:
|
||||
"""Возвращает S3StorageService если S3 включен, иначе None."""
|
||||
return self.s3_storage
|
||||
|
||||
|
||||
def _init_scoring_manager(self):
|
||||
"""
|
||||
Инициализирует ScoringManager с RAG API клиентом и DeepSeek сервисом.
|
||||
|
||||
|
||||
Вызывается лениво при первом обращении к get_scoring_manager().
|
||||
"""
|
||||
from helper_bot.services.scoring import (DeepSeekService, RagApiClient,
|
||||
ScoringManager)
|
||||
|
||||
scoring_config = self.settings['Scoring']
|
||||
|
||||
from helper_bot.services.scoring import (
|
||||
DeepSeekService,
|
||||
RagApiClient,
|
||||
ScoringManager,
|
||||
)
|
||||
|
||||
scoring_config = self.settings["Scoring"]
|
||||
|
||||
# Инициализация RAG API клиента
|
||||
rag_client = None
|
||||
if scoring_config['rag_enabled']:
|
||||
api_url = scoring_config['rag_api_url']
|
||||
api_key = scoring_config['rag_api_key']
|
||||
|
||||
if scoring_config["rag_enabled"]:
|
||||
api_url = scoring_config["rag_api_url"]
|
||||
api_key = scoring_config["rag_api_key"]
|
||||
|
||||
if not api_url or not api_key:
|
||||
logger.warning("RAG включен, но не указаны RAG_API_URL или RAG_API_KEY")
|
||||
else:
|
||||
rag_client = RagApiClient(
|
||||
api_url=api_url,
|
||||
api_key=api_key,
|
||||
timeout=scoring_config['rag_api_timeout'],
|
||||
test_mode=scoring_config['rag_test_mode'],
|
||||
timeout=scoring_config["rag_api_timeout"],
|
||||
test_mode=scoring_config["rag_test_mode"],
|
||||
enabled=True,
|
||||
)
|
||||
logger.info(f"RagApiClient инициализирован: {api_url} (test_mode={scoring_config['rag_test_mode']})")
|
||||
|
||||
logger.info(
|
||||
f"RagApiClient инициализирован: {api_url} (test_mode={scoring_config['rag_test_mode']})"
|
||||
)
|
||||
|
||||
# Инициализация DeepSeek сервиса
|
||||
deepseek_service = None
|
||||
if scoring_config['deepseek_enabled'] and scoring_config['deepseek_api_key']:
|
||||
if scoring_config["deepseek_enabled"] and scoring_config["deepseek_api_key"]:
|
||||
deepseek_service = DeepSeekService(
|
||||
api_key=scoring_config['deepseek_api_key'],
|
||||
api_url=scoring_config['deepseek_api_url'],
|
||||
model=scoring_config['deepseek_model'],
|
||||
timeout=scoring_config['deepseek_timeout'],
|
||||
api_key=scoring_config["deepseek_api_key"],
|
||||
api_url=scoring_config["deepseek_api_url"],
|
||||
model=scoring_config["deepseek_model"],
|
||||
timeout=scoring_config["deepseek_timeout"],
|
||||
enabled=True,
|
||||
)
|
||||
logger.info(f"DeepSeekService инициализирован: {scoring_config['deepseek_model']}")
|
||||
|
||||
logger.info(
|
||||
f"DeepSeekService инициализирован: {scoring_config['deepseek_model']}"
|
||||
)
|
||||
|
||||
# Создаем менеджер
|
||||
self._scoring_manager = ScoringManager(
|
||||
rag_client=rag_client,
|
||||
deepseek_service=deepseek_service,
|
||||
)
|
||||
|
||||
|
||||
return self._scoring_manager
|
||||
|
||||
|
||||
def get_scoring_manager(self):
|
||||
"""
|
||||
Возвращает ScoringManager для ML-скоринга постов.
|
||||
|
||||
|
||||
Инициализируется лениво при первом вызове.
|
||||
|
||||
|
||||
Returns:
|
||||
ScoringManager или None если скоринг полностью отключен
|
||||
"""
|
||||
if self._scoring_manager is None:
|
||||
scoring_config = self.settings.get('Scoring', {})
|
||||
|
||||
scoring_config = self.settings.get("Scoring", {})
|
||||
|
||||
# Проверяем, включен ли хотя бы один сервис
|
||||
rag_enabled = scoring_config.get('rag_enabled', False)
|
||||
deepseek_enabled = scoring_config.get('deepseek_enabled', False)
|
||||
|
||||
rag_enabled = scoring_config.get("rag_enabled", False)
|
||||
deepseek_enabled = scoring_config.get("deepseek_enabled", False)
|
||||
|
||||
if not rag_enabled and not deepseek_enabled:
|
||||
logger.info("Scoring полностью отключен (RAG и DeepSeek disabled)")
|
||||
return None
|
||||
|
||||
|
||||
self._init_scoring_manager()
|
||||
|
||||
|
||||
return self._scoring_manager
|
||||
|
||||
|
||||
_global_instance = None
|
||||
|
||||
|
||||
def get_global_instance():
|
||||
"""Возвращает глобальный экземпляр BaseDependencyFactory."""
|
||||
global _global_instance
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -10,8 +10,13 @@ from contextlib import asynccontextmanager
|
||||
from functools import wraps
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from prometheus_client import (CONTENT_TYPE_LATEST, Counter, Gauge, Histogram,
|
||||
generate_latest)
|
||||
from prometheus_client import (
|
||||
CONTENT_TYPE_LATEST,
|
||||
Counter,
|
||||
Gauge,
|
||||
Histogram,
|
||||
generate_latest,
|
||||
)
|
||||
from prometheus_client.core import CollectorRegistry
|
||||
|
||||
# Метрики rate limiter теперь создаются в основном классе
|
||||
@@ -19,372 +24,399 @@ from prometheus_client.core import CollectorRegistry
|
||||
|
||||
class BotMetrics:
|
||||
"""Central class for managing all bot metrics."""
|
||||
|
||||
|
||||
def __init__(self):
|
||||
self.registry = CollectorRegistry()
|
||||
|
||||
|
||||
# Создаем метрики rate limiter в том же registry
|
||||
self._create_rate_limit_metrics()
|
||||
|
||||
|
||||
# Bot commands counter
|
||||
self.bot_commands_total = Counter(
|
||||
'bot_commands_total',
|
||||
'Total number of bot commands processed',
|
||||
['command', 'status', 'handler_type', 'user_type'],
|
||||
registry=self.registry
|
||||
"bot_commands_total",
|
||||
"Total number of bot commands processed",
|
||||
["command", "status", "handler_type", "user_type"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
# Method execution time histogram
|
||||
self.method_duration_seconds = Histogram(
|
||||
'method_duration_seconds',
|
||||
'Time spent executing methods',
|
||||
['method_name', 'handler_type', 'status'],
|
||||
"method_duration_seconds",
|
||||
"Time spent executing methods",
|
||||
["method_name", "handler_type", "status"],
|
||||
# Оптимизированные buckets для Telegram API (обычно < 1 сек)
|
||||
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0],
|
||||
registry=self.registry
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
# Errors counter
|
||||
self.errors_total = Counter(
|
||||
'errors_total',
|
||||
'Total number of errors',
|
||||
['error_type', 'handler_type', 'method_name'],
|
||||
registry=self.registry
|
||||
"errors_total",
|
||||
"Total number of errors",
|
||||
["error_type", "handler_type", "method_name"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
# Active users gauge
|
||||
self.active_users = Gauge(
|
||||
'active_users',
|
||||
'Number of currently active users',
|
||||
['user_type'],
|
||||
registry=self.registry
|
||||
"active_users",
|
||||
"Number of currently active users",
|
||||
["user_type"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
# Total users gauge (отдельная метрика)
|
||||
self.total_users = Gauge(
|
||||
'total_users',
|
||||
'Total number of users in database',
|
||||
registry=self.registry
|
||||
"total_users", "Total number of users in database", registry=self.registry
|
||||
)
|
||||
|
||||
|
||||
# Database query metrics
|
||||
self.db_query_duration_seconds = Histogram(
|
||||
'db_query_duration_seconds',
|
||||
'Time spent executing database queries',
|
||||
['query_type', 'table_name', 'operation'],
|
||||
"db_query_duration_seconds",
|
||||
"Time spent executing database queries",
|
||||
["query_type", "table_name", "operation"],
|
||||
# Оптимизированные buckets для SQLite/PostgreSQL
|
||||
buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5],
|
||||
registry=self.registry
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
# Database queries counter
|
||||
self.db_queries_total = Counter(
|
||||
'db_queries_total',
|
||||
'Total number of database queries executed',
|
||||
['query_type', 'table_name', 'operation'],
|
||||
registry=self.registry
|
||||
"db_queries_total",
|
||||
"Total number of database queries executed",
|
||||
["query_type", "table_name", "operation"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
# Database errors counter
|
||||
self.db_errors_total = Counter(
|
||||
'db_errors_total',
|
||||
'Total number of database errors',
|
||||
['error_type', 'query_type', 'table_name', 'operation'],
|
||||
registry=self.registry
|
||||
"db_errors_total",
|
||||
"Total number of database errors",
|
||||
["error_type", "query_type", "table_name", "operation"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
# Message processing metrics
|
||||
self.messages_processed_total = Counter(
|
||||
'messages_processed_total',
|
||||
'Total number of messages processed',
|
||||
['message_type', 'chat_type', 'handler_type'],
|
||||
registry=self.registry
|
||||
"messages_processed_total",
|
||||
"Total number of messages processed",
|
||||
["message_type", "chat_type", "handler_type"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
# Middleware execution metrics
|
||||
self.middleware_duration_seconds = Histogram(
|
||||
'middleware_duration_seconds',
|
||||
'Time spent in middleware execution',
|
||||
['middleware_name', 'status'],
|
||||
"middleware_duration_seconds",
|
||||
"Time spent in middleware execution",
|
||||
["middleware_name", "status"],
|
||||
# Middleware должен быть быстрым
|
||||
buckets=[0.001, 0.005, 0.01, 0.05, 0.1, 0.25],
|
||||
registry=self.registry
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
# Rate limiting metrics
|
||||
self.rate_limit_hits_total = Counter(
|
||||
'rate_limit_hits_total',
|
||||
'Total number of rate limit hits',
|
||||
['limit_type', 'user_id', 'action'],
|
||||
registry=self.registry
|
||||
"rate_limit_hits_total",
|
||||
"Total number of rate limit hits",
|
||||
["limit_type", "user_id", "action"],
|
||||
registry=self.registry,
|
||||
)
|
||||
# User activity metrics
|
||||
self.user_activity_total = Counter(
|
||||
'user_activity_total',
|
||||
'Total user activity events',
|
||||
['activity_type', 'user_type', 'chat_type'],
|
||||
registry=self.registry
|
||||
"user_activity_total",
|
||||
"Total user activity events",
|
||||
["activity_type", "user_type", "chat_type"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
# File download metrics
|
||||
self.file_downloads_total = Counter(
|
||||
'file_downloads_total',
|
||||
'Total number of file downloads',
|
||||
['content_type', 'status'],
|
||||
registry=self.registry
|
||||
"file_downloads_total",
|
||||
"Total number of file downloads",
|
||||
["content_type", "status"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
self.file_download_duration_seconds = Histogram(
|
||||
'file_download_duration_seconds',
|
||||
'Time spent downloading files',
|
||||
['content_type'],
|
||||
"file_download_duration_seconds",
|
||||
"Time spent downloading files",
|
||||
["content_type"],
|
||||
buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0],
|
||||
registry=self.registry
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
self.file_download_size_bytes = Histogram(
|
||||
'file_download_size_bytes',
|
||||
'Size of downloaded files in bytes',
|
||||
['content_type'],
|
||||
"file_download_size_bytes",
|
||||
"Size of downloaded files in bytes",
|
||||
["content_type"],
|
||||
buckets=[1024, 10240, 102400, 1048576, 10485760, 104857600, 1073741824],
|
||||
registry=self.registry
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
# Media processing metrics
|
||||
self.media_processing_total = Counter(
|
||||
'media_processing_total',
|
||||
'Total number of media processing operations',
|
||||
['content_type', 'status'],
|
||||
registry=self.registry
|
||||
"media_processing_total",
|
||||
"Total number of media processing operations",
|
||||
["content_type", "status"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
self.media_processing_duration_seconds = Histogram(
|
||||
'media_processing_duration_seconds',
|
||||
'Time spent processing media',
|
||||
['content_type'],
|
||||
"media_processing_duration_seconds",
|
||||
"Time spent processing media",
|
||||
["content_type"],
|
||||
buckets=[0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0],
|
||||
registry=self.registry
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
def _create_rate_limit_metrics(self):
|
||||
"""Создает метрики rate limiter в основном registry"""
|
||||
try:
|
||||
# Создаем метрики rate limiter в том же registry
|
||||
self.rate_limit_requests_total = Counter(
|
||||
'rate_limit_requests_total',
|
||||
'Total number of rate limited requests',
|
||||
['chat_id', 'status', 'error_type'],
|
||||
registry=self.registry
|
||||
"rate_limit_requests_total",
|
||||
"Total number of rate limited requests",
|
||||
["chat_id", "status", "error_type"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
self.rate_limit_errors_total = Counter(
|
||||
'rate_limit_errors_total',
|
||||
'Total number of rate limit errors',
|
||||
['error_type', 'chat_id'],
|
||||
registry=self.registry
|
||||
"rate_limit_errors_total",
|
||||
"Total number of rate limit errors",
|
||||
["error_type", "chat_id"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
self.rate_limit_wait_duration_seconds = Histogram(
|
||||
'rate_limit_wait_duration_seconds',
|
||||
'Time spent waiting due to rate limiting',
|
||||
['chat_id'],
|
||||
"rate_limit_wait_duration_seconds",
|
||||
"Time spent waiting due to rate limiting",
|
||||
["chat_id"],
|
||||
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0],
|
||||
registry=self.registry
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
self.rate_limit_active_chats = Gauge(
|
||||
'rate_limit_active_chats',
|
||||
'Number of active chats with rate limiting',
|
||||
registry=self.registry
|
||||
"rate_limit_active_chats",
|
||||
"Number of active chats with rate limiting",
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
self.rate_limit_success_rate = Gauge(
|
||||
'rate_limit_success_rate',
|
||||
'Success rate of rate limited requests',
|
||||
['chat_id'],
|
||||
registry=self.registry
|
||||
"rate_limit_success_rate",
|
||||
"Success rate of rate limited requests",
|
||||
["chat_id"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
self.rate_limit_requests_per_minute = Gauge(
|
||||
'rate_limit_requests_per_minute',
|
||||
'Requests per minute',
|
||||
['chat_id'],
|
||||
registry=self.registry
|
||||
"rate_limit_requests_per_minute",
|
||||
"Requests per minute",
|
||||
["chat_id"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
self.rate_limit_total_requests = Gauge(
|
||||
'rate_limit_total_requests',
|
||||
'Total number of requests',
|
||||
['chat_id'],
|
||||
registry=self.registry
|
||||
"rate_limit_total_requests",
|
||||
"Total number of requests",
|
||||
["chat_id"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
self.rate_limit_total_errors = Gauge(
|
||||
'rate_limit_total_errors',
|
||||
'Total number of errors',
|
||||
['chat_id', 'error_type'],
|
||||
registry=self.registry
|
||||
"rate_limit_total_errors",
|
||||
"Total number of errors",
|
||||
["chat_id", "error_type"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
self.rate_limit_avg_wait_time_seconds = Gauge(
|
||||
'rate_limit_avg_wait_time_seconds',
|
||||
'Average wait time in seconds',
|
||||
['chat_id'],
|
||||
registry=self.registry
|
||||
"rate_limit_avg_wait_time_seconds",
|
||||
"Average wait time in seconds",
|
||||
["chat_id"],
|
||||
registry=self.registry,
|
||||
)
|
||||
|
||||
|
||||
except Exception as e:
|
||||
# Логируем ошибку, но не прерываем инициализацию
|
||||
import logging
|
||||
|
||||
logging.warning(f"Failed to create rate limit metrics: {e}")
|
||||
|
||||
def record_command(self, command_type: str, handler_type: str = "unknown", user_type: str = "unknown", status: str = "success"):
|
||||
|
||||
def record_command(
|
||||
self,
|
||||
command_type: str,
|
||||
handler_type: str = "unknown",
|
||||
user_type: str = "unknown",
|
||||
status: str = "success",
|
||||
):
|
||||
"""Record a bot command execution."""
|
||||
self.bot_commands_total.labels(
|
||||
command=command_type,
|
||||
status=status,
|
||||
handler_type=handler_type,
|
||||
user_type=user_type
|
||||
user_type=user_type,
|
||||
).inc()
|
||||
|
||||
def record_error(self, error_type: str, handler_type: str = "unknown", method_name: str = "unknown"):
|
||||
|
||||
def record_error(
|
||||
self,
|
||||
error_type: str,
|
||||
handler_type: str = "unknown",
|
||||
method_name: str = "unknown",
|
||||
):
|
||||
"""Record an error occurrence."""
|
||||
self.errors_total.labels(
|
||||
error_type=error_type,
|
||||
handler_type=handler_type,
|
||||
method_name=method_name
|
||||
error_type=error_type, handler_type=handler_type, method_name=method_name
|
||||
).inc()
|
||||
|
||||
def record_method_duration(self, method_name: str, duration: float, handler_type: str = "unknown", status: str = "success"):
|
||||
|
||||
def record_method_duration(
|
||||
self,
|
||||
method_name: str,
|
||||
duration: float,
|
||||
handler_type: str = "unknown",
|
||||
status: str = "success",
|
||||
):
|
||||
"""Record method execution duration."""
|
||||
self.method_duration_seconds.labels(
|
||||
method_name=method_name,
|
||||
handler_type=handler_type,
|
||||
status=status
|
||||
method_name=method_name, handler_type=handler_type, status=status
|
||||
).observe(duration)
|
||||
|
||||
|
||||
def set_active_users(self, count: int, user_type: str = "daily"):
|
||||
"""Set the number of active users for a specific type."""
|
||||
self.active_users.labels(user_type=user_type).set(count)
|
||||
|
||||
|
||||
def set_total_users(self, count: int):
|
||||
"""Set the total number of users in database."""
|
||||
self.total_users.set(count)
|
||||
|
||||
def record_db_query(self, query_type: str, duration: float, table_name: str = "unknown", operation: str = "unknown"):
|
||||
|
||||
def record_db_query(
|
||||
self,
|
||||
query_type: str,
|
||||
duration: float,
|
||||
table_name: str = "unknown",
|
||||
operation: str = "unknown",
|
||||
):
|
||||
"""Record database query duration."""
|
||||
self.db_query_duration_seconds.labels(
|
||||
query_type=query_type,
|
||||
table_name=table_name,
|
||||
operation=operation
|
||||
query_type=query_type, table_name=table_name, operation=operation
|
||||
).observe(duration)
|
||||
self.db_queries_total.labels(
|
||||
query_type=query_type,
|
||||
table_name=table_name,
|
||||
operation=operation
|
||||
query_type=query_type, table_name=table_name, operation=operation
|
||||
).inc()
|
||||
|
||||
def record_message(self, message_type: str, chat_type: str = "unknown", handler_type: str = "unknown"):
|
||||
|
||||
def record_message(
|
||||
self,
|
||||
message_type: str,
|
||||
chat_type: str = "unknown",
|
||||
handler_type: str = "unknown",
|
||||
):
|
||||
"""Record a processed message."""
|
||||
self.messages_processed_total.labels(
|
||||
message_type=message_type,
|
||||
chat_type=chat_type,
|
||||
handler_type=handler_type
|
||||
message_type=message_type, chat_type=chat_type, handler_type=handler_type
|
||||
).inc()
|
||||
|
||||
def record_middleware(self, middleware_name: str, duration: float, status: str = "success"):
|
||||
|
||||
def record_middleware(
|
||||
self, middleware_name: str, duration: float, status: str = "success"
|
||||
):
|
||||
"""Record middleware execution duration."""
|
||||
self.middleware_duration_seconds.labels(
|
||||
middleware_name=middleware_name,
|
||||
status=status
|
||||
middleware_name=middleware_name, status=status
|
||||
).observe(duration)
|
||||
|
||||
|
||||
def record_file_download(self, content_type: str, file_size: int, duration: float):
|
||||
"""Record file download metrics."""
|
||||
self.file_downloads_total.labels(
|
||||
content_type=content_type,
|
||||
status="success"
|
||||
content_type=content_type, status="success"
|
||||
).inc()
|
||||
|
||||
self.file_download_duration_seconds.labels(
|
||||
content_type=content_type
|
||||
).observe(duration)
|
||||
|
||||
self.file_download_size_bytes.labels(
|
||||
content_type=content_type
|
||||
).observe(file_size)
|
||||
|
||||
|
||||
self.file_download_duration_seconds.labels(content_type=content_type).observe(
|
||||
duration
|
||||
)
|
||||
|
||||
self.file_download_size_bytes.labels(content_type=content_type).observe(
|
||||
file_size
|
||||
)
|
||||
|
||||
def record_file_download_error(self, content_type: str, error_message: str):
|
||||
"""Record file download error metrics."""
|
||||
self.file_downloads_total.labels(
|
||||
content_type=content_type,
|
||||
status="error"
|
||||
content_type=content_type, status="error"
|
||||
).inc()
|
||||
|
||||
|
||||
self.errors_total.labels(
|
||||
error_type="file_download_error",
|
||||
handler_type="media_processing",
|
||||
method_name="download_file"
|
||||
method_name="download_file",
|
||||
).inc()
|
||||
|
||||
def record_media_processing(self, content_type: str, duration: float, success: bool):
|
||||
|
||||
def record_media_processing(
|
||||
self, content_type: str, duration: float, success: bool
|
||||
):
|
||||
"""Record media processing metrics."""
|
||||
status = "success" if success else "error"
|
||||
|
||||
|
||||
self.media_processing_total.labels(
|
||||
content_type=content_type,
|
||||
status=status
|
||||
content_type=content_type, status=status
|
||||
).inc()
|
||||
|
||||
|
||||
self.media_processing_duration_seconds.labels(
|
||||
content_type=content_type
|
||||
).observe(duration)
|
||||
|
||||
|
||||
if not success:
|
||||
self.errors_total.labels(
|
||||
error_type="media_processing_error",
|
||||
handler_type="media_processing",
|
||||
method_name="add_in_db_media"
|
||||
method_name="add_in_db_media",
|
||||
).inc()
|
||||
|
||||
def record_db_error(self, error_type: str, query_type: str = "unknown", table_name: str = "unknown", operation: str = "unknown"):
|
||||
|
||||
def record_db_error(
|
||||
self,
|
||||
error_type: str,
|
||||
query_type: str = "unknown",
|
||||
table_name: str = "unknown",
|
||||
operation: str = "unknown",
|
||||
):
|
||||
"""Record database error occurrence."""
|
||||
self.db_errors_total.labels(
|
||||
error_type=error_type,
|
||||
query_type=query_type,
|
||||
table_name=table_name,
|
||||
operation=operation
|
||||
operation=operation,
|
||||
).inc()
|
||||
|
||||
def record_rate_limit_request(self, chat_id: int, success: bool, wait_time: float = 0.0, error_type: str = None):
|
||||
|
||||
def record_rate_limit_request(
|
||||
self,
|
||||
chat_id: int,
|
||||
success: bool,
|
||||
wait_time: float = 0.0,
|
||||
error_type: str = None,
|
||||
):
|
||||
"""Record rate limit request metrics."""
|
||||
try:
|
||||
# Определяем статус
|
||||
status = "success" if success else "error"
|
||||
|
||||
|
||||
# Записываем счетчик запросов
|
||||
self.rate_limit_requests_total.labels(
|
||||
chat_id=str(chat_id),
|
||||
status=status,
|
||||
error_type=error_type or "none"
|
||||
chat_id=str(chat_id), status=status, error_type=error_type or "none"
|
||||
).inc()
|
||||
|
||||
|
||||
# Записываем время ожидания
|
||||
if wait_time > 0:
|
||||
self.rate_limit_wait_duration_seconds.labels(
|
||||
chat_id=str(chat_id)
|
||||
).observe(wait_time)
|
||||
|
||||
|
||||
# Записываем ошибки
|
||||
if not success and error_type:
|
||||
self.rate_limit_errors_total.labels(
|
||||
error_type=error_type,
|
||||
chat_id=str(chat_id)
|
||||
error_type=error_type, chat_id=str(chat_id)
|
||||
).inc()
|
||||
except Exception as e:
|
||||
import logging
|
||||
|
||||
logging.warning(f"Failed to record rate limit request: {e}")
|
||||
|
||||
|
||||
def update_rate_limit_gauges(self):
|
||||
"""Update rate limit gauge metrics."""
|
||||
try:
|
||||
@@ -392,52 +424,51 @@ class BotMetrics:
|
||||
|
||||
# Обновляем количество активных чатов
|
||||
self.rate_limit_active_chats.set(len(rate_limit_monitor.stats))
|
||||
|
||||
|
||||
# Обновляем метрики для каждого чата
|
||||
for chat_id, chat_stats in rate_limit_monitor.stats.items():
|
||||
chat_id_str = str(chat_id)
|
||||
|
||||
|
||||
# Процент успеха
|
||||
self.rate_limit_success_rate.labels(
|
||||
chat_id=chat_id_str
|
||||
).set(chat_stats.success_rate)
|
||||
|
||||
self.rate_limit_success_rate.labels(chat_id=chat_id_str).set(
|
||||
chat_stats.success_rate
|
||||
)
|
||||
|
||||
# Запросов в минуту
|
||||
self.rate_limit_requests_per_minute.labels(
|
||||
chat_id=chat_id_str
|
||||
).set(chat_stats.requests_per_minute)
|
||||
|
||||
self.rate_limit_requests_per_minute.labels(chat_id=chat_id_str).set(
|
||||
chat_stats.requests_per_minute
|
||||
)
|
||||
|
||||
# Общее количество запросов
|
||||
self.rate_limit_total_requests.labels(
|
||||
chat_id=chat_id_str
|
||||
).set(chat_stats.total_requests)
|
||||
|
||||
self.rate_limit_total_requests.labels(chat_id=chat_id_str).set(
|
||||
chat_stats.total_requests
|
||||
)
|
||||
|
||||
# Среднее время ожидания
|
||||
self.rate_limit_avg_wait_time_seconds.labels(
|
||||
chat_id=chat_id_str
|
||||
).set(chat_stats.average_wait_time)
|
||||
|
||||
self.rate_limit_avg_wait_time_seconds.labels(chat_id=chat_id_str).set(
|
||||
chat_stats.average_wait_time
|
||||
)
|
||||
|
||||
# Количество ошибок по типам
|
||||
if chat_stats.retry_after_errors > 0:
|
||||
self.rate_limit_total_errors.labels(
|
||||
chat_id=chat_id_str,
|
||||
error_type="RetryAfter"
|
||||
chat_id=chat_id_str, error_type="RetryAfter"
|
||||
).set(chat_stats.retry_after_errors)
|
||||
|
||||
|
||||
if chat_stats.other_errors > 0:
|
||||
self.rate_limit_total_errors.labels(
|
||||
chat_id=chat_id_str,
|
||||
error_type="Other"
|
||||
chat_id=chat_id_str, error_type="Other"
|
||||
).set(chat_stats.other_errors)
|
||||
except Exception as e:
|
||||
import logging
|
||||
|
||||
logging.warning(f"Failed to update rate limit gauges: {e}")
|
||||
|
||||
|
||||
def get_metrics(self) -> bytes:
|
||||
"""Generate metrics in Prometheus format."""
|
||||
# Обновляем gauge метрики rate limiter перед генерацией
|
||||
self.update_rate_limit_gauges()
|
||||
|
||||
|
||||
return generate_latest(self.registry)
|
||||
|
||||
|
||||
@@ -448,6 +479,7 @@ metrics = BotMetrics()
|
||||
# Decorators for easy metric collection
|
||||
def track_time(method_name: str = None, handler_type: str = "unknown"):
|
||||
"""Decorator to track execution time of functions."""
|
||||
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
async def async_wrapper(*args, **kwargs):
|
||||
@@ -456,27 +488,19 @@ def track_time(method_name: str = None, handler_type: str = "unknown"):
|
||||
result = await func(*args, **kwargs)
|
||||
duration = time.time() - start_time
|
||||
metrics.record_method_duration(
|
||||
method_name or func.__name__,
|
||||
duration,
|
||||
handler_type,
|
||||
"success"
|
||||
method_name or func.__name__, duration, handler_type, "success"
|
||||
)
|
||||
return result
|
||||
except Exception as e:
|
||||
duration = time.time() - start_time
|
||||
metrics.record_method_duration(
|
||||
method_name or func.__name__,
|
||||
duration,
|
||||
handler_type,
|
||||
"error"
|
||||
method_name or func.__name__, duration, handler_type, "error"
|
||||
)
|
||||
metrics.record_error(
|
||||
type(e).__name__,
|
||||
handler_type,
|
||||
method_name or func.__name__
|
||||
type(e).__name__, handler_type, method_name or func.__name__
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
@wraps(func)
|
||||
def sync_wrapper(*args, **kwargs):
|
||||
start_time = time.time()
|
||||
@@ -484,35 +508,29 @@ def track_time(method_name: str = None, handler_type: str = "unknown"):
|
||||
result = func(*args, **kwargs)
|
||||
duration = time.time() - start_time
|
||||
metrics.record_method_duration(
|
||||
method_name or func.__name__,
|
||||
duration,
|
||||
handler_type,
|
||||
"success"
|
||||
method_name or func.__name__, duration, handler_type, "success"
|
||||
)
|
||||
return result
|
||||
except Exception as e:
|
||||
duration = time.time() - start_time
|
||||
metrics.record_method_duration(
|
||||
method_name or func.__name__,
|
||||
duration,
|
||||
handler_type,
|
||||
"error"
|
||||
method_name or func.__name__, duration, handler_type, "error"
|
||||
)
|
||||
metrics.record_error(
|
||||
type(e).__name__,
|
||||
handler_type,
|
||||
method_name or func.__name__
|
||||
type(e).__name__, handler_type, method_name or func.__name__
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
return sync_wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def track_errors(handler_type: str = "unknown", method_name: str = None):
|
||||
"""Decorator to track errors in functions."""
|
||||
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
async def async_wrapper(*args, **kwargs):
|
||||
@@ -520,32 +538,32 @@ def track_errors(handler_type: str = "unknown", method_name: str = None):
|
||||
return await func(*args, **kwargs)
|
||||
except Exception as e:
|
||||
metrics.record_error(
|
||||
type(e).__name__,
|
||||
handler_type,
|
||||
method_name or func.__name__
|
||||
type(e).__name__, handler_type, method_name or func.__name__
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
@wraps(func)
|
||||
def sync_wrapper(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except Exception as e:
|
||||
metrics.record_error(
|
||||
type(e).__name__,
|
||||
handler_type,
|
||||
method_name or func.__name__
|
||||
type(e).__name__, handler_type, method_name or func.__name__
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
return sync_wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def db_query_time(query_type: str = "unknown", table_name: str = "unknown", operation: str = "unknown"):
|
||||
def db_query_time(
|
||||
query_type: str = "unknown", table_name: str = "unknown", operation: str = "unknown"
|
||||
):
|
||||
"""Decorator to track database query execution time."""
|
||||
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
async def async_wrapper(*args, **kwargs):
|
||||
@@ -559,18 +577,11 @@ def db_query_time(query_type: str = "unknown", table_name: str = "unknown", oper
|
||||
duration = time.time() - start_time
|
||||
metrics.record_db_query(query_type, duration, table_name, operation)
|
||||
metrics.record_db_error(
|
||||
type(e).__name__,
|
||||
query_type,
|
||||
table_name,
|
||||
operation
|
||||
)
|
||||
metrics.record_error(
|
||||
type(e).__name__,
|
||||
"database",
|
||||
func.__name__
|
||||
type(e).__name__, query_type, table_name, operation
|
||||
)
|
||||
metrics.record_error(type(e).__name__, "database", func.__name__)
|
||||
raise
|
||||
|
||||
|
||||
@wraps(func)
|
||||
def sync_wrapper(*args, **kwargs):
|
||||
start_time = time.time()
|
||||
@@ -583,21 +594,15 @@ def db_query_time(query_type: str = "unknown", table_name: str = "unknown", oper
|
||||
duration = time.time() - start_time
|
||||
metrics.record_db_query(query_type, duration, table_name, operation)
|
||||
metrics.record_db_error(
|
||||
type(e).__name__,
|
||||
query_type,
|
||||
table_name,
|
||||
operation
|
||||
)
|
||||
metrics.record_error(
|
||||
type(e).__name__,
|
||||
"database",
|
||||
func.__name__
|
||||
type(e).__name__, query_type, table_name, operation
|
||||
)
|
||||
metrics.record_error(type(e).__name__, "database", func.__name__)
|
||||
raise
|
||||
|
||||
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
return sync_wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
@@ -612,16 +617,13 @@ async def track_middleware(middleware_name: str):
|
||||
except Exception as e:
|
||||
duration = time.time() - start_time
|
||||
metrics.record_middleware(middleware_name, duration, "error")
|
||||
metrics.record_error(
|
||||
type(e).__name__,
|
||||
"middleware",
|
||||
middleware_name
|
||||
)
|
||||
metrics.record_error(type(e).__name__, "middleware", middleware_name)
|
||||
raise
|
||||
|
||||
|
||||
def track_media_processing(content_type: str = "unknown"):
|
||||
"""Decorator to track media processing operations."""
|
||||
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
async def async_wrapper(*args, **kwargs):
|
||||
@@ -635,7 +637,7 @@ def track_media_processing(content_type: str = "unknown"):
|
||||
duration = time.time() - start_time
|
||||
metrics.record_media_processing(content_type, duration, False)
|
||||
raise
|
||||
|
||||
|
||||
@wraps(func)
|
||||
def sync_wrapper(*args, **kwargs):
|
||||
start_time = time.time()
|
||||
@@ -648,15 +650,17 @@ def track_media_processing(content_type: str = "unknown"):
|
||||
duration = time.time() - start_time
|
||||
metrics.record_media_processing(content_type, duration, False)
|
||||
raise
|
||||
|
||||
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
return sync_wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def track_file_operations(content_type: str = "unknown"):
|
||||
"""Decorator to track file download/upload operations."""
|
||||
|
||||
def decorator(func):
|
||||
@wraps(func)
|
||||
async def async_wrapper(*args, **kwargs):
|
||||
@@ -664,43 +668,44 @@ def track_file_operations(content_type: str = "unknown"):
|
||||
try:
|
||||
result = await func(*args, **kwargs)
|
||||
duration = time.time() - start_time
|
||||
|
||||
|
||||
# Получаем размер файла из результата
|
||||
file_size = 0
|
||||
if result and isinstance(result, str) and os.path.exists(result):
|
||||
file_size = os.path.getsize(result)
|
||||
|
||||
|
||||
# Записываем метрики
|
||||
metrics.record_file_download(content_type, file_size, duration)
|
||||
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
duration = time.time() - start_time
|
||||
metrics.record_file_download_error(content_type, str(e))
|
||||
raise
|
||||
|
||||
|
||||
@wraps(func)
|
||||
def sync_wrapper(*args, **kwargs):
|
||||
start_time = time.time()
|
||||
try:
|
||||
result = func(*args, **kwargs)
|
||||
duration = time.time() - start_time
|
||||
|
||||
|
||||
# Получаем размер файла из результата
|
||||
file_size = 0
|
||||
if result and isinstance(result, str) and os.path.exists(result):
|
||||
file_size = os.path.getsize(result)
|
||||
|
||||
|
||||
# Записываем метрики
|
||||
metrics.record_file_download(content_type, file_size, duration)
|
||||
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
duration = time.time() - start_time
|
||||
metrics.record_file_download_error(content_type, str(e))
|
||||
raise
|
||||
|
||||
|
||||
if asyncio.iscoroutinefunction(func):
|
||||
return async_wrapper
|
||||
return sync_wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
@@ -1,20 +1,23 @@
|
||||
"""
|
||||
Rate limiter для предотвращения Flood control ошибок в Telegram Bot API
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Callable, Dict, Optional
|
||||
|
||||
from aiogram.exceptions import TelegramAPIError, TelegramRetryAfter
|
||||
|
||||
from logs.custom_logger import logger
|
||||
|
||||
from .metrics import metrics
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass
|
||||
class RateLimitConfig:
|
||||
"""Конфигурация для rate limiting"""
|
||||
|
||||
messages_per_second: float = 0.5 # Максимум 0.5 сообщений в секунду на чат
|
||||
burst_limit: int = 3 # Максимум 3 сообщения подряд
|
||||
retry_after_multiplier: float = 1.2 # Множитель для увеличения задержки при retry
|
||||
@@ -23,23 +26,23 @@ class RateLimitConfig:
|
||||
|
||||
class ChatRateLimiter:
|
||||
"""Rate limiter для конкретного чата"""
|
||||
|
||||
|
||||
def __init__(self, config: RateLimitConfig):
|
||||
self.config = config
|
||||
self.last_send_time = 0.0
|
||||
self.burst_count = 0
|
||||
self.burst_reset_time = 0.0
|
||||
self.retry_delay = 1.0
|
||||
|
||||
|
||||
async def wait_if_needed(self) -> None:
|
||||
"""Ждет если необходимо для соблюдения rate limit"""
|
||||
current_time = time.time()
|
||||
|
||||
|
||||
# Сбрасываем счетчик burst если прошло достаточно времени
|
||||
if current_time >= self.burst_reset_time:
|
||||
self.burst_count = 0
|
||||
self.burst_reset_time = current_time + 1.0
|
||||
|
||||
|
||||
# Проверяем burst limit
|
||||
if self.burst_count >= self.config.burst_limit:
|
||||
wait_time = self.burst_reset_time - current_time
|
||||
@@ -49,16 +52,16 @@ class ChatRateLimiter:
|
||||
current_time = time.time()
|
||||
self.burst_count = 0
|
||||
self.burst_reset_time = current_time + 1.0
|
||||
|
||||
|
||||
# Проверяем минимальный интервал между сообщениями
|
||||
time_since_last = current_time - self.last_send_time
|
||||
min_interval = 1.0 / self.config.messages_per_second
|
||||
|
||||
|
||||
if time_since_last < min_interval:
|
||||
wait_time = min_interval - time_since_last
|
||||
logger.debug(f"Rate limiting: waiting {wait_time:.2f}s")
|
||||
await asyncio.sleep(wait_time)
|
||||
|
||||
|
||||
# Обновляем время последней отправки
|
||||
self.last_send_time = time.time()
|
||||
self.burst_count += 1
|
||||
@@ -66,126 +69,126 @@ class ChatRateLimiter:
|
||||
|
||||
class GlobalRateLimiter:
|
||||
"""Глобальный rate limiter для всех чатов"""
|
||||
|
||||
|
||||
def __init__(self, config: RateLimitConfig):
|
||||
self.config = config
|
||||
self.chat_limiters: Dict[int, ChatRateLimiter] = {}
|
||||
self.global_last_send = 0.0
|
||||
self.global_min_interval = 0.1 # Минимум 100ms между любыми сообщениями
|
||||
|
||||
|
||||
def get_chat_limiter(self, chat_id: int) -> ChatRateLimiter:
|
||||
"""Получает rate limiter для конкретного чата"""
|
||||
if chat_id not in self.chat_limiters:
|
||||
self.chat_limiters[chat_id] = ChatRateLimiter(self.config)
|
||||
return self.chat_limiters[chat_id]
|
||||
|
||||
|
||||
async def wait_if_needed(self, chat_id: int) -> None:
|
||||
"""Ждет если необходимо для соблюдения глобального и чат-специфичного rate limit"""
|
||||
current_time = time.time()
|
||||
|
||||
|
||||
# Глобальный rate limit
|
||||
time_since_global = current_time - self.global_last_send
|
||||
if time_since_global < self.global_min_interval:
|
||||
wait_time = self.global_min_interval - time_since_global
|
||||
await asyncio.sleep(wait_time)
|
||||
current_time = time.time()
|
||||
|
||||
|
||||
# Чат-специфичный rate limit
|
||||
chat_limiter = self.get_chat_limiter(chat_id)
|
||||
await chat_limiter.wait_if_needed()
|
||||
|
||||
|
||||
self.global_last_send = time.time()
|
||||
|
||||
|
||||
class RetryHandler:
|
||||
"""Обработчик повторных попыток с экспоненциальной задержкой"""
|
||||
|
||||
|
||||
def __init__(self, config: RateLimitConfig):
|
||||
self.config = config
|
||||
|
||||
|
||||
async def execute_with_retry(
|
||||
self,
|
||||
func: Callable,
|
||||
chat_id: int,
|
||||
*args,
|
||||
max_retries: int = 3,
|
||||
**kwargs
|
||||
self, func: Callable, chat_id: int, *args, max_retries: int = 3, **kwargs
|
||||
) -> Any:
|
||||
"""Выполняет функцию с повторными попытками при ошибках"""
|
||||
retry_count = 0
|
||||
current_delay = self.config.retry_after_multiplier
|
||||
total_wait_time = 0.0
|
||||
|
||||
|
||||
while retry_count <= max_retries:
|
||||
try:
|
||||
result = await func(*args, **kwargs)
|
||||
# Записываем успешный запрос
|
||||
metrics.record_rate_limit_request(chat_id, True, total_wait_time)
|
||||
return result
|
||||
|
||||
|
||||
except TelegramRetryAfter as e:
|
||||
retry_count += 1
|
||||
if retry_count > max_retries:
|
||||
logger.error(f"Max retries exceeded for RetryAfter: {e}")
|
||||
metrics.record_rate_limit_request(chat_id, False, total_wait_time, "RetryAfter")
|
||||
metrics.record_rate_limit_request(
|
||||
chat_id, False, total_wait_time, "RetryAfter"
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
# Используем время ожидания от Telegram или наше увеличенное
|
||||
wait_time = max(e.retry_after, current_delay)
|
||||
wait_time = min(wait_time, self.config.max_retry_delay)
|
||||
total_wait_time += wait_time
|
||||
|
||||
logger.warning(f"RetryAfter error, waiting {wait_time:.2f}s (attempt {retry_count}/{max_retries})")
|
||||
|
||||
logger.warning(
|
||||
f"RetryAfter error, waiting {wait_time:.2f}s (attempt {retry_count}/{max_retries})"
|
||||
)
|
||||
await asyncio.sleep(wait_time)
|
||||
current_delay *= self.config.retry_after_multiplier
|
||||
|
||||
|
||||
except TelegramAPIError as e:
|
||||
retry_count += 1
|
||||
if retry_count > max_retries:
|
||||
logger.error(f"Max retries exceeded for TelegramAPIError: {e}")
|
||||
metrics.record_rate_limit_request(chat_id, False, total_wait_time, "TelegramAPIError")
|
||||
metrics.record_rate_limit_request(
|
||||
chat_id, False, total_wait_time, "TelegramAPIError"
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
wait_time = min(current_delay, self.config.max_retry_delay)
|
||||
total_wait_time += wait_time
|
||||
logger.warning(f"TelegramAPIError, waiting {wait_time:.2f}s (attempt {retry_count}/{max_retries}): {e}")
|
||||
logger.warning(
|
||||
f"TelegramAPIError, waiting {wait_time:.2f}s (attempt {retry_count}/{max_retries}): {e}"
|
||||
)
|
||||
await asyncio.sleep(wait_time)
|
||||
current_delay *= self.config.retry_after_multiplier
|
||||
|
||||
|
||||
except Exception as e:
|
||||
# Для других ошибок не делаем retry
|
||||
logger.error(f"Non-retryable error: {e}")
|
||||
metrics.record_rate_limit_request(chat_id, False, total_wait_time, "Other")
|
||||
metrics.record_rate_limit_request(
|
||||
chat_id, False, total_wait_time, "Other"
|
||||
)
|
||||
raise
|
||||
|
||||
|
||||
class TelegramRateLimiter:
|
||||
"""Основной класс для rate limiting в Telegram боте"""
|
||||
|
||||
|
||||
def __init__(self, config: Optional[RateLimitConfig] = None):
|
||||
self.config = config or RateLimitConfig()
|
||||
self.global_limiter = GlobalRateLimiter(self.config)
|
||||
self.retry_handler = RetryHandler(self.config)
|
||||
|
||||
|
||||
async def send_with_rate_limit(
|
||||
self,
|
||||
send_func: Callable,
|
||||
chat_id: int,
|
||||
*args,
|
||||
**kwargs
|
||||
self, send_func: Callable, chat_id: int, *args, **kwargs
|
||||
) -> Any:
|
||||
"""Отправляет сообщение с соблюдением rate limit и retry логики"""
|
||||
|
||||
|
||||
async def _send():
|
||||
await self.global_limiter.wait_if_needed(chat_id)
|
||||
return await send_func(*args, **kwargs)
|
||||
|
||||
|
||||
return await self.retry_handler.execute_with_retry(_send, chat_id)
|
||||
|
||||
|
||||
# Глобальный экземпляр rate limiter
|
||||
from helper_bot.config.rate_limit_config import (RateLimitSettings,
|
||||
get_rate_limit_config)
|
||||
from helper_bot.config.rate_limit_config import RateLimitSettings, get_rate_limit_config
|
||||
|
||||
|
||||
def _create_rate_limit_config(settings: RateLimitSettings) -> RateLimitConfig:
|
||||
@@ -194,9 +197,10 @@ def _create_rate_limit_config(settings: RateLimitSettings) -> RateLimitConfig:
|
||||
messages_per_second=settings.messages_per_second,
|
||||
burst_limit=settings.burst_limit,
|
||||
retry_after_multiplier=settings.retry_after_multiplier,
|
||||
max_retry_delay=settings.max_retry_delay
|
||||
max_retry_delay=settings.max_retry_delay,
|
||||
)
|
||||
|
||||
|
||||
# Получаем конфигурацию из настроек
|
||||
_rate_limit_settings = get_rate_limit_config("production")
|
||||
_default_config = _create_rate_limit_config(_rate_limit_settings)
|
||||
@@ -204,16 +208,20 @@ _default_config = _create_rate_limit_config(_rate_limit_settings)
|
||||
telegram_rate_limiter = TelegramRateLimiter(_default_config)
|
||||
|
||||
|
||||
async def send_with_rate_limit(send_func: Callable, chat_id: int, *args, **kwargs) -> Any:
|
||||
async def send_with_rate_limit(
|
||||
send_func: Callable, chat_id: int, *args, **kwargs
|
||||
) -> Any:
|
||||
"""
|
||||
Удобная функция для отправки сообщений с rate limiting
|
||||
|
||||
|
||||
Args:
|
||||
send_func: Функция отправки (например, bot.send_message)
|
||||
chat_id: ID чата
|
||||
*args, **kwargs: Аргументы для функции отправки
|
||||
|
||||
|
||||
Returns:
|
||||
Результат выполнения функции отправки
|
||||
"""
|
||||
return await telegram_rate_limiter.send_with_rate_limit(send_func, chat_id, *args, **kwargs)
|
||||
return await telegram_rate_limiter.send_with_rate_limit(
|
||||
send_func, chat_id, *args, **kwargs
|
||||
)
|
||||
|
||||
@@ -1,114 +1,114 @@
|
||||
"""
|
||||
Сервис для работы с S3 хранилищем.
|
||||
"""
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import aioboto3
|
||||
|
||||
from logs.custom_logger import logger
|
||||
|
||||
|
||||
class S3StorageService:
|
||||
"""Сервис для работы с S3 хранилищем."""
|
||||
|
||||
def __init__(self, endpoint_url: str, access_key: str, secret_key: str,
|
||||
bucket_name: str, region: str = "us-east-1"):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
endpoint_url: str,
|
||||
access_key: str,
|
||||
secret_key: str,
|
||||
bucket_name: str,
|
||||
region: str = "us-east-1",
|
||||
):
|
||||
self.endpoint_url = endpoint_url
|
||||
self.access_key = access_key
|
||||
self.secret_key = secret_key
|
||||
self.bucket_name = bucket_name
|
||||
self.region = region
|
||||
self.session = aioboto3.Session()
|
||||
|
||||
async def upload_file(self, file_path: str, s3_key: str,
|
||||
content_type: Optional[str] = None) -> bool:
|
||||
|
||||
async def upload_file(
|
||||
self, file_path: str, s3_key: str, content_type: Optional[str] = None
|
||||
) -> bool:
|
||||
"""Загружает файл в S3."""
|
||||
try:
|
||||
async with self.session.client(
|
||||
's3',
|
||||
"s3",
|
||||
endpoint_url=self.endpoint_url,
|
||||
aws_access_key_id=self.access_key,
|
||||
aws_secret_access_key=self.secret_key,
|
||||
region_name=self.region
|
||||
region_name=self.region,
|
||||
) as s3:
|
||||
extra_args = {}
|
||||
if content_type:
|
||||
extra_args['ContentType'] = content_type
|
||||
|
||||
extra_args["ContentType"] = content_type
|
||||
|
||||
await s3.upload_file(
|
||||
file_path,
|
||||
self.bucket_name,
|
||||
s3_key,
|
||||
ExtraArgs=extra_args
|
||||
file_path, self.bucket_name, s3_key, ExtraArgs=extra_args
|
||||
)
|
||||
logger.info(f"Файл загружен в S3: {s3_key}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка загрузки файла в S3 {s3_key}: {e}")
|
||||
return False
|
||||
|
||||
async def upload_fileobj(self, file_obj, s3_key: str,
|
||||
content_type: Optional[str] = None) -> bool:
|
||||
|
||||
async def upload_fileobj(
|
||||
self, file_obj, s3_key: str, content_type: Optional[str] = None
|
||||
) -> bool:
|
||||
"""Загружает файл из объекта в S3."""
|
||||
try:
|
||||
async with self.session.client(
|
||||
's3',
|
||||
"s3",
|
||||
endpoint_url=self.endpoint_url,
|
||||
aws_access_key_id=self.access_key,
|
||||
aws_secret_access_key=self.secret_key,
|
||||
region_name=self.region
|
||||
region_name=self.region,
|
||||
) as s3:
|
||||
extra_args = {}
|
||||
if content_type:
|
||||
extra_args['ContentType'] = content_type
|
||||
|
||||
extra_args["ContentType"] = content_type
|
||||
|
||||
await s3.upload_fileobj(
|
||||
file_obj,
|
||||
self.bucket_name,
|
||||
s3_key,
|
||||
ExtraArgs=extra_args
|
||||
file_obj, self.bucket_name, s3_key, ExtraArgs=extra_args
|
||||
)
|
||||
logger.info(f"Файл загружен в S3 из объекта: {s3_key}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка загрузки файла в S3 из объекта {s3_key}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def download_file(self, s3_key: str, local_path: str) -> bool:
|
||||
"""Скачивает файл из S3 на локальный диск."""
|
||||
try:
|
||||
async with self.session.client(
|
||||
's3',
|
||||
"s3",
|
||||
endpoint_url=self.endpoint_url,
|
||||
aws_access_key_id=self.access_key,
|
||||
aws_secret_access_key=self.secret_key,
|
||||
region_name=self.region
|
||||
region_name=self.region,
|
||||
) as s3:
|
||||
# Создаем директорию если её нет
|
||||
os.makedirs(os.path.dirname(local_path), exist_ok=True)
|
||||
|
||||
await s3.download_file(
|
||||
self.bucket_name,
|
||||
s3_key,
|
||||
local_path
|
||||
)
|
||||
|
||||
await s3.download_file(self.bucket_name, s3_key, local_path)
|
||||
logger.info(f"Файл скачан из S3: {s3_key} -> {local_path}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка скачивания файла из S3 {s3_key}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
async def download_to_temp(self, s3_key: str) -> Optional[str]:
|
||||
"""Скачивает файл из S3 во временный файл. Возвращает путь к временному файлу."""
|
||||
try:
|
||||
# Определяем расширение из ключа
|
||||
ext = Path(s3_key).suffix or '.bin'
|
||||
ext = Path(s3_key).suffix or ".bin"
|
||||
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
|
||||
temp_path = temp_file.name
|
||||
temp_file.close()
|
||||
|
||||
|
||||
success = await self.download_file(s3_key, temp_path)
|
||||
if success:
|
||||
return temp_path
|
||||
@@ -120,33 +120,35 @@ class S3StorageService:
|
||||
pass
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка скачивания файла из S3 во временный файл {s3_key}: {e}")
|
||||
logger.error(
|
||||
f"Ошибка скачивания файла из S3 во временный файл {s3_key}: {e}"
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
async def file_exists(self, s3_key: str) -> bool:
|
||||
"""Проверяет существование файла в S3."""
|
||||
try:
|
||||
async with self.session.client(
|
||||
's3',
|
||||
"s3",
|
||||
endpoint_url=self.endpoint_url,
|
||||
aws_access_key_id=self.access_key,
|
||||
aws_secret_access_key=self.secret_key,
|
||||
region_name=self.region
|
||||
region_name=self.region,
|
||||
) as s3:
|
||||
await s3.head_object(Bucket=self.bucket_name, Key=s3_key)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
async def delete_file(self, s3_key: str) -> bool:
|
||||
"""Удаляет файл из S3."""
|
||||
try:
|
||||
async with self.session.client(
|
||||
's3',
|
||||
"s3",
|
||||
endpoint_url=self.endpoint_url,
|
||||
aws_access_key_id=self.access_key,
|
||||
aws_secret_access_key=self.secret_key,
|
||||
region_name=self.region
|
||||
region_name=self.region,
|
||||
) as s3:
|
||||
await s3.delete_object(Bucket=self.bucket_name, Key=s3_key)
|
||||
logger.info(f"Файл удален из S3: {s3_key}")
|
||||
@@ -154,23 +156,35 @@ class S3StorageService:
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка удаления файла из S3 {s3_key}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def generate_s3_key(self, content_type: str, file_id: str) -> str:
|
||||
"""Генерирует S3 ключ для файла. Один и тот же для всех постов с этим file_id."""
|
||||
type_folders = {
|
||||
'photo': 'photos',
|
||||
'video': 'videos',
|
||||
'audio': 'music',
|
||||
'voice': 'voice',
|
||||
'video_note': 'video_notes'
|
||||
"photo": "photos",
|
||||
"video": "videos",
|
||||
"audio": "music",
|
||||
"voice": "voice",
|
||||
"video_note": "video_notes",
|
||||
}
|
||||
|
||||
folder = type_folders.get(content_type, 'other')
|
||||
|
||||
folder = type_folders.get(content_type, "other")
|
||||
# Определяем расширение из file_id или используем дефолтное
|
||||
ext = '.jpg' if content_type == 'photo' else \
|
||||
'.mp4' if content_type == 'video' else \
|
||||
'.mp3' if content_type == 'audio' else \
|
||||
'.ogg' if content_type == 'voice' else \
|
||||
'.mp4' if content_type == 'video_note' else '.bin'
|
||||
|
||||
ext = (
|
||||
".jpg"
|
||||
if content_type == "photo"
|
||||
else (
|
||||
".mp4"
|
||||
if content_type == "video"
|
||||
else (
|
||||
".mp3"
|
||||
if content_type == "audio"
|
||||
else (
|
||||
".ogg"
|
||||
if content_type == "voice"
|
||||
else ".mp4" if content_type == "video_note" else ".bin"
|
||||
)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
return f"{folder}/{file_id}{ext}"
|
||||
|
||||
Reference in New Issue
Block a user