Files
telegram-helper-bot/database/repositories/post_repository.py
Andrey 89022aedaf Реализован функцоинал хранения сырых текстов поста в базе данных. Оформление поста происходит непосредственно перед его отправкой в канал.
- Реализованы методы `get_post_text_and_anonymity_by_message_id` и `get_post_text_and_anonymity_by_helper_id` в `PostRepository` для получения текста поста и флага анонимности.
- Обновлена модель `TelegramPost`, добавлено поле `is_anonymous`.
- Изменена схема базы данных для включения поля `is_anonymous` в таблицу `post_from_telegram_suggest`.
- Обновлены функции публикации постов в `PostPublishService` для учета анонимности.
- Добавлены тесты для проверки новых функций и корректности работы с анонимностью.
2026-01-23 12:12:21 +03:00

255 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime
from typing import Optional, List, Tuple
from database.base import DatabaseConnection
from database.models import TelegramPost, PostContent, MessageContentLink
class PostRepository(DatabaseConnection):
"""Репозиторий для работы с постами из Telegram."""
async def create_tables(self):
"""Создание таблиц для постов."""
# Таблица постов из Telegram
post_query = '''
CREATE TABLE IF NOT EXISTS post_from_telegram_suggest (
message_id INTEGER NOT NULL PRIMARY KEY,
text TEXT,
helper_text_message_id INTEGER,
author_id INTEGER,
created_at INTEGER NOT NULL,
status TEXT NOT NULL DEFAULT 'suggest',
is_anonymous INTEGER,
FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE
)
'''
await self._execute_query(post_query)
# Таблица контента постов
content_query = '''
CREATE TABLE IF NOT EXISTS content_post_from_telegram (
message_id INTEGER NOT NULL,
content_name TEXT NOT NULL,
content_type TEXT,
PRIMARY KEY (message_id, content_name),
FOREIGN KEY (message_id) REFERENCES post_from_telegram_suggest (message_id) ON DELETE CASCADE
)
'''
await self._execute_query(content_query)
# Таблица связи сообщений с контентом
link_query = '''
CREATE TABLE IF NOT EXISTS message_link_to_content (
post_id INTEGER NOT NULL,
message_id INTEGER NOT NULL,
PRIMARY KEY (post_id, message_id),
FOREIGN KEY (post_id) REFERENCES post_from_telegram_suggest (message_id) ON DELETE CASCADE
)
'''
await self._execute_query(link_query)
self.logger.info("Таблицы для постов созданы")
async def add_post(self, post: TelegramPost) -> None:
"""Добавление поста."""
if not post.created_at:
post.created_at = int(datetime.now().timestamp())
status = post.status if post.status else "suggest"
# Преобразуем bool в int для SQLite (True -> 1, False -> 0, None -> None)
is_anonymous_int = None if post.is_anonymous is None else (1 if post.is_anonymous else 0)
query = """
INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at, status, is_anonymous)
VALUES (?, ?, ?, ?, ?, ?)
"""
params = (post.message_id, post.text, post.author_id, post.created_at, status, is_anonymous_int)
await self._execute_query(query, params)
self.logger.info(f"Пост добавлен: message_id={post.message_id}")
async def update_helper_message(self, message_id: int, helper_message_id: int) -> None:
"""Обновление helper сообщения."""
query = "UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?"
await self._execute_query(query, (helper_message_id, message_id))
async def update_status_by_message_id(self, message_id: int, status: str) -> int:
"""Обновление статуса поста по message_id (одиночные посты). Возвращает число обновлённых строк."""
conn = None
try:
conn = await self._get_connection()
await conn.execute(
"UPDATE post_from_telegram_suggest SET status = ? WHERE message_id = ?",
(status, message_id),
)
cur = await conn.execute("SELECT changes()")
row = await cur.fetchone()
n = row[0] if row else 0
await conn.commit()
if n == 0:
self.logger.warning(
f"update_status_by_message_id: 0 строк обновлено для message_id={message_id}, status={status}"
)
else:
self.logger.info(f"Статус поста message_id={message_id} обновлён на {status}")
return n
except Exception as e:
if conn:
await conn.rollback()
self.logger.error(f"Ошибка при обновлении статуса message_id={message_id}: {e}")
raise
finally:
if conn:
await conn.close()
async def update_status_for_media_group_by_helper_id(
self, helper_message_id: int, status: str
) -> int:
"""Обновление статуса постов медиагруппы по helper_text_message_id. Возвращает число обновлённых строк."""
conn = None
try:
conn = await self._get_connection()
await conn.execute(
"""
UPDATE post_from_telegram_suggest
SET status = ?
WHERE message_id = ? OR helper_text_message_id = ?
""",
(status, helper_message_id, helper_message_id),
)
cur = await conn.execute("SELECT changes()")
row = await cur.fetchone()
n = row[0] if row else 0
await conn.commit()
if n == 0:
self.logger.warning(
f"update_status_for_media_group_by_helper_id: 0 строк обновлено "
f"для helper_message_id={helper_message_id}, status={status}"
)
else:
self.logger.info(
f"Статус медиагруппы helper_message_id={helper_message_id} обновлён на {status}"
)
return n
except Exception as e:
if conn:
await conn.rollback()
self.logger.error(
f"Ошибка при обновлении статуса медиагруппы helper_message_id={helper_message_id}: {e}"
)
raise
finally:
if conn:
await conn.close()
async def add_post_content(self, post_id: int, message_id: int, content_name: str, content_type: str) -> bool:
"""Добавление контента поста."""
try:
# Сначала добавляем связь
link_query = "INSERT OR IGNORE INTO message_link_to_content (post_id, message_id) VALUES (?, ?)"
await self._execute_query(link_query, (post_id, message_id))
# Затем добавляем контент
content_query = """
INSERT OR IGNORE INTO content_post_from_telegram (message_id, content_name, content_type)
VALUES (?, ?, ?)
"""
await self._execute_query(content_query, (message_id, content_name, content_type))
self.logger.info(f"Контент поста добавлен: post_id={post_id}, message_id={message_id}")
return True
except Exception as e:
self.logger.error(f"Ошибка при добавлении контента поста: {e}")
return False
async def get_post_content_by_helper_id(self, helper_message_id: int) -> List[Tuple[str, str]]:
"""Получает контент поста по helper_text_message_id."""
query = """
SELECT cpft.content_name, cpft.content_type
FROM post_from_telegram_suggest pft
JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id
JOIN content_post_from_telegram cpft ON cpft.message_id = mltc.message_id
WHERE pft.helper_text_message_id = ?
"""
post_content = await self._execute_query_with_result(query, (helper_message_id,))
self.logger.info(f"Получен контент поста: {len(post_content)} элементов")
return post_content
async def get_post_text_by_helper_id(self, helper_message_id: int) -> Optional[str]:
"""Получает текст поста по helper_text_message_id."""
query = "SELECT text FROM post_from_telegram_suggest WHERE helper_text_message_id = ?"
rows = await self._execute_query_with_result(query, (helper_message_id,))
row = rows[0] if rows else None
if row:
self.logger.info(f"Получен текст поста для helper_message_id={helper_message_id}")
return row[0]
return None
async def get_post_ids_by_helper_id(self, helper_message_id: int) -> List[int]:
"""Получает ID сообщений по helper_text_message_id."""
query = """
SELECT mltc.message_id
FROM post_from_telegram_suggest pft
JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id
WHERE pft.helper_text_message_id = ?
"""
rows = await self._execute_query_with_result(query, (helper_message_id,))
post_ids = [row[0] for row in rows]
self.logger.info(f"Получены ID сообщений: {len(post_ids)} элементов")
return post_ids
async def get_author_id_by_message_id(self, message_id: int) -> Optional[int]:
"""Получает ID автора по message_id."""
query = "SELECT author_id FROM post_from_telegram_suggest WHERE message_id = ?"
rows = await self._execute_query_with_result(query, (message_id,))
row = rows[0] if rows else None
if row:
author_id = row[0]
self.logger.info(f"Получен author_id: {author_id} для message_id={message_id}")
return author_id
return None
async def get_author_id_by_helper_message_id(self, helper_message_id: int) -> Optional[int]:
"""Получает ID автора по helper_text_message_id."""
query = "SELECT author_id FROM post_from_telegram_suggest WHERE helper_text_message_id = ?"
rows = await self._execute_query_with_result(query, (helper_message_id,))
row = rows[0] if rows else None
if row:
author_id = row[0]
self.logger.info(f"Получен author_id: {author_id} для helper_message_id={helper_message_id}")
return author_id
return None
async def get_post_text_and_anonymity_by_message_id(self, message_id: int) -> Tuple[Optional[str], Optional[bool]]:
"""Получает текст и is_anonymous поста по message_id."""
query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE message_id = ?"
rows = await self._execute_query_with_result(query, (message_id,))
row = rows[0] if rows else None
if row:
text = row[0] or ""
is_anonymous_int = row[1]
# Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None)
is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int)
self.logger.info(f"Получены текст и is_anonymous для message_id={message_id}")
return text, is_anonymous
return None, None
async def get_post_text_and_anonymity_by_helper_id(self, helper_message_id: int) -> Tuple[Optional[str], Optional[bool]]:
"""Получает текст и is_anonymous поста по helper_text_message_id."""
query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE helper_text_message_id = ?"
rows = await self._execute_query_with_result(query, (helper_message_id,))
row = rows[0] if rows else None
if row:
text = row[0] or ""
is_anonymous_int = row[1]
# Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None)
is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int)
self.logger.info(f"Получены текст и is_anonymous для helper_message_id={helper_message_id}")
return text, is_anonymous
return None, None