220 lines
9.1 KiB
Python
220 lines
9.1 KiB
Python
"""
|
||
Rate limiter для предотвращения Flood control ошибок в Telegram Bot API
|
||
"""
|
||
import asyncio
|
||
import time
|
||
from dataclasses import dataclass
|
||
from typing import Any, Callable, Dict, Optional
|
||
|
||
from aiogram.exceptions import TelegramAPIError, TelegramRetryAfter
|
||
from logs.custom_logger import logger
|
||
|
||
from .metrics import metrics
|
||
|
||
|
||
@dataclass
|
||
class RateLimitConfig:
|
||
"""Конфигурация для rate limiting"""
|
||
messages_per_second: float = 0.5 # Максимум 0.5 сообщений в секунду на чат
|
||
burst_limit: int = 3 # Максимум 3 сообщения подряд
|
||
retry_after_multiplier: float = 1.2 # Множитель для увеличения задержки при retry
|
||
max_retry_delay: float = 60.0 # Максимальная задержка между попытками
|
||
|
||
|
||
class ChatRateLimiter:
|
||
"""Rate limiter для конкретного чата"""
|
||
|
||
def __init__(self, config: RateLimitConfig):
|
||
self.config = config
|
||
self.last_send_time = 0.0
|
||
self.burst_count = 0
|
||
self.burst_reset_time = 0.0
|
||
self.retry_delay = 1.0
|
||
|
||
async def wait_if_needed(self) -> None:
|
||
"""Ждет если необходимо для соблюдения rate limit"""
|
||
current_time = time.time()
|
||
|
||
# Сбрасываем счетчик burst если прошло достаточно времени
|
||
if current_time >= self.burst_reset_time:
|
||
self.burst_count = 0
|
||
self.burst_reset_time = current_time + 1.0
|
||
|
||
# Проверяем burst limit
|
||
if self.burst_count >= self.config.burst_limit:
|
||
wait_time = self.burst_reset_time - current_time
|
||
if wait_time > 0:
|
||
logger.info(f"Burst limit reached, waiting {wait_time:.2f}s")
|
||
await asyncio.sleep(wait_time)
|
||
current_time = time.time()
|
||
self.burst_count = 0
|
||
self.burst_reset_time = current_time + 1.0
|
||
|
||
# Проверяем минимальный интервал между сообщениями
|
||
time_since_last = current_time - self.last_send_time
|
||
min_interval = 1.0 / self.config.messages_per_second
|
||
|
||
if time_since_last < min_interval:
|
||
wait_time = min_interval - time_since_last
|
||
logger.debug(f"Rate limiting: waiting {wait_time:.2f}s")
|
||
await asyncio.sleep(wait_time)
|
||
|
||
# Обновляем время последней отправки
|
||
self.last_send_time = time.time()
|
||
self.burst_count += 1
|
||
|
||
|
||
class GlobalRateLimiter:
|
||
"""Глобальный rate limiter для всех чатов"""
|
||
|
||
def __init__(self, config: RateLimitConfig):
|
||
self.config = config
|
||
self.chat_limiters: Dict[int, ChatRateLimiter] = {}
|
||
self.global_last_send = 0.0
|
||
self.global_min_interval = 0.1 # Минимум 100ms между любыми сообщениями
|
||
|
||
def get_chat_limiter(self, chat_id: int) -> ChatRateLimiter:
|
||
"""Получает rate limiter для конкретного чата"""
|
||
if chat_id not in self.chat_limiters:
|
||
self.chat_limiters[chat_id] = ChatRateLimiter(self.config)
|
||
return self.chat_limiters[chat_id]
|
||
|
||
async def wait_if_needed(self, chat_id: int) -> None:
|
||
"""Ждет если необходимо для соблюдения глобального и чат-специфичного rate limit"""
|
||
current_time = time.time()
|
||
|
||
# Глобальный rate limit
|
||
time_since_global = current_time - self.global_last_send
|
||
if time_since_global < self.global_min_interval:
|
||
wait_time = self.global_min_interval - time_since_global
|
||
await asyncio.sleep(wait_time)
|
||
current_time = time.time()
|
||
|
||
# Чат-специфичный rate limit
|
||
chat_limiter = self.get_chat_limiter(chat_id)
|
||
await chat_limiter.wait_if_needed()
|
||
|
||
self.global_last_send = time.time()
|
||
|
||
|
||
class RetryHandler:
|
||
"""Обработчик повторных попыток с экспоненциальной задержкой"""
|
||
|
||
def __init__(self, config: RateLimitConfig):
|
||
self.config = config
|
||
|
||
async def execute_with_retry(
|
||
self,
|
||
func: Callable,
|
||
chat_id: int,
|
||
*args,
|
||
max_retries: int = 3,
|
||
**kwargs
|
||
) -> Any:
|
||
"""Выполняет функцию с повторными попытками при ошибках"""
|
||
retry_count = 0
|
||
current_delay = self.config.retry_after_multiplier
|
||
total_wait_time = 0.0
|
||
|
||
while retry_count <= max_retries:
|
||
try:
|
||
result = await func(*args, **kwargs)
|
||
# Записываем успешный запрос
|
||
metrics.record_rate_limit_request(chat_id, True, total_wait_time)
|
||
return result
|
||
|
||
except TelegramRetryAfter as e:
|
||
retry_count += 1
|
||
if retry_count > max_retries:
|
||
logger.error(f"Max retries exceeded for RetryAfter: {e}")
|
||
metrics.record_rate_limit_request(chat_id, False, total_wait_time, "RetryAfter")
|
||
raise
|
||
|
||
# Используем время ожидания от Telegram или наше увеличенное
|
||
wait_time = max(e.retry_after, current_delay)
|
||
wait_time = min(wait_time, self.config.max_retry_delay)
|
||
total_wait_time += wait_time
|
||
|
||
logger.warning(f"RetryAfter error, waiting {wait_time:.2f}s (attempt {retry_count}/{max_retries})")
|
||
await asyncio.sleep(wait_time)
|
||
current_delay *= self.config.retry_after_multiplier
|
||
|
||
except TelegramAPIError as e:
|
||
retry_count += 1
|
||
if retry_count > max_retries:
|
||
logger.error(f"Max retries exceeded for TelegramAPIError: {e}")
|
||
metrics.record_rate_limit_request(chat_id, False, total_wait_time, "TelegramAPIError")
|
||
raise
|
||
|
||
wait_time = min(current_delay, self.config.max_retry_delay)
|
||
total_wait_time += wait_time
|
||
logger.warning(f"TelegramAPIError, waiting {wait_time:.2f}s (attempt {retry_count}/{max_retries}): {e}")
|
||
await asyncio.sleep(wait_time)
|
||
current_delay *= self.config.retry_after_multiplier
|
||
|
||
except Exception as e:
|
||
# Для других ошибок не делаем retry
|
||
logger.error(f"Non-retryable error: {e}")
|
||
metrics.record_rate_limit_request(chat_id, False, total_wait_time, "Other")
|
||
raise
|
||
|
||
|
||
class TelegramRateLimiter:
|
||
"""Основной класс для rate limiting в Telegram боте"""
|
||
|
||
def __init__(self, config: Optional[RateLimitConfig] = None):
|
||
self.config = config or RateLimitConfig()
|
||
self.global_limiter = GlobalRateLimiter(self.config)
|
||
self.retry_handler = RetryHandler(self.config)
|
||
|
||
async def send_with_rate_limit(
|
||
self,
|
||
send_func: Callable,
|
||
chat_id: int,
|
||
*args,
|
||
**kwargs
|
||
) -> Any:
|
||
"""Отправляет сообщение с соблюдением rate limit и retry логики"""
|
||
|
||
async def _send():
|
||
await self.global_limiter.wait_if_needed(chat_id)
|
||
return await send_func(*args, **kwargs)
|
||
|
||
return await self.retry_handler.execute_with_retry(_send, chat_id)
|
||
|
||
|
||
# Глобальный экземпляр rate limiter
|
||
from helper_bot.config.rate_limit_config import (RateLimitSettings,
|
||
get_rate_limit_config)
|
||
|
||
|
||
def _create_rate_limit_config(settings: RateLimitSettings) -> RateLimitConfig:
|
||
"""Создает RateLimitConfig из RateLimitSettings"""
|
||
return RateLimitConfig(
|
||
messages_per_second=settings.messages_per_second,
|
||
burst_limit=settings.burst_limit,
|
||
retry_after_multiplier=settings.retry_after_multiplier,
|
||
max_retry_delay=settings.max_retry_delay
|
||
)
|
||
|
||
# Получаем конфигурацию из настроек
|
||
_rate_limit_settings = get_rate_limit_config("production")
|
||
_default_config = _create_rate_limit_config(_rate_limit_settings)
|
||
|
||
telegram_rate_limiter = TelegramRateLimiter(_default_config)
|
||
|
||
|
||
async def send_with_rate_limit(send_func: Callable, chat_id: int, *args, **kwargs) -> Any:
|
||
"""
|
||
Удобная функция для отправки сообщений с rate limiting
|
||
|
||
Args:
|
||
send_func: Функция отправки (например, bot.send_message)
|
||
chat_id: ID чата
|
||
*args, **kwargs: Аргументы для функции отправки
|
||
|
||
Returns:
|
||
Результат выполнения функции отправки
|
||
"""
|
||
return await telegram_rate_limiter.send_with_rate_limit(send_func, chat_id, *args, **kwargs)
|