Patch Notes dev-10 #12

Merged
KerradKerridi merged 9 commits from dev-10 into master 2026-01-23 16:54:47 +00:00
31 changed files with 2587 additions and 130 deletions

View File

@@ -3,7 +3,7 @@ from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from database.repository_factory import RepositoryFactory
from database.models import (
User, BlacklistUser, UserMessage, TelegramPost, PostContent,
User, BlacklistUser, BlacklistHistoryRecord, UserMessage, TelegramPost, PostContent,
Admin, AudioMessage
)
@@ -143,10 +143,18 @@ class AsyncBotDB:
"""Получает контент поста по helper_text_message_id."""
return await self.factory.posts.get_post_content_by_helper_id(last_post_id)
async def get_post_content_by_helper_id(self, helper_message_id: int) -> List[Tuple[str, str]]:
"""Алиас для get_post_content_from_telegram_by_last_id (используется callback-сервисом)."""
return await self.get_post_content_from_telegram_by_last_id(helper_message_id)
async def get_post_text_from_telegram_by_last_id(self, last_post_id: int) -> Optional[str]:
"""Получает текст поста по helper_text_message_id."""
return await self.factory.posts.get_post_text_by_helper_id(last_post_id)
async def get_post_text_by_helper_id(self, helper_message_id: int) -> Optional[str]:
"""Алиас для get_post_text_from_telegram_by_last_id (используется callback-сервисом)."""
return await self.get_post_text_from_telegram_by_last_id(helper_message_id)
async def get_post_ids_from_telegram_by_last_id(self, last_post_id: int) -> List[int]:
"""Получает ID сообщений по helper_text_message_id."""
return await self.factory.posts.get_post_ids_by_helper_id(last_post_id)
@@ -159,19 +167,80 @@ class AsyncBotDB:
"""Получает ID автора по helper_text_message_id."""
return await self.factory.posts.get_author_id_by_helper_message_id(helper_text_message_id)
async def get_post_text_and_anonymity_by_message_id(self, message_id: int) -> tuple[Optional[str], Optional[bool]]:
"""Получает текст и is_anonymous поста по message_id."""
return await self.factory.posts.get_post_text_and_anonymity_by_message_id(message_id)
async def get_post_text_and_anonymity_by_helper_id(self, helper_message_id: int) -> tuple[Optional[str], Optional[bool]]:
"""Получает текст и is_anonymous поста по helper_text_message_id."""
return await self.factory.posts.get_post_text_and_anonymity_by_helper_id(helper_message_id)
async def update_status_by_message_id(self, message_id: int, status: str) -> int:
"""Обновление статуса поста по message_id (одиночные посты). Возвращает число обновлённых строк."""
return await self.factory.posts.update_status_by_message_id(message_id, status)
async def update_status_for_media_group_by_helper_id(
self, helper_message_id: int, status: str
) -> int:
"""Обновление статуса постов медиагруппы по helper_message_id. Возвращает число обновлённых строк."""
return await self.factory.posts.update_status_for_media_group_by_helper_id(
helper_message_id, status
)
# Методы для работы с черным списком
async def set_user_blacklist(self, user_id: int, user_name: str = None,
message_for_user: str = None, date_to_unban: int = None):
"""Добавляет пользователя в черный список."""
async def set_user_blacklist(
self,
user_id: int,
user_name: str = None,
message_for_user: str = None,
date_to_unban: int = None,
ban_author: Optional[int] = None,
):
"""
Добавляет пользователя в черный список.
Также создает запись в истории банов для отслеживания.
"""
blacklist_user = BlacklistUser(
user_id=user_id,
message_for_user=message_for_user,
date_to_unban=date_to_unban
date_to_unban=date_to_unban,
ban_author=ban_author,
)
await self.factory.blacklist.add_user(blacklist_user)
# Логируем в историю банов
try:
date_ban = int(datetime.now().timestamp())
history_record = BlacklistHistoryRecord(
user_id=user_id,
message_for_user=message_for_user,
date_ban=date_ban,
date_unban=None, # Будет установлено при разбане
ban_author=ban_author,
)
await self.factory.blacklist_history.add_record_on_ban(history_record)
except Exception as e:
# Ошибка записи в историю не должна ломать процесс бана
self.logger.error(
f"Ошибка записи в историю банов для user_id={user_id}: {e}"
)
async def delete_user_blacklist(self, user_id: int) -> bool:
"""Удаляет пользователя из черного списка."""
"""
Удаляет пользователя из черного списка.
Также обновляет запись в истории банов, устанавливая date_unban.
"""
# Сначала обновляем историю (если есть открытая запись)
try:
date_unban = int(datetime.now().timestamp())
await self.factory.blacklist_history.set_unban_date(user_id, date_unban)
except Exception as e:
# Ошибка записи в историю не должна ломать критический путь разбана
self.logger.error(
f"Ошибка обновления истории при разбане для user_id={user_id}: {e}"
)
# Удаляем из черного списка (критический путь)
return await self.factory.blacklist.remove_user(user_id)
async def check_user_in_blacklist(self, user_id: int) -> bool:

View File

@@ -26,6 +26,20 @@ class BlacklistUser:
message_for_user: Optional[str] = None
date_to_unban: Optional[int] = None
created_at: Optional[int] = None
ban_author: Optional[int] = None
@dataclass
class BlacklistHistoryRecord:
"""Модель записи истории банов/разбанов."""
user_id: int
message_for_user: Optional[str] = None
date_ban: int = 0
date_unban: Optional[int] = None
ban_author: Optional[int] = None
id: Optional[int] = None
created_at: Optional[int] = None
updated_at: Optional[int] = None
@dataclass
@@ -45,6 +59,8 @@ class TelegramPost:
author_id: int
helper_text_message_id: Optional[int] = None
created_at: Optional[int] = None
status: str = "suggest"
is_anonymous: Optional[bool] = None
@dataclass

View File

@@ -4,6 +4,7 @@
Содержит репозитории для разных сущностей:
- user_repository: работа с пользователями
- blacklist_repository: работа с черным списком
- blacklist_history_repository: работа с историей банов/разбанов
- message_repository: работа с сообщениями
- post_repository: работа с постами
- admin_repository: работа с администраторами
@@ -12,12 +13,13 @@
from .user_repository import UserRepository
from .blacklist_repository import BlacklistRepository
from .blacklist_history_repository import BlacklistHistoryRepository
from .message_repository import MessageRepository
from .post_repository import PostRepository
from .admin_repository import AdminRepository
from .audio_repository import AudioRepository
__all__ = [
'UserRepository', 'BlacklistRepository', 'MessageRepository', 'PostRepository',
'AdminRepository', 'AudioRepository'
'UserRepository', 'BlacklistRepository', 'BlacklistHistoryRepository',
'MessageRepository', 'PostRepository', 'AdminRepository', 'AudioRepository'
]

View File

@@ -0,0 +1,119 @@
from typing import Optional
from database.base import DatabaseConnection
from database.models import BlacklistHistoryRecord
class BlacklistHistoryRepository(DatabaseConnection):
"""Репозиторий для работы с историей банов/разбанов."""
async def create_tables(self):
"""Создание таблицы истории банов/разбанов."""
query = '''
CREATE TABLE IF NOT EXISTS blacklist_history (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
message_for_user TEXT,
date_ban INTEGER NOT NULL,
date_unban INTEGER,
ban_author INTEGER,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
updated_at INTEGER DEFAULT (strftime('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE,
FOREIGN KEY (ban_author) REFERENCES our_users(user_id) ON DELETE SET NULL
)
'''
await self._execute_query(query)
# Создаем индексы
await self._execute_query(
"CREATE INDEX IF NOT EXISTS idx_blacklist_history_user_id ON blacklist_history(user_id)"
)
await self._execute_query(
"CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_ban ON blacklist_history(date_ban)"
)
await self._execute_query(
"CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_unban ON blacklist_history(date_unban)"
)
self.logger.info("Таблица истории банов/разбанов создана")
async def add_record_on_ban(self, record: BlacklistHistoryRecord) -> None:
"""Добавляет запись о бане в историю."""
query = """
INSERT INTO blacklist_history (
user_id, message_for_user, date_ban, date_unban, ban_author, created_at, updated_at
)
VALUES (?, ?, ?, ?, ?, ?, ?)
"""
# Используем текущее время, если не указано
from datetime import datetime
current_timestamp = int(datetime.now().timestamp())
params = (
record.user_id,
record.message_for_user,
record.date_ban,
record.date_unban,
record.ban_author,
record.created_at if record.created_at is not None else current_timestamp,
record.updated_at if record.updated_at is not None else current_timestamp,
)
await self._execute_query(query, params)
self.logger.info(
f"Запись о бане добавлена в историю: user_id={record.user_id}, "
f"date_ban={record.date_ban}"
)
async def set_unban_date(self, user_id: int, date_unban: int) -> bool:
"""
Обновляет date_unban и updated_at в последней записи (date_unban IS NULL) для пользователя.
Args:
user_id: ID пользователя
date_unban: Timestamp даты разбана
Returns:
True если запись обновлена, False если не найдена открытая запись
"""
try:
from datetime import datetime
current_timestamp = int(datetime.now().timestamp())
# SQLite не поддерживает ORDER BY в UPDATE, поэтому используем подзапрос
# Сначала проверяем, есть ли открытая запись
check_query = """
SELECT id FROM blacklist_history
WHERE user_id = ? AND date_unban IS NULL
ORDER BY id DESC
LIMIT 1
"""
rows = await self._execute_query_with_result(check_query, (user_id,))
if not rows:
self.logger.warning(
f"Не найдена открытая запись в истории для обновления: user_id={user_id}"
)
return False
# Обновляем найденную запись
update_query = """
UPDATE blacklist_history
SET date_unban = ?,
updated_at = ?
WHERE id = ?
"""
record_id = rows[0][0]
params = (date_unban, current_timestamp, record_id)
await self._execute_query(update_query, params)
self.logger.info(
f"Дата разбана обновлена в истории: user_id={user_id}, date_unban={date_unban}"
)
return True
except Exception as e:
self.logger.error(
f"Ошибка обновления даты разбана в истории для user_id={user_id}: {str(e)}"
)
return False

View File

