diff --git a/.cursor/rules/error-handling.md b/.cursor/rules/error-handling.md
index 8ca866c..f995d49 100644
--- a/.cursor/rules/error-handling.md
+++ b/.cursor/rules/error-handling.md
@@ -110,12 +110,63 @@ logger.error(f"Критическая ошибка: {e}", exc_info=True)
### Уровни логирования
-- `logger.debug()` - отладочная информация
-- `logger.info()` - информационные сообщения о работе
-- `logger.warning()` - предупреждения о потенциальных проблемах
-- `logger.error()` - ошибки, требующие внимания
+- `logger.debug()` - отладочная информация (детали выполнения, промежуточные значения, HTTP запросы(не используется в проекте))
+- `logger.info()` - информационные сообщения о работе (успешные операции, важные события)
+- `logger.warning()` - предупреждения о потенциальных проблемах (некритичные ошибки, таймауты)
+- `logger.error()` - ошибки, требующие внимания (исключения, сбои)
- `logger.critical()` - критические ошибки
+### Паттерн логирования в сервисах
+
+При работе с внешними API и сервисами используйте следующий паттерн:
+
+```python
+from logs.custom_logger import logger
+
+class ApiClient:
+ async def calculate_score(self, text: str) -> Score:
+ # Логируем начало операции (debug)
+ logger.debug(f"ApiClient: Отправка запроса на расчет скора (text_preview='{text[:50]}')")
+
+ try:
+ response = await self._client.post(url, json=data)
+
+ # Логируем статус ответа (debug)
+ logger.debug(f"ApiClient: Получен ответ (status={response.status_code})")
+
+ # Обрабатываем ответ
+ if response.status_code == 200:
+ result = response.json()
+ # Логируем успешный результат (info)
+ logger.info(f"ApiClient: Скор успешно получен (score={result['score']:.4f})")
+ return result
+ else:
+ # Логируем ошибку (error)
+ logger.error(f"ApiClient: Ошибка API (status={response.status_code})")
+ raise ApiError(f"Ошибка API: {response.status_code}")
+
+ except httpx.TimeoutException:
+ # Логируем таймаут (error)
+ logger.error(f"ApiClient: Таймаут запроса (>{timeout}с)")
+ raise
+ except httpx.RequestError as e:
+ # Логируем ошибку подключения (error)
+ logger.error(f"ApiClient: Ошибка подключения: {e}")
+ raise
+ except Exception as e:
+ # Логируем неожиданные ошибки (error)
+ logger.error(f"ApiClient: Неожиданная ошибка: {e}", exc_info=True)
+ raise
+```
+
+**Принципы:**
+- `logger.debug()` - для деталей выполнения (URL, параметры запроса, статус ответа)
+- `logger.info()` - для успешных операций с важными результатами
+- `logger.warning()` - для некритичных проблем (валидация, таймауты в неважных операциях)
+- `logger.error()` - для всех ошибок перед пробросом исключения
+- Всегда логируйте ошибки перед `raise`
+- Используйте `exc_info=True` для критических ошибок
+
## Метрики ошибок
Декоратор `@track_errors` автоматически отслеживает ошибки:
diff --git a/.gitignore b/.gitignore
index 690367e..6cc702c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -92,4 +92,7 @@ venv.bak/
# Other files
voice_users/
-files/
\ No newline at end of file
+files/
+
+# ML models and vectors cache
+data/
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
index c41ab93..c120d14 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,14 +1,14 @@
###########################################
# Этап 1: Сборщик (Builder)
###########################################
-FROM python:3.11.9-slim as builder
+FROM python:3.11.9-alpine as builder
-# Устанавливаем инструменты для компиляции
-RUN apt-get update && apt-get install --no-install-recommends -y \
+# Устанавливаем инструменты для компиляции (если нужны для некоторых пакетов)
+RUN apk add --no-cache \
gcc \
- g++ \
- python3-dev \
- && rm -rf /var/lib/apt/lists/*
+ musl-dev \
+ libffi-dev \
+ && rm -rf /var/cache/apk/*
WORKDIR /app
COPY requirements.txt .
@@ -20,30 +20,20 @@ RUN pip install --no-cache-dir --target /install -r requirements.txt
###########################################
# Этап 2: Финальный образ (Runtime)
###########################################
-FROM python:3.11.9-slim as runtime
-
-# Минимальные рантайм-зависимости
-RUN apt-get update && apt-get install --no-install-recommends -y \
- libgomp1 \
- && rm -rf /var/lib/apt/lists/*
+FROM python:3.11.9-alpine as runtime
# Создаем пользователя
-RUN groupadd -g 1001 deploy && useradd -r -u 1001 -g deploy deploy
+RUN addgroup -g 1001 deploy && adduser -D -u 1001 -G deploy deploy
WORKDIR /app
# Копируем зависимости
COPY --from=builder --chown=deploy:deploy /install /usr/local/lib/python3.11/site-packages
-# Создаем структуру папок (включая директории для ML моделей)
-RUN mkdir -p database logs voice_users data/models && \
+# Создаем структуру папок
+RUN mkdir -p database logs voice_users && \
chown -R deploy:deploy /app
-# Устанавливаем переменные для HuggingFace (кеш моделей внутри /app)
-ENV HF_HOME=/app/data/models
-ENV TRANSFORMERS_CACHE=/app/data/models
-ENV HF_HUB_CACHE=/app/data/models
-
# Копируем исходный код
COPY --chown=deploy:deploy . .
diff --git a/database/repositories/post_repository.py b/database/repositories/post_repository.py
index cae5ede..e819cb6 100644
--- a/database/repositories/post_repository.py
+++ b/database/repositories/post_repository.py
@@ -462,21 +462,3 @@ class PostRepository(DatabaseConnection):
self.logger.info(f"Получено {len(texts)} отклоненных постов для обучения")
return texts
- async def update_vector_hash(self, message_id: int, vector_hash: str) -> bool:
- """
- Обновляет хеш вектора для поста (для кеширования).
-
- Args:
- message_id: ID сообщения
- vector_hash: Хеш вектора
-
- Returns:
- True если обновлено успешно
- """
- try:
- query = "UPDATE post_from_telegram_suggest SET vector_hash = ? WHERE message_id = ?"
- await self._execute_query(query, (vector_hash, message_id))
- return True
- except Exception as e:
- self.logger.error(f"Ошибка обновления vector_hash для message_id={message_id}: {e}")
- return False
diff --git a/env.example b/env.example
index ea06d24..9350527 100644
--- a/env.example
+++ b/env.example
@@ -36,14 +36,13 @@ METRICS_PORT=8080
LOG_LEVEL=INFO
LOG_RETENTION_DAYS=30
-# ML Scoring - RAG (ruBERT)
-# Включает локальное векторное сравнение с использованием ruBERT
+# ML Scoring - RAG API
+# Включает оценку постов через внешний RAG API сервис
RAG_ENABLED=false
-RAG_MODEL=DeepPavlov/rubert-base-cased
-RAG_CACHE_DIR=data/models
-RAG_VECTORS_PATH=data/vectors.npz
-RAG_MAX_EXAMPLES=10000
-RAG_SCORE_MULTIPLIER=5
+RAG_API_URL=http://xx.xxx.xx.xx/api/v1
+RAG_API_KEY=your_rag_api_key_here
+RAG_API_TIMEOUT=30
+RAG_TEST_MODE=false
# ML Scoring - DeepSeek API
# Включает оценку постов через DeepSeek API
diff --git a/helper_bot/handlers/admin/admin_handlers.py b/helper_bot/handlers/admin/admin_handlers.py
index 31ed534..89159a5 100644
--- a/helper_bot/handlers/admin/admin_handlers.py
+++ b/helper_bot/handlers/admin/admin_handlers.py
@@ -161,7 +161,7 @@ async def get_ml_stats(
await message.answer("📊 ML Scoring отключен\n\nДля включения установите RAG_ENABLED=true или DEEPSEEK_ENABLED=true в .env")
return
- stats = scoring_manager.get_stats()
+ stats = await scoring_manager.get_stats()
# Формируем текст статистики
lines = ["📊 ML Scoring Статистика\n"]
@@ -169,16 +169,22 @@ async def get_ml_stats(
# RAG статистика
if "rag" in stats:
rag = stats["rag"]
- lines.append("🤖 RAG (ruBERT):")
+ lines.append("🤖 RAG API:")
lines.append(f" • Статус: {'✅ Включен' if rag.get('enabled') else '❌ Отключен'}")
- lines.append(f" • Модель: {rag.get('model_name', 'N/A')}")
- lines.append(f" • Модель загружена: {'✅' if rag.get('model_loaded') else '❌'}")
+ lines.append(f" • API URL: {rag.get('api_url', 'N/A')}")
+
+ # Статистика из API (если доступна)
+ if "positive_examples" in rag or "negative_examples" in rag:
+ lines.append(f" • Положительных примеров: {rag.get('positive_examples', 0)}")
+ lines.append(f" • Отрицательных примеров: {rag.get('negative_examples', 0)}")
+ lines.append(f" • Всего примеров: {rag.get('total_examples', rag.get('positive_examples', 0) + rag.get('negative_examples', 0))}")
+
+ # Модель из API (если доступна)
+ if "model_loaded" in rag:
+ lines.append(f" • Модель загружена: {'✅' if rag.get('model_loaded') else '❌'}")
+ if "model_name" in rag:
+ lines.append(f" • Модель: {rag.get('model_name', 'N/A')}")
- vs = rag.get("vector_store", {})
- lines.append(f" • Положительных примеров: {vs.get('positive_count', 0)}")
- lines.append(f" • Отрицательных примеров: {vs.get('negative_count', 0)}")
- lines.append(f" • Всего примеров: {vs.get('total_count', 0)}")
- lines.append(f" • Макс. примеров: {vs.get('max_examples', 'N/A')}")
lines.append("")
# DeepSeek статистика
diff --git a/helper_bot/handlers/private/services.py b/helper_bot/handlers/private/services.py
index 904dd60..3fc4582 100644
--- a/helper_bot/handlers/private/services.py
+++ b/helper_bot/handlers/private/services.py
@@ -162,7 +162,7 @@ class PostService:
# Получаем данные от RAG
rag_confidence = scores.rag.confidence if scores.rag else None
- rag_score_pos_only = scores.rag.metadata.get("score_pos_only") if scores.rag else None
+ rag_score_pos_only = scores.rag.metadata.get("rag_score_pos_only") if scores.rag else None
return scores.deepseek_score, scores.rag_score, rag_confidence, rag_score_pos_only, ml_scores_json
except Exception as e:
diff --git a/helper_bot/main.py b/helper_bot/main.py
index b710d47..a85ee0a 100644
--- a/helper_bot/main.py
+++ b/helper_bot/main.py
@@ -66,11 +66,22 @@ async def start_bot(bdf):
# Middleware уже добавлены на уровне dispatcher
dp.include_routers(admin_router, private_router, callback_router, group_router, voice_router)
+ # Получаем scoring_manager для использования в shutdown
+ scoring_manager = bdf.get_scoring_manager()
+
# Добавляем обработчик завершения для корректного закрытия
@dp.shutdown()
async def on_shutdown():
logging.info("Bot shutdown initiated, cleaning up resources...")
try:
+ # Закрываем ресурсы ScoringManager
+ if scoring_manager:
+ try:
+ await scoring_manager.close()
+ logging.info("ScoringManager закрыт")
+ except Exception as e:
+ logging.error(f"Ошибка закрытия ScoringManager: {e}")
+
await bot.session.close()
logging.info("Bot session closed successfully")
except Exception as e:
@@ -78,22 +89,6 @@ async def start_bot(bdf):
await bot.delete_webhook(drop_pending_updates=True)
- # Загружаем примеры для RAG из базы данных
- scoring_manager = bdf.get_scoring_manager()
- if scoring_manager and scoring_manager.rag_service and scoring_manager.rag_service.is_enabled:
- try:
- db = bdf.get_db()
- positive_texts = await db.get_approved_posts_texts(limit=5000)
- negative_texts = await db.get_declined_posts_texts(limit=5000)
-
- if positive_texts or negative_texts:
- await scoring_manager.load_examples_from_db(positive_texts, negative_texts)
- logging.info(f"RAG: Загружено {len(positive_texts)} положительных и {len(negative_texts)} отрицательных примеров")
- else:
- logging.warning("RAG: Нет примеров в базе данных для загрузки")
- except Exception as e:
- logging.error(f"Ошибка загрузки примеров для RAG: {e}")
-
# Запускаем HTTP сервер для метрик параллельно с ботом
metrics_host = bdf.settings.get('Metrics', {}).get('host', '0.0.0.0')
metrics_port = bdf.settings.get('Metrics', {}).get('port', 8080)
@@ -113,6 +108,14 @@ async def start_bot(bdf):
logging.error(f"❌ Ошибка запуска бота: {e}")
raise
finally:
+ # Закрываем ресурсы ScoringManager перед завершением (на случай если shutdown не сработал)
+ if scoring_manager:
+ try:
+ await scoring_manager.close()
+ logging.info("ScoringManager закрыт в finally")
+ except Exception as e:
+ logging.error(f"Ошибка закрытия ScoringManager в finally: {e}")
+
# Останавливаем метрики сервер при завершении
try:
await stop_metrics_server()
diff --git a/helper_bot/services/scoring/__init__.py b/helper_bot/services/scoring/__init__.py
index a56b7fe..b0eb339 100644
--- a/helper_bot/services/scoring/__init__.py
+++ b/helper_bot/services/scoring/__init__.py
@@ -2,10 +2,9 @@
Сервисы для ML-скоринга постов.
Включает:
-- RAGService - локальное векторное сравнение с ruBERT
+- RagApiClient - HTTP клиент для внешнего RAG API сервиса
- DeepSeekService - интеграция с DeepSeek API
- ScoringManager - объединение всех сервисов скоринга
-- VectorStore - in-memory хранилище векторов
"""
from .base import ScoringResult, ScoringServiceProtocol, CombinedScore
@@ -17,8 +16,7 @@ from .exceptions import (
InsufficientExamplesError,
TextTooShortError,
)
-from .vector_store import VectorStore
-from .rag_service import RAGService
+from .rag_client import RagApiClient
from .deepseek_service import DeepSeekService
from .scoring_manager import ScoringManager
@@ -35,8 +33,7 @@ __all__ = [
"InsufficientExamplesError",
"TextTooShortError",
# Сервисы
- "VectorStore",
- "RAGService",
+ "RagApiClient",
"DeepSeekService",
"ScoringManager",
]
diff --git a/helper_bot/services/scoring/rag_client.py b/helper_bot/services/scoring/rag_client.py
new file mode 100644
index 0000000..fc35a14
--- /dev/null
+++ b/helper_bot/services/scoring/rag_client.py
@@ -0,0 +1,311 @@
+"""
+HTTP клиент для взаимодействия с внешним RAG сервисом.
+
+Использует REST API для получения скоров и отправки примеров.
+"""
+
+from typing import Optional, Dict, Any
+import httpx
+from logs.custom_logger import logger
+from helper_bot.utils.metrics import track_time, track_errors
+
+from .base import ScoringResult
+from .exceptions import ScoringError, InsufficientExamplesError, TextTooShortError
+
+
+class RagApiClient:
+ """
+ HTTP клиент для взаимодействия с внешним RAG сервисом.
+
+ Использует REST API для:
+ - Получения скоров постов (POST /api/v1/score)
+ - Отправки положительных примеров (POST /api/v1/examples/positive)
+ - Отправки отрицательных примеров (POST /api/v1/examples/negative)
+ - Получения статистики (GET /api/v1/stats)
+
+ Attributes:
+ api_url: Базовый URL API сервиса
+ api_key: API ключ для аутентификации
+ timeout: Таймаут запросов в секундах
+ test_mode: Включен ли тестовый режим (добавляет заголовок X-Test-Mode: true)
+ enabled: Включен ли клиент
+ """
+
+ def __init__(
+ self,
+ api_url: str,
+ api_key: str,
+ timeout: int = 30,
+ test_mode: bool = False,
+ enabled: bool = True,
+ ):
+ """
+ Инициализация клиента.
+
+ Args:
+ api_url: Базовый URL API (например, http://хх.ххх.ххх.хх/api/v1)
+ api_key: API ключ для аутентификации
+ timeout: Таймаут запросов в секундах
+ test_mode: Включен ли тестовый режим (добавляет заголовок X-Test-Mode: true к запросам examples)
+ enabled: Включен ли клиент
+ """
+ # Убираем trailing slash если есть
+ self.api_url = api_url.rstrip('/')
+ self.api_key = api_key
+ self.timeout = timeout
+ self.test_mode = test_mode
+ self._enabled = enabled
+
+ # Создаем HTTP клиент
+ self._client = httpx.AsyncClient(
+ timeout=httpx.Timeout(timeout),
+ headers={
+ "X-API-Key": api_key,
+ "Content-Type": "application/json",
+ }
+ )
+
+ logger.info(f"RagApiClient инициализирован (url={self.api_url}, enabled={enabled})")
+
+ @property
+ def source_name(self) -> str:
+ """Имя источника для результатов."""
+ return "rag"
+
+ @property
+ def is_enabled(self) -> bool:
+ """Проверяет, включен ли клиент."""
+ return self._enabled
+
+ async def close(self) -> None:
+ """Закрывает HTTP клиент."""
+ await self._client.aclose()
+
+ @track_time("calculate_score", "rag_client")
+ @track_errors("rag_client", "calculate_score")
+ async def calculate_score(self, text: str) -> ScoringResult:
+ """
+ Рассчитывает скор для текста поста через API.
+
+ Args:
+ text: Текст поста для оценки
+
+ Returns:
+ ScoringResult с оценкой
+
+ Raises:
+ ScoringError: При ошибке расчета
+ InsufficientExamplesError: Если недостаточно примеров
+ TextTooShortError: Если текст слишком короткий
+ """
+ if not self._enabled:
+ raise ScoringError("RAG API клиент отключен")
+
+ if not text or not text.strip():
+ raise TextTooShortError("Текст пустой")
+
+ try:
+ response = await self._client.post(
+ f"{self.api_url}/score",
+ json={"text": text.strip()}
+ )
+
+ # Обрабатываем различные статусы
+ if response.status_code == 400:
+ try:
+ error_data = response.json()
+ error_msg = error_data.get("detail", "Неизвестная ошибка")
+ except Exception:
+ error_msg = response.text or "Неизвестная ошибка"
+
+ logger.warning(f"RagApiClient: Ошибка валидации запроса: {error_msg}")
+
+ if "недостаточно" in error_msg.lower() or "insufficient" in error_msg.lower():
+ raise InsufficientExamplesError(error_msg)
+ if "коротк" in error_msg.lower() or "short" in error_msg.lower():
+ raise TextTooShortError(error_msg)
+ raise ScoringError(f"Ошибка валидации: {error_msg}")
+
+ if response.status_code == 401:
+ logger.error("RagApiClient: Ошибка аутентификации: неверный API ключ")
+ raise ScoringError("Ошибка аутентификации: неверный API ключ")
+
+ if response.status_code == 404:
+ logger.error("RagApiClient: RAG API endpoint не найден")
+ raise ScoringError("RAG API endpoint не найден")
+
+ if response.status_code >= 500:
+ logger.error(f"RagApiClient: Ошибка сервера RAG API: {response.status_code}")
+ raise ScoringError(f"Ошибка сервера RAG API: {response.status_code}")
+
+ # Проверяем успешный статус
+ if response.status_code != 200:
+ response.raise_for_status()
+
+ data = response.json()
+
+ # Парсим ответ
+ score = float(data.get("rag_score", 0.0))
+ confidence = float(data.get("rag_confidence", 0.0)) if data.get("rag_confidence") is not None else None
+
+ # Форматируем confidence для логирования
+ confidence_str = f"{confidence:.4f}" if confidence is not None else "None"
+
+ logger.info(
+ f"RagApiClient: Скор успешно получен "
+ f"(score={score:.4f}, confidence={confidence_str})"
+ )
+
+ return ScoringResult(
+ score=score,
+ source=self.source_name,
+ model=data.get("meta", {}).get("model", "rag-service"),
+ confidence=confidence,
+ metadata={
+ "rag_score_pos_only": float(data.get("rag_score_pos_only", 0.0)) if data.get("rag_score_pos_only") is not None else None,
+ "positive_examples": data.get("meta", {}).get("positive_examples"),
+ "negative_examples": data.get("meta", {}).get("negative_examples"),
+ }
+ )
+
+ except httpx.TimeoutException:
+ logger.error(f"RagApiClient: Таймаут запроса к RAG API (>{self.timeout}с)")
+ raise ScoringError(f"Таймаут запроса к RAG API (>{self.timeout}с)")
+ except httpx.RequestError as e:
+ logger.error(f"RagApiClient: Ошибка подключения к RAG API: {e}")
+ raise ScoringError(f"Ошибка подключения к RAG API: {e}")
+ except (KeyError, ValueError, TypeError) as e:
+ logger.error(f"RagApiClient: Ошибка парсинга ответа: {e}, response: {response.text if 'response' in locals() else 'N/A'}")
+ raise ScoringError(f"Ошибка парсинга ответа от RAG API: {e}")
+ except InsufficientExamplesError:
+ raise
+ except TextTooShortError:
+ raise
+ except ScoringError:
+ # Уже залогированные ошибки (401, 404, 500, таймауты и т.д.) - просто пробрасываем
+ raise
+ except Exception as e:
+ # Только действительно неожиданные ошибки логируем здесь
+ logger.error(f"RagApiClient: Неожиданная ошибка при расчете скора: {e}", exc_info=True)
+ raise ScoringError(f"Неожиданная ошибка: {e}")
+
+ @track_time("add_positive_example", "rag_client")
+ async def add_positive_example(self, text: str) -> None:
+ """
+ Добавляет текст как положительный пример (опубликованный пост).
+
+ Args:
+ text: Текст опубликованного поста
+ """
+ if not self._enabled:
+ return
+
+ if not text or not text.strip():
+ return
+
+ try:
+ # Формируем заголовки (добавляем X-Test-Mode если включен тестовый режим)
+ headers = {}
+ if self.test_mode:
+ headers["X-Test-Mode"] = "true"
+
+ response = await self._client.post(
+ f"{self.api_url}/examples/positive",
+ json={"text": text.strip()},
+ headers=headers
+ )
+
+ if response.status_code == 200 or response.status_code == 201:
+ logger.info("RagApiClient: Положительный пример успешно добавлен")
+ elif response.status_code == 400:
+ logger.warning(f"RagApiClient: Ошибка валидации при добавлении положительного примера: {response.text}")
+ else:
+ logger.warning(f"RagApiClient: Неожиданный статус при добавлении положительного примера: {response.status_code}")
+
+ except httpx.TimeoutException:
+ logger.warning(f"RagApiClient: Таймаут при добавлении положительного примера")
+ except httpx.RequestError as e:
+ logger.warning(f"RagApiClient: Ошибка подключения при добавлении положительного примера: {e}")
+ except Exception as e:
+ logger.error(f"RagApiClient: Ошибка добавления положительного примера: {e}")
+
+ @track_time("add_negative_example", "rag_client")
+ async def add_negative_example(self, text: str) -> None:
+ """
+ Добавляет текст как отрицательный пример (отклоненный пост).
+
+ Args:
+ text: Текст отклоненного поста
+ """
+ if not self._enabled:
+ return
+
+ if not text or not text.strip():
+ return
+
+ try:
+ # Формируем заголовки (добавляем X-Test-Mode если включен тестовый режим)
+ headers = {}
+ if self.test_mode:
+ headers["X-Test-Mode"] = "true"
+
+ response = await self._client.post(
+ f"{self.api_url}/examples/negative",
+ json={"text": text.strip()},
+ headers=headers
+ )
+
+ if response.status_code == 200 or response.status_code == 201:
+ logger.info("RagApiClient: Отрицательный пример успешно добавлен")
+ elif response.status_code == 400:
+ logger.warning(f"RagApiClient: Ошибка валидации при добавлении отрицательного примера: {response.text}")
+ else:
+ logger.warning(f"RagApiClient: Неожиданный статус при добавлении отрицательного примера: {response.status_code}")
+
+ except httpx.TimeoutException:
+ logger.warning(f"RagApiClient: Таймаут при добавлении отрицательного примера")
+ except httpx.RequestError as e:
+ logger.warning(f"RagApiClient: Ошибка подключения при добавлении отрицательного примера: {e}")
+ except Exception as e:
+ logger.error(f"RagApiClient: Ошибка добавления отрицательного примера: {e}")
+
+ async def get_stats(self) -> Dict[str, Any]:
+ """
+ Получает статистику от RAG API.
+
+ Returns:
+ Словарь со статистикой или пустой словарь при ошибке
+ """
+ if not self._enabled:
+ return {}
+
+ try:
+ response = await self._client.get(f"{self.api_url}/stats")
+
+ if response.status_code == 200:
+ return response.json()
+ else:
+ logger.warning(f"RagApiClient: Неожиданный статус при получении статистики: {response.status_code}")
+ return {}
+
+ except httpx.TimeoutException:
+ logger.warning(f"RagApiClient: Таймаут при получении статистики")
+ return {}
+ except httpx.RequestError as e:
+ logger.warning(f"RagApiClient: Ошибка подключения при получении статистики: {e}")
+ return {}
+ except Exception as e:
+ logger.error(f"RagApiClient: Ошибка получения статистики: {e}")
+ return {}
+
+ def get_stats_sync(self) -> Dict[str, Any]:
+ """
+ Синхронная версия get_stats для использования в get_stats() ScoringManager.
+
+ Внимание: Это заглушка, реальная статистика будет получена асинхронно.
+ """
+ return {
+ "enabled": self._enabled,
+ "api_url": self.api_url,
+ "timeout": self.timeout,
+ }
diff --git a/helper_bot/services/scoring/rag_service.py b/helper_bot/services/scoring/rag_service.py
deleted file mode 100644
index 0c02272..0000000
--- a/helper_bot/services/scoring/rag_service.py
+++ /dev/null
@@ -1,507 +0,0 @@
-"""
-RAG сервис для скоринга постов с использованием ruBERT.
-
-Использует модель DeepPavlov/rubert-base-cased для создания эмбеддингов
-и сравнивает их с эталонными примерами через VectorStore.
-"""
-
-import asyncio
-from typing import Optional, List
-
-import numpy as np
-
-from logs.custom_logger import logger
-from helper_bot.utils.metrics import track_time, track_errors
-
-from .base import ScoringResult
-from .vector_store import VectorStore
-from .exceptions import (
- ModelNotLoadedError,
- ScoringError,
- InsufficientExamplesError,
- TextTooShortError,
-)
-
-
-class RAGService:
- """
- RAG сервис для оценки постов на основе векторного сходства.
-
- Использует ruBERT для создания эмбеддингов текста и сравнивает
- их с эталонными примерами (опубликованные vs отклоненные посты).
-
- Attributes:
- model_name: Название модели HuggingFace
- vector_store: Хранилище векторов
- min_text_length: Минимальная длина текста для обработки
- """
-
- # Название модели по умолчанию
- DEFAULT_MODEL = "DeepPavlov/rubert-base-cased"
-
- def __init__(
- self,
- model_name: Optional[str] = None,
- vector_store: Optional[VectorStore] = None,
- cache_dir: Optional[str] = None,
- enabled: bool = True,
- min_text_length: int = 3,
- ):
- """
- Инициализация RAG сервиса.
-
- Args:
- model_name: Название модели HuggingFace (по умолчанию ruBERT)
- vector_store: Хранилище векторов (создается автоматически если не передано)
- cache_dir: Директория для кеширования модели
- enabled: Включен ли сервис
- min_text_length: Минимальная длина текста для обработки
- """
- self.model_name = model_name or self.DEFAULT_MODEL
- self.cache_dir = cache_dir
- self._enabled = enabled
- self.min_text_length = min_text_length
-
- # Модель и токенизатор загружаются лениво
- self._model = None
- self._tokenizer = None
- self._model_loaded = False
-
- # Хранилище векторов
- self.vector_store = vector_store or VectorStore()
-
- logger.info(f"RAGService инициализирован (model={self.model_name}, enabled={enabled})")
-
- @property
- def source_name(self) -> str:
- """Имя источника для результатов."""
- return "rag"
-
- @property
- def is_enabled(self) -> bool:
- """Проверяет, включен ли сервис."""
- return self._enabled
-
- @property
- def is_model_loaded(self) -> bool:
- """Проверяет, загружена ли модель."""
- return self._model_loaded
-
- async def load_model(self) -> None:
- """
- Загружает модель и токенизатор.
-
- Выполняется асинхронно в отдельном потоке чтобы не блокировать event loop.
- """
- if self._model_loaded:
- return
-
- if not self._enabled:
- logger.warning("RAGService: Сервис отключен, модель не загружается")
- return
-
- logger.info(f"RAGService: Загрузка модели {self.model_name}...")
-
- try:
- # Загрузка в отдельном потоке
- loop = asyncio.get_event_loop()
- await loop.run_in_executor(None, self._load_model_sync)
-
- self._model_loaded = True
- logger.info(f"RAGService: Модель {self.model_name} успешно загружена")
-
- except Exception as e:
- logger.error(f"RAGService: Ошибка загрузки модели: {e}")
- raise ModelNotLoadedError(f"Не удалось загрузить модель {self.model_name}: {e}")
-
- def _load_model_sync(self) -> None:
- """Синхронная загрузка модели (вызывается в executor)."""
- logger.info("RAGService: Начало _load_model_sync, импорт transformers...")
- from transformers import AutoTokenizer, AutoModel
- import torch
-
- # Определяем устройство
- self._device = "cuda" if torch.cuda.is_available() else "cpu"
- logger.info(f"RAGService: Устройство определено: {self._device}")
-
- # Загружаем токенизатор
- logger.info(f"RAGService: Загрузка токенизатора из {self.model_name}...")
- self._tokenizer = AutoTokenizer.from_pretrained(
- self.model_name,
- cache_dir=self.cache_dir,
- )
- logger.info("RAGService: Токенизатор загружен")
-
- # Загружаем модель
- logger.info(f"RAGService: Загрузка модели из {self.model_name} (это может занять несколько минут)...")
- self._model = AutoModel.from_pretrained(
- self.model_name,
- cache_dir=self.cache_dir,
- )
- logger.info("RAGService: Модель загружена, перенос на устройство...")
- self._model.to(self._device)
- self._model.eval() # Режим инференса
-
- logger.info(f"RAGService: Модель готова на устройстве: {self._device}")
-
- def _get_embedding_sync(self, text: str) -> np.ndarray:
- """
- Получает эмбеддинг текста (синхронно).
-
- Использует [CLS] токен как представление всего текста.
-
- Args:
- text: Текст для векторизации
-
- Returns:
- Numpy массив с эмбеддингом (768 измерений для ruBERT)
- """
- import torch
-
- # Токенизация с ограничением длины
- inputs = self._tokenizer(
- text,
- return_tensors="pt",
- truncation=True,
- max_length=512,
- padding=True,
- )
- inputs = {k: v.to(self._device) for k, v in inputs.items()}
-
- # Получаем эмбеддинг
- with torch.no_grad():
- outputs = self._model(**inputs)
- # Используем [CLS] токен (первый токен)
- embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy()
-
- return embedding.flatten()
-
- def _get_embeddings_batch_sync(self, texts: List[str], batch_size: int = 16) -> List[np.ndarray]:
- """
- Получает эмбеддинги для батча текстов (синхронно).
-
- Обрабатывает тексты пачками для эффективного использования GPU/CPU.
-
- Args:
- texts: Список текстов для векторизации
- batch_size: Размер батча (по умолчанию 16)
-
- Returns:
- Список numpy массивов с эмбеддингами
- """
- import torch
-
- all_embeddings = []
-
- for i in range(0, len(texts), batch_size):
- batch_texts = texts[i:i + batch_size]
-
- # Токенизация батча
- inputs = self._tokenizer(
- batch_texts,
- return_tensors="pt",
- truncation=True,
- max_length=512,
- padding=True,
- )
- inputs = {k: v.to(self._device) for k, v in inputs.items()}
-
- # Получаем эмбеддинги
- with torch.no_grad():
- outputs = self._model(**inputs)
- # [CLS] токен для каждого текста в батче
- batch_embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy()
-
- # Разбиваем на отдельные эмбеддинги
- for j in range(len(batch_texts)):
- all_embeddings.append(batch_embeddings[j])
-
- if i > 0 and i % (batch_size * 10) == 0:
- logger.info(f"RAGService: Обработано {i}/{len(texts)} текстов")
-
- return all_embeddings
-
- async def get_embeddings_batch(self, texts: List[str], batch_size: int = 16) -> List[np.ndarray]:
- """
- Получает эмбеддинги для батча текстов (асинхронно).
-
- Args:
- texts: Список текстов для векторизации
- batch_size: Размер батча
-
- Returns:
- Список numpy массивов с эмбеддингами
- """
- if not self._model_loaded:
- await self.load_model()
-
- if not self._model_loaded:
- raise ModelNotLoadedError("Модель не загружена")
-
- # Очищаем тексты
- clean_texts = [self._clean_text(text) for text in texts]
-
- # Выполняем батч-обработку в thread pool
- loop = asyncio.get_event_loop()
- embeddings = await loop.run_in_executor(
- None,
- self._get_embeddings_batch_sync,
- clean_texts,
- batch_size,
- )
-
- return embeddings
-
- async def get_embedding(self, text: str) -> np.ndarray:
- """
- Получает эмбеддинг текста (асинхронно).
-
- Args:
- text: Текст для векторизации
-
- Returns:
- Numpy массив с эмбеддингом
-
- Raises:
- ModelNotLoadedError: Если модель не загружена
- TextTooShortError: Если текст слишком короткий
- """
- if not self._model_loaded:
- await self.load_model()
-
- if not self._model_loaded:
- raise ModelNotLoadedError("Модель не загружена")
-
- # Очищаем текст
- clean_text = self._clean_text(text)
-
- if len(clean_text) < self.min_text_length:
- raise TextTooShortError(
- f"Текст слишком короткий (минимум {self.min_text_length} символов)"
- )
-
- # Выполняем в отдельном потоке
- loop = asyncio.get_event_loop()
- embedding = await loop.run_in_executor(
- None,
- self._get_embedding_sync,
- clean_text
- )
-
- return embedding
-
- def _clean_text(self, text: str) -> str:
- """Очищает текст от лишних символов."""
- if not text:
- return ""
-
- # Удаляем лишние пробелы и переносы строк
- clean = " ".join(text.split())
-
- # Удаляем служебные символы (например "^" для helper сообщений)
- if clean == "^":
- return ""
-
- return clean.strip()
-
- @track_time("calculate_score", "rag_service")
- @track_errors("rag_service", "calculate_score")
- async def calculate_score(self, text: str) -> ScoringResult:
- """
- Рассчитывает скор для текста поста.
-
- Args:
- text: Текст поста для оценки
-
- Returns:
- ScoringResult с оценкой
-
- Raises:
- ScoringError: При ошибке расчета
- """
- if not self._enabled:
- raise ScoringError("RAG сервис отключен")
-
- try:
- # Получаем эмбеддинг текста
- embedding = await self.get_embedding(text)
-
- # Логируем первые элементы вектора для отладки
- logger.info(
- f"RAGService: embedding[:3]={embedding[:3].tolist()}, "
- f"text_preview='{text[:30]}'"
- )
-
- # Рассчитываем скор через VectorStore
- score, confidence, score_pos_only = self.vector_store.calculate_similarity_score(embedding)
-
- return ScoringResult(
- score=score,
- source=self.source_name,
- model=self.model_name,
- confidence=confidence,
- metadata={
- "positive_examples": self.vector_store.positive_count,
- "negative_examples": self.vector_store.negative_count,
- "score_pos_only": score_pos_only, # Для сравнения
- },
- )
-
- except InsufficientExamplesError:
- # Не достаточно примеров - возвращаем нейтральный скор
- logger.warning("RAGService: Недостаточно примеров для расчета скора")
- raise
-
- except TextTooShortError:
- logger.warning(f"RAGService: Текст слишком короткий для оценки")
- raise
-
- except Exception as e:
- logger.error(f"RAGService: Ошибка расчета скора: {e}")
- raise ScoringError(f"Ошибка расчета скора: {e}")
-
- @track_time("add_positive_example", "rag_service")
- async def add_positive_example(self, text: str) -> None:
- """
- Добавляет текст как положительный пример (опубликованный пост).
-
- Args:
- text: Текст опубликованного поста
- """
- if not self._enabled:
- return
-
- try:
- clean_text = self._clean_text(text)
- if len(clean_text) < self.min_text_length:
- logger.debug("RAGService: Текст слишком короткий для примера, пропускаем")
- return
-
- # Получаем эмбеддинг
- embedding = await self.get_embedding(clean_text)
-
- # Вычисляем хеш для дедупликации
- text_hash = VectorStore.compute_text_hash(clean_text)
-
- # Добавляем в хранилище
- added = self.vector_store.add_positive(embedding, text_hash)
-
- if added:
- logger.info(f"RAGService: Добавлен положительный пример")
-
- except Exception as e:
- logger.error(f"RAGService: Ошибка добавления положительного примера: {e}")
-
- @track_time("add_negative_example", "rag_service")
- async def add_negative_example(self, text: str) -> None:
- """
- Добавляет текст как отрицательный пример (отклоненный пост).
-
- Args:
- text: Текст отклоненного поста
- """
- if not self._enabled:
- return
-
- try:
- clean_text = self._clean_text(text)
- if len(clean_text) < self.min_text_length:
- logger.debug("RAGService: Текст слишком короткий для примера, пропускаем")
- return
-
- # Получаем эмбеддинг
- embedding = await self.get_embedding(clean_text)
-
- # Вычисляем хеш для дедупликации
- text_hash = VectorStore.compute_text_hash(clean_text)
-
- # Добавляем в хранилище
- added = self.vector_store.add_negative(embedding, text_hash)
-
- if added:
- logger.info(f"RAGService: Добавлен отрицательный пример")
-
- except Exception as e:
- logger.error(f"RAGService: Ошибка добавления отрицательного примера: {e}")
-
- async def load_examples_from_db(
- self,
- positive_texts: list[str],
- negative_texts: list[str],
- batch_size: int = 16,
- ) -> None:
- """
- Загружает примеры из базы данных с батч-обработкой.
-
- Используется при запуске бота для восстановления VectorStore.
- Батч-обработка ускоряет загрузку в 10-20 раз.
-
- Args:
- positive_texts: Список текстов опубликованных постов
- negative_texts: Список текстов отклоненных постов
- batch_size: Размер батча для обработки (по умолчанию 16)
- """
- if not self._enabled:
- return
-
- logger.info(
- f"RAGService: Загрузка примеров из БД с батч-обработкой "
- f"(positive: {len(positive_texts)}, negative: {len(negative_texts)}, batch_size: {batch_size})"
- )
-
- # Убеждаемся что модель загружена
- await self.load_model()
-
- import time
- start_time = time.time()
-
- # Фильтруем и очищаем положительные тексты
- if positive_texts:
- clean_positive = []
- positive_hashes = []
- for text in positive_texts:
- clean_text = self._clean_text(text)
- if len(clean_text) >= self.min_text_length:
- clean_positive.append(clean_text)
- positive_hashes.append(VectorStore.compute_text_hash(clean_text))
-
- if clean_positive:
- logger.info(f"RAGService: Обработка {len(clean_positive)} положительных примеров батчами...")
- positive_embeddings = await self.get_embeddings_batch(clean_positive, batch_size)
- self.vector_store.add_positive_batch(positive_embeddings, positive_hashes)
-
- # Фильтруем и очищаем отрицательные тексты
- if negative_texts:
- clean_negative = []
- negative_hashes = []
- for text in negative_texts:
- clean_text = self._clean_text(text)
- if len(clean_text) >= self.min_text_length:
- clean_negative.append(clean_text)
- negative_hashes.append(VectorStore.compute_text_hash(clean_text))
-
- if clean_negative:
- logger.info(f"RAGService: Обработка {len(clean_negative)} отрицательных примеров батчами...")
- negative_embeddings = await self.get_embeddings_batch(clean_negative, batch_size)
- self.vector_store.add_negative_batch(negative_embeddings, negative_hashes)
-
- elapsed = time.time() - start_time
- logger.info(
- f"RAGService: Загрузка завершена за {elapsed:.1f} сек "
- f"(positive: {self.vector_store.positive_count}, "
- f"negative: {self.vector_store.negative_count})"
- )
-
- def save_vectors(self) -> None:
- """Сохраняет векторы на диск."""
- if self.vector_store.storage_path:
- self.vector_store.save_to_disk()
-
- def get_stats(self) -> dict:
- """Возвращает статистику сервиса."""
- return {
- "enabled": self._enabled,
- "model_name": self.model_name,
- "model_loaded": self._model_loaded,
- "vector_store": self.vector_store.get_stats(),
- }
diff --git a/helper_bot/services/scoring/scoring_manager.py b/helper_bot/services/scoring/scoring_manager.py
index 1a9b7b3..fa23dcb 100644
--- a/helper_bot/services/scoring/scoring_manager.py
+++ b/helper_bot/services/scoring/scoring_manager.py
@@ -1,20 +1,19 @@
"""
Менеджер для объединения всех сервисов скоринга.
-Координирует работу RAGService и DeepSeekService,
+Координирует работу RagApiClient и DeepSeekService,
выполняет параллельные запросы и агрегирует результаты.
"""
import asyncio
-from typing import Optional, List
+from typing import Optional
from logs.custom_logger import logger
from helper_bot.utils.metrics import track_time, track_errors
from .base import CombinedScore, ScoringResult
-from .rag_service import RAGService
+from .rag_client import RagApiClient
from .deepseek_service import DeepSeekService
-from .vector_store import VectorStore
from .exceptions import ScoringError, InsufficientExamplesError, TextTooShortError
@@ -22,39 +21,39 @@ class ScoringManager:
"""
Менеджер для управления всеми сервисами скоринга.
- Объединяет RAGService и DeepSeekService, выполняет параллельные
+ Объединяет RagApiClient и DeepSeekService, выполняет параллельные
запросы и агрегирует результаты в единый CombinedScore.
Attributes:
- rag_service: Сервис RAG с ruBERT
+ rag_client: HTTP клиент для RAG API
deepseek_service: Сервис DeepSeek API
"""
def __init__(
self,
- rag_service: Optional[RAGService] = None,
+ rag_client: Optional[RagApiClient] = None,
deepseek_service: Optional[DeepSeekService] = None,
):
"""
Инициализация менеджера.
Args:
- rag_service: Сервис RAG (создается автоматически если не передан)
+ rag_client: HTTP клиент для RAG API (создается автоматически если не передан)
deepseek_service: Сервис DeepSeek (создается автоматически если не передан)
"""
- self.rag_service = rag_service
+ self.rag_client = rag_client
self.deepseek_service = deepseek_service
logger.info(
f"ScoringManager инициализирован "
- f"(rag={rag_service is not None and rag_service.is_enabled}, "
+ f"(rag={rag_client is not None and rag_client.is_enabled}, "
f"deepseek={deepseek_service is not None and deepseek_service.is_enabled})"
)
@property
def is_any_enabled(self) -> bool:
"""Проверяет, включен ли хотя бы один сервис."""
- rag_enabled = self.rag_service is not None and self.rag_service.is_enabled
+ rag_enabled = self.rag_client is not None and self.rag_client.is_enabled
deepseek_enabled = self.deepseek_service is not None and self.deepseek_service.is_enabled
return rag_enabled or deepseek_enabled
@@ -82,8 +81,8 @@ class ScoringManager:
tasks = []
task_names = []
- # RAG сервис
- if self.rag_service and self.rag_service.is_enabled:
+ # RAG API клиент
+ if self.rag_client and self.rag_client.is_enabled:
tasks.append(self._get_rag_score(text))
task_names.append("rag")
@@ -104,6 +103,7 @@ class ScoringManager:
if isinstance(res, Exception):
error_msg = str(res)
result.errors[name] = error_msg
+ # Ошибки уже залогированы в сервисах, здесь только предупреждение
logger.warning(f"ScoringManager: Ошибка от {name}: {error_msg}")
elif res is not None:
if name == "rag":
@@ -119,9 +119,9 @@ class ScoringManager:
return result
async def _get_rag_score(self, text: str) -> Optional[ScoringResult]:
- """Получает скор от RAG сервиса."""
+ """Получает скор от RAG API."""
try:
- return await self.rag_service.calculate_score(text)
+ return await self.rag_client.calculate_score(text)
except InsufficientExamplesError:
# Недостаточно примеров - это не ошибка, просто нет данных
logger.info("ScoringManager: RAG - недостаточно примеров")
@@ -131,7 +131,7 @@ class ScoringManager:
logger.debug("ScoringManager: RAG - текст слишком короткий")
return None
except Exception as e:
- logger.error(f"ScoringManager: RAG ошибка: {e}")
+ # Ошибки уже залогированы в RagApiClient, здесь только пробрасываем
raise
async def _get_deepseek_score(self, text: str) -> Optional[ScoringResult]:
@@ -143,7 +143,7 @@ class ScoringManager:
logger.debug("ScoringManager: DeepSeek - текст слишком короткий")
return None
except Exception as e:
- logger.error(f"ScoringManager: DeepSeek ошибка: {e}")
+ # Ошибки уже залогированы в DeepSeekService, здесь только пробрасываем
raise
@track_time("on_post_published", "scoring_manager")
@@ -161,8 +161,8 @@ class ScoringManager:
tasks = []
- if self.rag_service and self.rag_service.is_enabled:
- tasks.append(self.rag_service.add_positive_example(text))
+ if self.rag_client and self.rag_client.is_enabled:
+ tasks.append(self.rag_client.add_positive_example(text))
if self.deepseek_service and self.deepseek_service.is_enabled:
tasks.append(self.deepseek_service.add_positive_example(text))
@@ -186,8 +186,8 @@ class ScoringManager:
tasks = []
- if self.rag_service and self.rag_service.is_enabled:
- tasks.append(self.rag_service.add_negative_example(text))
+ if self.rag_client and self.rag_client.is_enabled:
+ tasks.append(self.rag_client.add_negative_example(text))
if self.deepseek_service and self.deepseek_service.is_enabled:
tasks.append(self.deepseek_service.add_negative_example(text))
@@ -196,45 +196,25 @@ class ScoringManager:
await asyncio.gather(*tasks, return_exceptions=True)
logger.info("ScoringManager: Добавлен отрицательный пример")
- async def load_examples_from_db(
- self,
- positive_texts: List[str],
- negative_texts: List[str],
- ) -> None:
- """
- Загружает примеры из базы данных при запуске бота.
-
- Args:
- positive_texts: Список текстов опубликованных постов
- negative_texts: Список текстов отклоненных постов
- """
- if self.rag_service and self.rag_service.is_enabled:
- await self.rag_service.load_examples_from_db(
- positive_texts,
- negative_texts
- )
-
- def save_vectors(self) -> None:
- """Сохраняет векторы RAG на диск."""
- if self.rag_service:
- self.rag_service.save_vectors()
async def close(self) -> None:
"""Закрывает ресурсы всех сервисов."""
if self.deepseek_service:
await self.deepseek_service.close()
- # Сохраняем векторы перед закрытием
- self.save_vectors()
+ if self.rag_client:
+ await self.rag_client.close()
- def get_stats(self) -> dict:
+ async def get_stats(self) -> dict:
"""Возвращает статистику всех сервисов."""
stats = {
"any_enabled": self.is_any_enabled,
}
- if self.rag_service:
- stats["rag"] = self.rag_service.get_stats()
+ if self.rag_client:
+ # Получаем статистику асинхронно от API
+ rag_stats = await self.rag_client.get_stats()
+ stats["rag"] = rag_stats if rag_stats else self.rag_client.get_stats_sync()
if self.deepseek_service:
stats["deepseek"] = self.deepseek_service.get_stats()
diff --git a/helper_bot/services/scoring/vector_store.py b/helper_bot/services/scoring/vector_store.py
deleted file mode 100644
index a0381b3..0000000
--- a/helper_bot/services/scoring/vector_store.py
+++ /dev/null
@@ -1,399 +0,0 @@
-"""
-In-memory хранилище векторов на numpy.
-
-Хранит векторные представления постов для быстрого сравнения.
-Поддерживает персистентность через сохранение/загрузку с диска.
-"""
-
-import hashlib
-import os
-from pathlib import Path
-from typing import Optional, Tuple, List
-import threading
-
-import numpy as np
-
-from logs.custom_logger import logger
-from .exceptions import VectorStoreError, InsufficientExamplesError
-
-
-class VectorStore:
- """
- In-memory хранилище векторов для RAG.
-
- Хранит отдельно положительные (опубликованные) и отрицательные (отклоненные)
- примеры. Использует косинусное сходство для расчета скора.
-
- Attributes:
- vector_dim: Размерность векторов (768 для ruBERT)
- max_examples: Максимальное количество примеров каждого типа
- """
-
- def __init__(
- self,
- vector_dim: int = 768,
- max_examples: int = 10000,
- storage_path: Optional[str] = None,
- score_multiplier: float = 5.0,
- ):
- """
- Инициализация хранилища.
-
- Args:
- vector_dim: Размерность векторов
- max_examples: Максимальное количество примеров каждого типа
- storage_path: Путь для сохранения/загрузки векторов (опционально)
- score_multiplier: Множитель для усиления разницы в скорах
- """
- self.vector_dim = vector_dim
- self.max_examples = max_examples
- self.storage_path = storage_path
- self.score_multiplier = score_multiplier
-
- # Инициализируем пустые массивы
- # Используем список для динамического добавления, потом конвертируем в numpy
- self._positive_vectors: list = []
- self._negative_vectors: list = []
- self._positive_hashes: list = [] # Хеши текстов для дедупликации
- self._negative_hashes: list = []
-
- # Lock для потокобезопасности
- self._lock = threading.Lock()
-
- # Пытаемся загрузить сохраненные векторы
- if storage_path and os.path.exists(storage_path):
- self._load_from_disk()
-
- @property
- def positive_count(self) -> int:
- """Количество положительных примеров."""
- return len(self._positive_vectors)
-
- @property
- def negative_count(self) -> int:
- """Количество отрицательных примеров."""
- return len(self._negative_vectors)
-
- @property
- def total_count(self) -> int:
- """Общее количество примеров."""
- return self.positive_count + self.negative_count
-
- @staticmethod
- def compute_text_hash(text: str) -> str:
- """Вычисляет хеш текста для дедупликации."""
- return hashlib.md5(text.encode('utf-8')).hexdigest()
-
- def _normalize_vector(self, vector: np.ndarray) -> np.ndarray:
- """Нормализует вектор для косинусного сходства."""
- norm = np.linalg.norm(vector)
- if norm == 0:
- return vector
- return vector / norm
-
- def add_positive(self, vector: np.ndarray, text_hash: Optional[str] = None) -> bool:
- """
- Добавляет положительный пример (опубликованный пост).
-
- Args:
- vector: Векторное представление текста
- text_hash: Хеш текста для дедупликации (опционально)
-
- Returns:
- True если добавлен, False если дубликат или превышен лимит
- """
- with self._lock:
- # Проверяем дубликат по хешу
- if text_hash and text_hash in self._positive_hashes:
- logger.debug(f"VectorStore: Пропуск дубликата положительного примера")
- return False
-
- # Проверяем лимит
- if len(self._positive_vectors) >= self.max_examples:
- # Удаляем самый старый пример (FIFO)
- self._positive_vectors.pop(0)
- self._positive_hashes.pop(0)
- logger.debug("VectorStore: Удален старый положительный пример (лимит)")
-
- # Нормализуем и добавляем
- normalized = self._normalize_vector(vector)
- self._positive_vectors.append(normalized)
- if text_hash:
- self._positive_hashes.append(text_hash)
-
- logger.info(f"VectorStore: Добавлен положительный пример (всего: {self.positive_count})")
- return True
-
- def add_positive_batch(
- self,
- vectors: List[np.ndarray],
- text_hashes: Optional[List[str]] = None
- ) -> int:
- """
- Добавляет батч положительных примеров.
-
- Args:
- vectors: Список векторов
- text_hashes: Список хешей текстов для дедупликации
-
- Returns:
- Количество добавленных примеров
- """
- if text_hashes is None:
- text_hashes = [None] * len(vectors)
-
- added = 0
- with self._lock:
- for vector, text_hash in zip(vectors, text_hashes):
- # Проверяем дубликат по хешу
- if text_hash and text_hash in self._positive_hashes:
- continue
-
- # Проверяем лимит
- if len(self._positive_vectors) >= self.max_examples:
- self._positive_vectors.pop(0)
- self._positive_hashes.pop(0)
-
- # Нормализуем и добавляем
- normalized = self._normalize_vector(vector)
- self._positive_vectors.append(normalized)
- if text_hash:
- self._positive_hashes.append(text_hash)
- added += 1
-
- logger.info(f"VectorStore: Добавлено {added} положительных примеров батчем (всего: {self.positive_count})")
- return added
-
- def add_negative(self, vector: np.ndarray, text_hash: Optional[str] = None) -> bool:
- """
- Добавляет отрицательный пример (отклоненный пост).
-
- Args:
- vector: Векторное представление текста
- text_hash: Хеш текста для дедупликации (опционально)
-
- Returns:
- True если добавлен, False если дубликат или превышен лимит
- """
- with self._lock:
- # Проверяем дубликат по хешу
- if text_hash and text_hash in self._negative_hashes:
- logger.debug(f"VectorStore: Пропуск дубликата отрицательного примера")
- return False
-
- # Проверяем лимит
- if len(self._negative_vectors) >= self.max_examples:
- # Удаляем самый старый пример (FIFO)
- self._negative_vectors.pop(0)
- self._negative_hashes.pop(0)
- logger.debug("VectorStore: Удален старый отрицательный пример (лимит)")
-
- # Нормализуем и добавляем
- normalized = self._normalize_vector(vector)
- self._negative_vectors.append(normalized)
- if text_hash:
- self._negative_hashes.append(text_hash)
-
- logger.info(f"VectorStore: Добавлен отрицательный пример (всего: {self.negative_count})")
- return True
-
- def add_negative_batch(
- self,
- vectors: List[np.ndarray],
- text_hashes: Optional[List[str]] = None
- ) -> int:
- """
- Добавляет батч отрицательных примеров.
-
- Args:
- vectors: Список векторов
- text_hashes: Список хешей текстов для дедупликации
-
- Returns:
- Количество добавленных примеров
- """
- if text_hashes is None:
- text_hashes = [None] * len(vectors)
-
- added = 0
- with self._lock:
- for vector, text_hash in zip(vectors, text_hashes):
- # Проверяем дубликат по хешу
- if text_hash and text_hash in self._negative_hashes:
- continue
-
- # Проверяем лимит
- if len(self._negative_vectors) >= self.max_examples:
- self._negative_vectors.pop(0)
- self._negative_hashes.pop(0)
-
- # Нормализуем и добавляем
- normalized = self._normalize_vector(vector)
- self._negative_vectors.append(normalized)
- if text_hash:
- self._negative_hashes.append(text_hash)
- added += 1
-
- logger.info(f"VectorStore: Добавлено {added} отрицательных примеров батчем (всего: {self.negative_count})")
- return added
-
- def calculate_similarity_score(self, vector: np.ndarray) -> Tuple[float, float]:
- """
- Рассчитывает скор на основе сходства с примерами.
-
- Алгоритм:
- 1. Вычисляем среднее косинусное сходство с положительными примерами
- 2. Вычисляем среднее косинусное сходство с отрицательными примерами
- 3. Финальный скор = pos_sim / (pos_sim + neg_sim + eps)
-
- Args:
- vector: Векторное представление нового поста
-
- Returns:
- Tuple (score, confidence):
- - score: Оценка от 0.0 до 1.0
- - confidence: Уверенность (зависит от количества примеров)
-
- Raises:
- InsufficientExamplesError: Если недостаточно примеров
- """
- with self._lock:
- if self.positive_count == 0:
- raise InsufficientExamplesError(
- "Нет положительных примеров для сравнения"
- )
-
- # Нормализуем входной вектор
- normalized = self._normalize_vector(vector)
-
- # Конвертируем в numpy массивы для быстрых вычислений
- pos_matrix = np.array(self._positive_vectors)
-
- # Косинусное сходство с положительными примерами
- # Для нормализованных векторов это просто скалярное произведение
- pos_similarities = np.dot(pos_matrix, normalized)
- pos_sim = float(np.mean(pos_similarities))
-
- # Косинусное сходство с отрицательными примерами
- if self.negative_count > 0:
- neg_matrix = np.array(self._negative_vectors)
- neg_similarities = np.dot(neg_matrix, normalized)
- neg_sim = float(np.mean(neg_similarities))
- else:
- # Если нет отрицательных примеров, используем нейтральное значение
- neg_sim = pos_sim # Нейтральный скор = 0.5
-
- # === Вариант 1: neg/pos (разница между положительными и отрицательными) ===
- diff = pos_sim - neg_sim
- score_neg_pos = 0.5 + (diff * self.score_multiplier)
- score_neg_pos = max(0.0, min(1.0, score_neg_pos))
-
- # === Вариант 2: pos only (только положительные, топ-k ближайших) ===
- # Берём топ-5 ближайших положительных примеров
- top_k = min(5, len(pos_similarities))
- top_k_sim = float(np.mean(np.sort(pos_similarities)[-top_k:]))
- # Нормализуем: 0.85 -> 0.0, 0.95 -> 1.0 (типичный диапазон для BERT)
- score_pos_only = (top_k_sim - 0.85) / 0.10
- score_pos_only = max(0.0, min(1.0, score_pos_only))
-
- # Основной скор — neg/pos (можно будет переключить позже)
- score = score_neg_pos
-
- # Confidence зависит от количества примеров (100% при 1000 примерах)
- total_examples = self.positive_count + self.negative_count
- confidence = min(1.0, total_examples / 1000)
-
- logger.info(
- f"VectorStore: pos_sim={pos_sim:.4f}, neg_sim={neg_sim:.4f}, "
- f"top_k_sim={top_k_sim:.4f}, score_neg_pos={score_neg_pos:.4f}, "
- f"score_pos_only={score_pos_only:.4f}"
- )
-
- return score, confidence, score_pos_only
-
- def save_to_disk(self, path: Optional[str] = None) -> None:
- """
- Сохраняет векторы на диск.
-
- Args:
- path: Путь для сохранения (если не указан, используется storage_path)
- """
- save_path = path or self.storage_path
- if not save_path:
- raise VectorStoreError("Путь для сохранения не указан")
-
- with self._lock:
- # Создаем директорию если нужно
- Path(save_path).parent.mkdir(parents=True, exist_ok=True)
-
- # Сохраняем в npz формате
- np.savez_compressed(
- save_path,
- positive_vectors=np.array(self._positive_vectors) if self._positive_vectors else np.array([]),
- negative_vectors=np.array(self._negative_vectors) if self._negative_vectors else np.array([]),
- positive_hashes=np.array(self._positive_hashes, dtype=object),
- negative_hashes=np.array(self._negative_hashes, dtype=object),
- vector_dim=self.vector_dim,
- max_examples=self.max_examples,
- )
-
- logger.info(
- f"VectorStore: Сохранено на диск ({self.positive_count} pos, "
- f"{self.negative_count} neg): {save_path}"
- )
-
- def _load_from_disk(self) -> None:
- """Загружает векторы с диска."""
- if not self.storage_path or not os.path.exists(self.storage_path):
- return
-
- try:
- with self._lock:
- data = np.load(self.storage_path, allow_pickle=True)
-
- # Загружаем векторы
- pos_vectors = data.get('positive_vectors', np.array([]))
- neg_vectors = data.get('negative_vectors', np.array([]))
-
- if pos_vectors.size > 0:
- self._positive_vectors = list(pos_vectors)
- if neg_vectors.size > 0:
- self._negative_vectors = list(neg_vectors)
-
- # Загружаем хеши
- pos_hashes = data.get('positive_hashes', np.array([]))
- neg_hashes = data.get('negative_hashes', np.array([]))
-
- if pos_hashes.size > 0:
- self._positive_hashes = list(pos_hashes)
- if neg_hashes.size > 0:
- self._negative_hashes = list(neg_hashes)
-
- logger.info(
- f"VectorStore: Загружено с диска ({self.positive_count} pos, "
- f"{self.negative_count} neg): {self.storage_path}"
- )
-
- except Exception as e:
- logger.error(f"VectorStore: Ошибка загрузки с диска: {e}")
- # Продолжаем с пустым хранилищем
-
- def clear(self) -> None:
- """Очищает все векторы."""
- with self._lock:
- self._positive_vectors.clear()
- self._negative_vectors.clear()
- self._positive_hashes.clear()
- self._negative_hashes.clear()
- logger.info("VectorStore: Хранилище очищено")
-
- def get_stats(self) -> dict:
- """Возвращает статистику хранилища."""
- return {
- "positive_count": self.positive_count,
- "negative_count": self.negative_count,
- "total_count": self.total_count,
- "vector_dim": self.vector_dim,
- "max_examples": self.max_examples,
- "storage_path": self.storage_path,
- }
diff --git a/helper_bot/utils/base_dependency_factory.py b/helper_bot/utils/base_dependency_factory.py
index 82a0660..db71fa0 100644
--- a/helper_bot/utils/base_dependency_factory.py
+++ b/helper_bot/utils/base_dependency_factory.py
@@ -67,13 +67,12 @@ class BaseDependencyFactory:
# Настройки ML-скоринга
self.settings['Scoring'] = {
- # RAG (ruBERT)
+ # RAG API
'rag_enabled': self._parse_bool(os.getenv('RAG_ENABLED', 'false')),
- 'rag_model': os.getenv('RAG_MODEL', 'DeepPavlov/rubert-base-cased'),
- 'rag_cache_dir': os.getenv('RAG_CACHE_DIR', 'data/models'),
- 'rag_vectors_path': os.getenv('RAG_VECTORS_PATH', 'data/vectors.npz'),
- 'rag_max_examples': self._parse_int(os.getenv('RAG_MAX_EXAMPLES', '10000')),
- 'rag_score_multiplier': self._parse_float(os.getenv('RAG_SCORE_MULTIPLIER', '5.0')),
+ 'rag_api_url': os.getenv('RAG_API_URL', ''),
+ 'rag_api_key': os.getenv('RAG_API_KEY', ''),
+ 'rag_api_timeout': self._parse_int(os.getenv('RAG_API_TIMEOUT', '30')),
+ 'rag_test_mode': self._parse_bool(os.getenv('RAG_TEST_MODE', 'false')),
# DeepSeek
'deepseek_enabled': self._parse_bool(os.getenv('DEEPSEEK_ENABLED', 'false')),
'deepseek_api_key': os.getenv('DEEPSEEK_API_KEY', ''),
@@ -127,53 +126,35 @@ class BaseDependencyFactory:
def _init_scoring_manager(self):
"""
- Инициализирует ScoringManager с RAG и DeepSeek сервисами.
+ Инициализирует ScoringManager с RAG API клиентом и DeepSeek сервисом.
Вызывается лениво при первом обращении к get_scoring_manager().
"""
from helper_bot.services.scoring import (
ScoringManager,
- RAGService,
+ RagApiClient,
DeepSeekService,
- VectorStore,
)
scoring_config = self.settings['Scoring']
- # Инициализация RAG сервиса
- rag_service = None
+ # Инициализация RAG API клиента
+ rag_client = None
if scoring_config['rag_enabled']:
- # Путь к векторам
- vectors_path = scoring_config['rag_vectors_path']
- if not os.path.isabs(vectors_path):
- vectors_path = os.path.join(self._project_dir, vectors_path)
+ api_url = scoring_config['rag_api_url']
+ api_key = scoring_config['rag_api_key']
- # Путь к кешу моделей
- cache_dir = scoring_config['rag_cache_dir']
- if not os.path.isabs(cache_dir):
- cache_dir = os.path.join(self._project_dir, cache_dir)
-
- # Создаем директории если нужно
- os.makedirs(os.path.dirname(vectors_path), exist_ok=True)
- os.makedirs(cache_dir, exist_ok=True)
-
- # Создаем VectorStore
- vector_store = VectorStore(
- vector_dim=768, # ruBERT dimension
- max_examples=scoring_config['rag_max_examples'],
- storage_path=vectors_path,
- score_multiplier=scoring_config['rag_score_multiplier'],
- )
-
- # Создаем RAGService
- rag_service = RAGService(
- model_name=scoring_config['rag_model'],
- vector_store=vector_store,
- cache_dir=cache_dir,
- enabled=True,
- )
-
- logger.info(f"RAGService инициализирован: {scoring_config['rag_model']}")
+ if not api_url or not api_key:
+ logger.warning("RAG включен, но не указаны RAG_API_URL или RAG_API_KEY")
+ else:
+ rag_client = RagApiClient(
+ api_url=api_url,
+ api_key=api_key,
+ timeout=scoring_config['rag_api_timeout'],
+ test_mode=scoring_config['rag_test_mode'],
+ enabled=True,
+ )
+ logger.info(f"RagApiClient инициализирован: {api_url} (test_mode={scoring_config['rag_test_mode']})")
# Инициализация DeepSeek сервиса
deepseek_service = None
@@ -189,7 +170,7 @@ class BaseDependencyFactory:
# Создаем менеджер
self._scoring_manager = ScoringManager(
- rag_service=rag_service,
+ rag_client=rag_client,
deepseek_service=deepseek_service,
)
diff --git a/requirements.txt b/requirements.txt
index 968c3f3..c2a5b89 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -32,8 +32,5 @@ emoji~=2.8.0
# S3 Storage (для хранения медиафайлов опубликованных постов)
aioboto3>=12.0.0
-# ML Scoring (для оценки вероятности публикации постов)
-numpy>=1.24.0
-transformers>=4.30.0
-torch>=2.0.0
+# HTTP клиент для RAG API
httpx>=0.24.0
\ No newline at end of file
diff --git a/scripts/add_ml_scores_columns.py b/scripts/add_ml_scores_columns.py
index a7c23ff..78e237e 100644
--- a/scripts/add_ml_scores_columns.py
+++ b/scripts/add_ml_scores_columns.py
@@ -1,10 +1,9 @@
#!/usr/bin/env python3
"""
-Миграция: Добавление колонок для ML-скоринга постов.
+Миграция: Добавление колонки для ML-скоринга постов.
Добавляет:
- ml_scores (TEXT/JSON) - JSON с результатами оценки от разных моделей
-- vector_hash (TEXT) - хеш текста для кеширования векторов
Структура ml_scores:
{
@@ -46,7 +45,7 @@ async def main(db_path: str) -> None:
"""
Основная функция миграции.
- Добавляет колонки ml_scores и vector_hash в таблицу post_from_telegram_suggest.
+ Добавляет колонку ml_scores в таблицу post_from_telegram_suggest.
Миграция идемпотентна - можно запускать повторно без ошибок.
"""
db_path = os.path.abspath(db_path)
@@ -67,22 +66,13 @@ async def main(db_path: str) -> None:
else:
logger.info("Колонка ml_scores уже существует")
- # Проверяем и добавляем колонку vector_hash
- if not await column_exists(conn, "post_from_telegram_suggest", "vector_hash"):
- await conn.execute(
- "ALTER TABLE post_from_telegram_suggest ADD COLUMN vector_hash TEXT"
- )
- logger.info("Колонка vector_hash добавлена в post_from_telegram_suggest")
- else:
- logger.info("Колонка vector_hash уже существует")
-
await conn.commit()
logger.info("Миграция add_ml_scores_columns завершена успешно")
if __name__ == "__main__":
parser = argparse.ArgumentParser(
- description="Добавление колонок ml_scores и vector_hash для ML-скоринга"
+ description="Добавление колонки ml_scores для ML-скоринга"
)
parser.add_argument(
"--db",
diff --git a/scripts/drop_vector_hash_column.py b/scripts/drop_vector_hash_column.py
new file mode 100644
index 0000000..4d0bdd5
--- /dev/null
+++ b/scripts/drop_vector_hash_column.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python3
+"""
+Миграция: Удаление колонки vector_hash из таблицы post_from_telegram_suggest.
+
+Колонка больше не нужна, т.к. RAG сервис вынесен в отдельный микросервис
+и хранит векторы самостоятельно.
+
+SQLite не поддерживает DROP COLUMN напрямую (до версии 3.35.0),
+поэтому используем пересоздание таблицы.
+"""
+import argparse
+import asyncio
+import os
+import sys
+from pathlib import Path
+
+# Добавляем корень проекта в путь
+project_root = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(project_root))
+
+import aiosqlite
+
+# Пытаемся импортировать logger, если не получается - используем стандартный
+try:
+ from logs.custom_logger import logger
+except ImportError:
+ import logging
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
+ logger = logging.getLogger(__name__)
+
+DEFAULT_DB_PATH = "database/tg-bot-database.db"
+
+
+async def column_exists(conn: aiosqlite.Connection, table: str, column: str) -> bool:
+ """Проверяет существование колонки в таблице."""
+ cursor = await conn.execute(f"PRAGMA table_info({table})")
+ columns = await cursor.fetchall()
+ return any(col[1] == column for col in columns)
+
+
+async def get_sqlite_version(conn: aiosqlite.Connection) -> tuple:
+ """Возвращает версию SQLite."""
+ cursor = await conn.execute("SELECT sqlite_version()")
+ version_str = (await cursor.fetchone())[0]
+ return tuple(map(int, version_str.split('.')))
+
+
+async def main(db_path: str) -> None:
+ """
+ Удаляет колонку vector_hash из таблицы post_from_telegram_suggest.
+ """
+ db_path = os.path.abspath(db_path)
+
+ if not os.path.exists(db_path):
+ logger.error(f"База данных не найдена: {db_path}")
+ return
+
+ async with aiosqlite.connect(db_path) as conn:
+ # Проверяем существует ли колонка
+ if not await column_exists(conn, "post_from_telegram_suggest", "vector_hash"):
+ logger.info("Колонка vector_hash не существует, миграция не требуется")
+ return
+
+ # Проверяем версию SQLite
+ version = await get_sqlite_version(conn)
+ logger.info(f"Версия SQLite: {'.'.join(map(str, version))}")
+
+ # SQLite 3.35.0+ поддерживает DROP COLUMN
+ if version >= (3, 35, 0):
+ logger.info("Используем ALTER TABLE DROP COLUMN")
+ await conn.execute(
+ "ALTER TABLE post_from_telegram_suggest DROP COLUMN vector_hash"
+ )
+ else:
+ # Для старых версий пересоздаём таблицу
+ logger.info("Используем пересоздание таблицы (SQLite < 3.35.0)")
+
+ # Получаем список колонок без vector_hash
+ cursor = await conn.execute("PRAGMA table_info(post_from_telegram_suggest)")
+ columns = await cursor.fetchall()
+ column_names = [col[1] for col in columns if col[1] != "vector_hash"]
+ columns_str = ", ".join(column_names)
+
+ logger.info(f"Колонки для сохранения: {columns_str}")
+
+ # Пересоздаём таблицу
+ await conn.execute("BEGIN TRANSACTION")
+ try:
+ # Создаём временную таблицу
+ await conn.execute(
+ f"CREATE TABLE post_from_telegram_suggest_backup AS "
+ f"SELECT {columns_str} FROM post_from_telegram_suggest"
+ )
+
+ # Удаляем старую таблицу
+ await conn.execute("DROP TABLE post_from_telegram_suggest")
+
+ # Переименовываем временную
+ await conn.execute(
+ "ALTER TABLE post_from_telegram_suggest_backup "
+ "RENAME TO post_from_telegram_suggest"
+ )
+
+ await conn.execute("COMMIT")
+ except Exception as e:
+ await conn.execute("ROLLBACK")
+ raise e
+
+ await conn.commit()
+ logger.info("Колонка vector_hash успешно удалена")
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(
+ description="Удаление колонки vector_hash из post_from_telegram_suggest"
+ )
+ parser.add_argument(
+ "--db",
+ default=os.environ.get("DATABASE_PATH", DEFAULT_DB_PATH),
+ help="Путь к БД",
+ )
+ args = parser.parse_args()
+ asyncio.run(main(args.db))