Files
rag-service/app/storage/vector_store.py

760 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
In-memory хранилище векторов на numpy.
Хранит векторные представления постов для быстрого сравнения.
Поддерживает персистентность через сохранение/загрузку с диска.
"""
import hashlib
import logging
import os
import threading
import time
from pathlib import Path
from typing import Any
import numpy as np
from app.exceptions import InsufficientExamplesError, VectorStoreError
logger = logging.getLogger(__name__)
class VectorStore:
"""
In-memory хранилище векторов для RAG.
Хранит отдельно положительные (опубликованные) и отрицательные (отклоненные)
примеры. Использует косинусное сходство для расчета скора.
Attributes:
vector_dim: Размерность векторов (384 для all-MiniLM-L12-v2)
max_examples: Максимальное количество примеров каждого типа
"""
def __init__(
self,
vector_dim: int = 384,
max_examples: int = 10000,
max_submitted: int = 5000,
storage_path: str | None = None,
submitted_path: str | None = None,
score_multiplier: float = 5.0,
k: int = 3,
):
"""
Инициализация хранилища.
Args:
vector_dim: Размерность векторов
max_examples: Максимальное количество примеров каждого типа
max_submitted: Максимальное количество submitted-постов
storage_path: Путь для сохранения/загрузки векторов (опционально)
submitted_path: Путь для сохранения/загрузки submitted-постов (опционально)
score_multiplier: Множитель для масштабирования разницы в скорах
k: Количество ближайших примеров для расчета среднего сходства
"""
self.vector_dim = vector_dim
self.max_examples = max_examples
self.max_submitted = max_submitted
self.storage_path = storage_path
self.submitted_path = submitted_path
self.score_multiplier = score_multiplier
self.k = k
# Инициализируем пустые массивы
# Используем список для динамического добавления, потом конвертируем в numpy
self._positive_vectors: list = []
self._negative_vectors: list = []
self._positive_hashes: list = [] # Хеши текстов для дедупликации
self._negative_hashes: list = []
# Submitted-посты (третья коллекция)
self._submitted_vectors: list = []
self._submitted_hashes: list = []
self._submitted_created_at: list = [] # Unix timestamps
self._submitted_post_ids: list = []
self._submitted_texts: list = []
self._submitted_rag_scores: list = []
# Lock для потокобезопасности
self._lock = threading.Lock()
# Пытаемся загрузить сохраненные векторы
if storage_path:
self._load_from_disk()
if submitted_path:
self._load_submitted_from_disk()
@property
def positive_count(self) -> int:
"""Количество положительных примеров."""
return len(self._positive_vectors)
@property
def negative_count(self) -> int:
"""Количество отрицательных примеров."""
return len(self._negative_vectors)
@property
def total_count(self) -> int:
"""Общее количество примеров."""
return self.positive_count + self.negative_count
@property
def submitted_count(self) -> int:
"""Количество submitted-постов."""
return len(self._submitted_vectors)
@staticmethod
def compute_text_hash(text: str) -> str:
"""Вычисляет хеш текста для дедупликации."""
return hashlib.md5(text.encode("utf-8")).hexdigest()
def _normalize_vector(self, vector: np.ndarray) -> np.ndarray:
"""Нормализует вектор для косинусного сходства."""
norm = np.linalg.norm(vector)
if norm == 0:
return vector
return vector / norm
def add_positive(self, vector: np.ndarray, text_hash: str | None = None) -> bool:
"""
Добавляет положительный пример (опубликованный пост).
Args:
vector: Векторное представление текста
text_hash: Хеш текста для дедупликации (опционально)
Returns:
True если добавлен, False если дубликат или превышен лимит
"""
with self._lock:
# Проверяем дубликат по хешу
if text_hash and text_hash in self._positive_hashes:
logger.debug("VectorStore: Пропуск дубликата положительного примера")
return False
# Проверяем лимит
if len(self._positive_vectors) >= self.max_examples:
# Удаляем самый старый пример (FIFO)
self._positive_vectors.pop(0)
self._positive_hashes.pop(0)
logger.debug("VectorStore: Удален старый положительный пример (лимит)")
# Нормализуем и добавляем
normalized = self._normalize_vector(vector)
self._positive_vectors.append(normalized)
if text_hash:
self._positive_hashes.append(text_hash)
logger.info(
f"VectorStore: Добавлен положительный пример (всего: {self.positive_count})"
)
return True
def add_positive_batch(
self, vectors: list[np.ndarray], text_hashes: list[str] | None = None
) -> int:
"""
Добавляет батч положительных примеров.
Args:
vectors: Список векторов
text_hashes: Список хешей текстов для дедупликации
Returns:
Количество добавленных примеров
"""
if text_hashes is None:
text_hashes = [None] * len(vectors)
added = 0
with self._lock:
for vector, text_hash in zip(vectors, text_hashes):
# Проверяем дубликат по хешу
if text_hash and text_hash in self._positive_hashes:
continue
# Проверяем лимит
if len(self._positive_vectors) >= self.max_examples:
self._positive_vectors.pop(0)
self._positive_hashes.pop(0)
# Нормализуем и добавляем
normalized = self._normalize_vector(vector)
self._positive_vectors.append(normalized)
if text_hash:
self._positive_hashes.append(text_hash)
added += 1
logger.info(
f"VectorStore: Добавлено {added} положительных примеров батчем (всего: {self.positive_count})"
)
return added
def add_negative(self, vector: np.ndarray, text_hash: str | None = None) -> bool:
"""
Добавляет отрицательный пример (отклоненный пост).
Args:
vector: Векторное представление текста
text_hash: Хеш текста для дедупликации (опционально)
Returns:
True если добавлен, False если дубликат или превышен лимит
"""
with self._lock:
# Проверяем дубликат по хешу
if text_hash and text_hash in self._negative_hashes:
logger.debug("VectorStore: Пропуск дубликата отрицательного примера")
return False
# Проверяем лимит
if len(self._negative_vectors) >= self.max_examples:
# Удаляем самый старый пример (FIFO)
self._negative_vectors.pop(0)
self._negative_hashes.pop(0)
logger.debug("VectorStore: Удален старый отрицательный пример (лимит)")
# Нормализуем и добавляем
normalized = self._normalize_vector(vector)
self._negative_vectors.append(normalized)
if text_hash:
self._negative_hashes.append(text_hash)
logger.info(
f"VectorStore: Добавлен отрицательный пример (всего: {self.negative_count})"
)
return True
def add_negative_batch(
self, vectors: list[np.ndarray], text_hashes: list[str] | None = None
) -> int:
"""
Добавляет батч отрицательных примеров.
Args:
vectors: Список векторов
text_hashes: Список хешей текстов для дедупликации
Returns:
Количество добавленных примеров
"""
if text_hashes is None:
text_hashes = [None] * len(vectors)
added = 0
with self._lock:
for vector, text_hash in zip(vectors, text_hashes):
# Проверяем дубликат по хешу
if text_hash and text_hash in self._negative_hashes:
continue
# Проверяем лимит
if len(self._negative_vectors) >= self.max_examples:
self._negative_vectors.pop(0)
self._negative_hashes.pop(0)
# Нормализуем и добавляем
normalized = self._normalize_vector(vector)
self._negative_vectors.append(normalized)
if text_hash:
self._negative_hashes.append(text_hash)
added += 1
logger.info(
f"VectorStore: Добавлено {added} отрицательных примеров батчем (всего: {self.negative_count})"
)
return added
def add_submitted(
self,
vector: np.ndarray,
text_hash: str,
created_at: int,
post_id: int | None = None,
text: str = "",
rag_score: float | None = None,
) -> bool:
"""
Добавляет submitted-пост в коллекцию.
Args:
vector: Векторное представление текста
text_hash: Хеш текста для дедупликации
created_at: Unix timestamp создания
post_id: ID поста (опционально)
text: Текст поста
rag_score: RAG скор поста (опционально)
Returns:
True если добавлен, False если дубликат
"""
with self._lock:
if text_hash in self._submitted_hashes:
logger.debug("VectorStore: Пропуск дубликата submitted-поста")
return False
if len(self._submitted_vectors) >= self.max_submitted:
self._submitted_vectors.pop(0)
self._submitted_hashes.pop(0)
self._submitted_created_at.pop(0)
self._submitted_post_ids.pop(0)
self._submitted_texts.pop(0)
self._submitted_rag_scores.pop(0)
logger.debug("VectorStore: Удален старый submitted-пост (лимит)")
normalized = self._normalize_vector(vector)
self._submitted_vectors.append(normalized)
self._submitted_hashes.append(text_hash)
self._submitted_created_at.append(created_at)
self._submitted_post_ids.append(post_id)
self._submitted_texts.append(text)
self._submitted_rag_scores.append(rag_score)
logger.info(f"VectorStore: Добавлен submitted-пост (всего: {self.submitted_count})")
return True
def find_similar_submitted(
self,
vector: np.ndarray,
threshold: float,
hours: int,
) -> list[dict[str, Any]]:
"""
Ищет похожие submitted-посты за последние N часов.
Args:
vector: Векторное представление запроса
threshold: Минимальный порог similarity (0.0 - 1.0)
hours: Количество часов для фильтрации (created_at >= now - hours*3600)
Returns:
Список dict с полями: similarity, created_at, post_id, text, rag_score
"""
with self._lock:
if self.submitted_count == 0:
return []
now = int(time.time())
cutoff = now - hours * 3600
normalized = self._normalize_vector(vector)
submitted_matrix = np.array(self._submitted_vectors)
similarities = np.dot(submitted_matrix, normalized)
results: list[dict[str, Any]] = []
for i, sim in enumerate(similarities):
if float(sim) < threshold:
continue
created_at = self._submitted_created_at[i]
if created_at < cutoff:
continue
results.append(
{
"similarity": float(sim),
"created_at": created_at,
"post_id": self._submitted_post_ids[i],
"text": self._submitted_texts[i],
"rag_score": self._submitted_rag_scores[i],
}
)
return sorted(results, key=lambda x: x["similarity"], reverse=True)
def calculate_similarity_score(self, vector: np.ndarray) -> tuple[float, float, float]:
"""
Рассчитывает скор на основе сходства с примерами.
Алгоритм:
1. Вычисляем косинусное сходство со всеми примерами
2. Используем топ-k ближайших примеров для более чувствительной оценки
3. Сравниваем топ-k положительных с топ-k отрицательными
Args:
vector: Векторное представление нового поста
Returns:
Tuple (score, confidence, score_pos_only):
- score: Оценка от 0.0 до 1.0 (neg/pos формула)
- confidence: Уверенность (зависит от количества примеров)
- score_pos_only: Оценка только по положительным примерам
Raises:
InsufficientExamplesError: Если недостаточно примеров
"""
with self._lock:
if self.positive_count == 0:
raise InsufficientExamplesError("Нет положительных примеров для сравнения")
# Нормализуем входной вектор
normalized = self._normalize_vector(np.asarray(vector).flatten())
# Конвертируем в numpy массивы для быстрых вычислений
# Используем vstack для гарантии одинаковой формы (совместимость со старым npz)
pos_matrix = np.vstack([np.asarray(v).flatten() for v in self._positive_vectors])
# Косинусное сходство с положительными примерами
# Для нормализованных векторов это просто скалярное произведение
pos_similarities = np.dot(pos_matrix, normalized)
# Косинусное сходство с отрицательными примерами
if self.negative_count > 0:
neg_matrix = np.vstack([np.asarray(v).flatten() for v in self._negative_vectors])
neg_similarities = np.dot(neg_matrix, normalized)
else:
neg_similarities = np.array([])
# Используем топ-k ближайших примеров для расчета среднего сходства
k_pos = min(self.k, len(pos_similarities))
top_k_pos = np.sort(pos_similarities)[-k_pos:]
avg_pos = float(np.mean(top_k_pos))
# Для отрицательных: если их меньше k, берем все, иначе топ-k
if len(neg_similarities) > 0:
k_neg = min(self.k, len(neg_similarities))
top_k_neg = np.sort(neg_similarities)[-k_neg:]
avg_neg = float(np.mean(top_k_neg))
else:
# Если нет отрицательных примеров, используем нейтральное значение
avg_neg = avg_pos # Нейтральный скор = 0.5
# Формула расчета score: (diff * scale + 1) / 2, переводим из [-1, 1] в [0, 1]
diff = avg_pos - avg_neg
score_neg_pos = np.clip((diff * self.score_multiplier + 1) / 2, 0.0, 1.0)
# === Вариант 2: pos only (только положительные, топ-k ближайших) ===
# Берём топ-5 ближайших положительных примеров
top_5_k = min(5, len(pos_similarities))
top_5_sim = float(np.mean(np.sort(pos_similarities)[-top_5_k:]))
# Нормализуем: 0.85 -> 0.0, 0.95 -> 1.0 (типичный диапазон для BERT)
score_pos_only = (top_5_sim - 0.85) / 0.10
score_pos_only = max(0.0, min(1.0, score_pos_only))
# Основной скор — neg/pos
score = score_neg_pos
# Confidence зависит от количества примеров (100% при 1000 примерах)
total_examples = self.positive_count + self.negative_count
confidence = min(1.0, total_examples / 1000)
# Дополнительная диагностическая информация
pos_mean = float(np.mean(pos_similarities))
pos_std = float(np.std(pos_similarities))
pos_min = float(np.min(pos_similarities))
pos_max = float(np.max(pos_similarities))
if len(neg_similarities) > 0:
neg_mean = float(np.mean(neg_similarities))
neg_std = float(np.std(neg_similarities))
neg_min = float(np.min(neg_similarities))
neg_max = float(np.max(neg_similarities))
else:
neg_mean = neg_std = neg_min = neg_max = 0.0
logger.info(
f"VectorStore: k={self.k}, k_pos={k_pos}, k_neg={k_neg if len(neg_similarities) > 0 else 0}, "
f"avg_pos={avg_pos:.4f}, avg_neg={avg_neg:.4f}, "
f"diff={diff:.4f}, score_multiplier={self.score_multiplier}, "
f"score_neg_pos={score_neg_pos:.4f}, score_pos_only={score_pos_only:.4f}, "
f"pos_mean={pos_mean:.4f}±{pos_std:.4f}[{pos_min:.4f}-{pos_max:.4f}], "
f"neg_mean={neg_mean:.4f}±{neg_std:.4f}[{neg_min:.4f}-{neg_max:.4f}]"
)
return score, confidence, score_pos_only
def save_to_disk(self, path: str | None = None) -> None:
"""
Сохраняет векторы на диск.
Args:
path: Путь для сохранения (если не указан, используется storage_path)
"""
save_path = path or self.storage_path
if not save_path:
raise VectorStoreError("Путь для сохранения не указан")
with self._lock:
# Создаем директорию если нужно
Path(save_path).parent.mkdir(parents=True, exist_ok=True)
# Сохраняем в npz формате
np.savez_compressed(
save_path,
positive_vectors=np.array(self._positive_vectors)
if self._positive_vectors
else np.array([]),
negative_vectors=np.array(self._negative_vectors)
if self._negative_vectors
else np.array([]),
positive_hashes=np.array(self._positive_hashes, dtype=object),
negative_hashes=np.array(self._negative_hashes, dtype=object),
vector_dim=self.vector_dim,
max_examples=self.max_examples,
)
logger.info(
f"VectorStore: Сохранено на диск ({self.positive_count} pos, "
f"{self.negative_count} neg): {save_path}"
)
def save_submitted_to_disk(self, path: str | None = None) -> None:
"""
Сохраняет submitted-коллекцию на диск.
Args:
path: Путь для сохранения (если не указан, используется submitted_path)
"""
save_path = path or self.submitted_path
if not save_path:
raise VectorStoreError("Путь для сохранения submitted не указан")
with self._lock:
Path(save_path).parent.mkdir(parents=True, exist_ok=True)
np.savez_compressed(
save_path,
vectors=np.array(self._submitted_vectors)
if self._submitted_vectors
else np.array([]),
hashes=np.array(self._submitted_hashes, dtype=object),
created_at=np.array(self._submitted_created_at)
if self._submitted_created_at
else np.array([]),
post_ids=np.array(self._submitted_post_ids, dtype=object),
texts=np.array(self._submitted_texts, dtype=object),
rag_scores=np.array(self._submitted_rag_scores, dtype=object),
)
logger.info(f"VectorStore: Сохранено submitted ({self.submitted_count}): {save_path}")
def _load_submitted_from_disk(self) -> None:
"""Загружает submitted-коллекцию с диска."""
if not self.submitted_path or not os.path.exists(self.submitted_path):
return
try:
with self._lock:
data = np.load(self.submitted_path, allow_pickle=True)
vectors = data.get("vectors", np.array([]))
if vectors.size > 0:
if len(vectors.shape) == 2:
self._submitted_vectors = [
self._normalize_vector(np.array(v)) for v in vectors
]
elif len(vectors.shape) == 1:
self._submitted_vectors = [self._normalize_vector(np.array(vectors))]
else:
self._submitted_vectors = []
else:
self._submitted_vectors = []
hashes = data.get("hashes", np.array([]))
self._submitted_hashes = list(hashes) if hashes.size > 0 else []
created_at = data.get("created_at", np.array([]))
self._submitted_created_at = list(created_at) if created_at.size > 0 else []
post_ids = data.get("post_ids", np.array([]))
self._submitted_post_ids = list(post_ids) if post_ids.size > 0 else []
texts = data.get("texts", np.array([]))
self._submitted_texts = list(texts) if texts.size > 0 else []
rag_scores = data.get("rag_scores", np.array([]))
self._submitted_rag_scores = list(rag_scores) if rag_scores.size > 0 else []
# Выравниваем длины (на случай поврежденных данных)
n = len(self._submitted_vectors)
self._submitted_hashes = self._submitted_hashes[:n]
self._submitted_created_at = self._submitted_created_at[:n]
self._submitted_post_ids = self._submitted_post_ids[:n]
self._submitted_texts = self._submitted_texts[:n]
self._submitted_rag_scores = self._submitted_rag_scores[:n]
logger.info(
f"VectorStore: Загружено submitted ({self.submitted_count}): {self.submitted_path}"
)
except Exception as e:
logger.error(f"VectorStore: Ошибка загрузки submitted с диска: {e}")
def _load_from_disk(self) -> None:
"""Загружает векторы с диска."""
if not self.storage_path:
return
try:
with self._lock:
storage_dir = Path(self.storage_path).parent
positive_npy = storage_dir / "positive_embeddings.npy"
negative_npy = storage_dir / "negative_embeddings.npy"
# Отладочное логирование
logger.info(
f"VectorStore: Проверка путей - storage_dir={storage_dir}, positive_npy={positive_npy}, exists={positive_npy.exists()}, negative_npy={negative_npy}, exists={negative_npy.exists()}"
)
# Проверяем наличие отдельных .npy файлов
if positive_npy.exists() or negative_npy.exists():
logger.info("VectorStore: Обнаружены отдельные .npy файлы, загружаем их...")
# Загружаем положительные векторы
if positive_npy.exists():
pos_vectors = np.load(positive_npy, allow_pickle=False)
if pos_vectors.size > 0:
# Проверяем размерность
if len(pos_vectors.shape) == 2:
# Массив векторов [N, dim]
self._positive_vectors = [vec for vec in pos_vectors]
elif len(pos_vectors.shape) == 1:
# Один вектор [dim]
self._positive_vectors = [pos_vectors]
else:
logger.warning(
f"VectorStore: Неожиданная размерность positive_embeddings.npy: {pos_vectors.shape}"
)
self._positive_vectors = []
logger.info(
f"VectorStore: Загружено {len(self._positive_vectors)} положительных векторов из {positive_npy}"
)
# Загружаем отрицательные векторы
if negative_npy.exists():
neg_vectors = np.load(negative_npy, allow_pickle=False)
if neg_vectors.size > 0:
# Проверяем размерность
if len(neg_vectors.shape) == 2:
# Массив векторов [N, dim]
self._negative_vectors = [vec for vec in neg_vectors]
elif len(neg_vectors.shape) == 1:
# Один вектор [dim]
self._negative_vectors = [neg_vectors]
else:
logger.warning(
f"VectorStore: Неожиданная размерность negative_embeddings.npy: {neg_vectors.shape}"
)
self._negative_vectors = []
logger.info(
f"VectorStore: Загружено {len(self._negative_vectors)} отрицательных векторов из {negative_npy}"
)
# Нормализуем загруженные векторы
self._positive_vectors = [
self._normalize_vector(np.array(v)) for v in self._positive_vectors
]
self._negative_vectors = [
self._normalize_vector(np.array(v)) for v in self._negative_vectors
]
logger.info(
f"VectorStore: Загружено с диска из .npy файлов ({self.positive_count} pos, "
f"{self.negative_count} neg)"
)
return
# Если отдельных .npy файлов нет, пытаемся загрузить из старого формата .npz
if os.path.exists(self.storage_path):
logger.info(
f"VectorStore: Загружаем из старого формата .npz: {self.storage_path}"
)
data = np.load(self.storage_path, allow_pickle=True)
# Загружаем векторы
pos_vectors = data.get("positive_vectors", np.array([]))
neg_vectors = data.get("negative_vectors", np.array([]))
if pos_vectors.size > 0:
self._positive_vectors = list(pos_vectors)
if neg_vectors.size > 0:
self._negative_vectors = list(neg_vectors)
# Загружаем хеши
pos_hashes = data.get("positive_hashes", np.array([]))
neg_hashes = data.get("negative_hashes", np.array([]))
if pos_hashes.size > 0:
self._positive_hashes = list(pos_hashes)
if neg_hashes.size > 0:
self._negative_hashes = list(neg_hashes)
logger.info(
f"VectorStore: Загружено с диска ({self.positive_count} pos, "
f"{self.negative_count} neg): {self.storage_path}"
)
except Exception as e:
logger.error(f"VectorStore: Ошибка загрузки с диска: {e}")
# Продолжаем с пустым хранилищем
def clear(self) -> None:
"""Очищает все векторы."""
with self._lock:
self._positive_vectors.clear()
self._negative_vectors.clear()
self._positive_hashes.clear()
self._negative_hashes.clear()
self._submitted_vectors.clear()
self._submitted_hashes.clear()
self._submitted_created_at.clear()
self._submitted_post_ids.clear()
self._submitted_texts.clear()
self._submitted_rag_scores.clear()
logger.info("VectorStore: Хранилище очищено")
def get_stats(self) -> dict:
"""Возвращает статистику хранилища."""
return {
"positive_count": self.positive_count,
"negative_count": self.negative_count,
"total_count": self.total_count,
"submitted_count": self.submitted_count,
"vector_dim": self.vector_dim,
"max_examples": self.max_examples,
"max_submitted": self.max_submitted,
}
def get_scoring_params(self) -> dict:
"""Возвращает текущие параметры формулы расчета score."""
return {
"score_multiplier": self.score_multiplier,
"k": self.k,
}
def update_scoring_params(
self,
score_multiplier: float | None = None,
k: int | None = None,
) -> dict:
"""
Обновляет параметры формулы расчета score.
Args:
score_multiplier: Множитель для масштабирования разницы (должен быть > 0)
k: Количество ближайших примеров для расчета среднего (должно быть >= 1)
Returns:
dict: Обновленные параметры
Raises:
ValueError: При невалидных значениях
"""
with self._lock:
if score_multiplier is not None:
if score_multiplier <= 0:
raise ValueError("score_multiplier должен быть > 0")
self.score_multiplier = score_multiplier
if k is not None:
if k < 1:
raise ValueError("k должен быть >= 1")
self.k = k
logger.info(
f"VectorStore: Параметры формулы обновлены: "
f"score_multiplier={self.score_multiplier}, k={self.k}"
)
return self.get_scoring_params()