@@ -14,7 +14,9 @@ class BlacklistRepository(DatabaseConnection):
message_for_user TEXT,
date_to_unban INTEGER,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES our_users (user_id) ON DELETE CASCADE
ban_author INTEGER,
FOREIGN KEY (user_id) REFERENCES our_users (user_id) ON DELETE CASCADE,
FOREIGN KEY (ban_author) REFERENCES our_users (user_id) ON DELETE SET NULL
)
'''
await self._execute_query(query)
@@ -23,10 +25,15 @@ class BlacklistRepository(DatabaseConnection):
async def add_user(self, blacklist_user: BlacklistUser) -> None:
"""Добавляет пользователя в черный список."""
query = """
INSERT INTO blacklist (user_id, message_for_user, date_to_unban)
VALUES (?, ?, ?)
INSERT INTO blacklist (user_id, message_for_user, date_to_unban, ban_author)
VALUES (?, ?, ?, ?)
"""
params = (blacklist_user.user_id, blacklist_user.message_for_user, blacklist_user.date_to_unban)
params = (
blacklist_user.user_id,
blacklist_user.message_for_user,
blacklist_user.date_to_unban,
blacklist_user.ban_author,
)
await self._execute_query(query, params)
self.logger.info(f"Пользователь добавлен в черный список: user_id={blacklist_user.user_id}")
@@ -52,7 +59,11 @@ class BlacklistRepository(DatabaseConnection):
async def get_user(self, user_id: int) -> Optional[BlacklistUser]:
"""Возвращает информацию о пользователе в черном списке по user_id."""
query = "SELECT user_id, message_for_user, date_to_unban, created_at FROM blacklist WHERE user_id = ?"
query = """
SELECT user_id, message_for_user, date_to_unban, created_at, ban_author
FROM blacklist
WHERE user_id = ?
"""
rows = await self._execute_query_with_result(query, (user_id,))
row = rows[0] if rows else None
@@ -61,40 +72,54 @@ class BlacklistRepository(DatabaseConnection):
user_id=row[0],
message_for_user=row[1],
date_to_unban=row[2],
created_at=row[3]
created_at=row[3],
ban_author=row[4] if len(row) > 4 else None,
)
return None
async def get_all_users(self, offset: int = 0, limit: int = 10) -> List[BlacklistUser]:
"""Возвращает список пользователей в черном списке."""
query = "SELECT user_id, message_for_user, date_to_unban, created_at FROM blacklist LIMIT ?, ?"
query = """
SELECT user_id, message_for_user, date_to_unban, created_at, ban_author
FROM blacklist
LIMIT ?, ?
"""
rows = await self._execute_query_with_result(query, (offset, limit))
users = []
for row in rows:
users.append(BlacklistUser(
user_id=row[0],
message_for_user=row[1],
date_to_unban=row[2],
created_at=row[3]
))
users.append(
BlacklistUser(
user_id=row[0],
message_for_user=row[1],
date_to_unban=row[2],
created_at=row[3],
ban_author=row[4] if len(row) > 4 else None,
)
)
self.logger.info(f"Получен список пользователей в черном списке (offset={offset}, limit={limit}): {len(users)}")
return users
async def get_all_users_no_limit(self) -> List[BlacklistUser]:
"""Возвращает список всех пользователей в черном списке без лимитов."""
query = "SELECT user_id, message_for_user, date_to_unban, created_at FROM blacklist"
query = """
SELECT user_id, message_for_user, date_to_unban, created_at, ban_author
FROM blacklist
"""
rows = await self._execute_query_with_result(query)
users = []
for row in rows:
users.append(BlacklistUser(
user_id=row[0],
message_for_user=row[1],
date_to_unban=row[2],
created_at=row[3]
))
users.append(
BlacklistUser(
user_id=row[0],
message_for_user=row[1],
date_to_unban=row[2],
created_at=row[3],
ban_author=row[4] if len(row) > 4 else None,
)
)
self.logger.info(f"Получен список всех пользователей в черном списке: {len(users)}")
return users

View File

@@ -17,6 +17,8 @@ class PostRepository(DatabaseConnection):
helper_text_message_id INTEGER,
author_id INTEGER,
created_at INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'suggest',
is_anonymous INTEGER,
FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE
)
'''
@@ -51,12 +53,15 @@ class PostRepository(DatabaseConnection):
"""Добавление поста."""
if not post.created_at:
post.created_at = int(datetime.now().timestamp())
status = post.status if post.status else "suggest"
# Преобразуем bool в int для SQLite (True -> 1, False -> 0, None -> None)
is_anonymous_int = None if post.is_anonymous is None else (1 if post.is_anonymous else 0)
query = """
INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at)
VALUES (?, ?, ?, ?)
INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at, status, is_anonymous)
VALUES (?, ?, ?, ?, ?, ?)
"""
params = (post.message_id, post.text, post.author_id, post.created_at)
params = (post.message_id, post.text, post.author_id, post.created_at, status, is_anonymous_int)
await self._execute_query(query, params)
self.logger.info(f"Пост добавлен: message_id={post.message_id}")
@@ -66,6 +71,75 @@ class PostRepository(DatabaseConnection):
query = "UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?"
await self._execute_query(query, (helper_message_id, message_id))
async def update_status_by_message_id(self, message_id: int, status: str) -> int:
"""Обновление статуса поста по message_id (одиночные посты). Возвращает число обновлённых строк."""
conn = None
try:
conn = await self._get_connection()
await conn.execute(
"UPDATE post_from_telegram_suggest SET status = ? WHERE message_id = ?",
(status, message_id),
)
cur = await conn.execute("SELECT changes()")
row = await cur.fetchone()
n = row[0] if row else 0
await conn.commit()
if n == 0:
self.logger.warning(
f"update_status_by_message_id: 0 строк обновлено для message_id={message_id}, status={status}"
)
else:
self.logger.info(f"Статус поста message_id={message_id} обновлён на {status}")
return n
except Exception as e:
if conn:
await conn.rollback()
self.logger.error(f"Ошибка при обновлении статуса message_id={message_id}: {e}")
raise
finally:
if conn:
await conn.close()
async def update_status_for_media_group_by_helper_id(
self, helper_message_id: int, status: str
) -> int:
"""Обновление статуса постов медиагруппы по helper_text_message_id. Возвращает число обновлённых строк."""
conn = None
try:
conn = await self._get_connection()
await conn.execute(
"""
UPDATE post_from_telegram_suggest
SET status = ?
WHERE message_id = ? OR helper_text_message_id = ?
""",
(status, helper_message_id, helper_message_id),
)
cur = await conn.execute("SELECT changes()")
row = await cur.fetchone()
n = row[0] if row else 0
await conn.commit()
if n == 0:
self.logger.warning(
f"update_status_for_media_group_by_helper_id: 0 строк обновлено "
f"для helper_message_id={helper_message_id}, status={status}"
)
else:
self.logger.info(
f"Статус медиагруппы helper_message_id={helper_message_id} обновлён на {status}"
)
return n
except Exception as e:
if conn:
await conn.rollback()
self.logger.error(
f"Ошибка при обновлении статуса медиагруппы helper_message_id={helper_message_id}: {e}"
)
raise
finally:
if conn:
await conn.close()
async def add_post_content(self, post_id: int, message_id: int, content_name: str, content_type: str) -> bool:
"""Добавление контента поста."""
try:
@@ -148,3 +222,33 @@ class PostRepository(DatabaseConnection):
self.logger.info(f"Получен author_id: {author_id} для helper_message_id={helper_message_id}")
return author_id
return None
async def get_post_text_and_anonymity_by_message_id(self, message_id: int) -> Tuple[Optional[str], Optional[bool]]:
"""Получает текст и is_anonymous поста по message_id."""
query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE message_id = ?"
rows = await self._execute_query_with_result(query, (message_id,))
row = rows[0] if rows else None
if row:
text = row[0] or ""
is_anonymous_int = row[1]
# Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None)
is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int)
self.logger.info(f"Получены текст и is_anonymous для message_id={message_id}")
return text, is_anonymous
return None, None
async def get_post_text_and_anonymity_by_helper_id(self, helper_message_id: int) -> Tuple[Optional[str], Optional[bool]]:
"""Получает текст и is_anonymous поста по helper_text_message_id."""
query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE helper_text_message_id = ?"
rows = await self._execute_query_with_result(query, (helper_message_id,))
row = rows[0] if rows else None
if row:
text = row[0] or ""
is_anonymous_int = row[1]
# Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None)
is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int)
self.logger.info(f"Получены текст и is_anonymous для helper_message_id={helper_message_id}")
return text, is_anonymous
return None, None

View File

@@ -1,6 +1,7 @@
from typing import Optional
from database.repositories.user_repository import UserRepository
from database.repositories.blacklist_repository import BlacklistRepository
from database.repositories.blacklist_history_repository import BlacklistHistoryRepository
from database.repositories.message_repository import MessageRepository
from database.repositories.post_repository import PostRepository
from database.repositories.admin_repository import AdminRepository
@@ -14,6 +15,7 @@ class RepositoryFactory:
self.db_path = db_path
self._user_repo: Optional[UserRepository] = None
self._blacklist_repo: Optional[BlacklistRepository] = None
self._blacklist_history_repo: Optional[BlacklistHistoryRepository] = None
self._message_repo: Optional[MessageRepository] = None
self._post_repo: Optional[PostRepository] = None
self._admin_repo: Optional[AdminRepository] = None
@@ -33,6 +35,13 @@ class RepositoryFactory:
self._blacklist_repo = BlacklistRepository(self.db_path)
return self._blacklist_repo
@property
def blacklist_history(self) -> BlacklistHistoryRepository:
"""Возвращает репозиторий истории банов/разбанов."""
if self._blacklist_history_repo is None:
self._blacklist_history_repo = BlacklistHistoryRepository(self.db_path)
return self._blacklist_history_repo
@property
def messages(self) -> MessageRepository:
"""Возвращает репозиторий сообщений."""
@@ -65,6 +74,7 @@ class RepositoryFactory:
"""Создает все таблицы в базе данных."""
await self.users.create_tables()
await self.blacklist.create_tables()
await self.blacklist_history.create_tables()
await self.messages.create_tables()
await self.posts.create_tables()
await self.admins.create_tables()

View File

@@ -40,6 +40,20 @@ CREATE TABLE IF NOT EXISTS blacklist (
FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE
);
-- Blacklist history for tracking all ban/unban events
CREATE TABLE IF NOT EXISTS blacklist_history (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
message_for_user TEXT,
date_ban INTEGER NOT NULL,
date_unban INTEGER,
ban_author INTEGER,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
updated_at INTEGER DEFAULT (strftime('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE,
FOREIGN KEY (ban_author) REFERENCES our_users(user_id) ON DELETE SET NULL
);
-- User message history
CREATE TABLE IF NOT EXISTS user_messages (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
@@ -57,6 +71,8 @@ CREATE TABLE IF NOT EXISTS post_from_telegram_suggest (
helper_text_message_id INTEGER,
author_id INTEGER,
created_at INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'suggest',
is_anonymous INTEGER,
FOREIGN KEY (author_id) REFERENCES our_users(user_id) ON DELETE CASCADE
);
@@ -107,6 +123,9 @@ CREATE INDEX IF NOT EXISTS idx_audio_message_reference_author_id ON audio_messag
CREATE INDEX IF NOT EXISTS idx_user_messages_user_id ON user_messages(user_id);
CREATE INDEX IF NOT EXISTS idx_post_from_telegram_suggest_author_id ON post_from_telegram_suggest(author_id);
CREATE INDEX IF NOT EXISTS idx_blacklist_date_to_unban ON blacklist(date_to_unban);
CREATE INDEX IF NOT EXISTS idx_blacklist_history_user_id ON blacklist_history(user_id);
CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_ban ON blacklist_history(date_ban);
CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_unban ON blacklist_history(date_unban);
CREATE INDEX IF NOT EXISTS idx_user_messages_date ON user_messages(date);
CREATE INDEX IF NOT EXISTS idx_audio_message_reference_date ON audio_message_reference(date_added);
CREATE INDEX IF NOT EXISTS idx_post_from_telegram_suggest_date ON post_from_telegram_suggest(created_at);

View File

@@ -359,7 +359,8 @@ async def confirm_ban(
user_id=user_data['target_user_id'],
username=user_data['target_username'],
reason=user_data['ban_reason'],
ban_days=user_data['ban_days']
ban_days=user_data['ban_days'],
ban_author_id=message.from_user.id,
)
safe_username = escape_html(user_data['target_username'])

View File

@@ -117,7 +117,7 @@ class AdminService:
@track_time("ban_user", "admin_service")
@track_errors("admin_service", "ban_user")
async def ban_user(self, user_id: int, username: str, reason: str, ban_days: Optional[int]) -> None:
async def ban_user(self, user_id: int, username: str, reason: str, ban_days: Optional[int], ban_author_id: int) -> None:
"""Заблокировать пользователя"""
try:
# Проверяем, не заблокирован ли уже пользователь
@@ -130,7 +130,7 @@ class AdminService:
date_to_unban = add_days_to_date(ban_days)
# Сохраняем в БД (username больше не передается, так как не используется в новой схеме)
await self.bot_db.set_user_blacklist(user_id, None, reason, date_to_unban)
await self.bot_db.set_user_blacklist(user_id, None, reason, date_to_unban, ban_author=ban_author_id)
logger.info(f"Пользователь {user_id} ({username}) заблокирован. Причина: {reason}, срок: {ban_days} дней")

View File

@@ -13,6 +13,7 @@ from helper_bot.handlers.voice.services import AudioFileService
from helper_bot.keyboards.keyboards import create_keyboard_with_pagination, get_reply_keyboard_admin, \
create_keyboard_for_ban_reason
from helper_bot.utils.helper_func import get_banned_users_list, get_banned_users_buttons
from helper_bot.handlers.admin.utils import format_user_info
from helper_bot.utils.base_dependency_factory import get_global_instance
from .dependency_factory import get_post_publish_service, get_ban_service
from .exceptions import UserBlockedBotError, PostNotFoundError, UserNotFoundError, PublishError, BanError
@@ -125,7 +126,7 @@ async def ban_user_from_post(call: CallbackQuery, **kwargs):
@callback_router.callback_query(F.data.contains(CALLBACK_BAN))
@track_time("process_ban_user", "callback_handlers")
@track_errors("callback_handlers", "process_ban_user")
async def process_ban_user(call: CallbackQuery, state: FSMContext, **kwargs):
async def process_ban_user(call: CallbackQuery, state: FSMContext, bot_db: MagicData("bot_db"), **kwargs):
ban_service = get_ban_service()
# TODO: переделать на MagicData
user_id = call.data[4:]
@@ -140,17 +141,33 @@ async def process_ban_user(call: CallbackQuery, state: FSMContext, **kwargs):
return
try:
user_name = await ban_service.ban_user(str(user_id_int), "")
await state.update_data(user_id=user_id_int, user_name=user_name, message_for_user=None, date_to_unban=None)
# Получаем username пользователя
username = await ban_service.ban_user(str(user_id_int), "")
if not username:
raise UserNotFoundError(f"Пользователь с ID {user_id_int} не найден в базе")
# Получаем full_name пользователя из базы данных
full_name = await bot_db.get_full_name_by_id(user_id_int)
if not full_name:
full_name = 'Неизвестно'
# Сохраняем данные в формате, совместимом с admin_handlers
await state.update_data(
target_user_id=user_id_int,
target_username=username,
target_full_name=full_name
)
# Используем единый формат отображения информации о пользователе
user_info = format_user_info(user_id_int, username, full_name)
markup = create_keyboard_for_ban_reason()
user_name_escaped = html.escape(str(user_name))
full_name_escaped = html.escape(str(call.message.from_user.full_name))
await call.message.answer(
text=f"<b>Выбран пользователь:\nid:</b> {user_id_int}\n<b>username:</b> {user_name_escaped}\nИмя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
text=f"{user_info}\n\nВыбери причину бана из списка или напиши ее в чат",
reply_markup=markup
)
await state.set_state('BAN_2')
await state.set_state('AWAIT_BAN_DETAILS')
logger.info(f"process_ban_user: Состояние изменено на AWAIT_BAN_DETAILS для пользователя {user_id_int}")
except UserNotFoundError:
markup = get_reply_keyboard_admin()
await call.message.answer(text='Пользователь с таким ID не найден в базе', reply_markup=markup)

View File

@@ -8,7 +8,7 @@ from aiogram.types import CallbackQuery
from helper_bot.utils.helper_func import (
send_text_message, send_photo_message, send_video_message,
send_video_note_message, send_audio_message, send_voice_message,
send_media_group_to_channel, delete_user_blacklist
send_media_group_to_channel, delete_user_blacklist, get_text_message
)
from helper_bot.keyboards.keyboards import create_keyboard_for_ban_reason
from .exceptions import (
@@ -78,10 +78,27 @@ class PostPublishService:
@track_errors("post_publish_service", "_publish_text_post")
async def _publish_text_post(self, call: CallbackQuery) -> None:
"""Публикация текстового поста"""
text_post = html.escape(str(call.message.text))
author_id = await self._get_author_id(call.message.message_id)
await send_text_message(self.main_public, call.message, text_post)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
# Получаем сырой текст и is_anonymous из базы
raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id)
if raw_text is None:
raw_text = ""
# Получаем данные автора
user = await self.db.get_user_by_id(author_id)
if not user:
raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
# Формируем финальный текст с учетом is_anonymous
formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
await send_text_message(self.main_public, call.message, formatted_text)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Текст сообщения опубликован в канале {self.main_public}.')
@@ -89,10 +106,27 @@ class PostPublishService:
@track_errors("post_publish_service", "_publish_photo_post")
async def _publish_photo_post(self, call: CallbackQuery) -> None:
"""Публикация поста с фото"""
text_post_with_photo = html.escape(str(call.message.caption))
author_id = await self._get_author_id(call.message.message_id)
await send_photo_message(self.main_public, call.message, call.message.photo[-1].file_id, text_post_with_photo)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
# Получаем сырой текст и is_anonymous из базы
raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id)
if raw_text is None:
raw_text = ""
# Получаем данные автора
user = await self.db.get_user_by_id(author_id)
if not user:
raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
# Формируем финальный текст с учетом is_anonymous
formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
await send_photo_message(self.main_public, call.message, call.message.photo[-1].file_id, formatted_text)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с фото опубликован в канале {self.main_public}.')
@@ -100,10 +134,27 @@ class PostPublishService:
@track_errors("post_publish_service", "_publish_video_post")
async def _publish_video_post(self, call: CallbackQuery) -> None:
"""Публикация поста с видео"""
text_post_with_photo = html.escape(str(call.message.caption))
author_id = await self._get_author_id(call.message.message_id)
await send_video_message(self.main_public, call.message, call.message.video.file_id, text_post_with_photo)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
# Получаем сырой текст и is_anonymous из базы
raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id)
if raw_text is None:
raw_text = ""
# Получаем данные автора
user = await self.db.get_user_by_id(author_id)
if not user:
raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
# Формируем финальный текст с учетом is_anonymous
formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
await send_video_message(self.main_public, call.message, call.message.video.file_id, formatted_text)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с видео опубликован в канале {self.main_public}.')
@@ -113,6 +164,11 @@ class PostPublishService:
"""Публикация поста с кружком"""
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_video_note_message(self.main_public, call.message, call.message.video_note.file_id)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с кружком опубликован в канале {self.main_public}.')
@@ -121,10 +177,27 @@ class PostPublishService:
@track_errors("post_publish_service", "_publish_audio_post")
async def _publish_audio_post(self, call: CallbackQuery) -> None:
"""Публикация поста с аудио"""
text_post_with_photo = html.escape(str(call.message.caption))
author_id = await self._get_author_id(call.message.message_id)
await send_audio_message(self.main_public, call.message, call.message.audio.file_id, text_post_with_photo)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
# Получаем сырой текст и is_anonymous из базы
raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id)
if raw_text is None:
raw_text = ""
# Получаем данные автора
user = await self.db.get_user_by_id(author_id)
if not user:
raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
# Формируем финальный текст с учетом is_anonymous
formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
await send_audio_message(self.main_public, call.message, call.message.audio.file_id, formatted_text)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с аудио опубликован в канале {self.main_public}.')
@@ -134,6 +207,11 @@ class PostPublishService:
"""Публикация поста с войсом"""
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_voice_message(self.main_public, call.message, call.message.voice.file_id)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с войсом опубликован в канале {self.main_public}.')
@@ -155,11 +233,12 @@ class PostPublishService:
logger.error(f"Контент медиагруппы не найден в базе данных для helper_message_id: {helper_message_id}")
raise PublishError("Контент медиагруппы не найден в базе данных")
# Получаем текст поста по helper_message_id
logger.debug(f"Получаю текст поста для helper_message_id: {helper_message_id}")
pre_text = await self.db.get_post_text_by_helper_id(helper_message_id)
post_text = html.escape(str(pre_text)) if pre_text else ""
logger.debug(f"Текст поста получен: {'пустой' if not post_text else f'длина: {len(post_text)} символов'}")
# Получаем сырой текст и is_anonymous по helper_message_id
logger.debug(f"Получаю текст и is_anonymous поста для helper_message_id: {helper_message_id}")
raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_helper_id(helper_message_id)
if raw_text is None:
raw_text = ""
logger.debug(f"Текст поста получен: {'пустой' if not raw_text else f'длина: {len(raw_text)} символов'}, is_anonymous={is_anonymous}")
# Получаем ID автора по helper_message_id
logger.debug(f"Получаю ID автора для helper_message_id: {helper_message_id}")
@@ -169,15 +248,25 @@ class PostPublishService:
raise PostNotFoundError(f"Автор не найден для медиагруппы {helper_message_id}")
logger.debug(f"ID автора получен: {author_id}")
# Получаем данные автора
user = await self.db.get_user_by_id(author_id)
if not user:
raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
# Формируем финальный текст с учетом is_anonymous
formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
logger.debug(f"Сформирован финальный текст: {'пустой' if not formatted_text else f'длина: {len(formatted_text)} символов'}")
# Отправляем медиагруппу в канал
logger.info(f"Отправляю медиагруппу в канал {self.main_public}")
await send_media_group_to_channel(
bot=self._get_bot(call.message),
chat_id=self.main_public,
post_content=post_content,
post_text=post_text
post_text=formatted_text
)
await self.db.update_status_for_media_group_by_helper_id(helper_message_id, "approved")
logger.debug(f"Удаляю медиагруппу и уведомляю автора {author_id}")
await self._delete_media_group_and_notify_author(call, author_id)
logger.info(f'Медиагруппа опубликована в канале {self.main_public}.')
@@ -190,10 +279,8 @@ class PostPublishService:
@track_errors("post_publish_service", "decline_post")
async def decline_post(self, call: CallbackQuery) -> None:
"""Отклонение поста"""
logger.info(f"Начинаю отклонение поста. Message ID: {call.message.message_id}, Content type: {call.message.content_type}")
# Проверяем, является ли сообщение частью медиагруппы (осознанный костыль, т.к. сообщение к которому прикреплен коллбек без медиагруппы)
if call.message.text == CONTENT_TYPE_MEDIA_GROUP:
logger.debug("Сообщение является частью медиагруппы, вызываю _decline_media_group")
await self._decline_media_group(call)
return
@@ -201,7 +288,6 @@ class PostPublishService:
if content_type in [CONTENT_TYPE_TEXT, CONTENT_TYPE_PHOTO, CONTENT_TYPE_AUDIO,
CONTENT_TYPE_VOICE, CONTENT_TYPE_VIDEO, CONTENT_TYPE_VIDEO_NOTE]:
logger.debug(f"Отклоняю одиночный пост типа: {content_type}")
await self._decline_single_post(call)
else:
logger.error(f"Неподдерживаемый тип контента для отклонения: {content_type}")
@@ -211,15 +297,16 @@ class PostPublishService:
@track_errors("post_publish_service", "_decline_single_post")
async def _decline_single_post(self, call: CallbackQuery) -> None:
"""Отклонение одиночного поста"""
logger.debug(f"Отклоняю одиночный пост. Message ID: {call.message.message_id}")
author_id = await self._get_author_id(call.message.message_id)
logger.debug(f"ID автора получен: {author_id}")
logger.debug(f"Удаляю сообщение из группы {self.group_for_posts}")
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "declined")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'declined'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await self._get_bot(call.message).delete_message(chat_id=self.group_for_posts, message_id=call.message.message_id)
try:
logger.debug(f"Отправляю уведомление об отклонении автору {author_id}")
await send_text_message(author_id, call.message, MESSAGE_POST_DECLINED)
except Exception as e:
if str(e) == ERROR_BOT_BLOCKED:
@@ -234,7 +321,7 @@ class PostPublishService:
@track_media_processing("media_group")
async def _decline_media_group(self, call: CallbackQuery) -> None:
"""Отклонение медиагруппы"""
logger.debug(f"Отклоняю медиагруппу. Helper message ID: {call.message.message_id}")
await self.db.update_status_for_media_group_by_helper_id(call.message.message_id, "declined")
post_ids = await self.db.get_post_ids_from_telegram_by_last_id(call.message.message_id)
message_ids = post_ids.copy()
@@ -352,11 +439,14 @@ class BanService:
current_date = datetime.now()
date_to_unban = int((current_date + timedelta(days=7)).timestamp())
ban_author_id = call.from_user.id
await self.db.set_user_blacklist(
user_id=author_id,
user_name=None,
message_for_user="Спам",
date_to_unban=date_to_unban
date_to_unban=date_to_unban,
ban_author=ban_author_id,
)
await self._get_bot(call.message).delete_message(chat_id=self.group_for_posts, message_id=call.message.message_id)

