- Removed the MetricsManager initialization from `run_helper.py` to avoid duplication, as metrics are now handled in `main.py`. - Updated logging levels in `server_prometheus.py` and `metrics_middleware.py` to use debug instead of info for less critical messages. - Added metrics configuration to `BaseDependencyFactory` for better management of metrics settings. - Deleted the obsolete `metrics_exporter.py` file to streamline the codebase. - Updated various tests to reflect changes in the metrics handling and ensure proper functionality.
252 lines
11 KiB
Python
252 lines
11 KiB
Python
import pytest
|
||
import sqlite3
|
||
import os
|
||
from datetime import datetime, timezone, timedelta
|
||
from unittest.mock import Mock, patch, AsyncMock
|
||
|
||
from helper_bot.utils.auto_unban_scheduler import AutoUnbanScheduler
|
||
|
||
|
||
class TestAutoUnbanIntegration:
|
||
"""Интеграционные тесты для автоматического разбана"""
|
||
|
||
@pytest.fixture
|
||
def test_db_path(self):
|
||
"""Путь к тестовой базе данных"""
|
||
return 'database/test_auto_unban.db'
|
||
|
||
@pytest.fixture
|
||
def setup_test_db(self, test_db_path):
|
||
"""Создает тестовую базу данных с таблицей blacklist"""
|
||
# Удаляем старую тестовую базу если она существует
|
||
if os.path.exists(test_db_path):
|
||
os.remove(test_db_path)
|
||
|
||
# Создаем новую базу данных
|
||
conn = sqlite3.connect(test_db_path)
|
||
cursor = conn.cursor()
|
||
|
||
# Создаем таблицу blacklist
|
||
cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS blacklist (
|
||
user_id INTEGER PRIMARY KEY,
|
||
user_name TEXT,
|
||
message_for_user TEXT,
|
||
date_to_unban INTEGER
|
||
)
|
||
''')
|
||
|
||
# Добавляем тестовые данные
|
||
today_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
|
||
tomorrow_timestamp = int((datetime.now(timezone(timedelta(hours=3))) + timedelta(days=1)).timestamp())
|
||
|
||
test_data = [
|
||
(123, "test_user1", "Test ban 1", today_timestamp), # Разблокируется сегодня
|
||
(456, "test_user2", "Test ban 2", today_timestamp), # Разблокируется сегодня
|
||
(789, "test_user3", "Test ban 3", tomorrow_timestamp), # Разблокируется завтра
|
||
(999, "test_user4", "Test ban 4", None), # Навсегда заблокирован
|
||
]
|
||
|
||
cursor.executemany(
|
||
"INSERT INTO blacklist (user_id, user_name, message_for_user, date_to_unban) VALUES (?, ?, ?, ?)",
|
||
test_data
|
||
)
|
||
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
yield test_db_path
|
||
|
||
# Очистка после тестов
|
||
if os.path.exists(test_db_path):
|
||
os.remove(test_db_path)
|
||
|
||
@pytest.fixture
|
||
def mock_bdf(self, test_db_path):
|
||
"""Создает мок фабрики зависимостей с тестовой базой"""
|
||
mock_factory = Mock()
|
||
mock_factory.settings = {
|
||
'Telegram': {
|
||
'group_for_logs': '-1001234567890',
|
||
'important_logs': '-1001234567891'
|
||
}
|
||
}
|
||
|
||
# Создаем реальный экземпляр базы данных с тестовым файлом
|
||
from database.async_db import AsyncBotDB
|
||
import os
|
||
mock_factory.database = AsyncBotDB(test_db_path)
|
||
|
||
return mock_factory
|
||
|
||
@pytest.fixture
|
||
def mock_bot(self):
|
||
"""Создает мок бота"""
|
||
mock_bot = Mock()
|
||
mock_bot.send_message = AsyncMock()
|
||
return mock_bot
|
||
|
||
@pytest.mark.asyncio
|
||
@patch('helper_bot.utils.auto_unban_scheduler.get_global_instance')
|
||
async def test_auto_unban_with_real_db(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot):
|
||
"""Тест автоматического разбана с реальной базой данных"""
|
||
# Настройка моков
|
||
mock_get_instance.return_value = mock_bdf
|
||
|
||
# Создаем планировщик
|
||
scheduler = AutoUnbanScheduler()
|
||
scheduler.bot_db = mock_bdf.database
|
||
scheduler.set_bot(mock_bot)
|
||
|
||
# Проверяем начальное состояние базы
|
||
conn = sqlite3.connect(setup_test_db)
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT COUNT(*) FROM blacklist")
|
||
initial_count = cursor.fetchone()[0]
|
||
assert initial_count == 4
|
||
|
||
# Выполняем автоматический разбан
|
||
await scheduler.auto_unban_users()
|
||
|
||
# Проверяем, что пользователи с сегодняшней датой разблокированы
|
||
current_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
|
||
cursor.execute("SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?",
|
||
(current_timestamp,))
|
||
today_count = cursor.fetchone()[0]
|
||
assert today_count == 0
|
||
|
||
# Проверяем, что пользователи с завтрашней датой остались
|
||
cursor.execute("SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban > ?",
|
||
(current_timestamp,))
|
||
tomorrow_count = cursor.fetchone()[0]
|
||
assert tomorrow_count == 1
|
||
|
||
# Проверяем, что навсегда заблокированные пользователи остались
|
||
cursor.execute("SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NULL")
|
||
permanent_count = cursor.fetchone()[0]
|
||
assert permanent_count == 1
|
||
|
||
# Проверяем общее количество записей
|
||
cursor.execute("SELECT COUNT(*) FROM blacklist")
|
||
final_count = cursor.fetchone()[0]
|
||
assert final_count == 2 # Остались только завтрашние и навсегда заблокированные
|
||
|
||
conn.close()
|
||
|
||
# Проверяем, что отчет был отправлен
|
||
mock_bot.send_message.assert_called_once()
|
||
|
||
@pytest.mark.asyncio
|
||
@patch('helper_bot.utils.auto_unban_scheduler.get_global_instance')
|
||
async def test_auto_unban_no_users_today(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot):
|
||
"""Тест разбана когда нет пользователей для разблокировки сегодня"""
|
||
# Настройка моков
|
||
mock_get_instance.return_value = mock_bdf
|
||
|
||
# Удаляем пользователей с сегодняшней датой
|
||
conn = sqlite3.connect(setup_test_db)
|
||
cursor = conn.cursor()
|
||
current_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
|
||
cursor.execute("DELETE FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?", (current_timestamp,))
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
# Создаем планировщик
|
||
scheduler = AutoUnbanScheduler()
|
||
scheduler.bot_db = mock_bdf.database
|
||
scheduler.set_bot(mock_bot)
|
||
|
||
# Выполняем автоматический разбан
|
||
await scheduler.auto_unban_users()
|
||
|
||
# Проверяем, что отчет не был отправлен (нет пользователей для разблокировки)
|
||
mock_bot.send_message.assert_not_called()
|
||
|
||
@pytest.mark.asyncio
|
||
@patch('helper_bot.utils.auto_unban_scheduler.get_global_instance')
|
||
async def test_auto_unban_database_error(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot):
|
||
"""Тест обработки ошибок базы данных"""
|
||
# Настройка моков
|
||
mock_get_instance.return_value = mock_bdf
|
||
|
||
# Создаем планировщик
|
||
scheduler = AutoUnbanScheduler()
|
||
scheduler.bot_db = mock_bdf.database
|
||
scheduler.set_bot(mock_bot)
|
||
|
||
# Удаляем таблицу чтобы вызвать ошибку
|
||
conn = sqlite3.connect(setup_test_db)
|
||
cursor = conn.cursor()
|
||
cursor.execute("DROP TABLE blacklist")
|
||
conn.commit()
|
||
conn.close()
|
||
|
||
# Выполняем автоматический разбан
|
||
await scheduler.auto_unban_users()
|
||
|
||
# Проверяем, что отчет об ошибке был отправлен
|
||
mock_bot.send_message.assert_called_once()
|
||
call_args = mock_bot.send_message.call_args
|
||
assert call_args[1]['chat_id'] == '-1001234567891' # important_logs
|
||
assert "Ошибка автоматического разбана" in call_args[1]['text']
|
||
|
||
def test_date_format_consistency(self, setup_test_db, mock_bdf):
|
||
"""Тест консистентности формата дат"""
|
||
scheduler = AutoUnbanScheduler()
|
||
scheduler.bot_db = mock_bdf.database
|
||
|
||
# Проверяем, что дата в базе соответствует ожидаемому формату (timestamp)
|
||
conn = sqlite3.connect(setup_test_db)
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT date_to_unban FROM blacklist WHERE date_to_unban IS NOT NULL LIMIT 1")
|
||
result = cursor.fetchone()
|
||
conn.close()
|
||
|
||
if result and result[0]:
|
||
timestamp = result[0]
|
||
# Проверяем, что это валидный timestamp (целое число)
|
||
assert isinstance(timestamp, int)
|
||
assert timestamp > 0
|
||
# Проверяем, что timestamp можно преобразовать в дату
|
||
date_obj = datetime.fromtimestamp(timestamp)
|
||
assert isinstance(date_obj, datetime)
|
||
|
||
|
||
class TestSchedulerLifecycle:
|
||
"""Тесты жизненного цикла планировщика"""
|
||
|
||
def test_scheduler_start_stop(self):
|
||
"""Тест запуска и остановки планировщика"""
|
||
scheduler = AutoUnbanScheduler()
|
||
|
||
# Запускаем планировщик
|
||
scheduler.start_scheduler()
|
||
assert scheduler.scheduler.running
|
||
|
||
# Останавливаем планировщик (должно пройти без ошибок)
|
||
scheduler.stop_scheduler()
|
||
# APScheduler может не сразу остановиться, но это нормально
|
||
|
||
def test_scheduler_job_creation(self):
|
||
"""Тест создания задачи в планировщике"""
|
||
scheduler = AutoUnbanScheduler()
|
||
|
||
with patch.object(scheduler.scheduler, 'add_job') as mock_add_job:
|
||
scheduler.start_scheduler()
|
||
|
||
# Проверяем, что задача была создана с правильными параметрами
|
||
mock_add_job.assert_called_once()
|
||
call_args = mock_add_job.call_args
|
||
|
||
# Проверяем функцию
|
||
assert call_args[0][0] == scheduler.auto_unban_users
|
||
|
||
# Проверяем триггер (должен быть CronTrigger)
|
||
from apscheduler.triggers.cron import CronTrigger
|
||
assert isinstance(call_args[0][1], CronTrigger)
|
||
|
||
# Проверяем ID и имя задачи
|
||
assert call_args[1]['id'] == 'auto_unban_users'
|
||
assert call_args[1]['name'] == 'Автоматический разбан пользователей'
|
||
assert call_args[1]['replace_existing'] is True
|