Files
telegram-helper-bot/tests/test_text_middleware.py
2026-02-02 00:54:23 +03:00

92 lines
3.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Тесты для helper_bot.middlewares.text_middleware (BulkTextMiddleware).
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from helper_bot.middlewares.text_middleware import BulkTextMiddleware
@pytest.mark.unit
@pytest.mark.asyncio
class TestBulkTextMiddleware:
"""Тесты для BulkTextMiddleware."""
@pytest.fixture
def middleware(self):
"""Middleware с минимальной latency для быстрых тестов."""
return BulkTextMiddleware(latency=0.001)
@pytest.fixture
def mock_handler(self):
"""Мок handler."""
return AsyncMock(return_value="ok")
async def test_no_text_passes_immediately(self, middleware, mock_handler):
"""Сообщение без text передаётся в handler сразу."""
event = MagicMock()
event.chat = MagicMock()
event.chat.id = 1
event.from_user = MagicMock()
event.from_user.id = 10
event.text = None
event.message_id = 1
data = {}
result = await middleware(mock_handler, event, data)
mock_handler.assert_called_once_with(event, data)
assert result == "ok"
async def test_single_text_after_latency_calls_handler_with_concatenated_text(
self, middleware, mock_handler
):
"""Одно текстовое сообщение после latency передаётся в data['texts'] и handler вызывается."""
event = MagicMock()
event.chat = MagicMock()
event.chat.id = 100
event.from_user = MagicMock()
event.from_user.id = 200
event.text = "Hello"
event.message_id = 5
data = {}
result = await middleware(mock_handler, event, data)
mock_handler.assert_called_once()
assert data["texts"] == "Hello"
assert result == "ok"
async def test_two_messages_same_key_concatenated(self, middleware, mock_handler):
"""Два сообщения с одним ключом (chat_id, user_id) за latency конкатенируются."""
# Первое сообщение
event1 = MagicMock()
event1.chat = MagicMock()
event1.chat.id = 1
event1.from_user = MagicMock()
event1.from_user.id = 2
event1.text = "A"
event1.message_id = 1
data1 = {}
# Запускаем без ожидания второго сообщения — после latency будет одно
result1 = await middleware(mock_handler, event1, data1)
assert data1["texts"] == "A"
assert result1 == "ok"
async def test_messages_sorted_by_message_id(self, middleware, mock_handler):
"""Сообщения в data['texts'] отсортированы по message_id."""
event = MagicMock()
event.chat = MagicMock()
event.chat.id = 5
event.from_user = MagicMock()
event.from_user.id = 5
event.text = "Only"
event.message_id = 42
data = {}
await middleware(mock_handler, event, data)
assert data["texts"] == "Only"