View File

@@ -13,11 +13,13 @@ from dataclasses import dataclass
from aiogram import types
from aiogram.types import FSInputFile
from database.models import TelegramPost, User
from logs.custom_logger import logger
# Local imports - utilities
from helper_bot.utils.helper_func import (
get_first_name,
get_text_message,
determine_anonymity,
send_text_message,
send_photo_message,
send_media_group_message_to_private_chat,
@@ -154,11 +156,17 @@ class PostService:
markup = get_reply_keyboard_for_post()
sent_message_id = await send_text_message(self.settings.group_for_posts, message, post_text, markup)
# Сохраняем сырой текст и определяем анонимность
raw_text = message.text or ""
is_anonymous = determine_anonymity(raw_text)
post = TelegramPost(
message_id=sent_message_id,
text=message.text,
text=raw_text,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
@@ -176,11 +184,16 @@ class PostService:
self.settings.group_for_posts, message, message.photo[-1].file_id, post_caption, markup
)
# Сохраняем сырой caption и определяем анонимность
raw_caption = message.caption or ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=sent_message.caption or "",
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -201,11 +214,16 @@ class PostService:
self.settings.group_for_posts, message, message.video.file_id, post_caption, markup
)
# Сохраняем сырой caption и определяем анонимность
raw_caption = message.caption or ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=sent_message.caption or "",
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -222,11 +240,16 @@ class PostService:
self.settings.group_for_posts, message, message.video_note.file_id, markup
)
# Сохраняем пустую строку, так как video_note не имеет caption
raw_caption = ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=sent_message.caption or "",
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -247,11 +270,16 @@ class PostService:
self.settings.group_for_posts, message, message.audio.file_id, post_caption, markup
)
# Сохраняем сырой caption и определяем анонимность
raw_caption = message.caption or ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=sent_message.caption or "",
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -268,11 +296,16 @@ class PostService:
self.settings.group_for_posts, message, message.voice.file_id, markup
)
# Сохраняем пустую строку, так как voice не имеет caption
raw_caption = ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=sent_message.caption or "",
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -285,17 +318,24 @@ class PostService:
@track_media_processing("media_group")
async def handle_media_group_post(self, message: types.Message, album: list, first_name: str) -> None:
"""Handle media group post submission"""
#TODO: Мне кажется тут какая-то дичь с одинаковыми переменными, в которых post_caption никуда не ведет
post_caption = " "
raw_caption = ""
if album and album[0].caption:
raw_caption = album[0].caption or ""
post_caption = get_text_message(album[0].caption.lower(), first_name, message.from_user.username)
# Определяем анонимность на основе сырого caption
is_anonymous = determine_anonymity(raw_caption)
# Создаем основной пост для медиагруппы
main_post = TelegramPost(
message_id=message.message_id, # ID основного сообщения медиагруппы
text=post_caption,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(main_post)

View File

@@ -85,14 +85,44 @@ def get_first_name(message: types.Message) -> str:
return ""
def get_text_message(post_text: str, first_name: str, username: str = None):
def determine_anonymity(post_text: str) -> bool:
"""
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
Определяет, является ли пост анонимным на основе ключевых слов в тексте.
Args:
post_text: Текст сообщения
Returns:
bool: True, если "анон" в тексте; False, если "неанон" или "не анон" в тексте;
False по умолчанию (если нет ключевых слов)
"""
if not post_text:
return False
post_text_lower = post_text.lower()
# Сначала проверяем "неанон" или "не анон" (более специфичное условие)
if "неанон" in post_text_lower or "не анон" in post_text_lower:
return False
# Проверяем "анон"
if "анон" in post_text_lower:
return True
# По умолчанию False
return False
def get_text_message(post_text: str, first_name: str, username: str = None, is_anonymous: Optional[bool] = None):
"""
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон"
или переданного параметра is_anonymous.
Args:
post_text: Текст сообщения
first_name: Имя автора поста
username: Юзернейм автора поста (может быть None)
is_anonymous: Флаг анонимности (True - анонимно, False - не анонимно, None - legacy, определяется по тексту)
Returns:
str: - Сформированный текст сообщения.
@@ -109,12 +139,21 @@ def get_text_message(post_text: str, first_name: str, username: str = None):
else:
author_info = f"{first_name} (Ник не указан)"
if "неанон" in post_text or "не анон" in post_text:
return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
elif "анон" in post_text:
return f'Пост из ТГ:\n{safe_post_text}\n\nПост опубликован анонимно'
# Если передан is_anonymous, используем его, иначе определяем по тексту (legacy)
# TODO: Уверен можно укоротить
if is_anonymous is not None:
if is_anonymous:
return f'{safe_post_text}\n\nПост опубликован анонимно'
else:
return f'{safe_post_text}\n\nАвтор поста: {author_info}'
else:
return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
# Legacy: определяем по тексту
if "неанон" in post_text or "не анон" in post_text:
return f'{safe_post_text}\n\nАвтор поста: {author_info}'
elif "анон" in post_text:
return f'{safe_post_text}\n\nПост опубликован анонимно'
else:
return f'{safe_post_text}\n\nАвтор поста: {author_info}'
@track_time("download_file", "helper_func")
@track_errors("helper_func", "download_file")

View File

@@ -21,7 +21,7 @@ aiohttp==3.9.1
# Network stability improvements
aiohttp[speedups]>=3.9.1
aiodns>=3.0.0
cchardet>=2.1.7
charset-normalizer>=3.0.0
# Development tools
pluggy==1.5.0

View File

@@ -0,0 +1,68 @@
#!/usr/bin/env python3
"""
Скрипт миграции для добавления колонки ban_author в таблицу blacklist.
Колонка хранит user_id администратора, инициировавшего бан.
"""
import argparse
import asyncio
import os
import sys
from pathlib import Path
import aiosqlite
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
from logs.custom_logger import logger # noqa: E402
DEFAULT_DB_PATH = "database/tg-bot-database.db"
def _column_exists(rows: list, name: str) -> bool:
"""PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk)."""
for row in rows:
if row[1] == name:
return True
return False
async def main(db_path: str) -> None:
db_path = os.path.abspath(db_path)
if not os.path.exists(db_path):
logger.error("База данных не найдена: %s", db_path)
print(f"Ошибка: база данных не найдена: {db_path}")
return
async with aiosqlite.connect(db_path) as conn:
await conn.execute("PRAGMA foreign_keys = ON")
# Проверяем наличие колонки ban_author
cursor = await conn.execute("PRAGMA table_info(blacklist)")
rows = await cursor.fetchall()
await cursor.close()
if not _column_exists(rows, "ban_author"):
logger.info("Добавление колонки ban_author в blacklist")
await conn.execute(
"ALTER TABLE blacklist "
"ADD COLUMN ban_author INTEGER REFERENCES our_users (user_id) ON DELETE SET NULL"
)
await conn.commit()
print("Колонка ban_author добавлена в таблицу blacklist.")
else:
print("Колонка ban_author уже существует в таблице blacklist.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Добавление колонки ban_author в blacklist"
)
parser.add_argument(
"--db",
default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
help="Путь к БД (или DB_PATH)",
)
args = parser.parse_args()
asyncio.run(main(args.db))

View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python3
"""
Скрипт миграции для добавления колонки is_anonymous в таблицу post_from_telegram_suggest.
Для существующих записей определяет is_anonymous на основе текста или устанавливает NULL.
"""
import argparse
import asyncio
import os
import sys
from pathlib import Path
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
import aiosqlite
from logs.custom_logger import logger
from helper_bot.utils.helper_func import determine_anonymity
DEFAULT_DB_PATH = "database/tg-bot-database.db"
def _column_exists(rows: list, name: str) -> bool:
"""PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk)."""
for row in rows:
if row[1] == name:
return True
return False
async def main(db_path: str) -> None:
db_path = os.path.abspath(db_path)
if not os.path.exists(db_path):
logger.error("База данных не найдена: %s", db_path)
print(f"Ошибка: база данных не найдена: {db_path}")
return
async with aiosqlite.connect(db_path) as conn:
await conn.execute("PRAGMA foreign_keys = ON")
# Проверяем наличие колонки is_anonymous
cursor = await conn.execute(
"PRAGMA table_info(post_from_telegram_suggest)"
)
rows = await cursor.fetchall()
await cursor.close()
if not _column_exists(rows, "is_anonymous"):
logger.info("Добавление колонки is_anonymous в post_from_telegram_suggest")
await conn.execute(
"ALTER TABLE post_from_telegram_suggest "
"ADD COLUMN is_anonymous INTEGER"
)
await conn.commit()
print("Колонка is_anonymous добавлена.")
else:
print("Колонка is_anonymous уже существует.")
# Получаем все записи с текстом для обновления
cursor = await conn.execute(
"SELECT message_id, text FROM post_from_telegram_suggest WHERE text IS NOT NULL"
)
posts = await cursor.fetchall()
await cursor.close()
updated_count = 0
null_count = 0
# Обновляем каждую запись
for message_id, text in posts:
try:
# Определяем is_anonymous на основе текста
# Если текст пустой или None, устанавливаем NULL (legacy)
if not text or not text.strip():
is_anonymous = None
else:
is_anonymous = determine_anonymity(text)
# Преобразуем bool в int для SQLite (True -> 1, False -> 0, None -> None)
is_anonymous_int = None if is_anonymous is None else (1 if is_anonymous else 0)
await conn.execute(
"UPDATE post_from_telegram_suggest SET is_anonymous = ? WHERE message_id = ?",
(is_anonymous_int, message_id)
)
if is_anonymous is not None:
updated_count += 1
else:
null_count += 1
except Exception as e:
logger.error(f"Ошибка при обработке поста message_id={message_id}: {e}")
# В случае ошибки устанавливаем NULL
await conn.execute(
"UPDATE post_from_telegram_suggest SET is_anonymous = NULL WHERE message_id = ?",
(message_id,)
)
null_count += 1
# Обновляем записи без текста (устанавливаем NULL)
cursor = await conn.execute(
"SELECT COUNT(*) FROM post_from_telegram_suggest WHERE text IS NULL"
)
row = await cursor.fetchone()
posts_without_text = row[0] if row else 0
await cursor.close()
if posts_without_text > 0:
await conn.execute(
"UPDATE post_from_telegram_suggest SET is_anonymous = NULL WHERE text IS NULL"
)
null_count += posts_without_text
await conn.commit()
total_updated = updated_count + null_count
logger.info(
f"Миграция завершена. Обновлено записей: {total_updated} "
f"(определено: {updated_count}, установлено NULL: {null_count})"
)
print(f"Миграция завершена.")
print(f"Обновлено записей: {total_updated}")
print(f" - Определено is_anonymous: {updated_count}")
print(f" - Установлено NULL: {null_count}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Добавление колонки is_anonymous в post_from_telegram_suggest"
)
parser.add_argument(
"--db",
default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
help="Путь к БД (или DB_PATH)",
)
args = parser.parse_args()
asyncio.run(main(args.db))

View File

@@ -0,0 +1,82 @@
#!/usr/bin/env python3
"""
Скрипт для проставления status='legacy' всем существующим записям в post_from_telegram_suggest.
Добавляет колонку status, если её нет, затем обновляет все строки.
"""
import argparse
import asyncio
import os
import sys
from pathlib import Path
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
import aiosqlite
from logs.custom_logger import logger
DEFAULT_DB_PATH = "database/tg-bot-database.db"
def _column_exists(rows: list, name: str) -> bool:
"""PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk)."""
for row in rows:
if row[1] == name:
return True
return False
async def main(db_path: str) -> None:
db_path = os.path.abspath(db_path)
if not os.path.exists(db_path):
logger.error("База данных не найдена: %s", db_path)
print(f"Ошибка: база данных не найдена: {db_path}")
return
async with aiosqlite.connect(db_path) as conn:
await conn.execute("PRAGMA foreign_keys = ON")
# Проверяем наличие колонки status
cursor = await conn.execute(
"PRAGMA table_info(post_from_telegram_suggest)"
)
rows = await cursor.fetchall()
await cursor.close()
if not _column_exists(rows, "status"):
logger.info("Добавление колонки status в post_from_telegram_suggest")
await conn.execute(
"ALTER TABLE post_from_telegram_suggest "
"ADD COLUMN status TEXT NOT NULL DEFAULT 'suggest'"
)
await conn.commit()
print("Колонка status добавлена.")
else:
print("Колонка status уже существует.")
# Обновляем все существующие записи на legacy
await conn.execute(
"UPDATE post_from_telegram_suggest SET status = 'legacy'"
)
await conn.commit()
cursor = await conn.execute("SELECT changes()")
row = await cursor.fetchone()
updated = row[0] if row else 0
await cursor.close()
logger.info("Обновлено записей в post_from_telegram_suggest: %d", updated)
print(f"Обновлено записей: {updated}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Backfill status='legacy' для post_from_telegram_suggest"
)
parser.add_argument(
"--db",
default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
help="Путь к БД (или DB_PATH)",
)
args = parser.parse_args()
asyncio.run(main(args.db))

148
scripts/clean_post_text.py Executable file
View File

@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""
Скрипт для приведения текста постов к "сырому" виду.
Удаляет форматирование, добавленное функцией get_text_message(), оставляя только исходный текст.
"""
import argparse
import asyncio
import html
import os
import re
import sys
from pathlib import Path
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
import aiosqlite
from logs.custom_logger import logger
DEFAULT_DB_PATH = "database/tg-bot-database.db"
# Паттерны для определения форматированного текста
PREFIX = "Пост из ТГ:\n"
ANONYMOUS_SUFFIX = "\n\nПост опубликован анонимно"
AUTHOR_SUFFIX_PATTERN = re.compile(r"\n\nАвтор поста: .+$")
def extract_raw_text(formatted_text: str) -> str:
"""
Извлекает сырой текст из форматированного текста поста.
Args:
formatted_text: Форматированный текст поста
Returns:
str: Сырой текст или исходный текст, если форматирование не обнаружено
"""
if not formatted_text:
return ""
# Проверяем, начинается ли текст с префикса
if not formatted_text.startswith(PREFIX):
# Текст уже в сыром виде или имеет другой формат
return formatted_text
# Извлекаем текст после префикса
text_after_prefix = formatted_text[len(PREFIX):]
# Проверяем, заканчивается ли текст на "Пост опубликован анонимно"
if text_after_prefix.endswith(ANONYMOUS_SUFFIX):
raw_text = text_after_prefix[:-len(ANONYMOUS_SUFFIX)]
# Проверяем, заканчивается ли текст на "Автор поста: ..."
elif AUTHOR_SUFFIX_PATTERN.search(text_after_prefix):
raw_text = AUTHOR_SUFFIX_PATTERN.sub("", text_after_prefix)
else:
# Не удалось определить формат, возвращаем текст без префикса
raw_text = text_after_prefix
# Декодируем HTML-экранирование
raw_text = html.unescape(raw_text)
return raw_text
async def main(db_path: str, dry_run: bool = False) -> None:
db_path = os.path.abspath(db_path)
if not os.path.exists(db_path):
logger.error("База данных не найдена: %s", db_path)
print(f"Ошибка: база данных не найдена: {db_path}")
return
async with aiosqlite.connect(db_path) as conn:
await conn.execute("PRAGMA foreign_keys = ON")
# Получаем все записи с текстом
cursor = await conn.execute(
"SELECT message_id, text FROM post_from_telegram_suggest WHERE text IS NOT NULL AND text != ''"
)
posts = await cursor.fetchall()
await cursor.close()
updated_count = 0
skipped_count = 0
error_count = 0
print(f"Найдено записей для обработки: {len(posts)}")
if dry_run:
print("РЕЖИМ ПРОВЕРКИ (dry-run): изменения не будут сохранены")
# Обрабатываем каждую запись
for message_id, formatted_text in posts:
try:
# Извлекаем сырой текст
raw_text = extract_raw_text(formatted_text)
# Проверяем, изменился ли текст
if raw_text == formatted_text:
skipped_count += 1
continue
if dry_run:
print(f"\n[DRY-RUN] message_id={message_id}:")
print(f" Было: {formatted_text[:100]}...")
print(f" Станет: {raw_text[:100]}...")
else:
# Обновляем запись
await conn.execute(
"UPDATE post_from_telegram_suggest SET text = ? WHERE message_id = ?",
(raw_text, message_id)
)
updated_count += 1
except Exception as e:
logger.error(f"Ошибка при обработке поста message_id={message_id}: {e}")
error_count += 1
if not dry_run:
await conn.commit()
total_processed = updated_count + skipped_count + error_count
logger.info(
f"Обработка завершена. Всего записей: {total_processed}, "
f"обновлено: {updated_count}, пропущено: {skipped_count}, ошибок: {error_count}"
)
print(f"\nОбработка завершена:")
print(f" - Всего записей: {total_processed}")
print(f" - Обновлено: {updated_count}")
print(f" - Пропущено (уже в сыром виде): {skipped_count}")
print(f" - Ошибок: {error_count}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Приведение текста постов к 'сырому' виду"
)
parser.add_argument(
"--db",
default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
help="Путь к БД (или DB_PATH)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Режим проверки без сохранения изменений",
)
args = parser.parse_args()
asyncio.run(main(args.db, args.dry_run))

