Files
rag-service/app/storage/vector_store.py

439 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
In-memory хранилище векторов на numpy.
Хранит векторные представления постов для быстрого сравнения.
Поддерживает персистентность через сохранение/загрузку с диска.
"""
import hashlib
import logging
import os
import threading
from pathlib import Path
from typing import List, Optional, Tuple
import numpy as np
from app.exceptions import InsufficientExamplesError, VectorStoreError
logger = logging.getLogger(__name__)
class VectorStore:
"""
In-memory хранилище векторов для RAG.
Хранит отдельно положительные (опубликованные) и отрицательные (отклоненные)
примеры. Использует косинусное сходство для расчета скора.
Attributes:
vector_dim: Размерность векторов (768 для ruBERT)
max_examples: Максимальное количество примеров каждого типа
"""
def __init__(
self,
vector_dim: int = 768,
max_examples: int = 10000,
storage_path: Optional[str] = None,
score_multiplier: float = 5.0,
):
"""
Инициализация хранилища.
Args:
vector_dim: Размерность векторов
max_examples: Максимальное количество примеров каждого типа
storage_path: Путь для сохранения/загрузки векторов (опционально)
score_multiplier: Множитель для усиления разницы в скорах
"""
self.vector_dim = vector_dim
self.max_examples = max_examples
self.storage_path = storage_path
self.score_multiplier = score_multiplier
# Инициализируем пустые массивы
# Используем список для динамического добавления, потом конвертируем в numpy
self._positive_vectors: list = []
self._negative_vectors: list = []
self._positive_hashes: list = [] # Хеши текстов для дедупликации
self._negative_hashes: list = []
# Lock для потокобезопасности
self._lock = threading.Lock()
# Пытаемся загрузить сохраненные векторы
if storage_path and os.path.exists(storage_path):
self._load_from_disk()
@property
def positive_count(self) -> int:
"""Количество положительных примеров."""
return len(self._positive_vectors)
@property
def negative_count(self) -> int:
"""Количество отрицательных примеров."""
return len(self._negative_vectors)
@property
def total_count(self) -> int:
"""Общее количество примеров."""
return self.positive_count + self.negative_count
@staticmethod
def compute_text_hash(text: str) -> str:
"""Вычисляет хеш текста для дедупликации."""
return hashlib.md5(text.encode('utf-8')).hexdigest()
def _normalize_vector(self, vector: np.ndarray) -> np.ndarray:
"""Нормализует вектор для косинусного сходства."""
norm = np.linalg.norm(vector)
if norm == 0:
return vector
return vector / norm
def add_positive(self, vector: np.ndarray, text_hash: Optional[str] = None) -> bool:
"""
Добавляет положительный пример (опубликованный пост).
Args:
vector: Векторное представление текста
text_hash: Хеш текста для дедупликации (опционально)
Returns:
True если добавлен, False если дубликат или превышен лимит
"""
with self._lock:
# Проверяем дубликат по хешу
if text_hash and text_hash in self._positive_hashes:
logger.debug("VectorStore: Пропуск дубликата положительного примера")
return False
# Проверяем лимит
if len(self._positive_vectors) >= self.max_examples:
# Удаляем самый старый пример (FIFO)
self._positive_vectors.pop(0)
self._positive_hashes.pop(0)
logger.debug("VectorStore: Удален старый положительный пример (лимит)")
# Нормализуем и добавляем
normalized = self._normalize_vector(vector)
self._positive_vectors.append(normalized)
if text_hash:
self._positive_hashes.append(text_hash)
logger.info(f"VectorStore: Добавлен положительный пример (всего: {self.positive_count})")
return True
def add_positive_batch(
self,
vectors: List[np.ndarray],
text_hashes: Optional[List[str]] = None
) -> int:
"""
Добавляет батч положительных примеров.
Args:
vectors: Список векторов
text_hashes: Список хешей текстов для дедупликации
Returns:
Количество добавленных примеров
"""
if text_hashes is None:
text_hashes = [None] * len(vectors)
added = 0
with self._lock:
for vector, text_hash in zip(vectors, text_hashes):
# Проверяем дубликат по хешу
if text_hash and text_hash in self._positive_hashes:
continue
# Проверяем лимит
if len(self._positive_vectors) >= self.max_examples:
self._positive_vectors.pop(0)
self._positive_hashes.pop(0)
# Нормализуем и добавляем
normalized = self._normalize_vector(vector)
self._positive_vectors.append(normalized)
if text_hash:
self._positive_hashes.append(text_hash)
added += 1
logger.info(f"VectorStore: Добавлено {added} положительных примеров батчем (всего: {self.positive_count})")
return added
def add_negative(self, vector: np.ndarray, text_hash: Optional[str] = None) -> bool:
"""
Добавляет отрицательный пример (отклоненный пост).
Args:
vector: Векторное представление текста
text_hash: Хеш текста для дедупликации (опционально)
Returns:
True если добавлен, False если дубликат или превышен лимит
"""
with self._lock:
# Проверяем дубликат по хешу
if text_hash and text_hash in self._negative_hashes:
logger.debug("VectorStore: Пропуск дубликата отрицательного примера")
return False
# Проверяем лимит
if len(self._negative_vectors) >= self.max_examples:
# Удаляем самый старый пример (FIFO)
self._negative_vectors.pop(0)
self._negative_hashes.pop(0)
logger.debug("VectorStore: Удален старый отрицательный пример (лимит)")
# Нормализуем и добавляем
normalized = self._normalize_vector(vector)
self._negative_vectors.append(normalized)
if text_hash:
self._negative_hashes.append(text_hash)
logger.info(f"VectorStore: Добавлен отрицательный пример (всего: {self.negative_count})")
return True
def add_negative_batch(
self,
vectors: List[np.ndarray],
text_hashes: Optional[List[str]] = None
) -> int:
"""
Добавляет батч отрицательных примеров.
Args:
vectors: Список векторов
text_hashes: Список хешей текстов для дедупликации
Returns:
Количество добавленных примеров
"""
if text_hashes is None:
text_hashes = [None] * len(vectors)
added = 0
with self._lock:
for vector, text_hash in zip(vectors, text_hashes):
# Проверяем дубликат по хешу
if text_hash and text_hash in self._negative_hashes:
continue
# Проверяем лимит
if len(self._negative_vectors) >= self.max_examples:
self._negative_vectors.pop(0)
self._negative_hashes.pop(0)
# Нормализуем и добавляем
normalized = self._normalize_vector(vector)
self._negative_vectors.append(normalized)
if text_hash:
self._negative_hashes.append(text_hash)
added += 1
logger.info(f"VectorStore: Добавлено {added} отрицательных примеров батчем (всего: {self.negative_count})")
return added
def calculate_similarity_score(self, vector: np.ndarray) -> Tuple[float, float, float]:
"""
Рассчитывает скор на основе сходства с примерами.
Алгоритм:
1. Вычисляем косинусное сходство со всеми примерами
2. Используем топ-k ближайших примеров для более чувствительной оценки
3. Сравниваем топ-k положительных с топ-k отрицательными
Args:
vector: Векторное представление нового поста
Returns:
Tuple (score, confidence, score_pos_only):
- score: Оценка от 0.0 до 1.0 (neg/pos формула)
- confidence: Уверенность (зависит от количества примеров)
- score_pos_only: Оценка только по положительным примерам
Raises:
InsufficientExamplesError: Если недостаточно примеров
"""
with self._lock:
if self.positive_count == 0:
raise InsufficientExamplesError(
"Нет положительных примеров для сравнения"
)
# Нормализуем входной вектор
normalized = self._normalize_vector(vector)
# Конвертируем в numpy массивы для быстрых вычислений
pos_matrix = np.array(self._positive_vectors)
# Косинусное сходство с положительными примерами
# Для нормализованных векторов это просто скалярное произведение
pos_similarities = np.dot(pos_matrix, normalized)
# Косинусное сходство с отрицательными примерами
if self.negative_count > 0:
neg_matrix = np.array(self._negative_vectors)
neg_similarities = np.dot(neg_matrix, normalized)
else:
neg_similarities = np.array([])
# Используем топ-k ближайших примеров для более чувствительной оценки
# k выбирается динамически: минимум 10, но не больше 20% от общего количества
k_pos = min(max(10, self.positive_count // 10), 50)
k_pos = min(k_pos, len(pos_similarities))
# Топ-k положительных примеров
top_k_pos_sim = float(np.mean(np.sort(pos_similarities)[-k_pos:]))
# Для отрицательных: если их меньше k, берем все, иначе топ-k
if len(neg_similarities) > 0:
k_neg = min(max(10, self.negative_count // 10), 50)
k_neg = min(k_neg, len(neg_similarities))
top_k_neg_sim = float(np.mean(np.sort(neg_similarities)[-k_neg:]))
else:
# Если нет отрицательных примеров, используем нейтральное значение
top_k_neg_sim = top_k_pos_sim # Нейтральный скор = 0.5
# === Вариант 1: neg/pos (разница между топ-k положительными и отрицательными) ===
# Используем более агрессивную нормализацию для малых различий
diff = top_k_pos_sim - top_k_neg_sim
# Адаптивный множитель: чем больше примеров, тем выше чувствительность
adaptive_multiplier = self.score_multiplier * (1.0 + min(1.0, (self.positive_count + self.negative_count) / 1000))
score_neg_pos = 0.5 + (diff * adaptive_multiplier)
score_neg_pos = max(0.0, min(1.0, score_neg_pos))
# === Вариант 2: pos only (только положительные, топ-k ближайших) ===
# Берём топ-5 ближайших положительных примеров
top_5_k = min(5, len(pos_similarities))
top_5_sim = float(np.mean(np.sort(pos_similarities)[-top_5_k:]))
# Нормализуем: 0.85 -> 0.0, 0.95 -> 1.0 (типичный диапазон для BERT)
score_pos_only = (top_5_sim - 0.85) / 0.10
score_pos_only = max(0.0, min(1.0, score_pos_only))
# Основной скор — neg/pos
score = score_neg_pos
# Confidence зависит от количества примеров (100% при 1000 примерах)
total_examples = self.positive_count + self.negative_count
confidence = min(1.0, total_examples / 1000)
# Дополнительная диагностическая информация
pos_mean = float(np.mean(pos_similarities))
pos_std = float(np.std(pos_similarities))
pos_min = float(np.min(pos_similarities))
pos_max = float(np.max(pos_similarities))
if len(neg_similarities) > 0:
neg_mean = float(np.mean(neg_similarities))
neg_std = float(np.std(neg_similarities))
neg_min = float(np.min(neg_similarities))
neg_max = float(np.max(neg_similarities))
else:
neg_mean = neg_std = neg_min = neg_max = 0.0
logger.info(
f"VectorStore: top_k_pos={k_pos}, top_k_neg={k_neg if len(neg_similarities) > 0 else 0}, "
f"top_k_pos_sim={top_k_pos_sim:.4f}, top_k_neg_sim={top_k_neg_sim:.4f}, "
f"diff={diff:.4f}, adaptive_mult={adaptive_multiplier:.2f}, "
f"score_neg_pos={score_neg_pos:.4f}, score_pos_only={score_pos_only:.4f}, "
f"pos_mean={pos_mean:.4f}±{pos_std:.4f}[{pos_min:.4f}-{pos_max:.4f}], "
f"neg_mean={neg_mean:.4f}±{neg_std:.4f}[{neg_min:.4f}-{neg_max:.4f}]"
)
return score, confidence, score_pos_only
def save_to_disk(self, path: Optional[str] = None) -> None:
"""
Сохраняет векторы на диск.
Args:
path: Путь для сохранения (если не указан, используется storage_path)
"""
save_path = path or self.storage_path
if not save_path:
raise VectorStoreError("Путь для сохранения не указан")
with self._lock:
# Создаем директорию если нужно
Path(save_path).parent.mkdir(parents=True, exist_ok=True)
# Сохраняем в npz формате
np.savez_compressed(
save_path,
positive_vectors=np.array(self._positive_vectors) if self._positive_vectors else np.array([]),
negative_vectors=np.array(self._negative_vectors) if self._negative_vectors else np.array([]),
positive_hashes=np.array(self._positive_hashes, dtype=object),
negative_hashes=np.array(self._negative_hashes, dtype=object),
vector_dim=self.vector_dim,
max_examples=self.max_examples,
)
logger.info(
f"VectorStore: Сохранено на диск ({self.positive_count} pos, "
f"{self.negative_count} neg): {save_path}"
)
def _load_from_disk(self) -> None:
"""Загружает векторы с диска."""
if not self.storage_path or not os.path.exists(self.storage_path):
return
try:
with self._lock:
data = np.load(self.storage_path, allow_pickle=True)
# Загружаем векторы
pos_vectors = data.get('positive_vectors', np.array([]))
neg_vectors = data.get('negative_vectors', np.array([]))
if pos_vectors.size > 0:
self._positive_vectors = list(pos_vectors)
if neg_vectors.size > 0:
self._negative_vectors = list(neg_vectors)
# Загружаем хеши
pos_hashes = data.get('positive_hashes', np.array([]))
neg_hashes = data.get('negative_hashes', np.array([]))
if pos_hashes.size > 0:
self._positive_hashes = list(pos_hashes)
if neg_hashes.size > 0:
self._negative_hashes = list(neg_hashes)
logger.info(
f"VectorStore: Загружено с диска ({self.positive_count} pos, "
f"{self.negative_count} neg): {self.storage_path}"
)
except Exception as e:
logger.error(f"VectorStore: Ошибка загрузки с диска: {e}")
# Продолжаем с пустым хранилищем
def clear(self) -> None:
"""Очищает все векторы."""
with self._lock:
self._positive_vectors.clear()
self._negative_vectors.clear()
self._positive_hashes.clear()
self._negative_hashes.clear()
logger.info("VectorStore: Хранилище очищено")
def get_stats(self) -> dict:
"""Возвращает статистику хранилища."""
return {
"positive_count": self.positive_count,
"negative_count": self.negative_count,
"total_count": self.total_count,
"vector_dim": self.vector_dim,
"max_examples": self.max_examples,
"storage_path": self.storage_path,
}