diff --git a/database/async_db.py b/database/async_db.py
index 6174e78..07973eb 100644
--- a/database/async_db.py
+++ b/database/async_db.py
@@ -3,7 +3,7 @@ from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from database.repository_factory import RepositoryFactory
from database.models import (
- User, BlacklistUser, UserMessage, TelegramPost, PostContent,
+ User, BlacklistUser, BlacklistHistoryRecord, UserMessage, TelegramPost, PostContent,
Admin, AudioMessage
)
@@ -142,10 +142,18 @@ class AsyncBotDB:
async def get_post_content_from_telegram_by_last_id(self, last_post_id: int) -> List[Tuple[str, str]]:
"""Получает контент поста по helper_text_message_id."""
return await self.factory.posts.get_post_content_by_helper_id(last_post_id)
+
+ async def get_post_content_by_helper_id(self, helper_message_id: int) -> List[Tuple[str, str]]:
+ """Алиас для get_post_content_from_telegram_by_last_id (используется callback-сервисом)."""
+ return await self.get_post_content_from_telegram_by_last_id(helper_message_id)
async def get_post_text_from_telegram_by_last_id(self, last_post_id: int) -> Optional[str]:
"""Получает текст поста по helper_text_message_id."""
return await self.factory.posts.get_post_text_by_helper_id(last_post_id)
+
+ async def get_post_text_by_helper_id(self, helper_message_id: int) -> Optional[str]:
+ """Алиас для get_post_text_from_telegram_by_last_id (используется callback-сервисом)."""
+ return await self.get_post_text_from_telegram_by_last_id(helper_message_id)
async def get_post_ids_from_telegram_by_last_id(self, last_post_id: int) -> List[int]:
"""Получает ID сообщений по helper_text_message_id."""
@@ -159,19 +167,80 @@ class AsyncBotDB:
"""Получает ID автора по helper_text_message_id."""
return await self.factory.posts.get_author_id_by_helper_message_id(helper_text_message_id)
+ async def get_post_text_and_anonymity_by_message_id(self, message_id: int) -> tuple[Optional[str], Optional[bool]]:
+ """Получает текст и is_anonymous поста по message_id."""
+ return await self.factory.posts.get_post_text_and_anonymity_by_message_id(message_id)
+
+ async def get_post_text_and_anonymity_by_helper_id(self, helper_message_id: int) -> tuple[Optional[str], Optional[bool]]:
+ """Получает текст и is_anonymous поста по helper_text_message_id."""
+ return await self.factory.posts.get_post_text_and_anonymity_by_helper_id(helper_message_id)
+
+ async def update_status_by_message_id(self, message_id: int, status: str) -> int:
+ """Обновление статуса поста по message_id (одиночные посты). Возвращает число обновлённых строк."""
+ return await self.factory.posts.update_status_by_message_id(message_id, status)
+
+ async def update_status_for_media_group_by_helper_id(
+ self, helper_message_id: int, status: str
+ ) -> int:
+ """Обновление статуса постов медиагруппы по helper_message_id. Возвращает число обновлённых строк."""
+ return await self.factory.posts.update_status_for_media_group_by_helper_id(
+ helper_message_id, status
+ )
+
# Методы для работы с черным списком
- async def set_user_blacklist(self, user_id: int, user_name: str = None,
- message_for_user: str = None, date_to_unban: int = None):
- """Добавляет пользователя в черный список."""
+ async def set_user_blacklist(
+ self,
+ user_id: int,
+ user_name: str = None,
+ message_for_user: str = None,
+ date_to_unban: int = None,
+ ban_author: Optional[int] = None,
+ ):
+ """
+ Добавляет пользователя в черный список.
+ Также создает запись в истории банов для отслеживания.
+ """
blacklist_user = BlacklistUser(
user_id=user_id,
message_for_user=message_for_user,
- date_to_unban=date_to_unban
+ date_to_unban=date_to_unban,
+ ban_author=ban_author,
)
await self.factory.blacklist.add_user(blacklist_user)
+
+ # Логируем в историю банов
+ try:
+ date_ban = int(datetime.now().timestamp())
+ history_record = BlacklistHistoryRecord(
+ user_id=user_id,
+ message_for_user=message_for_user,
+ date_ban=date_ban,
+ date_unban=None, # Будет установлено при разбане
+ ban_author=ban_author,
+ )
+ await self.factory.blacklist_history.add_record_on_ban(history_record)
+ except Exception as e:
+ # Ошибка записи в историю не должна ломать процесс бана
+ self.logger.error(
+ f"Ошибка записи в историю банов для user_id={user_id}: {e}"
+ )
async def delete_user_blacklist(self, user_id: int) -> bool:
- """Удаляет пользователя из черного списка."""
+ """
+ Удаляет пользователя из черного списка.
+ Также обновляет запись в истории банов, устанавливая date_unban.
+ """
+ # Сначала обновляем историю (если есть открытая запись)
+ try:
+ date_unban = int(datetime.now().timestamp())
+ await self.factory.blacklist_history.set_unban_date(user_id, date_unban)
+ except Exception as e:
+ # Ошибка записи в историю не должна ломать критический путь разбана
+ self.logger.error(
+ f"Ошибка обновления истории при разбане для user_id={user_id}: {e}"
+ )
+
+ # Удаляем из черного списка (критический путь)
return await self.factory.blacklist.remove_user(user_id)
async def check_user_in_blacklist(self, user_id: int) -> bool:
diff --git a/database/models.py b/database/models.py
index a2b5b90..4f719d9 100644
--- a/database/models.py
+++ b/database/models.py
@@ -26,6 +26,20 @@ class BlacklistUser:
message_for_user: Optional[str] = None
date_to_unban: Optional[int] = None
created_at: Optional[int] = None
+ ban_author: Optional[int] = None
+
+
+@dataclass
+class BlacklistHistoryRecord:
+ """Модель записи истории банов/разбанов."""
+ user_id: int
+ message_for_user: Optional[str] = None
+ date_ban: int = 0
+ date_unban: Optional[int] = None
+ ban_author: Optional[int] = None
+ id: Optional[int] = None
+ created_at: Optional[int] = None
+ updated_at: Optional[int] = None
@dataclass
@@ -45,6 +59,8 @@ class TelegramPost:
author_id: int
helper_text_message_id: Optional[int] = None
created_at: Optional[int] = None
+ status: str = "suggest"
+ is_anonymous: Optional[bool] = None
@dataclass
diff --git a/database/repositories/__init__.py b/database/repositories/__init__.py
index 4f8f70b..867b4bf 100644
--- a/database/repositories/__init__.py
+++ b/database/repositories/__init__.py
@@ -4,6 +4,7 @@
Содержит репозитории для разных сущностей:
- user_repository: работа с пользователями
- blacklist_repository: работа с черным списком
+- blacklist_history_repository: работа с историей банов/разбанов
- message_repository: работа с сообщениями
- post_repository: работа с постами
- admin_repository: работа с администраторами
@@ -12,12 +13,13 @@
from .user_repository import UserRepository
from .blacklist_repository import BlacklistRepository
+from .blacklist_history_repository import BlacklistHistoryRepository
from .message_repository import MessageRepository
from .post_repository import PostRepository
from .admin_repository import AdminRepository
from .audio_repository import AudioRepository
__all__ = [
- 'UserRepository', 'BlacklistRepository', 'MessageRepository', 'PostRepository',
- 'AdminRepository', 'AudioRepository'
+ 'UserRepository', 'BlacklistRepository', 'BlacklistHistoryRepository',
+ 'MessageRepository', 'PostRepository', 'AdminRepository', 'AudioRepository'
]
diff --git a/database/repositories/blacklist_history_repository.py b/database/repositories/blacklist_history_repository.py
new file mode 100644
index 0000000..e0914a0
--- /dev/null
+++ b/database/repositories/blacklist_history_repository.py
@@ -0,0 +1,119 @@
+from typing import Optional
+from database.base import DatabaseConnection
+from database.models import BlacklistHistoryRecord
+
+
+class BlacklistHistoryRepository(DatabaseConnection):
+ """Репозиторий для работы с историей банов/разбанов."""
+
+ async def create_tables(self):
+ """Создание таблицы истории банов/разбанов."""
+ query = '''
+ CREATE TABLE IF NOT EXISTS blacklist_history (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ message_for_user TEXT,
+ date_ban INTEGER NOT NULL,
+ date_unban INTEGER,
+ ban_author INTEGER,
+ created_at INTEGER DEFAULT (strftime('%s', 'now')),
+ updated_at INTEGER DEFAULT (strftime('%s', 'now')),
+ FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE,
+ FOREIGN KEY (ban_author) REFERENCES our_users(user_id) ON DELETE SET NULL
+ )
+ '''
+ await self._execute_query(query)
+
+ # Создаем индексы
+ await self._execute_query(
+ "CREATE INDEX IF NOT EXISTS idx_blacklist_history_user_id ON blacklist_history(user_id)"
+ )
+ await self._execute_query(
+ "CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_ban ON blacklist_history(date_ban)"
+ )
+ await self._execute_query(
+ "CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_unban ON blacklist_history(date_unban)"
+ )
+
+ self.logger.info("Таблица истории банов/разбанов создана")
+
+ async def add_record_on_ban(self, record: BlacklistHistoryRecord) -> None:
+ """Добавляет запись о бане в историю."""
+ query = """
+ INSERT INTO blacklist_history (
+ user_id, message_for_user, date_ban, date_unban, ban_author, created_at, updated_at
+ )
+ VALUES (?, ?, ?, ?, ?, ?, ?)
+ """
+ # Используем текущее время, если не указано
+ from datetime import datetime
+ current_timestamp = int(datetime.now().timestamp())
+
+ params = (
+ record.user_id,
+ record.message_for_user,
+ record.date_ban,
+ record.date_unban,
+ record.ban_author,
+ record.created_at if record.created_at is not None else current_timestamp,
+ record.updated_at if record.updated_at is not None else current_timestamp,
+ )
+
+ await self._execute_query(query, params)
+ self.logger.info(
+ f"Запись о бане добавлена в историю: user_id={record.user_id}, "
+ f"date_ban={record.date_ban}"
+ )
+
+ async def set_unban_date(self, user_id: int, date_unban: int) -> bool:
+ """
+ Обновляет date_unban и updated_at в последней записи (date_unban IS NULL) для пользователя.
+
+ Args:
+ user_id: ID пользователя
+ date_unban: Timestamp даты разбана
+
+ Returns:
+ True если запись обновлена, False если не найдена открытая запись
+ """
+ try:
+ from datetime import datetime
+ current_timestamp = int(datetime.now().timestamp())
+
+ # SQLite не поддерживает ORDER BY в UPDATE, поэтому используем подзапрос
+ # Сначала проверяем, есть ли открытая запись
+ check_query = """
+ SELECT id FROM blacklist_history
+ WHERE user_id = ? AND date_unban IS NULL
+ ORDER BY id DESC
+ LIMIT 1
+ """
+ rows = await self._execute_query_with_result(check_query, (user_id,))
+
+ if not rows:
+ self.logger.warning(
+ f"Не найдена открытая запись в истории для обновления: user_id={user_id}"
+ )
+ return False
+
+ # Обновляем найденную запись
+ update_query = """
+ UPDATE blacklist_history
+ SET date_unban = ?,
+ updated_at = ?
+ WHERE id = ?
+ """
+
+ record_id = rows[0][0]
+ params = (date_unban, current_timestamp, record_id)
+ await self._execute_query(update_query, params)
+
+ self.logger.info(
+ f"Дата разбана обновлена в истории: user_id={user_id}, date_unban={date_unban}"
+ )
+ return True
+ except Exception as e:
+ self.logger.error(
+ f"Ошибка обновления даты разбана в истории для user_id={user_id}: {str(e)}"
+ )
+ return False
diff --git a/database/repositories/blacklist_repository.py b/database/repositories/blacklist_repository.py
index d02d6af..d66d514 100644
--- a/database/repositories/blacklist_repository.py
+++ b/database/repositories/blacklist_repository.py
@@ -14,7 +14,9 @@ class BlacklistRepository(DatabaseConnection):
message_for_user TEXT,
date_to_unban INTEGER,
created_at INTEGER DEFAULT (strftime('%s', 'now')),
- FOREIGN KEY (user_id) REFERENCES our_users (user_id) ON DELETE CASCADE
+ ban_author INTEGER,
+ FOREIGN KEY (user_id) REFERENCES our_users (user_id) ON DELETE CASCADE,
+ FOREIGN KEY (ban_author) REFERENCES our_users (user_id) ON DELETE SET NULL
)
'''
await self._execute_query(query)
@@ -23,10 +25,15 @@ class BlacklistRepository(DatabaseConnection):
async def add_user(self, blacklist_user: BlacklistUser) -> None:
"""Добавляет пользователя в черный список."""
query = """
- INSERT INTO blacklist (user_id, message_for_user, date_to_unban)
- VALUES (?, ?, ?)
+ INSERT INTO blacklist (user_id, message_for_user, date_to_unban, ban_author)
+ VALUES (?, ?, ?, ?)
"""
- params = (blacklist_user.user_id, blacklist_user.message_for_user, blacklist_user.date_to_unban)
+ params = (
+ blacklist_user.user_id,
+ blacklist_user.message_for_user,
+ blacklist_user.date_to_unban,
+ blacklist_user.ban_author,
+ )
await self._execute_query(query, params)
self.logger.info(f"Пользователь добавлен в черный список: user_id={blacklist_user.user_id}")
@@ -52,7 +59,11 @@ class BlacklistRepository(DatabaseConnection):
async def get_user(self, user_id: int) -> Optional[BlacklistUser]:
"""Возвращает информацию о пользователе в черном списке по user_id."""
- query = "SELECT user_id, message_for_user, date_to_unban, created_at FROM blacklist WHERE user_id = ?"
+ query = """
+ SELECT user_id, message_for_user, date_to_unban, created_at, ban_author
+ FROM blacklist
+ WHERE user_id = ?
+ """
rows = await self._execute_query_with_result(query, (user_id,))
row = rows[0] if rows else None
@@ -61,40 +72,54 @@ class BlacklistRepository(DatabaseConnection):
user_id=row[0],
message_for_user=row[1],
date_to_unban=row[2],
- created_at=row[3]
+ created_at=row[3],
+ ban_author=row[4] if len(row) > 4 else None,
)
return None
async def get_all_users(self, offset: int = 0, limit: int = 10) -> List[BlacklistUser]:
"""Возвращает список пользователей в черном списке."""
- query = "SELECT user_id, message_for_user, date_to_unban, created_at FROM blacklist LIMIT ?, ?"
+ query = """
+ SELECT user_id, message_for_user, date_to_unban, created_at, ban_author
+ FROM blacklist
+ LIMIT ?, ?
+ """
rows = await self._execute_query_with_result(query, (offset, limit))
users = []
for row in rows:
- users.append(BlacklistUser(
- user_id=row[0],
- message_for_user=row[1],
- date_to_unban=row[2],
- created_at=row[3]
- ))
+ users.append(
+ BlacklistUser(
+ user_id=row[0],
+ message_for_user=row[1],
+ date_to_unban=row[2],
+ created_at=row[3],
+ ban_author=row[4] if len(row) > 4 else None,
+ )
+ )
self.logger.info(f"Получен список пользователей в черном списке (offset={offset}, limit={limit}): {len(users)}")
return users
async def get_all_users_no_limit(self) -> List[BlacklistUser]:
"""Возвращает список всех пользователей в черном списке без лимитов."""
- query = "SELECT user_id, message_for_user, date_to_unban, created_at FROM blacklist"
+ query = """
+ SELECT user_id, message_for_user, date_to_unban, created_at, ban_author
+ FROM blacklist
+ """
rows = await self._execute_query_with_result(query)
users = []
for row in rows:
- users.append(BlacklistUser(
- user_id=row[0],
- message_for_user=row[1],
- date_to_unban=row[2],
- created_at=row[3]
- ))
+ users.append(
+ BlacklistUser(
+ user_id=row[0],
+ message_for_user=row[1],
+ date_to_unban=row[2],
+ created_at=row[3],
+ ban_author=row[4] if len(row) > 4 else None,
+ )
+ )
self.logger.info(f"Получен список всех пользователей в черном списке: {len(users)}")
return users
diff --git a/database/repositories/post_repository.py b/database/repositories/post_repository.py
index ca90077..9c03c3a 100644
--- a/database/repositories/post_repository.py
+++ b/database/repositories/post_repository.py
@@ -17,6 +17,8 @@ class PostRepository(DatabaseConnection):
helper_text_message_id INTEGER,
author_id INTEGER,
created_at INTEGER NOT NULL,
+ status TEXT NOT NULL DEFAULT 'suggest',
+ is_anonymous INTEGER,
FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE
)
'''
@@ -51,13 +53,16 @@ class PostRepository(DatabaseConnection):
"""Добавление поста."""
if not post.created_at:
post.created_at = int(datetime.now().timestamp())
-
+ status = post.status if post.status else "suggest"
+ # Преобразуем bool в int для SQLite (True -> 1, False -> 0, None -> None)
+ is_anonymous_int = None if post.is_anonymous is None else (1 if post.is_anonymous else 0)
+
query = """
- INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at)
- VALUES (?, ?, ?, ?)
+ INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at, status, is_anonymous)
+ VALUES (?, ?, ?, ?, ?, ?)
"""
- params = (post.message_id, post.text, post.author_id, post.created_at)
-
+ params = (post.message_id, post.text, post.author_id, post.created_at, status, is_anonymous_int)
+
await self._execute_query(query, params)
self.logger.info(f"Пост добавлен: message_id={post.message_id}")
@@ -65,7 +70,76 @@ class PostRepository(DatabaseConnection):
"""Обновление helper сообщения."""
query = "UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?"
await self._execute_query(query, (helper_message_id, message_id))
-
+
+ async def update_status_by_message_id(self, message_id: int, status: str) -> int:
+ """Обновление статуса поста по message_id (одиночные посты). Возвращает число обновлённых строк."""
+ conn = None
+ try:
+ conn = await self._get_connection()
+ await conn.execute(
+ "UPDATE post_from_telegram_suggest SET status = ? WHERE message_id = ?",
+ (status, message_id),
+ )
+ cur = await conn.execute("SELECT changes()")
+ row = await cur.fetchone()
+ n = row[0] if row else 0
+ await conn.commit()
+ if n == 0:
+ self.logger.warning(
+ f"update_status_by_message_id: 0 строк обновлено для message_id={message_id}, status={status}"
+ )
+ else:
+ self.logger.info(f"Статус поста message_id={message_id} обновлён на {status}")
+ return n
+ except Exception as e:
+ if conn:
+ await conn.rollback()
+ self.logger.error(f"Ошибка при обновлении статуса message_id={message_id}: {e}")
+ raise
+ finally:
+ if conn:
+ await conn.close()
+
+ async def update_status_for_media_group_by_helper_id(
+ self, helper_message_id: int, status: str
+ ) -> int:
+ """Обновление статуса постов медиагруппы по helper_text_message_id. Возвращает число обновлённых строк."""
+ conn = None
+ try:
+ conn = await self._get_connection()
+ await conn.execute(
+ """
+ UPDATE post_from_telegram_suggest
+ SET status = ?
+ WHERE message_id = ? OR helper_text_message_id = ?
+ """,
+ (status, helper_message_id, helper_message_id),
+ )
+ cur = await conn.execute("SELECT changes()")
+ row = await cur.fetchone()
+ n = row[0] if row else 0
+ await conn.commit()
+ if n == 0:
+ self.logger.warning(
+ f"update_status_for_media_group_by_helper_id: 0 строк обновлено "
+ f"для helper_message_id={helper_message_id}, status={status}"
+ )
+ else:
+ self.logger.info(
+ f"Статус медиагруппы helper_message_id={helper_message_id} обновлён на {status}"
+ )
+ return n
+ except Exception as e:
+ if conn:
+ await conn.rollback()
+ self.logger.error(
+ f"Ошибка при обновлении статуса медиагруппы helper_message_id={helper_message_id}: {e}"
+ )
+ raise
+ finally:
+ if conn:
+ await conn.close()
+
async def add_post_content(self, post_id: int, message_id: int, content_name: str, content_type: str) -> bool:
"""Добавление контента поста."""
try:
@@ -148,3 +222,33 @@ class PostRepository(DatabaseConnection):
self.logger.info(f"Получен author_id: {author_id} для helper_message_id={helper_message_id}")
return author_id
return None
+
+ async def get_post_text_and_anonymity_by_message_id(self, message_id: int) -> Tuple[Optional[str], Optional[bool]]:
+ """Получает текст и is_anonymous поста по message_id."""
+ query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE message_id = ?"
+ rows = await self._execute_query_with_result(query, (message_id,))
+ row = rows[0] if rows else None
+
+ if row:
+ text = row[0] or ""
+ is_anonymous_int = row[1]
+ # Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None)
+ is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int)
+ self.logger.info(f"Получены текст и is_anonymous для message_id={message_id}")
+ return text, is_anonymous
+ return None, None
+
+ async def get_post_text_and_anonymity_by_helper_id(self, helper_message_id: int) -> Tuple[Optional[str], Optional[bool]]:
+ """Получает текст и is_anonymous поста по helper_text_message_id."""
+ query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE helper_text_message_id = ?"
+ rows = await self._execute_query_with_result(query, (helper_message_id,))
+ row = rows[0] if rows else None
+
+ if row:
+ text = row[0] or ""
+ is_anonymous_int = row[1]
+ # Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None)
+ is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int)
+ self.logger.info(f"Получены текст и is_anonymous для helper_message_id={helper_message_id}")
+ return text, is_anonymous
+ return None, None
diff --git a/database/repository_factory.py b/database/repository_factory.py
index d5e5d34..154e611 100644
--- a/database/repository_factory.py
+++ b/database/repository_factory.py
@@ -1,6 +1,7 @@
from typing import Optional
from database.repositories.user_repository import UserRepository
from database.repositories.blacklist_repository import BlacklistRepository
+from database.repositories.blacklist_history_repository import BlacklistHistoryRepository
from database.repositories.message_repository import MessageRepository
from database.repositories.post_repository import PostRepository
from database.repositories.admin_repository import AdminRepository
@@ -14,6 +15,7 @@ class RepositoryFactory:
self.db_path = db_path
self._user_repo: Optional[UserRepository] = None
self._blacklist_repo: Optional[BlacklistRepository] = None
+ self._blacklist_history_repo: Optional[BlacklistHistoryRepository] = None
self._message_repo: Optional[MessageRepository] = None
self._post_repo: Optional[PostRepository] = None
self._admin_repo: Optional[AdminRepository] = None
@@ -33,6 +35,13 @@ class RepositoryFactory:
self._blacklist_repo = BlacklistRepository(self.db_path)
return self._blacklist_repo
+ @property
+ def blacklist_history(self) -> BlacklistHistoryRepository:
+ """Возвращает репозиторий истории банов/разбанов."""
+ if self._blacklist_history_repo is None:
+ self._blacklist_history_repo = BlacklistHistoryRepository(self.db_path)
+ return self._blacklist_history_repo
+
@property
def messages(self) -> MessageRepository:
"""Возвращает репозиторий сообщений."""
@@ -65,6 +74,7 @@ class RepositoryFactory:
"""Создает все таблицы в базе данных."""
await self.users.create_tables()
await self.blacklist.create_tables()
+ await self.blacklist_history.create_tables()
await self.messages.create_tables()
await self.posts.create_tables()
await self.admins.create_tables()
diff --git a/database/schema.sql b/database/schema.sql
index 7cb4afd..da9e9aa 100644
--- a/database/schema.sql
+++ b/database/schema.sql
@@ -40,6 +40,20 @@ CREATE TABLE IF NOT EXISTS blacklist (
FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE
);
+-- Blacklist history for tracking all ban/unban events
+CREATE TABLE IF NOT EXISTS blacklist_history (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ message_for_user TEXT,
+ date_ban INTEGER NOT NULL,
+ date_unban INTEGER,
+ ban_author INTEGER,
+ created_at INTEGER DEFAULT (strftime('%s', 'now')),
+ updated_at INTEGER DEFAULT (strftime('%s', 'now')),
+ FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE,
+ FOREIGN KEY (ban_author) REFERENCES our_users(user_id) ON DELETE SET NULL
+);
+
-- User message history
CREATE TABLE IF NOT EXISTS user_messages (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
@@ -57,6 +71,8 @@ CREATE TABLE IF NOT EXISTS post_from_telegram_suggest (
helper_text_message_id INTEGER,
author_id INTEGER,
created_at INTEGER NOT NULL,
+ status TEXT NOT NULL DEFAULT 'suggest',
+ is_anonymous INTEGER,
FOREIGN KEY (author_id) REFERENCES our_users(user_id) ON DELETE CASCADE
);
@@ -107,6 +123,9 @@ CREATE INDEX IF NOT EXISTS idx_audio_message_reference_author_id ON audio_messag
CREATE INDEX IF NOT EXISTS idx_user_messages_user_id ON user_messages(user_id);
CREATE INDEX IF NOT EXISTS idx_post_from_telegram_suggest_author_id ON post_from_telegram_suggest(author_id);
CREATE INDEX IF NOT EXISTS idx_blacklist_date_to_unban ON blacklist(date_to_unban);
+CREATE INDEX IF NOT EXISTS idx_blacklist_history_user_id ON blacklist_history(user_id);
+CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_ban ON blacklist_history(date_ban);
+CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_unban ON blacklist_history(date_unban);
CREATE INDEX IF NOT EXISTS idx_user_messages_date ON user_messages(date);
CREATE INDEX IF NOT EXISTS idx_audio_message_reference_date ON audio_message_reference(date_added);
CREATE INDEX IF NOT EXISTS idx_post_from_telegram_suggest_date ON post_from_telegram_suggest(created_at);
diff --git a/helper_bot/handlers/admin/admin_handlers.py b/helper_bot/handlers/admin/admin_handlers.py
index b9b229a..9d8d8f4 100644
--- a/helper_bot/handlers/admin/admin_handlers.py
+++ b/helper_bot/handlers/admin/admin_handlers.py
@@ -359,7 +359,8 @@ async def confirm_ban(
user_id=user_data['target_user_id'],
username=user_data['target_username'],
reason=user_data['ban_reason'],
- ban_days=user_data['ban_days']
+ ban_days=user_data['ban_days'],
+ ban_author_id=message.from_user.id,
)
safe_username = escape_html(user_data['target_username'])
diff --git a/helper_bot/handlers/admin/services.py b/helper_bot/handlers/admin/services.py
index 55fe57b..df3f3eb 100644
--- a/helper_bot/handlers/admin/services.py
+++ b/helper_bot/handlers/admin/services.py
@@ -117,7 +117,7 @@ class AdminService:
@track_time("ban_user", "admin_service")
@track_errors("admin_service", "ban_user")
- async def ban_user(self, user_id: int, username: str, reason: str, ban_days: Optional[int]) -> None:
+ async def ban_user(self, user_id: int, username: str, reason: str, ban_days: Optional[int], ban_author_id: int) -> None:
"""Заблокировать пользователя"""
try:
# Проверяем, не заблокирован ли уже пользователь
@@ -130,7 +130,7 @@ class AdminService:
date_to_unban = add_days_to_date(ban_days)
# Сохраняем в БД (username больше не передается, так как не используется в новой схеме)
- await self.bot_db.set_user_blacklist(user_id, None, reason, date_to_unban)
+ await self.bot_db.set_user_blacklist(user_id, None, reason, date_to_unban, ban_author=ban_author_id)
logger.info(f"Пользователь {user_id} ({username}) заблокирован. Причина: {reason}, срок: {ban_days} дней")
diff --git a/helper_bot/handlers/callback/callback_handlers.py b/helper_bot/handlers/callback/callback_handlers.py
index f9c073e..198a9b2 100644
--- a/helper_bot/handlers/callback/callback_handlers.py
+++ b/helper_bot/handlers/callback/callback_handlers.py
@@ -13,6 +13,7 @@ from helper_bot.handlers.voice.services import AudioFileService
from helper_bot.keyboards.keyboards import create_keyboard_with_pagination, get_reply_keyboard_admin, \
create_keyboard_for_ban_reason
from helper_bot.utils.helper_func import get_banned_users_list, get_banned_users_buttons
+from helper_bot.handlers.admin.utils import format_user_info
from helper_bot.utils.base_dependency_factory import get_global_instance
from .dependency_factory import get_post_publish_service, get_ban_service
from .exceptions import UserBlockedBotError, PostNotFoundError, UserNotFoundError, PublishError, BanError
@@ -125,7 +126,7 @@ async def ban_user_from_post(call: CallbackQuery, **kwargs):
@callback_router.callback_query(F.data.contains(CALLBACK_BAN))
@track_time("process_ban_user", "callback_handlers")
@track_errors("callback_handlers", "process_ban_user")
-async def process_ban_user(call: CallbackQuery, state: FSMContext, **kwargs):
+async def process_ban_user(call: CallbackQuery, state: FSMContext, bot_db: MagicData("bot_db"), **kwargs):
ban_service = get_ban_service()
# TODO: переделать на MagicData
user_id = call.data[4:]
@@ -140,17 +141,33 @@ async def process_ban_user(call: CallbackQuery, state: FSMContext, **kwargs):
return
try:
- user_name = await ban_service.ban_user(str(user_id_int), "")
- await state.update_data(user_id=user_id_int, user_name=user_name, message_for_user=None, date_to_unban=None)
+ # Получаем username пользователя
+ username = await ban_service.ban_user(str(user_id_int), "")
+ if not username:
+ raise UserNotFoundError(f"Пользователь с ID {user_id_int} не найден в базе")
+
+ # Получаем full_name пользователя из базы данных
+ full_name = await bot_db.get_full_name_by_id(user_id_int)
+ if not full_name:
+ full_name = 'Неизвестно'
+
+ # Сохраняем данные в формате, совместимом с admin_handlers
+ await state.update_data(
+ target_user_id=user_id_int,
+ target_username=username,
+ target_full_name=full_name
+ )
+
+ # Используем единый формат отображения информации о пользователе
+ user_info = format_user_info(user_id_int, username, full_name)
markup = create_keyboard_for_ban_reason()
- user_name_escaped = html.escape(str(user_name))
- full_name_escaped = html.escape(str(call.message.from_user.full_name))
await call.message.answer(
- text=f"Выбран пользователь:\nid: {user_id_int}\nusername: {user_name_escaped}\nИмя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
+ text=f"{user_info}\n\nВыбери причину бана из списка или напиши ее в чат",
reply_markup=markup
)
- await state.set_state('BAN_2')
+ await state.set_state('AWAIT_BAN_DETAILS')
+ logger.info(f"process_ban_user: Состояние изменено на AWAIT_BAN_DETAILS для пользователя {user_id_int}")
except UserNotFoundError:
markup = get_reply_keyboard_admin()
await call.message.answer(text='Пользователь с таким ID не найден в базе', reply_markup=markup)
diff --git a/helper_bot/handlers/callback/services.py b/helper_bot/handlers/callback/services.py
index 229eea3..c8ce08f 100644
--- a/helper_bot/handlers/callback/services.py
+++ b/helper_bot/handlers/callback/services.py
@@ -8,7 +8,7 @@ from aiogram.types import CallbackQuery
from helper_bot.utils.helper_func import (
send_text_message, send_photo_message, send_video_message,
send_video_note_message, send_audio_message, send_voice_message,
- send_media_group_to_channel, delete_user_blacklist
+ send_media_group_to_channel, delete_user_blacklist, get_text_message
)
from helper_bot.keyboards.keyboards import create_keyboard_for_ban_reason
from .exceptions import (
@@ -78,10 +78,27 @@ class PostPublishService:
@track_errors("post_publish_service", "_publish_text_post")
async def _publish_text_post(self, call: CallbackQuery) -> None:
"""Публикация текстового поста"""
- text_post = html.escape(str(call.message.text))
author_id = await self._get_author_id(call.message.message_id)
+
+ updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
+ if updated_rows == 0:
+ logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
+ raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
- await send_text_message(self.main_public, call.message, text_post)
+ # Получаем сырой текст и is_anonymous из базы
+ raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id)
+ if raw_text is None:
+ raw_text = ""
+
+ # Получаем данные автора
+ user = await self.db.get_user_by_id(author_id)
+ if not user:
+ raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
+
+ # Формируем финальный текст с учетом is_anonymous
+ formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
+
+ await send_text_message(self.main_public, call.message, formatted_text)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Текст сообщения опубликован в канале {self.main_public}.')
@@ -89,10 +106,27 @@ class PostPublishService:
@track_errors("post_publish_service", "_publish_photo_post")
async def _publish_photo_post(self, call: CallbackQuery) -> None:
"""Публикация поста с фото"""
- text_post_with_photo = html.escape(str(call.message.caption))
author_id = await self._get_author_id(call.message.message_id)
+
+ updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
+ if updated_rows == 0:
+ logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
+ raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
- await send_photo_message(self.main_public, call.message, call.message.photo[-1].file_id, text_post_with_photo)
+ # Получаем сырой текст и is_anonymous из базы
+ raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id)
+ if raw_text is None:
+ raw_text = ""
+
+ # Получаем данные автора
+ user = await self.db.get_user_by_id(author_id)
+ if not user:
+ raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
+
+ # Формируем финальный текст с учетом is_anonymous
+ formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
+
+ await send_photo_message(self.main_public, call.message, call.message.photo[-1].file_id, formatted_text)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с фото опубликован в канале {self.main_public}.')
@@ -100,10 +134,27 @@ class PostPublishService:
@track_errors("post_publish_service", "_publish_video_post")
async def _publish_video_post(self, call: CallbackQuery) -> None:
"""Публикация поста с видео"""
- text_post_with_photo = html.escape(str(call.message.caption))
author_id = await self._get_author_id(call.message.message_id)
+
+ updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
+ if updated_rows == 0:
+ logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
+ raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
- await send_video_message(self.main_public, call.message, call.message.video.file_id, text_post_with_photo)
+ # Получаем сырой текст и is_anonymous из базы
+ raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id)
+ if raw_text is None:
+ raw_text = ""
+
+ # Получаем данные автора
+ user = await self.db.get_user_by_id(author_id)
+ if not user:
+ raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
+
+ # Формируем финальный текст с учетом is_anonymous
+ formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
+
+ await send_video_message(self.main_public, call.message, call.message.video.file_id, formatted_text)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с видео опубликован в канале {self.main_public}.')
@@ -112,6 +163,11 @@ class PostPublishService:
async def _publish_video_note_post(self, call: CallbackQuery) -> None:
"""Публикация поста с кружком"""
author_id = await self._get_author_id(call.message.message_id)
+
+ updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
+ if updated_rows == 0:
+ logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
+ raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_video_note_message(self.main_public, call.message, call.message.video_note.file_id)
await self._delete_post_and_notify_author(call, author_id)
@@ -121,10 +177,27 @@ class PostPublishService:
@track_errors("post_publish_service", "_publish_audio_post")
async def _publish_audio_post(self, call: CallbackQuery) -> None:
"""Публикация поста с аудио"""
- text_post_with_photo = html.escape(str(call.message.caption))
author_id = await self._get_author_id(call.message.message_id)
+
+ updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
+ if updated_rows == 0:
+ logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
+ raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
- await send_audio_message(self.main_public, call.message, call.message.audio.file_id, text_post_with_photo)
+ # Получаем сырой текст и is_anonymous из базы
+ raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_message_id(call.message.message_id)
+ if raw_text is None:
+ raw_text = ""
+
+ # Получаем данные автора
+ user = await self.db.get_user_by_id(author_id)
+ if not user:
+ raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
+
+ # Формируем финальный текст с учетом is_anonymous
+ formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
+
+ await send_audio_message(self.main_public, call.message, call.message.audio.file_id, formatted_text)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с аудио опубликован в канале {self.main_public}.')
@@ -133,6 +206,11 @@ class PostPublishService:
async def _publish_voice_post(self, call: CallbackQuery) -> None:
"""Публикация поста с войсом"""
author_id = await self._get_author_id(call.message.message_id)
+
+ updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
+ if updated_rows == 0:
+ logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
+ raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_voice_message(self.main_public, call.message, call.message.voice.file_id)
await self._delete_post_and_notify_author(call, author_id)
@@ -155,11 +233,12 @@ class PostPublishService:
logger.error(f"Контент медиагруппы не найден в базе данных для helper_message_id: {helper_message_id}")
raise PublishError("Контент медиагруппы не найден в базе данных")
- # Получаем текст поста по helper_message_id
- logger.debug(f"Получаю текст поста для helper_message_id: {helper_message_id}")
- pre_text = await self.db.get_post_text_by_helper_id(helper_message_id)
- post_text = html.escape(str(pre_text)) if pre_text else ""
- logger.debug(f"Текст поста получен: {'пустой' if not post_text else f'длина: {len(post_text)} символов'}")
+ # Получаем сырой текст и is_anonymous по helper_message_id
+ logger.debug(f"Получаю текст и is_anonymous поста для helper_message_id: {helper_message_id}")
+ raw_text, is_anonymous = await self.db.get_post_text_and_anonymity_by_helper_id(helper_message_id)
+ if raw_text is None:
+ raw_text = ""
+ logger.debug(f"Текст поста получен: {'пустой' if not raw_text else f'длина: {len(raw_text)} символов'}, is_anonymous={is_anonymous}")
# Получаем ID автора по helper_message_id
logger.debug(f"Получаю ID автора для helper_message_id: {helper_message_id}")
@@ -169,15 +248,25 @@ class PostPublishService:
raise PostNotFoundError(f"Автор не найден для медиагруппы {helper_message_id}")
logger.debug(f"ID автора получен: {author_id}")
+ # Получаем данные автора
+ user = await self.db.get_user_by_id(author_id)
+ if not user:
+ raise PostNotFoundError(f"Пользователь {author_id} не найден в базе данных")
+
+ # Формируем финальный текст с учетом is_anonymous
+ formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous)
+ logger.debug(f"Сформирован финальный текст: {'пустой' if not formatted_text else f'длина: {len(formatted_text)} символов'}")
+
# Отправляем медиагруппу в канал
logger.info(f"Отправляю медиагруппу в канал {self.main_public}")
await send_media_group_to_channel(
bot=self._get_bot(call.message),
chat_id=self.main_public,
post_content=post_content,
- post_text=post_text
+ post_text=formatted_text
)
-
+
+ await self.db.update_status_for_media_group_by_helper_id(helper_message_id, "approved")
logger.debug(f"Удаляю медиагруппу и уведомляю автора {author_id}")
await self._delete_media_group_and_notify_author(call, author_id)
logger.info(f'Медиагруппа опубликована в канале {self.main_public}.')
@@ -190,10 +279,8 @@ class PostPublishService:
@track_errors("post_publish_service", "decline_post")
async def decline_post(self, call: CallbackQuery) -> None:
"""Отклонение поста"""
- logger.info(f"Начинаю отклонение поста. Message ID: {call.message.message_id}, Content type: {call.message.content_type}")
# Проверяем, является ли сообщение частью медиагруппы (осознанный костыль, т.к. сообщение к которому прикреплен коллбек без медиагруппы)
if call.message.text == CONTENT_TYPE_MEDIA_GROUP:
- logger.debug("Сообщение является частью медиагруппы, вызываю _decline_media_group")
await self._decline_media_group(call)
return
@@ -201,7 +288,6 @@ class PostPublishService:
if content_type in [CONTENT_TYPE_TEXT, CONTENT_TYPE_PHOTO, CONTENT_TYPE_AUDIO,
CONTENT_TYPE_VOICE, CONTENT_TYPE_VIDEO, CONTENT_TYPE_VIDEO_NOTE]:
- logger.debug(f"Отклоняю одиночный пост типа: {content_type}")
await self._decline_single_post(call)
else:
logger.error(f"Неподдерживаемый тип контента для отклонения: {content_type}")
@@ -211,15 +297,16 @@ class PostPublishService:
@track_errors("post_publish_service", "_decline_single_post")
async def _decline_single_post(self, call: CallbackQuery) -> None:
"""Отклонение одиночного поста"""
- logger.debug(f"Отклоняю одиночный пост. Message ID: {call.message.message_id}")
author_id = await self._get_author_id(call.message.message_id)
- logger.debug(f"ID автора получен: {author_id}")
+
+ updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "declined")
+ if updated_rows == 0:
+ logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'declined'")
+ raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
- logger.debug(f"Удаляю сообщение из группы {self.group_for_posts}")
await self._get_bot(call.message).delete_message(chat_id=self.group_for_posts, message_id=call.message.message_id)
try:
- logger.debug(f"Отправляю уведомление об отклонении автору {author_id}")
await send_text_message(author_id, call.message, MESSAGE_POST_DECLINED)
except Exception as e:
if str(e) == ERROR_BOT_BLOCKED:
@@ -234,16 +321,16 @@ class PostPublishService:
@track_media_processing("media_group")
async def _decline_media_group(self, call: CallbackQuery) -> None:
"""Отклонение медиагруппы"""
- logger.debug(f"Отклоняю медиагруппу. Helper message ID: {call.message.message_id}")
-
+ await self.db.update_status_for_media_group_by_helper_id(call.message.message_id, "declined")
+
post_ids = await self.db.get_post_ids_from_telegram_by_last_id(call.message.message_id)
message_ids = post_ids.copy()
message_ids.append(call.message.message_id)
logger.debug(f"Получены ID сообщений для удаления: {message_ids}")
-
+
author_id = await self._get_author_id_for_media_group(call.message.message_id)
logger.debug(f"ID автора медиагруппы получен: {author_id}")
-
+
logger.debug(f"Удаляю {len(message_ids)} сообщений из группы {self.group_for_posts}")
await self._get_bot(call.message).delete_messages(chat_id=self.group_for_posts, message_ids=message_ids)
@@ -352,11 +439,14 @@ class BanService:
current_date = datetime.now()
date_to_unban = int((current_date + timedelta(days=7)).timestamp())
+ ban_author_id = call.from_user.id
+
await self.db.set_user_blacklist(
user_id=author_id,
user_name=None,
message_for_user="Спам",
- date_to_unban=date_to_unban
+ date_to_unban=date_to_unban,
+ ban_author=ban_author_id,
)
await self._get_bot(call.message).delete_message(chat_id=self.group_for_posts, message_id=call.message.message_id)
diff --git a/helper_bot/handlers/private/services.py b/helper_bot/handlers/private/services.py
index 32955b7..27bad6b 100644
--- a/helper_bot/handlers/private/services.py
+++ b/helper_bot/handlers/private/services.py
@@ -13,11 +13,13 @@ from dataclasses import dataclass
from aiogram import types
from aiogram.types import FSInputFile
from database.models import TelegramPost, User
+from logs.custom_logger import logger
# Local imports - utilities
from helper_bot.utils.helper_func import (
get_first_name,
get_text_message,
+ determine_anonymity,
send_text_message,
send_photo_message,
send_media_group_message_to_private_chat,
@@ -154,11 +156,17 @@ class PostService:
markup = get_reply_keyboard_for_post()
sent_message_id = await send_text_message(self.settings.group_for_posts, message, post_text, markup)
+
+ # Сохраняем сырой текст и определяем анонимность
+ raw_text = message.text or ""
+ is_anonymous = determine_anonymity(raw_text)
+
post = TelegramPost(
message_id=sent_message_id,
- text=message.text,
+ text=raw_text,
author_id=message.from_user.id,
- created_at=int(datetime.now().timestamp())
+ created_at=int(datetime.now().timestamp()),
+ is_anonymous=is_anonymous
)
await self.db.add_post(post)
@@ -176,11 +184,16 @@ class PostService:
self.settings.group_for_posts, message, message.photo[-1].file_id, post_caption, markup
)
+ # Сохраняем сырой caption и определяем анонимность
+ raw_caption = message.caption or ""
+ is_anonymous = determine_anonymity(raw_caption)
+
post = TelegramPost(
message_id=sent_message.message_id,
- text=sent_message.caption or "",
+ text=raw_caption,
author_id=message.from_user.id,
- created_at=int(datetime.now().timestamp())
+ created_at=int(datetime.now().timestamp()),
+ is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -201,11 +214,16 @@ class PostService:
self.settings.group_for_posts, message, message.video.file_id, post_caption, markup
)
+ # Сохраняем сырой caption и определяем анонимность
+ raw_caption = message.caption or ""
+ is_anonymous = determine_anonymity(raw_caption)
+
post = TelegramPost(
message_id=sent_message.message_id,
- text=sent_message.caption or "",
+ text=raw_caption,
author_id=message.from_user.id,
- created_at=int(datetime.now().timestamp())
+ created_at=int(datetime.now().timestamp()),
+ is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -222,11 +240,16 @@ class PostService:
self.settings.group_for_posts, message, message.video_note.file_id, markup
)
+ # Сохраняем пустую строку, так как video_note не имеет caption
+ raw_caption = ""
+ is_anonymous = determine_anonymity(raw_caption)
+
post = TelegramPost(
message_id=sent_message.message_id,
- text=sent_message.caption or "",
+ text=raw_caption,
author_id=message.from_user.id,
- created_at=int(datetime.now().timestamp())
+ created_at=int(datetime.now().timestamp()),
+ is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -247,11 +270,16 @@ class PostService:
self.settings.group_for_posts, message, message.audio.file_id, post_caption, markup
)
+ # Сохраняем сырой caption и определяем анонимность
+ raw_caption = message.caption or ""
+ is_anonymous = determine_anonymity(raw_caption)
+
post = TelegramPost(
message_id=sent_message.message_id,
- text=sent_message.caption or "",
+ text=raw_caption,
author_id=message.from_user.id,
- created_at=int(datetime.now().timestamp())
+ created_at=int(datetime.now().timestamp()),
+ is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -268,11 +296,16 @@ class PostService:
self.settings.group_for_posts, message, message.voice.file_id, markup
)
+ # Сохраняем пустую строку, так как voice не имеет caption
+ raw_caption = ""
+ is_anonymous = determine_anonymity(raw_caption)
+
post = TelegramPost(
message_id=sent_message.message_id,
- text=sent_message.caption or "",
+ text=raw_caption,
author_id=message.from_user.id,
- created_at=int(datetime.now().timestamp())
+ created_at=int(datetime.now().timestamp()),
+ is_anonymous=is_anonymous
)
await self.db.add_post(post)
success = await add_in_db_media(sent_message, self.db)
@@ -285,17 +318,24 @@ class PostService:
@track_media_processing("media_group")
async def handle_media_group_post(self, message: types.Message, album: list, first_name: str) -> None:
"""Handle media group post submission"""
+ #TODO: Мне кажется тут какая-то дичь с одинаковыми переменными, в которых post_caption никуда не ведет
post_caption = " "
+ raw_caption = ""
if album and album[0].caption:
+ raw_caption = album[0].caption or ""
post_caption = get_text_message(album[0].caption.lower(), first_name, message.from_user.username)
+ # Определяем анонимность на основе сырого caption
+ is_anonymous = determine_anonymity(raw_caption)
+
# Создаем основной пост для медиагруппы
main_post = TelegramPost(
message_id=message.message_id, # ID основного сообщения медиагруппы
- text=post_caption,
+ text=raw_caption,
author_id=message.from_user.id,
- created_at=int(datetime.now().timestamp())
+ created_at=int(datetime.now().timestamp()),
+ is_anonymous=is_anonymous
)
await self.db.add_post(main_post)
diff --git a/helper_bot/utils/helper_func.py b/helper_bot/utils/helper_func.py
index 078bb1b..8e3dad7 100644
--- a/helper_bot/utils/helper_func.py
+++ b/helper_bot/utils/helper_func.py
@@ -85,14 +85,44 @@ def get_first_name(message: types.Message) -> str:
return ""
-def get_text_message(post_text: str, first_name: str, username: str = None):
+def determine_anonymity(post_text: str) -> bool:
"""
- Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
+ Определяет, является ли пост анонимным на основе ключевых слов в тексте.
+
+ Args:
+ post_text: Текст сообщения
+
+ Returns:
+ bool: True, если "анон" в тексте; False, если "неанон" или "не анон" в тексте;
+ False по умолчанию (если нет ключевых слов)
+ """
+ if not post_text:
+ return False
+
+ post_text_lower = post_text.lower()
+
+ # Сначала проверяем "неанон" или "не анон" (более специфичное условие)
+ if "неанон" in post_text_lower or "не анон" in post_text_lower:
+ return False
+
+ # Проверяем "анон"
+ if "анон" in post_text_lower:
+ return True
+
+ # По умолчанию False
+ return False
+
+
+def get_text_message(post_text: str, first_name: str, username: str = None, is_anonymous: Optional[bool] = None):
+ """
+ Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон"
+ или переданного параметра is_anonymous.
Args:
post_text: Текст сообщения
first_name: Имя автора поста
username: Юзернейм автора поста (может быть None)
+ is_anonymous: Флаг анонимности (True - анонимно, False - не анонимно, None - legacy, определяется по тексту)
Returns:
str: - Сформированный текст сообщения.
@@ -109,12 +139,21 @@ def get_text_message(post_text: str, first_name: str, username: str = None):
else:
author_info = f"{first_name} (Ник не указан)"
- if "неанон" in post_text or "не анон" in post_text:
- return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
- elif "анон" in post_text:
- return f'Пост из ТГ:\n{safe_post_text}\n\nПост опубликован анонимно'
+ # Если передан is_anonymous, используем его, иначе определяем по тексту (legacy)
+ # TODO: Уверен можно укоротить
+ if is_anonymous is not None:
+ if is_anonymous:
+ return f'{safe_post_text}\n\nПост опубликован анонимно'
+ else:
+ return f'{safe_post_text}\n\nАвтор поста: {author_info}'
else:
- return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
+ # Legacy: определяем по тексту
+ if "неанон" in post_text or "не анон" in post_text:
+ return f'{safe_post_text}\n\nАвтор поста: {author_info}'
+ elif "анон" in post_text:
+ return f'{safe_post_text}\n\nПост опубликован анонимно'
+ else:
+ return f'{safe_post_text}\n\nАвтор поста: {author_info}'
@track_time("download_file", "helper_func")
@track_errors("helper_func", "download_file")
diff --git a/requirements.txt b/requirements.txt
index b4721a1..505c65c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -21,7 +21,7 @@ aiohttp==3.9.1
# Network stability improvements
aiohttp[speedups]>=3.9.1
aiodns>=3.0.0
-cchardet>=2.1.7
+charset-normalizer>=3.0.0
# Development tools
pluggy==1.5.0
diff --git a/scripts/add_ban_author_column_to_blacklist.py b/scripts/add_ban_author_column_to_blacklist.py
new file mode 100644
index 0000000..3513a77
--- /dev/null
+++ b/scripts/add_ban_author_column_to_blacklist.py
@@ -0,0 +1,68 @@
+#!/usr/bin/env python3
+"""
+Скрипт миграции для добавления колонки ban_author в таблицу blacklist.
+Колонка хранит user_id администратора, инициировавшего бан.
+"""
+import argparse
+import asyncio
+import os
+import sys
+from pathlib import Path
+
+import aiosqlite
+
+project_root = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(project_root))
+
+from logs.custom_logger import logger # noqa: E402
+
+DEFAULT_DB_PATH = "database/tg-bot-database.db"
+
+
+def _column_exists(rows: list, name: str) -> bool:
+ """PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk)."""
+ for row in rows:
+ if row[1] == name:
+ return True
+ return False
+
+
+async def main(db_path: str) -> None:
+ db_path = os.path.abspath(db_path)
+ if not os.path.exists(db_path):
+ logger.error("База данных не найдена: %s", db_path)
+ print(f"Ошибка: база данных не найдена: {db_path}")
+ return
+
+ async with aiosqlite.connect(db_path) as conn:
+ await conn.execute("PRAGMA foreign_keys = ON")
+
+ # Проверяем наличие колонки ban_author
+ cursor = await conn.execute("PRAGMA table_info(blacklist)")
+ rows = await cursor.fetchall()
+ await cursor.close()
+
+ if not _column_exists(rows, "ban_author"):
+ logger.info("Добавление колонки ban_author в blacklist")
+ await conn.execute(
+ "ALTER TABLE blacklist "
+ "ADD COLUMN ban_author INTEGER REFERENCES our_users (user_id) ON DELETE SET NULL"
+ )
+ await conn.commit()
+ print("Колонка ban_author добавлена в таблицу blacklist.")
+ else:
+ print("Колонка ban_author уже существует в таблице blacklist.")
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(
+ description="Добавление колонки ban_author в blacklist"
+ )
+ parser.add_argument(
+ "--db",
+ default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
+ help="Путь к БД (или DB_PATH)",
+ )
+ args = parser.parse_args()
+ asyncio.run(main(args.db))
+
diff --git a/scripts/add_is_anonymous_column.py b/scripts/add_is_anonymous_column.py
new file mode 100755
index 0000000..6e631fe
--- /dev/null
+++ b/scripts/add_is_anonymous_column.py
@@ -0,0 +1,138 @@
+#!/usr/bin/env python3
+"""
+Скрипт миграции для добавления колонки is_anonymous в таблицу post_from_telegram_suggest.
+Для существующих записей определяет is_anonymous на основе текста или устанавливает NULL.
+"""
+import argparse
+import asyncio
+import os
+import sys
+from pathlib import Path
+
+project_root = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(project_root))
+
+import aiosqlite
+
+from logs.custom_logger import logger
+from helper_bot.utils.helper_func import determine_anonymity
+
+DEFAULT_DB_PATH = "database/tg-bot-database.db"
+
+
+def _column_exists(rows: list, name: str) -> bool:
+ """PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk)."""
+ for row in rows:
+ if row[1] == name:
+ return True
+ return False
+
+
+async def main(db_path: str) -> None:
+ db_path = os.path.abspath(db_path)
+ if not os.path.exists(db_path):
+ logger.error("База данных не найдена: %s", db_path)
+ print(f"Ошибка: база данных не найдена: {db_path}")
+ return
+
+ async with aiosqlite.connect(db_path) as conn:
+ await conn.execute("PRAGMA foreign_keys = ON")
+
+ # Проверяем наличие колонки is_anonymous
+ cursor = await conn.execute(
+ "PRAGMA table_info(post_from_telegram_suggest)"
+ )
+ rows = await cursor.fetchall()
+ await cursor.close()
+
+ if not _column_exists(rows, "is_anonymous"):
+ logger.info("Добавление колонки is_anonymous в post_from_telegram_suggest")
+ await conn.execute(
+ "ALTER TABLE post_from_telegram_suggest "
+ "ADD COLUMN is_anonymous INTEGER"
+ )
+ await conn.commit()
+ print("Колонка is_anonymous добавлена.")
+ else:
+ print("Колонка is_anonymous уже существует.")
+
+ # Получаем все записи с текстом для обновления
+ cursor = await conn.execute(
+ "SELECT message_id, text FROM post_from_telegram_suggest WHERE text IS NOT NULL"
+ )
+ posts = await cursor.fetchall()
+ await cursor.close()
+
+ updated_count = 0
+ null_count = 0
+
+ # Обновляем каждую запись
+ for message_id, text in posts:
+ try:
+ # Определяем is_anonymous на основе текста
+ # Если текст пустой или None, устанавливаем NULL (legacy)
+ if not text or not text.strip():
+ is_anonymous = None
+ else:
+ is_anonymous = determine_anonymity(text)
+
+ # Преобразуем bool в int для SQLite (True -> 1, False -> 0, None -> None)
+ is_anonymous_int = None if is_anonymous is None else (1 if is_anonymous else 0)
+
+ await conn.execute(
+ "UPDATE post_from_telegram_suggest SET is_anonymous = ? WHERE message_id = ?",
+ (is_anonymous_int, message_id)
+ )
+
+ if is_anonymous is not None:
+ updated_count += 1
+ else:
+ null_count += 1
+
+ except Exception as e:
+ logger.error(f"Ошибка при обработке поста message_id={message_id}: {e}")
+ # В случае ошибки устанавливаем NULL
+ await conn.execute(
+ "UPDATE post_from_telegram_suggest SET is_anonymous = NULL WHERE message_id = ?",
+ (message_id,)
+ )
+ null_count += 1
+
+ # Обновляем записи без текста (устанавливаем NULL)
+ cursor = await conn.execute(
+ "SELECT COUNT(*) FROM post_from_telegram_suggest WHERE text IS NULL"
+ )
+ row = await cursor.fetchone()
+ posts_without_text = row[0] if row else 0
+ await cursor.close()
+
+ if posts_without_text > 0:
+ await conn.execute(
+ "UPDATE post_from_telegram_suggest SET is_anonymous = NULL WHERE text IS NULL"
+ )
+ null_count += posts_without_text
+
+ await conn.commit()
+
+ total_updated = updated_count + null_count
+ logger.info(
+ f"Миграция завершена. Обновлено записей: {total_updated} "
+ f"(определено: {updated_count}, установлено NULL: {null_count})"
+ )
+ print(f"Миграция завершена.")
+ print(f"Обновлено записей: {total_updated}")
+ print(f" - Определено is_anonymous: {updated_count}")
+ print(f" - Установлено NULL: {null_count}")
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(
+ description="Добавление колонки is_anonymous в post_from_telegram_suggest"
+ )
+ parser.add_argument(
+ "--db",
+ default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
+ help="Путь к БД (или DB_PATH)",
+ )
+ args = parser.parse_args()
+ asyncio.run(main(args.db))
diff --git a/scripts/backfill_post_status_legacy.py b/scripts/backfill_post_status_legacy.py
new file mode 100644
index 0000000..1b7580a
--- /dev/null
+++ b/scripts/backfill_post_status_legacy.py
@@ -0,0 +1,82 @@
+#!/usr/bin/env python3
+"""
+Скрипт для проставления status='legacy' всем существующим записям в post_from_telegram_suggest.
+Добавляет колонку status, если её нет, затем обновляет все строки.
+"""
+import argparse
+import asyncio
+import os
+import sys
+from pathlib import Path
+
+project_root = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(project_root))
+
+import aiosqlite
+
+from logs.custom_logger import logger
+
+DEFAULT_DB_PATH = "database/tg-bot-database.db"
+
+
+def _column_exists(rows: list, name: str) -> bool:
+ """PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk)."""
+ for row in rows:
+ if row[1] == name:
+ return True
+ return False
+
+
+async def main(db_path: str) -> None:
+ db_path = os.path.abspath(db_path)
+ if not os.path.exists(db_path):
+ logger.error("База данных не найдена: %s", db_path)
+ print(f"Ошибка: база данных не найдена: {db_path}")
+ return
+
+ async with aiosqlite.connect(db_path) as conn:
+ await conn.execute("PRAGMA foreign_keys = ON")
+
+ # Проверяем наличие колонки status
+ cursor = await conn.execute(
+ "PRAGMA table_info(post_from_telegram_suggest)"
+ )
+ rows = await cursor.fetchall()
+ await cursor.close()
+
+ if not _column_exists(rows, "status"):
+ logger.info("Добавление колонки status в post_from_telegram_suggest")
+ await conn.execute(
+ "ALTER TABLE post_from_telegram_suggest "
+ "ADD COLUMN status TEXT NOT NULL DEFAULT 'suggest'"
+ )
+ await conn.commit()
+ print("Колонка status добавлена.")
+ else:
+ print("Колонка status уже существует.")
+
+ # Обновляем все существующие записи на legacy
+ await conn.execute(
+ "UPDATE post_from_telegram_suggest SET status = 'legacy'"
+ )
+ await conn.commit()
+ cursor = await conn.execute("SELECT changes()")
+ row = await cursor.fetchone()
+ updated = row[0] if row else 0
+ await cursor.close()
+
+ logger.info("Обновлено записей в post_from_telegram_suggest: %d", updated)
+ print(f"Обновлено записей: {updated}")
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(
+ description="Backfill status='legacy' для post_from_telegram_suggest"
+ )
+ parser.add_argument(
+ "--db",
+ default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
+ help="Путь к БД (или DB_PATH)",
+ )
+ args = parser.parse_args()
+ asyncio.run(main(args.db))
diff --git a/scripts/clean_post_text.py b/scripts/clean_post_text.py
new file mode 100755
index 0000000..2e1b1a3
--- /dev/null
+++ b/scripts/clean_post_text.py
@@ -0,0 +1,148 @@
+#!/usr/bin/env python3
+"""
+Скрипт для приведения текста постов к "сырому" виду.
+Удаляет форматирование, добавленное функцией get_text_message(), оставляя только исходный текст.
+"""
+import argparse
+import asyncio
+import html
+import os
+import re
+import sys
+from pathlib import Path
+
+project_root = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(project_root))
+
+import aiosqlite
+
+from logs.custom_logger import logger
+
+DEFAULT_DB_PATH = "database/tg-bot-database.db"
+
+# Паттерны для определения форматированного текста
+PREFIX = "Пост из ТГ:\n"
+ANONYMOUS_SUFFIX = "\n\nПост опубликован анонимно"
+AUTHOR_SUFFIX_PATTERN = re.compile(r"\n\nАвтор поста: .+$")
+
+
+def extract_raw_text(formatted_text: str) -> str:
+ """
+ Извлекает сырой текст из форматированного текста поста.
+
+ Args:
+ formatted_text: Форматированный текст поста
+
+ Returns:
+ str: Сырой текст или исходный текст, если форматирование не обнаружено
+ """
+ if not formatted_text:
+ return ""
+
+ # Проверяем, начинается ли текст с префикса
+ if not formatted_text.startswith(PREFIX):
+ # Текст уже в сыром виде или имеет другой формат
+ return formatted_text
+
+ # Извлекаем текст после префикса
+ text_after_prefix = formatted_text[len(PREFIX):]
+
+ # Проверяем, заканчивается ли текст на "Пост опубликован анонимно"
+ if text_after_prefix.endswith(ANONYMOUS_SUFFIX):
+ raw_text = text_after_prefix[:-len(ANONYMOUS_SUFFIX)]
+ # Проверяем, заканчивается ли текст на "Автор поста: ..."
+ elif AUTHOR_SUFFIX_PATTERN.search(text_after_prefix):
+ raw_text = AUTHOR_SUFFIX_PATTERN.sub("", text_after_prefix)
+ else:
+ # Не удалось определить формат, возвращаем текст без префикса
+ raw_text = text_after_prefix
+
+ # Декодируем HTML-экранирование
+ raw_text = html.unescape(raw_text)
+
+ return raw_text
+
+
+async def main(db_path: str, dry_run: bool = False) -> None:
+ db_path = os.path.abspath(db_path)
+ if not os.path.exists(db_path):
+ logger.error("База данных не найдена: %s", db_path)
+ print(f"Ошибка: база данных не найдена: {db_path}")
+ return
+
+ async with aiosqlite.connect(db_path) as conn:
+ await conn.execute("PRAGMA foreign_keys = ON")
+
+ # Получаем все записи с текстом
+ cursor = await conn.execute(
+ "SELECT message_id, text FROM post_from_telegram_suggest WHERE text IS NOT NULL AND text != ''"
+ )
+ posts = await cursor.fetchall()
+ await cursor.close()
+
+ updated_count = 0
+ skipped_count = 0
+ error_count = 0
+
+ print(f"Найдено записей для обработки: {len(posts)}")
+ if dry_run:
+ print("РЕЖИМ ПРОВЕРКИ (dry-run): изменения не будут сохранены")
+
+ # Обрабатываем каждую запись
+ for message_id, formatted_text in posts:
+ try:
+ # Извлекаем сырой текст
+ raw_text = extract_raw_text(formatted_text)
+
+ # Проверяем, изменился ли текст
+ if raw_text == formatted_text:
+ skipped_count += 1
+ continue
+
+ if dry_run:
+ print(f"\n[DRY-RUN] message_id={message_id}:")
+ print(f" Было: {formatted_text[:100]}...")
+ print(f" Станет: {raw_text[:100]}...")
+ else:
+ # Обновляем запись
+ await conn.execute(
+ "UPDATE post_from_telegram_suggest SET text = ? WHERE message_id = ?",
+ (raw_text, message_id)
+ )
+ updated_count += 1
+
+ except Exception as e:
+ logger.error(f"Ошибка при обработке поста message_id={message_id}: {e}")
+ error_count += 1
+
+ if not dry_run:
+ await conn.commit()
+
+ total_processed = updated_count + skipped_count + error_count
+ logger.info(
+ f"Обработка завершена. Всего записей: {total_processed}, "
+ f"обновлено: {updated_count}, пропущено: {skipped_count}, ошибок: {error_count}"
+ )
+ print(f"\nОбработка завершена:")
+ print(f" - Всего записей: {total_processed}")
+ print(f" - Обновлено: {updated_count}")
+ print(f" - Пропущено (уже в сыром виде): {skipped_count}")
+ print(f" - Ошибок: {error_count}")
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(
+ description="Приведение текста постов к 'сырому' виду"
+ )
+ parser.add_argument(
+ "--db",
+ default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
+ help="Путь к БД (или DB_PATH)",
+ )
+ parser.add_argument(
+ "--dry-run",
+ action="store_true",
+ help="Режим проверки без сохранения изменений",
+ )
+ args = parser.parse_args()
+ asyncio.run(main(args.db, args.dry_run))
diff --git a/scripts/create_blacklist_history_table.py b/scripts/create_blacklist_history_table.py
new file mode 100644
index 0000000..5c1c517
--- /dev/null
+++ b/scripts/create_blacklist_history_table.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python3
+"""
+Скрипт миграции для создания таблицы blacklist_history.
+Таблица хранит историю всех операций бана/разбана пользователей.
+"""
+import argparse
+import asyncio
+import os
+import sys
+from pathlib import Path
+
+project_root = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(project_root))
+
+import aiosqlite
+
+from logs.custom_logger import logger
+
+DEFAULT_DB_PATH = "database/tg-bot-database.db"
+
+
+def _table_exists(rows: list, table_name: str) -> bool:
+ """Проверяет существование таблицы по результатам PRAGMA table_list."""
+ for row in rows:
+ if row[1] == table_name: # name column
+ return True
+ return False
+
+
+async def main(db_path: str) -> None:
+ db_path = os.path.abspath(db_path)
+ if not os.path.exists(db_path):
+ logger.error("База данных не найдена: %s", db_path)
+ print(f"Ошибка: база данных не найдена: {db_path}")
+ return
+
+ async with aiosqlite.connect(db_path) as conn:
+ await conn.execute("PRAGMA foreign_keys = ON")
+
+ # Проверяем наличие таблицы blacklist_history
+ cursor = await conn.execute(
+ "SELECT name FROM sqlite_master WHERE type='table' AND name='blacklist_history'"
+ )
+ rows = await cursor.fetchall()
+ await cursor.close()
+
+ if not rows:
+ logger.info("Создание таблицы blacklist_history")
+
+ # Создаем таблицу
+ await conn.execute("""
+ CREATE TABLE IF NOT EXISTS blacklist_history (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ message_for_user TEXT,
+ date_ban INTEGER NOT NULL,
+ date_unban INTEGER,
+ ban_author INTEGER,
+ created_at INTEGER DEFAULT (strftime('%s', 'now')),
+ updated_at INTEGER DEFAULT (strftime('%s', 'now')),
+ FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE,
+ FOREIGN KEY (ban_author) REFERENCES our_users(user_id) ON DELETE SET NULL
+ )
+ """)
+
+ # Создаем индексы
+ await conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_blacklist_history_user_id ON blacklist_history(user_id)"
+ )
+ await conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_ban ON blacklist_history(date_ban)"
+ )
+ await conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_unban ON blacklist_history(date_unban)"
+ )
+
+ await conn.commit()
+ logger.info("Таблица blacklist_history и индексы успешно созданы")
+ print("Таблица blacklist_history и индексы успешно созданы.")
+ else:
+ print("Таблица blacklist_history уже существует.")
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(
+ description="Создание таблицы blacklist_history для истории банов/разбанов"
+ )
+ parser.add_argument(
+ "--db",
+ default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
+ help="Путь к БД (или DB_PATH)",
+ )
+ args = parser.parse_args()
+ asyncio.run(main(args.db))
diff --git a/scripts/migrate_blacklist_to_history.py b/scripts/migrate_blacklist_to_history.py
new file mode 100644
index 0000000..cc302ba
--- /dev/null
+++ b/scripts/migrate_blacklist_to_history.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python3
+"""
+Скрипт миграции для переноса записей из blacklist в blacklist_history.
+Переносит все существующие записи из таблицы blacklist в таблицу blacklist_history.
+"""
+import argparse
+import asyncio
+import os
+import sys
+from pathlib import Path
+from datetime import datetime
+
+project_root = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(project_root))
+
+import aiosqlite
+
+from logs.custom_logger import logger
+
+DEFAULT_DB_PATH = "database/tg-bot-database.db"
+
+
+async def main(db_path: str) -> None:
+ db_path = os.path.abspath(db_path)
+ if not os.path.exists(db_path):
+ logger.error("База данных не найдена: %s", db_path)
+ print(f"Ошибка: база данных не найдена: {db_path}")
+ return
+
+ async with aiosqlite.connect(db_path) as conn:
+ await conn.execute("PRAGMA foreign_keys = ON")
+
+ # Проверяем наличие таблицы blacklist_history
+ cursor = await conn.execute(
+ "SELECT name FROM sqlite_master WHERE type='table' AND name='blacklist_history'"
+ )
+ rows = await cursor.fetchall()
+ await cursor.close()
+
+ if not rows:
+ logger.error("Таблица blacklist_history не найдена. Сначала запустите create_blacklist_history_table.py")
+ print("Ошибка: таблица blacklist_history не найдена. Сначала запустите create_blacklist_history_table.py")
+ return
+
+ # Получаем все записи из blacklist
+ cursor = await conn.execute(
+ "SELECT user_id, message_for_user, date_to_unban, created_at, ban_author FROM blacklist"
+ )
+ blacklist_records = await cursor.fetchall()
+ await cursor.close()
+
+ if not blacklist_records:
+ print("В таблице blacklist нет записей для переноса.")
+ logger.info("В таблице blacklist нет записей для переноса")
+ return
+
+ logger.info("Найдено записей в blacklist для переноса: %d", len(blacklist_records))
+ print(f"Найдено записей в blacklist для переноса: {len(blacklist_records)}")
+
+ # Получаем текущее время в Unix timestamp
+ current_time = int(datetime.now().timestamp())
+
+ # Переносим записи в blacklist_history
+ migrated_count = 0
+ skipped_count = 0
+
+ for record in blacklist_records:
+ user_id, message_for_user, date_to_unban, created_at, ban_author = record
+
+ # Проверяем, нет ли уже записи для этого user_id с таким же date_ban
+ # (чтобы избежать дубликатов при повторном запуске)
+ date_ban = created_at if created_at is not None else current_time
+
+ check_cursor = await conn.execute(
+ "SELECT id FROM blacklist_history WHERE user_id = ? AND date_ban = ?",
+ (user_id, date_ban)
+ )
+ existing = await check_cursor.fetchone()
+ await check_cursor.close()
+
+ if existing:
+ logger.debug("Запись для user_id=%d с date_ban=%d уже существует, пропускаем", user_id, date_ban)
+ skipped_count += 1
+ continue
+
+ # Вставляем запись в blacklist_history
+ await conn.execute(
+ """
+ INSERT INTO blacklist_history
+ (user_id, message_for_user, date_ban, date_unban, ban_author, created_at, updated_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ user_id,
+ message_for_user,
+ date_ban,
+ date_to_unban,
+ ban_author,
+ created_at if created_at is not None else current_time,
+ current_time
+ )
+ )
+ migrated_count += 1
+
+ await conn.commit()
+
+ logger.info(
+ "Миграция завершена. Перенесено записей: %d, пропущено (дубликаты): %d",
+ migrated_count,
+ skipped_count
+ )
+ print(f"Миграция завершена. Перенесено записей: {migrated_count}, пропущено (дубликаты): {skipped_count}")
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(
+ description="Перенос записей из blacklist в blacklist_history"
+ )
+ parser.add_argument(
+ "--db",
+ default=os.environ.get("DB_PATH", DEFAULT_DB_PATH),
+ help="Путь к БД (или DB_PATH)",
+ )
+ args = parser.parse_args()
+ asyncio.run(main(args.db))
diff --git a/tests/conftest_post_repository.py b/tests/conftest_post_repository.py
index e8d6ad0..3fcfec0 100644
--- a/tests/conftest_post_repository.py
+++ b/tests/conftest_post_repository.py
@@ -22,6 +22,8 @@ def mock_post_repository():
mock_repo = Mock(spec=PostRepository)
mock_repo._execute_query = AsyncMock()
mock_repo._execute_query_with_result = AsyncMock()
+ mock_repo.update_status_by_message_id = AsyncMock()
+ mock_repo.update_status_for_media_group_by_helper_id = AsyncMock()
mock_repo.logger = Mock()
return mock_repo
@@ -198,7 +200,9 @@ def mock_sql_queries():
"CREATE TABLE IF NOT EXISTS message_link_to_content"
],
'add_post': "INSERT INTO post_from_telegram_suggest",
+ 'add_post_status': "status",
'update_helper': "UPDATE post_from_telegram_suggest SET helper_text_message_id",
+ 'update_status': "UPDATE post_from_telegram_suggest SET status = ?",
'add_content': "INSERT OR IGNORE INTO content_post_from_telegram",
'add_link': "INSERT OR IGNORE INTO message_link_to_content",
'get_content': "SELECT cpft.content_name, cpft.content_type",
diff --git a/tests/test_async_db.py b/tests/test_async_db.py
index d87b686..81b3f8a 100644
--- a/tests/test_async_db.py
+++ b/tests/test_async_db.py
@@ -14,6 +14,13 @@ class TestAsyncBotDB:
mock_factory.audio.delete_audio_moderate_record = AsyncMock()
mock_factory.users = Mock()
mock_factory.users.logger = Mock()
+ # Моки для blacklist и blacklist_history
+ mock_factory.blacklist = Mock()
+ mock_factory.blacklist.add_user = AsyncMock()
+ mock_factory.blacklist.remove_user = AsyncMock(return_value=True)
+ mock_factory.blacklist_history = Mock()
+ mock_factory.blacklist_history.add_record_on_ban = AsyncMock()
+ mock_factory.blacklist_history.set_unban_date = AsyncMock(return_value=True)
return mock_factory
@pytest.fixture
@@ -102,3 +109,107 @@ class TestAsyncBotDB:
await async_bot_db.delete_audio_moderate_record(message_id)
mock_factory.audio.delete_audio_moderate_record.assert_called_once_with(message_id)
+
+ @pytest.mark.asyncio
+ async def test_set_user_blacklist_calls_history(self, async_bot_db, mock_factory):
+ """Тест что set_user_blacklist вызывает добавление в историю"""
+ user_id = 12345
+ message_for_user = "Нарушение правил"
+ date_to_unban = 1234567890
+ ban_author = 999
+
+ await async_bot_db.set_user_blacklist(
+ user_id=user_id,
+ user_name=None,
+ message_for_user=message_for_user,
+ date_to_unban=date_to_unban,
+ ban_author=ban_author
+ )
+
+ # Проверяем, что сначала добавлен в blacklist
+ mock_factory.blacklist.add_user.assert_called_once()
+
+ # Проверяем, что затем добавлена запись в историю
+ mock_factory.blacklist_history.add_record_on_ban.assert_called_once()
+
+ # Проверяем параметры записи в историю
+ history_call = mock_factory.blacklist_history.add_record_on_ban.call_args[0][0]
+ assert history_call.user_id == user_id
+ assert history_call.message_for_user == message_for_user
+ assert history_call.date_ban is not None
+ assert history_call.date_unban is None
+ assert history_call.ban_author == ban_author
+
+ @pytest.mark.asyncio
+ async def test_set_user_blacklist_history_error_does_not_fail(self, async_bot_db, mock_factory):
+ """Тест что ошибка записи в историю не ломает процесс бана"""
+ user_id = 12345
+ mock_factory.blacklist_history.add_record_on_ban.side_effect = Exception("History error")
+
+ # Бан должен пройти успешно, несмотря на ошибку в истории
+ await async_bot_db.set_user_blacklist(
+ user_id=user_id,
+ message_for_user="Тест",
+ date_to_unban=None,
+ ban_author=None
+ )
+
+ # Проверяем, что пользователь все равно добавлен в blacklist
+ mock_factory.blacklist.add_user.assert_called_once()
+
+ # Проверяем, что попытка записи в историю была
+ mock_factory.blacklist_history.add_record_on_ban.assert_called_once()
+
+ @pytest.mark.asyncio
+ async def test_delete_user_blacklist_calls_history(self, async_bot_db, mock_factory):
+ """Тест что delete_user_blacklist вызывает обновление истории"""
+ user_id = 12345
+
+ result = await async_bot_db.delete_user_blacklist(user_id)
+
+ # Проверяем, что сначала обновлена история
+ mock_factory.blacklist_history.set_unban_date.assert_called_once()
+ history_call = mock_factory.blacklist_history.set_unban_date.call_args
+ assert history_call[0][0] == user_id
+ assert history_call[0][1] is not None # date_unban timestamp
+
+ # Проверяем, что затем удален из blacklist
+ mock_factory.blacklist.remove_user.assert_called_once_with(user_id)
+
+ # Проверяем результат
+ assert result is True
+
+ @pytest.mark.asyncio
+ async def test_delete_user_blacklist_history_error_does_not_fail(self, async_bot_db, mock_factory):
+ """Тест что ошибка обновления истории не ломает процесс разбана"""
+ user_id = 12345
+ mock_factory.blacklist_history.set_unban_date.side_effect = Exception("History error")
+
+ # Разбан должен пройти успешно, несмотря на ошибку в истории
+ result = await async_bot_db.delete_user_blacklist(user_id)
+
+ # Проверяем, что попытка обновления истории была
+ mock_factory.blacklist_history.set_unban_date.assert_called_once()
+
+ # Проверяем, что пользователь все равно удален из blacklist
+ mock_factory.blacklist.remove_user.assert_called_once_with(user_id)
+
+ # Проверяем результат
+ assert result is True
+
+ @pytest.mark.asyncio
+ async def test_delete_user_blacklist_returns_false_on_blacklist_error(self, async_bot_db, mock_factory):
+ """Тест что delete_user_blacklist возвращает False при ошибке удаления из blacklist"""
+ user_id = 12345
+ mock_factory.blacklist.remove_user.return_value = False
+
+ result = await async_bot_db.delete_user_blacklist(user_id)
+
+ # Проверяем, что история обновлена
+ mock_factory.blacklist_history.set_unban_date.assert_called_once()
+
+ # Проверяем, что удаление из blacklist было попытка
+ mock_factory.blacklist.remove_user.assert_called_once_with(user_id)
+
+ # Проверяем результат
+ assert result is False
diff --git a/tests/test_auto_unban_integration.py b/tests/test_auto_unban_integration.py
index 4ddc64a..5113d92 100644
--- a/tests/test_auto_unban_integration.py
+++ b/tests/test_auto_unban_integration.py
@@ -17,7 +17,7 @@ class TestAutoUnbanIntegration:
@pytest.fixture
def setup_test_db(self, test_db_path):
- """Создает тестовую базу данных с таблицей blacklist"""
+ """Создает тестовую базу данных с таблицами blacklist, our_users и blacklist_history"""
# Удаляем старую тестовую базу если она существует
if os.path.exists(test_db_path):
os.remove(test_db_path)
@@ -26,30 +26,112 @@ class TestAutoUnbanIntegration:
conn = sqlite3.connect(test_db_path)
cursor = conn.cursor()
- # Создаем таблицу blacklist
+ # Включаем поддержку внешних ключей
+ cursor.execute("PRAGMA foreign_keys = ON")
+
+ # Создаем таблицу our_users (нужна для внешних ключей)
cursor.execute('''
- CREATE TABLE IF NOT EXISTS blacklist (
- user_id INTEGER PRIMARY KEY,
- user_name TEXT,
- message_for_user TEXT,
- date_to_unban INTEGER
+ CREATE TABLE IF NOT EXISTS our_users (
+ user_id INTEGER NOT NULL PRIMARY KEY,
+ first_name TEXT,
+ full_name TEXT,
+ username TEXT,
+ is_bot BOOLEAN DEFAULT 0,
+ language_code TEXT,
+ has_stickers BOOLEAN DEFAULT 0 NOT NULL,
+ emoji TEXT,
+ date_added INTEGER NOT NULL,
+ date_changed INTEGER NOT NULL,
+ voice_bot_welcome_received BOOLEAN DEFAULT 0
)
''')
- # Добавляем тестовые данные
+ # Создаем таблицу blacklist
+ cursor.execute('''
+ CREATE TABLE IF NOT EXISTS blacklist (
+ user_id INTEGER NOT NULL PRIMARY KEY,
+ message_for_user TEXT,
+ date_to_unban INTEGER,
+ created_at INTEGER DEFAULT (strftime('%s', 'now')),
+ ban_author INTEGER,
+ FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE
+ )
+ ''')
+
+ # Создаем таблицу blacklist_history
+ cursor.execute('''
+ CREATE TABLE IF NOT EXISTS blacklist_history (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ message_for_user TEXT,
+ date_ban INTEGER NOT NULL,
+ date_unban INTEGER,
+ ban_author INTEGER,
+ created_at INTEGER DEFAULT (strftime('%s', 'now')),
+ updated_at INTEGER DEFAULT (strftime('%s', 'now')),
+ FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE,
+ FOREIGN KEY (ban_author) REFERENCES our_users(user_id) ON DELETE SET NULL
+ )
+ ''')
+
+ # Создаем индексы для blacklist_history
+ cursor.execute('''
+ CREATE INDEX IF NOT EXISTS idx_blacklist_history_user_id ON blacklist_history(user_id)
+ ''')
+ cursor.execute('''
+ CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_ban ON blacklist_history(date_ban)
+ ''')
+ cursor.execute('''
+ CREATE INDEX IF NOT EXISTS idx_blacklist_history_date_unban ON blacklist_history(date_unban)
+ ''')
+
+ # Добавляем тестовых пользователей в our_users
+ current_time = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
+ users_data = [
+ (123, "Test", "Test User 1", "test_user1", 0, "ru", 0, "😊", current_time, current_time, 0),
+ (456, "Test", "Test User 2", "test_user2", 0, "ru", 0, "😊", current_time, current_time, 0),
+ (789, "Test", "Test User 3", "test_user3", 0, "ru", 0, "😊", current_time, current_time, 0),
+ (999, "Test", "Test User 4", "test_user4", 0, "ru", 0, "😊", current_time, current_time, 0),
+ ]
+ cursor.executemany(
+ """INSERT INTO our_users (user_id, first_name, full_name, username, is_bot,
+ language_code, has_stickers, emoji, date_added, date_changed, voice_bot_welcome_received)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
+ users_data
+ )
+
+ # Добавляем тестовые данные в blacklist
today_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
tomorrow_timestamp = int((datetime.now(timezone(timedelta(hours=3))) + timedelta(days=1)).timestamp())
- test_data = [
- (123, "test_user1", "Test ban 1", today_timestamp), # Разблокируется сегодня
- (456, "test_user2", "Test ban 2", today_timestamp), # Разблокируется сегодня
- (789, "test_user3", "Test ban 3", tomorrow_timestamp), # Разблокируется завтра
- (999, "test_user4", "Test ban 4", None), # Навсегда заблокирован
+ blacklist_data = [
+ (123, "Test ban 1", today_timestamp, current_time, None), # Разблокируется сегодня
+ (456, "Test ban 2", today_timestamp, current_time, None), # Разблокируется сегодня
+ (789, "Test ban 3", tomorrow_timestamp, current_time, None), # Разблокируется завтра
+ (999, "Test ban 4", None, current_time, None), # Навсегда заблокирован
]
cursor.executemany(
- "INSERT INTO blacklist (user_id, user_name, message_for_user, date_to_unban) VALUES (?, ?, ?, ?)",
- test_data
+ "INSERT INTO blacklist (user_id, message_for_user, date_to_unban, created_at, ban_author) VALUES (?, ?, ?, ?, ?)",
+ blacklist_data
+ )
+
+ # Добавляем тестовые данные в blacklist_history
+ # Для пользователей 123 и 456 (которые будут разблокированы) создаем записи с date_unban = NULL
+ yesterday_timestamp = int((datetime.now(timezone(timedelta(hours=3))) - timedelta(days=1)).timestamp())
+
+ history_data = [
+ (123, "Test ban 1", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Будет разблокирован
+ (456, "Test ban 2", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Будет разблокирован
+ (789, "Test ban 3", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Не будет разблокирован сегодня
+ (999, "Test ban 4", yesterday_timestamp, None, None, yesterday_timestamp, yesterday_timestamp), # Навсегда заблокирован
+ ]
+
+ cursor.executemany(
+ """INSERT INTO blacklist_history
+ (user_id, message_for_user, date_ban, date_unban, ban_author, created_at, updated_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?)""",
+ history_data
)
conn.commit()
@@ -105,9 +187,20 @@ class TestAutoUnbanIntegration:
initial_count = cursor.fetchone()[0]
assert initial_count == 4
+ # Проверяем начальное состояние истории: должно быть 2 записи с date_unban IS NULL для user_id 123 и 456
+ cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE user_id IN (123, 456) AND date_unban IS NULL")
+ initial_open_history = cursor.fetchone()[0]
+ assert initial_open_history == 2
+
+ # Запоминаем время до разбана для проверки updated_at
+ before_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
+
# Выполняем автоматический разбан
await scheduler.auto_unban_users()
+ # Запоминаем время после разбана для проверки updated_at
+ after_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
+
# Проверяем, что пользователи с сегодняшней датой разблокированы
current_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
cursor.execute("SELECT COUNT(*) FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?",
@@ -131,6 +224,32 @@ class TestAutoUnbanIntegration:
final_count = cursor.fetchone()[0]
assert final_count == 2 # Остались только завтрашние и навсегда заблокированные
+ # Проверяем историю банов: для user_id 123 и 456 должны быть установлены date_unban
+ cursor.execute("SELECT user_id, date_unban, updated_at FROM blacklist_history WHERE user_id IN (123, 456) ORDER BY user_id")
+ history_records = cursor.fetchall()
+
+ assert len(history_records) == 2
+
+ for user_id, date_unban, updated_at in history_records:
+ # Проверяем, что date_unban установлен (не NULL)
+ assert date_unban is not None, f"date_unban должен быть установлен для user_id={user_id}"
+ assert isinstance(date_unban, int), f"date_unban должен быть integer для user_id={user_id}"
+
+ # Проверяем, что date_unban находится в разумных пределах (между before и after)
+ assert before_unban_timestamp <= date_unban <= after_unban_timestamp, \
+ f"date_unban для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}, получен {date_unban}"
+
+ # Проверяем, что updated_at обновлен
+ assert updated_at is not None, f"updated_at должен быть установлен для user_id={user_id}"
+ assert isinstance(updated_at, int), f"updated_at должен быть integer для user_id={user_id}"
+ assert before_unban_timestamp <= updated_at <= after_unban_timestamp, \
+ f"updated_at для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}, получен {updated_at}"
+
+ # Проверяем, что для user_id 789 и 999 записи в истории остались без изменений (date_unban все еще NULL)
+ cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE user_id IN (789, 999) AND date_unban IS NULL")
+ unchanged_history = cursor.fetchone()[0]
+ assert unchanged_history == 2, "Записи для user_id 789 и 999 должны остаться с date_unban = NULL"
+
conn.close()
# Проверяем, что отчет был отправлен
@@ -148,6 +267,12 @@ class TestAutoUnbanIntegration:
cursor = conn.cursor()
current_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
cursor.execute("DELETE FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?", (current_timestamp,))
+
+ # Проверяем начальное состояние истории: все записи должны иметь date_unban = NULL
+ cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE date_unban IS NULL")
+ initial_open_history = cursor.fetchone()[0]
+ assert initial_open_history == 4 # Все 4 записи должны быть открытыми
+
conn.commit()
conn.close()
@@ -159,6 +284,14 @@ class TestAutoUnbanIntegration:
# Выполняем автоматический разбан
await scheduler.auto_unban_users()
+ # Проверяем, что история не изменилась (все записи все еще с date_unban = NULL)
+ conn = sqlite3.connect(setup_test_db)
+ cursor = conn.cursor()
+ cursor.execute("SELECT COUNT(*) FROM blacklist_history WHERE date_unban IS NULL")
+ final_open_history = cursor.fetchone()[0]
+ assert final_open_history == 4, "История не должна изменяться, если нет пользователей для разблокировки"
+ conn.close()
+
# Проверяем, что отчет не был отправлен (нет пользователей для разблокировки)
mock_bot.send_message.assert_not_called()
@@ -190,6 +323,100 @@ class TestAutoUnbanIntegration:
assert call_args[1]['chat_id'] == '-1001234567891' # important_logs
assert "Ошибка автоматического разбана" in call_args[1]['text']
+ @pytest.mark.asyncio
+ @patch('helper_bot.utils.auto_unban_scheduler.get_global_instance')
+ async def test_auto_unban_updates_history(self, mock_get_instance, setup_test_db, mock_bdf, mock_bot):
+ """Тест что автоматический разбан обновляет историю банов"""
+ # Настройка моков
+ mock_get_instance.return_value = mock_bdf
+
+ # Создаем планировщик
+ scheduler = AutoUnbanScheduler()
+ scheduler.bot_db = mock_bdf.database
+ scheduler.set_bot(mock_bot)
+
+ conn = sqlite3.connect(setup_test_db)
+ cursor = conn.cursor()
+
+ # Проверяем начальное состояние: для user_id 123 и 456 должны быть записи с date_unban = NULL
+ cursor.execute("""
+ SELECT id, user_id, date_ban, date_unban, updated_at
+ FROM blacklist_history
+ WHERE user_id IN (123, 456) AND date_unban IS NULL
+ ORDER BY user_id
+ """)
+ initial_records = cursor.fetchall()
+ assert len(initial_records) == 2, "Должно быть 2 открытые записи для user_id 123 и 456"
+
+ # Запоминаем ID записей и их начальные значения updated_at
+ record_ids = {row[0]: (row[1], row[4]) for row in initial_records}
+
+ # Запоминаем время до разбана
+ before_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
+
+ conn.close()
+
+ # Выполняем автоматический разбан
+ await scheduler.auto_unban_users()
+
+ # Запоминаем время после разбана
+ after_unban_timestamp = int(datetime.now(timezone(timedelta(hours=3))).timestamp())
+
+ # Проверяем, что записи обновлены
+ conn = sqlite3.connect(setup_test_db)
+ cursor = conn.cursor()
+
+ cursor.execute("""
+ SELECT id, user_id, date_ban, date_unban, updated_at
+ FROM blacklist_history
+ WHERE user_id IN (123, 456)
+ ORDER BY user_id
+ """)
+ updated_records = cursor.fetchall()
+
+ assert len(updated_records) == 2, "Должно быть 2 записи для user_id 123 и 456"
+
+ for record_id, user_id, date_ban, date_unban, updated_at in updated_records:
+ # Проверяем, что это одна из наших записей
+ assert record_id in record_ids, f"Запись с id={record_id} должна быть в исходных записях"
+
+ # Проверяем, что date_unban установлен
+ assert date_unban is not None, f"date_unban должен быть установлен для user_id={user_id}"
+ assert isinstance(date_unban, int), f"date_unban должен быть integer для user_id={user_id}"
+
+ # Проверяем, что date_unban находится в разумных пределах
+ assert before_unban_timestamp <= date_unban <= after_unban_timestamp, \
+ f"date_unban для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}"
+
+ # Проверяем, что updated_at обновлен (должен быть больше начального значения)
+ assert updated_at is not None, f"updated_at должен быть установлен для user_id={user_id}"
+ assert isinstance(updated_at, int), f"updated_at должен быть integer для user_id={user_id}"
+ assert before_unban_timestamp <= updated_at <= after_unban_timestamp, \
+ f"updated_at для user_id={user_id} должен быть между {before_unban_timestamp} и {after_unban_timestamp}"
+
+ # Проверяем, что updated_at действительно обновлен (больше начального значения)
+ initial_updated_at = record_ids[record_id][1]
+ assert updated_at >= initial_updated_at, \
+ f"updated_at для user_id={user_id} должен быть больше или равен начальному значению"
+
+ # Проверяем, что обновлена только последняя запись для каждого пользователя
+ # (если бы было несколько записей, обновилась бы только последняя)
+ cursor.execute("""
+ SELECT COUNT(*) FROM blacklist_history
+ WHERE user_id IN (123, 456) AND date_unban IS NOT NULL
+ """)
+ closed_records = cursor.fetchone()[0]
+ assert closed_records == 2, "Должно быть закрыто 2 записи (по одной для каждого пользователя)"
+
+ cursor.execute("""
+ SELECT COUNT(*) FROM blacklist_history
+ WHERE user_id IN (123, 456) AND date_unban IS NULL
+ """)
+ open_records = cursor.fetchone()[0]
+ assert open_records == 0, "Не должно быть открытых записей для user_id 123 и 456"
+
+ conn.close()
+
def test_date_format_consistency(self, setup_test_db, mock_bdf):
"""Тест консистентности формата дат"""
scheduler = AutoUnbanScheduler()
diff --git a/tests/test_blacklist_history_repository.py b/tests/test_blacklist_history_repository.py
new file mode 100644
index 0000000..b0ceca6
--- /dev/null
+++ b/tests/test_blacklist_history_repository.py
@@ -0,0 +1,257 @@
+import pytest
+from unittest.mock import Mock, AsyncMock, patch
+from datetime import datetime
+import time
+
+from database.repositories.blacklist_history_repository import BlacklistHistoryRepository
+from database.models import BlacklistHistoryRecord
+
+
+class TestBlacklistHistoryRepository:
+ """Тесты для BlacklistHistoryRepository"""
+
+ @pytest.fixture
+ def mock_db_connection(self):
+ """Мок для DatabaseConnection"""
+ mock_connection = Mock()
+ mock_connection._execute_query = AsyncMock()
+ mock_connection._execute_query_with_result = AsyncMock()
+ mock_connection.logger = Mock()
+ return mock_connection
+
+ @pytest.fixture
+ def blacklist_history_repository(self, mock_db_connection):
+ """Экземпляр BlacklistHistoryRepository для тестов"""
+ # Патчим наследование от DatabaseConnection
+ with patch.object(BlacklistHistoryRepository, '__init__', return_value=None):
+ repo = BlacklistHistoryRepository()
+ repo._execute_query = mock_db_connection._execute_query
+ repo._execute_query_with_result = mock_db_connection._execute_query_with_result
+ repo.logger = mock_db_connection.logger
+ return repo
+
+ @pytest.fixture
+ def sample_history_record(self):
+ """Тестовая запись истории бана"""
+ current_time = int(time.time())
+ return BlacklistHistoryRecord(
+ user_id=12345,
+ message_for_user="Нарушение правил",
+ date_ban=current_time,
+ date_unban=None,
+ ban_author=999,
+ created_at=current_time,
+ updated_at=current_time,
+ )
+
+ @pytest.fixture
+ def sample_history_record_with_unban(self):
+ """Тестовая запись истории бана с датой разбана"""
+ current_time = int(time.time())
+ return BlacklistHistoryRecord(
+ user_id=12345,
+ message_for_user="Нарушение правил",
+ date_ban=current_time - 86400, # Бан был вчера
+ date_unban=current_time, # Разбан сегодня
+ ban_author=999,
+ created_at=current_time - 86400,
+ updated_at=current_time,
+ )
+
+ @pytest.mark.asyncio
+ async def test_create_tables(self, blacklist_history_repository):
+ """Тест создания таблицы истории банов/разбанов"""
+ await blacklist_history_repository.create_tables()
+
+ # Проверяем, что метод вызван (4 раза: таблица + 3 индекса)
+ assert blacklist_history_repository._execute_query.call_count == 4
+ calls = blacklist_history_repository._execute_query.call_args_list
+
+ # Проверяем, что создается таблица с правильной структурой
+ create_table_call = calls[0]
+ assert "CREATE TABLE IF NOT EXISTS blacklist_history" in create_table_call[0][0]
+ assert "id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT" in create_table_call[0][0]
+ assert "user_id INTEGER NOT NULL" in create_table_call[0][0]
+ assert "message_for_user TEXT" in create_table_call[0][0]
+ assert "date_ban INTEGER NOT NULL" in create_table_call[0][0]
+ assert "date_unban INTEGER" in create_table_call[0][0]
+ assert "ban_author INTEGER" in create_table_call[0][0]
+ assert "created_at INTEGER DEFAULT (strftime('%s', 'now'))" in create_table_call[0][0]
+ assert "updated_at INTEGER DEFAULT (strftime('%s', 'now'))" in create_table_call[0][0]
+ assert "FOREIGN KEY (user_id) REFERENCES our_users(user_id) ON DELETE CASCADE" in create_table_call[0][0]
+ assert "FOREIGN KEY (ban_author) REFERENCES our_users(user_id) ON DELETE SET NULL" in create_table_call[0][0]
+
+ # Проверяем создание индексов
+ index_calls = calls[1:4]
+ index_names = [call[0][0] for call in index_calls]
+ assert any("idx_blacklist_history_user_id" in idx for idx in index_names)
+ assert any("idx_blacklist_history_date_ban" in idx for idx in index_names)
+ assert any("idx_blacklist_history_date_unban" in idx for idx in index_names)
+
+ # Проверяем логирование
+ blacklist_history_repository.logger.info.assert_called_once_with(
+ "Таблица истории банов/разбанов создана"
+ )
+
+ @pytest.mark.asyncio
+ async def test_add_record_on_ban(self, blacklist_history_repository, sample_history_record):
+ """Тест добавления записи о бане в историю"""
+ await blacklist_history_repository.add_record_on_ban(sample_history_record)
+
+ # Проверяем, что метод вызван с правильными параметрами
+ blacklist_history_repository._execute_query.assert_called_once()
+ call_args = blacklist_history_repository._execute_query.call_args
+
+ # Проверяем SQL запрос
+ sql_query = call_args[0][0].replace('\n', ' ').replace(' ', ' ').strip()
+ assert "INSERT INTO blacklist_history" in sql_query
+ assert "user_id, message_for_user, date_ban, date_unban, ban_author, created_at, updated_at" in sql_query
+
+ # Проверяем параметры
+ params = call_args[0][1]
+ assert params[0] == 12345 # user_id
+ assert params[1] == "Нарушение правил" # message_for_user
+ assert params[2] == sample_history_record.date_ban # date_ban
+ assert params[3] is None # date_unban
+ assert params[4] == 999 # ban_author
+ assert params[5] == sample_history_record.created_at # created_at
+ assert params[6] == sample_history_record.updated_at # updated_at
+
+ # Проверяем логирование
+ blacklist_history_repository.logger.info.assert_called_once()
+ log_call = blacklist_history_repository.logger.info.call_args[0][0]
+ assert "Запись о бане добавлена в историю" in log_call
+ assert "user_id=12345" in log_call
+
+ @pytest.mark.asyncio
+ async def test_add_record_on_ban_with_defaults(self, blacklist_history_repository):
+ """Тест добавления записи о бане с дефолтными значениями created_at и updated_at"""
+ record = BlacklistHistoryRecord(
+ user_id=12345,
+ message_for_user="Тест",
+ date_ban=int(time.time()),
+ date_unban=None,
+ ban_author=None,
+ created_at=None, # Будет установлено автоматически
+ updated_at=None, # Будет установлено автоматически
+ )
+
+ await blacklist_history_repository.add_record_on_ban(record)
+
+ # Проверяем, что метод вызван
+ blacklist_history_repository._execute_query.assert_called_once()
+ call_args = blacklist_history_repository._execute_query.call_args
+
+ # Проверяем, что created_at и updated_at установлены (не None)
+ params = call_args[0][1]
+ assert params[5] is not None # created_at
+ assert params[6] is not None # updated_at
+ assert isinstance(params[5], int)
+ assert isinstance(params[6], int)
+
+ @pytest.mark.asyncio
+ async def test_set_unban_date_success(self, blacklist_history_repository):
+ """Тест успешного обновления даты разбана"""
+ user_id = 12345
+ date_unban = int(time.time())
+
+ # Мокируем результат проверки - находим открытую запись
+ blacklist_history_repository._execute_query_with_result.return_value = [(100,)] # id записи
+
+ result = await blacklist_history_repository.set_unban_date(user_id, date_unban)
+
+ # Проверяем, что сначала проверяется наличие записи
+ assert blacklist_history_repository._execute_query_with_result.call_count == 1
+ check_call = blacklist_history_repository._execute_query_with_result.call_args
+ assert "SELECT id FROM blacklist_history" in check_call[0][0]
+ assert check_call[0][1] == (user_id,)
+
+ # Проверяем, что затем обновляется запись
+ assert blacklist_history_repository._execute_query.call_count == 1
+ update_call = blacklist_history_repository._execute_query.call_args
+ update_query = update_call[0][0].replace('\n', ' ').replace(' ', ' ').strip()
+ assert "UPDATE blacklist_history" in update_query
+ assert "SET date_unban = ?" in update_query
+ assert "updated_at = ?" in update_query
+
+ # Проверяем параметры обновления
+ update_params = update_call[0][1]
+ assert update_params[0] == date_unban
+ assert update_params[1] is not None # updated_at (текущее время)
+ assert isinstance(update_params[1], int)
+ assert update_params[2] == 100 # id записи
+
+ # Проверяем результат
+ assert result is True
+
+ # Проверяем логирование
+ blacklist_history_repository.logger.info.assert_called_once()
+ log_call = blacklist_history_repository.logger.info.call_args[0][0]
+ assert "Дата разбана обновлена в истории" in log_call
+ assert f"user_id={user_id}" in log_call
+
+ @pytest.mark.asyncio
+ async def test_set_unban_date_no_open_record(self, blacklist_history_repository):
+ """Тест обновления даты разбана когда нет открытой записи"""
+ user_id = 12345
+ date_unban = int(time.time())
+
+ # Мокируем результат проверки - нет открытых записей
+ blacklist_history_repository._execute_query_with_result.return_value = []
+
+ result = await blacklist_history_repository.set_unban_date(user_id, date_unban)
+
+ # Проверяем, что проверка была выполнена
+ assert blacklist_history_repository._execute_query_with_result.call_count == 1
+
+ # Проверяем, что UPDATE не был вызван (нет записей для обновления)
+ blacklist_history_repository._execute_query.assert_not_called()
+
+ # Проверяем результат
+ assert result is False
+
+ # Проверяем логирование предупреждения
+ blacklist_history_repository.logger.warning.assert_called_once()
+ log_call = blacklist_history_repository.logger.warning.call_args[0][0]
+ assert "Не найдена открытая запись в истории для обновления" in log_call
+ assert f"user_id={user_id}" in log_call
+
+ @pytest.mark.asyncio
+ async def test_set_unban_date_exception(self, blacklist_history_repository):
+ """Тест обработки исключения при обновлении даты разбана"""
+ user_id = 12345
+ date_unban = int(time.time())
+
+ # Мокируем исключение при проверке
+ blacklist_history_repository._execute_query_with_result.side_effect = Exception("Database error")
+
+ result = await blacklist_history_repository.set_unban_date(user_id, date_unban)
+
+ # Проверяем, что метод вернул False при ошибке
+ assert result is False
+
+ # Проверяем логирование ошибки
+ blacklist_history_repository.logger.error.assert_called_once()
+ log_call = blacklist_history_repository.logger.error.call_args[0][0]
+ assert "Ошибка обновления даты разбана в истории" in log_call
+ assert f"user_id={user_id}" in log_call
+
+ @pytest.mark.asyncio
+ async def test_set_unban_date_update_exception(self, blacklist_history_repository):
+ """Тест обработки исключения при обновлении записи"""
+ user_id = 12345
+ date_unban = int(time.time())
+
+ # Мокируем успешную проверку, но ошибку при обновлении
+ blacklist_history_repository._execute_query_with_result.return_value = [(100,)]
+ blacklist_history_repository._execute_query.side_effect = Exception("Update error")
+
+ result = await blacklist_history_repository.set_unban_date(user_id, date_unban)
+
+ # Проверяем, что метод вернул False при ошибке
+ assert result is False
+
+ # Проверяем логирование ошибки
+ blacklist_history_repository.logger.error.assert_called_once()
+ log_call = blacklist_history_repository.logger.error.call_args[0][0]
+ assert "Ошибка обновления даты разбана в истории" in log_call
diff --git a/tests/test_blacklist_repository.py b/tests/test_blacklist_repository.py
index 1d9c7b9..f4de1b7 100644
--- a/tests/test_blacklist_repository.py
+++ b/tests/test_blacklist_repository.py
@@ -37,7 +37,8 @@ class TestBlacklistRepository:
user_id=12345,
message_for_user="Нарушение правил",
date_to_unban=int(time.time()) + 86400, # +1 день
- created_at=int(time.time())
+ created_at=int(time.time()),
+ ban_author=999,
)
@pytest.fixture
@@ -47,7 +48,8 @@ class TestBlacklistRepository:
user_id=67890,
message_for_user="Постоянный бан",
date_to_unban=None,
- created_at=int(time.time())
+ created_at=int(time.time()),
+ ban_author=None,
)
@pytest.mark.asyncio
@@ -82,11 +84,11 @@ class TestBlacklistRepository:
# Проверяем SQL запрос (учитываем форматирование)
sql_query = call_args[0][0].replace('\n', ' ').replace(' ', ' ').replace(' ', ' ').strip()
- expected_sql = "INSERT INTO blacklist (user_id, message_for_user, date_to_unban) VALUES (?, ?, ?)"
+ expected_sql = "INSERT INTO blacklist (user_id, message_for_user, date_to_unban, ban_author) VALUES (?, ?, ?, ?)"
assert sql_query == expected_sql
# Проверяем параметры
- assert call_args[0][1] == (12345, "Нарушение правил", sample_blacklist_user.date_to_unban)
+ assert call_args[0][1] == (12345, "Нарушение правил", sample_blacklist_user.date_to_unban, 999)
# Проверяем логирование
blacklist_repository.logger.info.assert_called_once_with(
@@ -99,7 +101,7 @@ class TestBlacklistRepository:
await blacklist_repository.add_user(sample_blacklist_user_permanent)
call_args = blacklist_repository._execute_query.call_args
- assert call_args[0][1] == (67890, "Постоянный бан", None)
+ assert call_args[0][1] == (67890, "Постоянный бан", None, None)
blacklist_repository.logger.info.assert_called_once_with(
"Пользователь добавлен в черный список: user_id=67890"
@@ -182,7 +184,7 @@ class TestBlacklistRepository:
async def test_get_user_success(self, blacklist_repository):
"""Тест успешного получения пользователя по ID"""
# Симулируем результат запроса
- mock_row = (12345, "Нарушение правил", int(time.time()) + 86400, int(time.time()))
+ mock_row = (12345, "Нарушение правил", int(time.time()) + 86400, int(time.time()), 111)
blacklist_repository._execute_query_with_result.return_value = [mock_row]
result = await blacklist_repository.get_user(12345)
@@ -193,12 +195,13 @@ class TestBlacklistRepository:
assert result.message_for_user == "Нарушение правил"
assert result.date_to_unban == mock_row[2]
assert result.created_at == mock_row[3]
+ assert result.ban_author == mock_row[4]
# Проверяем, что метод вызван с правильными параметрами
blacklist_repository._execute_query_with_result.assert_called_once()
call_args = blacklist_repository._execute_query_with_result.call_args
- assert call_args[0][0] == "SELECT user_id, message_for_user, date_to_unban, created_at FROM blacklist WHERE user_id = ?"
+ assert "SELECT user_id, message_for_user, date_to_unban, created_at, ban_author" in call_args[0][0]
assert call_args[0][1] == (12345,)
@pytest.mark.asyncio
diff --git a/tests/test_post_repository.py b/tests/test_post_repository.py
index 43136f6..5030b58 100644
--- a/tests/test_post_repository.py
+++ b/tests/test_post_repository.py
@@ -38,7 +38,8 @@ class TestPostRepository:
text="Тестовый пост без даты",
author_id=67890,
helper_text_message_id=None,
- created_at=None
+ created_at=None,
+ status="suggest",
)
@pytest.fixture
@@ -75,6 +76,8 @@ class TestPostRepository:
assert "CREATE TABLE IF NOT EXISTS post_from_telegram_suggest" in post_table_call
assert "message_id INTEGER NOT NULL PRIMARY KEY" in post_table_call
assert "created_at INTEGER NOT NULL" in post_table_call
+ assert "status TEXT NOT NULL DEFAULT 'suggest'" in post_table_call
+ assert "is_anonymous INTEGER" in post_table_call
assert "FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE" in post_table_call
# Проверяем создание таблицы контента
@@ -101,13 +104,18 @@ class TestPostRepository:
params = call_args[0][1]
assert "INSERT INTO post_from_telegram_suggest" in query
- assert "VALUES (?, ?, ?, ?)" in query
- assert params == (
- sample_post.message_id,
- sample_post.text,
- sample_post.author_id,
- sample_post.created_at
- )
+ assert "status" in query
+ assert "is_anonymous" in query
+ assert "VALUES (?, ?, ?, ?, ?, ?)" in query
+ # Проверяем параметры: message_id, text, author_id, created_at, status, is_anonymous
+ assert params[0] == sample_post.message_id
+ assert params[1] == sample_post.text
+ assert params[2] == sample_post.author_id
+ assert params[3] == sample_post.created_at
+ assert params[4] == sample_post.status
+ # is_anonymous преобразуется в int (None -> None, True -> 1, False -> 0)
+ expected_is_anonymous = None if sample_post.is_anonymous is None else (1 if sample_post.is_anonymous else 0)
+ assert params[5] == expected_is_anonymous
@pytest.mark.asyncio
async def test_add_post_without_date(self, post_repository, sample_post_no_date):
@@ -126,7 +134,10 @@ class TestPostRepository:
call_args = post_repository._execute_query.call_args
params = call_args[0][1]
- assert params[3] == sample_post_no_date.created_at # created_at field
+ assert params[3] == sample_post_no_date.created_at # created_at
+ assert params[4] == sample_post_no_date.status # status (default suggest)
+ # Проверяем is_anonymous (должен быть в параметрах)
+ assert len(params) == 6 # Всего 6 параметров включая is_anonymous
@pytest.mark.asyncio
async def test_add_post_logs_correctly(self, post_repository, sample_post):
@@ -159,7 +170,52 @@ class TestPostRepository:
assert "UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?" in query
assert params == (helper_message_id, message_id)
-
+
+ @pytest.mark.asyncio
+ async def test_update_status_by_message_id(self, post_repository):
+ """Тест обновления статуса поста по message_id."""
+ post_repository._execute_query = AsyncMock()
+ post_repository.logger = MagicMock()
+
+ message_id = 12345
+ status = "approved"
+
+ await post_repository.update_status_by_message_id(message_id, status)
+
+ post_repository._execute_query.assert_called_once()
+ call_args = post_repository._execute_query.call_args
+ query = call_args[0][0]
+ params = call_args[0][1]
+
+ assert "UPDATE post_from_telegram_suggest" in query
+ assert "SET status = ? WHERE message_id = ?" in query
+ assert params == (status, message_id)
+ post_repository.logger.info.assert_called_once()
+
+ @pytest.mark.asyncio
+ async def test_update_status_for_media_group_by_helper_id(self, post_repository):
+ """Тест обновления статуса медиагруппы по helper_message_id."""
+ post_repository._execute_query = AsyncMock()
+ post_repository.logger = MagicMock()
+
+ helper_message_id = 99999
+ status = "declined"
+
+ await post_repository.update_status_for_media_group_by_helper_id(
+ helper_message_id, status
+ )
+
+ post_repository._execute_query.assert_called_once()
+ call_args = post_repository._execute_query.call_args
+ query = call_args[0][0]
+ params = call_args[0][1]
+
+ assert "UPDATE post_from_telegram_suggest" in query
+ assert "SET status = ?" in query
+ assert "message_id = ? OR helper_text_message_id = ?" in query
+ assert params == (status, helper_message_id, helper_message_id)
+ post_repository.logger.info.assert_called_once()
+
@pytest.mark.asyncio
async def test_add_post_content_success(self, post_repository):
"""Тест успешного добавления контента поста."""
@@ -426,6 +482,169 @@ class TestPostRepository:
# Проверяем, что logger.info не вызывался
post_repository.logger.info.assert_not_called()
+ @pytest.mark.asyncio
+ async def test_get_post_text_and_anonymity_by_message_id_found(self, post_repository):
+ """Тест получения текста и is_anonymous по message_id (пост найден)."""
+ # Мокаем _execute_query_with_result
+ mock_result = [("Тестовый текст", 1)] # is_anonymous = 1 (True)
+ post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
+ post_repository.logger = MagicMock()
+
+ message_id = 12345
+
+ result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id)
+
+ # Проверяем результат
+ text, is_anonymous = result
+ assert text == "Тестовый текст"
+ assert is_anonymous is True
+
+ # Проверяем вызов _execute_query_with_result
+ post_repository._execute_query_with_result.assert_called_once()
+ call_args = post_repository._execute_query_with_result.call_args
+ query = call_args[0][0]
+ params = call_args[0][1]
+
+ assert "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE message_id = ?" in query
+ assert params == (message_id,)
+
+ @pytest.mark.asyncio
+ async def test_get_post_text_and_anonymity_by_message_id_with_false(self, post_repository):
+ """Тест получения текста и is_anonymous по message_id (is_anonymous = False)."""
+ # Мокаем _execute_query_with_result
+ mock_result = [("Тестовый текст", 0)] # is_anonymous = 0 (False)
+ post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
+
+ message_id = 12345
+
+ result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id)
+
+ # Проверяем результат
+ text, is_anonymous = result
+ assert text == "Тестовый текст"
+ assert is_anonymous is False
+
+ @pytest.mark.asyncio
+ async def test_get_post_text_and_anonymity_by_message_id_with_null(self, post_repository):
+ """Тест получения текста и is_anonymous по message_id (is_anonymous = NULL)."""
+ # Мокаем _execute_query_with_result
+ mock_result = [("Тестовый текст", None)] # is_anonymous = NULL
+ post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
+
+ message_id = 12345
+
+ result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id)
+
+ # Проверяем результат
+ text, is_anonymous = result
+ assert text == "Тестовый текст"
+ assert is_anonymous is None
+
+ @pytest.mark.asyncio
+ async def test_get_post_text_and_anonymity_by_message_id_not_found(self, post_repository):
+ """Тест получения текста и is_anonymous по message_id (пост не найден)."""
+ # Мокаем _execute_query_with_result
+ mock_result = []
+ post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
+
+ message_id = 12345
+
+ result = await post_repository.get_post_text_and_anonymity_by_message_id(message_id)
+
+ # Проверяем результат
+ text, is_anonymous = result
+ assert text is None
+ assert is_anonymous is None
+
+ @pytest.mark.asyncio
+ async def test_get_post_text_and_anonymity_by_helper_id_found(self, post_repository):
+ """Тест получения текста и is_anonymous по helper_message_id (пост найден)."""
+ # Мокаем _execute_query_with_result
+ mock_result = [("Тестовый текст", 1)] # is_anonymous = 1 (True)
+ post_repository._execute_query_with_result = AsyncMock(return_value=mock_result)
+ post_repository.logger = MagicMock()
+
+ helper_message_id = 67890
+
+ result = await post_repository.get_post_text_and_anonymity_by_helper_id(helper_message_id)
+
+ # Проверяем результат
+ text, is_anonymous = result
+ assert text == "Тестовый текст"
+ assert is_anonymous is True
+
+ # Проверяем вызов _execute_query_with_result
+ post_repository._execute_query_with_result.assert_called_once()
+ call_args = post_repository._execute_query_with_result.call_args
+ query = call_args[0][0]
+ params = call_args[0][1]
+
+ assert "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE helper_text_message_id = ?" in query
+ assert params == (helper_message_id,)
+
+ @pytest.mark.asyncio
+ async def test_add_post_with_is_anonymous_true(self, post_repository):
+ """Тест добавления поста с is_anonymous=True."""
+ post = TelegramPost(
+ message_id=12345,
+ text="Тестовый пост анон",
+ author_id=67890,
+ created_at=int(datetime.now().timestamp()),
+ is_anonymous=True
+ )
+
+ post_repository._execute_query = AsyncMock()
+
+ await post_repository.add_post(post)
+
+ call_args = post_repository._execute_query.call_args
+ params = call_args[0][1]
+
+ # Проверяем, что is_anonymous преобразован в 1
+ assert params[5] == 1
+
+ @pytest.mark.asyncio
+ async def test_add_post_with_is_anonymous_false(self, post_repository):
+ """Тест добавления поста с is_anonymous=False."""
+ post = TelegramPost(
+ message_id=12345,
+ text="Тестовый пост неанон",
+ author_id=67890,
+ created_at=int(datetime.now().timestamp()),
+ is_anonymous=False
+ )
+
+ post_repository._execute_query = AsyncMock()
+
+ await post_repository.add_post(post)
+
+ call_args = post_repository._execute_query.call_args
+ params = call_args[0][1]
+
+ # Проверяем, что is_anonymous преобразован в 0
+ assert params[5] == 0
+
+ @pytest.mark.asyncio
+ async def test_add_post_with_is_anonymous_none(self, post_repository):
+ """Тест добавления поста с is_anonymous=None."""
+ post = TelegramPost(
+ message_id=12345,
+ text="Тестовый пост",
+ author_id=67890,
+ created_at=int(datetime.now().timestamp()),
+ is_anonymous=None
+ )
+
+ post_repository._execute_query = AsyncMock()
+
+ await post_repository.add_post(post)
+
+ call_args = post_repository._execute_query.call_args
+ params = call_args[0][1]
+
+ # Проверяем, что is_anonymous остался None
+ assert params[5] is None
+
@pytest.mark.asyncio
async def test_create_tables_logs_success(self, post_repository):
"""Тест логирования успешного создания таблиц."""
diff --git a/tests/test_post_repository_integration.py b/tests/test_post_repository_integration.py
index 9601aea..5b6139b 100644
--- a/tests/test_post_repository_integration.py
+++ b/tests/test_post_repository_integration.py
@@ -495,3 +495,52 @@ class TestPostRepositoryIntegration:
expected_message_ids = [11111, 22222, 33333, 44444]
for expected_id in expected_message_ids:
assert expected_id in post_ids
+
+ @pytest.mark.asyncio
+ async def test_update_status_by_message_id_integration(self, post_repository, sample_post):
+ """Интеграционный тест обновления статуса одиночного поста."""
+ await self._setup_test_database(post_repository)
+ await post_repository.add_post(sample_post)
+
+ await post_repository.update_status_by_message_id(sample_post.message_id, "approved")
+
+ rows = await post_repository._execute_query_with_result(
+ "SELECT status FROM post_from_telegram_suggest WHERE message_id = ?",
+ (sample_post.message_id,),
+ )
+ assert len(rows) == 1
+ assert rows[0][0] == "approved"
+
+ @pytest.mark.asyncio
+ async def test_update_status_for_media_group_by_helper_id_integration(
+ self, post_repository, sample_post_with_helper
+ ):
+ """Интеграционный тест обновления статуса медиагруппы по helper_message_id."""
+ await self._setup_test_database(post_repository)
+ await post_repository.add_post(sample_post_with_helper)
+ helper_message_id = 99999
+ helper_post = TelegramPost(
+ message_id=helper_message_id,
+ text="^",
+ author_id=67890,
+ helper_text_message_id=sample_post_with_helper.message_id,
+ created_at=int(datetime.now().timestamp()),
+ status="suggest",
+ )
+ await post_repository.add_post(helper_post)
+ await post_repository.update_helper_message(
+ sample_post_with_helper.message_id, helper_message_id
+ )
+
+ await post_repository.update_status_for_media_group_by_helper_id(
+ helper_message_id, "declined"
+ )
+
+ rows = await post_repository._execute_query_with_result(
+ "SELECT status FROM post_from_telegram_suggest "
+ "WHERE message_id = ? OR helper_text_message_id = ?",
+ (helper_message_id, helper_message_id),
+ )
+ assert len(rows) == 2
+ for row in rows:
+ assert row[0] == "declined"
diff --git a/tests/test_post_service.py b/tests/test_post_service.py
new file mode 100644
index 0000000..10db950
--- /dev/null
+++ b/tests/test_post_service.py
@@ -0,0 +1,287 @@
+"""Tests for PostService"""
+
+import pytest
+from unittest.mock import Mock, AsyncMock, MagicMock, patch
+from datetime import datetime
+from aiogram import types
+
+from helper_bot.handlers.private.services import PostService, BotSettings
+from database.models import TelegramPost, User
+
+
+class TestPostService:
+ """Test class for PostService"""
+
+ @pytest.fixture
+ def mock_db(self):
+ """Mock database"""
+ db = Mock()
+ db.add_post = AsyncMock()
+ db.update_helper_message = AsyncMock()
+ db.get_user_by_id = AsyncMock()
+ return db
+
+ @pytest.fixture
+ def mock_settings(self):
+ """Mock bot settings"""
+ return BotSettings(
+ group_for_posts="test_posts",
+ group_for_message="test_message",
+ main_public="test_public",
+ group_for_logs="test_logs",
+ important_logs="test_important",
+ preview_link="test_link",
+ logs="test_logs_setting",
+ test="test_test_setting"
+ )
+
+ @pytest.fixture
+ def post_service(self, mock_db, mock_settings):
+ """Create PostService instance"""
+ return PostService(mock_db, mock_settings)
+
+ @pytest.fixture
+ def mock_message(self):
+ """Mock Telegram message"""
+ message = Mock(spec=types.Message)
+ from_user = Mock()
+ from_user.id = 12345
+ from_user.first_name = "Test"
+ from_user.username = "testuser"
+ from_user.full_name = "Test User"
+ message.from_user = from_user
+ message.text = "Тестовый пост"
+ message.message_id = 100
+ message.bot = AsyncMock()
+ message.chat = Mock()
+ message.chat.id = 12345
+ return message
+
+ @pytest.mark.asyncio
+ async def test_handle_text_post_saves_raw_text(self, post_service, mock_message, mock_db):
+ """Test that handle_text_post saves raw text to database"""
+ with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted text"):
+ with patch('helper_bot.handlers.private.services.send_text_message', return_value=200):
+ with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
+ with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
+ with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
+
+ await post_service.handle_text_post(mock_message, "Test")
+
+ # Check that add_post was called
+ mock_db.add_post.assert_called_once()
+ call_args = mock_db.add_post.call_args[0][0]
+
+ # Check that raw text is saved
+ assert isinstance(call_args, TelegramPost)
+ assert call_args.text == "Тестовый пост" # Raw text
+ assert call_args.message_id == 200
+ assert call_args.author_id == 12345
+ assert call_args.is_anonymous is False
+
+ @pytest.mark.asyncio
+ async def test_handle_text_post_determines_anonymity(self, post_service, mock_message, mock_db):
+ """Test that handle_text_post determines anonymity correctly"""
+ mock_message.text = "Тестовый пост анон"
+
+ with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted text"):
+ with patch('helper_bot.handlers.private.services.send_text_message', return_value=200):
+ with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
+ with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
+ with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=True):
+
+ await post_service.handle_text_post(mock_message, "Test")
+
+ call_args = mock_db.add_post.call_args[0][0]
+ assert call_args.is_anonymous is True
+
+ @pytest.mark.asyncio
+ async def test_handle_photo_post_saves_raw_caption(self, post_service, mock_message, mock_db):
+ """Test that handle_photo_post saves raw caption to database"""
+ mock_message.caption = "Тестовая подпись"
+ mock_message.photo = [Mock()]
+ mock_message.photo[-1].file_id = "photo_123"
+
+ sent_message = Mock()
+ sent_message.message_id = 201
+ sent_message.caption = "Formatted caption"
+
+ with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted caption"):
+ with patch('helper_bot.handlers.private.services.send_photo_message', return_value=sent_message):
+ with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
+ with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
+ with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
+ with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
+
+ await post_service.handle_photo_post(mock_message, "Test")
+
+ mock_db.add_post.assert_called_once()
+ call_args = mock_db.add_post.call_args[0][0]
+
+ # Check that raw caption is saved
+ assert call_args.text == "Тестовая подпись" # Raw caption
+ assert call_args.message_id == 201
+ assert call_args.is_anonymous is False
+
+ @pytest.mark.asyncio
+ async def test_handle_photo_post_without_caption(self, post_service, mock_message, mock_db):
+ """Test that handle_photo_post handles missing caption"""
+ mock_message.caption = None
+ mock_message.photo = [Mock()]
+ mock_message.photo[-1].file_id = "photo_123"
+
+ sent_message = Mock()
+ sent_message.message_id = 202
+
+ with patch('helper_bot.handlers.private.services.get_text_message', return_value=""):
+ with patch('helper_bot.handlers.private.services.send_photo_message', return_value=sent_message):
+ with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
+ with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
+ with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
+ with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
+
+ await post_service.handle_photo_post(mock_message, "Test")
+
+ call_args = mock_db.add_post.call_args[0][0]
+ assert call_args.text == "" # Empty string for missing caption
+ assert call_args.is_anonymous is False
+
+ @pytest.mark.asyncio
+ async def test_handle_video_post_saves_raw_caption(self, post_service, mock_message, mock_db):
+ """Test that handle_video_post saves raw caption to database"""
+ mock_message.caption = "Видео подпись"
+ mock_message.video = Mock()
+ mock_message.video.file_id = "video_123"
+
+ sent_message = Mock()
+ sent_message.message_id = 203
+
+ with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted"):
+ with patch('helper_bot.handlers.private.services.send_video_message', return_value=sent_message):
+ with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
+ with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
+ with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=True):
+ with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
+
+ await post_service.handle_video_post(mock_message, "Test")
+
+ call_args = mock_db.add_post.call_args[0][0]
+ assert call_args.text == "Видео подпись" # Raw caption
+ assert call_args.is_anonymous is True
+
+ @pytest.mark.asyncio
+ async def test_handle_audio_post_saves_raw_caption(self, post_service, mock_message, mock_db):
+ """Test that handle_audio_post saves raw caption to database"""
+ mock_message.caption = "Аудио подпись"
+ mock_message.audio = Mock()
+ mock_message.audio.file_id = "audio_123"
+
+ sent_message = Mock()
+ sent_message.message_id = 204
+
+ with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted"):
+ with patch('helper_bot.handlers.private.services.send_audio_message', return_value=sent_message):
+ with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
+ with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
+ with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
+ with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
+
+ await post_service.handle_audio_post(mock_message, "Test")
+
+ call_args = mock_db.add_post.call_args[0][0]
+ assert call_args.text == "Аудио подпись" # Raw caption
+ assert call_args.is_anonymous is False
+
+ @pytest.mark.asyncio
+ async def test_handle_video_note_post_saves_empty_string(self, post_service, mock_message, mock_db):
+ """Test that handle_video_note_post saves empty string"""
+ mock_message.video_note = Mock()
+ mock_message.video_note.file_id = "video_note_123"
+
+ sent_message = Mock()
+ sent_message.message_id = 205
+
+ with patch('helper_bot.handlers.private.services.send_video_note_message', return_value=sent_message):
+ with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
+ with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
+ with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
+
+ await post_service.handle_video_note_post(mock_message)
+
+ call_args = mock_db.add_post.call_args[0][0]
+ assert call_args.text == "" # Empty string
+ assert call_args.is_anonymous is False
+
+ @pytest.mark.asyncio
+ async def test_handle_voice_post_saves_empty_string(self, post_service, mock_message, mock_db):
+ """Test that handle_voice_post saves empty string"""
+ mock_message.voice = Mock()
+ mock_message.voice.file_id = "voice_123"
+
+ sent_message = Mock()
+ sent_message.message_id = 206
+
+ with patch('helper_bot.handlers.private.services.send_voice_message', return_value=sent_message):
+ with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
+ with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
+ with patch('helper_bot.handlers.private.services.add_in_db_media', return_value=True):
+
+ await post_service.handle_voice_post(mock_message)
+
+ call_args = mock_db.add_post.call_args[0][0]
+ assert call_args.text == "" # Empty string
+ assert call_args.is_anonymous is False
+
+ @pytest.mark.asyncio
+ async def test_handle_media_group_post_saves_raw_caption(self, post_service, mock_message, mock_db):
+ """Test that handle_media_group_post saves raw caption to database"""
+ mock_message.message_id = 300
+ mock_message.media_group_id = 1
+
+ album = [Mock()]
+ album[0].caption = "Медиагруппа подпись"
+
+ with patch('helper_bot.handlers.private.services.get_text_message', return_value="Formatted"):
+ with patch('helper_bot.handlers.private.services.prepare_media_group_from_middlewares', return_value=[]):
+ with patch('helper_bot.handlers.private.services.send_media_group_message_to_private_chat', return_value=301):
+ with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
+ with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
+ with patch('helper_bot.handlers.private.services.send_text_message', return_value=302):
+ with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=True):
+ with patch('asyncio.sleep', return_value=None):
+
+ await post_service.handle_media_group_post(mock_message, album, "Test")
+
+ # Check main post
+ calls = mock_db.add_post.call_args_list
+ main_post = calls[0][0][0]
+
+ assert main_post.text == "Медиагруппа подпись" # Raw caption
+ assert main_post.message_id == 300
+ assert main_post.is_anonymous is True
+
+ @pytest.mark.asyncio
+ async def test_handle_media_group_post_without_caption(self, post_service, mock_message, mock_db):
+ """Test that handle_media_group_post handles missing caption"""
+ mock_message.message_id = 301
+ mock_message.media_group_id = 1
+
+ album = [Mock()]
+ album[0].caption = None
+
+ with patch('helper_bot.handlers.private.services.get_text_message', return_value=" "):
+ with patch('helper_bot.handlers.private.services.prepare_media_group_from_middlewares', return_value=[]):
+ with patch('helper_bot.handlers.private.services.send_media_group_message_to_private_chat', return_value=302):
+ with patch('helper_bot.handlers.private.services.get_first_name', return_value="Test"):
+ with patch('helper_bot.handlers.private.services.get_reply_keyboard_for_post', return_value=None):
+ with patch('helper_bot.handlers.private.services.send_text_message', return_value=303):
+ with patch('helper_bot.handlers.private.services.determine_anonymity', return_value=False):
+ with patch('asyncio.sleep', return_value=None):
+
+ await post_service.handle_media_group_post(mock_message, album, "Test")
+
+ calls = mock_db.add_post.call_args_list
+ main_post = calls[0][0][0]
+
+ assert main_post.text == "" # Empty string for missing caption
+ assert main_post.is_anonymous is False
diff --git a/tests/test_refactored_admin_handlers.py b/tests/test_refactored_admin_handlers.py
index 8319e3e..10d0333 100644
--- a/tests/test_refactored_admin_handlers.py
+++ b/tests/test_refactored_admin_handlers.py
@@ -153,7 +153,7 @@ class TestAdminService:
self.mock_db.set_user_blacklist = AsyncMock(return_value=None)
# Act
- await self.admin_service.ban_user(user_id, username, reason, ban_days)
+ await self.admin_service.ban_user(user_id, username, reason, ban_days, ban_author_id=999)
# Assert
self.mock_db.check_user_in_blacklist.assert_called_once_with(user_id)
@@ -187,10 +187,10 @@ class TestAdminService:
self.mock_db.set_user_blacklist = AsyncMock(return_value=None)
# Act
- await self.admin_service.ban_user(user_id, username, reason, ban_days)
+ await self.admin_service.ban_user(user_id, username, reason, ban_days, ban_author_id=999)
# Assert
- self.mock_db.set_user_blacklist.assert_called_once_with(user_id, None, reason, None)
+ self.mock_db.set_user_blacklist.assert_called_once_with(user_id, None, reason, None, ban_author=999)
@pytest.mark.asyncio
async def test_unban_user_success(self):
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 9befce0..c53812c 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -6,6 +6,7 @@ import os
from helper_bot.utils.helper_func import (
get_first_name,
get_text_message,
+ determine_anonymity,
check_username_and_full_name,
safe_html_escape,
download_file,
@@ -64,12 +65,13 @@ class TestHelperFunctions:
def test_get_text_message(self, mock_message):
"""Тест функции обработки текста сообщения"""
- # Тест с обычным текстом
+ # Тест с обычным текстом (legacy - определяется по тексту)
text = "Привет, это тестовое сообщение"
result = get_text_message(text, "Test", "testuser")
assert "Test" in result
assert "testuser" in result
assert "тестовое сообщение" in result
+ assert "Автор поста" in result
# Тест с пустым текстом
result = get_text_message("", "Test", "testuser")
@@ -83,6 +85,98 @@ class TestHelperFunctions:
assert "testuser" in result
assert "Обычный текст без специальных слов" in result
+ def test_get_text_message_with_is_anonymous_true(self, mock_message):
+ """Тест функции get_text_message с is_anonymous=True"""
+ text = "Тестовый пост"
+ result = get_text_message(text, "Test", "testuser", is_anonymous=True)
+ assert "Пост из ТГ:" in result
+ assert "Тестовый пост" in result
+ assert "Пост опубликован анонимно" in result
+ assert "Автор поста" not in result
+
+ def test_get_text_message_with_is_anonymous_false(self, mock_message):
+ """Тест функции get_text_message с is_anonymous=False"""
+ text = "Тестовый пост"
+ result = get_text_message(text, "Test", "testuser", is_anonymous=False)
+ assert "Пост из ТГ:" in result
+ assert "Тестовый пост" in result
+ assert "Автор поста" in result
+ assert "Test" in result
+ assert "testuser" in result
+ assert "Пост опубликован анонимно" not in result
+
+ def test_get_text_message_with_is_anonymous_none_legacy(self, mock_message):
+ """Тест функции get_text_message с is_anonymous=None (legacy - определяется по тексту)"""
+ # Тест с "анон" в тексте
+ text = "Тестовый пост анон"
+ result = get_text_message(text, "Test", "testuser", is_anonymous=None)
+ assert "Пост из ТГ:" in result
+ assert "Тестовый пост анон" in result
+ assert "Пост опубликован анонимно" in result
+
+ # Тест с "неанон" в тексте
+ text = "Тестовый пост неанон"
+ result = get_text_message(text, "Test", "testuser", is_anonymous=None)
+ assert "Пост из ТГ:" in result
+ assert "Тестовый пост неанон" in result
+ assert "Автор поста" in result
+
+ # Тест с "не анон" в тексте
+ text = "Тестовый пост не анон"
+ result = get_text_message(text, "Test", "testuser", is_anonymous=None)
+ assert "Автор поста" in result
+
+ def test_get_text_message_with_username_none(self, mock_message):
+ """Тест функции get_text_message без username"""
+ text = "Тестовый пост"
+ result = get_text_message(text, "Test", None, is_anonymous=False)
+ assert "Test" in result
+ assert "(Ник не указан)" in result
+ assert "@" not in result
+
+ def test_determine_anonymity_with_anon(self):
+ """Тест функции determine_anonymity с 'анон' в тексте"""
+ assert determine_anonymity("Этот пост анон") is True
+ assert determine_anonymity("анон") is True
+ assert determine_anonymity("АНОН") is True # Проверка регистра
+ assert determine_anonymity("пост анонимный анон") is True
+
+ def test_determine_anonymity_with_neanon(self):
+ """Тест функции determine_anonymity с 'неанон' в тексте"""
+ assert determine_anonymity("Этот пост неанон") is False
+ assert determine_anonymity("неанон") is False
+ assert determine_anonymity("НЕАНОН") is False # Проверка регистра
+ assert determine_anonymity("пост неанон") is False
+
+ def test_determine_anonymity_with_ne_anon(self):
+ """Тест функции determine_anonymity с 'не анон' в тексте"""
+ assert determine_anonymity("Этот пост не анон") is False
+ assert determine_anonymity("не анон") is False
+ assert determine_anonymity("НЕ АНОН") is False # Проверка регистра
+ assert determine_anonymity("пост не анон") is False
+
+ def test_determine_anonymity_priority_neanon_over_anon(self):
+ """Тест приоритета 'неанон' над 'анон'"""
+ # Если есть и "анон" и "неанон", должен вернуть False
+ assert determine_anonymity("анон неанон") is False
+ assert determine_anonymity("неанон анон") is False
+ assert determine_anonymity("не анон анон") is False
+
+ def test_determine_anonymity_without_keywords(self):
+ """Тест функции determine_anonymity без ключевых слов"""
+ assert determine_anonymity("Обычный текст") is False
+ assert determine_anonymity("") is False
+ assert determine_anonymity("Пост без специальных слов") is False
+
+ def test_determine_anonymity_with_none(self):
+ """Тест функции determine_anonymity с None"""
+ assert determine_anonymity(None) is False
+
+ def test_determine_anonymity_with_empty_string(self):
+ """Тест функции determine_anonymity с пустой строкой"""
+ assert determine_anonymity("") is False
+ assert determine_anonymity(" ") is False # Только пробелы
+
@pytest.mark.asyncio
async def test_check_username_and_full_name(self):
"""Тест функции проверки изменений username и full_name"""