- Обновлен Dockerfile для использования Alpine вместо Slim, улучшая размер образа. - Удален устаревший RAGService и добавлен RagApiClient для работы с внешним RAG API. - Обновлены переменные окружения в env.example для настройки нового RAG API. - Обновлен ScoringManager для интеграции с RagApiClient. - Упрощена структура проекта, удалены ненужные файлы и зависимости, связанные с векторным хранилищем. - Обновлены обработчики и функции для работы с новым API, включая получение статистики и обработку ошибок.
465 lines
23 KiB
Python
465 lines
23 KiB
Python
from datetime import datetime
|
||
from typing import List, Optional, Tuple
|
||
|
||
from database.base import DatabaseConnection
|
||
from database.models import MessageContentLink, PostContent, TelegramPost
|
||
|
||
|
||
class PostRepository(DatabaseConnection):
|
||
"""Репозиторий для работы с постами из Telegram."""
|
||
|
||
async def create_tables(self):
|
||
"""Создание таблиц для постов."""
|
||
# Таблица постов из Telegram
|
||
post_query = '''
|
||
CREATE TABLE IF NOT EXISTS post_from_telegram_suggest (
|
||
message_id INTEGER NOT NULL PRIMARY KEY,
|
||
text TEXT,
|
||
helper_text_message_id INTEGER,
|
||
author_id INTEGER,
|
||
created_at INTEGER NOT NULL,
|
||
status TEXT NOT NULL DEFAULT 'suggest',
|
||
is_anonymous INTEGER,
|
||
published_message_id INTEGER,
|
||
FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE
|
||
)
|
||
'''
|
||
await self._execute_query(post_query)
|
||
|
||
# Добавляем поле published_message_id если его нет (для существующих БД)
|
||
try:
|
||
check_column_query = """
|
||
SELECT name FROM pragma_table_info('post_from_telegram_suggest')
|
||
WHERE name = 'published_message_id'
|
||
"""
|
||
existing_columns = await self._execute_query_with_result(check_column_query)
|
||
if not existing_columns:
|
||
await self._execute_query('ALTER TABLE post_from_telegram_suggest ADD COLUMN published_message_id INTEGER')
|
||
self.logger.info("Столбец published_message_id добавлен в post_from_telegram_suggest")
|
||
except Exception as e:
|
||
# Если проверка не удалась, пытаемся добавить столбец (может быть уже существует)
|
||
try:
|
||
await self._execute_query('ALTER TABLE post_from_telegram_suggest ADD COLUMN published_message_id INTEGER')
|
||
self.logger.info("Столбец published_message_id добавлен в post_from_telegram_suggest (fallback)")
|
||
except Exception:
|
||
# Столбец уже существует, игнорируем ошибку
|
||
pass
|
||
|
||
# Таблица контента постов
|
||
content_query = '''
|
||
CREATE TABLE IF NOT EXISTS content_post_from_telegram (
|
||
message_id INTEGER NOT NULL,
|
||
content_name TEXT NOT NULL,
|
||
content_type TEXT,
|
||
PRIMARY KEY (message_id, content_name),
|
||
FOREIGN KEY (message_id) REFERENCES post_from_telegram_suggest (message_id) ON DELETE CASCADE
|
||
)
|
||
'''
|
||
await self._execute_query(content_query)
|
||
|
||
# Таблица связи сообщений с контентом
|
||
link_query = '''
|
||
CREATE TABLE IF NOT EXISTS message_link_to_content (
|
||
post_id INTEGER NOT NULL,
|
||
message_id INTEGER NOT NULL,
|
||
PRIMARY KEY (post_id, message_id),
|
||
FOREIGN KEY (post_id) REFERENCES post_from_telegram_suggest (message_id) ON DELETE CASCADE
|
||
)
|
||
'''
|
||
await self._execute_query(link_query)
|
||
|
||
# Таблица контента опубликованных постов
|
||
published_content_query = '''
|
||
CREATE TABLE IF NOT EXISTS published_post_content (
|
||
published_message_id INTEGER NOT NULL,
|
||
content_name TEXT NOT NULL,
|
||
content_type TEXT,
|
||
published_at INTEGER NOT NULL,
|
||
PRIMARY KEY (published_message_id, content_name)
|
||
)
|
||
'''
|
||
await self._execute_query(published_content_query)
|
||
|
||
# Создаем индексы
|
||
try:
|
||
await self._execute_query('CREATE INDEX IF NOT EXISTS idx_published_post_content_message_id ON published_post_content(published_message_id)')
|
||
await self._execute_query('CREATE INDEX IF NOT EXISTS idx_post_from_telegram_suggest_published ON post_from_telegram_suggest(published_message_id)')
|
||
except Exception:
|
||
# Индексы уже существуют, игнорируем ошибку
|
||
pass
|
||
|
||
self.logger.info("Таблицы для постов созданы")
|
||
|
||
async def add_post(self, post: TelegramPost) -> None:
|
||
"""Добавление поста."""
|
||
if not post.created_at:
|
||
post.created_at = int(datetime.now().timestamp())
|
||
status = post.status if post.status else "suggest"
|
||
# Преобразуем bool в int для SQLite (True -> 1, False -> 0, None -> None)
|
||
is_anonymous_int = None if post.is_anonymous is None else (1 if post.is_anonymous else 0)
|
||
|
||
# Используем INSERT OR IGNORE чтобы избежать ошибок при повторном создании
|
||
query = """
|
||
INSERT OR IGNORE INTO post_from_telegram_suggest (message_id, text, author_id, created_at, status, is_anonymous)
|
||
VALUES (?, ?, ?, ?, ?, ?)
|
||
"""
|
||
params = (post.message_id, post.text, post.author_id, post.created_at, status, is_anonymous_int)
|
||
|
||
await self._execute_query(query, params)
|
||
self.logger.info(f"Пост добавлен (или уже существует): message_id={post.message_id}, text длина={len(post.text) if post.text else 0}, is_anonymous={is_anonymous_int}")
|
||
|
||
async def update_helper_message(self, message_id: int, helper_message_id: int) -> None:
|
||
"""Обновление helper сообщения."""
|
||
query = "UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?"
|
||
await self._execute_query(query, (helper_message_id, message_id))
|
||
|
||
async def update_status_by_message_id(self, message_id: int, status: str) -> int:
|
||
"""Обновление статуса поста по message_id (одиночные посты). Возвращает число обновлённых строк."""
|
||
conn = None
|
||
try:
|
||
conn = await self._get_connection()
|
||
await conn.execute(
|
||
"UPDATE post_from_telegram_suggest SET status = ? WHERE message_id = ?",
|
||
(status, message_id),
|
||
)
|
||
cur = await conn.execute("SELECT changes()")
|
||
row = await cur.fetchone()
|
||
n = row[0] if row else 0
|
||
await conn.commit()
|
||
if n == 0:
|
||
self.logger.warning(
|
||
f"update_status_by_message_id: 0 строк обновлено для message_id={message_id}, status={status}"
|
||
)
|
||
else:
|
||
self.logger.info(f"Статус поста message_id={message_id} обновлён на {status}")
|
||
return n
|
||
except Exception as e:
|
||
if conn:
|
||
await conn.rollback()
|
||
self.logger.error(f"Ошибка при обновлении статуса message_id={message_id}: {e}")
|
||
raise
|
||
finally:
|
||
if conn:
|
||
await conn.close()
|
||
|
||
async def update_status_for_media_group_by_helper_id(
|
||
self, helper_message_id: int, status: str
|
||
) -> int:
|
||
"""Обновление статуса постов медиагруппы по helper_text_message_id. Возвращает число обновлённых строк."""
|
||
conn = None
|
||
try:
|
||
conn = await self._get_connection()
|
||
await conn.execute(
|
||
"""
|
||
UPDATE post_from_telegram_suggest
|
||
SET status = ?
|
||
WHERE message_id = ? OR helper_text_message_id = ?
|
||
""",
|
||
(status, helper_message_id, helper_message_id),
|
||
)
|
||
cur = await conn.execute("SELECT changes()")
|
||
row = await cur.fetchone()
|
||
n = row[0] if row else 0
|
||
await conn.commit()
|
||
if n == 0:
|
||
self.logger.warning(
|
||
f"update_status_for_media_group_by_helper_id: 0 строк обновлено "
|
||
f"для helper_message_id={helper_message_id}, status={status}"
|
||
)
|
||
else:
|
||
self.logger.info(
|
||
f"Статус медиагруппы helper_message_id={helper_message_id} обновлён на {status}"
|
||
)
|
||
return n
|
||
except Exception as e:
|
||
if conn:
|
||
await conn.rollback()
|
||
self.logger.error(
|
||
f"Ошибка при обновлении статуса медиагруппы helper_message_id={helper_message_id}: {e}"
|
||
)
|
||
raise
|
||
finally:
|
||
if conn:
|
||
await conn.close()
|
||
|
||
async def add_post_content(self, post_id: int, message_id: int, content_name: str, content_type: str) -> bool:
|
||
"""Добавление контента поста."""
|
||
try:
|
||
# Сначала добавляем связь
|
||
link_query = "INSERT OR IGNORE INTO message_link_to_content (post_id, message_id) VALUES (?, ?)"
|
||
await self._execute_query(link_query, (post_id, message_id))
|
||
|
||
# Затем добавляем контент
|
||
content_query = """
|
||
INSERT OR IGNORE INTO content_post_from_telegram (message_id, content_name, content_type)
|
||
VALUES (?, ?, ?)
|
||
"""
|
||
await self._execute_query(content_query, (message_id, content_name, content_type))
|
||
|
||
self.logger.info(f"Контент поста добавлен: post_id={post_id}, message_id={message_id}")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка при добавлении контента поста: {e}")
|
||
return False
|
||
|
||
async def add_message_link(self, post_id: int, message_id: int) -> bool:
|
||
"""Добавляет связь между post_id и message_id в таблицу message_link_to_content."""
|
||
try:
|
||
self.logger.info(f"Добавление связи: post_id={post_id}, message_id={message_id}")
|
||
link_query = "INSERT OR IGNORE INTO message_link_to_content (post_id, message_id) VALUES (?, ?)"
|
||
await self._execute_query(link_query, (post_id, message_id))
|
||
self.logger.info(f"Связь успешно добавлена: post_id={post_id}, message_id={message_id}")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка при добавлении связи post_id={post_id}, message_id={message_id}: {e}")
|
||
return False
|
||
|
||
async def get_post_content_by_helper_id(self, helper_message_id: int) -> List[Tuple[str, str]]:
|
||
"""Получает контент поста по helper_text_message_id."""
|
||
query = """
|
||
SELECT cpft.content_name, cpft.content_type
|
||
FROM post_from_telegram_suggest pft
|
||
JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id
|
||
JOIN content_post_from_telegram cpft ON cpft.message_id = mltc.message_id
|
||
WHERE pft.helper_text_message_id = ?
|
||
"""
|
||
post_content = await self._execute_query_with_result(query, (helper_message_id,))
|
||
|
||
self.logger.info(f"Получен контент поста: {len(post_content)} элементов")
|
||
return post_content
|
||
|
||
async def get_post_content_by_message_id(self, message_id: int) -> List[Tuple[str, str]]:
|
||
"""Получает контент одиночного поста по message_id."""
|
||
query = """
|
||
SELECT cpft.content_name, cpft.content_type
|
||
FROM post_from_telegram_suggest pft
|
||
JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id
|
||
JOIN content_post_from_telegram cpft ON cpft.message_id = mltc.message_id
|
||
WHERE pft.message_id = ? AND pft.helper_text_message_id IS NULL
|
||
"""
|
||
post_content = await self._execute_query_with_result(query, (message_id,))
|
||
|
||
self.logger.info(f"Получен контент одиночного поста: {len(post_content)} элементов для message_id={message_id}")
|
||
return post_content
|
||
|
||
async def get_post_text_by_helper_id(self, helper_message_id: int) -> Optional[str]:
|
||
"""Получает текст поста по helper_text_message_id."""
|
||
query = "SELECT text FROM post_from_telegram_suggest WHERE helper_text_message_id = ?"
|
||
rows = await self._execute_query_with_result(query, (helper_message_id,))
|
||
row = rows[0] if rows else None
|
||
|
||
if row:
|
||
self.logger.info(f"Получен текст поста для helper_message_id={helper_message_id}")
|
||
return row[0]
|
||
return None
|
||
|
||
async def get_post_ids_by_helper_id(self, helper_message_id: int) -> List[int]:
|
||
"""Получает ID сообщений по helper_text_message_id."""
|
||
query = """
|
||
SELECT mltc.message_id
|
||
FROM post_from_telegram_suggest pft
|
||
JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id
|
||
WHERE pft.helper_text_message_id = ?
|
||
"""
|
||
rows = await self._execute_query_with_result(query, (helper_message_id,))
|
||
|
||
post_ids = [row[0] for row in rows]
|
||
self.logger.info(f"Получены ID сообщений: {len(post_ids)} элементов")
|
||
return post_ids
|
||
|
||
async def get_author_id_by_message_id(self, message_id: int) -> Optional[int]:
|
||
"""Получает ID автора по message_id."""
|
||
query = "SELECT author_id FROM post_from_telegram_suggest WHERE message_id = ?"
|
||
rows = await self._execute_query_with_result(query, (message_id,))
|
||
row = rows[0] if rows else None
|
||
|
||
if row:
|
||
author_id = row[0]
|
||
self.logger.info(f"Получен author_id: {author_id} для message_id={message_id}")
|
||
return author_id
|
||
return None
|
||
|
||
async def get_author_id_by_helper_message_id(self, helper_message_id: int) -> Optional[int]:
|
||
"""Получает ID автора по helper_text_message_id."""
|
||
query = "SELECT author_id FROM post_from_telegram_suggest WHERE helper_text_message_id = ?"
|
||
rows = await self._execute_query_with_result(query, (helper_message_id,))
|
||
row = rows[0] if rows else None
|
||
|
||
if row:
|
||
author_id = row[0]
|
||
self.logger.info(f"Получен author_id: {author_id} для helper_message_id={helper_message_id}")
|
||
return author_id
|
||
return None
|
||
|
||
async def get_post_text_and_anonymity_by_message_id(self, message_id: int) -> Tuple[Optional[str], Optional[bool]]:
|
||
"""Получает текст и is_anonymous поста по message_id."""
|
||
query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE message_id = ?"
|
||
rows = await self._execute_query_with_result(query, (message_id,))
|
||
row = rows[0] if rows else None
|
||
|
||
if row:
|
||
text = row[0] or ""
|
||
is_anonymous_int = row[1]
|
||
# Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None)
|
||
is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int)
|
||
self.logger.info(f"Получены текст и is_anonymous для message_id={message_id}")
|
||
return text, is_anonymous
|
||
return None, None
|
||
|
||
async def get_post_text_and_anonymity_by_helper_id(self, helper_message_id: int) -> Tuple[Optional[str], Optional[bool]]:
|
||
"""Получает текст и is_anonymous поста по helper_text_message_id."""
|
||
query = "SELECT text, is_anonymous FROM post_from_telegram_suggest WHERE helper_text_message_id = ?"
|
||
rows = await self._execute_query_with_result(query, (helper_message_id,))
|
||
row = rows[0] if rows else None
|
||
|
||
if row:
|
||
text = row[0] or ""
|
||
is_anonymous_int = row[1]
|
||
# Преобразуем int в bool (1 -> True, 0 -> False, NULL -> None)
|
||
is_anonymous = None if is_anonymous_int is None else bool(is_anonymous_int)
|
||
self.logger.info(f"Получены текст и is_anonymous для helper_message_id={helper_message_id}")
|
||
return text, is_anonymous
|
||
return None, None
|
||
|
||
async def update_published_message_id(self, original_message_id: int, published_message_id: int) -> None:
|
||
"""Обновляет published_message_id для опубликованного поста."""
|
||
query = "UPDATE post_from_telegram_suggest SET published_message_id = ? WHERE message_id = ?"
|
||
await self._execute_query(query, (published_message_id, original_message_id))
|
||
self.logger.info(f"Обновлен published_message_id: {original_message_id} -> {published_message_id}")
|
||
|
||
async def add_published_post_content(
|
||
self, published_message_id: int, content_path: str, content_type: str
|
||
) -> bool:
|
||
"""Добавляет контент опубликованного поста."""
|
||
try:
|
||
from datetime import datetime
|
||
published_at = int(datetime.now().timestamp())
|
||
|
||
query = """
|
||
INSERT OR IGNORE INTO published_post_content
|
||
(published_message_id, content_name, content_type, published_at)
|
||
VALUES (?, ?, ?, ?)
|
||
"""
|
||
await self._execute_query(query, (published_message_id, content_path, content_type, published_at))
|
||
self.logger.info(f"Добавлен контент опубликованного поста: published_message_id={published_message_id}, type={content_type}")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка при добавлении контента опубликованного поста: {e}")
|
||
return False
|
||
|
||
async def get_published_post_content(self, published_message_id: int) -> List[Tuple[str, str]]:
|
||
"""Получает контент опубликованного поста."""
|
||
query = """
|
||
SELECT content_name, content_type
|
||
FROM published_post_content
|
||
WHERE published_message_id = ?
|
||
"""
|
||
post_content = await self._execute_query_with_result(query, (published_message_id,))
|
||
self.logger.info(f"Получен контент опубликованного поста: {len(post_content)} элементов для published_message_id={published_message_id}")
|
||
return post_content
|
||
|
||
# ============================================
|
||
# Методы для работы с ML-скорингом
|
||
# ============================================
|
||
|
||
async def update_ml_scores(self, message_id: int, ml_scores_json: str) -> bool:
|
||
"""
|
||
Обновляет ML-скоры для поста.
|
||
|
||
Args:
|
||
message_id: ID сообщения в группе модерации
|
||
ml_scores_json: JSON строка со скорами
|
||
|
||
Returns:
|
||
True если обновлено успешно
|
||
"""
|
||
try:
|
||
query = "UPDATE post_from_telegram_suggest SET ml_scores = ? WHERE message_id = ?"
|
||
await self._execute_query(query, (ml_scores_json, message_id))
|
||
self.logger.info(f"ML-скоры обновлены для message_id={message_id}")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка обновления ML-скоров для message_id={message_id}: {e}")
|
||
return False
|
||
|
||
async def get_ml_scores_by_message_id(self, message_id: int) -> Optional[str]:
|
||
"""
|
||
Получает ML-скоры для поста.
|
||
|
||
Args:
|
||
message_id: ID сообщения
|
||
|
||
Returns:
|
||
JSON строка со скорами или None
|
||
"""
|
||
query = "SELECT ml_scores FROM post_from_telegram_suggest WHERE message_id = ?"
|
||
rows = await self._execute_query_with_result(query, (message_id,))
|
||
if rows and rows[0][0]:
|
||
return rows[0][0]
|
||
return None
|
||
|
||
async def get_post_text_by_message_id(self, message_id: int) -> Optional[str]:
|
||
"""
|
||
Получает текст поста по message_id.
|
||
|
||
Args:
|
||
message_id: ID сообщения
|
||
|
||
Returns:
|
||
Текст поста или None
|
||
"""
|
||
query = "SELECT text FROM post_from_telegram_suggest WHERE message_id = ?"
|
||
rows = await self._execute_query_with_result(query, (message_id,))
|
||
if rows and rows[0][0]:
|
||
return rows[0][0]
|
||
return None
|
||
|
||
async def get_approved_posts_texts(self, limit: int = 1000) -> List[str]:
|
||
"""
|
||
Получает тексты опубликованных постов для обучения RAG.
|
||
|
||
Args:
|
||
limit: Максимальное количество постов
|
||
|
||
Returns:
|
||
Список текстов
|
||
"""
|
||
query = """
|
||
SELECT text FROM post_from_telegram_suggest
|
||
WHERE status = 'approved'
|
||
AND text IS NOT NULL
|
||
AND text != ''
|
||
AND text != '^'
|
||
ORDER BY created_at DESC
|
||
LIMIT ?
|
||
"""
|
||
rows = await self._execute_query_with_result(query, (limit,))
|
||
texts = [row[0] for row in rows if row[0]]
|
||
self.logger.info(f"Получено {len(texts)} опубликованных постов для обучения")
|
||
return texts
|
||
|
||
async def get_declined_posts_texts(self, limit: int = 1000) -> List[str]:
|
||
"""
|
||
Получает тексты отклоненных постов для обучения RAG.
|
||
|
||
Args:
|
||
limit: Максимальное количество постов
|
||
|
||
Returns:
|
||
Список текстов
|
||
"""
|
||
query = """
|
||
SELECT text FROM post_from_telegram_suggest
|
||
WHERE status = 'declined'
|
||
AND text IS NOT NULL
|
||
AND text != ''
|
||
AND text != '^'
|
||
ORDER BY created_at DESC
|
||
LIMIT ?
|
||
"""
|
||
rows = await self._execute_query_with_result(query, (limit,))
|
||
texts = [row[0] for row in rows if row[0]]
|
||
self.logger.info(f"Получено {len(texts)} отклоненных постов для обучения")
|
||
return texts
|
||
|