143 lines
6.1 KiB
Python
143 lines
6.1 KiB
Python
"""
|
||
Сервис для управления rate limiting в AnonBot
|
||
"""
|
||
from typing import Any, Callable, Dict, Optional
|
||
|
||
from aiogram.exceptions import TelegramAPIError, TelegramRetryAfter
|
||
|
||
from config.constants import MIN_REQUESTS_FOR_ADAPTATION, HIGH_ERROR_RATE_THRESHOLD, LOW_ERROR_RATE_THRESHOLD
|
||
from services.infrastructure.logger import get_logger
|
||
from .rate_limit_config import RateLimitSettings, get_adaptive_config, get_rate_limit_config
|
||
from .rate_limiter import send_with_rate_limit, telegram_rate_limiter
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
class RateLimitService:
|
||
"""Сервис для управления rate limiting"""
|
||
|
||
def __init__(self):
|
||
self.rate_limiter = telegram_rate_limiter
|
||
self.config = get_rate_limit_config()
|
||
self.stats = {
|
||
'total_requests': 0,
|
||
'successful_requests': 0,
|
||
'failed_requests': 0,
|
||
'retry_after_errors': 0,
|
||
'other_errors': 0,
|
||
'total_wait_time': 0.0
|
||
}
|
||
|
||
async def send_with_rate_limit(
|
||
self,
|
||
send_func: Callable,
|
||
chat_id: int,
|
||
*args,
|
||
**kwargs
|
||
) -> Any:
|
||
"""
|
||
Отправляет сообщение с соблюдением rate limit
|
||
|
||
Args:
|
||
send_func: Функция отправки (например, bot.send_message)
|
||
chat_id: ID чата
|
||
*args, **kwargs: Аргументы для функции отправки
|
||
|
||
Returns:
|
||
Результат выполнения функции отправки
|
||
"""
|
||
self.stats['total_requests'] += 1
|
||
logger.info(f"Обработка rate limit запроса для чата {chat_id}")
|
||
|
||
try:
|
||
result, wait_time = await self.rate_limiter.send_with_rate_limit(send_func, chat_id, *args, **kwargs)
|
||
self.stats['successful_requests'] += 1
|
||
self.stats['total_wait_time'] += wait_time
|
||
logger.info(f"Rate limited сообщение успешно отправлено в чат {chat_id}, время ожидания: {wait_time:.2f}с")
|
||
return result
|
||
|
||
except TelegramRetryAfter as e:
|
||
self.stats['failed_requests'] += 1
|
||
self.stats['retry_after_errors'] += 1
|
||
logger.warning(f"Превышен rate limit для чата {chat_id}: {e}")
|
||
raise
|
||
|
||
except TelegramAPIError as e:
|
||
self.stats['failed_requests'] += 1
|
||
self.stats['other_errors'] += 1
|
||
logger.error(f"Ошибка Telegram API для чата {chat_id}: {e}")
|
||
raise
|
||
|
||
except Exception as e:
|
||
self.stats['failed_requests'] += 1
|
||
self.stats['other_errors'] += 1
|
||
logger.error(f"Неожиданная ошибка в rate limit сервисе для чата {chat_id}: {e}")
|
||
raise
|
||
|
||
def get_stats(self) -> Dict[str, Any]:
|
||
"""Получает статистику rate limiting"""
|
||
total = self.stats['total_requests']
|
||
if total == 0:
|
||
return {
|
||
'total_requests': 0,
|
||
'successful_requests': 0,
|
||
'failed_requests': 0,
|
||
'success_rate': 0.0,
|
||
'error_rate': 0.0,
|
||
'retry_after_errors': 0,
|
||
'other_errors': 0,
|
||
'retry_after_rate': 0.0,
|
||
'other_error_rate': 0.0,
|
||
'average_wait_time': 0.0
|
||
}
|
||
|
||
return {
|
||
'total_requests': total,
|
||
'successful_requests': self.stats['successful_requests'],
|
||
'failed_requests': self.stats['failed_requests'],
|
||
'success_rate': self.stats['successful_requests'] / total,
|
||
'error_rate': self.stats['failed_requests'] / total,
|
||
'retry_after_errors': self.stats['retry_after_errors'],
|
||
'other_errors': self.stats['other_errors'],
|
||
'retry_after_rate': self.stats['retry_after_errors'] / total,
|
||
'other_error_rate': self.stats['other_errors'] / total,
|
||
'average_wait_time': self.stats['total_wait_time'] / total if total > 0 else 0.0
|
||
}
|
||
|
||
def reset_stats(self):
|
||
"""Сбрасывает статистику"""
|
||
self.stats = {
|
||
'total_requests': 0,
|
||
'successful_requests': 0,
|
||
'failed_requests': 0,
|
||
'retry_after_errors': 0,
|
||
'other_errors': 0,
|
||
'total_wait_time': 0.0
|
||
}
|
||
logger.info("Статистика rate limit сброшена")
|
||
|
||
def update_config(self, new_config: RateLimitSettings):
|
||
"""Обновляет конфигурацию rate limiting"""
|
||
self.config = new_config
|
||
logger.info(f"Конфигурация rate limit обновлена: {new_config}")
|
||
|
||
def get_adaptive_config(self) -> RateLimitSettings:
|
||
"""Получает адаптивную конфигурацию на основе текущей статистики"""
|
||
error_rate = self.stats['failed_requests'] / max(1, self.stats['total_requests'])
|
||
return get_adaptive_config(error_rate, self.config)
|
||
|
||
def should_adapt_config(self) -> bool:
|
||
"""Определяет, нужно ли адаптировать конфигурацию"""
|
||
if self.stats['total_requests'] < MIN_REQUESTS_FOR_ADAPTATION: # Недостаточно данных
|
||
return False
|
||
|
||
error_rate = self.stats['failed_requests'] / self.stats['total_requests']
|
||
return error_rate > HIGH_ERROR_RATE_THRESHOLD or error_rate < LOW_ERROR_RATE_THRESHOLD # Высокий или низкий уровень ошибок
|
||
|
||
async def adapt_config_if_needed(self):
|
||
"""Адаптирует конфигурацию если необходимо"""
|
||
if self.should_adapt_config():
|
||
new_config = self.get_adaptive_config()
|
||
self.update_config(new_config)
|
||
logger.info("Конфигурация rate limit адаптирована на основе текущей производительности")
|