- Реализованы методы `get_post_text_and_anonymity_by_message_id` и `get_post_text_and_anonymity_by_helper_id` в `PostRepository` для получения текста поста и флага анонимности. - Обновлена модель `TelegramPost`, добавлено поле `is_anonymous`. - Изменена схема базы данных для включения поля `is_anonymous` в таблицу `post_from_telegram_suggest`. - Обновлены функции публикации постов в `PostPublishService` для учета анонимности. - Добавлены тесты для проверки новых функций и корректности работы с анонимностью.
821 lines
35 KiB
Python
821 lines
35 KiB
Python
import pytest
|
||
from unittest.mock import Mock, patch, AsyncMock
|
||
from datetime import datetime
|
||
import os
|
||
|
||
from helper_bot.utils.helper_func import (
|
||
get_first_name,
|
||
get_text_message,
|
||
determine_anonymity,
|
||
check_username_and_full_name,
|
||
safe_html_escape,
|
||
download_file,
|
||
prepare_media_group_from_middlewares,
|
||
add_in_db_media_mediagroup,
|
||
add_in_db_media,
|
||
send_media_group_message_to_private_chat,
|
||
send_media_group_to_channel,
|
||
send_text_message,
|
||
send_photo_message,
|
||
send_video_message,
|
||
send_video_note_message,
|
||
send_audio_message,
|
||
send_voice_message,
|
||
check_access,
|
||
add_days_to_date,
|
||
get_banned_users_list,
|
||
get_banned_users_buttons,
|
||
delete_user_blacklist,
|
||
update_user_info,
|
||
check_user_emoji,
|
||
get_random_emoji
|
||
)
|
||
from helper_bot.utils.messages import get_message
|
||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory, get_global_instance
|
||
from database.async_db import AsyncBotDB
|
||
import helper_bot.utils.messages as messages # Import for patching constants
|
||
|
||
class TestHelperFunctions:
|
||
"""Тесты для вспомогательных функций"""
|
||
|
||
@pytest.fixture
|
||
def mock_message(self):
|
||
"""Создает мок сообщения для тестирования"""
|
||
message = Mock()
|
||
message.from_user = Mock()
|
||
message.from_user.first_name = "Test"
|
||
message.from_user.full_name = "Test User"
|
||
message.from_user.username = "testuser"
|
||
return message
|
||
|
||
def test_get_first_name(self, mock_message):
|
||
"""Тест функции получения имени пользователя"""
|
||
# Тест с обычным именем
|
||
result = get_first_name(mock_message)
|
||
assert result == "Test"
|
||
|
||
# Тест с пустым именем - функция get_first_name не обрабатывает None
|
||
# поэтому этот тест будет падать, что ожидаемо
|
||
mock_message.from_user.first_name = None
|
||
try:
|
||
result = get_first_name(mock_message)
|
||
assert False, "Ожидалась ошибка при None first_name"
|
||
except AttributeError:
|
||
pass # Ожидаемое поведение
|
||
|
||
def test_get_text_message(self, mock_message):
|
||
"""Тест функции обработки текста сообщения"""
|
||
# Тест с обычным текстом (legacy - определяется по тексту)
|
||
text = "Привет, это тестовое сообщение"
|
||
result = get_text_message(text, "Test", "testuser")
|
||
assert "Test" in result
|
||
assert "testuser" in result
|
||
assert "тестовое сообщение" in result
|
||
assert "Автор поста" in result
|
||
|
||
# Тест с пустым текстом
|
||
result = get_text_message("", "Test", "testuser")
|
||
assert "Test" in result
|
||
assert "testuser" in result
|
||
|
||
# Тест с текстом без специальных слов
|
||
text = "Обычный текст без специальных слов"
|
||
result = get_text_message(text, "Test", "testuser")
|
||
assert "Test" in result
|
||
assert "testuser" in result
|
||
assert "Обычный текст без специальных слов" in result
|
||
|
||
def test_get_text_message_with_is_anonymous_true(self, mock_message):
|
||
"""Тест функции get_text_message с is_anonymous=True"""
|
||
text = "Тестовый пост"
|
||
result = get_text_message(text, "Test", "testuser", is_anonymous=True)
|
||
assert "Пост из ТГ:" in result
|
||
assert "Тестовый пост" in result
|
||
assert "Пост опубликован анонимно" in result
|
||
assert "Автор поста" not in result
|
||
|
||
def test_get_text_message_with_is_anonymous_false(self, mock_message):
|
||
"""Тест функции get_text_message с is_anonymous=False"""
|
||
text = "Тестовый пост"
|
||
result = get_text_message(text, "Test", "testuser", is_anonymous=False)
|
||
assert "Пост из ТГ:" in result
|
||
assert "Тестовый пост" in result
|
||
assert "Автор поста" in result
|
||
assert "Test" in result
|
||
assert "testuser" in result
|
||
assert "Пост опубликован анонимно" not in result
|
||
|
||
def test_get_text_message_with_is_anonymous_none_legacy(self, mock_message):
|
||
"""Тест функции get_text_message с is_anonymous=None (legacy - определяется по тексту)"""
|
||
# Тест с "анон" в тексте
|
||
text = "Тестовый пост анон"
|
||
result = get_text_message(text, "Test", "testuser", is_anonymous=None)
|
||
assert "Пост из ТГ:" in result
|
||
assert "Тестовый пост анон" in result
|
||
assert "Пост опубликован анонимно" in result
|
||
|
||
# Тест с "неанон" в тексте
|
||
text = "Тестовый пост неанон"
|
||
result = get_text_message(text, "Test", "testuser", is_anonymous=None)
|
||
assert "Пост из ТГ:" in result
|
||
assert "Тестовый пост неанон" in result
|
||
assert "Автор поста" in result
|
||
|
||
# Тест с "не анон" в тексте
|
||
text = "Тестовый пост не анон"
|
||
result = get_text_message(text, "Test", "testuser", is_anonymous=None)
|
||
assert "Автор поста" in result
|
||
|
||
def test_get_text_message_with_username_none(self, mock_message):
|
||
"""Тест функции get_text_message без username"""
|
||
text = "Тестовый пост"
|
||
result = get_text_message(text, "Test", None, is_anonymous=False)
|
||
assert "Test" in result
|
||
assert "(Ник не указан)" in result
|
||
assert "@" not in result
|
||
|
||
def test_determine_anonymity_with_anon(self):
|
||
"""Тест функции determine_anonymity с 'анон' в тексте"""
|
||
assert determine_anonymity("Этот пост анон") is True
|
||
assert determine_anonymity("анон") is True
|
||
assert determine_anonymity("АНОН") is True # Проверка регистра
|
||
assert determine_anonymity("пост анонимный анон") is True
|
||
|
||
def test_determine_anonymity_with_neanon(self):
|
||
"""Тест функции determine_anonymity с 'неанон' в тексте"""
|
||
assert determine_anonymity("Этот пост неанон") is False
|
||
assert determine_anonymity("неанон") is False
|
||
assert determine_anonymity("НЕАНОН") is False # Проверка регистра
|
||
assert determine_anonymity("пост неанон") is False
|
||
|
||
def test_determine_anonymity_with_ne_anon(self):
|
||
"""Тест функции determine_anonymity с 'не анон' в тексте"""
|
||
assert determine_anonymity("Этот пост не анон") is False
|
||
assert determine_anonymity("не анон") is False
|
||
assert determine_anonymity("НЕ АНОН") is False # Проверка регистра
|
||
assert determine_anonymity("пост не анон") is False
|
||
|
||
def test_determine_anonymity_priority_neanon_over_anon(self):
|
||
"""Тест приоритета 'неанон' над 'анон'"""
|
||
# Если есть и "анон" и "неанон", должен вернуть False
|
||
assert determine_anonymity("анон неанон") is False
|
||
assert determine_anonymity("неанон анон") is False
|
||
assert determine_anonymity("не анон анон") is False
|
||
|
||
def test_determine_anonymity_without_keywords(self):
|
||
"""Тест функции determine_anonymity без ключевых слов"""
|
||
assert determine_anonymity("Обычный текст") is False
|
||
assert determine_anonymity("") is False
|
||
assert determine_anonymity("Пост без специальных слов") is False
|
||
|
||
def test_determine_anonymity_with_none(self):
|
||
"""Тест функции determine_anonymity с None"""
|
||
assert determine_anonymity(None) is False
|
||
|
||
def test_determine_anonymity_with_empty_string(self):
|
||
"""Тест функции determine_anonymity с пустой строкой"""
|
||
assert determine_anonymity("") is False
|
||
assert determine_anonymity(" ") is False # Только пробелы
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_check_username_and_full_name(self):
|
||
"""Тест функции проверки изменений username и full_name"""
|
||
# Создаем мок базы данных
|
||
mock_db = Mock(spec=AsyncBotDB)
|
||
mock_db.get_username = AsyncMock(return_value="olduser")
|
||
mock_db.get_full_name_by_id = AsyncMock(return_value="Old User")
|
||
|
||
# Тест с измененными данными
|
||
result = await check_username_and_full_name(123456, "newuser", "New User", mock_db)
|
||
assert result is True
|
||
|
||
# Тест с неизмененными данными
|
||
result = await check_username_and_full_name(123456, "olduser", "Old User", mock_db)
|
||
assert result is False
|
||
|
||
# Тест с частично измененными данными
|
||
result = await check_username_and_full_name(123456, "olduser", "New User", mock_db)
|
||
assert result is True
|
||
|
||
result = await check_username_and_full_name(123456, "newuser", "Old User", mock_db)
|
||
assert result is True
|
||
|
||
|
||
class TestSafeHtmlEscape:
|
||
"""Тесты для функции безопасного экранирования HTML"""
|
||
|
||
def test_safe_html_escape_normal_text(self):
|
||
"""Тест экранирования обычного текста"""
|
||
result = safe_html_escape("Hello World")
|
||
assert result == "Hello World"
|
||
|
||
def test_safe_html_escape_html_tags(self):
|
||
"""Тест экранирования HTML тегов"""
|
||
result = safe_html_escape("<script>alert('xss')</script>")
|
||
assert result == "<script>alert('xss')</script>"
|
||
|
||
def test_safe_html_escape_special_chars(self):
|
||
"""Тест экранирования специальных символов"""
|
||
result = safe_html_escape("& < > \" '")
|
||
assert result == "& < > " '"
|
||
|
||
def test_safe_html_escape_none_input(self):
|
||
"""Тест экранирования None значения"""
|
||
result = safe_html_escape(None)
|
||
assert result == ""
|
||
|
||
def test_safe_html_escape_empty_string(self):
|
||
"""Тест экранирования пустой строки"""
|
||
result = safe_html_escape("")
|
||
assert result == ""
|
||
|
||
def test_safe_html_escape_non_string_input(self):
|
||
"""Тест экранирования нестрокового ввода"""
|
||
result = safe_html_escape(123)
|
||
assert result == "123"
|
||
|
||
|
||
class TestMessages:
|
||
"""Тесты для системы сообщений"""
|
||
|
||
def test_get_message(self):
|
||
"""Тест функции получения сообщений"""
|
||
# Тест с существующим ключом
|
||
result = get_message("Test", "HELLO_MESSAGE")
|
||
assert isinstance(result, str)
|
||
assert len(result) > 0
|
||
|
||
# Тест с несуществующим ключом
|
||
try:
|
||
result = get_message("Test", "NON_EXISTENT_KEY")
|
||
assert False, "Ожидалась ошибка KeyError"
|
||
except KeyError:
|
||
pass # Ожидаемое поведение
|
||
|
||
# Тест с пустым именем
|
||
result = get_message("", "HELLO_MESSAGE")
|
||
assert isinstance(result, str)
|
||
assert len(result) > 0
|
||
|
||
# Тест с None именем - ожидаем ошибку
|
||
try:
|
||
result = get_message(None, "HELLO_MESSAGE")
|
||
assert False, "Ожидалась ошибка TypeError"
|
||
except TypeError:
|
||
pass # Ожидаемое поведение
|
||
|
||
def test_get_message_all_types(self):
|
||
"""Тест всех типов сообщений"""
|
||
# Patch the constants dictionary to include 'SUGGEST_NEWS_2' for testing purposes
|
||
with patch.dict(messages.constants, {'SUGGEST_NEWS_2': 'Test message 2'}):
|
||
message_types = [
|
||
"HELLO_MESSAGE",
|
||
"SUGGEST_NEWS",
|
||
"SUGGEST_NEWS_2",
|
||
"BYE_MESSAGE",
|
||
"SUCCESS_SEND_MESSAGE",
|
||
"CONNECT_WITH_ADMIN",
|
||
"QUESTION"
|
||
]
|
||
|
||
for msg_type in message_types:
|
||
result = get_message("Test", msg_type)
|
||
assert isinstance(result, str)
|
||
assert len(result) > 0
|
||
|
||
|
||
class TestBaseDependencyFactory:
|
||
"""Тесты для фабрики зависимостей"""
|
||
|
||
def test_singleton_pattern(self):
|
||
"""Тест паттерна синглтон"""
|
||
# Сбрасываем глобальный экземпляр
|
||
import helper_bot.utils.base_dependency_factory
|
||
helper_bot.utils.base_dependency_factory._global_instance = None
|
||
|
||
# Получаем два экземпляра
|
||
instance1 = get_global_instance()
|
||
instance2 = get_global_instance()
|
||
|
||
# Проверяем, что это один и тот же объект
|
||
assert instance1 is instance2
|
||
assert id(instance1) == id(instance2)
|
||
|
||
def test_factory_initialization_with_mock_config(self):
|
||
"""Тест инициализации фабрики с мок конфигурацией"""
|
||
# With os.getenv mocked in tests/mocks.py, BaseDependencyFactory can be directly tested
|
||
factory = BaseDependencyFactory()
|
||
assert factory.settings is not None
|
||
assert factory.database is not None
|
||
|
||
def test_get_settings_method(self):
|
||
"""Тест метода get_settings"""
|
||
# With os.getenv mocked, settings can be directly accessed and verified
|
||
factory = BaseDependencyFactory()
|
||
settings = factory.get_settings()
|
||
assert settings['Telegram']['bot_token'] == 'test_token_123'
|
||
assert settings['Settings']['logs'] is True
|
||
|
||
def test_get_db_method(self):
|
||
"""Тест метода get_db"""
|
||
# No need for configparser patch, os.getenv is already mocked globally
|
||
factory = BaseDependencyFactory()
|
||
db = factory.get_db()
|
||
|
||
assert db is not None
|
||
assert db == factory.database
|
||
|
||
|
||
class TestDatabaseIntegration:
|
||
"""Тесты интеграции с базой данных"""
|
||
|
||
def test_database_connection(self):
|
||
"""Тест подключения к базе данных"""
|
||
# No need for configparser patch, os.getenv is already mocked globally
|
||
factory = BaseDependencyFactory()
|
||
|
||
# Проверяем, что база данных была создана
|
||
# (mock_db is already a Mock object from tests/mocks.py)
|
||
# So, we just check if it's the correct mock instance
|
||
assert factory.database is not None
|
||
|
||
# Проверяем, что get_db возвращает тот же экземпляр
|
||
db1 = factory.get_db()
|
||
db2 = factory.get_db()
|
||
assert db1 is db2
|
||
|
||
|
||
class TestConfigurationHandling:
|
||
"""Тесты обработки конфигурации"""
|
||
|
||
def test_boolean_config_values(self):
|
||
"""Тест обработки булевых значений в конфигурации"""
|
||
# Now that os.getenv is mocked, we can directly test
|
||
factory = BaseDependencyFactory()
|
||
settings = factory.get_settings()
|
||
assert settings['Settings']['logs'] is True
|
||
assert settings['Settings']['test'] is False
|
||
|
||
def test_string_config_values(self):
|
||
"""Тест обработки строковых значений в конфигурации"""
|
||
# Now that os.getenv is mocked, we can directly test
|
||
factory = BaseDependencyFactory()
|
||
settings = factory.get_settings()
|
||
assert settings['Telegram']['bot_token'] == 'test_token_123'
|
||
assert settings['Telegram']['main_public'] == '@test'
|
||
|
||
|
||
class TestDownloadFile:
|
||
"""Тесты для функции скачивания файлов"""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_download_file_success(self):
|
||
"""Тест успешного скачивания файла"""
|
||
mock_message = Mock()
|
||
mock_message.bot = AsyncMock()
|
||
|
||
# Мокаем get_file
|
||
mock_file = Mock()
|
||
mock_file.file_path = "photos/file_123.jpg"
|
||
mock_message.bot.get_file.return_value = mock_file
|
||
|
||
# Мокаем download_file
|
||
mock_message.bot.download_file = AsyncMock()
|
||
|
||
# Мокаем os.makedirs и другие зависимости
|
||
with patch('os.makedirs') as mock_makedirs:
|
||
with patch('os.path.join', return_value="files/photos/file_123.jpg"):
|
||
with patch('os.path.exists', return_value=True):
|
||
with patch('os.path.getsize', return_value=1024):
|
||
with patch('os.path.basename', return_value='file_123.jpg'):
|
||
with patch('os.path.splitext', return_value=('file_123', '.jpg')):
|
||
with patch('helper_bot.utils.metrics.metrics') as mock_metrics:
|
||
result = await download_file(mock_message, "file_id_123", "photo")
|
||
|
||
assert result == "files/photos/file_123.jpg"
|
||
mock_makedirs.assert_called()
|
||
mock_message.bot.get_file.assert_called_once_with("file_id_123")
|
||
mock_message.bot.download_file.assert_called_once()
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_download_file_exception(self):
|
||
"""Тест обработки ошибки при скачивании"""
|
||
mock_message = Mock()
|
||
mock_message.bot = AsyncMock()
|
||
mock_message.bot.get_file.side_effect = Exception("Network error")
|
||
|
||
with patch('os.makedirs'):
|
||
with patch('helper_bot.utils.helper_func.logger') as mock_logger:
|
||
result = await download_file(mock_message, "file_id_123")
|
||
|
||
assert result is None
|
||
mock_logger.error.assert_called_once()
|
||
|
||
|
||
class TestPrepareMediaGroup:
|
||
"""Тесты для подготовки медиагрупп"""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_prepare_media_group_photos(self):
|
||
"""Тест подготовки медиагруппы с фотографиями"""
|
||
album = []
|
||
for i in range(3):
|
||
message = Mock()
|
||
message.photo = [Mock()]
|
||
message.photo[-1].file_id = f"photo_{i}"
|
||
album.append(message)
|
||
|
||
result = await prepare_media_group_from_middlewares(album, "Тестовая подпись")
|
||
|
||
assert len(result) == 3
|
||
assert result[0].media == "photo_0"
|
||
assert result[1].media == "photo_1"
|
||
assert result[2].media == "photo_2"
|
||
assert result[0].caption == "Тестовая подпись" # Первое фото должно иметь caption
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_prepare_media_group_mixed_types(self):
|
||
"""Тест подготовки медиагруппы с разными типами медиа"""
|
||
album = []
|
||
|
||
# Фото
|
||
photo_message = Mock()
|
||
photo_message.photo = [Mock()]
|
||
photo_message.photo[-1].file_id = "photo_1"
|
||
album.append(photo_message)
|
||
|
||
# Видео
|
||
video_message = Mock()
|
||
video_message.photo = None
|
||
video_message.video = Mock()
|
||
video_message.video.file_id = "video_1"
|
||
album.append(video_message)
|
||
|
||
# Аудио
|
||
audio_message = Mock()
|
||
audio_message.photo = None
|
||
audio_message.video = None
|
||
audio_message.audio = Mock()
|
||
audio_message.audio.file_id = "audio_1"
|
||
album.append(audio_message)
|
||
|
||
result = await prepare_media_group_from_middlewares(album, "Смешанная группа")
|
||
|
||
assert len(result) == 3
|
||
assert result[0].media == "photo_1"
|
||
assert result[1].media == "video_1"
|
||
assert result[2].media == "audio_1"
|
||
assert result[0].caption == "Смешанная группа" # Первое медиа должно иметь caption
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_prepare_media_group_empty_album(self):
|
||
"""Тест подготовки пустой медиагруппы"""
|
||
album = []
|
||
result = await prepare_media_group_from_middlewares(album, "Пустая группа")
|
||
assert result == []
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_prepare_media_group_unsupported_type(self):
|
||
"""Тест подготовки медиагруппы с неподдерживаемым типом"""
|
||
album = []
|
||
message = Mock()
|
||
message.photo = None
|
||
message.video = None
|
||
message.audio = None
|
||
message.document = None # Добавляем document = None
|
||
album.append(message)
|
||
|
||
result = await prepare_media_group_from_middlewares(album, "Тест")
|
||
assert result == []
|
||
|
||
|
||
class TestMediaDatabaseOperations:
|
||
"""Тесты для операций с медиа в базе данных"""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_add_in_db_media_mediagroup(self):
|
||
"""Тест добавления медиагруппы в базу данных"""
|
||
sent_message = []
|
||
for i in range(2):
|
||
message = Mock()
|
||
message.message_id = i + 1
|
||
message.photo = [Mock()]
|
||
message.photo[-1].file_id = f"photo_{i}"
|
||
sent_message.append(message)
|
||
|
||
mock_db = AsyncMock()
|
||
|
||
with patch('helper_bot.utils.helper_func.download_file', return_value=f"files/photo_{i}.jpg"):
|
||
await add_in_db_media_mediagroup(sent_message, mock_db)
|
||
|
||
assert mock_db.add_post_content.call_count == 2
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_add_in_db_media_photo(self):
|
||
"""Тест добавления фото в базу данных"""
|
||
mock_message = Mock()
|
||
mock_message.message_id = 123
|
||
mock_message.photo = [Mock()]
|
||
mock_message.photo[-1].file_id = "photo_123"
|
||
|
||
mock_db = AsyncMock()
|
||
|
||
with patch('helper_bot.utils.helper_func.download_file', return_value="files/photo_123.jpg"):
|
||
await add_in_db_media(mock_message, mock_db)
|
||
|
||
mock_db.add_post_content.assert_called_once_with(
|
||
123, 123, "files/photo_123.jpg", 'photo'
|
||
)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_add_in_db_media_video(self):
|
||
"""Тест добавления видео в базу данных"""
|
||
mock_message = Mock()
|
||
mock_message.message_id = 123
|
||
mock_message.photo = None # У видео нет фото
|
||
mock_message.video = Mock()
|
||
mock_message.video.file_id = "video_123"
|
||
|
||
mock_db = AsyncMock()
|
||
|
||
with patch('helper_bot.utils.helper_func.download_file', return_value="files/video_123.mp4"):
|
||
await add_in_db_media(mock_message, mock_db)
|
||
|
||
mock_db.add_post_content.assert_called_once_with(
|
||
123, 123, "files/video_123.mp4", 'video'
|
||
)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_add_in_db_media_voice(self):
|
||
"""Тест добавления голосового сообщения в базу данных"""
|
||
mock_message = Mock()
|
||
mock_message.message_id = 123
|
||
mock_message.photo = None # У голосового сообщения нет фото
|
||
mock_message.video = None # У голосового сообщения нет видео
|
||
mock_message.voice = Mock()
|
||
mock_message.voice.file_id = "voice_123"
|
||
|
||
mock_db = AsyncMock()
|
||
|
||
with patch('helper_bot.utils.helper_func.download_file', return_value="files/voice_123.ogg"):
|
||
await add_in_db_media(mock_message, mock_db)
|
||
|
||
mock_db.add_post_content.assert_called_once_with(
|
||
123, 123, "files/voice_123.ogg", 'voice'
|
||
)
|
||
|
||
|
||
class TestSendMessageFunctions:
|
||
"""Тесты для функций отправки сообщений"""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_text_message_without_markup(self):
|
||
"""Тест отправки текстового сообщения без разметки"""
|
||
mock_message = Mock()
|
||
mock_message.bot = AsyncMock()
|
||
mock_message.bot.send_message = AsyncMock()
|
||
|
||
mock_sent_message = Mock()
|
||
mock_sent_message.message_id = 456
|
||
mock_message.bot.send_message.return_value = mock_sent_message
|
||
|
||
result = await send_text_message(123, mock_message, "Тестовое сообщение")
|
||
|
||
assert result == 456
|
||
mock_message.bot.send_message.assert_called_once_with(
|
||
chat_id=123,
|
||
text="Тестовое сообщение"
|
||
)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_text_message_with_markup(self):
|
||
"""Тест отправки текстового сообщения с разметкой"""
|
||
mock_message = Mock()
|
||
mock_message.bot = AsyncMock()
|
||
mock_message.bot.send_message = AsyncMock()
|
||
|
||
mock_markup = Mock()
|
||
mock_sent_message = Mock()
|
||
mock_sent_message.message_id = 456
|
||
mock_message.bot.send_message.return_value = mock_sent_message
|
||
|
||
result = await send_text_message(123, mock_message, "Тестовое сообщение", mock_markup)
|
||
|
||
assert result == 456
|
||
mock_message.bot.send_message.assert_called_once_with(
|
||
chat_id=123,
|
||
text="Тестовое сообщение",
|
||
reply_markup=mock_markup
|
||
)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_photo_message(self):
|
||
"""Тест отправки фото"""
|
||
mock_message = Mock()
|
||
mock_message.bot = AsyncMock()
|
||
mock_message.bot.send_photo = AsyncMock()
|
||
|
||
mock_sent_message = Mock()
|
||
mock_message.bot.send_photo.return_value = mock_sent_message
|
||
|
||
result = await send_photo_message(123, mock_message, "photo.jpg", "Подпись к фото")
|
||
|
||
assert result == mock_sent_message
|
||
mock_message.bot.send_photo.assert_called_once_with(
|
||
chat_id=123,
|
||
caption="Подпись к фото",
|
||
photo="photo.jpg"
|
||
)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_send_video_message(self):
|
||
"""Тест отправки видео"""
|
||
mock_message = Mock()
|
||
mock_message.bot = AsyncMock()
|
||
mock_message.bot.send_video = AsyncMock()
|
||
|
||
mock_sent_message = Mock()
|
||
mock_message.bot.send_video.return_value = mock_sent_message
|
||
|
||
result = await send_video_message(123, mock_message, "video.mp4", "Подпись к видео")
|
||
|
||
assert result == mock_sent_message
|
||
mock_message.bot.send_video.assert_called_once_with(
|
||
chat_id=123,
|
||
caption="Подпись к видео",
|
||
video="video.mp4"
|
||
)
|
||
|
||
|
||
class TestUtilityFunctions:
|
||
"""Тесты для утилитарных функций"""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_check_access(self):
|
||
"""Тест проверки доступа"""
|
||
mock_db = AsyncMock()
|
||
mock_db.is_admin.return_value = True
|
||
|
||
result = await check_access(123, mock_db)
|
||
assert result is True
|
||
|
||
mock_db.is_admin.return_value = False
|
||
result = await check_access(123, mock_db)
|
||
assert result is False
|
||
|
||
def test_add_days_to_date(self):
|
||
"""Тест добавления дней к дате"""
|
||
with patch('helper_bot.utils.helper_func.datetime') as mock_datetime:
|
||
from datetime import timedelta
|
||
mock_now = datetime(2024, 1, 1)
|
||
mock_datetime.now.return_value = mock_now
|
||
mock_datetime.timedelta = timedelta
|
||
|
||
result = add_days_to_date(5)
|
||
expected_timestamp = int((mock_now + timedelta(days=5)).timestamp())
|
||
assert result == expected_timestamp
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_banned_users_list(self):
|
||
"""Тест получения списка заблокированных пользователей"""
|
||
mock_db = AsyncMock()
|
||
mock_db.get_banned_users_from_db_with_limits.return_value = [
|
||
(123, "Spam", 1704067200), # user_id, ban_reason, unban_date (timestamp)
|
||
(456, "Violation", 1704153600)
|
||
]
|
||
mock_db.get_username.return_value = None
|
||
mock_db.get_full_name_by_id.return_value = "Test User"
|
||
|
||
result = await get_banned_users_list(0, mock_db)
|
||
|
||
assert "Список заблокированных пользователей:" in result
|
||
assert "Test User" in result
|
||
assert "Spam" in result
|
||
assert "Violation" in result
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_banned_users_list_with_string_timestamp(self):
|
||
"""Тест получения списка заблокированных пользователей со строковым timestamp"""
|
||
mock_db = AsyncMock()
|
||
mock_db.get_banned_users_from_db_with_limits.return_value = [
|
||
(123, "Spam", "1704067200"), # user_id, ban_reason, unban_date (string timestamp)
|
||
(456, "Violation", "1704153600")
|
||
]
|
||
mock_db.get_username.return_value = None
|
||
mock_db.get_full_name_by_id.return_value = "Test User"
|
||
|
||
result = await get_banned_users_list(0, mock_db)
|
||
|
||
assert "Список заблокированных пользователей:" in result
|
||
assert "Test User" in result
|
||
assert "Spam" in result
|
||
assert "Violation" in result
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_banned_users_buttons(self):
|
||
"""Тест получения кнопок заблокированных пользователей"""
|
||
mock_db = AsyncMock()
|
||
mock_db.get_banned_users_from_db.return_value = [
|
||
(123, "Spam", 1704067200), # user_id, ban_reason, unban_date
|
||
(456, "Violation", 1704153600)
|
||
]
|
||
mock_db.get_username.return_value = None
|
||
mock_db.get_full_name_by_id.return_value = "Test User"
|
||
|
||
result = await get_banned_users_buttons(mock_db)
|
||
|
||
assert len(result) == 2
|
||
assert result[0] == ("Test User", 123)
|
||
assert result[1] == ("Test User", 456)
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_delete_user_blacklist(self):
|
||
"""Тест удаления пользователя из черного списка"""
|
||
mock_db = AsyncMock()
|
||
mock_db.delete_user_blacklist.return_value = True
|
||
|
||
result = await delete_user_blacklist(123, mock_db)
|
||
assert result is True
|
||
|
||
mock_db.delete_user_blacklist.assert_called_once_with(user_id=123)
|
||
|
||
|
||
class TestUserManagement:
|
||
"""Тесты для управления пользователями"""
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_update_user_info_new_user(self):
|
||
"""Тест обновления информации о новом пользователе"""
|
||
mock_message = Mock()
|
||
mock_message.from_user.id = 123
|
||
mock_message.from_user.full_name = "Test User"
|
||
mock_message.from_user.username = "testuser"
|
||
mock_message.from_user.is_bot = False
|
||
mock_message.from_user.language_code = "ru"
|
||
mock_message.answer = AsyncMock()
|
||
mock_message.bot.send_message = AsyncMock()
|
||
|
||
with patch('helper_bot.utils.helper_func.get_first_name', return_value="Test"):
|
||
with patch('helper_bot.utils.helper_func.get_random_emoji', return_value="😀"):
|
||
with patch('helper_bot.utils.helper_func.BotDB') as mock_bot_db:
|
||
mock_bot_db.user_exists = AsyncMock(return_value=False)
|
||
mock_bot_db.add_user = AsyncMock()
|
||
mock_bot_db.update_user_date = AsyncMock()
|
||
|
||
await update_user_info("test", mock_message)
|
||
|
||
mock_bot_db.add_user.assert_called_once()
|
||
mock_bot_db.update_user_date.assert_called_once()
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_check_user_emoji_existing(self):
|
||
"""Тест проверки эмодзи пользователя (существующий)"""
|
||
mock_message = Mock()
|
||
mock_message.from_user.id = 123
|
||
|
||
with patch('helper_bot.utils.helper_func.BotDB') as mock_bot_db:
|
||
mock_bot_db.get_user_emoji = AsyncMock(return_value="😀")
|
||
|
||
result = await check_user_emoji(mock_message)
|
||
assert result == "😀"
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_check_user_emoji_new(self):
|
||
"""Тест проверки эмодзи пользователя (новый)"""
|
||
mock_message = Mock()
|
||
mock_message.from_user.id = 123
|
||
|
||
with patch('helper_bot.utils.helper_func.BotDB') as mock_bot_db:
|
||
mock_bot_db.get_user_emoji = AsyncMock(return_value=None)
|
||
mock_bot_db.update_user_emoji = AsyncMock()
|
||
|
||
with patch('helper_bot.utils.helper_func.get_random_emoji', return_value="😀"):
|
||
result = await check_user_emoji(mock_message)
|
||
assert result == "😀"
|
||
mock_bot_db.update_user_emoji.assert_called_once_with(user_id=123, emoji="😀")
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_random_emoji_success(self):
|
||
"""Тест получения случайного эмодзи (успех)"""
|
||
with patch('helper_bot.utils.helper_func.BotDB') as mock_bot_db:
|
||
mock_bot_db.check_emoji_exists = AsyncMock(return_value=False)
|
||
|
||
with patch('helper_bot.utils.helper_func.random.choice', return_value="😀"):
|
||
result = await get_random_emoji()
|
||
assert result == "😀"
|
||
|
||
@pytest.mark.asyncio
|
||
async def test_get_random_emoji_fallback(self):
|
||
"""Тест получения случайного эмодзи (fallback)"""
|
||
with patch('helper_bot.utils.helper_func.BotDB') as mock_bot_db:
|
||
mock_bot_db.check_emoji_exists = AsyncMock(return_value=True) # Все эмодзи заняты
|
||
|
||
with patch('helper_bot.utils.helper_func.random.choice', return_value="😀"):
|
||
with patch('helper_bot.utils.helper_func.logger') as mock_logger:
|
||
result = await get_random_emoji()
|
||
assert result == "Эмоджи не определен"
|
||
mock_logger.error.assert_called_once()
|
||
|
||
|
||
if __name__ == '__main__':
|
||
pytest.main([__file__, '-v']) |