from datetime import datetime from typing import Optional, List, Tuple from database.base import DatabaseConnection from database.models import TelegramPost, PostContent, MessageContentLink class PostRepository(DatabaseConnection): """Репозиторий для работы с постами из Telegram.""" async def create_tables(self): """Создание таблиц для постов.""" # Таблица постов из Telegram post_query = ''' CREATE TABLE IF NOT EXISTS post_from_telegram_suggest ( message_id INTEGER NOT NULL PRIMARY KEY, text TEXT, helper_text_message_id INTEGER, author_id INTEGER, created_at INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'suggest', is_anonymous INTEGER, FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE ) ''' await self._execute_query(post_query) # Таблица контента постов content_query = ''' CREATE TABLE IF NOT EXISTS content_post_from_telegram ( message_id INTEGER NOT NULL, content_name TEXT NOT NULL, content_type TEXT, PRIMARY KEY (message_id, content_name), FOREIGN KEY (message_id) REFERENCES post_from_telegram_suggest (message_id) ON DELETE CASCADE ) ''' await self._execute_query(content_query) # Таблица связи сообщений с контентом link_query = ''' CREATE TABLE IF NOT EXISTS message_link_to_content ( post_id INTEGER NOT NULL, message_id INTEGER NOT NULL, PRIMARY KEY (post_id, message_id), FOREIGN KEY (post_id) REFERENCES post_from_telegram_suggest (message_id) ON DELETE CASCADE ) ''' await self._execute_query(link_query) self.logger.info("Таблицы для постов созданы") async def add_post(self, post: TelegramPost) -> None: """Добавление поста.""" if not post.created_at: post.created_at = int(datetime.now().timestamp()) status = post.status if post.status else "suggest" # Преобразуем bool в int для SQLite (True -> 1, False -> 0, None -> None) is_anonymous_int = None if post.is_anonymous is None else (1 if post.is_anonymous else 0) query = """ INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at, status, is_anonymous) VALUES (?, ?, ?, ?, ?, ?) """ params = (post.message_id, post.text, post.author_id, post.created_at, status, is_anonymous_int) await self._execute_query(query, params) self.logger.info(f"Пост добавлен: message_id={post.message_id}") async def update_helper_message(self, message_id: int, helper_message_id: int) -> None: """Обновление helper сообщения.""" query = "UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?" await self._execute_query(query, (helper_message_id, message_id)) async def update_status_by_message_id(self, message_id: int, status: str) -> int: """Обновление статуса поста по message_id (одиночные посты). Возвращает число обновлённых строк.""" conn = None try: conn = await self._get_connection() await conn.execute( "UPDATE post_from_telegram_suggest SET status = ? WHERE message_id = ?", (status, message_id), ) cur = await conn.execute("SELECT changes()") row = await cur.fetchone() n = row[0] if row else 0 await conn.commit() if n == 0: self.logger.warning( f"update_status_by_message_id: 0 строк обновлено для message_id={message_id}, status={status}" ) else: self.logger.info(f"Статус поста message_id={message_id} обновлён на {status}") return n except Exception as e: if conn: await conn.rollback() self.logger.error(f"Ошибка при обновлении статуса message_id={message_id}: {e}") raise finally: if conn: await conn.close() async def update_status_for_media_group_by_helper_id( self, helper_message_id: int, status: str ) -> int: """Обновление статуса постов медиагруппы по helper_text_message_id. Возвращает число обновлённых строк.""" conn = None try: conn = await self._get_connection() await conn.execute( """ UPDATE post_from_telegram_suggest SET status = ? WHERE message_id = ? OR helper_text_message_id = ? """, (status, helper_message_id, helper_message_id), ) cur = await conn.execute("SELECT changes()") row = await cur.fetchone() n = row[0] if row else 0 await conn.commit() if n == 0: self.logger.warning( f"update_status_for_media_group_by_helper_id: 0 строк обновлено " f"для helper_message_id={helper_message_id}, status={status}" ) else: self.logger.info( f"Статус медиагруппы helper_message_id={helper_message_id} обновлён на {status}" ) return n except Exception as e: if conn: await conn.rollback() self.logger.error( f"Ошибка при обновлении статуса медиагруппы helper_message_id={helper_message_id}: {e}" ) raise finally: if conn: await conn.close() async def add_post_content(self, post_id: int, message_id: int, content_name: str, content_type: str) -> bool: """Добавление контента поста.""" try: # Сначала добавляем связь link_query = "INSERT OR IGNORE INTO message_link_to_content (post_id, message_id) VALUES (?, ?)" await self._execute_query(link_query, (post_id, message_id)) # Затем добавляем контент content_query = """ INSERT OR IGNORE INTO content_post_from_telegram (message_id, content_name, content_type) VALUES (?, ?, ?) """ await self._execute_query(content_query, (message_id, content_name, content_type)) self.logger.info(f"Контент поста добавлен: post_id={post_id}, message_id={message_id}") return True except Exception as e: self.logger.error(f"Ошибка при добавлении контента поста: {e}") return False async def get_post_content_by_helper_id(self, helper_message_id: int) -> List[Tuple[str, str]]: """Получает контент поста по helper_text_message_id.""" query = """ SELECT cpft.content_name, cpft.content_type FROM post_from_telegram_suggest pft JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id JOIN content_post_from_telegram cpft ON cpft.message_id = mltc.message_id WHERE pft.helper_text_message_id = ? """ post_content = await self._execute_query_with_result(query, (helper_message_id,)) self.logger.info(f"Получен контент поста: {len(post_content)} элементов") return post_content async def get_post_text_by_helper_id(self, helper_message_id: int) -> Optional[str]: """Получает текст поста по helper_text_message_id.""" query = "SELECT text FROM post_from_telegram_suggest WHERE helper_text_message_id = ?" rows = await self._execute_query_with_result(query, (helper_message_id,)) row = rows[0] if rows else None if row: self.logger.info(f"Получен текст поста для helper_message_id={helper_message_id}") return row[0] return None async def get_post_ids_by_helper_id(self, helper_message_id: int) -> List[int]: """Получает ID сообщений по helper_text_message_id.""" query = """ SELECT mltc.message_id FROM post_from_telegram_suggest pft JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id WHERE pft.helper_text_message_id = ? """ rows = await self._execute_query_with_result(query, (helper_message_id,)) post_ids = [row[0] for row in rows] self.logger.info(f"Получены ID сообщений: {len(post_ids)} элементов") return post_ids async def get_author_id_by_message_id(self, message_id: int) -> Optional[int]: """Получает ID автора по message_id.""" query = "SELECT author_id FROM post_from_telegram_suggest WHERE message_id = ?" rows = await self._execute_query_with_result(query, (message_id,)) row = rows[0] if rows else None if row: author_id = row[0] self.logger.info(f"Получен author_id: {author_id} для message_id={message_id}") return author_id return None async def get_author_id_by_helper_message_id(self, helper_message_id: int) -> Optional[int]: """Получает ID автора по helper_text_message_id.""" query = "SELECT author_id FROM post_from_telegram_suggest WHERE helper_text_message_id = ?" rows = await self._execute_query_with_result(query, (helper_message_id,)) row = rows[0] if rows else None if row: author_id = row[0] self.logger.info(f"Получен author_id: {author_id} для helper_message_id={helper_message_id}") return author_id return None async def get_post_text_and_anonymity_by_message_id(self, message_id: int) -> Tuple[Optional[str], Optional[bool]]: """Получает текст и is_anonymous поста по message_id.""" query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE message_id = ?" rows = await self._execute_query_with_result(query, (message_id,)) row = rows[0] if rows else None if row: text = row[0] or "" is_anonymous_int = row[1] # Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None) is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int) self.logger.info(f"Получены текст и is_anonymous для message_id={message_id}") return text, is_anonymous return None, None async def get_post_text_and_anonymity_by_helper_id(self, helper_message_id: int) -> Tuple[Optional[str], Optional[bool]]: """Получает текст и is_anonymous поста по helper_text_message_id.""" query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE helper_text_message_id = ?" rows = await self._execute_query_with_result(query, (helper_message_id,)) row = rows[0] if rows else None if row: text = row[0] or "" is_anonymous_int = row[1] # Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None) is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int) self.logger.info(f"Получены текст и is_anonymous для helper_message_id={helper_message_id}") return text, is_anonymous return None, None