Files
AnonBot/services/rate_limiting/rate_limit_service.py

143 lines
6.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Сервис для управления rate limiting в AnonBot
"""
from typing import Any, Callable, Dict, Optional
from aiogram.exceptions import TelegramAPIError, TelegramRetryAfter
from config.constants import MIN_REQUESTS_FOR_ADAPTATION, HIGH_ERROR_RATE_THRESHOLD, LOW_ERROR_RATE_THRESHOLD
from services.infrastructure.logger import get_logger
from .rate_limit_config import RateLimitSettings, get_adaptive_config, get_rate_limit_config
from .rate_limiter import send_with_rate_limit, telegram_rate_limiter
logger = get_logger(__name__)
class RateLimitService:
"""Сервис для управления rate limiting"""
def __init__(self):
self.rate_limiter = telegram_rate_limiter
self.config = get_rate_limit_config()
self.stats = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'retry_after_errors': 0,
'other_errors': 0,
'total_wait_time': 0.0
}
async def send_with_rate_limit(
self,
send_func: Callable,
chat_id: int,
*args,
**kwargs
) -> Any:
"""
Отправляет сообщение с соблюдением rate limit
Args:
send_func: Функция отправки (например, bot.send_message)
chat_id: ID чата
*args, **kwargs: Аргументы для функции отправки
Returns:
Результат выполнения функции отправки
"""
self.stats['total_requests'] += 1
logger.info(f"Обработка rate limit запроса для чата {chat_id}")
try:
result, wait_time = await self.rate_limiter.send_with_rate_limit(send_func, chat_id, *args, **kwargs)
self.stats['successful_requests'] += 1
self.stats['total_wait_time'] += wait_time
logger.info(f"Rate limited сообщение успешно отправлено в чат {chat_id}, время ожидания: {wait_time:.2f}с")
return result
except TelegramRetryAfter as e:
self.stats['failed_requests'] += 1
self.stats['retry_after_errors'] += 1
logger.warning(f"Превышен rate limit для чата {chat_id}: {e}")
raise
except TelegramAPIError as e:
self.stats['failed_requests'] += 1
self.stats['other_errors'] += 1
logger.error(f"Ошибка Telegram API для чата {chat_id}: {e}")
raise
except Exception as e:
self.stats['failed_requests'] += 1
self.stats['other_errors'] += 1
logger.error(f"Неожиданная ошибка в rate limit сервисе для чата {chat_id}: {e}")
raise
def get_stats(self) -> Dict[str, Any]:
"""Получает статистику rate limiting"""
total = self.stats['total_requests']
if total == 0:
return {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'success_rate': 0.0,
'error_rate': 0.0,
'retry_after_errors': 0,
'other_errors': 0,
'retry_after_rate': 0.0,
'other_error_rate': 0.0,
'average_wait_time': 0.0
}
return {
'total_requests': total,
'successful_requests': self.stats['successful_requests'],
'failed_requests': self.stats['failed_requests'],
'success_rate': self.stats['successful_requests'] / total,
'error_rate': self.stats['failed_requests'] / total,
'retry_after_errors': self.stats['retry_after_errors'],
'other_errors': self.stats['other_errors'],
'retry_after_rate': self.stats['retry_after_errors'] / total,
'other_error_rate': self.stats['other_errors'] / total,
'average_wait_time': self.stats['total_wait_time'] / total if total > 0 else 0.0
}
def reset_stats(self):
"""Сбрасывает статистику"""
self.stats = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'retry_after_errors': 0,
'other_errors': 0,
'total_wait_time': 0.0
}
logger.info("Статистика rate limit сброшена")
def update_config(self, new_config: RateLimitSettings):
"""Обновляет конфигурацию rate limiting"""
self.config = new_config
logger.info(f"Конфигурация rate limit обновлена: {new_config}")
def get_adaptive_config(self) -> RateLimitSettings:
"""Получает адаптивную конфигурацию на основе текущей статистики"""
error_rate = self.stats['failed_requests'] / max(1, self.stats['total_requests'])
return get_adaptive_config(error_rate, self.config)
def should_adapt_config(self) -> bool:
"""Определяет, нужно ли адаптировать конфигурацию"""
if self.stats['total_requests'] < MIN_REQUESTS_FOR_ADAPTATION: # Недостаточно данных
return False
error_rate = self.stats['failed_requests'] / self.stats['total_requests']
return error_rate > HIGH_ERROR_RATE_THRESHOLD or error_rate < LOW_ERROR_RATE_THRESHOLD # Высокий или низкий уровень ошибок
async def adapt_config_if_needed(self):
"""Адаптирует конфигурацию если необходимо"""
if self.should_adapt_config():
new_config = self.get_adaptive_config()
self.update_config(new_config)
logger.info("Конфигурация rate limit адаптирована на основе текущей производительности")