import os import sqlite3 from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, Mock, patch import pytest from helper_bot.utils.auto_unban_scheduler import AutoUnbanScheduler class TestAutoUnbanIntegration: """Интеграционные тесты для автоматического разбана""" @pytest.fixture def test_db_path(self): """Путь к тестовой базе данных""" return 'database/test_auto_unban.db' @pytest.fixture def setup_test_db(self, test_db_path): """Создает тестовую базу данных с таблицами blacklist, our_users и blacklist_history""" # Удаляем старую тестовую базу если она существует if os.path.exists(test_db_path): os.remove(test_db_path) # Создаем новую базу данных conn = sqlite3.connect(test_db_path) cursor = conn.cursor() # Включаем поддержку внешних ключей cursor.execute("PRAGMA foreign_keys = ON") # Создаем таблицу our_users (нужна для внешних ключей) cursor.execute(''' CREATE TABLE IF NOT EXISTS our_users ( user_id INTEGER NOT NULL PRIMARY KEY, first_name TEXT, full_name TEXT, username TEXT, is_bot BOOLEAN DEFAULT 0, language_code TEXT, has_stickers BOOLEAN DEFAULT 0 NOT NULL, emoji TEXT, date_added INTEGER NOT NULL, date_changed INTEGER NOT NULL, voice_bot_welcome_received BOOLEAN DEFAULT 0 ) ''') # Создаем таблицу blacklist cursor.execute(''' CREATE TABLE IF NOT EXISTS blacklist ( user_id INTEGER NOT NULL PRIMARY KEY, message_for_user TEXT, date_to_unban INTEGER, created_at INTEGER DEFAULT (strftime('%s', 'now')), ban_author INTEGER, FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE ) ''') # Создаем таблицу blacklist_history cursor.execute(''' CREATE TABLE IF NOT EXISTS blacklist_history ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, message_for_user TEXT, date_ban INTEGER NOT NULL, date_unban INTEGER, ban_author INTEGER, created_at INTEGER DEFAULT (strftime('%s', 'now')), updated_at INTEGER DEFAULT (strftime('%s', 'now')), FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE, FOREIGN KEY (ban_author) REFERENCES our_users(user_id) ON DELETE SET NULL ) ''') # Создаем индексы для blacklist_history cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_blacklist_history_user_id ON blacklist_history(user_id) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_ban ON blacklist_history(date_ban) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_unban ON blacklist_history(date_unban) ''') # Добавляем тестовых пользователей в our_users current_time = int(datetime.now(timezone(timedelta(hours=3))).timestamp()) users_data = [ (123, "Test", "Test User 1", "test_user1", 0, "ru", 0, "😊", current_time, current_time, 0), (456, "Test", "Test User 2", "test_user2", 0, "ru", 0, "😊", current_time, current_time, 0), (789, "Test", "Test User 3", "test_user3", 0, "ru", 0, "😊", current_time, current_time, 0), (999, "Test", "Test User 4", "test_user4", 0, "ru", 0, "😊", current_time, current_time, 0), ] cursor.executemany( """INSERT INTO our_users (user_id, first_name, full_name, username, is_bot, language_code, has_stickers, emoji, date_added, date_changed, voice_bot_welcome_received) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", users_data ) # Добавляем тестовые данные в blacklist today_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp()) tomorrow_timestamp = int((datetime.now(timezone(timedelta(hours=3))) + timedelta(days=1)).timestamp()) blacklist_data = [ (123, "Test ban 1", today_timestamp, current_time, None), # Разблокируется сегодня (456, "Test ban 2", today_timestamp, current_time, None), # Разблокируется сегодня (789, "Test ban 3", tomorrow_timestamp, current_time, None), # Разблокируется завтра (999, "Test ban 4", None, current_time, None), # Навсегда заблокирован ] cursor.executemany( "INSERT INTO blacklist (user_id, message_for_user, date_to_unban, created_at, ban_author) VALUES (?, ?, ?, ?, ?)", blacklist_data ) # Добавляем тестовые данные в blacklist_history # Для пользователей 123 и 456 (которые будут разблокированы) создаем записи с date_unban = NULL yesterday_timestamp = int((datetime.now(timezone(timedelta(hours=3))) - timedelta(days=1)).timestamp()) history_data = [ (123, "Test ban 1", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Будет разблокирован (456, "Test ban 2", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Будет разблокирован (789, "Test ban 3", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Не будет разблокирован сегодня (999, "Test ban 4", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Навсегда заблокирован ] cursor.executemany( """INSERT INTO blacklist_history (user_id, message_for_user, date_ban, date_unban, ban_author, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)""", history_data ) conn.commit() conn.close() yield test_db_path # Очистка после тестов if os.path.exists(test_db_path): os.remove(test_db_path) @pytest.fixture def mock_bdf(self, test_db_path): """Создает мок фабрики зависимостей с тестовой базой""" mock_factory = Mock() mock_factory.settings = { 'Telegram': { 'group_for_logs': '-1001234567890', 'important_logs': '-1001234567891' } } # Создаем реальный экземпляр базы данных с тестовым файлом import os from database.async_db import AsyncBotDB mock_factory.database = AsyncBotDB(test_db_path) return mock_factory @pytest.fixture def mock_bot(self): """Создает мок бота""" mock_bot = Mock() mock_bot.send_message = AsyncMock() return mock_bot @pytest.mark.asyncio @patch('helper_bot.utils.auto_unban_scheduler.get_global_instance') async def test_auto_unban_with_real_db(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot): """Тест автоматического разбана с реальной базой данных""" # Настройка моков mock_get_instance.return_value = mock_bdf # Создаем планировщик scheduler = AutoUnbanScheduler() scheduler.bot_db = mock_bdf.database scheduler.set_bot(mock_bot) # Проверяем начальное состояние базы conn = sqlite3.connect(setup_test_db) cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM blacklist") initial_count = cursor.fetchone()[0] assert initial_count == 4 # Проверяем начальное состояние истории: должно быть 2 записи с date_unban IS NULL для user_id 123 и 456 cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE user_id IN (123, 456) AND date_unban IS NULL") initial_open_history = cursor.fetchone()[0] assert initial_open_history == 2 # Запоминаем время до разбана для проверки updated_at before_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp()) # Выполняем автоматический разбан await scheduler.auto_unban_users() # Запоминаем время после разбана для проверки updated_at after_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp()) # Проверяем, что пользователи с сегодняшней датой разблокированы current_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp()) cursor.execute("SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?", (current_timestamp,)) today_count = cursor.fetchone()[0] assert today_count == 0 # Проверяем, что пользователи с завтрашней датой остались cursor.execute("SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban > ?", (current_timestamp,)) tomorrow_count = cursor.fetchone()[0] assert tomorrow_count == 1 # Проверяем, что навсегда заблокированные пользователи остались cursor.execute("SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NULL") permanent_count = cursor.fetchone()[0] assert permanent_count == 1 # Проверяем общее количество записей cursor.execute("SELECT COUNT(*) FROM blacklist") final_count = cursor.fetchone()[0] assert final_count == 2 # Остались только завтрашние и навсегда заблокированные # Проверяем историю банов: для user_id 123 и 456 должны быть установлены date_unban cursor.execute("SELECT user_id, date_unban, updated_at FROM blacklist_history WHERE user_id IN (123, 456) ORDER BY user_id") history_records = cursor.fetchall() assert len(history_records) == 2 for user_id, date_unban, updated_at in history_records: # Проверяем, что date_unban установлен (не NULL) assert date_unban is not None, f"date_unban должен быть установлен для user_id={user_id}" assert isinstance(date_unban, int), f"date_unban должен быть integer для user_id={user_id}" # Проверяем, что date_unban находится в разумных пределах (между before и after) assert before_unban_timestamp <= date_unban <= after_unban_timestamp, \ f"date_unban для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}, получен {date_unban}" # Проверяем, что updated_at обновлен assert updated_at is not None, f"updated_at должен быть установлен для user_id={user_id}" assert isinstance(updated_at, int), f"updated_at должен быть integer для user_id={user_id}" assert before_unban_timestamp <= updated_at <= after_unban_timestamp, \ f"updated_at для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}, получен {updated_at}" # Проверяем, что для user_id 789 и 999 записи в истории остались без изменений (date_unban все еще NULL) cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE user_id IN (789, 999) AND date_unban IS NULL") unchanged_history = cursor.fetchone()[0] assert unchanged_history == 2, "Записи для user_id 789 и 999 должны остаться с date_unban = NULL" conn.close() # Проверяем, что отчет был отправлен mock_bot.send_message.assert_called_once() @pytest.mark.asyncio @patch('helper_bot.utils.auto_unban_scheduler.get_global_instance') async def test_auto_unban_no_users_today(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot): """Тест разбана когда нет пользователей для разблокировки сегодня""" # Настройка моков mock_get_instance.return_value = mock_bdf # Удаляем пользователей с сегодняшней датой conn = sqlite3.connect(setup_test_db) cursor = conn.cursor() current_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp()) cursor.execute("DELETE FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?", (current_timestamp,)) # Проверяем начальное состояние истории: все записи должны иметь date_unban = NULL cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE date_unban IS NULL") initial_open_history = cursor.fetchone()[0] assert initial_open_history == 4 # Все 4 записи должны быть открытыми conn.commit() conn.close() # Создаем планировщик scheduler = AutoUnbanScheduler() scheduler.bot_db = mock_bdf.database scheduler.set_bot(mock_bot) # Выполняем автоматический разбан await scheduler.auto_unban_users() # Проверяем, что история не изменилась (все записи все еще с date_unban = NULL) conn = sqlite3.connect(setup_test_db) cursor = conn.cursor() cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE date_unban IS NULL") final_open_history = cursor.fetchone()[0] assert final_open_history == 4, "История не должна изменяться, если нет пользователей для разблокировки" conn.close() # Проверяем, что отчет не был отправлен (нет пользователей для разблокировки) mock_bot.send_message.assert_not_called() @pytest.mark.asyncio @patch('helper_bot.utils.auto_unban_scheduler.get_global_instance') async def test_auto_unban_database_error(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot): """Тест обработки ошибок базы данных""" # Настройка моков mock_get_instance.return_value = mock_bdf # Создаем планировщик scheduler = AutoUnbanScheduler() scheduler.bot_db = mock_bdf.database scheduler.set_bot(mock_bot) # Удаляем таблицу чтобы вызвать ошибку conn = sqlite3.connect(setup_test_db) cursor = conn.cursor() cursor.execute("DROP TABLE blacklist") conn.commit() conn.close() # Выполняем автоматический разбан await scheduler.auto_unban_users() # Проверяем, что отчет об ошибке был отправлен mock_bot.send_message.assert_called_once() call_args = mock_bot.send_message.call_args assert call_args[1]['chat_id'] == '-1001234567891' # important_logs assert "Ошибка автоматического разбана" in call_args[1]['text'] @pytest.mark.asyncio @patch('helper_bot.utils.auto_unban_scheduler.get_global_instance') async def test_auto_unban_updates_history(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot): """Тест что автоматический разбан обновляет историю банов""" # Настройка моков mock_get_instance.return_value = mock_bdf # Создаем планировщик scheduler = AutoUnbanScheduler() scheduler.bot_db = mock_bdf.database scheduler.set_bot(mock_bot) conn = sqlite3.connect(setup_test_db) cursor = conn.cursor() # Проверяем начальное состояние: для user_id 123 и 456 должны быть записи с date_unban = NULL cursor.execute(""" SELECT id, user_id, date_ban, date_unban, updated_at FROM blacklist_history WHERE user_id IN (123, 456) AND date_unban IS NULL ORDER BY user_id """) initial_records = cursor.fetchall() assert len(initial_records) == 2, "Должно быть 2 открытые записи для user_id 123 и 456" # Запоминаем ID записей и их начальные значения updated_at record_ids = {row[0]: (row[1], row[4]) for row in initial_records} # Запоминаем время до разбана before_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp()) conn.close() # Выполняем автоматический разбан await scheduler.auto_unban_users() # Запоминаем время после разбана after_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp()) # Проверяем, что записи обновлены conn = sqlite3.connect(setup_test_db) cursor = conn.cursor() cursor.execute(""" SELECT id, user_id, date_ban, date_unban, updated_at FROM blacklist_history WHERE user_id IN (123, 456) ORDER BY user_id """) updated_records = cursor.fetchall() assert len(updated_records) == 2, "Должно быть 2 записи для user_id 123 и 456" for record_id, user_id, date_ban, date_unban, updated_at in updated_records: # Проверяем, что это одна из наших записей assert record_id in record_ids, f"Запись с id={record_id} должна быть в исходных записях" # Проверяем, что date_unban установлен assert date_unban is not None, f"date_unban должен быть установлен для user_id={user_id}" assert isinstance(date_unban, int), f"date_unban должен быть integer для user_id={user_id}" # Проверяем, что date_unban находится в разумных пределах assert before_unban_timestamp <= date_unban <= after_unban_timestamp, \ f"date_unban для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}" # Проверяем, что updated_at обновлен (должен быть больше начального значения) assert updated_at is not None, f"updated_at должен быть установлен для user_id={user_id}" assert isinstance(updated_at, int), f"updated_at должен быть integer для user_id={user_id}" assert before_unban_timestamp <= updated_at <= after_unban_timestamp, \ f"updated_at для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}" # Проверяем, что updated_at действительно обновлен (больше начального значения) initial_updated_at = record_ids[record_id][1] assert updated_at >= initial_updated_at, \ f"updated_at для user_id={user_id} должен быть больше или равен начальному значению" # Проверяем, что обновлена только последняя запись для каждого пользователя # (если бы было несколько записей, обновилась бы только последняя) cursor.execute(""" SELECT COUNT(*) FROM blacklist_history WHERE user_id IN (123, 456) AND date_unban IS NOT NULL """) closed_records = cursor.fetchone()[0] assert closed_records == 2, "Должно быть закрыто 2 записи (по одной для каждого пользователя)" cursor.execute(""" SELECT COUNT(*) FROM blacklist_history WHERE user_id IN (123, 456) AND date_unban IS NULL """) open_records = cursor.fetchone()[0] assert open_records == 0, "Не должно быть открытых записей для user_id 123 и 456" conn.close() def test_date_format_consistency(self, setup_test_db, mock_bdf): """Тест консистентности формата дат""" scheduler = AutoUnbanScheduler() scheduler.bot_db = mock_bdf.database # Проверяем, что дата в базе соответствует ожидаемому формату (timestamp) conn = sqlite3.connect(setup_test_db) cursor = conn.cursor() cursor.execute("SELECT date_to_unban FROM blacklist WHERE date_to_unban IS NOT NULL LIMIT 1") result = cursor.fetchone() conn.close() if result and result[0]: timestamp = result[0] # Проверяем, что это валидный timestamp (целое число) assert isinstance(timestamp, int) assert timestamp > 0 # Проверяем, что timestamp можно преобразовать в дату date_obj = datetime.fromtimestamp(timestamp) assert isinstance(date_obj, datetime) class TestSchedulerLifecycle: """Тесты жизненного цикла планировщика""" def test_scheduler_start_stop(self): """Тест запуска и остановки планировщика""" scheduler = AutoUnbanScheduler() # Запускаем планировщик scheduler.start_scheduler() assert scheduler.scheduler.running # Останавливаем планировщик (должно пройти без ошибок) scheduler.stop_scheduler() # APScheduler может не сразу остановиться, но это нормально def test_scheduler_job_creation(self): """Тест создания задачи в планировщике""" scheduler = AutoUnbanScheduler() with patch.object(scheduler.scheduler, 'add_job') as mock_add_job: scheduler.start_scheduler() # Проверяем, что задача была создана с правильными параметрами mock_add_job.assert_called_once() call_args = mock_add_job.call_args # Проверяем функцию assert call_args[0][0] == scheduler.auto_unban_users # Проверяем триггер (должен быть CronTrigger) from apscheduler.triggers.cron import CronTrigger assert isinstance(call_args[0][1], CronTrigger) # Проверяем ID и имя задачи assert call_args[1]['id'] == 'auto_unban_users' assert call_args[1]['name'] == 'Автоматический разбан пользователей' assert call_args[1]['replace_existing'] is True