Files
telegram-helper-bot/tests/test_rate_limiter.py
Andrey 5f6882d348 Implement audio record management features in AsyncBotDB and AudioRepository
- Added methods to delete audio moderation records and retrieve all audio records in async_db.py.
- Enhanced AudioRepository with functionality to delete audio records by file name and retrieve all audio message records.
- Improved logging for audio record operations to enhance monitoring and debugging capabilities.
- Updated related handlers to ensure proper integration of new audio management features.
2025-09-05 01:31:50 +03:00

311 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Тесты для rate limiter
"""
import asyncio
import time
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from helper_bot.utils.rate_limiter import (
RateLimitConfig,
ChatRateLimiter,
GlobalRateLimiter,
RetryHandler,
TelegramRateLimiter,
send_with_rate_limit
)
from helper_bot.utils.rate_limit_monitor import RateLimitMonitor, RateLimitStats, record_rate_limit_request
from helper_bot.config.rate_limit_config import RateLimitSettings, get_rate_limit_config
class TestRateLimitConfig:
"""Тесты для RateLimitConfig"""
def test_default_config(self):
"""Тест создания конфигурации по умолчанию"""
config = RateLimitConfig()
assert config.messages_per_second == 0.5
assert config.burst_limit == 3
assert config.retry_after_multiplier == 1.2
assert config.max_retry_delay == 60.0
class TestChatRateLimiter:
"""Тесты для ChatRateLimiter"""
def test_initialization(self):
"""Тест инициализации"""
config = RateLimitConfig(messages_per_second=1.0, burst_limit=2)
limiter = ChatRateLimiter(config)
assert limiter.config == config
assert limiter.last_send_time == 0.0
assert limiter.burst_count == 0
assert limiter.retry_delay == 1.0
@pytest.mark.asyncio
async def test_wait_if_needed_no_wait(self):
"""Тест что не ждет если не нужно"""
config = RateLimitConfig(messages_per_second=10.0, burst_limit=10)
limiter = ChatRateLimiter(config)
start_time = time.time()
await limiter.wait_if_needed()
end_time = time.time()
# Должно пройти очень быстро
assert end_time - start_time < 0.1
@pytest.mark.asyncio
async def test_wait_if_needed_with_wait(self):
"""Тест что ждет если нужно"""
config = RateLimitConfig(messages_per_second=0.5, burst_limit=10) # 1 сообщение в 2 секунды
limiter = ChatRateLimiter(config)
# Первый вызов не должен ждать
start_time = time.time()
await limiter.wait_if_needed()
first_call_time = time.time() - start_time
# Второй вызов должен ждать
start_time = time.time()
await limiter.wait_if_needed()
second_call_time = time.time() - start_time
assert first_call_time < 0.1
assert second_call_time >= 1.8 # Должно ждать около 2 секунд
@pytest.mark.asyncio
async def test_burst_limit(self):
"""Тест ограничения burst"""
config = RateLimitConfig(messages_per_second=10.0, burst_limit=2)
limiter = ChatRateLimiter(config)
# Первые два вызова не должны ждать
start_time = time.time()
await limiter.wait_if_needed()
await limiter.wait_if_needed()
first_two_calls_time = time.time() - start_time
# Третий вызов должен ждать
start_time = time.time()
await limiter.wait_if_needed()
third_call_time = time.time() - start_time
assert first_two_calls_time < 0.2 # Более мягкое ограничение
assert third_call_time >= 0.8 # Должно ждать около 1 секунды (с учетом погрешности)
class TestGlobalRateLimiter:
"""Тесты для GlobalRateLimiter"""
def test_initialization(self):
"""Тест инициализации"""
config = RateLimitConfig()
limiter = GlobalRateLimiter(config)
assert limiter.config == config
assert limiter.chat_limiters == {}
assert limiter.global_last_send == 0.0
def test_get_chat_limiter(self):
"""Тест получения limiter для чата"""
config = RateLimitConfig()
limiter = GlobalRateLimiter(config)
chat_limiter = limiter.get_chat_limiter(123)
assert isinstance(chat_limiter, ChatRateLimiter)
assert limiter.chat_limiters[123] == chat_limiter
# Повторный вызов должен вернуть тот же объект
same_limiter = limiter.get_chat_limiter(123)
assert same_limiter is chat_limiter
class TestRetryHandler:
"""Тесты для RetryHandler"""
def test_initialization(self):
"""Тест инициализации"""
config = RateLimitConfig()
handler = RetryHandler(config)
assert handler.config == config
@pytest.mark.asyncio
async def test_execute_with_retry_success(self):
"""Тест успешного выполнения без retry"""
config = RateLimitConfig()
handler = RetryHandler(config)
mock_func = AsyncMock(return_value="success")
result = await handler.execute_with_retry(mock_func, 123)
assert result == "success"
mock_func.assert_called_once()
@pytest.mark.asyncio
async def test_execute_with_retry_retry_after(self):
"""Тест retry после RetryAfter ошибки"""
from aiogram.exceptions import TelegramRetryAfter
config = RateLimitConfig(retry_after_multiplier=1.0, max_retry_delay=1.0)
handler = RetryHandler(config)
mock_func = AsyncMock()
# Создаем мок для TelegramRetryAfter
from unittest.mock import MagicMock
retry_after_error = TelegramRetryAfter(
method=MagicMock(),
message="Flood control exceeded",
retry_after=1 # 1 секунда
)
mock_func.side_effect = [
retry_after_error, # Первый вызов - ошибка
"success" # Второй вызов - успех
]
start_time = time.time()
result = await handler.execute_with_retry(mock_func, 123, max_retries=1)
end_time = time.time()
assert result == "success"
assert mock_func.call_count == 2
assert end_time - start_time >= 0.1 # Должно ждать
class TestTelegramRateLimiter:
"""Тесты для TelegramRateLimiter"""
def test_initialization(self):
"""Тест инициализации"""
config = RateLimitConfig()
limiter = TelegramRateLimiter(config)
assert limiter.config == config
assert isinstance(limiter.global_limiter, GlobalRateLimiter)
assert isinstance(limiter.retry_handler, RetryHandler)
@pytest.mark.asyncio
async def test_send_with_rate_limit(self):
"""Тест отправки с rate limiting"""
config = RateLimitConfig(messages_per_second=10.0, burst_limit=10)
limiter = TelegramRateLimiter(config)
mock_send_func = AsyncMock(return_value="sent")
result = await limiter.send_with_rate_limit(mock_send_func, 123)
assert result == "sent"
mock_send_func.assert_called_once()
class TestRateLimitMonitor:
"""Тесты для RateLimitMonitor"""
def test_initialization(self):
"""Тест инициализации"""
monitor = RateLimitMonitor()
assert monitor.stats == {}
assert isinstance(monitor.global_stats, RateLimitStats)
assert monitor.max_history_size == 1000
def test_record_request_success(self):
"""Тест записи успешного запроса"""
monitor = RateLimitMonitor()
monitor.record_request(123, True, 0.5)
assert 123 in monitor.stats
chat_stats = monitor.stats[123]
assert chat_stats.total_requests == 1
assert chat_stats.successful_requests == 1
assert chat_stats.failed_requests == 0
assert chat_stats.total_wait_time == 0.5
def test_record_request_failure(self):
"""Тест записи неудачного запроса"""
monitor = RateLimitMonitor()
monitor.record_request(123, False, 1.0, "RetryAfter")
assert 123 in monitor.stats
chat_stats = monitor.stats[123]
assert chat_stats.total_requests == 1
assert chat_stats.successful_requests == 0
assert chat_stats.failed_requests == 1
assert chat_stats.retry_after_errors == 1
assert chat_stats.total_wait_time == 1.0
def test_get_chat_stats(self):
"""Тест получения статистики чата"""
monitor = RateLimitMonitor()
# Статистика для несуществующего чата
assert monitor.get_chat_stats(999) is None
# Записываем запрос
monitor.record_request(123, True, 0.5)
# Получаем статистику
stats = monitor.get_chat_stats(123)
assert stats is not None
assert stats.chat_id == 123
assert stats.total_requests == 1
def test_success_rate_calculation(self):
"""Тест расчета процента успеха"""
monitor = RateLimitMonitor()
# 3 успешных, 1 неудачный
monitor.record_request(123, True, 0.1)
monitor.record_request(123, True, 0.2)
monitor.record_request(123, True, 0.3)
monitor.record_request(123, False, 0.4, "RetryAfter")
stats = monitor.get_chat_stats(123)
assert stats.success_rate == 0.75 # 3/4
assert stats.error_rate == 0.25 # 1/4
class TestRateLimitConfig:
"""Тесты для конфигурации rate limiting"""
def test_get_rate_limit_config(self):
"""Тест получения конфигурации"""
# Тест production конфигурации
prod_config = get_rate_limit_config("production")
assert prod_config.messages_per_second == 0.5
assert prod_config.burst_limit == 2
# Тест development конфигурации
dev_config = get_rate_limit_config("development")
assert dev_config.messages_per_second == 1.0
assert dev_config.burst_limit == 3
# Тест strict конфигурации
strict_config = get_rate_limit_config("strict")
assert strict_config.messages_per_second == 0.3
assert strict_config.burst_limit == 1
# Тест неизвестной конфигурации (должна вернуть production)
unknown_config = get_rate_limit_config("unknown")
assert unknown_config.messages_per_second == 0.5
@pytest.mark.asyncio
async def test_send_with_rate_limit_integration():
"""Интеграционный тест для send_with_rate_limit"""
mock_send_func = AsyncMock(return_value="message_sent")
result = await send_with_rate_limit(mock_send_func, 123)
assert result == "message_sent"
mock_send_func.assert_called_once()
if __name__ == "__main__":
pytest.main([__file__])