diff --git a/database/async_db.py b/database/async_db.py index 4f67ec1..bd2e62b 100644 --- a/database/async_db.py +++ b/database/async_db.py @@ -166,6 +166,14 @@ class AsyncBotDB: async def get_author_id_by_helper_message_id(self, helper_text_message_id: int) -> Optional[int]: """Получает ID автора по helper_text_message_id.""" return await self.factory.posts.get_author_id_by_helper_message_id(helper_text_message_id) + + async def get_post_text_and_anonymity_by_message_id(self, message_id: int) -> tuple[Optional[str], Optional[bool]]: + """Получает текст и is_anonymous поста по message_id.""" + return await self.factory.posts.get_post_text_and_anonymity_by_message_id(message_id) + + async def get_post_text_and_anonymity_by_helper_id(self, helper_message_id: int) -> tuple[Optional[str], Optional[bool]]: + """Получает текст и is_anonymous поста по helper_text_message_id.""" + return await self.factory.posts.get_post_text_and_anonymity_by_helper_id(helper_message_id) async def update_status_by_message_id(self, message_id: int, status: str) -> int: """Обновление статуса поста по message_id (одиночные посты). Возвращает число обновлённых строк.""" diff --git a/database/models.py b/database/models.py index 751e653..67b11ed 100644 --- a/database/models.py +++ b/database/models.py @@ -46,6 +46,7 @@ class TelegramPost: helper_text_message_id: Optional[int] = None created_at: Optional[int] = None status: str = "suggest" + is_anonymous: Optional[bool] = None @dataclass diff --git a/database/repositories/post_repository.py b/database/repositories/post_repository.py index 0160e69..9c03c3a 100644 --- a/database/repositories/post_repository.py +++ b/database/repositories/post_repository.py @@ -18,6 +18,7 @@ class PostRepository(DatabaseConnection): author_id INTEGER, created_at INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'suggest', + is_anonymous INTEGER, FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE ) ''' @@ -53,12 +54,14 @@ class PostRepository(DatabaseConnection): if not post.created_at: post.created_at = int(datetime.now().timestamp()) status = post.status if post.status else "suggest" + # Преобразуем bool в int для SQLite (True -> 1, False -> 0, None -> None) + is_anonymous_int = None if post.is_anonymous is None else (1 if post.is_anonymous else 0) query = """ - INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at, status) - VALUES (?, ?, ?, ?, ?) + INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at, status, is_anonymous) + VALUES (?, ?, ?, ?, ?, ?) """ - params = (post.message_id, post.text, post.author_id, post.created_at, status) + params = (post.message_id, post.text, post.author_id, post.created_at, status, is_anonymous_int) await self._execute_query(query, params) self.logger.info(f"Пост добавлен: message_id={post.message_id}") @@ -219,3 +222,33 @@ class PostRepository(DatabaseConnection): self.logger.info(f"Получен author_id: {author_id} для helper_message_id={helper_message_id}") return author_id return None + + async def get_post_text_and_anonymity_by_message_id(self, message_id: int) -> Tuple[Optional[str], Optional[bool]]: + """Получает текст и is_anonymous поста по message_id.""" + query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE message_id = ?" + rows = await self._execute_query_with_result(query, (message_id,)) + row = rows[0] if rows else None + + if row: + text = row[0] or "" + is_anonymous_int = row[1] + # Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None) + is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int) + self.logger.info(f"Получены текст и is_anonymous для message_id={message_id}") + return text, is_anonymous + return None, None + + async def get_post_text_and_anonymity_by_helper_id(self, helper_message_id: int) -> Tuple[Optional[str], Optional[bool]]: + """Получает текст и is_anonymous поста по helper_text_message_id.""" + query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE helper_text_message_id = ?" + rows = await self._execute_query_with_result(query, (helper_message_id,)) + row = rows[0] if rows else None + + if row: + text = row[0] or "" + is_anonymous_int = row[1] + # Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None) + is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int) + self.logger.info(f"Получены текст и is_anonymous для helper_message_id={helper_message_id}") + return text, is_anonymous + return None, None diff --git a/database/schema.sql b/database/schema.sql index acb4353..5c0a132 100644 --- a/database/schema.sql +++ b/database/schema.sql @@ -58,6 +58,7 @@ CREATE TABLE IF NOT EXISTS post_from_telegram_suggest ( author_id INTEGER, created_at INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'suggest', + is_anonymous INTEGER, FOREIGN KEY (author_id) REFERENCES our_users(user_id) ON DELETE CASCADE ); diff --git a/helper_bot/handlers/callback/services.py b/helper_bot/handlers/callback/services.py index e623a91..991b9f5 100644 --- a/helper_bot/handlers/callback/services.py +++ b/helper_bot/handlers/callback/services.py @@ -8,7 +8,7 @@ from aiogram.types import CallbackQuery from helper_bot.utils.helper_func import ( send_text_message, send_photo_message, send_video_message, send_video_note_message, send_audio_message, send_voice_message, - send_media_group_to_channel, delete_user_blacklist + send_media_group_to_channel, delete_user_blacklist, get_text_message ) from helper_bot.keyboards.keyboards import create_keyboard_for_ban_reason from .exceptions import ( @@ -78,7 +78,6 @@ class PostPublishService: @track_errors("post_publish_service", "_publish_text_post") async def _publish_text_post(self, call: CallbackQuery) -> None: """Публикация текстового поста""" - text_post = html.escape(str(call.message.text)) author_id = await self._get_author_id(call.message.message_id) updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved") @@ -86,7 +85,20 @@ class PostPublishService: logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'") raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных") - await send_text_message(self.main_public, call.message, text_post) + # Получаем сырой текст и is_anonymous из базы + raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id) + if raw_text is None: + raw_text = "" + + # Получаем данные автора + user = await self.db.get_user_by_id(author_id) + if not user: + raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных") + + # Формируем финальный текст с учетом is_anonymous + formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous) + + await send_text_message(self.main_public, call.message, formatted_text) await self._delete_post_and_notify_author(call, author_id) logger.info(f'Текст сообщения опубликован в канале {self.main_public}.') @@ -94,7 +106,6 @@ class PostPublishService: @track_errors("post_publish_service", "_publish_photo_post") async def _publish_photo_post(self, call: CallbackQuery) -> None: """Публикация поста с фото""" - text_post_with_photo = html.escape(str(call.message.caption)) author_id = await self._get_author_id(call.message.message_id) updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved") @@ -102,7 +113,20 @@ class PostPublishService: logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'") raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных") - await send_photo_message(self.main_public, call.message, call.message.photo[-1].file_id, text_post_with_photo) + # Получаем сырой текст и is_anonymous из базы + raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id) + if raw_text is None: + raw_text = "" + + # Получаем данные автора + user = await self.db.get_user_by_id(author_id) + if not user: + raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных") + + # Формируем финальный текст с учетом is_anonymous + formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous) + + await send_photo_message(self.main_public, call.message, call.message.photo[-1].file_id, formatted_text) await self._delete_post_and_notify_author(call, author_id) logger.info(f'Пост с фото опубликован в канале {self.main_public}.') @@ -110,7 +134,6 @@ class PostPublishService: @track_errors("post_publish_service", "_publish_video_post") async def _publish_video_post(self, call: CallbackQuery) -> None: """Публикация поста с видео""" - text_post_with_photo = html.escape(str(call.message.caption)) author_id = await self._get_author_id(call.message.message_id) updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved") @@ -118,7 +141,20 @@ class PostPublishService: logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'") raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных") - await send_video_message(self.main_public, call.message, call.message.video.file_id, text_post_with_photo) + # Получаем сырой текст и is_anonymous из базы + raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id) + if raw_text is None: + raw_text = "" + + # Получаем данные автора + user = await self.db.get_user_by_id(author_id) + if not user: + raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных") + + # Формируем финальный текст с учетом is_anonymous + formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous) + + await send_video_message(self.main_public, call.message, call.message.video.file_id, formatted_text) await self._delete_post_and_notify_author(call, author_id) logger.info(f'Пост с видео опубликован в канале {self.main_public}.') @@ -141,7 +177,6 @@ class PostPublishService: @track_errors("post_publish_service", "_publish_audio_post") async def _publish_audio_post(self, call: CallbackQuery) -> None: """Публикация поста с аудио""" - text_post_with_photo = html.escape(str(call.message.caption)) author_id = await self._get_author_id(call.message.message_id) updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved") @@ -149,7 +184,20 @@ class PostPublishService: logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'") raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных") - await send_audio_message(self.main_public, call.message, call.message.audio.file_id, text_post_with_photo) + # Получаем сырой текст и is_anonymous из базы + raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id) + if raw_text is None: + raw_text = "" + + # Получаем данные автора + user = await self.db.get_user_by_id(author_id) + if not user: + raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных") + + # Формируем финальный текст с учетом is_anonymous + formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous) + + await send_audio_message(self.main_public, call.message, call.message.audio.file_id, formatted_text) await self._delete_post_and_notify_author(call, author_id) logger.info(f'Пост с аудио опубликован в канале {self.main_public}.') @@ -185,11 +233,12 @@ class PostPublishService: logger.error(f"Контент медиагруппы не найден в базе данных для helper_message_id: {helper_message_id}") raise PublishError("Контент медиагруппы не найден в базе данных") - # Получаем текст поста по helper_message_id - logger.debug(f"Получаю текст поста для helper_message_id: {helper_message_id}") - pre_text = await self.db.get_post_text_by_helper_id(helper_message_id) - post_text = html.escape(str(pre_text)) if pre_text else "" - logger.debug(f"Текст поста получен: {'пустой' if not post_text else f'длина: {len(post_text)} символов'}") + # Получаем сырой текст и is_anonymous по helper_message_id + logger.debug(f"Получаю текст и is_anonymous поста для helper_message_id: {helper_message_id}") + raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_helper_id(helper_message_id) + if raw_text is None: + raw_text = "" + logger.debug(f"Текст поста получен: {'пустой' if not raw_text else f'длина: {len(raw_text)} символов'}, is_anonymous={is_anonymous}") # Получаем ID автора по helper_message_id logger.debug(f"Получаю ID автора для helper_message_id: {helper_message_id}") @@ -199,13 +248,22 @@ class PostPublishService: raise PostNotFoundError(f"Автор не найден для медиагруппы {helper_message_id}") logger.debug(f"ID автора получен: {author_id}") + # Получаем данные автора + user = await self.db.get_user_by_id(author_id) + if not user: + raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных") + + # Формируем финальный текст с учетом is_anonymous + formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous) + logger.debug(f"Сформирован финальный текст: {'пустой' if not formatted_text else f'длина: {len(formatted_text)} символов'}") + # Отправляем медиагруппу в канал logger.info(f"Отправляю медиагруппу в канал {self.main_public}") await send_media_group_to_channel( bot=self._get_bot(call.message), chat_id=self.main_public, post_content=post_content, - post_text=post_text + post_text=formatted_text ) await self.db.update_status_for_media_group_by_helper_id(helper_message_id, "approved") diff --git a/helper_bot/handlers/private/services.py b/helper_bot/handlers/private/services.py index 32955b7..27bad6b 100644 --- a/helper_bot/handlers/private/services.py +++ b/helper_bot/handlers/private/services.py @@ -13,11 +13,13 @@ from dataclasses import dataclass from aiogram import types from aiogram.types import FSInputFile from database.models import TelegramPost, User +from logs.custom_logger import logger # Local imports - utilities from helper_bot.utils.helper_func import ( get_first_name, get_text_message, + determine_anonymity, send_text_message, send_photo_message, send_media_group_message_to_private_chat, @@ -154,11 +156,17 @@ class PostService: markup = get_reply_keyboard_for_post() sent_message_id = await send_text_message(self.settings.group_for_posts, message, post_text, markup) + + # Сохраняем сырой текст и определяем анонимность + raw_text = message.text or "" + is_anonymous = determine_anonymity(raw_text) + post = TelegramPost( message_id=sent_message_id, - text=message.text, + text=raw_text, author_id=message.from_user.id, - created_at=int(datetime.now().timestamp()) + created_at=int(datetime.now().timestamp()), + is_anonymous=is_anonymous ) await self.db.add_post(post) @@ -176,11 +184,16 @@ class PostService: self.settings.group_for_posts, message, message.photo[-1].file_id, post_caption, markup ) + # Сохраняем сырой caption и определяем анонимность + raw_caption = message.caption or "" + is_anonymous = determine_anonymity(raw_caption) + post = TelegramPost( message_id=sent_message.message_id, - text=sent_message.caption or "", + text=raw_caption, author_id=message.from_user.id, - created_at=int(datetime.now().timestamp()) + created_at=int(datetime.now().timestamp()), + is_anonymous=is_anonymous ) await self.db.add_post(post) success = await add_in_db_media(sent_message, self.db) @@ -201,11 +214,16 @@ class PostService: self.settings.group_for_posts, message, message.video.file_id, post_caption, markup ) + # Сохраняем сырой caption и определяем анонимность + raw_caption = message.caption or "" + is_anonymous = determine_anonymity(raw_caption) + post = TelegramPost( message_id=sent_message.message_id, - text=sent_message.caption or "", + text=raw_caption, author_id=message.from_user.id, - created_at=int(datetime.now().timestamp()) + created_at=int(datetime.now().timestamp()), + is_anonymous=is_anonymous ) await self.db.add_post(post) success = await add_in_db_media(sent_message, self.db) @@ -222,11 +240,16 @@ class PostService: self.settings.group_for_posts, message, message.video_note.file_id, markup ) + # Сохраняем пустую строку, так как video_note не имеет caption + raw_caption = "" + is_anonymous = determine_anonymity(raw_caption) + post = TelegramPost( message_id=sent_message.message_id, - text=sent_message.caption or "", + text=raw_caption, author_id=message.from_user.id, - created_at=int(datetime.now().timestamp()) + created_at=int(datetime.now().timestamp()), + is_anonymous=is_anonymous ) await self.db.add_post(post) success = await add_in_db_media(sent_message, self.db) @@ -247,11 +270,16 @@ class PostService: self.settings.group_for_posts, message, message.audio.file_id, post_caption, markup ) + # Сохраняем сырой caption и определяем анонимность + raw_caption = message.caption or "" + is_anonymous = determine_anonymity(raw_caption) + post = TelegramPost( message_id=sent_message.message_id, - text=sent_message.caption or "", + text=raw_caption, author_id=message.from_user.id, - created_at=int(datetime.now().timestamp()) + created_at=int(datetime.now().timestamp()), + is_anonymous=is_anonymous ) await self.db.add_post(post) success = await add_in_db_media(sent_message, self.db) @@ -268,11 +296,16 @@ class PostService: self.settings.group_for_posts, message, message.voice.file_id, markup ) + # Сохраняем пустую строку, так как voice не имеет caption + raw_caption = "" + is_anonymous = determine_anonymity(raw_caption) + post = TelegramPost( message_id=sent_message.message_id, - text=sent_message.caption or "", + text=raw_caption, author_id=message.from_user.id, - created_at=int(datetime.now().timestamp()) + created_at=int(datetime.now().timestamp()), + is_anonymous=is_anonymous ) await self.db.add_post(post) success = await add_in_db_media(sent_message, self.db) @@ -285,17 +318,24 @@ class PostService: @track_media_processing("media_group") async def handle_media_group_post(self, message: types.Message, album: list, first_name: str) -> None: """Handle media group post submission""" + #TODO: Мне кажется тут какая-то дичь с одинаковыми переменными, в которых post_caption никуда не ведет post_caption = " " + raw_caption = "" if album and album[0].caption: + raw_caption = album[0].caption or "" post_caption = get_text_message(album[0].caption.lower(), first_name, message.from_user.username) + # Определяем анонимность на основе сырого caption + is_anonymous = determine_anonymity(raw_caption) + # Создаем основной пост для медиагруппы main_post = TelegramPost( message_id=message.message_id, # ID основного сообщения медиагруппы - text=post_caption, + text=raw_caption, author_id=message.from_user.id, - created_at=int(datetime.now().timestamp()) + created_at=int(datetime.now().timestamp()), + is_anonymous=is_anonymous ) await self.db.add_post(main_post) diff --git a/helper_bot/utils/helper_func.py b/helper_bot/utils/helper_func.py index 078bb1b..43e930d 100644 --- a/helper_bot/utils/helper_func.py +++ b/helper_bot/utils/helper_func.py @@ -85,14 +85,44 @@ def get_first_name(message: types.Message) -> str: return "" -def get_text_message(post_text: str, first_name: str, username: str = None): +def determine_anonymity(post_text: str) -> bool: """ - Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон". + Определяет, является ли пост анонимным на основе ключевых слов в тексте. + + Args: + post_text: Текст сообщения + + Returns: + bool: True, если "анон" в тексте; False, если "неанон" или "не анон" в тексте; + False по умолчанию (если нет ключевых слов) + """ + if not post_text: + return False + + post_text_lower = post_text.lower() + + # Сначала проверяем "неанон" или "не анон" (более специфичное условие) + if "неанон" in post_text_lower or "не анон" in post_text_lower: + return False + + # Проверяем "анон" + if "анон" in post_text_lower: + return True + + # По умолчанию False + return False + + +def get_text_message(post_text: str, first_name: str, username: str = None, is_anonymous: Optional[bool] = None): + """ + Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон" + или переданного параметра is_anonymous. Args: post_text: Текст сообщения first_name: Имя автора поста username: Юзернейм автора поста (может быть None) + is_anonymous: Флаг анонимности (True - анонимно, False - не анонимно, None - legacy, определяется по тексту) Returns: str: - Сформированный текст сообщения. @@ -109,12 +139,21 @@ def get_text_message(post_text: str, first_name: str, username: str = None): else: author_info = f"{first_name} (Ник не указан)" - if "неанон" in post_text or "не анон" in post_text: - return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}' - elif "анон" in post_text: - return f'Пост из ТГ:\n{safe_post_text}\n\nПост опубликован анонимно' + # Если передан is_anonymous, используем его, иначе определяем по тексту (legacy) + # TODO: Уверен можно укоротить + if is_anonymous is not None: + if is_anonymous: + return f'Пост из ТГ:\n{safe_post_text}\n\nПост опубликован анонимно' + else: + return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}' else: - return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}' + # Legacy: определяем по тексту + if "неанон" in post_text or "не анон" in post_text: + return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}' + elif "анон" in post_text: + return f'Пост из ТГ:\n{safe_post_text}\n\nПост опубликован анонимно' + else: + return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}' @track_time("download_file", "helper_func") @track_errors("helper_func", "download_file") diff --git a/scripts/add_is_anonymous_column.py b/scripts/add_is_anonymous_column.py new file mode 100755 index 0000000..6e631fe --- /dev/null +++ b/scripts/add_is_anonymous_column.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +""" +Скрипт миграции для добавления колонки is_anonymous в таблицу post_from_telegram_suggest. +Для существующих записей определяет is_anonymous на основе текста или устанавливает NULL. +""" +import argparse +import asyncio +import os +import sys +from pathlib import Path + +project_root = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(project_root)) + +import aiosqlite + +from logs.custom_logger import logger +from helper_bot.utils.helper_func import determine_anonymity + +DEFAULT_DB_PATH = "database/tg-bot-database.db" + + +def _column_exists(rows: list, name: str) -> bool: + """PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk).""" + for row in rows: + if row[1] == name: + return True + return False + + +async def main(db_path: str) -> None: + db_path = os.path.abspath(db_path) + if not os.path.exists(db_path): + logger.error("База данных не найдена: %s", db_path) + print(f"Ошибка: база данных не найдена: {db_path}") + return + + async with aiosqlite.connect(db_path) as conn: + await conn.execute("PRAGMA foreign_keys = ON") + + # Проверяем наличие колонки is_anonymous + cursor = await conn.execute( + "PRAGMA table_info(post_from_telegram_suggest)" + ) + rows = await cursor.fetchall() + await cursor.close() + + if not _column_exists(rows, "is_anonymous"): + logger.info("Добавление колонки is_anonymous в post_from_telegram_suggest") + await conn.execute( + "ALTER TABLE post_from_telegram_suggest " + "ADD COLUMN is_anonymous INTEGER" + ) + await conn.commit() + print("Колонка is_anonymous добавлена.") + else: + print("Колонка is_anonymous уже существует.") + + # Получаем все записи с текстом для обновления + cursor = await conn.execute( + "SELECT message_id, text FROM post_from_telegram_suggest WHERE text IS NOT NULL" + ) + posts = await cursor.fetchall() + await cursor.close() + + updated_count = 0 + null_count = 0 + + # Обновляем каждую запись + for message_id, text in posts: + try: + # Определяем is_anonymous на основе текста + # Если текст пустой или None, устанавливаем NULL (legacy) + if not text or not text.strip(): + is_anonymous = None + else: + is_anonymous = determine_anonymity(text) + + # Преобразуем bool в int для SQLite (True -> 1, False -> 0, None -> None) + is_anonymous_int = None if is_anonymous is None else (1 if is_anonymous else 0) + + await conn.execute( + "UPDATE post_from_telegram_suggest SET is_anonymous = ? WHERE message_id = ?", + (is_anonymous_int, message_id) + ) + + if is_anonymous is not None: + updated_count += 1 + else: + null_count += 1 + + except Exception as e: + logger.error(f"Ошибка при обработке поста message_id={message_id}: {e}") + # В случае ошибки устанавливаем NULL + await conn.execute( + "UPDATE post_from_telegram_suggest SET is_anonymous = NULL WHERE message_id = ?", + (message_id,) + ) + null_count += 1 + + # Обновляем записи без текста (устанавливаем NULL) + cursor = await conn.execute( + "SELECT COUNT(*) FROM post_from_telegram_suggest WHERE text IS NULL" + ) + row = await cursor.fetchone() + posts_without_text = row[0] if row else 0 + await cursor.close() + + if posts_without_text > 0: + await conn.execute( + "UPDATE post_from_telegram_suggest SET is_anonymous = NULL WHERE text IS NULL" + ) + null_count += posts_without_text + + await conn.commit() + + total_updated = updated_count + null_count + logger.info( + f"Миграция завершена. Обновлено записей: {total_updated} " + f"(определено: {updated_count}, установлено NULL: {null_count})" + ) + print(f"Миграция завершена.") + print(f"Обновлено записей: {total_updated}") + print(f" - Определено is_anonymous: {updated_count}") + print(f" - Установлено NULL: {null_count}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Добавление колонки is_anonymous в post_from_telegram_suggest" + ) + parser.add_argument( + "--db", + default=os.environ.get("DB_PATH", DEFAULT_DB_PATH), + help="Путь к БД (или DB_PATH)", + ) + args = parser.parse_args() + asyncio.run(main(args.db)) diff --git a/scripts/clean_post_text.py b/scripts/clean_post_text.py new file mode 100755 index 0000000..2e1b1a3 --- /dev/null +++ b/scripts/clean_post_text.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +""" +Скрипт для приведения текста постов к "сырому" виду. +Удаляет форматирование, добавленное функцией get_text_message(), оставляя только исходный текст. +""" +import argparse +import asyncio +import html +import os +import re +import sys +from pathlib import Path + +project_root = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(project_root)) + +import aiosqlite + +from logs.custom_logger import logger + +DEFAULT_DB_PATH = "database/tg-bot-database.db" + +# Паттерны для определения форматированного текста +PREFIX = "Пост из ТГ:\n" +ANONYMOUS_SUFFIX = "\n\nПост опубликован анонимно" +AUTHOR_SUFFIX_PATTERN = re.compile(r"\n\nАвтор поста: .+$") + + +def extract_raw_text(formatted_text: str) -> str: + """ + Извлекает сырой текст из форматированного текста поста. + + Args: + formatted_text: Форматированный текст поста + + Returns: + str: Сырой текст или исходный текст, если форматирование не обнаружено + """ + if not formatted_text: + return "" + + # Проверяем, начинается ли текст с префикса + if not formatted_text.startswith(PREFIX): + # Текст уже в сыром виде или имеет другой формат + return formatted_text + + # Извлекаем текст после префикса + text_after_prefix = formatted_text[len(PREFIX):] + + # Проверяем, заканчивается ли текст на "Пост опубликован анонимно" + if text_after_prefix.endswith(ANONYMOUS_SUFFIX): + raw_text = text_after_prefix[:-len(ANONYMOUS_SUFFIX)] + # Проверяем, заканчивается ли текст на "Автор поста: ..." + elif AUTHOR_SUFFIX_PATTERN.search(text_after_prefix): + raw_text = AUTHOR_SUFFIX_PATTERN.sub("", text_after_prefix) + else: + # Не удалось определить формат, возвращаем текст без префикса + raw_text = text_after_prefix + + # Декодируем HTML-экранирование + raw_text = html.unescape(raw_text) + + return raw_text + + +async def main(db_path: str, dry_run: bool = False) -> None: + db_path = os.path.abspath(db_path) + if not os.path.exists(db_path): + logger.error("База данных не найдена: %s", db_path) + print(f"Ошибка: база данных не найдена: {db_path}") + return + + async with aiosqlite.connect(db_path) as conn: + await conn.execute("PRAGMA foreign_keys = ON") + + # Получаем все записи с текстом + cursor = await conn.execute( + "SELECT message_id, text FROM post_from_telegram_suggest WHERE text IS NOT NULL AND text != ''" + ) + posts = await cursor.fetchall() + await cursor.close() + + updated_count = 0 + skipped_count = 0 + error_count = 0 + + print(f"Найдено записей для обработки: {len(posts)}") + if dry_run: + print("РЕЖИМ ПРОВЕРКИ (dry-run): изменения не будут сохранены") + + # Обрабатываем каждую запись + for message_id, formatted_text in posts: + try: + # Извлекаем сырой текст + raw_text = extract_raw_text(formatted_text) + + # Проверяем, изменился ли текст + if raw_text == formatted_text: + skipped_count += 1 + continue + + if dry_run: + print(f"\n[DRY-RUN] message_id={message_id}:") + print(f" Было: {formatted_text[:100]}...") + print(f" Станет: {raw_text[:100]}...") + else: + # Обновляем запись + await conn.execute( + "UPDATE post_from_telegram_suggest SET text = ? WHERE message_id = ?", + (raw_text, message_id) + ) + updated_count += 1 + + except Exception as e: + logger.error(f"Ошибка при обработке поста message_id={message_id}: {e}") + error_count += 1 + + if not dry_run: + await conn.commit() + + total_processed = updated_count + skipped_count + error_count + logger.info( + f"Обработка завершена. Всего записей: {total_processed}, " + f"обновлено: {updated_count}, пропущено: {skipped_count}, ошибок: {error_count}" + ) + print(f"\nОбработка завершена:") + print(f" - Всего записей: {total_processed}") + print(f" - Обновлено: {updated_count}") + print(f" - Пропущено (уже в сыром виде): {skipped_count}") + print(f" - Ошибок: {error_count}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Приведение текста постов к 'сырому' виду" + ) + parser.add_argument( + "--db", + default=os.environ.get("DB_PATH", DEFAULT_DB_PATH), + help="Путь к БД (или DB_PATH)", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Режим проверки без сохранения изменений", + ) + args = parser.parse_args() + asyncio.run(main(args.db, args.dry_run)) diff --git a/tests/test_post_repository.py b/tests/test_post_repository.py index 0a1bbd6..5030b58 100644 --- a/tests/test_post_repository.py +++ b/tests/test_post_repository.py @@ -77,6 +77,7 @@ class TestPostRepository: assert "message_id INTEGER NOT NULL PRIMARY KEY" in post_table_call assert "created_at INTEGER NOT NULL" in post_table_call assert "status TEXT NOT NULL DEFAULT 'suggest'" in post_table_call + assert "is_anonymous INTEGER" in post_table_call assert "FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE" in post_table_call # Проверяем создание таблицы контента @@ -104,14 +105,17 @@ class TestPostRepository: assert "INSERT INTO post_from_telegram_suggest" in query assert "status" in query - assert "VALUES (?, ?, ?, ?, ?)" in query - assert params == ( - sample_post.message_id, - sample_post.text, - sample_post.author_id, - sample_post.created_at, - sample_post.status, - ) + assert "is_anonymous" in query + assert "VALUES (?, ?, ?, ?, ?, ?)" in query + # Проверяем параметры: message_id, text, author_id, created_at, status, is_anonymous + assert params[0] == sample_post.message_id + assert params[1] == sample_post.text + assert params[2] == sample_post.author_id + assert params[3] == sample_post.created_at + assert params[4] == sample_post.status + # is_anonymous преобразуется в int (None -> None, True -> 1, False -> 0) + expected_is_anonymous = None if sample_post.is_anonymous is None else (1 if sample_post.is_anonymous else 0) + assert params[5] == expected_is_anonymous @pytest.mark.asyncio async def test_add_post_without_date(self, post_repository, sample_post_no_date): @@ -132,6 +136,8 @@ class TestPostRepository: assert params[3] == sample_post_no_date.created_at # created_at assert params[4] == sample_post_no_date.status # status (default suggest) + # Проверяем is_anonymous (должен быть в параметрах) + assert len(params) == 6 # Всего 6 параметров включая is_anonymous @pytest.mark.asyncio async def test_add_post_logs_correctly(self, post_repository, sample_post): @@ -476,6 +482,169 @@ class TestPostRepository: # Проверяем, что logger.info не вызывался post_repository.logger.info.assert_not_called() + @pytest.mark.asyncio + async def test_get_post_text_and_anonymity_by_message_id_found(self, post_repository): + """Тест получения текста и is_anonymous по message_id (пост найден).""" + # Мокаем _execute_query_with_result + mock_result = [("Тестовый текст", 1)] # is_anonymous = 1 (True) + post_repository._execute_query_with_result = AsyncMock(return_value=mock_result) + post_repository.logger = MagicMock() + + message_id = 12345 + + result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id) + + # Проверяем результат + text, is_anonymous = result + assert text == "Тестовый текст" + assert is_anonymous is True + + # Проверяем вызов _execute_query_with_result + post_repository._execute_query_with_result.assert_called_once() + call_args = post_repository._execute_query_with_result.call_args + query = call_args[0][0] + params = call_args[0][1] + + assert "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE message_id = ?" in query + assert params == (message_id,) + + @pytest.mark.asyncio + async def test_get_post_text_and_anonymity_by_message_id_with_false(self, post_repository): + """Тест получения текста и is_anonymous по message_id (is_anonymous = False).""" + # Мокаем _execute_query_with_result + mock_result = [("Тестовый текст", 0)] # is_anonymous = 0 (False) + post_repository._execute_query_with_result = AsyncMock(return_value=mock_result) + + message_id = 12345 + + result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id) + + # Проверяем результат + text, is_anonymous = result + assert text == "Тестовый текст" + assert is_anonymous is False + + @pytest.mark.asyncio + async def test_get_post_text_and_anonymity_by_message_id_with_null(self, post_repository): + """Тест получения текста и is_anonymous по message_id (is_anonymous = NULL).""" + # Мокаем _execute_query_with_result + mock_result = [("Тестовый текст", None)] # is_anonymous = NULL + post_repository._execute_query_with_result = AsyncMock(return_value=mock_result) + + message_id = 12345 + + result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id) + + # Проверяем результат + text, is_anonymous = result + assert text == "Тестовый текст" + assert is_anonymous is None + + @pytest.mark.asyncio + async def test_get_post_text_and_anonymity_by_message_id_not_found(self, post_repository): + """Тест получения текста и is_anonymous по message_id (пост не найден).""" + # Мокаем _execute_query_with_result + mock_result = [] + post_repository._execute_query_with_result = AsyncMock(return_value=mock_result) + + message_id = 12345 + + result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id) + + # Проверяем результат + text, is_anonymous = result + assert text is None + assert is_anonymous is None + + @pytest.mark.asyncio + async def test_get_post_text_and_anonymity_by_helper_id_found(self, post_repository): + """Тест получения текста и is_anonymous по helper_message_id (пост найден).""" + # Мокаем _execute_query_with_result + mock_result = [("Тестовый текст", 1)] # is_anonymous = 1 (True) + post_repository._execute_query_with_result = AsyncMock(return_value=mock_result) + post_repository.logger = MagicMock() + + helper_message_id = 67890 + + result = await post_repository.get_post_text_and_anonymity_by_helper_id(helper_message_id) + + # Проверяем результат + text, is_anonymous = result + assert text == "Тестовый текст" + assert is_anonymous is True + + # Проверяем вызов _execute_query_with_result + post_repository._execute_query_with_result.assert_called_once() + call_args = post_repository._execute_query_with_result.call_args + query = call_args[0][0] + params = call_args[0][1] + + assert "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE helper_text_message_id = ?" in query + assert params == (helper_message_id,) + + @pytest.mark.asyncio + async def test_add_post_with_is_anonymous_true(self, post_repository): + """Тест добавления поста с is_anonymous=True.""" + post = TelegramPost( + message_id=12345, + text="Тестовый пост анон", + author_id=67890, + created_at=int(datetime.now().timestamp()), + is_anonymous=True + ) + + post_repository._execute_query = AsyncMock() + + await post_repository.add_post(post) + + call_args = post_repository._execute_query.call_args + params = call_args[0][1] + + # Проверяем, что is_anonymous преобразован в 1 + assert params[5] == 1 + + @pytest.mark.asyncio + async def test_add_post_with_is_anonymous_false(self, post_repository): + """Тест добавления поста с is_anonymous=False.""" + post = TelegramPost( + message_id=12345, + text="Тестовый пост неанон", + author_id=67890, + created_at=int(datetime.now().timestamp()), + is_anonymous=False + ) + + post_repository._execute_query = AsyncMock() + + await post_repository.add_post(post) + + call_args = post_repository._execute_query.call_args + params = call_args[0][1] + + # Проверяем, что is_anonymous преобразован в 0 + assert params[5] == 0 + + @pytest.mark.asyncio + async def test_add_post_with_is_anonymous_none(self, post_repository): + """Тест добавления поста с is_anonymous=None.""" + post = TelegramPost( + message_id=12345, + text="Тестовый пост", + author_id=67890, + created_at=int(datetime.now().timestamp()), + is_anonymous=None + ) + + post_repository._execute_query = AsyncMock() + + await post_repository.add_post(post) + + call_args = post_repository._execute_query.call_args + params = call_args[0][1] + + # Проверяем, что is_anonymous остался None + assert params[5] is None + @pytest.mark.asyncio async def test_create_tables_logs_success(self, post_repository): """Тест логирования успешного создания таблиц.""" diff --git a/tests/test_post_service.py b/tests/test_post_service.py new file mode 100644 index 0000000..10db950 --- /dev/null +++ b/tests/test_post_service.py @@ -0,0 +1,287 @@ +"""Tests for PostService""" + +import pytest +from unittest.mock import Mock, AsyncMock, MagicMock, patch +from datetime import datetime +from aiogram import types + +from helper_bot.handlers.private.services import PostService, BotSettings +from database.models import TelegramPost, User + + +class TestPostService: + """Test class for PostService""" + + @pytest.fixture + def mock_db(self): + """Mock database""" + db = Mock() + db.add_post = AsyncMock() + db.update_helper_message = AsyncMock() + db.get_user_by_id = AsyncMock() + return db + + @pytest.fixture + def mock_settings(self): + """Mock bot settings""" + return BotSettings( + group_for_posts="test_posts", + group_for_message="test_message", + main_public="test_public", + group_for_logs="test_logs", + important_logs="test_important", + preview_link="test_link", + logs="test_logs_setting", + test="test_test_setting" + ) + + @pytest.fixture + def post_service(self, mock_db, mock_settings): + """Create PostService instance""" + return PostService(mock_db, mock_settings) + + @pytest.fixture + def mock_message(self): + """Mock Telegram message""" + message = Mock(spec=types.Message) + from_user = Mock() + from_user.id = 12345 + from_user.first_name = "Test" + from_user.username = "testuser" + from_user.full_name = "Test User" + message.from_user = from_user + message.text = "Тестовый пост" + message.message_id = 100 + message.bot = AsyncMock() + message.chat = Mock() + message.chat.id = 12345 + return message + + @pytest.mark.asyncio + async def test_handle_text_post_saves_raw_text(self, post_service, mock_message, mock_db): + """Test that handle_text_post saves raw text to database""" + with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted text"): + with patch('helper_bot.handlers.private.services.send_text_message', return_value=200): + with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"): + with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None): + with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False): + + await post_service.handle_text_post(mock_message, "Test") + + # Check that add_post was called + mock_db.add_post.assert_called_once() + call_args = mock_db.add_post.call_args[0][0] + + # Check that raw text is saved + assert isinstance(call_args, TelegramPost) + assert call_args.text == "Тестовый пост" # Raw text + assert call_args.message_id == 200 + assert call_args.author_id == 12345 + assert call_args.is_anonymous is False + + @pytest.mark.asyncio + async def test_handle_text_post_determines_anonymity(self, post_service, mock_message, mock_db): + """Test that handle_text_post determines anonymity correctly""" + mock_message.text = "Тестовый пост анон" + + with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted text"): + with patch('helper_bot.handlers.private.services.send_text_message', return_value=200): + with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"): + with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None): + with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=True): + + await post_service.handle_text_post(mock_message, "Test") + + call_args = mock_db.add_post.call_args[0][0] + assert call_args.is_anonymous is True + + @pytest.mark.asyncio + async def test_handle_photo_post_saves_raw_caption(self, post_service, mock_message, mock_db): + """Test that handle_photo_post saves raw caption to database""" + mock_message.caption = "Тестовая подпись" + mock_message.photo = [Mock()] + mock_message.photo[-1].file_id = "photo_123" + + sent_message = Mock() + sent_message.message_id = 201 + sent_message.caption = "Formatted caption" + + with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted caption"): + with patch('helper_bot.handlers.private.services.send_photo_message', return_value=sent_message): + with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"): + with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None): + with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False): + with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True): + + await post_service.handle_photo_post(mock_message, "Test") + + mock_db.add_post.assert_called_once() + call_args = mock_db.add_post.call_args[0][0] + + # Check that raw caption is saved + assert call_args.text == "Тестовая подпись" # Raw caption + assert call_args.message_id == 201 + assert call_args.is_anonymous is False + + @pytest.mark.asyncio + async def test_handle_photo_post_without_caption(self, post_service, mock_message, mock_db): + """Test that handle_photo_post handles missing caption""" + mock_message.caption = None + mock_message.photo = [Mock()] + mock_message.photo[-1].file_id = "photo_123" + + sent_message = Mock() + sent_message.message_id = 202 + + with patch('helper_bot.handlers.private.services.get_text_message', return_value=""): + with patch('helper_bot.handlers.private.services.send_photo_message', return_value=sent_message): + with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"): + with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None): + with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False): + with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True): + + await post_service.handle_photo_post(mock_message, "Test") + + call_args = mock_db.add_post.call_args[0][0] + assert call_args.text == "" # Empty string for missing caption + assert call_args.is_anonymous is False + + @pytest.mark.asyncio + async def test_handle_video_post_saves_raw_caption(self, post_service, mock_message, mock_db): + """Test that handle_video_post saves raw caption to database""" + mock_message.caption = "Видео подпись" + mock_message.video = Mock() + mock_message.video.file_id = "video_123" + + sent_message = Mock() + sent_message.message_id = 203 + + with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted"): + with patch('helper_bot.handlers.private.services.send_video_message', return_value=sent_message): + with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"): + with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None): + with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=True): + with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True): + + await post_service.handle_video_post(mock_message, "Test") + + call_args = mock_db.add_post.call_args[0][0] + assert call_args.text == "Видео подпись" # Raw caption + assert call_args.is_anonymous is True + + @pytest.mark.asyncio + async def test_handle_audio_post_saves_raw_caption(self, post_service, mock_message, mock_db): + """Test that handle_audio_post saves raw caption to database""" + mock_message.caption = "Аудио подпись" + mock_message.audio = Mock() + mock_message.audio.file_id = "audio_123" + + sent_message = Mock() + sent_message.message_id = 204 + + with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted"): + with patch('helper_bot.handlers.private.services.send_audio_message', return_value=sent_message): + with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"): + with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None): + with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False): + with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True): + + await post_service.handle_audio_post(mock_message, "Test") + + call_args = mock_db.add_post.call_args[0][0] + assert call_args.text == "Аудио подпись" # Raw caption + assert call_args.is_anonymous is False + + @pytest.mark.asyncio + async def test_handle_video_note_post_saves_empty_string(self, post_service, mock_message, mock_db): + """Test that handle_video_note_post saves empty string""" + mock_message.video_note = Mock() + mock_message.video_note.file_id = "video_note_123" + + sent_message = Mock() + sent_message.message_id = 205 + + with patch('helper_bot.handlers.private.services.send_video_note_message', return_value=sent_message): + with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None): + with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False): + with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True): + + await post_service.handle_video_note_post(mock_message) + + call_args = mock_db.add_post.call_args[0][0] + assert call_args.text == "" # Empty string + assert call_args.is_anonymous is False + + @pytest.mark.asyncio + async def test_handle_voice_post_saves_empty_string(self, post_service, mock_message, mock_db): + """Test that handle_voice_post saves empty string""" + mock_message.voice = Mock() + mock_message.voice.file_id = "voice_123" + + sent_message = Mock() + sent_message.message_id = 206 + + with patch('helper_bot.handlers.private.services.send_voice_message', return_value=sent_message): + with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None): + with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False): + with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True): + + await post_service.handle_voice_post(mock_message) + + call_args = mock_db.add_post.call_args[0][0] + assert call_args.text == "" # Empty string + assert call_args.is_anonymous is False + + @pytest.mark.asyncio + async def test_handle_media_group_post_saves_raw_caption(self, post_service, mock_message, mock_db): + """Test that handle_media_group_post saves raw caption to database""" + mock_message.message_id = 300 + mock_message.media_group_id = 1 + + album = [Mock()] + album[0].caption = "Медиагруппа подпись" + + with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted"): + with patch('helper_bot.handlers.private.services.prepare_media_group_from_middlewares', return_value=[]): + with patch('helper_bot.handlers.private.services.send_media_group_message_to_private_chat', return_value=301): + with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"): + with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None): + with patch('helper_bot.handlers.private.services.send_text_message', return_value=302): + with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=True): + with patch('asyncio.sleep', return_value=None): + + await post_service.handle_media_group_post(mock_message, album, "Test") + + # Check main post + calls = mock_db.add_post.call_args_list + main_post = calls[0][0][0] + + assert main_post.text == "Медиагруппа подпись" # Raw caption + assert main_post.message_id == 300 + assert main_post.is_anonymous is True + + @pytest.mark.asyncio + async def test_handle_media_group_post_without_caption(self, post_service, mock_message, mock_db): + """Test that handle_media_group_post handles missing caption""" + mock_message.message_id = 301 + mock_message.media_group_id = 1 + + album = [Mock()] + album[0].caption = None + + with patch('helper_bot.handlers.private.services.get_text_message', return_value=" "): + with patch('helper_bot.handlers.private.services.prepare_media_group_from_middlewares', return_value=[]): + with patch('helper_bot.handlers.private.services.send_media_group_message_to_private_chat', return_value=302): + with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"): + with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None): + with patch('helper_bot.handlers.private.services.send_text_message', return_value=303): + with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False): + with patch('asyncio.sleep', return_value=None): + + await post_service.handle_media_group_post(mock_message, album, "Test") + + calls = mock_db.add_post.call_args_list + main_post = calls[0][0][0] + + assert main_post.text == "" # Empty string for missing caption + assert main_post.is_anonymous is False diff --git a/tests/test_utils.py b/tests/test_utils.py index 9befce0..c53812c 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -6,6 +6,7 @@ import os from helper_bot.utils.helper_func import ( get_first_name, get_text_message, + determine_anonymity, check_username_and_full_name, safe_html_escape, download_file, @@ -64,12 +65,13 @@ class TestHelperFunctions: def test_get_text_message(self, mock_message): """Тест функции обработки текста сообщения""" - # Тест с обычным текстом + # Тест с обычным текстом (legacy - определяется по тексту) text = "Привет, это тестовое сообщение" result = get_text_message(text, "Test", "testuser") assert "Test" in result assert "testuser" in result assert "тестовое сообщение" in result + assert "Автор поста" in result # Тест с пустым текстом result = get_text_message("", "Test", "testuser") @@ -83,6 +85,98 @@ class TestHelperFunctions: assert "testuser" in result assert "Обычный текст без специальных слов" in result + def test_get_text_message_with_is_anonymous_true(self, mock_message): + """Тест функции get_text_message с is_anonymous=True""" + text = "Тестовый пост" + result = get_text_message(text, "Test", "testuser", is_anonymous=True) + assert "Пост из ТГ:" in result + assert "Тестовый пост" in result + assert "Пост опубликован анонимно" in result + assert "Автор поста" not in result + + def test_get_text_message_with_is_anonymous_false(self, mock_message): + """Тест функции get_text_message с is_anonymous=False""" + text = "Тестовый пост" + result = get_text_message(text, "Test", "testuser", is_anonymous=False) + assert "Пост из ТГ:" in result + assert "Тестовый пост" in result + assert "Автор поста" in result + assert "Test" in result + assert "testuser" in result + assert "Пост опубликован анонимно" not in result + + def test_get_text_message_with_is_anonymous_none_legacy(self, mock_message): + """Тест функции get_text_message с is_anonymous=None (legacy - определяется по тексту)""" + # Тест с "анон" в тексте + text = "Тестовый пост анон" + result = get_text_message(text, "Test", "testuser", is_anonymous=None) + assert "Пост из ТГ:" in result + assert "Тестовый пост анон" in result + assert "Пост опубликован анонимно" in result + + # Тест с "неанон" в тексте + text = "Тестовый пост неанон" + result = get_text_message(text, "Test", "testuser", is_anonymous=None) + assert "Пост из ТГ:" in result + assert "Тестовый пост неанон" in result + assert "Автор поста" in result + + # Тест с "не анон" в тексте + text = "Тестовый пост не анон" + result = get_text_message(text, "Test", "testuser", is_anonymous=None) + assert "Автор поста" in result + + def test_get_text_message_with_username_none(self, mock_message): + """Тест функции get_text_message без username""" + text = "Тестовый пост" + result = get_text_message(text, "Test", None, is_anonymous=False) + assert "Test" in result + assert "(Ник не указан)" in result + assert "@" not in result + + def test_determine_anonymity_with_anon(self): + """Тест функции determine_anonymity с 'анон' в тексте""" + assert determine_anonymity("Этот пост анон") is True + assert determine_anonymity("анон") is True + assert determine_anonymity("АНОН") is True # Проверка регистра + assert determine_anonymity("пост анонимный анон") is True + + def test_determine_anonymity_with_neanon(self): + """Тест функции determine_anonymity с 'неанон' в тексте""" + assert determine_anonymity("Этот пост неанон") is False + assert determine_anonymity("неанон") is False + assert determine_anonymity("НЕАНОН") is False # Проверка регистра + assert determine_anonymity("пост неанон") is False + + def test_determine_anonymity_with_ne_anon(self): + """Тест функции determine_anonymity с 'не анон' в тексте""" + assert determine_anonymity("Этот пост не анон") is False + assert determine_anonymity("не анон") is False + assert determine_anonymity("НЕ АНОН") is False # Проверка регистра + assert determine_anonymity("пост не анон") is False + + def test_determine_anonymity_priority_neanon_over_anon(self): + """Тест приоритета 'неанон' над 'анон'""" + # Если есть и "анон" и "неанон", должен вернуть False + assert determine_anonymity("анон неанон") is False + assert determine_anonymity("неанон анон") is False + assert determine_anonymity("не анон анон") is False + + def test_determine_anonymity_without_keywords(self): + """Тест функции determine_anonymity без ключевых слов""" + assert determine_anonymity("Обычный текст") is False + assert determine_anonymity("") is False + assert determine_anonymity("Пост без специальных слов") is False + + def test_determine_anonymity_with_none(self): + """Тест функции determine_anonymity с None""" + assert determine_anonymity(None) is False + + def test_determine_anonymity_with_empty_string(self): + """Тест функции determine_anonymity с пустой строкой""" + assert determine_anonymity("") is False + assert determine_anonymity(" ") is False # Только пробелы + @pytest.mark.asyncio async def test_check_username_and_full_name(self): """Тест функции проверки изменений username и full_name"""