""" RAG сервис для скоринга постов с использованием ruBERT. Использует модель DeepPavlov/rubert-base-cased для создания эмбеддингов и сравнивает их с эталонными примерами через VectorStore. """ import asyncio from typing import Optional, List import numpy as np from logs.custom_logger import logger from helper_bot.utils.metrics import track_time, track_errors from .base import ScoringResult from .vector_store import VectorStore from .exceptions import ( ModelNotLoadedError, ScoringError, InsufficientExamplesError, TextTooShortError, ) class RAGService: """ RAG сервис для оценки постов на основе векторного сходства. Использует ruBERT для создания эмбеддингов текста и сравнивает их с эталонными примерами (опубликованные vs отклоненные посты). Attributes: model_name: Название модели HuggingFace vector_store: Хранилище векторов min_text_length: Минимальная длина текста для обработки """ # Название модели по умолчанию DEFAULT_MODEL = "DeepPavlov/rubert-base-cased" def __init__( self, model_name: Optional[str] = None, vector_store: Optional[VectorStore] = None, cache_dir: Optional[str] = None, enabled: bool = True, min_text_length: int = 3, ): """ Инициализация RAG сервиса. Args: model_name: Название модели HuggingFace (по умолчанию ruBERT) vector_store: Хранилище векторов (создается автоматически если не передано) cache_dir: Директория для кеширования модели enabled: Включен ли сервис min_text_length: Минимальная длина текста для обработки """ self.model_name = model_name or self.DEFAULT_MODEL self.cache_dir = cache_dir self._enabled = enabled self.min_text_length = min_text_length # Модель и токенизатор загружаются лениво self._model = None self._tokenizer = None self._model_loaded = False # Хранилище векторов self.vector_store = vector_store or VectorStore() logger.info(f"RAGService инициализирован (model={self.model_name}, enabled={enabled})") @property def source_name(self) -> str: """Имя источника для результатов.""" return "rag" @property def is_enabled(self) -> bool: """Проверяет, включен ли сервис.""" return self._enabled @property def is_model_loaded(self) -> bool: """Проверяет, загружена ли модель.""" return self._model_loaded async def load_model(self) -> None: """ Загружает модель и токенизатор. Выполняется асинхронно в отдельном потоке чтобы не блокировать event loop. """ if self._model_loaded: return if not self._enabled: logger.warning("RAGService: Сервис отключен, модель не загружается") return logger.info(f"RAGService: Загрузка модели {self.model_name}...") try: # Загрузка в отдельном потоке loop = asyncio.get_event_loop() await loop.run_in_executor(None, self._load_model_sync) self._model_loaded = True logger.info(f"RAGService: Модель {self.model_name} успешно загружена") except Exception as e: logger.error(f"RAGService: Ошибка загрузки модели: {e}") raise ModelNotLoadedError(f"Не удалось загрузить модель {self.model_name}: {e}") def _load_model_sync(self) -> None: """Синхронная загрузка модели (вызывается в executor).""" logger.info("RAGService: Начало _load_model_sync, импорт transformers...") from transformers import AutoTokenizer, AutoModel import torch # Определяем устройство self._device = "cuda" if torch.cuda.is_available() else "cpu" logger.info(f"RAGService: Устройство определено: {self._device}") # Загружаем токенизатор logger.info(f"RAGService: Загрузка токенизатора из {self.model_name}...") self._tokenizer = AutoTokenizer.from_pretrained( self.model_name, cache_dir=self.cache_dir, ) logger.info("RAGService: Токенизатор загружен") # Загружаем модель logger.info(f"RAGService: Загрузка модели из {self.model_name} (это может занять несколько минут)...") self._model = AutoModel.from_pretrained( self.model_name, cache_dir=self.cache_dir, ) logger.info("RAGService: Модель загружена, перенос на устройство...") self._model.to(self._device) self._model.eval() # Режим инференса logger.info(f"RAGService: Модель готова на устройстве: {self._device}") def _get_embedding_sync(self, text: str) -> np.ndarray: """ Получает эмбеддинг текста (синхронно). Использует [CLS] токен как представление всего текста. Args: text: Текст для векторизации Returns: Numpy массив с эмбеддингом (768 измерений для ruBERT) """ import torch # Токенизация с ограничением длины inputs = self._tokenizer( text, return_tensors="pt", truncation=True, max_length=512, padding=True, ) inputs = {k: v.to(self._device) for k, v in inputs.items()} # Получаем эмбеддинг with torch.no_grad(): outputs = self._model(**inputs) # Используем [CLS] токен (первый токен) embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy() return embedding.flatten() def _get_embeddings_batch_sync(self, texts: List[str], batch_size: int = 16) -> List[np.ndarray]: """ Получает эмбеддинги для батча текстов (синхронно). Обрабатывает тексты пачками для эффективного использования GPU/CPU. Args: texts: Список текстов для векторизации batch_size: Размер батча (по умолчанию 16) Returns: Список numpy массивов с эмбеддингами """ import torch all_embeddings = [] for i in range(0, len(texts), batch_size): batch_texts = texts[i:i + batch_size] # Токенизация батча inputs = self._tokenizer( batch_texts, return_tensors="pt", truncation=True, max_length=512, padding=True, ) inputs = {k: v.to(self._device) for k, v in inputs.items()} # Получаем эмбеддинги with torch.no_grad(): outputs = self._model(**inputs) # [CLS] токен для каждого текста в батче batch_embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy() # Разбиваем на отдельные эмбеддинги for j in range(len(batch_texts)): all_embeddings.append(batch_embeddings[j]) if i > 0 and i % (batch_size * 10) == 0: logger.info(f"RAGService: Обработано {i}/{len(texts)} текстов") return all_embeddings async def get_embeddings_batch(self, texts: List[str], batch_size: int = 16) -> List[np.ndarray]: """ Получает эмбеддинги для батча текстов (асинхронно). Args: texts: Список текстов для векторизации batch_size: Размер батча Returns: Список numpy массивов с эмбеддингами """ if not self._model_loaded: await self.load_model() if not self._model_loaded: raise ModelNotLoadedError("Модель не загружена") # Очищаем тексты clean_texts = [self._clean_text(text) for text in texts] # Выполняем батч-обработку в thread pool loop = asyncio.get_event_loop() embeddings = await loop.run_in_executor( None, self._get_embeddings_batch_sync, clean_texts, batch_size, ) return embeddings async def get_embedding(self, text: str) -> np.ndarray: """ Получает эмбеддинг текста (асинхронно). Args: text: Текст для векторизации Returns: Numpy массив с эмбеддингом Raises: ModelNotLoadedError: Если модель не загружена TextTooShortError: Если текст слишком короткий """ if not self._model_loaded: await self.load_model() if not self._model_loaded: raise ModelNotLoadedError("Модель не загружена") # Очищаем текст clean_text = self._clean_text(text) if len(clean_text) < self.min_text_length: raise TextTooShortError( f"Текст слишком короткий (минимум {self.min_text_length} символов)" ) # Выполняем в отдельном потоке loop = asyncio.get_event_loop() embedding = await loop.run_in_executor( None, self._get_embedding_sync, clean_text ) return embedding def _clean_text(self, text: str) -> str: """Очищает текст от лишних символов.""" if not text: return "" # Удаляем лишние пробелы и переносы строк clean = " ".join(text.split()) # Удаляем служебные символы (например "^" для helper сообщений) if clean == "^": return "" return clean.strip() @track_time("calculate_score", "rag_service") @track_errors("rag_service", "calculate_score") async def calculate_score(self, text: str) -> ScoringResult: """ Рассчитывает скор для текста поста. Args: text: Текст поста для оценки Returns: ScoringResult с оценкой Raises: ScoringError: При ошибке расчета """ if not self._enabled: raise ScoringError("RAG сервис отключен") try: # Получаем эмбеддинг текста embedding = await self.get_embedding(text) # Логируем первые элементы вектора для отладки logger.info( f"RAGService: embedding[:3]={embedding[:3].tolist()}, " f"text_preview='{text[:30]}'" ) # Рассчитываем скор через VectorStore score, confidence, score_pos_only = self.vector_store.calculate_similarity_score(embedding) return ScoringResult( score=score, source=self.source_name, model=self.model_name, confidence=confidence, metadata={ "positive_examples": self.vector_store.positive_count, "negative_examples": self.vector_store.negative_count, "score_pos_only": score_pos_only, # Для сравнения }, ) except InsufficientExamplesError: # Не достаточно примеров - возвращаем нейтральный скор logger.warning("RAGService: Недостаточно примеров для расчета скора") raise except TextTooShortError: logger.warning(f"RAGService: Текст слишком короткий для оценки") raise except Exception as e: logger.error(f"RAGService: Ошибка расчета скора: {e}") raise ScoringError(f"Ошибка расчета скора: {e}") @track_time("add_positive_example", "rag_service") async def add_positive_example(self, text: str) -> None: """ Добавляет текст как положительный пример (опубликованный пост). Args: text: Текст опубликованного поста """ if not self._enabled: return try: clean_text = self._clean_text(text) if len(clean_text) < self.min_text_length: logger.debug("RAGService: Текст слишком короткий для примера, пропускаем") return # Получаем эмбеддинг embedding = await self.get_embedding(clean_text) # Вычисляем хеш для дедупликации text_hash = VectorStore.compute_text_hash(clean_text) # Добавляем в хранилище added = self.vector_store.add_positive(embedding, text_hash) if added: logger.info(f"RAGService: Добавлен положительный пример") except Exception as e: logger.error(f"RAGService: Ошибка добавления положительного примера: {e}") @track_time("add_negative_example", "rag_service") async def add_negative_example(self, text: str) -> None: """ Добавляет текст как отрицательный пример (отклоненный пост). Args: text: Текст отклоненного поста """ if not self._enabled: return try: clean_text = self._clean_text(text) if len(clean_text) < self.min_text_length: logger.debug("RAGService: Текст слишком короткий для примера, пропускаем") return # Получаем эмбеддинг embedding = await self.get_embedding(clean_text) # Вычисляем хеш для дедупликации text_hash = VectorStore.compute_text_hash(clean_text) # Добавляем в хранилище added = self.vector_store.add_negative(embedding, text_hash) if added: logger.info(f"RAGService: Добавлен отрицательный пример") except Exception as e: logger.error(f"RAGService: Ошибка добавления отрицательного примера: {e}") async def load_examples_from_db( self, positive_texts: list[str], negative_texts: list[str], batch_size: int = 16, ) -> None: """ Загружает примеры из базы данных с батч-обработкой. Используется при запуске бота для восстановления VectorStore. Батч-обработка ускоряет загрузку в 10-20 раз. Args: positive_texts: Список текстов опубликованных постов negative_texts: Список текстов отклоненных постов batch_size: Размер батча для обработки (по умолчанию 16) """ if not self._enabled: return logger.info( f"RAGService: Загрузка примеров из БД с батч-обработкой " f"(positive: {len(positive_texts)}, negative: {len(negative_texts)}, batch_size: {batch_size})" ) # Убеждаемся что модель загружена await self.load_model() import time start_time = time.time() # Фильтруем и очищаем положительные тексты if positive_texts: clean_positive = [] positive_hashes = [] for text in positive_texts: clean_text = self._clean_text(text) if len(clean_text) >= self.min_text_length: clean_positive.append(clean_text) positive_hashes.append(VectorStore.compute_text_hash(clean_text)) if clean_positive: logger.info(f"RAGService: Обработка {len(clean_positive)} положительных примеров батчами...") positive_embeddings = await self.get_embeddings_batch(clean_positive, batch_size) self.vector_store.add_positive_batch(positive_embeddings, positive_hashes) # Фильтруем и очищаем отрицательные тексты if negative_texts: clean_negative = [] negative_hashes = [] for text in negative_texts: clean_text = self._clean_text(text) if len(clean_text) >= self.min_text_length: clean_negative.append(clean_text) negative_hashes.append(VectorStore.compute_text_hash(clean_text)) if clean_negative: logger.info(f"RAGService: Обработка {len(clean_negative)} отрицательных примеров батчами...") negative_embeddings = await self.get_embeddings_batch(clean_negative, batch_size) self.vector_store.add_negative_batch(negative_embeddings, negative_hashes) elapsed = time.time() - start_time logger.info( f"RAGService: Загрузка завершена за {elapsed:.1f} сек " f"(positive: {self.vector_store.positive_count}, " f"negative: {self.vector_store.negative_count})" ) def save_vectors(self) -> None: """Сохраняет векторы на диск.""" if self.vector_store.storage_path: self.vector_store.save_to_disk() def get_stats(self) -> dict: """Возвращает статистику сервиса.""" return { "enabled": self._enabled, "model_name": self.model_name, "model_loaded": self._model_loaded, "vector_store": self.vector_store.get_stats(), }