fix quality code

This commit is contained in:
2026-02-01 23:03:23 +03:00
parent 731e68a597
commit f8962225ee
106 changed files with 8456 additions and 5810 deletions

View File

@@ -4,33 +4,34 @@ from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, Mock, patch
import pytest
from helper_bot.utils.auto_unban_scheduler import AutoUnbanScheduler
class TestAutoUnbanIntegration:
"""Интеграционные тесты для автоматического разбана"""
@pytest.fixture
def test_db_path(self):
"""Путь к тестовой базе данных"""
return 'database/test_auto_unban.db'
return "database/test_auto_unban.db"
@pytest.fixture
def setup_test_db(self, test_db_path):
"""Создает тестовую базу данных с таблицами blacklist, our_users и blacklist_history"""
# Удаляем старую тестовую базу если она существует
if os.path.exists(test_db_path):
os.remove(test_db_path)
# Создаем новую базу данных
conn = sqlite3.connect(test_db_path)
cursor = conn.cursor()
# Включаем поддержку внешних ключей
cursor.execute("PRAGMA foreign_keys = ON")
# Создаем таблицу our_users (нужна для внешних ключей)
cursor.execute('''
cursor.execute("""
CREATE TABLE IF NOT EXISTS our_users (
user_id INTEGER NOT NULL PRIMARY KEY,
first_name TEXT,
@@ -44,10 +45,10 @@ class TestAutoUnbanIntegration:
date_changed INTEGER NOT NULL,
voice_bot_welcome_received BOOLEAN DEFAULT 0
)
''')
""")
# Создаем таблицу blacklist
cursor.execute('''
cursor.execute("""
CREATE TABLE IF NOT EXISTS blacklist (
user_id INTEGER NOT NULL PRIMARY KEY,
message_for_user TEXT,
@@ -56,10 +57,10 @@ class TestAutoUnbanIntegration:
ban_author INTEGER,
FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE
)
''')
""")
# Создаем таблицу blacklist_history
cursor.execute('''
cursor.execute("""
CREATE TABLE IF NOT EXISTS blacklist_history (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
@@ -72,273 +73,419 @@ class TestAutoUnbanIntegration:
FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE,
FOREIGN KEY (ban_author) REFERENCES our_users(user_id) ON DELETE SET NULL
)
''')
""")
# Создаем индексы для blacklist_history
cursor.execute('''
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_blacklist_history_user_id ON blacklist_history(user_id)
''')
cursor.execute('''
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_ban ON blacklist_history(date_ban)
''')
cursor.execute('''
""")
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_unban ON blacklist_history(date_unban)
''')
""")
# Добавляем тестовых пользователей в our_users
current_time = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
users_data = [
(123, "Test", "Test User 1", "test_user1", 0, "ru", 0, "😊", current_time, current_time, 0),
(456, "Test", "Test User 2", "test_user2", 0, "ru", 0, "😊", current_time, current_time, 0),
(789, "Test", "Test User 3", "test_user3", 0, "ru", 0, "😊", current_time, current_time, 0),
(999, "Test", "Test User 4", "test_user4", 0, "ru", 0, "😊", current_time, current_time, 0),
(
123,
"Test",
"Test User 1",
"test_user1",
0,
"ru",
0,
"😊",
current_time,
current_time,
0,
),
(
456,
"Test",
"Test User 2",
"test_user2",
0,
"ru",
0,
"😊",
current_time,
current_time,
0,
),
(
789,
"Test",
"Test User 3",
"test_user3",
0,
"ru",
0,
"😊",
current_time,
current_time,
0,
),
(
999,
"Test",
"Test User 4",
"test_user4",
0,
"ru",
0,
"😊",
current_time,
current_time,
0,
),
]
cursor.executemany(
"""INSERT INTO our_users (user_id, first_name, full_name, username, is_bot,
language_code, has_stickers, emoji, date_added, date_changed, voice_bot_welcome_received)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
users_data
users_data,
)
# Добавляем тестовые данные в blacklist
today_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
tomorrow_timestamp = int((datetime.now(timezone(timedelta(hours=3))) + timedelta(days=1)).timestamp())
tomorrow_timestamp = int(
(datetime.now(timezone(timedelta(hours=3))) + timedelta(days=1)).timestamp()
)
blacklist_data = [
(123, "Test ban 1", today_timestamp, current_time, None), # Разблокируется сегодня
(456, "Test ban 2", today_timestamp, current_time, None), # Разблокируется сегодня
(789, "Test ban 3", tomorrow_timestamp, current_time, None), # Разблокируется завтра
(999, "Test ban 4", None, current_time, None), # Навсегда заблокирован
(
123,
"Test ban 1",
today_timestamp,
current_time,
None,
), # Разблокируется сегодня
(
456,
"Test ban 2",
today_timestamp,
current_time,
None,
), # Разблокируется сегодня
(
789,
"Test ban 3",
tomorrow_timestamp,
current_time,
None,
), # Разблокируется завтра
(999, "Test ban 4", None, current_time, None), # Навсегда заблокирован
]
cursor.executemany(
"INSERT INTO blacklist (user_id, message_for_user, date_to_unban, created_at, ban_author) VALUES (?, ?, ?, ?, ?)",
blacklist_data
blacklist_data,
)
# Добавляем тестовые данные в blacklist_history
# Для пользователей 123 и 456 (которые будут разблокированы) создаем записи с date_unban = NULL
yesterday_timestamp = int((datetime.now(timezone(timedelta(hours=3))) - timedelta(days=1)).timestamp())
yesterday_timestamp = int(
(datetime.now(timezone(timedelta(hours=3))) - timedelta(days=1)).timestamp()
)
history_data = [
(123, "Test ban 1", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Будет разблокирован
(456, "Test ban 2", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Будет разблокирован
(789, "Test ban 3", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Не будет разблокирован сегодня
(999, "Test ban 4", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Навсегда заблокирован
(
123,
"Test ban 1",
yesterday_timestamp,
None,
None,
yesterday_timestamp,
yesterday_timestamp,
), # Будет разблокирован
(
456,
"Test ban 2",
yesterday_timestamp,
None,
None,
yesterday_timestamp,
yesterday_timestamp,
), # Будет разблокирован
(
789,
"Test ban 3",
yesterday_timestamp,
None,
None,
yesterday_timestamp,
yesterday_timestamp,
), # Не будет разблокирован сегодня
(
999,
"Test ban 4",
yesterday_timestamp,
None,
None,
yesterday_timestamp,
yesterday_timestamp,
), # Навсегда заблокирован
]
cursor.executemany(
"""INSERT INTO blacklist_history
(user_id, message_for_user, date_ban, date_unban, ban_author, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
history_data
history_data,
)
conn.commit()
conn.close()
yield test_db_path
# Очистка после тестов
if os.path.exists(test_db_path):
os.remove(test_db_path)
@pytest.fixture
def mock_bdf(self, test_db_path):
"""Создает мок фабрики зависимостей с тестовой базой"""
mock_factory = Mock()
mock_factory.settings = {
'Telegram': {
'group_for_logs': '-1001234567890',
'important_logs': '-1001234567891'
"Telegram": {
"group_for_logs": "-1001234567890",
"important_logs": "-1001234567891",
}
}
# Создаем реальный экземпляр базы данных с тестовым файлом
import os
from database.async_db import AsyncBotDB
mock_factory.database = AsyncBotDB(test_db_path)
return mock_factory
@pytest.fixture
def mock_bot(self):
"""Создает мок бота"""
mock_bot = Mock()
mock_bot.send_message = AsyncMock()
return mock_bot
@pytest.mark.asyncio
@patch('helper_bot.utils.auto_unban_scheduler.get_global_instance')
async def test_auto_unban_with_real_db(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot):
@patch("helper_bot.utils.auto_unban_scheduler.get_global_instance")
async def test_auto_unban_with_real_db(
self, mock_get_instance, setup_test_db, mock_bdf, mock_bot
):
"""Тест автоматического разбана с реальной базой данных"""
# Настройка моков
mock_get_instance.return_value = mock_bdf
# Создаем планировщик
scheduler = AutoUnbanScheduler()
scheduler.bot_db = mock_bdf.database
scheduler.set_bot(mock_bot)
# Проверяем начальное состояние базы
conn = sqlite3.connect(setup_test_db)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM blacklist")
initial_count = cursor.fetchone()[0]
assert initial_count == 4
# Проверяем начальное состояние истории: должно быть 2 записи с date_unban IS NULL для user_id 123 и 456
cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE user_id IN (123, 456) AND date_unban IS NULL")
cursor.execute(
"SELECT COUNT(*) FROM blacklist_history WHERE user_id IN (123, 456) AND date_unban IS NULL"
)
initial_open_history = cursor.fetchone()[0]
assert initial_open_history == 2
# Запоминаем время до разбана для проверки updated_at
before_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
before_unban_timestamp = int(
datetime.now(timezone(timedelta(hours=3))).timestamp()
)
# Выполняем автоматический разбан
await scheduler.auto_unban_users()
# Запоминаем время после разбана для проверки updated_at
after_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
after_unban_timestamp = int(
datetime.now(timezone(timedelta(hours=3))).timestamp()
)
# Проверяем, что пользователи с сегодняшней датой разблокированы
current_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
cursor.execute("SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?",
(current_timestamp,))
cursor.execute(
"SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?",
(current_timestamp,),
)
today_count = cursor.fetchone()[0]
assert today_count == 0
# Проверяем, что пользователи с завтрашней датой остались
cursor.execute("SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban > ?",
(current_timestamp,))
cursor.execute(
"SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban > ?",
(current_timestamp,),
)
tomorrow_count = cursor.fetchone()[0]
assert tomorrow_count == 1
# Проверяем, что навсегда заблокированные пользователи остались
cursor.execute("SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NULL")
permanent_count = cursor.fetchone()[0]
assert permanent_count == 1
# Проверяем общее количество записей
cursor.execute("SELECT COUNT(*) FROM blacklist")
final_count = cursor.fetchone()[0]
assert final_count == 2 # Остались только завтрашние и навсегда заблокированные
# Проверяем историю банов: для user_id 123 и 456 должны быть установлены date_unban
cursor.execute("SELECT user_id, date_unban, updated_at FROM blacklist_history WHERE user_id IN (123, 456) ORDER BY user_id")
cursor.execute(
"SELECT user_id, date_unban, updated_at FROM blacklist_history WHERE user_id IN (123, 456) ORDER BY user_id"
)
history_records = cursor.fetchall()
assert len(history_records) == 2
for user_id, date_unban, updated_at in history_records:
# Проверяем, что date_unban установлен (не NULL)
assert date_unban is not None, f"date_unban должен быть установлен для user_id={user_id}"
assert isinstance(date_unban, int), f"date_unban должен быть integer для user_id={user_id}"
assert (
date_unban is not None
), f"date_unban должен быть установлен для user_id={user_id}"
assert isinstance(
date_unban, int
), f"date_unban должен быть integer для user_id={user_id}"
# Проверяем, что date_unban находится в разумных пределах (между before и after)
assert before_unban_timestamp <= date_unban <= after_unban_timestamp, \
f"date_unban для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}, получен {date_unban}"
assert (
before_unban_timestamp <= date_unban <= after_unban_timestamp
), f"date_unban для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}, получен {date_unban}"
# Проверяем, что updated_at обновлен
assert updated_at is not None, f"updated_at должен быть установлен для user_id={user_id}"
assert isinstance(updated_at, int), f"updated_at должен быть integer для user_id={user_id}"
assert before_unban_timestamp <= updated_at <= after_unban_timestamp, \
f"updated_at для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}, получен {updated_at}"
assert (
updated_at is not None
), f"updated_at должен быть установлен для user_id={user_id}"
assert isinstance(
updated_at, int
), f"updated_at должен быть integer для user_id={user_id}"
assert (
before_unban_timestamp <= updated_at <= after_unban_timestamp
), f"updated_at для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}, получен {updated_at}"
# Проверяем, что для user_id 789 и 999 записи в истории остались без изменений (date_unban все еще NULL)
cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE user_id IN (789, 999) AND date_unban IS NULL")
cursor.execute(
"SELECT COUNT(*) FROM blacklist_history WHERE user_id IN (789, 999) AND date_unban IS NULL"
)
unchanged_history = cursor.fetchone()[0]
assert unchanged_history == 2, "Записи для user_id 789 и 999 должны остаться с date_unban = NULL"
assert (
unchanged_history == 2
), "Записи для user_id 789 и 999 должны остаться с date_unban = NULL"
conn.close()
# Проверяем, что отчет был отправлен
mock_bot.send_message.assert_called_once()
@pytest.mark.asyncio
@patch('helper_bot.utils.auto_unban_scheduler.get_global_instance')
async def test_auto_unban_no_users_today(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot):
@patch("helper_bot.utils.auto_unban_scheduler.get_global_instance")
async def test_auto_unban_no_users_today(
self, mock_get_instance, setup_test_db, mock_bdf, mock_bot
):
"""Тест разбана когда нет пользователей для разблокировки сегодня"""
# Настройка моков
mock_get_instance.return_value = mock_bdf
# Удаляем пользователей с сегодняшней датой
conn = sqlite3.connect(setup_test_db)
cursor = conn.cursor()
current_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
cursor.execute("DELETE FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?", (current_timestamp,))
cursor.execute(
"DELETE FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?",
(current_timestamp,),
)
# Проверяем начальное состояние истории: все записи должны иметь date_unban = NULL
cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE date_unban IS NULL")
cursor.execute(
"SELECT COUNT(*) FROM blacklist_history WHERE date_unban IS NULL"
)
initial_open_history = cursor.fetchone()[0]
assert initial_open_history == 4 # Все 4 записи должны быть открытыми
conn.commit()
conn.close()
# Создаем планировщик
scheduler = AutoUnbanScheduler()
scheduler.bot_db = mock_bdf.database
scheduler.set_bot(mock_bot)
# Выполняем автоматический разбан
await scheduler.auto_unban_users()
# Проверяем, что история не изменилась (все записи все еще с date_unban = NULL)
conn = sqlite3.connect(setup_test_db)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE date_unban IS NULL")
cursor.execute(
"SELECT COUNT(*) FROM blacklist_history WHERE date_unban IS NULL"
)
final_open_history = cursor.fetchone()[0]
assert final_open_history == 4, "История не должна изменяться, если нет пользователей для разблокировки"
assert (
final_open_history == 4
), "История не должна изменяться, если нет пользователей для разблокировки"
conn.close()
# Проверяем, что отчет не был отправлен (нет пользователей для разблокировки)
mock_bot.send_message.assert_not_called()
@pytest.mark.asyncio
@patch('helper_bot.utils.auto_unban_scheduler.get_global_instance')
async def test_auto_unban_database_error(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot):
@patch("helper_bot.utils.auto_unban_scheduler.get_global_instance")
async def test_auto_unban_database_error(
self, mock_get_instance, setup_test_db, mock_bdf, mock_bot
):
"""Тест обработки ошибок базы данных"""
# Настройка моков
mock_get_instance.return_value = mock_bdf
# Создаем планировщик
scheduler = AutoUnbanScheduler()
scheduler.bot_db = mock_bdf.database
scheduler.set_bot(mock_bot)
# Удаляем таблицу чтобы вызвать ошибку
conn = sqlite3.connect(setup_test_db)
cursor = conn.cursor()
cursor.execute("DROP TABLE blacklist")
conn.commit()
conn.close()
# Выполняем автоматический разбан
await scheduler.auto_unban_users()
# Проверяем, что отчет об ошибке был отправлен
mock_bot.send_message.assert_called_once()
call_args = mock_bot.send_message.call_args
assert call_args[1]['chat_id'] == '-1001234567891' # important_logs
assert "Ошибка автоматического разбана" in call_args[1]['text']
assert call_args[1]["chat_id"] == "-1001234567891" # important_logs
assert "Ошибка автоматического разбана" in call_args[1]["text"]
@pytest.mark.asyncio
@patch('helper_bot.utils.auto_unban_scheduler.get_global_instance')
async def test_auto_unban_updates_history(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot):
@patch("helper_bot.utils.auto_unban_scheduler.get_global_instance")
async def test_auto_unban_updates_history(
self, mock_get_instance, setup_test_db, mock_bdf, mock_bot
):
"""Тест что автоматический разбан обновляет историю банов"""
# Настройка моков
mock_get_instance.return_value = mock_bdf
# Создаем планировщик
scheduler = AutoUnbanScheduler()
scheduler.bot_db = mock_bdf.database
scheduler.set_bot(mock_bot)
conn = sqlite3.connect(setup_test_db)
cursor = conn.cursor()
# Проверяем начальное состояние: для user_id 123 и 456 должны быть записи с date_unban = NULL
cursor.execute("""
SELECT id, user_id, date_ban, date_unban, updated_at
@@ -347,26 +494,32 @@ class TestAutoUnbanIntegration:
ORDER BY user_id
""")
initial_records = cursor.fetchall()
assert len(initial_records) == 2, "Должно быть 2 открытые записи для user_id 123 и 456"
assert (
len(initial_records) == 2
), "Должно быть 2 открытые записи для user_id 123 и 456"
# Запоминаем ID записей и их начальные значения updated_at
record_ids = {row[0]: (row[1], row[4]) for row in initial_records}
# Запоминаем время до разбана
before_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
before_unban_timestamp = int(
datetime.now(timezone(timedelta(hours=3))).timestamp()
)
conn.close()
# Выполняем автоматический разбан
await scheduler.auto_unban_users()
# Запоминаем время после разбана
after_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
after_unban_timestamp = int(
datetime.now(timezone(timedelta(hours=3))).timestamp()
)
# Проверяем, что записи обновлены
conn = sqlite3.connect(setup_test_db)
cursor = conn.cursor()
cursor.execute("""
SELECT id, user_id, date_ban, date_unban, updated_at
FROM blacklist_history
@@ -374,32 +527,45 @@ class TestAutoUnbanIntegration:
ORDER BY user_id
""")
updated_records = cursor.fetchall()
assert len(updated_records) == 2, "Должно быть 2 записи для user_id 123 и 456"
for record_id, user_id, date_ban, date_unban, updated_at in updated_records:
# Проверяем, что это одна из наших записей
assert record_id in record_ids, f"Запись с id={record_id} должна быть в исходных записях"
assert (
record_id in record_ids
), f"Запись с id={record_id} должна быть в исходных записях"
# Проверяем, что date_unban установлен
assert date_unban is not None, f"date_unban должен быть установлен для user_id={user_id}"
assert isinstance(date_unban, int), f"date_unban должен быть integer для user_id={user_id}"
assert (
date_unban is not None
), f"date_unban должен быть установлен для user_id={user_id}"
assert isinstance(
date_unban, int
), f"date_unban должен быть integer для user_id={user_id}"
# Проверяем, что date_unban находится в разумных пределах
assert before_unban_timestamp <= date_unban <= after_unban_timestamp, \
f"date_unban для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}"
assert (
before_unban_timestamp <= date_unban <= after_unban_timestamp
), f"date_unban для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}"
# Проверяем, что updated_at обновлен (должен быть больше начального значения)
assert updated_at is not None, f"updated_at должен быть установлен для user_id={user_id}"
assert isinstance(updated_at, int), f"updated_at должен быть integer для user_id={user_id}"
assert before_unban_timestamp <= updated_at <= after_unban_timestamp, \
f"updated_at для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}"
assert (
updated_at is not None
), f"updated_at должен быть установлен для user_id={user_id}"
assert isinstance(
updated_at, int
), f"updated_at должен быть integer для user_id={user_id}"
assert (
before_unban_timestamp <= updated_at <= after_unban_timestamp
), f"updated_at для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}"
# Проверяем, что updated_at действительно обновлен (больше начального значения)
initial_updated_at = record_ids[record_id][1]
assert updated_at >= initial_updated_at, \
f"updated_at для user_id={user_id} должен быть больше или равен начальному значению"
assert (
updated_at >= initial_updated_at
), f"updated_at для user_id={user_id} должен быть больше или равен начальному значению"
# Проверяем, что обновлена только последняя запись для каждого пользователя
# (если бы было несколько записей, обновилась бы только последняя)
cursor.execute("""
@@ -407,29 +573,35 @@ class TestAutoUnbanIntegration:
WHERE user_id IN (123, 456) AND date_unban IS NOT NULL
""")
closed_records = cursor.fetchone()[0]
assert closed_records == 2, "Должно быть закрыто 2 записи (по одной для каждого пользователя)"
assert (
closed_records == 2
), "Должно быть закрыто 2 записи (по одной для каждого пользователя)"
cursor.execute("""
SELECT COUNT(*) FROM blacklist_history
WHERE user_id IN (123, 456) AND date_unban IS NULL
""")
open_records = cursor.fetchone()[0]
assert open_records == 0, "Не должно быть открытых записей для user_id 123 и 456"
assert (
open_records == 0
), "Не должно быть открытых записей для user_id 123 и 456"
conn.close()
def test_date_format_consistency(self, setup_test_db, mock_bdf):
"""Тест консистентности формата дат"""
scheduler = AutoUnbanScheduler()
scheduler.bot_db = mock_bdf.database
# Проверяем, что дата в базе соответствует ожидаемому формату (timestamp)
conn = sqlite3.connect(setup_test_db)
cursor = conn.cursor()
cursor.execute("SELECT date_to_unban FROM blacklist WHERE date_to_unban IS NOT NULL LIMIT 1")
cursor.execute(
"SELECT date_to_unban FROM blacklist WHERE date_to_unban IS NOT NULL LIMIT 1"
)
result = cursor.fetchone()
conn.close()
if result and result[0]:
timestamp = result[0]
# Проверяем, что это валидный timestamp (целое число)
@@ -442,38 +614,39 @@ class TestAutoUnbanIntegration:
class TestSchedulerLifecycle:
"""Тесты жизненного цикла планировщика"""
def test_scheduler_start_stop(self):
"""Тест запуска и остановки планировщика"""
scheduler = AutoUnbanScheduler()
# Запускаем планировщик
scheduler.start_scheduler()
assert scheduler.scheduler.running
# Останавливаем планировщик (должно пройти без ошибок)
scheduler.stop_scheduler()
# APScheduler может не сразу остановиться, но это нормально
def test_scheduler_job_creation(self):
"""Тест создания задачи в планировщике"""
scheduler = AutoUnbanScheduler()
with patch.object(scheduler.scheduler, 'add_job') as mock_add_job:
with patch.object(scheduler.scheduler, "add_job") as mock_add_job:
scheduler.start_scheduler()
# Проверяем, что задача была создана с правильными параметрами
mock_add_job.assert_called_once()
call_args = mock_add_job.call_args
# Проверяем функцию
assert call_args[0][0] == scheduler.auto_unban_users
# Проверяем триггер (должен быть CronTrigger)
from apscheduler.triggers.cron import CronTrigger
assert isinstance(call_args[0][1], CronTrigger)
# Проверяем ID и имя задачи
assert call_args[1]['id'] == 'auto_unban_users'
assert call_args[1]['name'] == 'Автоматический разбан пользователей'
assert call_args[1]['replace_existing'] is True
assert call_args[1]["id"] == "auto_unban_users"
assert call_args[1]["name"] == "Автоматический разбан пользователей"
assert call_args[1]["replace_existing"] is True