Реализован функцоинал хранения сырых текстов поста в базе данных. Оформление поста происходит непосредственно перед его отправкой в канал.

- Реализованы методы `get_post_text_and_anonymity_by_message_id` и `get_post_text_and_anonymity_by_helper_id` в `PostRepository` для получения текста поста и флага анонимности.
- Обновлена модель `TelegramPost`, добавлено поле `is_anonymous`.
- Изменена схема базы данных для включения поля `is_anonymous` в таблицу `post_from_telegram_suggest`.
- Обновлены функции публикации постов в `PostPublishService` для учета анонимности.
- Добавлены тесты для проверки новых функций и корректности работы с анонимностью.
This commit is contained in:
2026-01-23 12:12:21 +03:00
parent c6ba90552d
commit 89022aedaf
12 changed files with 1064 additions and 48 deletions

View File

@@ -167,6 +167,14 @@ class AsyncBotDB:
"""Получает ID автора по helper_text_message_id."""
return await self.factory.posts.get_author_id_by_helper_message_id(helper_text_message_id)
async def get_post_text_and_anonymity_by_message_id(self, message_id: int) -> tuple[Optional[str], Optional[bool]]:
"""Получает текст и is_anonymous поста по message_id."""
return await self.factory.posts.get_post_text_and_anonymity_by_message_id(message_id)
async def get_post_text_and_anonymity_by_helper_id(self, helper_message_id: int) -> tuple[Optional[str], Optional[bool]]:
"""Получает текст и is_anonymous поста по helper_text_message_id."""
return await self.factory.posts.get_post_text_and_anonymity_by_helper_id(helper_message_id)
async def update_status_by_message_id(self, message_id: int, status: str) -> int:
"""Обновление статуса поста по message_id (одиночные посты). Возвращает число обновлённых строк."""
return await self.factory.posts.update_status_by_message_id(message_id, status)

View File

@@ -46,6 +46,7 @@ class TelegramPost:
helper_text_message_id: Optional[int] = None
created_at: Optional[int] = None
status: str = "suggest"
is_anonymous: Optional[bool] = None
@dataclass

View File

