from datetime import datetime
from typing import List, Optional, Tuple

from database.base import DatabaseConnection
from database.models import MessageContentLink, PostContent, TelegramPost


class PostRepository(DatabaseConnection):
    """Repository for posts received from Telegram.

    All persistence goes through the helpers inherited from
    ``DatabaseConnection`` (``_execute_query``, ``_execute_query_with_result``,
    ``_get_connection``) and through ``self.logger``.
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    async def _fetch_first_row(self, query: str, params: tuple) -> Optional[Tuple]:
        """Run a SELECT and return its first row, or ``None`` if it is empty."""
        rows = await self._execute_query_with_result(query, params)
        return rows[0] if rows else None

    @staticmethod
    def _to_optional_bool(value: Optional[int]) -> Optional[bool]:
        """Map a stored flag to bool: 1 -> True, 0 -> False, NULL -> None."""
        return None if value is None else bool(value)

    @classmethod
    def _row_to_text_and_anonymity(cls, row: Tuple) -> Tuple[str, Optional[bool]]:
        """Convert a ``(text, is_anonymous)`` row; NULL/empty text becomes ``""``."""
        return row[0] or "", cls._to_optional_bool(row[1])

    async def _execute_update_returning_count(self, query: str, params: tuple) -> int:
        """Run a single UPDATE on its own connection and return rows changed.

        The statement is committed on success and rolled back on any error;
        the connection is always closed. Exceptions propagate to the caller.
        """
        conn = None
        try:
            conn = await self._get_connection()
            await conn.execute(query, params)
            # changes() must be read on the same connection, before commit/close.
            cur = await conn.execute("SELECT changes()")
            row = await cur.fetchone()
            changed = row[0] if row else 0
            await conn.commit()
            return changed
        except Exception:
            if conn:
                await conn.rollback()
            raise
        finally:
            if conn:
                await conn.close()

    async def _ensure_published_message_id_column(self) -> None:
        """Add ``published_message_id`` to databases created before it existed.

        Best-effort: a failure here is logged and never raised.
        """
        alter_query = (
            "ALTER TABLE post_from_telegram_suggest ADD COLUMN published_message_id INTEGER"
        )
        try:
            check_column_query = """
                SELECT name FROM pragma_table_info('post_from_telegram_suggest')
                WHERE name = 'published_message_id'
            """
            existing_columns = await self._execute_query_with_result(check_column_query)
            if not existing_columns:
                await self._execute_query(alter_query)
                self.logger.info(
                    "Столбец published_message_id добавлен в post_from_telegram_suggest"
                )
        except Exception as check_error:
            self.logger.warning(
                f"Не удалось проверить наличие столбца published_message_id: {check_error}"
            )
            # The check itself failed, so try to add the column blindly; the
            # ALTER fails if the column is already there.
            try:
                await self._execute_query(alter_query)
                self.logger.info(
                    "Столбец published_message_id добавлен в post_from_telegram_suggest (fallback)"
                )
            except Exception as alter_error:
                self.logger.warning(
                    "Столбец published_message_id не добавлен "
                    f"(вероятно, уже существует): {alter_error}"
                )

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------

    async def create_tables(self):
        """Create the post tables and their indexes if they do not exist yet."""
        # Posts suggested via Telegram.
        # NOTE(review): update_ml_scores / get_ml_scores_by_message_id use an
        # `ml_scores` column that is not created here - presumably it is added
        # by a migration elsewhere; confirm.
        post_query = """
            CREATE TABLE IF NOT EXISTS post_from_telegram_suggest (
                message_id INTEGER NOT NULL PRIMARY KEY,
                text TEXT,
                helper_text_message_id INTEGER,
                author_id INTEGER,
                created_at INTEGER NOT NULL,
                status TEXT NOT NULL DEFAULT 'suggest',
                is_anonymous INTEGER,
                published_message_id INTEGER,
                FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE
            )
        """
        await self._execute_query(post_query)

        # Existing databases may predate the published_message_id column.
        await self._ensure_published_message_id_column()

        # Content items attached to posts.
        content_query = """
            CREATE TABLE IF NOT EXISTS content_post_from_telegram (
                message_id INTEGER NOT NULL,
                content_name TEXT NOT NULL,
                content_type TEXT,
                PRIMARY KEY (message_id, content_name),
                FOREIGN KEY (message_id) REFERENCES post_from_telegram_suggest (message_id) ON DELETE CASCADE
            )
        """
        await self._execute_query(content_query)

        # Link between a post and the messages that carry its content.
        link_query = """
            CREATE TABLE IF NOT EXISTS message_link_to_content (
                post_id INTEGER NOT NULL,
                message_id INTEGER NOT NULL,
                PRIMARY KEY (post_id, message_id),
                FOREIGN KEY (post_id) REFERENCES post_from_telegram_suggest (message_id) ON DELETE CASCADE
            )
        """
        await self._execute_query(link_query)

        # Content of already published posts.
        published_content_query = """
            CREATE TABLE IF NOT EXISTS published_post_content (
                published_message_id INTEGER NOT NULL,
                content_name TEXT NOT NULL,
                content_type TEXT,
                published_at INTEGER NOT NULL,
                PRIMARY KEY (published_message_id, content_name)
            )
        """
        await self._execute_query(published_content_query)

        # Index creation stays best-effort, but a failure is no longer silent:
        # with IF NOT EXISTS an error here is never "index already exists".
        try:
            await self._execute_query(
                "CREATE INDEX IF NOT EXISTS idx_published_post_content_message_id ON published_post_content(published_message_id)"
            )
            await self._execute_query(
                "CREATE INDEX IF NOT EXISTS idx_post_from_telegram_suggest_published ON post_from_telegram_suggest(published_message_id)"
            )
        except Exception as index_error:
            self.logger.warning(
                f"Не удалось создать индексы для таблиц постов: {index_error}"
            )

        self.logger.info("Таблицы для постов созданы")

    # ------------------------------------------------------------------
    # Writes
    # ------------------------------------------------------------------

    async def add_post(self, post: TelegramPost) -> None:
        """Insert a post; an existing row with the same message_id is kept.

        Args:
            post: Post to store. If ``post.created_at`` is falsy it is set
                (on the passed object) to the current Unix timestamp; a falsy
                ``post.status`` is stored as ``"suggest"``.
        """
        if not post.created_at:
            post.created_at = int(datetime.now().timestamp())
        status = post.status or "suggest"

        # SQLite has no bool type: True -> 1, False -> 0, None -> NULL.
        is_anonymous_int = (
            None if post.is_anonymous is None else (1 if post.is_anonymous else 0)
        )

        # INSERT OR IGNORE makes a repeated insert a no-op instead of an error.
        query = """
            INSERT OR IGNORE INTO post_from_telegram_suggest
            (message_id, text, author_id, created_at, status, is_anonymous)
            VALUES (?, ?, ?, ?, ?, ?)
        """
        params = (
            post.message_id,
            post.text,
            post.author_id,
            post.created_at,
            status,
            is_anonymous_int,
        )
        await self._execute_query(query, params)
        self.logger.info(
            f"Пост добавлен (или уже существует): message_id={post.message_id}, text длина={len(post.text) if post.text else 0}, is_anonymous={is_anonymous_int}"
        )

    async def update_helper_message(
        self, message_id: int, helper_message_id: int
    ) -> None:
        """Set ``helper_text_message_id`` for the post with ``message_id``."""
        query = "UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?"
        await self._execute_query(query, (helper_message_id, message_id))

    async def update_status_by_message_id(self, message_id: int, status: str) -> int:
        """Update the status of a single post selected by ``message_id``.

        Returns:
            Number of rows updated (0 is logged as a warning).

        Raises:
            Exception: any database error, after rollback and logging.
        """
        try:
            updated = await self._execute_update_returning_count(
                "UPDATE post_from_telegram_suggest SET status = ? WHERE message_id = ?",
                (status, message_id),
            )
        except Exception as e:
            self.logger.error(
                f"Ошибка при обновлении статуса message_id={message_id}: {e}"
            )
            raise

        if updated == 0:
            self.logger.warning(
                f"update_status_by_message_id: 0 строк обновлено для message_id={message_id}, status={status}"
            )
        else:
            self.logger.info(
                f"Статус поста message_id={message_id} обновлён на {status}"
            )
        return updated

    async def update_status_for_media_group_by_helper_id(
        self, helper_message_id: int, status: str
    ) -> int:
        """Update the status of media-group posts selected by helper message id.

        Rows match when either ``message_id`` or ``helper_text_message_id``
        equals ``helper_message_id``.

        Returns:
            Number of rows updated (0 is logged as a warning).

        Raises:
            Exception: any database error, after rollback and logging.
        """
        try:
            updated = await self._execute_update_returning_count(
                """
                UPDATE post_from_telegram_suggest
                SET status = ?
                WHERE message_id = ? OR helper_text_message_id = ?
                """,
                (status, helper_message_id, helper_message_id),
            )
        except Exception as e:
            self.logger.error(
                f"Ошибка при обновлении статуса медиагруппы helper_message_id={helper_message_id}: {e}"
            )
            raise

        if updated == 0:
            self.logger.warning(
                f"update_status_for_media_group_by_helper_id: 0 строк обновлено "
                f"для helper_message_id={helper_message_id}, status={status}"
            )
        else:
            self.logger.info(
                f"Статус медиагруппы helper_message_id={helper_message_id} обновлён на {status}"
            )
        return updated

    async def add_post_content(
        self, post_id: int, message_id: int, content_name: str, content_type: str
    ) -> bool:
        """Store a content item of a post together with its post/message link.

        Returns:
            True on success, False if any query raised (the error is logged).
        """
        try:
            # The link row goes first, then the content row; both inserts are
            # idempotent thanks to INSERT OR IGNORE.
            link_query = "INSERT OR IGNORE INTO message_link_to_content (post_id, message_id) VALUES (?, ?)"
            await self._execute_query(link_query, (post_id, message_id))

            content_query = """
                INSERT OR IGNORE INTO content_post_from_telegram (message_id, content_name, content_type)
                VALUES (?, ?, ?)
            """
            await self._execute_query(
                content_query, (message_id, content_name, content_type)
            )
            self.logger.info(
                f"Контент поста добавлен: post_id={post_id}, message_id={message_id}"
            )
            return True
        except Exception as e:
            self.logger.error(f"Ошибка при добавлении контента поста: {e}")
            return False

    async def add_message_link(self, post_id: int, message_id: int) -> bool:
        """Insert a ``post_id``/``message_id`` pair into message_link_to_content.

        Returns:
            True on success, False if the query raised (the error is logged).
        """
        try:
            self.logger.info(
                f"Добавление связи: post_id={post_id}, message_id={message_id}"
            )
            link_query = "INSERT OR IGNORE INTO message_link_to_content (post_id, message_id) VALUES (?, ?)"
            await self._execute_query(link_query, (post_id, message_id))
            self.logger.info(
                f"Связь успешно добавлена: post_id={post_id}, message_id={message_id}"
            )
            return True
        except Exception as e:
            self.logger.error(
                f"Ошибка при добавлении связи post_id={post_id}, message_id={message_id}: {e}"
            )
            return False

    # ------------------------------------------------------------------
    # Reads
    # ------------------------------------------------------------------

    async def get_post_content_by_helper_id(
        self, helper_message_id: int
    ) -> List[Tuple[str, str]]:
        """Return ``(content_name, content_type)`` rows for a helper message id."""
        query = """
            SELECT cpft.content_name, cpft.content_type
            FROM post_from_telegram_suggest pft
            JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id
            JOIN content_post_from_telegram cpft ON cpft.message_id = mltc.message_id
            WHERE pft.helper_text_message_id = ?
        """
        post_content = await self._execute_query_with_result(
            query, (helper_message_id,)
        )
        self.logger.info(f"Получен контент поста: {len(post_content)} элементов")
        return post_content

    async def get_post_content_by_message_id(
        self, message_id: int
    ) -> List[Tuple[str, str]]:
        """Return ``(content_name, content_type)`` rows of a single post.

        Only posts whose ``helper_text_message_id`` is NULL are matched.
        """
        query = """
            SELECT cpft.content_name, cpft.content_type
            FROM post_from_telegram_suggest pft
            JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id
            JOIN content_post_from_telegram cpft ON cpft.message_id = mltc.message_id
            WHERE pft.message_id = ? AND pft.helper_text_message_id IS NULL
        """
        post_content = await self._execute_query_with_result(query, (message_id,))
        self.logger.info(
            f"Получен контент одиночного поста: {len(post_content)} элементов для message_id={message_id}"
        )
        return post_content

    async def get_post_text_by_helper_id(self, helper_message_id: int) -> Optional[str]:
        """Return the text of the post with this helper message id, or None."""
        query = "SELECT text FROM post_from_telegram_suggest WHERE helper_text_message_id = ?"
        row = await self._fetch_first_row(query, (helper_message_id,))
        if not row:
            return None
        self.logger.info(
            f"Получен текст поста для helper_message_id={helper_message_id}"
        )
        return row[0]

    async def get_post_ids_by_helper_id(self, helper_message_id: int) -> List[int]:
        """Return linked message ids for the post with this helper message id."""
        query = """
            SELECT mltc.message_id
            FROM post_from_telegram_suggest pft
            JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id
            WHERE pft.helper_text_message_id = ?
        """
        rows = await self._execute_query_with_result(query, (helper_message_id,))
        post_ids = [row[0] for row in rows]
        self.logger.info(f"Получены ID сообщений: {len(post_ids)} элементов")
        return post_ids

    async def get_author_id_by_message_id(self, message_id: int) -> Optional[int]:
        """Return the author id of the post with ``message_id``, or None."""
        query = "SELECT author_id FROM post_from_telegram_suggest WHERE message_id = ?"
        row = await self._fetch_first_row(query, (message_id,))
        if not row:
            return None
        author_id = row[0]
        self.logger.info(
            f"Получен author_id: {author_id} для message_id={message_id}"
        )
        return author_id

    async def get_author_id_by_helper_message_id(
        self, helper_message_id: int
    ) -> Optional[int]:
        """Return the author id of the post with this helper message id, or None."""
        query = "SELECT author_id FROM post_from_telegram_suggest WHERE helper_text_message_id = ?"
        row = await self._fetch_first_row(query, (helper_message_id,))
        if not row:
            return None
        author_id = row[0]
        self.logger.info(
            f"Получен author_id: {author_id} для helper_message_id={helper_message_id}"
        )
        return author_id

    async def get_post_text_and_anonymity_by_message_id(
        self, message_id: int
    ) -> Tuple[Optional[str], Optional[bool]]:
        """Return ``(text, is_anonymous)`` for the post with ``message_id``.

        Returns:
            ``(None, None)`` when no post is found. Otherwise the text (NULL or
            empty is returned as ``""``) and the anonymity flag (None if NULL).
        """
        query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE message_id = ?"
        row = await self._fetch_first_row(query, (message_id,))
        if not row:
            return None, None
        result = self._row_to_text_and_anonymity(row)
        self.logger.info(
            f"Получены текст и is_anonymous для message_id={message_id}"
        )
        return result

    async def get_post_text_and_anonymity_by_helper_id(
        self, helper_message_id: int
    ) -> Tuple[Optional[str], Optional[bool]]:
        """Return ``(text, is_anonymous)`` for the post with this helper message id.

        Returns:
            ``(None, None)`` when no post is found. Otherwise the text (NULL or
            empty is returned as ``""``) and the anonymity flag (None if NULL).
        """
        query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE helper_text_message_id = ?"
        row = await self._fetch_first_row(query, (helper_message_id,))
        if not row:
            return None, None
        result = self._row_to_text_and_anonymity(row)
        self.logger.info(
            f"Получены текст и is_anonymous для helper_message_id={helper_message_id}"
        )
        return result

    # ------------------------------------------------------------------
    # Published posts
    # ------------------------------------------------------------------

    async def update_published_message_id(
        self, original_message_id: int, published_message_id: int
    ) -> None:
        """Store ``published_message_id`` on the post with ``original_message_id``."""
        query = "UPDATE post_from_telegram_suggest SET published_message_id = ? WHERE message_id = ?"
        await self._execute_query(query, (published_message_id, original_message_id))
        self.logger.info(
            f"Обновлен published_message_id: {original_message_id} -> {published_message_id}"
        )

    async def add_published_post_content(
        self, published_message_id: int, content_path: str, content_type: str
    ) -> bool:
        """Store a content item of a published post.

        ``content_path`` is written to the ``content_name`` column and
        ``published_at`` is set to the current Unix timestamp.

        Returns:
            True on success, False if the query raised (the error is logged).
        """
        try:
            published_at = int(datetime.now().timestamp())
            query = """
                INSERT OR IGNORE INTO published_post_content
                (published_message_id, content_name, content_type, published_at)
                VALUES (?, ?, ?, ?)
            """
            await self._execute_query(
                query, (published_message_id, content_path, content_type, published_at)
            )
            self.logger.info(
                f"Добавлен контент опубликованного поста: published_message_id={published_message_id}, type={content_type}"
            )
            return True
        except Exception as e:
            self.logger.error(
                f"Ошибка при добавлении контента опубликованного поста: {e}"
            )
            return False

    async def get_published_post_content(
        self, published_message_id: int
    ) -> List[Tuple[str, str]]:
        """Return ``(content_name, content_type)`` rows of a published post."""
        query = """
            SELECT content_name, content_type
            FROM published_post_content
            WHERE published_message_id = ?
        """
        post_content = await self._execute_query_with_result(
            query, (published_message_id,)
        )
        self.logger.info(
            f"Получен контент опубликованного поста: {len(post_content)} элементов для published_message_id={published_message_id}"
        )
        return post_content

    # ============================================
    # ML scoring
    # ============================================

    async def update_ml_scores(self, message_id: int, ml_scores_json: str) -> bool:
        """Store ML scores for a post.

        Args:
            message_id: ID of the message in the moderation group.
            ml_scores_json: JSON string with the scores.

        Returns:
            True if the query ran without error, False otherwise (logged).
        """
        try:
            query = "UPDATE post_from_telegram_suggest SET ml_scores = ? WHERE message_id = ?"
            await self._execute_query(query, (ml_scores_json, message_id))
            self.logger.info(f"ML-скоры обновлены для message_id={message_id}")
            return True
        except Exception as e:
            self.logger.error(
                f"Ошибка обновления ML-скоров для message_id={message_id}: {e}"
            )
            return False

    async def get_ml_scores_by_message_id(self, message_id: int) -> Optional[str]:
        """Return the stored ML scores JSON for a post.

        Args:
            message_id: ID of the message.

        Returns:
            The JSON string, or None if the post is missing or the value is
            NULL/empty.
        """
        query = "SELECT ml_scores FROM post_from_telegram_suggest WHERE message_id = ?"
        row = await self._fetch_first_row(query, (message_id,))
        if row and row[0]:
            return row[0]
        return None

    async def get_post_text_by_message_id(self, message_id: int) -> Optional[str]:
        """Return the text of a post by ``message_id``.

        Args:
            message_id: ID of the message.

        Returns:
            The text, or None if the post is missing or its text is NULL/empty.
        """
        query = "SELECT text FROM post_from_telegram_suggest WHERE message_id = ?"
        row = await self._fetch_first_row(query, (message_id,))
        if row and row[0]:
            return row[0]
        return None

    async def get_approved_posts_texts(self, limit: int = 1000) -> List[str]:
        """Return texts of approved posts, newest first, for RAG training.

        Args:
            limit: Maximum number of posts to read.

        Returns:
            List of non-empty texts; rows whose text is '^' are excluded.
        """
        query = """
            SELECT text FROM post_from_telegram_suggest
            WHERE status = 'approved'
              AND text IS NOT NULL
              AND text != ''
              AND text != '^'
            ORDER BY created_at DESC
            LIMIT ?
        """
        rows = await self._execute_query_with_result(query, (limit,))
        texts = [row[0] for row in rows if row[0]]
        self.logger.info(f"Получено {len(texts)} опубликованных постов для обучения")
        return texts

    async def get_declined_posts_texts(self, limit: int = 1000) -> List[str]:
        """Return texts of declined posts, newest first, for RAG training.

        Args:
            limit: Maximum number of posts to read.

        Returns:
            List of non-empty texts; rows whose text is '^' are excluded.
        """
        query = """
            SELECT text FROM post_from_telegram_suggest
            WHERE status = 'declined'
              AND text IS NOT NULL
              AND text != ''
              AND text != '^'
            ORDER BY created_at DESC
            LIMIT ?
        """
        rows = await self._execute_query_with_result(query, (limit,))
        texts = [row[0] for row in rows if row[0]]
        self.logger.info(f"Получено {len(texts)} отклоненных постов для обучения")
        return texts

    async def get_user_posts_stats(self, user_id: int) -> Tuple[int, int, int]:
        """Return per-status post counts for a user.

        Args:
            user_id: ID of the user.

        Returns:
            Tuple ``(approved_count, declined_count, suggest_count)``; all
            zeros when the user has no matching posts.
        """
        # NOTE(review): `text != '^'` also drops rows whose text is NULL (the
        # comparison yields NULL), so such posts are not counted - confirm
        # that this is intended.
        query = """
            SELECT
                SUM(CASE WHEN status = 'approved' THEN 1 ELSE 0 END) as approved,
                SUM(CASE WHEN status = 'declined' THEN 1 ELSE 0 END) as declined,
                SUM(CASE WHEN status = 'suggest' THEN 1 ELSE 0 END) as suggest
            FROM post_from_telegram_suggest
            WHERE author_id = ? AND text != '^'
        """
        row = await self._fetch_first_row(query, (user_id,))
        if not row:
            return (0, 0, 0)

        # SUM over zero rows yields NULL, hence the `or 0` defaults.
        approved = row[0] or 0
        declined = row[1] or 0
        suggest = row[2] or 0
        self.logger.info(
            f"Статистика постов для user_id={user_id}: "
            f"approved={approved}, declined={declined}, suggest={suggest}"
        )
        return (approved, declined, suggest)

    async def get_last_post_by_author(self, user_id: int) -> Optional[str]:
        """Return the text of the user's most recent post.

        Args:
            user_id: ID of the user.

        Returns:
            Text of the latest post with a non-empty text other than '^',
            or None if there is no such post.
        """
        query = """
            SELECT text FROM post_from_telegram_suggest
            WHERE author_id = ?
              AND text IS NOT NULL
              AND text != ''
              AND text != '^'
            ORDER BY created_at DESC
            LIMIT 1
        """
        row = await self._fetch_first_row(query, (user_id,))
        if not row:
            self.logger.info(f"Постов для user_id={user_id} не найдено")
            return None

        text = row[0]
        # Only the first 50 characters of long texts go to the log.
        self.logger.info(
            f"Последний пост для user_id={user_id}: '{text[:50]}...'"
            if len(text) > 50
            else f"Последний пост для user_id={user_id}: '{text}'"
        )
        return text