View File

@@ -0,0 +1,94 @@
#!/usr/bin/env python3
"""
Скрипт миграции для создания таблицы blacklist_history.
Таблица хранит историю всех операций бана/разбана пользователей.
"""
import argparse
import asyncio
import os
import sys
from pathlib import Path
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
import aiosqlite
from logs.custom_logger import logger
DEFAULT_DB_PATH = "database/tg-bot-database.db"
def _table_exists(rows: list, table_name: str) -> bool:
"""Проверяет существование таблицы по результатам PRAGMA table_list."""
for row in rows:
if row[1] == table_name: # name column
return True
return False
async def main(db_path: str) -> None:
db_path = os.path.abspath(db_path)
if not os.path.exists(db_path):
logger.error("База данных не найдена: %s", db_path)
print(f"Ошибка: база данных не найдена: {db_path}")
return
async with aiosqlite.connect(db_path) as conn:
await conn.execute("PRAGMA foreign_keys = ON")
# Проверяем наличие таблицы blacklist_history
cursor = await conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='blacklist_history'"
)
rows = await cursor.fetchall()
await cursor.close()
if not rows:
logger.info("Создание таблицы blacklist_history")
# Создаем таблицу
await conn.execute("""
CREATE TABLE IF NOT EXISTS blacklist_history (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
message_for_user TEXT,
date_ban INTEGER NOT NULL,
date_unban INTEGER,
ban_author INTEGER,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
updated_at INTEGER DEFAULT (strftime('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE,
FOREIGN KEY (ban_author) REFERENCES our_users(user_id) ON DELETE SET NULL
)
""")
# Создаем индексы
await conn.execute(
"CREATE INDEX IF NOT EXISTS idx_blacklist_history_user_id ON blacklist_history(user_id)"
)
await conn.execute(
"CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_ban ON blacklist_history(date_ban)"
)
await conn.execute(
"CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_unban ON blacklist_history(date_unban)"
)
await conn.commit()
logger.info("Таблица blacklist_history и индексы успешно созданы")
print("Таблица blacklist_history и индексы успешно созданы.")
else:
print("Таблица blacklist_history уже существует.")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Создание таблицы blacklist_history для истории банов/разбанов"
)
parser.add_argument(
"--db",
default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
help="Путь к БД (или DB_PATH)",
)
args = parser.parse_args()
asyncio.run(main(args.db))

View File

@@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""
Скрипт миграции для переноса записей из blacklist в blacklist_history.
Переносит все существующие записи из таблицы blacklist в таблицу blacklist_history.
"""
import argparse
import asyncio
import os
import sys
from pathlib import Path
from datetime import datetime
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
import aiosqlite
from logs.custom_logger import logger
DEFAULT_DB_PATH = "database/tg-bot-database.db"
async def main(db_path: str) -> None:
db_path = os.path.abspath(db_path)
if not os.path.exists(db_path):
logger.error("База данных не найдена: %s", db_path)
print(f"Ошибка: база данных не найдена: {db_path}")
return
async with aiosqlite.connect(db_path) as conn:
await conn.execute("PRAGMA foreign_keys = ON")
# Проверяем наличие таблицы blacklist_history
cursor = await conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='blacklist_history'"
)
rows = await cursor.fetchall()
await cursor.close()
if not rows:
logger.error("Таблица blacklist_history не найдена. Сначала запустите create_blacklist_history_table.py")
print("Ошибка: таблица blacklist_history не найдена. Сначала запустите create_blacklist_history_table.py")
return
# Получаем все записи из blacklist
cursor = await conn.execute(
"SELECT user_id, message_for_user, date_to_unban, created_at, ban_author FROM blacklist"
)
blacklist_records = await cursor.fetchall()
await cursor.close()
if not blacklist_records:
print("В таблице blacklist нет записей для переноса.")
logger.info("В таблице blacklist нет записей для переноса")
return
logger.info("Найдено записей в blacklist для переноса: %d", len(blacklist_records))
print(f"Найдено записей в blacklist для переноса: {len(blacklist_records)}")
# Получаем текущее время в Unix timestamp
current_time = int(datetime.now().timestamp())
# Переносим записи в blacklist_history
migrated_count = 0
skipped_count = 0
for record in blacklist_records:
user_id, message_for_user, date_to_unban, created_at, ban_author = record
# Проверяем, нет ли уже записи для этого user_id с таким же date_ban
# (чтобы избежать дубликатов при повторном запуске)
date_ban = created_at if created_at is not None else current_time
check_cursor = await conn.execute(
"SELECT id FROM blacklist_history WHERE user_id = ? AND date_ban = ?",
(user_id, date_ban)
)
existing = await check_cursor.fetchone()
await check_cursor.close()
if existing:
logger.debug("Запись для user_id=%d с date_ban=%d уже существует, пропускаем", user_id, date_ban)
skipped_count += 1
continue
# Вставляем запись в blacklist_history
await conn.execute(
"""
INSERT INTO blacklist_history
(user_id, message_for_user, date_ban, date_unban, ban_author, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
user_id,
message_for_user,
date_ban,
date_to_unban,
ban_author,
created_at if created_at is not None else current_time,
current_time
)
)
migrated_count += 1
await conn.commit()
logger.info(
"Миграция завершена. Перенесено записей: %d, пропущено (дубликаты): %d",
migrated_count,
skipped_count
)
print(f"Миграция завершена. Перенесено записей: {migrated_count}, пропущено (дубликаты): {skipped_count}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Перенос записей из blacklist в blacklist_history"
)
parser.add_argument(
"--db",
default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
help="Путь к БД (или DB_PATH)",
)
args = parser.parse_args()
asyncio.run(main(args.db))

