""" Rate limiter для предотвращения Flood control ошибок в Telegram Bot API """ import asyncio import time from dataclasses import dataclass from typing import Any, Callable, Dict, Optional from aiogram.exceptions import TelegramAPIError, TelegramRetryAfter from logs.custom_logger import logger from .metrics import metrics @dataclass class RateLimitConfig: """Конфигурация для rate limiting""" messages_per_second: float = 0.5 # Максимум 0.5 сообщений в секунду на чат burst_limit: int = 3 # Максимум 3 сообщения подряд retry_after_multiplier: float = 1.2 # Множитель для увеличения задержки при retry max_retry_delay: float = 60.0 # Максимальная задержка между попытками class ChatRateLimiter: """Rate limiter для конкретного чата""" def __init__(self, config: RateLimitConfig): self.config = config self.last_send_time = 0.0 self.burst_count = 0 self.burst_reset_time = 0.0 self.retry_delay = 1.0 async def wait_if_needed(self) -> None: """Ждет если необходимо для соблюдения rate limit""" current_time = time.time() # Сбрасываем счетчик burst если прошло достаточно времени if current_time >= self.burst_reset_time: self.burst_count = 0 self.burst_reset_time = current_time + 1.0 # Проверяем burst limit if self.burst_count >= self.config.burst_limit: wait_time = self.burst_reset_time - current_time if wait_time > 0: logger.info(f"Burst limit reached, waiting {wait_time:.2f}s") await asyncio.sleep(wait_time) current_time = time.time() self.burst_count = 0 self.burst_reset_time = current_time + 1.0 # Проверяем минимальный интервал между сообщениями time_since_last = current_time - self.last_send_time min_interval = 1.0 / self.config.messages_per_second if time_since_last < min_interval: wait_time = min_interval - time_since_last logger.debug(f"Rate limiting: waiting {wait_time:.2f}s") await asyncio.sleep(wait_time) # Обновляем время последней отправки self.last_send_time = time.time() self.burst_count += 1 class GlobalRateLimiter: """Глобальный rate limiter для всех чатов""" def __init__(self, config: RateLimitConfig): self.config = config self.chat_limiters: Dict[int, ChatRateLimiter] = {} self.global_last_send = 0.0 self.global_min_interval = 0.1 # Минимум 100ms между любыми сообщениями def get_chat_limiter(self, chat_id: int) -> ChatRateLimiter: """Получает rate limiter для конкретного чата""" if chat_id not in self.chat_limiters: self.chat_limiters[chat_id] = ChatRateLimiter(self.config) return self.chat_limiters[chat_id] async def wait_if_needed(self, chat_id: int) -> None: """Ждет если необходимо для соблюдения глобального и чат-специфичного rate limit""" current_time = time.time() # Глобальный rate limit time_since_global = current_time - self.global_last_send if time_since_global < self.global_min_interval: wait_time = self.global_min_interval - time_since_global await asyncio.sleep(wait_time) current_time = time.time() # Чат-специфичный rate limit chat_limiter = self.get_chat_limiter(chat_id) await chat_limiter.wait_if_needed() self.global_last_send = time.time() class RetryHandler: """Обработчик повторных попыток с экспоненциальной задержкой""" def __init__(self, config: RateLimitConfig): self.config = config async def execute_with_retry( self, func: Callable, chat_id: int, *args, max_retries: int = 3, **kwargs ) -> Any: """Выполняет функцию с повторными попытками при ошибках""" retry_count = 0 current_delay = self.config.retry_after_multiplier total_wait_time = 0.0 while retry_count <= max_retries: try: result = await func(*args, **kwargs) # Записываем успешный запрос metrics.record_rate_limit_request(chat_id, True, total_wait_time) return result except TelegramRetryAfter as e: retry_count += 1 if retry_count > max_retries: logger.error(f"Max retries exceeded for RetryAfter: {e}") metrics.record_rate_limit_request( chat_id, False, total_wait_time, "RetryAfter" ) raise # Используем время ожидания от Telegram или наше увеличенное wait_time = max(e.retry_after, current_delay) wait_time = min(wait_time, self.config.max_retry_delay) total_wait_time += wait_time logger.warning( f"RetryAfter error, waiting {wait_time:.2f}s (attempt {retry_count}/{max_retries})" ) await asyncio.sleep(wait_time) current_delay *= self.config.retry_after_multiplier except TelegramAPIError as e: retry_count += 1 if retry_count > max_retries: logger.error(f"Max retries exceeded for TelegramAPIError: {e}") metrics.record_rate_limit_request( chat_id, False, total_wait_time, "TelegramAPIError" ) raise wait_time = min(current_delay, self.config.max_retry_delay) total_wait_time += wait_time logger.warning( f"TelegramAPIError, waiting {wait_time:.2f}s (attempt {retry_count}/{max_retries}): {e}" ) await asyncio.sleep(wait_time) current_delay *= self.config.retry_after_multiplier except Exception as e: # Для других ошибок не делаем retry logger.error(f"Non-retryable error: {e}") metrics.record_rate_limit_request( chat_id, False, total_wait_time, "Other" ) raise class TelegramRateLimiter: """Основной класс для rate limiting в Telegram боте""" def __init__(self, config: Optional[RateLimitConfig] = None): self.config = config or RateLimitConfig() self.global_limiter = GlobalRateLimiter(self.config) self.retry_handler = RetryHandler(self.config) async def send_with_rate_limit( self, send_func: Callable, chat_id: int, *args, **kwargs ) -> Any: """Отправляет сообщение с соблюдением rate limit и retry логики""" async def _send(): await self.global_limiter.wait_if_needed(chat_id) return await send_func(*args, **kwargs) return await self.retry_handler.execute_with_retry(_send, chat_id) # Глобальный экземпляр rate limiter from helper_bot.config.rate_limit_config import RateLimitSettings, get_rate_limit_config def _create_rate_limit_config(settings: RateLimitSettings) -> RateLimitConfig: """Создает RateLimitConfig из RateLimitSettings""" return RateLimitConfig( messages_per_second=settings.messages_per_second, burst_limit=settings.burst_limit, retry_after_multiplier=settings.retry_after_multiplier, max_retry_delay=settings.max_retry_delay, ) # Получаем конфигурацию из настроек _rate_limit_settings = get_rate_limit_config("production") _default_config = _create_rate_limit_config(_rate_limit_settings) telegram_rate_limiter = TelegramRateLimiter(_default_config) async def send_with_rate_limit( send_func: Callable, chat_id: int, *args, **kwargs ) -> Any: """ Удобная функция для отправки сообщений с rate limiting Args: send_func: Функция отправки (например, bot.send_message) chat_id: ID чата *args, **kwargs: Аргументы для функции отправки Returns: Результат выполнения функции отправки """ return await telegram_rate_limiter.send_with_rate_limit( send_func, chat_id, *args, **kwargs )