""" In-memory хранилище векторов на numpy. Хранит векторные представления постов для быстрого сравнения. Поддерживает персистентность через сохранение/загрузку с диска. """ import hashlib import logging import os import threading import time from pathlib import Path from typing import Any import numpy as np from app.exceptions import InsufficientExamplesError, VectorStoreError logger = logging.getLogger(__name__) class VectorStore: """ In-memory хранилище векторов для RAG. Хранит отдельно положительные (опубликованные) и отрицательные (отклоненные) примеры. Использует косинусное сходство для расчета скора. Attributes: vector_dim: Размерность векторов (384 для all-MiniLM-L12-v2) max_examples: Максимальное количество примеров каждого типа """ def __init__( self, vector_dim: int = 384, max_examples: int = 10000, max_submitted: int = 5000, storage_path: str | None = None, submitted_path: str | None = None, score_multiplier: float = 5.0, k: int = 3, ): """ Инициализация хранилища. Args: vector_dim: Размерность векторов max_examples: Максимальное количество примеров каждого типа max_submitted: Максимальное количество submitted-постов storage_path: Путь для сохранения/загрузки векторов (опционально) submitted_path: Путь для сохранения/загрузки submitted-постов (опционально) score_multiplier: Множитель для масштабирования разницы в скорах k: Количество ближайших примеров для расчета среднего сходства """ self.vector_dim = vector_dim self.max_examples = max_examples self.max_submitted = max_submitted self.storage_path = storage_path self.submitted_path = submitted_path self.score_multiplier = score_multiplier self.k = k # Инициализируем пустые массивы # Используем список для динамического добавления, потом конвертируем в numpy self._positive_vectors: list = [] self._negative_vectors: list = [] self._positive_hashes: list = [] # Хеши текстов для дедупликации self._negative_hashes: list = [] # Submitted-посты (третья коллекция) self._submitted_vectors: list = [] self._submitted_hashes: list = [] self._submitted_created_at: list = [] # Unix timestamps self._submitted_post_ids: list = [] self._submitted_texts: list = [] self._submitted_rag_scores: list = [] # Lock для потокобезопасности self._lock = threading.Lock() # Пытаемся загрузить сохраненные векторы if storage_path: self._load_from_disk() if submitted_path: self._load_submitted_from_disk() @property def positive_count(self) -> int: """Количество положительных примеров.""" return len(self._positive_vectors) @property def negative_count(self) -> int: """Количество отрицательных примеров.""" return len(self._negative_vectors) @property def total_count(self) -> int: """Общее количество примеров.""" return self.positive_count + self.negative_count @property def submitted_count(self) -> int: """Количество submitted-постов.""" return len(self._submitted_vectors) @staticmethod def compute_text_hash(text: str) -> str: """Вычисляет хеш текста для дедупликации.""" return hashlib.md5(text.encode("utf-8")).hexdigest() def _normalize_vector(self, vector: np.ndarray) -> np.ndarray: """Нормализует вектор для косинусного сходства.""" norm = np.linalg.norm(vector) if norm == 0: return vector return vector / norm def add_positive(self, vector: np.ndarray, text_hash: str | None = None) -> bool: """ Добавляет положительный пример (опубликованный пост). Args: vector: Векторное представление текста text_hash: Хеш текста для дедупликации (опционально) Returns: True если добавлен, False если дубликат или превышен лимит """ with self._lock: # Проверяем дубликат по хешу if text_hash and text_hash in self._positive_hashes: logger.debug("VectorStore: Пропуск дубликата положительного примера") return False # Проверяем лимит if len(self._positive_vectors) >= self.max_examples: # Удаляем самый старый пример (FIFO) self._positive_vectors.pop(0) self._positive_hashes.pop(0) logger.debug("VectorStore: Удален старый положительный пример (лимит)") # Нормализуем и добавляем normalized = self._normalize_vector(vector) self._positive_vectors.append(normalized) if text_hash: self._positive_hashes.append(text_hash) logger.info( f"VectorStore: Добавлен положительный пример (всего: {self.positive_count})" ) return True def add_positive_batch( self, vectors: list[np.ndarray], text_hashes: list[str] | None = None ) -> int: """ Добавляет батч положительных примеров. Args: vectors: Список векторов text_hashes: Список хешей текстов для дедупликации Returns: Количество добавленных примеров """ if text_hashes is None: text_hashes = [None] * len(vectors) added = 0 with self._lock: for vector, text_hash in zip(vectors, text_hashes): # Проверяем дубликат по хешу if text_hash and text_hash in self._positive_hashes: continue # Проверяем лимит if len(self._positive_vectors) >= self.max_examples: self._positive_vectors.pop(0) self._positive_hashes.pop(0) # Нормализуем и добавляем normalized = self._normalize_vector(vector) self._positive_vectors.append(normalized) if text_hash: self._positive_hashes.append(text_hash) added += 1 logger.info( f"VectorStore: Добавлено {added} положительных примеров батчем (всего: {self.positive_count})" ) return added def add_negative(self, vector: np.ndarray, text_hash: str | None = None) -> bool: """ Добавляет отрицательный пример (отклоненный пост). Args: vector: Векторное представление текста text_hash: Хеш текста для дедупликации (опционально) Returns: True если добавлен, False если дубликат или превышен лимит """ with self._lock: # Проверяем дубликат по хешу if text_hash and text_hash in self._negative_hashes: logger.debug("VectorStore: Пропуск дубликата отрицательного примера") return False # Проверяем лимит if len(self._negative_vectors) >= self.max_examples: # Удаляем самый старый пример (FIFO) self._negative_vectors.pop(0) self._negative_hashes.pop(0) logger.debug("VectorStore: Удален старый отрицательный пример (лимит)") # Нормализуем и добавляем normalized = self._normalize_vector(vector) self._negative_vectors.append(normalized) if text_hash: self._negative_hashes.append(text_hash) logger.info( f"VectorStore: Добавлен отрицательный пример (всего: {self.negative_count})" ) return True def add_negative_batch( self, vectors: list[np.ndarray], text_hashes: list[str] | None = None ) -> int: """ Добавляет батч отрицательных примеров. Args: vectors: Список векторов text_hashes: Список хешей текстов для дедупликации Returns: Количество добавленных примеров """ if text_hashes is None: text_hashes = [None] * len(vectors) added = 0 with self._lock: for vector, text_hash in zip(vectors, text_hashes): # Проверяем дубликат по хешу if text_hash and text_hash in self._negative_hashes: continue # Проверяем лимит if len(self._negative_vectors) >= self.max_examples: self._negative_vectors.pop(0) self._negative_hashes.pop(0) # Нормализуем и добавляем normalized = self._normalize_vector(vector) self._negative_vectors.append(normalized) if text_hash: self._negative_hashes.append(text_hash) added += 1 logger.info( f"VectorStore: Добавлено {added} отрицательных примеров батчем (всего: {self.negative_count})" ) return added def add_submitted( self, vector: np.ndarray, text_hash: str, created_at: int, post_id: int | None = None, text: str = "", rag_score: float | None = None, ) -> bool: """ Добавляет submitted-пост в коллекцию. Args: vector: Векторное представление текста text_hash: Хеш текста для дедупликации created_at: Unix timestamp создания post_id: ID поста (опционально) text: Текст поста rag_score: RAG скор поста (опционально) Returns: True если добавлен, False если дубликат """ with self._lock: if text_hash in self._submitted_hashes: logger.debug("VectorStore: Пропуск дубликата submitted-поста") return False if len(self._submitted_vectors) >= self.max_submitted: self._submitted_vectors.pop(0) self._submitted_hashes.pop(0) self._submitted_created_at.pop(0) self._submitted_post_ids.pop(0) self._submitted_texts.pop(0) self._submitted_rag_scores.pop(0) logger.debug("VectorStore: Удален старый submitted-пост (лимит)") normalized = self._normalize_vector(vector) self._submitted_vectors.append(normalized) self._submitted_hashes.append(text_hash) self._submitted_created_at.append(created_at) self._submitted_post_ids.append(post_id) self._submitted_texts.append(text) self._submitted_rag_scores.append(rag_score) logger.info(f"VectorStore: Добавлен submitted-пост (всего: {self.submitted_count})") return True def find_similar_submitted( self, vector: np.ndarray, threshold: float, hours: int, ) -> list[dict[str, Any]]: """ Ищет похожие submitted-посты за последние N часов. Args: vector: Векторное представление запроса threshold: Минимальный порог similarity (0.0 - 1.0) hours: Количество часов для фильтрации (created_at >= now - hours*3600) Returns: Список dict с полями: similarity, created_at, post_id, text, rag_score """ with self._lock: if self.submitted_count == 0: return [] now = int(time.time()) cutoff = now - hours * 3600 normalized = self._normalize_vector(vector) submitted_matrix = np.array(self._submitted_vectors) similarities = np.dot(submitted_matrix, normalized) results: list[dict[str, Any]] = [] for i, sim in enumerate(similarities): if float(sim) < threshold: continue created_at = self._submitted_created_at[i] if created_at < cutoff: continue results.append( { "similarity": float(sim), "created_at": created_at, "post_id": self._submitted_post_ids[i], "text": self._submitted_texts[i], "rag_score": self._submitted_rag_scores[i], } ) return sorted(results, key=lambda x: x["similarity"], reverse=True) def calculate_similarity_score(self, vector: np.ndarray) -> tuple[float, float, float]: """ Рассчитывает скор на основе сходства с примерами. Алгоритм: 1. Вычисляем косинусное сходство со всеми примерами 2. Используем топ-k ближайших примеров для более чувствительной оценки 3. Сравниваем топ-k положительных с топ-k отрицательными Args: vector: Векторное представление нового поста Returns: Tuple (score, confidence, score_pos_only): - score: Оценка от 0.0 до 1.0 (neg/pos формула) - confidence: Уверенность (зависит от количества примеров) - score_pos_only: Оценка только по положительным примерам Raises: InsufficientExamplesError: Если недостаточно примеров """ with self._lock: if self.positive_count == 0: raise InsufficientExamplesError("Нет положительных примеров для сравнения") # Нормализуем входной вектор normalized = self._normalize_vector(np.asarray(vector).flatten()) # Конвертируем в numpy массивы для быстрых вычислений # Используем vstack для гарантии одинаковой формы (совместимость со старым npz) pos_matrix = np.vstack([np.asarray(v).flatten() for v in self._positive_vectors]) # Косинусное сходство с положительными примерами # Для нормализованных векторов это просто скалярное произведение pos_similarities = np.dot(pos_matrix, normalized) # Косинусное сходство с отрицательными примерами if self.negative_count > 0: neg_matrix = np.vstack([np.asarray(v).flatten() for v in self._negative_vectors]) neg_similarities = np.dot(neg_matrix, normalized) else: neg_similarities = np.array([]) # Используем топ-k ближайших примеров для расчета среднего сходства k_pos = min(self.k, len(pos_similarities)) top_k_pos = np.sort(pos_similarities)[-k_pos:] avg_pos = float(np.mean(top_k_pos)) # Для отрицательных: если их меньше k, берем все, иначе топ-k if len(neg_similarities) > 0: k_neg = min(self.k, len(neg_similarities)) top_k_neg = np.sort(neg_similarities)[-k_neg:] avg_neg = float(np.mean(top_k_neg)) else: # Если нет отрицательных примеров, используем нейтральное значение avg_neg = avg_pos # Нейтральный скор = 0.5 # Формула расчета score: (diff * scale + 1) / 2, переводим из [-1, 1] в [0, 1] diff = avg_pos - avg_neg score_neg_pos = np.clip((diff * self.score_multiplier + 1) / 2, 0.0, 1.0) # === Вариант 2: pos only (только положительные, топ-k ближайших) === # Берём топ-5 ближайших положительных примеров top_5_k = min(5, len(pos_similarities)) top_5_sim = float(np.mean(np.sort(pos_similarities)[-top_5_k:])) # Нормализуем: 0.85 -> 0.0, 0.95 -> 1.0 (типичный диапазон для BERT) score_pos_only = (top_5_sim - 0.85) / 0.10 score_pos_only = max(0.0, min(1.0, score_pos_only)) # Основной скор — neg/pos score = score_neg_pos # Confidence зависит от количества примеров (100% при 1000 примерах) total_examples = self.positive_count + self.negative_count confidence = min(1.0, total_examples / 1000) # Дополнительная диагностическая информация pos_mean = float(np.mean(pos_similarities)) pos_std = float(np.std(pos_similarities)) pos_min = float(np.min(pos_similarities)) pos_max = float(np.max(pos_similarities)) if len(neg_similarities) > 0: neg_mean = float(np.mean(neg_similarities)) neg_std = float(np.std(neg_similarities)) neg_min = float(np.min(neg_similarities)) neg_max = float(np.max(neg_similarities)) else: neg_mean = neg_std = neg_min = neg_max = 0.0 logger.info( f"VectorStore: k={self.k}, k_pos={k_pos}, k_neg={k_neg if len(neg_similarities) > 0 else 0}, " f"avg_pos={avg_pos:.4f}, avg_neg={avg_neg:.4f}, " f"diff={diff:.4f}, score_multiplier={self.score_multiplier}, " f"score_neg_pos={score_neg_pos:.4f}, score_pos_only={score_pos_only:.4f}, " f"pos_mean={pos_mean:.4f}±{pos_std:.4f}[{pos_min:.4f}-{pos_max:.4f}], " f"neg_mean={neg_mean:.4f}±{neg_std:.4f}[{neg_min:.4f}-{neg_max:.4f}]" ) return score, confidence, score_pos_only def save_to_disk(self, path: str | None = None) -> None: """ Сохраняет векторы на диск. Args: path: Путь для сохранения (если не указан, используется storage_path) """ save_path = path or self.storage_path if not save_path: raise VectorStoreError("Путь для сохранения не указан") with self._lock: # Создаем директорию если нужно Path(save_path).parent.mkdir(parents=True, exist_ok=True) # Сохраняем в npz формате np.savez_compressed( save_path, positive_vectors=np.array(self._positive_vectors) if self._positive_vectors else np.array([]), negative_vectors=np.array(self._negative_vectors) if self._negative_vectors else np.array([]), positive_hashes=np.array(self._positive_hashes, dtype=object), negative_hashes=np.array(self._negative_hashes, dtype=object), vector_dim=self.vector_dim, max_examples=self.max_examples, ) logger.info( f"VectorStore: Сохранено на диск ({self.positive_count} pos, " f"{self.negative_count} neg): {save_path}" ) def save_submitted_to_disk(self, path: str | None = None) -> None: """ Сохраняет submitted-коллекцию на диск. Args: path: Путь для сохранения (если не указан, используется submitted_path) """ save_path = path or self.submitted_path if not save_path: raise VectorStoreError("Путь для сохранения submitted не указан") with self._lock: Path(save_path).parent.mkdir(parents=True, exist_ok=True) np.savez_compressed( save_path, vectors=np.array(self._submitted_vectors) if self._submitted_vectors else np.array([]), hashes=np.array(self._submitted_hashes, dtype=object), created_at=np.array(self._submitted_created_at) if self._submitted_created_at else np.array([]), post_ids=np.array(self._submitted_post_ids, dtype=object), texts=np.array(self._submitted_texts, dtype=object), rag_scores=np.array(self._submitted_rag_scores, dtype=object), ) logger.info(f"VectorStore: Сохранено submitted ({self.submitted_count}): {save_path}") def _load_submitted_from_disk(self) -> None: """Загружает submitted-коллекцию с диска.""" if not self.submitted_path or not os.path.exists(self.submitted_path): return try: with self._lock: data = np.load(self.submitted_path, allow_pickle=True) vectors = data.get("vectors", np.array([])) if vectors.size > 0: if len(vectors.shape) == 2: self._submitted_vectors = [ self._normalize_vector(np.array(v)) for v in vectors ] elif len(vectors.shape) == 1: self._submitted_vectors = [self._normalize_vector(np.array(vectors))] else: self._submitted_vectors = [] else: self._submitted_vectors = [] hashes = data.get("hashes", np.array([])) self._submitted_hashes = list(hashes) if hashes.size > 0 else [] created_at = data.get("created_at", np.array([])) self._submitted_created_at = list(created_at) if created_at.size > 0 else [] post_ids = data.get("post_ids", np.array([])) self._submitted_post_ids = list(post_ids) if post_ids.size > 0 else [] texts = data.get("texts", np.array([])) self._submitted_texts = list(texts) if texts.size > 0 else [] rag_scores = data.get("rag_scores", np.array([])) self._submitted_rag_scores = list(rag_scores) if rag_scores.size > 0 else [] # Выравниваем длины (на случай поврежденных данных) n = len(self._submitted_vectors) self._submitted_hashes = self._submitted_hashes[:n] self._submitted_created_at = self._submitted_created_at[:n] self._submitted_post_ids = self._submitted_post_ids[:n] self._submitted_texts = self._submitted_texts[:n] self._submitted_rag_scores = self._submitted_rag_scores[:n] logger.info( f"VectorStore: Загружено submitted ({self.submitted_count}): {self.submitted_path}" ) except Exception as e: logger.error(f"VectorStore: Ошибка загрузки submitted с диска: {e}") def _load_from_disk(self) -> None: """Загружает векторы с диска.""" if not self.storage_path: return try: with self._lock: storage_dir = Path(self.storage_path).parent positive_npy = storage_dir / "positive_embeddings.npy" negative_npy = storage_dir / "negative_embeddings.npy" # Отладочное логирование logger.info( f"VectorStore: Проверка путей - storage_dir={storage_dir}, positive_npy={positive_npy}, exists={positive_npy.exists()}, negative_npy={negative_npy}, exists={negative_npy.exists()}" ) # Проверяем наличие отдельных .npy файлов if positive_npy.exists() or negative_npy.exists(): logger.info("VectorStore: Обнаружены отдельные .npy файлы, загружаем их...") # Загружаем положительные векторы if positive_npy.exists(): pos_vectors = np.load(positive_npy, allow_pickle=False) if pos_vectors.size > 0: # Проверяем размерность if len(pos_vectors.shape) == 2: # Массив векторов [N, dim] self._positive_vectors = [vec for vec in pos_vectors] elif len(pos_vectors.shape) == 1: # Один вектор [dim] self._positive_vectors = [pos_vectors] else: logger.warning( f"VectorStore: Неожиданная размерность positive_embeddings.npy: {pos_vectors.shape}" ) self._positive_vectors = [] logger.info( f"VectorStore: Загружено {len(self._positive_vectors)} положительных векторов из {positive_npy}" ) # Загружаем отрицательные векторы if negative_npy.exists(): neg_vectors = np.load(negative_npy, allow_pickle=False) if neg_vectors.size > 0: # Проверяем размерность if len(neg_vectors.shape) == 2: # Массив векторов [N, dim] self._negative_vectors = [vec for vec in neg_vectors] elif len(neg_vectors.shape) == 1: # Один вектор [dim] self._negative_vectors = [neg_vectors] else: logger.warning( f"VectorStore: Неожиданная размерность negative_embeddings.npy: {neg_vectors.shape}" ) self._negative_vectors = [] logger.info( f"VectorStore: Загружено {len(self._negative_vectors)} отрицательных векторов из {negative_npy}" ) # Нормализуем загруженные векторы self._positive_vectors = [ self._normalize_vector(np.array(v)) for v in self._positive_vectors ] self._negative_vectors = [ self._normalize_vector(np.array(v)) for v in self._negative_vectors ] logger.info( f"VectorStore: Загружено с диска из .npy файлов ({self.positive_count} pos, " f"{self.negative_count} neg)" ) return # Если отдельных .npy файлов нет, пытаемся загрузить из старого формата .npz if os.path.exists(self.storage_path): logger.info( f"VectorStore: Загружаем из старого формата .npz: {self.storage_path}" ) data = np.load(self.storage_path, allow_pickle=True) # Загружаем векторы pos_vectors = data.get("positive_vectors", np.array([])) neg_vectors = data.get("negative_vectors", np.array([])) if pos_vectors.size > 0: self._positive_vectors = list(pos_vectors) if neg_vectors.size > 0: self._negative_vectors = list(neg_vectors) # Загружаем хеши pos_hashes = data.get("positive_hashes", np.array([])) neg_hashes = data.get("negative_hashes", np.array([])) if pos_hashes.size > 0: self._positive_hashes = list(pos_hashes) if neg_hashes.size > 0: self._negative_hashes = list(neg_hashes) logger.info( f"VectorStore: Загружено с диска ({self.positive_count} pos, " f"{self.negative_count} neg): {self.storage_path}" ) except Exception as e: logger.error(f"VectorStore: Ошибка загрузки с диска: {e}") # Продолжаем с пустым хранилищем def clear(self) -> None: """Очищает все векторы.""" with self._lock: self._positive_vectors.clear() self._negative_vectors.clear() self._positive_hashes.clear() self._negative_hashes.clear() self._submitted_vectors.clear() self._submitted_hashes.clear() self._submitted_created_at.clear() self._submitted_post_ids.clear() self._submitted_texts.clear() self._submitted_rag_scores.clear() logger.info("VectorStore: Хранилище очищено") def get_stats(self) -> dict: """Возвращает статистику хранилища.""" return { "positive_count": self.positive_count, "negative_count": self.negative_count, "total_count": self.total_count, "submitted_count": self.submitted_count, "vector_dim": self.vector_dim, "max_examples": self.max_examples, "max_submitted": self.max_submitted, } def get_scoring_params(self) -> dict: """Возвращает текущие параметры формулы расчета score.""" return { "score_multiplier": self.score_multiplier, "k": self.k, } def update_scoring_params( self, score_multiplier: float | None = None, k: int | None = None, ) -> dict: """ Обновляет параметры формулы расчета score. Args: score_multiplier: Множитель для масштабирования разницы (должен быть > 0) k: Количество ближайших примеров для расчета среднего (должно быть >= 1) Returns: dict: Обновленные параметры Raises: ValueError: При невалидных значениях """ with self._lock: if score_multiplier is not None: if score_multiplier <= 0: raise ValueError("score_multiplier должен быть > 0") self.score_multiplier = score_multiplier if k is not None: if k < 1: raise ValueError("k должен быть >= 1") self.k = k logger.info( f"VectorStore: Параметры формулы обновлены: " f"score_multiplier={self.score_multiplier}, k={self.k}" ) return self.get_scoring_params()