View File

@@ -22,6 +22,8 @@ def mock_post_repository():
mock_repo = Mock(spec=PostRepository)
mock_repo._execute_query = AsyncMock()
mock_repo._execute_query_with_result = AsyncMock()
mock_repo.update_status_by_message_id = AsyncMock()
mock_repo.update_status_for_media_group_by_helper_id = AsyncMock()
mock_repo.logger = Mock()
return mock_repo
@@ -198,7 +200,9 @@ def mock_sql_queries():
"CREATE TABLE IF NOT EXISTS message_link_to_content"
],
'add_post': "INSERT INTO post_from_telegram_suggest",
'add_post_status': "status",
'update_helper': "UPDATE post_from_telegram_suggest SET helper_text_message_id",
'update_status': "UPDATE post_from_telegram_suggest SET status = ?",
'add_content': "INSERT OR IGNORE INTO content_post_from_telegram",
'add_link': "INSERT OR IGNORE INTO message_link_to_content",
'get_content': "SELECT cpft.content_name, cpft.content_type",

View File

@@ -14,6 +14,13 @@ class TestAsyncBotDB:
mock_factory.audio.delete_audio_moderate_record = AsyncMock()
mock_factory.users = Mock()
mock_factory.users.logger = Mock()
# Моки для blacklist и blacklist_history
mock_factory.blacklist = Mock()
mock_factory.blacklist.add_user = AsyncMock()
mock_factory.blacklist.remove_user = AsyncMock(return_value=True)
mock_factory.blacklist_history = Mock()
mock_factory.blacklist_history.add_record_on_ban = AsyncMock()
mock_factory.blacklist_history.set_unban_date = AsyncMock(return_value=True)
return mock_factory
@pytest.fixture
@@ -102,3 +109,107 @@ class TestAsyncBotDB:
await async_bot_db.delete_audio_moderate_record(message_id)
mock_factory.audio.delete_audio_moderate_record.assert_called_once_with(message_id)
@pytest.mark.asyncio
async def test_set_user_blacklist_calls_history(self, async_bot_db, mock_factory):
"""Тест что set_user_blacklist вызывает добавление в историю"""
user_id = 12345
message_for_user = "Нарушение правил"
date_to_unban = 1234567890
ban_author = 999
await async_bot_db.set_user_blacklist(
user_id=user_id,
user_name=None,
message_for_user=message_for_user,
date_to_unban=date_to_unban,
ban_author=ban_author
)
# Проверяем, что сначала добавлен в blacklist
mock_factory.blacklist.add_user.assert_called_once()
# Проверяем, что затем добавлена запись в историю
mock_factory.blacklist_history.add_record_on_ban.assert_called_once()
# Проверяем параметры записи в историю
history_call = mock_factory.blacklist_history.add_record_on_ban.call_args[0][0]
assert history_call.user_id == user_id
assert history_call.message_for_user == message_for_user
assert history_call.date_ban is not None
assert history_call.date_unban is None
assert history_call.ban_author == ban_author
@pytest.mark.asyncio
async def test_set_user_blacklist_history_error_does_not_fail(self, async_bot_db, mock_factory):
"""Тест что ошибка записи в историю не ломает процесс бана"""
user_id = 12345
mock_factory.blacklist_history.add_record_on_ban.side_effect = Exception("History error")
# Бан должен пройти успешно, несмотря на ошибку в истории
await async_bot_db.set_user_blacklist(
user_id=user_id,
message_for_user="Тест",
date_to_unban=None,
ban_author=None
)
# Проверяем, что пользователь все равно добавлен в blacklist
mock_factory.blacklist.add_user.assert_called_once()
# Проверяем, что попытка записи в историю была
mock_factory.blacklist_history.add_record_on_ban.assert_called_once()
@pytest.mark.asyncio
async def test_delete_user_blacklist_calls_history(self, async_bot_db, mock_factory):
"""Тест что delete_user_blacklist вызывает обновление истории"""
user_id = 12345
result = await async_bot_db.delete_user_blacklist(user_id)
# Проверяем, что сначала обновлена история
mock_factory.blacklist_history.set_unban_date.assert_called_once()
history_call = mock_factory.blacklist_history.set_unban_date.call_args
assert history_call[0][0] == user_id
assert history_call[0][1] is not None # date_unban timestamp
# Проверяем, что затем удален из blacklist
mock_factory.blacklist.remove_user.assert_called_once_with(user_id)
# Проверяем результат
assert result is True
@pytest.mark.asyncio
async def test_delete_user_blacklist_history_error_does_not_fail(self, async_bot_db, mock_factory):
"""Тест что ошибка обновления истории не ломает процесс разбана"""
user_id = 12345
mock_factory.blacklist_history.set_unban_date.side_effect = Exception("History error")
# Разбан должен пройти успешно, несмотря на ошибку в истории
result = await async_bot_db.delete_user_blacklist(user_id)
# Проверяем, что попытка обновления истории была
mock_factory.blacklist_history.set_unban_date.assert_called_once()
# Проверяем, что пользователь все равно удален из blacklist
mock_factory.blacklist.remove_user.assert_called_once_with(user_id)
# Проверяем результат
assert result is True
@pytest.mark.asyncio
async def test_delete_user_blacklist_returns_false_on_blacklist_error(self, async_bot_db, mock_factory):
"""Тест что delete_user_blacklist возвращает False при ошибке удаления из blacklist"""
user_id = 12345
mock_factory.blacklist.remove_user.return_value = False
result = await async_bot_db.delete_user_blacklist(user_id)
# Проверяем, что история обновлена
mock_factory.blacklist_history.set_unban_date.assert_called_once()
# Проверяем, что удаление из blacklist было попытка
mock_factory.blacklist.remove_user.assert_called_once_with(user_id)
# Проверяем результат
assert result is False

View File