@@ -18,6 +18,7 @@ class PostRepository(DatabaseConnection):
author_id INTEGER,
created_at INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'suggest',
is_anonymous INTEGER,
FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE
)
'''
@@ -53,12 +54,14 @@ class PostRepository(DatabaseConnection):
if not post.created_at:
post.created_at = int(datetime.now().timestamp())
status = post.status if post.status else "suggest"
# Преобразуем bool в int для SQLite (True -> 1, False -> 0, None -> None)
is_anonymous_int = None if post.is_anonymous is None else (1 if post.is_anonymous else 0)
query = """
INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at, status)
VALUES (?, ?, ?, ?, ?)
INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at, status, is_anonymous)
VALUES (?, ?, ?, ?, ?, ?)
"""
params = (post.message_id, post.text, post.author_id, post.created_at, status)
params = (post.message_id, post.text, post.author_id, post.created_at, status, is_anonymous_int)
await self._execute_query(query, params)
self.logger.info(f"Пост добавлен: message_id={post.message_id}")
@@ -219,3 +222,33 @@ class PostRepository(DatabaseConnection):
self.logger.info(f"Получен author_id: {author_id} для helper_message_id={helper_message_id}")
return author_id
return None
async def get_post_text_and_anonymity_by_message_id(self, message_id: int) -> Tuple[Optional[str], Optional[bool]]:
"""Получает текст и is_anonymous поста по message_id."""
query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE message_id = ?"
rows = await self._execute_query_with_result(query, (message_id,))
row = rows[0] if rows else None
if row:
text = row[0] or ""
is_anonymous_int = row[1]
# Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None)
is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int)
self.logger.info(f"Получены текст и is_anonymous для message_id={message_id}")
return text, is_anonymous
return None, None
async def get_post_text_and_anonymity_by_helper_id(self, helper_message_id: int) -> Tuple[Optional[str], Optional[bool]]:
"""Получает текст и is_anonymous поста по helper_text_message_id."""
query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE helper_text_message_id = ?"
rows = await self._execute_query_with_result(query, (helper_message_id,))
row = rows[0] if rows else None
if row:
text = row[0] or ""
is_anonymous_int = row[1]
# Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None)
is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int)
self.logger.info(f"Получены текст и is_anonymous для helper_message_id={helper_message_id}")
return text, is_anonymous
return None, None

View File

@@ -58,6 +58,7 @@ CREATE TABLE IF NOT EXISTS post_from_telegram_suggest (
author_id INTEGER,
created_at INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'suggest',
is_anonymous INTEGER,
FOREIGN KEY (author_id) REFERENCES our_users(user_id) ON DELETE CASCADE
);

View File

@@ -8,7 +8,7 @@ from aiogram.types import CallbackQuery
from helper_bot.utils.helper_func import (
send_text_message, send_photo_message, send_video_message,
send_video_note_message, send_audio_message, send_voice_message,
send_media_group_to_channel, delete_user_blacklist
send_media_group_to_channel, delete_user_blacklist, get_text_message
)
from helper_bot.keyboards.keyboards import create_keyboard_for_ban_reason
from .exceptions import (
@@ -78,7 +78,6 @@ class PostPublishService:
@track_errors("post_publish_service", "_publish_text_post")
async def _publish_text_post(self, call: CallbackQuery) -> None:
"""Публикация текстового поста"""
text_post = html.escape(str(call.message.text))
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
@@ -86,7 +85,20 @@ class PostPublishService:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_text_message(self.main_public, call.message, text_post)
# Получаем сырой текст и is_anonymous из базы
raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id)
if raw_text is None:
raw_text = ""
# Получаем данные автора
user = await self.db.get_user_by_id(author_id)
if not user:
raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
# Формируем финальный текст с учетом is_anonymous
formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
await send_text_message(self.main_public, call.message, formatted_text)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Текст сообщения опубликован в канале {self.main_public}.')
@@ -94,7 +106,6 @@ class PostPublishService:
@track_errors("post_publish_service", "_publish_photo_post")
async def _publish_photo_post(self, call: CallbackQuery) -> None:
"""Публикация поста с фото"""
text_post_with_photo = html.escape(str(call.message.caption))
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
@@ -102,7 +113,20 @@ class PostPublishService:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_photo_message(self.main_public, call.message, call.message.photo[-1].file_id, text_post_with_photo)
# Получаем сырой текст и is_anonymous из базы
raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id)
if raw_text is None:
raw_text = ""
# Получаем данные автора
user = await self.db.get_user_by_id(author_id)
if not user:
raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
# Формируем финальный текст с учетом is_anonymous
formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
await send_photo_message(self.main_public, call.message, call.message.photo[-1].file_id, formatted_text)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с фото опубликован в канале {self.main_public}.')
@@ -110,7 +134,6 @@ class PostPublishService:
@track_errors("post_publish_service", "_publish_video_post")
async def _publish_video_post(self, call: CallbackQuery) -> None:
"""Публикация поста с видео"""
text_post_with_photo = html.escape(str(call.message.caption))
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
@@ -118,7 +141,20 @@ class PostPublishService:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_video_message(self.main_public, call.message, call.message.video.file_id, text_post_with_photo)
# Получаем сырой текст и is_anonymous из базы
raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id)
if raw_text is None:
raw_text = ""
# Получаем данные автора
user = await self.db.get_user_by_id(author_id)
if not user:
raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
# Формируем финальный текст с учетом is_anonymous
formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
await send_video_message(self.main_public, call.message, call.message.video.file_id, formatted_text)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с видео опубликован в канале {self.main_public}.')
@@ -141,7 +177,6 @@ class PostPublishService:
@track_errors("post_publish_service", "_publish_audio_post")
async def _publish_audio_post(self, call: CallbackQuery) -> None:
"""Публикация поста с аудио"""
text_post_with_photo = html.escape(str(call.message.caption))
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
@@ -149,7 +184,20 @@ class PostPublishService:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_audio_message(self.main_public, call.message, call.message.audio.file_id, text_post_with_photo)
# Получаем сырой текст и is_anonymous из базы
raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id)
if raw_text is None:
raw_text = ""
# Получаем данные автора
user = await self.db.get_user_by_id(author_id)
if not user:
raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
# Формируем финальный текст с учетом is_anonymous
formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
await send_audio_message(self.main_public, call.message, call.message.audio.file_id, formatted_text)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с аудио опубликован в канале {self.main_public}.')
@@ -185,11 +233,12 @@ class PostPublishService:
logger.error(f"Контент медиагруппы не найден в базе данных для helper_message_id: {helper_message_id}")
raise PublishError("Контент медиагруппы не найден в базе данных")
# Получаем текст поста по helper_message_id
logger.debug(f"Получаю текст поста для helper_message_id: {helper_message_id}")
pre_text = await self.db.get_post_text_by_helper_id(helper_message_id)
post_text = html.escape(str(pre_text)) if pre_text else ""
logger.debug(f"Текст поста получен: {'пустой' if not post_text else f'длина: {len(post_text)} символов'}")
# Получаем сырой текст и is_anonymous по helper_message_id
logger.debug(f"Получаю текст и is_anonymous поста для helper_message_id: {helper_message_id}")
raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_helper_id(helper_message_id)
if raw_text is None:
raw_text = ""
logger.debug(f"Текст поста получен: {'пустой' if not raw_text else f'длина: {len(raw_text)} символов'}, is_anonymous={is_anonymous}")
# Получаем ID автора по helper_message_id
logger.debug(f"Получаю ID автора для helper_message_id: {helper_message_id}")
@@ -199,13 +248,22 @@ class PostPublishService:
raise PostNotFoundError(f"Автор не найден для медиагруппы {helper_message_id}")
logger.debug(f"ID автора получен: {author_id}")
# Получаем данные автора
user = await self.db.get_user_by_id(author_id)
if not user:
raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
# Формируем финальный текст с учетом is_anonymous
formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
logger.debug(f"Сформирован финальный текст: {'пустой' if not formatted_text else f'длина: {len(formatted_text)} символов'}")
# Отправляем медиагруппу в канал
logger.info(f"Отправляю медиагруппу в канал {self.main_public}")
await send_media_group_to_channel(
bot=self._get_bot(call.message),
chat_id=self.main_public,
post_content=post_content,
post_text=post_text
post_text=formatted_text
)
await self.db.update_status_for_media_group_by_helper_id(helper_message_id, "approved")

View File

@@ -13,11 +13,13 @@ from dataclasses import dataclass
from aiogram import types
from aiogram.types import FSInputFile
from database.models import TelegramPost, User
from logs.custom_logger import logger
# Local imports - utilities
from helper_bot.utils.helper_func import (
get_first_name,
get_text_message,
determine_anonymity,
send_text_message,
send_photo_message,
send_media_group_message_to_private_chat,
@@ -154,11 +156,17 @@ class PostService:
markup = get_reply_keyboard_for_post()
sent_message_id = await send_text_message(self.settings.group_for_posts, message, post_text, markup)
# Сохраняем сырой текст и определяем анонимность
raw_text = message.text or ""
is_anonymous = determine_anonymity(raw_text)
post = TelegramPost(
message_id=sent_message_id,
text=message.text,
text=raw_text,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
@@ -176,11 +184,16 @@ class PostService:
self.settings.group_for_posts, message, message.photo[-1].file_id, post_caption, markup
)
# Сохраняем сырой caption и определяем анонимность
raw_caption = message.caption or ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=sent_message.caption or "",
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -201,11 +214,16 @@ class PostService:
self.settings.group_for_posts, message, message.video.file_id, post_caption, markup
)
# Сохраняем сырой caption и определяем анонимность
raw_caption = message.caption or ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=sent_message.caption or "",
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -222,11 +240,16 @@ class PostService:
self.settings.group_for_posts, message, message.video_note.file_id, markup
)
# Сохраняем пустую строку, так как video_note не имеет caption
raw_caption = ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=sent_message.caption or "",
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -247,11 +270,16 @@ class PostService:
self.settings.group_for_posts, message, message.audio.file_id, post_caption, markup
)
# Сохраняем сырой caption и определяем анонимность
raw_caption = message.caption or ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=sent_message.caption or "",
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -268,11 +296,16 @@ class PostService:
self.settings.group_for_posts, message, message.voice.file_id, markup
)
# Сохраняем пустую строку, так как voice не имеет caption
raw_caption = ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=sent_message.caption or "",
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -285,17 +318,24 @@ class PostService:
@track_media_processing("media_group")
async def handle_media_group_post(self, message: types.Message, album: list, first_name: str) -> None:
"""Handle media group post submission"""
#TODO: Мне кажется тут какая-то дичь с одинаковыми переменными, в которых post_caption никуда не ведет
post_caption = " "
raw_caption = ""
if album and album[0].caption:
raw_caption = album[0].caption or ""
post_caption = get_text_message(album[0].caption.lower(), first_name, message.from_user.username)
# Определяем анонимность на основе сырого caption
is_anonymous = determine_anonymity(raw_caption)
# Создаем основной пост для медиагруппы
main_post = TelegramPost(
message_id=message.message_id, # ID основного сообщения медиагруппы
text=post_caption,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(main_post)

View File

@@ -85,14 +85,44 @@ def get_first_name(message: types.Message) -> str:
return ""
def get_text_message(post_text: str, first_name: str, username: str = None):
def determine_anonymity(post_text: str) -> bool:
"""
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
Определяет, является ли пост анонимным на основе ключевых слов в тексте.
Args:
post_text: Текст сообщения
Returns:
bool: True, если "анон" в тексте; False, если "неанон" или "не анон" в тексте;
False по умолчанию (если нет ключевых слов)
"""
if not post_text:
return False
post_text_lower = post_text.lower()
# Сначала проверяем "неанон" или "не анон" (более специфичное условие)
if "неанон" in post_text_lower or "не анон" in post_text_lower:
return False
# Проверяем "анон"
if "анон" in post_text_lower:
return True
# По умолчанию False
return False
def get_text_message(post_text: str, first_name: str, username: str = None, is_anonymous: Optional[bool] = None):
"""
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон"
или переданного параметра is_anonymous.
Args:
post_text: Текст сообщения
first_name: Имя автора поста
username: Юзернейм автора поста (может быть None)
is_anonymous: Флаг анонимности (True - анонимно, False - не анонимно, None - legacy, определяется по тексту)
Returns:
str: - Сформированный текст сообщения.
@@ -109,6 +139,15 @@ def get_text_message(post_text: str, first_name: str, username: str = None):
else:
author_info = f"{first_name} (Ник не указан)"
# Если передан is_anonymous, используем его, иначе определяем по тексту (legacy)
# TODO: Уверен можно укоротить
if is_anonymous is not None:
if is_anonymous:
return f'Пост из ТГ:\n{safe_post_text}\n\nПост опубликован анонимно'
else:
return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
else:
# Legacy: определяем по тексту
if "неанон" in post_text or "не анон" in post_text:
return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
elif "анон" in post_text:

View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python3
"""
Скрипт миграции для добавления колонки is_anonymous в таблицу post_from_telegram_suggest.
Для существующих записей определяет is_anonymous на основе текста или устанавливает NULL.
"""
import argparse
import asyncio
import os
import sys
from pathlib import Path
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
import aiosqlite
from logs.custom_logger import logger
from helper_bot.utils.helper_func import determine_anonymity
DEFAULT_DB_PATH = "database/tg-bot-database.db"
def _column_exists(rows: list, name: str) -> bool:
"""PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk)."""
for row in rows:
if row[1] == name:
return True
return False
async def main(db_path: str) -> None:
db_path = os.path.abspath(db_path)
if not os.path.exists(db_path):
logger.error("База данных не найдена: %s", db_path)
print(f"Ошибка: база данных не найдена: {db_path}")
return
async with aiosqlite.connect(db_path) as conn:
await conn.execute("PRAGMA foreign_keys = ON")
# Проверяем наличие колонки is_anonymous
cursor = await conn.execute(
"PRAGMA table_info(post_from_telegram_suggest)"
)
rows = await cursor.fetchall()
await cursor.close()
if not _column_exists(rows, "is_anonymous"):
logger.info("Добавление колонки is_anonymous в post_from_telegram_suggest")
await conn.execute(
"ALTER TABLE post_from_telegram_suggest "
"ADD COLUMN is_anonymous INTEGER"
)
await conn.commit()
print("Колонка is_anonymous добавлена.")
else:
print("Колонка is_anonymous уже существует.")
# Получаем все записи с текстом для обновления
cursor = await conn.execute(
"SELECT message_id, text FROM post_from_telegram_suggest WHERE text IS NOT NULL"
)
posts = await cursor.fetchall()
await cursor.close()
updated_count = 0
null_count = 0
# Обновляем каждую запись
for message_id, text in posts:
try:
# Определяем is_anonymous на основе текста
# Если текст пустой или None, устанавливаем NULL (legacy)
if not text or not text.strip():
is_anonymous = None
else:
is_anonymous = determine_anonymity(text)
# Преобразуем bool в int для SQLite (True -> 1, False -> 0, None -> None)
is_anonymous_int = None if is_anonymous is None else (1 if is_anonymous else 0)
await conn.execute(
"UPDATE post_from_telegram_suggest SET is_anonymous = ? WHERE message_id = ?",
(is_anonymous_int, message_id)
)
if is_anonymous is not None:
updated_count += 1
else:
null_count += 1
except Exception as e:
logger.error(f"Ошибка при обработке поста message_id={message_id}: {e}")
# В случае ошибки устанавливаем NULL
await conn.execute(
"UPDATE post_from_telegram_suggest SET is_anonymous = NULL WHERE message_id = ?",
(message_id,)
)
null_count += 1
# Обновляем записи без текста (устанавливаем NULL)
cursor = await conn.execute(
"SELECT COUNT(*) FROM post_from_telegram_suggest WHERE text IS NULL"
)
row = await cursor.fetchone()
posts_without_text = row[0] if row else 0
await cursor.close()
if posts_without_text > 0:
await conn.execute(
"UPDATE post_from_telegram_suggest SET is_anonymous = NULL WHERE text IS NULL"
)
null_count += posts_without_text
await conn.commit()
total_updated = updated_count + null_count
logger.info(
f"Миграция завершена. Обновлено записей: {total_updated} "
f"(определено: {updated_count}, установлено NULL: {null_count})"
)
print(f"Миграция завершена.")
print(f"Обновлено записей: {total_updated}")
print(f" - Определено is_anonymous: {updated_count}")
print(f" - Установлено NULL: {null_count}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Добавление колонки is_anonymous в post_from_telegram_suggest"
)
parser.add_argument(
"--db",
default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
help="Путь к БД (или DB_PATH)",
)
args = parser.parse_args()
asyncio.run(main(args.db))

148
scripts/clean_post_text.py Executable file
View File

@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""
Скрипт для приведения текста постов к "сырому" виду.
Удаляет форматирование, добавленное функцией get_text_message(), оставляя только исходный текст.
"""
import argparse
import asyncio
import html
import os
import re
import sys
from pathlib import Path
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
import aiosqlite
from logs.custom_logger import logger
DEFAULT_DB_PATH = "database/tg-bot-database.db"
# Паттерны для определения форматированного текста
PREFIX = "Пост из ТГ:\n"
ANONYMOUS_SUFFIX = "\n\nПост опубликован анонимно"
AUTHOR_SUFFIX_PATTERN = re.compile(r"\n\nАвтор поста: .+$")
def extract_raw_text(formatted_text: str) -> str:
"""
Извлекает сырой текст из форматированного текста поста.
Args:
formatted_text: Форматированный текст поста
Returns:
str: Сырой текст или исходный текст, если форматирование не обнаружено
"""
if not formatted_text:
return ""
# Проверяем, начинается ли текст с префикса
if not formatted_text.startswith(PREFIX):
# Текст уже в сыром виде или имеет другой формат
return formatted_text
# Извлекаем текст после префикса
text_after_prefix = formatted_text[len(PREFIX):]
# Проверяем, заканчивается ли текст на "Пост опубликован анонимно"
if text_after_prefix.endswith(ANONYMOUS_SUFFIX):
raw_text = text_after_prefix[:-len(ANONYMOUS_SUFFIX)]
# Проверяем, заканчивается ли текст на "Автор поста: ..."
elif AUTHOR_SUFFIX_PATTERN.search(text_after_prefix):
raw_text = AUTHOR_SUFFIX_PATTERN.sub("", text_after_prefix)
else:
# Не удалось определить формат, возвращаем текст без префикса
raw_text = text_after_prefix
# Декодируем HTML-экранирование
raw_text = html.unescape(raw_text)
return raw_text
async def main(db_path: str, dry_run: bool = False) -> None:
db_path = os.path.abspath(db_path)
if not os.path.exists(db_path):
logger.error("База данных не найдена: %s", db_path)
print(f"Ошибка: база данных не найдена: {db_path}")
return
async with aiosqlite.connect(db_path) as conn:
await conn.execute("PRAGMA foreign_keys = ON")
# Получаем все записи с текстом
cursor = await conn.execute(
"SELECT message_id, text FROM post_from_telegram_suggest WHERE text IS NOT NULL AND text != ''"
)
posts = await cursor.fetchall()
await cursor.close()
updated_count = 0
skipped_count = 0
error_count = 0
print(f"Найдено записей для обработки: {len(posts)}")
if dry_run:
print("РЕЖИМ ПРОВЕРКИ (dry-run): изменения не будут сохранены")
# Обрабатываем каждую запись
for message_id, formatted_text in posts:
try:
# Извлекаем сырой текст
raw_text = extract_raw_text(formatted_text)
# Проверяем, изменился ли текст
if raw_text == formatted_text:
skipped_count += 1
continue
if dry_run:
print(f"\n[DRY-RUN] message_id={message_id}:")
print(f" Было: {formatted_text[:100]}...")
print(f" Станет: {raw_text[:100]}...")
else:
# Обновляем запись
await conn.execute(
"UPDATE post_from_telegram_suggest SET text = ? WHERE message_id = ?",
(raw_text, message_id)
)
updated_count += 1
except Exception as e:
logger.error(f"Ошибка при обработке поста message_id={message_id}: {e}")
error_count += 1
if not dry_run:
await conn.commit()
total_processed = updated_count + skipped_count + error_count
logger.info(
f"Обработка завершена. Всего записей: {total_processed}, "
f"обновлено: {updated_count}, пропущено: {skipped_count}, ошибок: {error_count}"
)
print(f"\nОбработка завершена:")
print(f" - Всего записей: {total_processed}")
print(f" - Обновлено: {updated_count}")
print(f" - Пропущено (уже в сыром виде): {skipped_count}")
print(f" - Ошибок: {error_count}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Приведение текста постов к 'сырому' виду"
)
parser.add_argument(
"--db",
default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
help="Путь к БД (или DB_PATH)",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Режим проверки без сохранения изменений",
)
args = parser.parse_args()
asyncio.run(main(args.db, args.dry_run))

View File

@@ -77,6 +77,7 @@ class TestPostRepository:
assert "message_id INTEGER NOT NULL PRIMARY KEY" in post_table_call
assert "created_at INTEGER NOT NULL" in post_table_call
assert "status TEXT NOT NULL DEFAULT 'suggest'" in post_table_call
assert "is_anonymous INTEGER" in post_table_call
assert "FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE" in post_table_call
# Проверяем создание таблицы контента
@@ -104,14 +105,17 @@ class TestPostRepository:
assert "INSERT INTO post_from_telegram_suggest" in query
assert "status" in query
assert "VALUES (?, ?, ?, ?, ?)" in query
assert params == (
sample_post.message_id,
sample_post.text,
sample_post.author_id,
sample_post.created_at,
sample_post.status,
)
assert "is_anonymous" in query
assert "VALUES (?, ?, ?, ?, ?, ?)" in query
# Проверяем параметры: message_id, text, author_id, created_at, status, is_anonymous
assert params[0] == sample_post.message_id
assert params[1] == sample_post.text
assert params[2] == sample_post.author_id
assert params[3] == sample_post.created_at
assert params[4] == sample_post.status
# is_anonymous преобразуется в int (None -> None, True -> 1, False -> 0)
expected_is_anonymous = None if sample_post.is_anonymous is None else (1 if sample_post.is_anonymous else 0)
assert params[5] == expected_is_anonymous
@pytest.mark.asyncio
async def test_add_post_without_date(self, post_repository, sample_post_no_date):
@@ -132,6 +136,8 @@ class TestPostRepository:
assert params[3] == sample_post_no_date.created_at # created_at
assert params[4] == sample_post_no_date.status # status (default suggest)
# Проверяем is_anonymous (должен быть в параметрах)
assert len(params) == 6 # Всего 6 параметров включая is_anonymous
@pytest.mark.asyncio
async def test_add_post_logs_correctly(self, post_repository, sample_post):
@@ -476,6 +482,169 @@ class TestPostRepository:
# Проверяем, что logger.info не вызывался
post_repository.logger.info.assert_not_called()
@pytest.mark.asyncio
async def test_get_post_text_and_anonymity_by_message_id_found(self, post_repository):
"""Тест получения текста и is_anonymous по message_id (пост найден)."""
# Мокаем _execute_query_with_result
mock_result = [("Тестовый текст", 1)] # is_anonymous = 1 (True)
post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
post_repository.logger = MagicMock()
message_id = 12345
result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id)
# Проверяем результат
text, is_anonymous = result
assert text == "Тестовый текст"
assert is_anonymous is True
# Проверяем вызов _execute_query_with_result
post_repository._execute_query_with_result.assert_called_once()
call_args = post_repository._execute_query_with_result.call_args
query = call_args[0][0]
params = call_args[0][1]
assert "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE message_id = ?" in query
assert params == (message_id,)
@pytest.mark.asyncio
async def test_get_post_text_and_anonymity_by_message_id_with_false(self, post_repository):
"""Тест получения текста и is_anonymous по message_id (is_anonymous = False)."""
# Мокаем _execute_query_with_result
mock_result = [("Тестовый текст", 0)] # is_anonymous = 0 (False)
post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
message_id = 12345
result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id)
# Проверяем результат
text, is_anonymous = result
assert text == "Тестовый текст"
assert is_anonymous is False
@pytest.mark.asyncio
async def test_get_post_text_and_anonymity_by_message_id_with_null(self, post_repository):
"""Тест получения текста и is_anonymous по message_id (is_anonymous = NULL)."""
# Мокаем _execute_query_with_result
mock_result = [("Тестовый текст", None)] # is_anonymous = NULL
post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
message_id = 12345
result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id)
# Проверяем результат
text, is_anonymous = result
assert text == "Тестовый текст"
assert is_anonymous is None
@pytest.mark.asyncio
async def test_get_post_text_and_anonymity_by_message_id_not_found(self, post_repository):
"""Тест получения текста и is_anonymous по message_id (пост не найден)."""
# Мокаем _execute_query_with_result
mock_result = []
post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
message_id = 12345
result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id)
# Проверяем результат
text, is_anonymous = result
assert text is None
assert is_anonymous is None
@pytest.mark.asyncio
async def test_get_post_text_and_anonymity_by_helper_id_found(self, post_repository):
"""Тест получения текста и is_anonymous по helper_message_id (пост найден)."""
# Мокаем _execute_query_with_result
mock_result = [("Тестовый текст", 1)] # is_anonymous = 1 (True)
post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
post_repository.logger = MagicMock()
helper_message_id = 67890
result = await post_repository.get_post_text_and_anonymity_by_helper_id(helper_message_id)
# Проверяем результат
text, is_anonymous = result
assert text == "Тестовый текст"
assert is_anonymous is True
# Проверяем вызов _execute_query_with_result
post_repository._execute_query_with_result.assert_called_once()
call_args = post_repository._execute_query_with_result.call_args
query = call_args[0][0]
params = call_args[0][1]
assert "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE helper_text_message_id = ?" in query
assert params == (helper_message_id,)
@pytest.mark.asyncio
async def test_add_post_with_is_anonymous_true(self, post_repository):
"""Тест добавления поста с is_anonymous=True."""
post = TelegramPost(
message_id=12345,
text="Тестовый пост анон",
author_id=67890,
created_at=int(datetime.now().timestamp()),
is_anonymous=True
)
post_repository._execute_query = AsyncMock()
await post_repository.add_post(post)
call_args = post_repository._execute_query.call_args
params = call_args[0][1]
# Проверяем, что is_anonymous преобразован в 1
assert params[5] == 1
@pytest.mark.asyncio
async def test_add_post_with_is_anonymous_false(self, post_repository):
"""Тест добавления поста с is_anonymous=False."""
post = TelegramPost(
message_id=12345,
text="Тестовый пост неанон",
author_id=67890,
created_at=int(datetime.now().timestamp()),
is_anonymous=False
)
post_repository._execute_query = AsyncMock()
await post_repository.add_post(post)
call_args = post_repository._execute_query.call_args
params = call_args[0][1]
# Проверяем, что is_anonymous преобразован в 0
assert params[5] == 0
@pytest.mark.asyncio
async def test_add_post_with_is_anonymous_none(self, post_repository):
"""Тест добавления поста с is_anonymous=None."""
post = TelegramPost(
message_id=12345,
text="Тестовый пост",
author_id=67890,
created_at=int(datetime.now().timestamp()),
is_anonymous=None
)
post_repository._execute_query = AsyncMock()
await post_repository.add_post(post)
call_args = post_repository._execute_query.call_args
params = call_args[0][1]
# Проверяем, что is_anonymous остался None
assert params[5] is None
@pytest.mark.asyncio
async def test_create_tables_logs_success(self, post_repository):
"""Тест логирования успешного создания таблиц."""

287
tests/test_post_service.py Normal file
View File

@@ -0,0 +1,287 @@
"""Tests for PostService"""
import pytest
from unittest.mock import Mock, AsyncMock, MagicMock, patch
from datetime import datetime
from aiogram import types
from helper_bot.handlers.private.services import PostService, BotSettings
from database.models import TelegramPost, User
class TestPostService:
"""Test class for PostService"""
@pytest.fixture
def mock_db(self):
"""Mock database"""
db = Mock()
db.add_post = AsyncMock()
db.update_helper_message = AsyncMock()
db.get_user_by_id = AsyncMock()
return db
@pytest.fixture
def mock_settings(self):
"""Mock bot settings"""
return BotSettings(
group_for_posts="test_posts",
group_for_message="test_message",
main_public="test_public",
group_for_logs="test_logs",
important_logs="test_important",
preview_link="test_link",
logs="test_logs_setting",
test="test_test_setting"
)
@pytest.fixture
def post_service(self, mock_db, mock_settings):
"""Create PostService instance"""
return PostService(mock_db, mock_settings)
@pytest.fixture
def mock_message(self):
"""Mock Telegram message"""
message = Mock(spec=types.Message)
from_user = Mock()
from_user.id = 12345
from_user.first_name = "Test"
from_user.username = "testuser"
from_user.full_name = "Test User"
message.from_user = from_user
message.text = "Тестовый пост"
message.message_id = 100
message.bot = AsyncMock()
message.chat = Mock()
message.chat.id = 12345
return message
@pytest.mark.asyncio
async def test_handle_text_post_saves_raw_text(self, post_service, mock_message, mock_db):
"""Test that handle_text_post saves raw text to database"""
with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted text"):
with patch('helper_bot.handlers.private.services.send_text_message', return_value=200):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
await post_service.handle_text_post(mock_message, "Test")
# Check that add_post was called
mock_db.add_post.assert_called_once()
call_args = mock_db.add_post.call_args[0][0]
# Check that raw text is saved
assert isinstance(call_args, TelegramPost)
assert call_args.text == "Тестовый пост" # Raw text
assert call_args.message_id == 200
assert call_args.author_id == 12345
assert call_args.is_anonymous is False
@pytest.mark.asyncio
async def test_handle_text_post_determines_anonymity(self, post_service, mock_message, mock_db):
"""Test that handle_text_post determines anonymity correctly"""
mock_message.text = "Тестовый пост анон"
with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted text"):
with patch('helper_bot.handlers.private.services.send_text_message', return_value=200):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=True):
await post_service.handle_text_post(mock_message, "Test")
call_args = mock_db.add_post.call_args[0][0]
assert call_args.is_anonymous is True
@pytest.mark.asyncio
async def test_handle_photo_post_saves_raw_caption(self, post_service, mock_message, mock_db):
"""Test that handle_photo_post saves raw caption to database"""
mock_message.caption = "Тестовая подпись"
mock_message.photo = [Mock()]
mock_message.photo[-1].file_id = "photo_123"
sent_message = Mock()
sent_message.message_id = 201
sent_message.caption = "Formatted caption"
with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted caption"):
with patch('helper_bot.handlers.private.services.send_photo_message', return_value=sent_message):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
await post_service.handle_photo_post(mock_message, "Test")
mock_db.add_post.assert_called_once()
call_args = mock_db.add_post.call_args[0][0]
# Check that raw caption is saved
assert call_args.text == "Тестовая подпись" # Raw caption
assert call_args.message_id == 201
assert call_args.is_anonymous is False
@pytest.mark.asyncio
async def test_handle_photo_post_without_caption(self, post_service, mock_message, mock_db):
"""Test that handle_photo_post handles missing caption"""
mock_message.caption = None
mock_message.photo = [Mock()]
mock_message.photo[-1].file_id = "photo_123"
sent_message = Mock()
sent_message.message_id = 202
with patch('helper_bot.handlers.private.services.get_text_message', return_value=""):
with patch('helper_bot.handlers.private.services.send_photo_message', return_value=sent_message):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
await post_service.handle_photo_post(mock_message, "Test")
call_args = mock_db.add_post.call_args[0][0]
assert call_args.text == "" # Empty string for missing caption
assert call_args.is_anonymous is False
@pytest.mark.asyncio
async def test_handle_video_post_saves_raw_caption(self, post_service, mock_message, mock_db):
"""Test that handle_video_post saves raw caption to database"""
mock_message.caption = "Видео подпись"
mock_message.video = Mock()
mock_message.video.file_id = "video_123"
sent_message = Mock()
sent_message.message_id = 203
with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted"):
with patch('helper_bot.handlers.private.services.send_video_message', return_value=sent_message):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=True):
with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
await post_service.handle_video_post(mock_message, "Test")
call_args = mock_db.add_post.call_args[0][0]
assert call_args.text == "Видео подпись" # Raw caption
assert call_args.is_anonymous is True
@pytest.mark.asyncio
async def test_handle_audio_post_saves_raw_caption(self, post_service, mock_message, mock_db):
"""Test that handle_audio_post saves raw caption to database"""
mock_message.caption = "Аудио подпись"
mock_message.audio = Mock()
mock_message.audio.file_id = "audio_123"
sent_message = Mock()
sent_message.message_id = 204
with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted"):
with patch('helper_bot.handlers.private.services.send_audio_message', return_value=sent_message):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
await post_service.handle_audio_post(mock_message, "Test")
call_args = mock_db.add_post.call_args[0][0]
assert call_args.text == "Аудио подпись" # Raw caption
assert call_args.is_anonymous is False
@pytest.mark.asyncio
async def test_handle_video_note_post_saves_empty_string(self, post_service, mock_message, mock_db):
"""Test that handle_video_note_post saves empty string"""
mock_message.video_note = Mock()
mock_message.video_note.file_id = "video_note_123"
sent_message = Mock()
sent_message.message_id = 205
with patch('helper_bot.handlers.private.services.send_video_note_message', return_value=sent_message):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
await post_service.handle_video_note_post(mock_message)
call_args = mock_db.add_post.call_args[0][0]
assert call_args.text == "" # Empty string
assert call_args.is_anonymous is False
@pytest.mark.asyncio
async def test_handle_voice_post_saves_empty_string(self, post_service, mock_message, mock_db):
"""Test that handle_voice_post saves empty string"""
mock_message.voice = Mock()
mock_message.voice.file_id = "voice_123"
sent_message = Mock()
sent_message.message_id = 206
with patch('helper_bot.handlers.private.services.send_voice_message', return_value=sent_message):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
await post_service.handle_voice_post(mock_message)
call_args = mock_db.add_post.call_args[0][0]
assert call_args.text == "" # Empty string
assert call_args.is_anonymous is False
@pytest.mark.asyncio
async def test_handle_media_group_post_saves_raw_caption(self, post_service, mock_message, mock_db):
"""Test that handle_media_group_post saves raw caption to database"""
mock_message.message_id = 300
mock_message.media_group_id = 1
album = [Mock()]
album[0].caption = "Медиагруппа подпись"
with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted"):
with patch('helper_bot.handlers.private.services.prepare_media_group_from_middlewares', return_value=[]):
with patch('helper_bot.handlers.private.services.send_media_group_message_to_private_chat', return_value=301):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.send_text_message', return_value=302):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=True):
with patch('asyncio.sleep', return_value=None):
await post_service.handle_media_group_post(mock_message, album, "Test")
# Check main post
calls = mock_db.add_post.call_args_list
main_post = calls[0][0][0]
assert main_post.text == "Медиагруппа подпись" # Raw caption
assert main_post.message_id == 300
assert main_post.is_anonymous is True
@pytest.mark.asyncio
async def test_handle_media_group_post_without_caption(self, post_service, mock_message, mock_db):
"""Test that handle_media_group_post handles missing caption"""
mock_message.message_id = 301
mock_message.media_group_id = 1
album = [Mock()]
album[0].caption = None
with patch('helper_bot.handlers.private.services.get_text_message', return_value=" "):
with patch('helper_bot.handlers.private.services.prepare_media_group_from_middlewares', return_value=[]):
with patch('helper_bot.handlers.private.services.send_media_group_message_to_private_chat', return_value=302):
with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
with patch('helper_bot.handlers.private.services.send_text_message', return_value=303):
with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
with patch('asyncio.sleep', return_value=None):
await post_service.handle_media_group_post(mock_message, album, "Test")
calls = mock_db.add_post.call_args_list
main_post = calls[0][0][0]
assert main_post.text == "" # Empty string for missing caption
assert main_post.is_anonymous is False

View File

@@ -6,6 +6,7 @@ import os
from helper_bot.utils.helper_func import (
get_first_name,
get_text_message,
determine_anonymity,
check_username_and_full_name,
safe_html_escape,
download_file,
@@ -64,12 +65,13 @@ class TestHelperFunctions:
def test_get_text_message(self, mock_message):
"""Тест функции обработки текста сообщения"""
# Тест с обычным текстом
# Тест с обычным текстом (legacy - определяется по тексту)
text = "Привет, это тестовое сообщение"
result = get_text_message(text, "Test", "testuser")
assert "Test" in result
assert "testuser" in result
assert "тестовое сообщение" in result
assert "Автор поста" in result
# Тест с пустым текстом
result = get_text_message("", "Test", "testuser")
@@ -83,6 +85,98 @@ class TestHelperFunctions:
assert "testuser" in result
assert "Обычный текст без специальных слов" in result
def test_get_text_message_with_is_anonymous_true(self, mock_message):
"""Тест функции get_text_message с is_anonymous=True"""
text = "Тестовый пост"
result = get_text_message(text, "Test", "testuser", is_anonymous=True)
assert "Пост из ТГ:" in result
assert "Тестовый пост" in result
assert "Пост опубликован анонимно" in result
assert "Автор поста" not in result
def test_get_text_message_with_is_anonymous_false(self, mock_message):
"""Тест функции get_text_message с is_anonymous=False"""
text = "Тестовый пост"
result = get_text_message(text, "Test", "testuser", is_anonymous=False)
assert "Пост из ТГ:" in result
assert "Тестовый пост" in result
assert "Автор поста" in result
assert "Test" in result
assert "testuser" in result
assert "Пост опубликован анонимно" not in result
def test_get_text_message_with_is_anonymous_none_legacy(self, mock_message):
"""Тест функции get_text_message с is_anonymous=None (legacy - определяется по тексту)"""
# Тест с "анон" в тексте
text = "Тестовый пост анон"
result = get_text_message(text, "Test", "testuser", is_anonymous=None)
assert "Пост из ТГ:" in result
assert "Тестовый пост анон" in result
assert "Пост опубликован анонимно" in result
# Тест с "неанон" в тексте
text = "Тестовый пост неанон"
result = get_text_message(text, "Test", "testuser", is_anonymous=None)
assert "Пост из ТГ:" in result
assert "Тестовый пост неанон" in result
assert "Автор поста" in result
# Тест с "не анон" в тексте
text = "Тестовый пост не анон"
result = get_text_message(text, "Test", "testuser", is_anonymous=None)
assert "Автор поста" in result
def test_get_text_message_with_username_none(self, mock_message):
"""Тест функции get_text_message без username"""
text = "Тестовый пост"
result = get_text_message(text, "Test", None, is_anonymous=False)
assert "Test" in result
assert "(Ник не указан)" in result
assert "@" not in result
def test_determine_anonymity_with_anon(self):
"""Тест функции determine_anonymity с 'анон' в тексте"""
assert determine_anonymity("Этот пост анон") is True
assert determine_anonymity("анон") is True
assert determine_anonymity("АНОН") is True # Проверка регистра
assert determine_anonymity("пост анонимный анон") is True
def test_determine_anonymity_with_neanon(self):
"""Тест функции determine_anonymity с 'неанон' в тексте"""
assert determine_anonymity("Этот пост неанон") is False
assert determine_anonymity("неанон") is False
assert determine_anonymity("НЕАНОН") is False # Проверка регистра
assert determine_anonymity("пост неанон") is False
def test_determine_anonymity_with_ne_anon(self):
"""Тест функции determine_anonymity с 'не анон' в тексте"""
assert determine_anonymity("Этот пост не анон") is False
assert determine_anonymity("не анон") is False
assert determine_anonymity("НЕ АНОН") is False # Проверка регистра
assert determine_anonymity("пост не анон") is False
def test_determine_anonymity_priority_neanon_over_anon(self):
"""Тест приоритета 'неанон' над 'анон'"""
# Если есть и "анон" и "неанон", должен вернуть False
assert determine_anonymity("анон неанон") is False
assert determine_anonymity("неанон анон") is False
assert determine_anonymity("не анон анон") is False
def test_determine_anonymity_without_keywords(self):
"""Тест функции determine_anonymity без ключевых слов"""
assert determine_anonymity("Обычный текст") is False
assert determine_anonymity("") is False
assert determine_anonymity("Пост без специальных слов") is False
def test_determine_anonymity_with_none(self):
"""Тест функции determine_anonymity с None"""
assert determine_anonymity(None) is False
def test_determine_anonymity_with_empty_string(self):
"""Тест функции determine_anonymity с пустой строкой"""
assert determine_anonymity("") is False
assert determine_anonymity(" ") is False # Только пробелы
@pytest.mark.asyncio
async def test_check_username_and_full_name(self):
"""Тест функции проверки изменений username и full_name"""