@@ -17,7 +17,7 @@ class TestAutoUnbanIntegration:
@pytest.fixture
def setup_test_db(self, test_db_path):
"""Создает тестовую базу данных с таблицей blacklist"""
"""Создает тестовую базу данных с таблицами blacklist, our_users и blacklist_history"""
# Удаляем старую тестовую базу если она существует
if os.path.exists(test_db_path):
os.remove(test_db_path)
@@ -26,30 +26,112 @@ class TestAutoUnbanIntegration:
conn = sqlite3.connect(test_db_path)
cursor = conn.cursor()
# Создаем таблицу blacklist
# Включаем поддержку внешних ключей
cursor.execute("PRAGMA foreign_keys = ON")
# Создаем таблицу our_users (нужна для внешних ключей)
cursor.execute('''
CREATE TABLE IF NOT EXISTS blacklist (
user_id INTEGER PRIMARY KEY,
user_name TEXT,
message_for_user TEXT,
date_to_unban INTEGER
CREATE TABLE IF NOT EXISTS our_users (
user_id INTEGER NOT NULL PRIMARY KEY,
first_name TEXT,
full_name TEXT,
username TEXT,
is_bot BOOLEAN DEFAULT 0,
language_code TEXT,
has_stickers BOOLEAN DEFAULT 0 NOT NULL,
emoji TEXT,
date_added INTEGER NOT NULL,
date_changed INTEGER NOT NULL,
voice_bot_welcome_received BOOLEAN DEFAULT 0
)
''')
# Добавляем тестовые данные
# Создаем таблицу blacklist
cursor.execute('''
CREATE TABLE IF NOT EXISTS blacklist (
user_id INTEGER NOT NULL PRIMARY KEY,
message_for_user TEXT,
date_to_unban INTEGER,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
ban_author INTEGER,
FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE
)
''')
# Создаем таблицу blacklist_history
cursor.execute('''
CREATE TABLE IF NOT EXISTS blacklist_history (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
message_for_user TEXT,
date_ban INTEGER NOT NULL,
date_unban INTEGER,
ban_author INTEGER,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
updated_at INTEGER DEFAULT (strftime('%s', 'now')),
FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE,
FOREIGN KEY (ban_author) REFERENCES our_users(user_id) ON DELETE SET NULL
)
''')
# Создаем индексы для blacklist_history
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_blacklist_history_user_id ON blacklist_history(user_id)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_ban ON blacklist_history(date_ban)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_unban ON blacklist_history(date_unban)
''')
# Добавляем тестовых пользователей в our_users
current_time = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
users_data = [
(123, "Test", "Test User 1", "test_user1", 0, "ru", 0, "😊", current_time, current_time, 0),
(456, "Test", "Test User 2", "test_user2", 0, "ru", 0, "😊", current_time, current_time, 0),
(789, "Test", "Test User 3", "test_user3", 0, "ru", 0, "😊", current_time, current_time, 0),
(999, "Test", "Test User 4", "test_user4", 0, "ru", 0, "😊", current_time, current_time, 0),
]
cursor.executemany(
"""INSERT INTO our_users (user_id, first_name, full_name, username, is_bot,
language_code, has_stickers, emoji, date_added, date_changed, voice_bot_welcome_received)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
users_data
)
# Добавляем тестовые данные в blacklist
today_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
tomorrow_timestamp = int((datetime.now(timezone(timedelta(hours=3))) + timedelta(days=1)).timestamp())
test_data = [
(123, "test_user1", "Test ban 1", today_timestamp), # Разблокируется сегодня
(456, "test_user2", "Test ban 2", today_timestamp), # Разблокируется сегодня
(789, "test_user3", "Test ban 3", tomorrow_timestamp), # Разблокируется завтра
(999, "test_user4", "Test ban 4", None), # Навсегда заблокирован
blacklist_data = [
(123, "Test ban 1", today_timestamp, current_time, None), # Разблокируется сегодня
(456, "Test ban 2", today_timestamp, current_time, None), # Разблокируется сегодня
(789, "Test ban 3", tomorrow_timestamp, current_time, None), # Разблокируется завтра
(999, "Test ban 4", None, current_time, None), # Навсегда заблокирован
]
cursor.executemany(
"INSERT INTO blacklist (user_id, user_name, message_for_user, date_to_unban) VALUES (?, ?, ?, ?)",
test_data
"INSERT INTO blacklist (user_id, message_for_user, date_to_unban, created_at, ban_author) VALUES (?, ?, ?, ?, ?)",
blacklist_data
)
# Добавляем тестовые данные в blacklist_history
# Для пользователей 123 и 456 (которые будут разблокированы) создаем записи с date_unban = NULL
yesterday_timestamp = int((datetime.now(timezone(timedelta(hours=3))) - timedelta(days=1)).timestamp())
history_data = [
(123, "Test ban 1", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Будет разблокирован
(456, "Test ban 2", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Будет разблокирован
(789, "Test ban 3", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Не будет разблокирован сегодня
(999, "Test ban 4", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Навсегда заблокирован
]
cursor.executemany(
"""INSERT INTO blacklist_history
(user_id, message_for_user, date_ban, date_unban, ban_author, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
history_data
)
conn.commit()
@@ -105,9 +187,20 @@ class TestAutoUnbanIntegration:
initial_count = cursor.fetchone()[0]
assert initial_count == 4
# Проверяем начальное состояние истории: должно быть 2 записи с date_unban IS NULL для user_id 123 и 456
cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE user_id IN (123, 456) AND date_unban IS NULL")
initial_open_history = cursor.fetchone()[0]
assert initial_open_history == 2
# Запоминаем время до разбана для проверки updated_at
before_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
# Выполняем автоматический разбан
await scheduler.auto_unban_users()
# Запоминаем время после разбана для проверки updated_at
after_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
# Проверяем, что пользователи с сегодняшней датой разблокированы
current_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
cursor.execute("SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?",
@@ -131,6 +224,32 @@ class TestAutoUnbanIntegration:
final_count = cursor.fetchone()[0]
assert final_count == 2 # Остались только завтрашние и навсегда заблокированные
# Проверяем историю банов: для user_id 123 и 456 должны быть установлены date_unban
cursor.execute("SELECT user_id, date_unban, updated_at FROM blacklist_history WHERE user_id IN (123, 456) ORDER BY user_id")
history_records = cursor.fetchall()
assert len(history_records) == 2
for user_id, date_unban, updated_at in history_records:
# Проверяем, что date_unban установлен (не NULL)
assert date_unban is not None, f"date_unban должен быть установлен для user_id={user_id}"
assert isinstance(date_unban, int), f"date_unban должен быть integer для user_id={user_id}"
# Проверяем, что date_unban находится в разумных пределах (между before и after)
assert before_unban_timestamp <= date_unban <= after_unban_timestamp, \
f"date_unban для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}, получен {date_unban}"
# Проверяем, что updated_at обновлен
assert updated_at is not None, f"updated_at должен быть установлен для user_id={user_id}"
assert isinstance(updated_at, int), f"updated_at должен быть integer для user_id={user_id}"
assert before_unban_timestamp <= updated_at <= after_unban_timestamp, \
f"updated_at для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}, получен {updated_at}"
# Проверяем, что для user_id 789 и 999 записи в истории остались без изменений (date_unban все еще NULL)
cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE user_id IN (789, 999) AND date_unban IS NULL")
unchanged_history = cursor.fetchone()[0]
assert unchanged_history == 2, "Записи для user_id 789 и 999 должны остаться с date_unban = NULL"
conn.close()
# Проверяем, что отчет был отправлен
@@ -148,6 +267,12 @@ class TestAutoUnbanIntegration:
cursor = conn.cursor()
current_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
cursor.execute("DELETE FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?", (current_timestamp,))
# Проверяем начальное состояние истории: все записи должны иметь date_unban = NULL
cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE date_unban IS NULL")
initial_open_history = cursor.fetchone()[0]
assert initial_open_history == 4 # Все 4 записи должны быть открытыми
conn.commit()
conn.close()
@@ -159,6 +284,14 @@ class TestAutoUnbanIntegration:
# Выполняем автоматический разбан
await scheduler.auto_unban_users()
# Проверяем, что история не изменилась (все записи все еще с date_unban = NULL)
conn = sqlite3.connect(setup_test_db)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE date_unban IS NULL")
final_open_history = cursor.fetchone()[0]
assert final_open_history == 4, "История не должна изменяться, если нет пользователей для разблокировки"
conn.close()
# Проверяем, что отчет не был отправлен (нет пользователей для разблокировки)
mock_bot.send_message.assert_not_called()
@@ -190,6 +323,100 @@ class TestAutoUnbanIntegration:
assert call_args[1]['chat_id'] == '-1001234567891' # important_logs
assert "Ошибка автоматического разбана" in call_args[1]['text']
@pytest.mark.asyncio
@patch('helper_bot.utils.auto_unban_scheduler.get_global_instance')
async def test_auto_unban_updates_history(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot):
"""Тест что автоматический разбан обновляет историю банов"""
# Настройка моков
mock_get_instance.return_value = mock_bdf
# Создаем планировщик
scheduler = AutoUnbanScheduler()
scheduler.bot_db = mock_bdf.database
scheduler.set_bot(mock_bot)
conn = sqlite3.connect(setup_test_db)
cursor = conn.cursor()
# Проверяем начальное состояние: для user_id 123 и 456 должны быть записи с date_unban = NULL
cursor.execute("""
SELECT id, user_id, date_ban, date_unban, updated_at
FROM blacklist_history
WHERE user_id IN (123, 456) AND date_unban IS NULL
ORDER BY user_id
""")
initial_records = cursor.fetchall()
assert len(initial_records) == 2, "Должно быть 2 открытые записи для user_id 123 и 456"
# Запоминаем ID записей и их начальные значения updated_at
record_ids = {row[0]: (row[1], row[4]) for row in initial_records}
# Запоминаем время до разбана
before_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
conn.close()
# Выполняем автоматический разбан
await scheduler.auto_unban_users()
# Запоминаем время после разбана
after_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
# Проверяем, что записи обновлены
conn = sqlite3.connect(setup_test_db)
cursor = conn.cursor()
cursor.execute("""
SELECT id, user_id, date_ban, date_unban, updated_at
FROM blacklist_history
WHERE user_id IN (123, 456)
ORDER BY user_id
""")
updated_records = cursor.fetchall()
assert len(updated_records) == 2, "Должно быть 2 записи для user_id 123 и 456"
for record_id, user_id, date_ban, date_unban, updated_at in updated_records:
# Проверяем, что это одна из наших записей
assert record_id in record_ids, f"Запись с id={record_id} должна быть в исходных записях"
# Проверяем, что date_unban установлен
assert date_unban is not None, f"date_unban должен быть установлен для user_id={user_id}"
assert isinstance(date_unban, int), f"date_unban должен быть integer для user_id={user_id}"
# Проверяем, что date_unban находится в разумных пределах
assert before_unban_timestamp <= date_unban <= after_unban_timestamp, \
f"date_unban для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}"
# Проверяем, что updated_at обновлен (должен быть больше начального значения)
assert updated_at is not None, f"updated_at должен быть установлен для user_id={user_id}"
assert isinstance(updated_at, int), f"updated_at должен быть integer для user_id={user_id}"
assert before_unban_timestamp <= updated_at <= after_unban_timestamp, \
f"updated_at для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}"
# Проверяем, что updated_at действительно обновлен (больше начального значения)
initial_updated_at = record_ids[record_id][1]
assert updated_at >= initial_updated_at, \
f"updated_at для user_id={user_id} должен быть больше или равен начальному значению"
# Проверяем, что обновлена только последняя запись для каждого пользователя
# (если бы было несколько записей, обновилась бы только последняя)
cursor.execute("""
SELECT COUNT(*) FROM blacklist_history
WHERE user_id IN (123, 456) AND date_unban IS NOT NULL
""")
closed_records = cursor.fetchone()[0]
assert closed_records == 2, "Должно быть закрыто 2 записи (по одной для каждого пользователя)"
cursor.execute("""
SELECT COUNT(*) FROM blacklist_history
WHERE user_id IN (123, 456) AND date_unban IS NULL
""")
open_records = cursor.fetchone()[0]
assert open_records == 0, "Не должно быть открытых записей для user_id 123 и 456"
conn.close()
def test_date_format_consistency(self, setup_test_db, mock_bdf):
"""Тест консистентности формата дат"""
scheduler = AutoUnbanScheduler()

View File

@@ -0,0 +1,257 @@
import pytest
from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime
import time
from database.repositories.blacklist_history_repository import BlacklistHistoryRepository
from database.models import BlacklistHistoryRecord
class TestBlacklistHistoryRepository:
"""Тесты для BlacklistHistoryRepository"""
@pytest.fixture
def mock_db_connection(self):
"""Мок для DatabaseConnection"""
mock_connection = Mock()
mock_connection._execute_query = AsyncMock()
mock_connection._execute_query_with_result = AsyncMock()
mock_connection.logger = Mock()
return mock_connection
@pytest.fixture
def blacklist_history_repository(self, mock_db_connection):
"""Экземпляр BlacklistHistoryRepository для тестов"""
# Патчим наследование от DatabaseConnection
with patch.object(BlacklistHistoryRepository, '__init__', return_value=None):
repo = BlacklistHistoryRepository()
repo._execute_query = mock_db_connection._execute_query
repo._execute_query_with_result = mock_db_connection._execute_query_with_result
repo.logger = mock_db_connection.logger
return repo
@pytest.fixture
def sample_history_record(self):
"""Тестовая запись истории бана"""
current_time = int(time.time())
return BlacklistHistoryRecord(
user_id=12345,
message_for_user="Нарушение правил",
date_ban=current_time,
date_unban=None,
ban_author=999,
created_at=current_time,
updated_at=current_time,
)
@pytest.fixture
def sample_history_record_with_unban(self):
"""Тестовая запись истории бана с датой разбана"""
current_time = int(time.time())
return BlacklistHistoryRecord(
user_id=12345,
message_for_user="Нарушение правил",
date_ban=current_time - 86400, # Бан был вчера
date_unban=current_time, # Разбан сегодня
ban_author=999,
created_at=current_time - 86400,
updated_at=current_time,
)
@pytest.mark.asyncio
async def test_create_tables(self, blacklist_history_repository):
"""Тест создания таблицы истории банов/разбанов"""
await blacklist_history_repository.create_tables()
# Проверяем, что метод вызван (4 раза: таблица + 3 индекса)
assert blacklist_history_repository._execute_query.call_count == 4
calls = blacklist_history_repository._execute_query.call_args_list
# Проверяем, что создается таблица с правильной структурой
create_table_call = calls[0]
assert "CREATE TABLE IF NOT EXISTS blacklist_history" in create_table_call[0][0]
assert "id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT" in create_table_call[0][0]
assert "user_id INTEGER NOT NULL" in create_table_call[0][0]
assert "message_for_user TEXT" in create_table_call[0][0]
assert "date_ban INTEGER NOT NULL" in create_table_call[0][0]
assert "date_unban INTEGER" in create_table_call[0][0]
assert "ban_author INTEGER" in create_table_call[0][0]
assert "created_at INTEGER DEFAULT (strftime('%s', 'now'))" in create_table_call[0][0]
assert "updated_at INTEGER DEFAULT (strftime('%s', 'now'))" in create_table_call[0][0]
assert "FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE" in create_table_call[0][0]
assert "FOREIGN KEY (ban_author) REFERENCES our_users(user_id) ON DELETE SET NULL" in create_table_call[0][0]
# Проверяем создание индексов
index_calls = calls[1:4]
index_names = [call[0][0] for call in index_calls]
assert any("idx_blacklist_history_user_id" in idx for idx in index_names)
assert any("idx_blacklist_history_date_ban" in idx for idx in index_names)
assert any("idx_blacklist_history_date_unban" in idx for idx in index_names)
# Проверяем логирование
blacklist_history_repository.logger.info.assert_called_once_with(
"Таблица истории банов/разбанов создана"
)
@pytest.mark.asyncio
async def test_add_record_on_ban(self, blacklist_history_repository, sample_history_record):
"""Тест добавления записи о бане в историю"""
await blacklist_history_repository.add_record_on_ban(sample_history_record)
# Проверяем, что метод вызван с правильными параметрами
blacklist_history_repository._execute_query.assert_called_once()
call_args = blacklist_history_repository._execute_query.call_args
# Проверяем SQL запрос
sql_query = call_args[0][0].replace('\n', ' ').replace(' ', ' ').strip()
assert "INSERT INTO blacklist_history" in sql_query
assert "user_id, message_for_user, date_ban, date_unban, ban_author, created_at, updated_at" in sql_query
# Проверяем параметры
params = call_args[0][1]
assert params[0] == 12345 # user_id
assert params[1] == "Нарушение правил" # message_for_user
assert params[2] == sample_history_record.date_ban # date_ban
assert params[3] is None # date_unban
assert params[4] == 999 # ban_author
assert params[5] == sample_history_record.created_at # created_at
assert params[6] == sample_history_record.updated_at # updated_at
# Проверяем логирование
blacklist_history_repository.logger.info.assert_called_once()
log_call = blacklist_history_repository.logger.info.call_args[0][0]
assert "Запись о бане добавлена в историю" in log_call
assert "user_id=12345" in log_call
@pytest.mark.asyncio
async def test_add_record_on_ban_with_defaults(self, blacklist_history_repository):
"""Тест добавления записи о бане с дефолтными значениями created_at и updated_at"""
record = BlacklistHistoryRecord(
user_id=12345,
message_for_user="Тест",
date_ban=int(time.time()),
date_unban=None,
ban_author=None,
created_at=None, # Будет установлено автоматически
updated_at=None, # Будет установлено автоматически
)
await blacklist_history_repository.add_record_on_ban(record)
# Проверяем, что метод вызван
blacklist_history_repository._execute_query.assert_called_once()
call_args = blacklist_history_repository._execute_query.call_args
# Проверяем, что created_at и updated_at установлены (не None)
params = call_args[0][1]
assert params[5] is not None # created_at
assert params[6] is not None # updated_at
assert isinstance(params[5], int)
assert isinstance(params[6], int)
@pytest.mark.asyncio
async def test_set_unban_date_success(self, blacklist_history_repository):
"""Тест успешного обновления даты разбана"""
user_id = 12345
date_unban = int(time.time())
# Мокируем результат проверки - находим открытую запись
blacklist_history_repository._execute_query_with_result.return_value = [(100,)] # id записи
result = await blacklist_history_repository.set_unban_date(user_id, date_unban)
# Проверяем, что сначала проверяется наличие записи
assert blacklist_history_repository._execute_query_with_result.call_count == 1
check_call = blacklist_history_repository._execute_query_with_result.call_args
assert "SELECT id FROM blacklist_history" in check_call[0][0]
assert check_call[0][1] == (user_id,)
# Проверяем, что затем обновляется запись
assert blacklist_history_repository._execute_query.call_count == 1
update_call = blacklist_history_repository._execute_query.call_args
update_query = update_call[0][0].replace('\n', ' ').replace(' ', ' ').strip()
assert "UPDATE blacklist_history" in update_query
assert "SET date_unban = ?" in update_query
assert "updated_at = ?" in update_query
# Проверяем параметры обновления
update_params = update_call[0][1]
assert update_params[0] == date_unban
assert update_params[1] is not None # updated_at (текущее время)
assert isinstance(update_params[1], int)
assert update_params[2] == 100 # id записи
# Проверяем результат
assert result is True
# Проверяем логирование
blacklist_history_repository.logger.info.assert_called_once()
log_call = blacklist_history_repository.logger.info.call_args[0][0]
assert "Дата разбана обновлена в истории" in log_call
assert f"user_id={user_id}" in log_call
@pytest.mark.asyncio
async def test_set_unban_date_no_open_record(self, blacklist_history_repository):
"""Тест обновления даты разбана когда нет открытой записи"""
user_id = 12345
date_unban = int(time.time())
# Мокируем результат проверки - нет открытых записей
blacklist_history_repository._execute_query_with_result.return_value = []
result = await blacklist_history_repository.set_unban_date(user_id, date_unban)
# Проверяем, что проверка была выполнена
assert blacklist_history_repository._execute_query_with_result.call_count == 1
# Проверяем, что UPDATE не был вызван (нет записей для обновления)
blacklist_history_repository._execute_query.assert_not_called()
# Проверяем результат
assert result is False
# Проверяем логирование предупреждения
blacklist_history_repository.logger.warning.assert_called_once()
log_call = blacklist_history_repository.logger.warning.call_args[0][0]
assert "Не найдена открытая запись в истории для обновления" in log_call
assert f"user_id={user_id}" in log_call
@pytest.mark.asyncio
async def test_set_unban_date_exception(self, blacklist_history_repository):
"""Тест обработки исключения при обновлении даты разбана"""
user_id = 12345
date_unban = int(time.time())
# Мокируем исключение при проверке
blacklist_history_repository._execute_query_with_result.side_effect = Exception("Database error")
result = await blacklist_history_repository.set_unban_date(user_id, date_unban)
# Проверяем, что метод вернул False при ошибке
assert result is False
# Проверяем логирование ошибки
blacklist_history_repository.logger.error.assert_called_once()
log_call = blacklist_history_repository.logger.error.call_args[0][0]
assert "Ошибка обновления даты разбана в истории" in log_call
assert f"user_id={user_id}" in log_call
@pytest.mark.asyncio
async def test_set_unban_date_update_exception(self, blacklist_history_repository):
"""Тест обработки исключения при обновлении записи"""
user_id = 12345
date_unban = int(time.time())
# Мокируем успешную проверку, но ошибку при обновлении
blacklist_history_repository._execute_query_with_result.return_value = [(100,)]
blacklist_history_repository._execute_query.side_effect = Exception("Update error")
result = await blacklist_history_repository.set_unban_date(user_id, date_unban)
# Проверяем, что метод вернул False при ошибке
assert result is False
# Проверяем логирование ошибки
blacklist_history_repository.logger.error.assert_called_once()
log_call = blacklist_history_repository.logger.error.call_args[0][0]
assert "Ошибка обновления даты разбана в истории" in log_call

View File

@@ -37,7 +37,8 @@ class TestBlacklistRepository:
user_id=12345,
message_for_user="Нарушение правил",
date_to_unban=int(time.time()) + 86400, # +1 день
created_at=int(time.time())
created_at=int(time.time()),
ban_author=999,
)
@pytest.fixture
@@ -47,7 +48,8 @@ class TestBlacklistRepository:
user_id=67890,
message_for_user="Постоянный бан",
date_to_unban=None,
created_at=int(time.time())
created_at=int(time.time()),
ban_author=None,
)
@pytest.mark.asyncio
@@ -82,11 +84,11 @@ class TestBlacklistRepository:
# Проверяем SQL запрос (учитываем форматирование)
sql_query = call_args[0][0].replace('\n', ' ').replace(' ', ' ').replace(' ', ' ').strip()
expected_sql = "INSERT INTO blacklist (user_id, message_for_user, date_to_unban) VALUES (?, ?, ?)"
expected_sql = "INSERT INTO blacklist (user_id, message_for_user, date_to_unban, ban_author) VALUES (?, ?, ?, ?)"
assert sql_query == expected_sql
# Проверяем параметры
assert call_args[0][1] == (12345, "Нарушение правил", sample_blacklist_user.date_to_unban)
assert call_args[0][1] == (12345, "Нарушение правил", sample_blacklist_user.date_to_unban, 999)
# Проверяем логирование
blacklist_repository.logger.info.assert_called_once_with(
@@ -99,7 +101,7 @@ class TestBlacklistRepository:
await blacklist_repository.add_user(sample_blacklist_user_permanent)
call_args = blacklist_repository._execute_query.call_args
assert call_args[0][1] == (67890, "Постоянный бан", None)
assert call_args[0][1] == (67890, "Постоянный бан", None, None)
blacklist_repository.logger.info.assert_called_once_with(
"Пользователь добавлен в черный список: user_id=67890"
@@ -182,7 +184,7 @@ class TestBlacklistRepository:
async def test_get_user_success(self, blacklist_repository):
"""Тест успешного получения пользователя по ID"""
# Симулируем результат запроса
mock_row = (12345, "Нарушение правил", int(time.time()) + 86400, int(time.time()))
mock_row = (12345, "Нарушение правил", int(time.time()) + 86400, int(time.time()), 111)
blacklist_repository._execute_query_with_result.return_value = [mock_row]
result = await blacklist_repository.get_user(12345)
@@ -193,12 +195,13 @@ class TestBlacklistRepository:
assert result.message_for_user == "Нарушение правил"
assert result.date_to_unban == mock_row[2]
assert result.created_at == mock_row[3]
assert result.ban_author == mock_row[4]
# Проверяем, что метод вызван с правильными параметрами
blacklist_repository._execute_query_with_result.assert_called_once()
call_args = blacklist_repository._execute_query_with_result.call_args
assert call_args[0][0] == "SELECT user_id, message_for_user, date_to_unban, created_at FROM blacklist WHERE user_id = ?"
assert "SELECT user_id, message_for_user, date_to_unban, created_at, ban_author" in call_args[0][0]
assert call_args[0][1] == (12345,)
@pytest.mark.asyncio

View File

@@ -38,7 +38,8 @@ class TestPostRepository:
text="Тестовый пост без даты",
author_id=67890,
helper_text_message_id=None,
created_at=None
created_at=None,
status="suggest",
)
@pytest.fixture
@@ -75,6 +76,8 @@ class TestPostRepository:
assert "CREATE TABLE IF NOT EXISTS post_from_telegram_suggest" in post_table_call
assert "message_id INTEGER NOT NULL PRIMARY KEY" in post_table_call
assert "created_at INTEGER NOT NULL" in post_table_call
assert "status TEXT NOT NULL DEFAULT 'suggest'" in post_table_call
assert "is_anonymous INTEGER" in post_table_call
assert "FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE" in post_table_call
# Проверяем создание таблицы контента
@@ -101,13 +104,18 @@ class TestPostRepository:
params = call_args[0][1]
assert "INSERT INTO post_from_telegram_suggest" in query
assert "VALUES (?, ?, ?, ?)" in query
assert params == (
sample_post.message_id,
sample_post.text,
sample_post.author_id,
sample_post.created_at
)
assert "status" in query
assert "is_anonymous" in query
assert "VALUES (?, ?, ?, ?, ?, ?)" in query
# Проверяем параметры: message_id, text, author_id, created_at, status, is_anonymous
assert params[0] == sample_post.message_id
assert params[1] == sample_post.text
assert params[2] == sample_post.author_id
assert params[3] == sample_post.created_at
assert params[4] == sample_post.status
# is_anonymous преобразуется в int (None -> None, True -> 1, False -> 0)
expected_is_anonymous = None if sample_post.is_anonymous is None else (1 if sample_post.is_anonymous else 0)
assert params[5] == expected_is_anonymous
@pytest.mark.asyncio
async def test_add_post_without_date(self, post_repository, sample_post_no_date):
@@ -126,7 +134,10 @@ class TestPostRepository:
call_args = post_repository._execute_query.call_args
params = call_args[0][1]
assert params[3] == sample_post_no_date.created_at # created_at field
assert params[3] == sample_post_no_date.created_at # created_at
assert params[4] == sample_post_no_date.status # status (default suggest)
# Проверяем is_anonymous (должен быть в параметрах)
assert len(params) == 6 # Всего 6 параметров включая is_anonymous
@pytest.mark.asyncio
async def test_add_post_logs_correctly(self, post_repository, sample_post):
@@ -160,6 +171,51 @@ class TestPostRepository:
assert "UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?" in query
assert params == (helper_message_id, message_id)
@pytest.mark.asyncio
async def test_update_status_by_message_id(self, post_repository):
"""Тест обновления статуса поста по message_id."""
post_repository._execute_query = AsyncMock()
post_repository.logger = MagicMock()
message_id = 12345
status = "approved"
await post_repository.update_status_by_message_id(message_id, status)
post_repository._execute_query.assert_called_once()
call_args = post_repository._execute_query.call_args
query = call_args[0][0]
params = call_args[0][1]
assert "UPDATE post_from_telegram_suggest" in query
assert "SET status = ? WHERE message_id = ?" in query
assert params == (status, message_id)
post_repository.logger.info.assert_called_once()
@pytest.mark.asyncio
async def test_update_status_for_media_group_by_helper_id(self, post_repository):
"""Тест обновления статуса медиагруппы по helper_message_id."""
post_repository._execute_query = AsyncMock()
post_repository.logger = MagicMock()
helper_message_id = 99999
status = "declined"
await post_repository.update_status_for_media_group_by_helper_id(
helper_message_id, status
)
post_repository._execute_query.assert_called_once()
call_args = post_repository._execute_query.call_args
query = call_args[0][0]
params = call_args[0][1]
assert "UPDATE post_from_telegram_suggest" in query
assert "SET status = ?" in query
assert "message_id = ? OR helper_text_message_id = ?" in query
assert params == (status, helper_message_id, helper_message_id)
post_repository.logger.info.assert_called_once()
@pytest.mark.asyncio
async def test_add_post_content_success(self, post_repository):
"""Тест успешного добавления контента поста."""
@@ -426,6 +482,169 @@ class TestPostRepository:
# Проверяем, что logger.info не вызывался
post_repository.logger.info.assert_not_called()
@pytest.mark.asyncio
async def test_get_post_text_and_anonymity_by_message_id_found(self, post_repository):
"""Тест получения текста и is_anonymous по message_id (пост найден)."""
# Мокаем _execute_query_with_result
mock_result = [("Тестовый текст", 1)] # is_anonymous = 1 (True)
post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
post_repository.logger = MagicMock()
message_id = 12345
result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id)
# Проверяем результат
text, is_anonymous = result
assert text == "Тестовый текст"
assert is_anonymous is True
# Проверяем вызов _execute_query_with_result
post_repository._execute_query_with_result.assert_called_once()
call_args = post_repository._execute_query_with_result.call_args
query = call_args[0][0]
params = call_args[0][1]
assert "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE message_id = ?" in query
assert params == (message_id,)
@pytest.mark.asyncio
async def test_get_post_text_and_anonymity_by_message_id_with_false(self, post_repository):
"""Тест получения текста и is_anonymous по message_id (is_anonymous = False)."""
# Мокаем _execute_query_with_result
mock_result = [("Тестовый текст", 0)] # is_anonymous = 0 (False)
post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
message_id = 12345
result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id)
# Проверяем результат
text, is_anonymous = result
assert text == "Тестовый текст"
assert is_anonymous is False
@pytest.mark.asyncio
async def test_get_post_text_and_anonymity_by_message_id_with_null(self, post_repository):
"""Тест получения текста и is_anonymous по message_id (is_anonymous = NULL)."""
# Мокаем _execute_query_with_result
mock_result = [("Тестовый текст", None)] # is_anonymous = NULL
post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
message_id = 12345
result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id)
# Проверяем результат
text, is_anonymous = result
assert text == "Тестовый текст"
assert is_anonymous is None
@pytest.mark.asyncio
async def test_get_post_text_and_anonymity_by_message_id_not_found(self, post_repository):
"""Тест получения текста и is_anonymous по message_id (пост не найден)."""
# Мокаем _execute_query_with_result
mock_result = []
post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
message_id = 12345
result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id)
# Проверяем результат
text, is_anonymous = result
assert text is None
assert is_anonymous is None
@pytest.mark.asyncio
async def test_get_post_text_and_anonymity_by_helper_id_found(self, post_repository):
"""Тест получения текста и is_anonymous по helper_message_id (пост найден)."""
# Мокаем _execute_query_with_result
mock_result = [("Тестовый текст", 1)] # is_anonymous = 1 (True)
post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
post_repository.logger = MagicMock()
helper_message_id = 67890
result = await post_repository.get_post_text_and_anonymity_by_helper_id(helper_message_id)
# Проверяем результат
text, is_anonymous = result
assert text == "Тестовый текст"
assert is_anonymous is True
# Проверяем вызов _execute_query_with_result
post_repository._execute_query_with_result.assert_called_once()
call_args = post_repository._execute_query_with_result.call_args
query = call_args[0][0]
params = call_args[0][1]
assert "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE helper_text_message_id = ?" in query
assert params == (helper_message_id,)
@pytest.mark.asyncio
async def test_add_post_with_is_anonymous_true(self, post_repository):
"""Тест добавления поста с is_anonymous=True."""
post = TelegramPost(
message_id=12345,
text="Тестовый пост анон",
author_id=67890,
created_at=int(datetime.now().timestamp()),
is_anonymous=True
)
post_repository._execute_query = AsyncMock()
await post_repository.add_post(post)
call_args = post_repository._execute_query.call_args
params = call_args[0][1]
# Проверяем, что is_anonymous преобразован в 1
assert params[5] == 1
@pytest.mark.asyncio
async def test_add_post_with_is_anonymous_false(self, post_repository):
"""Тест добавления поста с is_anonymous=False."""
post = TelegramPost(
message_id=12345,
text="Тестовый пост неанон",
author_id=67890,
created_at=int(datetime.now().timestamp()),
is_anonymous=False
)
post_repository._execute_query = AsyncMock()
await post_repository.add_post(post)
call_args = post_repository._execute_query.call_args
params = call_args[0][1]
# Проверяем, что is_anonymous преобразован в 0
assert params[5] == 0
@pytest.mark.asyncio
async def test_add_post_with_is_anonymous_none(self, post_repository):
"""Тест добавления поста с is_anonymous=None."""
post = TelegramPost(
message_id=12345,
text="Тестовый пост",
author_id=67890,
created_at=int(datetime.now().timestamp()),
is_anonymous=None
)
post_repository._execute_query = AsyncMock()
await post_repository.add_post(post)
call_args = post_repository._execute_query.call_args
params = call_args[0][1]
# Проверяем, что is_anonymous остался None
assert params[5] is None
@pytest.mark.asyncio
async def test_create_tables_logs_success(self, post_repository):
"""Тест логирования успешного создания таблиц."""

View File

@@ -495,3 +495,52 @@ class TestPostRepositoryIntegration:
expected_message_ids = [11111, 22222, 33333, 44444]
for expected_id in expected_message_ids:
assert expected_id in post_ids
@pytest.mark.asyncio
async def test_update_status_by_message_id_integration(self, post_repository, sample_post):
"""Интеграционный тест обновления статуса одиночного поста."""
await self._setup_test_database(post_repository)
await post_repository.add_post(sample_post)
await post_repository.update_status_by_message_id(sample_post.message_id, "approved")
rows = await post_repository._execute_query_with_result(
"SELECT status FROM post_from_telegram_suggest WHERE message_id = ?",
(sample_post.message_id,),
)
assert len(rows) == 1
assert rows[0][0] == "approved"
@pytest.mark.asyncio
async def test_update_status_for_media_group_by_helper_id_integration(
self, post_repository, sample_post_with_helper
):
"""Интеграционный тест обновления статуса медиагруппы по helper_message_id."""
await self._setup_test_database(post_repository)
await post_repository.add_post(sample_post_with_helper)
helper_message_id = 99999
helper_post = TelegramPost(
message_id=helper_message_id,
text="^",
author_id=67890,
helper_text_message_id=sample_post_with_helper.message_id,
created_at=int(datetime.now().timestamp()),
status="suggest",
)
await post_repository.add_post(helper_post)
await post_repository.update_helper_message(
sample_post_with_helper.message_id, helper_message_id
)
await post_repository.update_status_for_media_group_by_helper_id(
helper_message_id, "declined"
)
rows = await post_repository._execute_query_with_result(
"SELECT status FROM post_from_telegram_suggest "
"WHERE message_id = ? OR helper_text_message_id = ?",
(helper_message_id, helper_message_id),
)
assert len(rows) == 2
for row in rows:
assert row[0] == "declined"

287
tests/test_post_service.py Normal file
View File

@@ -0,0 +1,287 @@
"""Tests for PostService"""
import pytest
from unittest.mock import Mock, AsyncMock, MagicMock, patch
from datetime import datetime
from aiogram import types
from helper_bot.handlers.private.services import PostService, BotSettings
from database.models import TelegramPost, User
class TestPostService:
"""Test class for PostService"""
@pytest.fixture
def mock_db(self):
"""Mock database"""
db = Mock()
db.add_post = AsyncMock()
db.update_helper_message = AsyncMock()
db.get_user_by_id = AsyncMock()
return db
@pytest.fixture
def mock_settings(self):
"""Mock bot settings"""
return BotSettings(
group_for_posts="test_posts",
group_for_message="test_message",
main_public="test_public",
group_for_logs="test_logs",
important_logs="test_important",
preview_link="test_link",
logs="test_logs_setting",
test="test_test_setting"
)
@pytest.fixture
def post_service(self, mock_db, mock_settings):
"""Create PostService instance"""
return PostService(mock_db, mock_settings)
@pytest.fixture
def mock_message(self):
"""Mock Telegram message"""
message = Mock(spec=types.Message)
from_user = Mock()
from_user.id = 12345
from_user.first_name = "Test"
from_user.username = "testuser"
from_user.full_name = "Test User"
message.from_user = from_user
message.text = "Тестовый пост"
message.message_id = 100
message.bot = AsyncMock()
message.chat = Mock()
message.chat.id = 12345
return message
@pytest.mark.asyncio
async def test_handle_text_post_saves_raw_text(self, post_service, mock_message, mock_db):
"""Test that handle_text_post saves raw text to database"""
with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted text"):
with patch('helper_bot.handlers.private.services.send_text_message', return_value=200):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
await post_service.handle_text_post(mock_message, "Test")
# Check that add_post was called
mock_db.add_post.assert_called_once()
call_args = mock_db.add_post.call_args[0][0]
# Check that raw text is saved
assert isinstance(call_args, TelegramPost)
assert call_args.text == "Тестовый пост" # Raw text
assert call_args.message_id == 200
assert call_args.author_id == 12345
assert call_args.is_anonymous is False
@pytest.mark.asyncio
async def test_handle_text_post_determines_anonymity(self, post_service, mock_message, mock_db):
"""Test that handle_text_post determines anonymity correctly"""
mock_message.text = "Тестовый пост анон"
with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted text"):
with patch('helper_bot.handlers.private.services.send_text_message', return_value=200):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=True):
await post_service.handle_text_post(mock_message, "Test")
call_args = mock_db.add_post.call_args[0][0]
assert call_args.is_anonymous is True
@pytest.mark.asyncio
async def test_handle_photo_post_saves_raw_caption(self, post_service, mock_message, mock_db):
"""Test that handle_photo_post saves raw caption to database"""
mock_message.caption = "Тестовая подпись"
mock_message.photo = [Mock()]
mock_message.photo[-1].file_id = "photo_123"
sent_message = Mock()
sent_message.message_id = 201
sent_message.caption = "Formatted caption"
with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted caption"):
with patch('helper_bot.handlers.private.services.send_photo_message', return_value=sent_message):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
await post_service.handle_photo_post(mock_message, "Test")
mock_db.add_post.assert_called_once()
call_args = mock_db.add_post.call_args[0][0]
# Check that raw caption is saved
assert call_args.text == "Тестовая подпись" # Raw caption
assert call_args.message_id == 201
assert call_args.is_anonymous is False
@pytest.mark.asyncio
async def test_handle_photo_post_without_caption(self, post_service, mock_message, mock_db):
"""Test that handle_photo_post handles missing caption"""
mock_message.caption = None
mock_message.photo = [Mock()]
mock_message.photo[-1].file_id = "photo_123"
sent_message = Mock()
sent_message.message_id = 202
with patch('helper_bot.handlers.private.services.get_text_message', return_value=""):
with patch('helper_bot.handlers.private.services.send_photo_message', return_value=sent_message):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
await post_service.handle_photo_post(mock_message, "Test")
call_args = mock_db.add_post.call_args[0][0]
assert call_args.text == "" # Empty string for missing caption
assert call_args.is_anonymous is False
@pytest.mark.asyncio
async def test_handle_video_post_saves_raw_caption(self, post_service, mock_message, mock_db):
"""Test that handle_video_post saves raw caption to database"""
mock_message.caption = "Видео подпись"
mock_message.video = Mock()
mock_message.video.file_id = "video_123"
sent_message = Mock()
sent_message.message_id = 203
with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted"):
with patch('helper_bot.handlers.private.services.send_video_message', return_value=sent_message):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=True):
with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
await post_service.handle_video_post(mock_message, "Test")
call_args = mock_db.add_post.call_args[0][0]
assert call_args.text == "Видео подпись" # Raw caption
assert call_args.is_anonymous is True
@pytest.mark.asyncio
async def test_handle_audio_post_saves_raw_caption(self, post_service, mock_message, mock_db):
"""Test that handle_audio_post saves raw caption to database"""
mock_message.caption = "Аудио подпись"
mock_message.audio = Mock()
mock_message.audio.file_id = "audio_123"
sent_message = Mock()
sent_message.message_id = 204
with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted"):
with patch('helper_bot.handlers.private.services.send_audio_message', return_value=sent_message):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
await post_service.handle_audio_post(mock_message, "Test")
call_args = mock_db.add_post.call_args[0][0]
assert call_args.text == "Аудио подпись" # Raw caption
assert call_args.is_anonymous is False
@pytest.mark.asyncio
async def test_handle_video_note_post_saves_empty_string(self, post_service, mock_message, mock_db):
"""Test that handle_video_note_post saves empty string"""
mock_message.video_note = Mock()
mock_message.video_note.file_id = "video_note_123"
sent_message = Mock()
sent_message.message_id = 205
with patch('helper_bot.handlers.private.services.send_video_note_message', return_value=sent_message):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
await post_service.handle_video_note_post(mock_message)
call_args = mock_db.add_post.call_args[0][0]
assert call_args.text == "" # Empty string
assert call_args.is_anonymous is False
@pytest.mark.asyncio
async def test_handle_voice_post_saves_empty_string(self, post_service, mock_message, mock_db):
"""Test that handle_voice_post saves empty string"""
mock_message.voice = Mock()
mock_message.voice.file_id = "voice_123"
sent_message = Mock()
sent_message.message_id = 206
with patch('helper_bot.handlers.private.services.send_voice_message', return_value=sent_message):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
await post_service.handle_voice_post(mock_message)
call_args = mock_db.add_post.call_args[0][0]
assert call_args.text == "" # Empty string
assert call_args.is_anonymous is False
@pytest.mark.asyncio
async def test_handle_media_group_post_saves_raw_caption(self, post_service, mock_message, mock_db):
"""Test that handle_media_group_post saves raw caption to database"""
mock_message.message_id = 300
mock_message.media_group_id = 1
album = [Mock()]
album[0].caption = "Медиагруппа подпись"
with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted"):
with patch('helper_bot.handlers.private.services.prepare_media_group_from_middlewares', return_value=[]):
with patch('helper_bot.handlers.private.services.send_media_group_message_to_private_chat', return_value=301):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.send_text_message', return_value=302):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=True):
with patch('asyncio.sleep', return_value=None):
await post_service.handle_media_group_post(mock_message, album, "Test")
# Check main post
calls = mock_db.add_post.call_args_list
main_post = calls[0][0][0]
assert main_post.text == "Медиагруппа подпись" # Raw caption
assert main_post.message_id == 300
assert main_post.is_anonymous is True
@pytest.mark.asyncio
async def test_handle_media_group_post_without_caption(self, post_service, mock_message, mock_db):
"""Test that handle_media_group_post handles missing caption"""
mock_message.message_id = 301
mock_message.media_group_id = 1
album = [Mock()]
album[0].caption = None
with patch('helper_bot.handlers.private.services.get_text_message', return_value=" "):
with patch('helper_bot.handlers.private.services.prepare_media_group_from_middlewares', return_value=[]):
with patch('helper_bot.handlers.private.services.send_media_group_message_to_private_chat', return_value=302):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.send_text_message', return_value=303):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
with patch('asyncio.sleep', return_value=None):
await post_service.handle_media_group_post(mock_message, album, "Test")
calls = mock_db.add_post.call_args_list
main_post = calls[0][0][0]
assert main_post.text == "" # Empty string for missing caption
assert main_post.is_anonymous is False

View File

@@ -153,7 +153,7 @@ class TestAdminService:
self.mock_db.set_user_blacklist = AsyncMock(return_value=None)
# Act
await self.admin_service.ban_user(user_id, username, reason, ban_days)
await self.admin_service.ban_user(user_id, username, reason, ban_days, ban_author_id=999)
# Assert
self.mock_db.check_user_in_blacklist.assert_called_once_with(user_id)
@@ -187,10 +187,10 @@ class TestAdminService:
self.mock_db.set_user_blacklist = AsyncMock(return_value=None)
# Act
await self.admin_service.ban_user(user_id, username, reason, ban_days)
await self.admin_service.ban_user(user_id, username, reason, ban_days, ban_author_id=999)
# Assert
self.mock_db.set_user_blacklist.assert_called_once_with(user_id, None, reason, None)
self.mock_db.set_user_blacklist.assert_called_once_with(user_id, None, reason, None, ban_author=999)
@pytest.mark.asyncio
async def test_unban_user_success(self):

View File

@@ -6,6 +6,7 @@ import os
from helper_bot.utils.helper_func import (
get_first_name,
get_text_message,
determine_anonymity,
check_username_and_full_name,
safe_html_escape,
download_file,
@@ -64,12 +65,13 @@ class TestHelperFunctions:
def test_get_text_message(self, mock_message):
"""Тест функции обработки текста сообщения"""
# Тест с обычным текстом
# Тест с обычным текстом (legacy - определяется по тексту)
text = "Привет, это тестовое сообщение"
result = get_text_message(text, "Test", "testuser")
assert "Test" in result
assert "testuser" in result
assert "тестовое сообщение" in result
assert "Автор поста" in result
# Тест с пустым текстом
result = get_text_message("", "Test", "testuser")
@@ -83,6 +85,98 @@ class TestHelperFunctions:
assert "testuser" in result
assert "Обычный текст без специальных слов" in result
def test_get_text_message_with_is_anonymous_true(self, mock_message):
"""Тест функции get_text_message с is_anonymous=True"""
text = "Тестовый пост"
result = get_text_message(text, "Test", "testuser", is_anonymous=True)
assert "Пост из ТГ:" in result
assert "Тестовый пост" in result
assert "Пост опубликован анонимно" in result
assert "Автор поста" not in result
def test_get_text_message_with_is_anonymous_false(self, mock_message):
"""Тест функции get_text_message с is_anonymous=False"""
text = "Тестовый пост"
result = get_text_message(text, "Test", "testuser", is_anonymous=False)
assert "Пост из ТГ:" in result
assert "Тестовый пост" in result
assert "Автор поста" in result
assert "Test" in result
assert "testuser" in result
assert "Пост опубликован анонимно" not in result
def test_get_text_message_with_is_anonymous_none_legacy(self, mock_message):
"""Тест функции get_text_message с is_anonymous=None (legacy - определяется по тексту)"""
# Тест с "анон" в тексте
text = "Тестовый пост анон"
result = get_text_message(text, "Test", "testuser", is_anonymous=None)
assert "Пост из ТГ:" in result
assert "Тестовый пост анон" in result
assert "Пост опубликован анонимно" in result
# Тест с "неанон" в тексте
text = "Тестовый пост неанон"
result = get_text_message(text, "Test", "testuser", is_anonymous=None)
assert "Пост из ТГ:" in result
assert "Тестовый пост неанон" in result
assert "Автор поста" in result
# Тест с "не анон" в тексте
text = "Тестовый пост не анон"
result = get_text_message(text, "Test", "testuser", is_anonymous=None)
assert "Автор поста" in result
def test_get_text_message_with_username_none(self, mock_message):
"""Тест функции get_text_message без username"""
text = "Тестовый пост"
result = get_text_message(text, "Test", None, is_anonymous=False)
assert "Test" in result
assert "(Ник не указан)" in result
assert "@" not in result
def test_determine_anonymity_with_anon(self):
"""Тест функции determine_anonymity с 'анон' в тексте"""
assert determine_anonymity("Этот пост анон") is True
assert determine_anonymity("анон") is True
assert determine_anonymity("АНОН") is True # Проверка регистра
assert determine_anonymity("пост анонимный анон") is True
def test_determine_anonymity_with_neanon(self):
"""Тест функции determine_anonymity с 'неанон' в тексте"""
assert determine_anonymity("Этот пост неанон") is False
assert determine_anonymity("неанон") is False
assert determine_anonymity("НЕАНОН") is False # Проверка регистра
assert determine_anonymity("пост неанон") is False
def test_determine_anonymity_with_ne_anon(self):
"""Тест функции determine_anonymity с 'не анон' в тексте"""
assert determine_anonymity("Этот пост не анон") is False
assert determine_anonymity("не анон") is False
assert determine_anonymity("НЕ АНОН") is False # Проверка регистра
assert determine_anonymity("пост не анон") is False
def test_determine_anonymity_priority_neanon_over_anon(self):
"""Тест приоритета 'неанон' над 'анон'"""
# Если есть и "анон" и "неанон", должен вернуть False
assert determine_anonymity("анон неанон") is False
assert determine_anonymity("неанон анон") is False
assert determine_anonymity("не анон анон") is False
def test_determine_anonymity_without_keywords(self):
"""Тест функции determine_anonymity без ключевых слов"""
assert determine_anonymity("Обычный текст") is False
assert determine_anonymity("") is False
assert determine_anonymity("Пост без специальных слов") is False
def test_determine_anonymity_with_none(self):
"""Тест функции determine_anonymity с None"""
assert determine_anonymity(None) is False
def test_determine_anonymity_with_empty_string(self):
"""Тест функции determine_anonymity с пустой строкой"""
assert determine_anonymity("") is False
assert determine_anonymity(" ") is False # Только пробелы
@pytest.mark.asyncio
async def test_check_username_and_full_name(self):
"""Тест функции проверки изменений username и full_name"""