From feee7f010c71b5d7b194f5e1de3b25f8b2311365 Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 26 Jan 2026 22:03:15 +0300 Subject: [PATCH] =?UTF-8?q?refactor:=20=D0=BE=D0=B1=D0=BD=D0=BE=D0=B2?= =?UTF-8?q?=D0=BB=D0=B5=D0=BD=D0=B8=D0=B5=20=D1=81=D0=B8=D1=81=D1=82=D0=B5?= =?UTF-8?q?=D0=BC=D1=8B=20ML-=D1=81=D0=BA=D0=BE=D1=80=D0=B8=D0=BD=D0=B3?= =?UTF-8?q?=D0=B0=20=D0=B8=20=D0=BF=D0=B5=D1=80=D0=B5=D1=85=D0=BE=D0=B4=20?= =?UTF-8?q?=D0=BD=D0=B0=20RAG=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Обновлен Dockerfile для использования Alpine вместо Slim, улучшая размер образа. - Удален устаревший RAGService и добавлен RagApiClient для работы с внешним RAG API. - Обновлены переменные окружения в env.example для настройки нового RAG API. - Обновлен ScoringManager для интеграции с RagApiClient. - Упрощена структура проекта, удалены ненужные файлы и зависимости, связанные с векторным хранилищем. - Обновлены обработчики и функции для работы с новым API, включая получение статистики и обработку ошибок. --- .cursor/rules/error-handling.md | 59 +- .gitignore | 5 +- Dockerfile | 30 +- database/repositories/post_repository.py | 18 - env.example | 13 +- helper_bot/handlers/admin/admin_handlers.py | 24 +- helper_bot/handlers/private/services.py | 2 +- helper_bot/main.py | 35 +- helper_bot/services/scoring/__init__.py | 9 +- helper_bot/services/scoring/rag_client.py | 311 +++++++++++ helper_bot/services/scoring/rag_service.py | 507 ------------------ .../services/scoring/scoring_manager.py | 76 +-- helper_bot/services/scoring/vector_store.py | 399 -------------- helper_bot/utils/base_dependency_factory.py | 65 +-- requirements.txt | 5 +- scripts/add_ml_scores_columns.py | 16 +- scripts/drop_vector_hash_column.py | 123 +++++ 17 files changed, 602 insertions(+), 1095 deletions(-) create mode 100644 helper_bot/services/scoring/rag_client.py delete mode 100644 helper_bot/services/scoring/rag_service.py delete mode 100644 helper_bot/services/scoring/vector_store.py create mode 100644 scripts/drop_vector_hash_column.py diff --git a/.cursor/rules/error-handling.md b/.cursor/rules/error-handling.md index 8ca866c..f995d49 100644 --- a/.cursor/rules/error-handling.md +++ b/.cursor/rules/error-handling.md @@ -110,12 +110,63 @@ logger.error(f"Критическая ошибка: {e}", exc_info=True) ### Уровни логирования -- `logger.debug()` - отладочная информация -- `logger.info()` - информационные сообщения о работе -- `logger.warning()` - предупреждения о потенциальных проблемах -- `logger.error()` - ошибки, требующие внимания +- `logger.debug()` - отладочная информация (детали выполнения, промежуточные значения, HTTP запросы(не используется в проекте)) +- `logger.info()` - информационные сообщения о работе (успешные операции, важные события) +- `logger.warning()` - предупреждения о потенциальных проблемах (некритичные ошибки, таймауты) +- `logger.error()` - ошибки, требующие внимания (исключения, сбои) - `logger.critical()` - критические ошибки +### Паттерн логирования в сервисах + +При работе с внешними API и сервисами используйте следующий паттерн: + +```python +from logs.custom_logger import logger + +class ApiClient: + async def calculate_score(self, text: str) -> Score: + # Логируем начало операции (debug) + logger.debug(f"ApiClient: Отправка запроса на расчет скора (text_preview='{text[:50]}')") + + try: + response = await self._client.post(url, json=data) + + # Логируем статус ответа (debug) + logger.debug(f"ApiClient: Получен ответ (status={response.status_code})") + + # Обрабатываем ответ + if response.status_code == 200: + result = response.json() + # Логируем успешный результат (info) + logger.info(f"ApiClient: Скор успешно получен (score={result['score']:.4f})") + return result + else: + # Логируем ошибку (error) + logger.error(f"ApiClient: Ошибка API (status={response.status_code})") + raise ApiError(f"Ошибка API: {response.status_code}") + + except httpx.TimeoutException: + # Логируем таймаут (error) + logger.error(f"ApiClient: Таймаут запроса (>{timeout}с)") + raise + except httpx.RequestError as e: + # Логируем ошибку подключения (error) + logger.error(f"ApiClient: Ошибка подключения: {e}") + raise + except Exception as e: + # Логируем неожиданные ошибки (error) + logger.error(f"ApiClient: Неожиданная ошибка: {e}", exc_info=True) + raise +``` + +**Принципы:** +- `logger.debug()` - для деталей выполнения (URL, параметры запроса, статус ответа) +- `logger.info()` - для успешных операций с важными результатами +- `logger.warning()` - для некритичных проблем (валидация, таймауты в неважных операциях) +- `logger.error()` - для всех ошибок перед пробросом исключения +- Всегда логируйте ошибки перед `raise` +- Используйте `exc_info=True` для критических ошибок + ## Метрики ошибок Декоратор `@track_errors` автоматически отслеживает ошибки: diff --git a/.gitignore b/.gitignore index 690367e..6cc702c 100644 --- a/.gitignore +++ b/.gitignore @@ -92,4 +92,7 @@ venv.bak/ # Other files voice_users/ -files/ \ No newline at end of file +files/ + +# ML models and vectors cache +data/ \ No newline at end of file diff --git a/Dockerfile b/Dockerfile index c41ab93..c120d14 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,14 +1,14 @@ ########################################### # Этап 1: Сборщик (Builder) ########################################### -FROM python:3.11.9-slim as builder +FROM python:3.11.9-alpine as builder -# Устанавливаем инструменты для компиляции -RUN apt-get update && apt-get install --no-install-recommends -y \ +# Устанавливаем инструменты для компиляции (если нужны для некоторых пакетов) +RUN apk add --no-cache \ gcc \ - g++ \ - python3-dev \ - && rm -rf /var/lib/apt/lists/* + musl-dev \ + libffi-dev \ + && rm -rf /var/cache/apk/* WORKDIR /app COPY requirements.txt . @@ -20,30 +20,20 @@ RUN pip install --no-cache-dir --target /install -r requirements.txt ########################################### # Этап 2: Финальный образ (Runtime) ########################################### -FROM python:3.11.9-slim as runtime - -# Минимальные рантайм-зависимости -RUN apt-get update && apt-get install --no-install-recommends -y \ - libgomp1 \ - && rm -rf /var/lib/apt/lists/* +FROM python:3.11.9-alpine as runtime # Создаем пользователя -RUN groupadd -g 1001 deploy && useradd -r -u 1001 -g deploy deploy +RUN addgroup -g 1001 deploy && adduser -D -u 1001 -G deploy deploy WORKDIR /app # Копируем зависимости COPY --from=builder --chown=deploy:deploy /install /usr/local/lib/python3.11/site-packages -# Создаем структуру папок (включая директории для ML моделей) -RUN mkdir -p database logs voice_users data/models && \ +# Создаем структуру папок +RUN mkdir -p database logs voice_users && \ chown -R deploy:deploy /app -# Устанавливаем переменные для HuggingFace (кеш моделей внутри /app) -ENV HF_HOME=/app/data/models -ENV TRANSFORMERS_CACHE=/app/data/models -ENV HF_HUB_CACHE=/app/data/models - # Копируем исходный код COPY --chown=deploy:deploy . . diff --git a/database/repositories/post_repository.py b/database/repositories/post_repository.py index cae5ede..e819cb6 100644 --- a/database/repositories/post_repository.py +++ b/database/repositories/post_repository.py @@ -462,21 +462,3 @@ class PostRepository(DatabaseConnection): self.logger.info(f"Получено {len(texts)} отклоненных постов для обучения") return texts - async def update_vector_hash(self, message_id: int, vector_hash: str) -> bool: - """ - Обновляет хеш вектора для поста (для кеширования). - - Args: - message_id: ID сообщения - vector_hash: Хеш вектора - - Returns: - True если обновлено успешно - """ - try: - query = "UPDATE post_from_telegram_suggest SET vector_hash = ? WHERE message_id = ?" - await self._execute_query(query, (vector_hash, message_id)) - return True - except Exception as e: - self.logger.error(f"Ошибка обновления vector_hash для message_id={message_id}: {e}") - return False diff --git a/env.example b/env.example index ea06d24..9350527 100644 --- a/env.example +++ b/env.example @@ -36,14 +36,13 @@ METRICS_PORT=8080 LOG_LEVEL=INFO LOG_RETENTION_DAYS=30 -# ML Scoring - RAG (ruBERT) -# Включает локальное векторное сравнение с использованием ruBERT +# ML Scoring - RAG API +# Включает оценку постов через внешний RAG API сервис RAG_ENABLED=false -RAG_MODEL=DeepPavlov/rubert-base-cased -RAG_CACHE_DIR=data/models -RAG_VECTORS_PATH=data/vectors.npz -RAG_MAX_EXAMPLES=10000 -RAG_SCORE_MULTIPLIER=5 +RAG_API_URL=http://xx.xxx.xx.xx/api/v1 +RAG_API_KEY=your_rag_api_key_here +RAG_API_TIMEOUT=30 +RAG_TEST_MODE=false # ML Scoring - DeepSeek API # Включает оценку постов через DeepSeek API diff --git a/helper_bot/handlers/admin/admin_handlers.py b/helper_bot/handlers/admin/admin_handlers.py index 31ed534..89159a5 100644 --- a/helper_bot/handlers/admin/admin_handlers.py +++ b/helper_bot/handlers/admin/admin_handlers.py @@ -161,7 +161,7 @@ async def get_ml_stats( await message.answer("📊 ML Scoring отключен\n\nДля включения установите RAG_ENABLED=true или DEEPSEEK_ENABLED=true в .env") return - stats = scoring_manager.get_stats() + stats = await scoring_manager.get_stats() # Формируем текст статистики lines = ["📊 ML Scoring Статистика\n"] @@ -169,16 +169,22 @@ async def get_ml_stats( # RAG статистика if "rag" in stats: rag = stats["rag"] - lines.append("🤖 RAG (ruBERT):") + lines.append("🤖 RAG API:") lines.append(f" • Статус: {'✅ Включен' if rag.get('enabled') else '❌ Отключен'}") - lines.append(f" • Модель: {rag.get('model_name', 'N/A')}") - lines.append(f" • Модель загружена: {'✅' if rag.get('model_loaded') else '❌'}") + lines.append(f" • API URL: {rag.get('api_url', 'N/A')}") + + # Статистика из API (если доступна) + if "positive_examples" in rag or "negative_examples" in rag: + lines.append(f" • Положительных примеров: {rag.get('positive_examples', 0)}") + lines.append(f" • Отрицательных примеров: {rag.get('negative_examples', 0)}") + lines.append(f" • Всего примеров: {rag.get('total_examples', rag.get('positive_examples', 0) + rag.get('negative_examples', 0))}") + + # Модель из API (если доступна) + if "model_loaded" in rag: + lines.append(f" • Модель загружена: {'✅' if rag.get('model_loaded') else '❌'}") + if "model_name" in rag: + lines.append(f" • Модель: {rag.get('model_name', 'N/A')}") - vs = rag.get("vector_store", {}) - lines.append(f" • Положительных примеров: {vs.get('positive_count', 0)}") - lines.append(f" • Отрицательных примеров: {vs.get('negative_count', 0)}") - lines.append(f" • Всего примеров: {vs.get('total_count', 0)}") - lines.append(f" • Макс. примеров: {vs.get('max_examples', 'N/A')}") lines.append("") # DeepSeek статистика diff --git a/helper_bot/handlers/private/services.py b/helper_bot/handlers/private/services.py index 904dd60..3fc4582 100644 --- a/helper_bot/handlers/private/services.py +++ b/helper_bot/handlers/private/services.py @@ -162,7 +162,7 @@ class PostService: # Получаем данные от RAG rag_confidence = scores.rag.confidence if scores.rag else None - rag_score_pos_only = scores.rag.metadata.get("score_pos_only") if scores.rag else None + rag_score_pos_only = scores.rag.metadata.get("rag_score_pos_only") if scores.rag else None return scores.deepseek_score, scores.rag_score, rag_confidence, rag_score_pos_only, ml_scores_json except Exception as e: diff --git a/helper_bot/main.py b/helper_bot/main.py index b710d47..a85ee0a 100644 --- a/helper_bot/main.py +++ b/helper_bot/main.py @@ -66,11 +66,22 @@ async def start_bot(bdf): # Middleware уже добавлены на уровне dispatcher dp.include_routers(admin_router, private_router, callback_router, group_router, voice_router) + # Получаем scoring_manager для использования в shutdown + scoring_manager = bdf.get_scoring_manager() + # Добавляем обработчик завершения для корректного закрытия @dp.shutdown() async def on_shutdown(): logging.info("Bot shutdown initiated, cleaning up resources...") try: + # Закрываем ресурсы ScoringManager + if scoring_manager: + try: + await scoring_manager.close() + logging.info("ScoringManager закрыт") + except Exception as e: + logging.error(f"Ошибка закрытия ScoringManager: {e}") + await bot.session.close() logging.info("Bot session closed successfully") except Exception as e: @@ -78,22 +89,6 @@ async def start_bot(bdf): await bot.delete_webhook(drop_pending_updates=True) - # Загружаем примеры для RAG из базы данных - scoring_manager = bdf.get_scoring_manager() - if scoring_manager and scoring_manager.rag_service and scoring_manager.rag_service.is_enabled: - try: - db = bdf.get_db() - positive_texts = await db.get_approved_posts_texts(limit=5000) - negative_texts = await db.get_declined_posts_texts(limit=5000) - - if positive_texts or negative_texts: - await scoring_manager.load_examples_from_db(positive_texts, negative_texts) - logging.info(f"RAG: Загружено {len(positive_texts)} положительных и {len(negative_texts)} отрицательных примеров") - else: - logging.warning("RAG: Нет примеров в базе данных для загрузки") - except Exception as e: - logging.error(f"Ошибка загрузки примеров для RAG: {e}") - # Запускаем HTTP сервер для метрик параллельно с ботом metrics_host = bdf.settings.get('Metrics', {}).get('host', '0.0.0.0') metrics_port = bdf.settings.get('Metrics', {}).get('port', 8080) @@ -113,6 +108,14 @@ async def start_bot(bdf): logging.error(f"❌ Ошибка запуска бота: {e}") raise finally: + # Закрываем ресурсы ScoringManager перед завершением (на случай если shutdown не сработал) + if scoring_manager: + try: + await scoring_manager.close() + logging.info("ScoringManager закрыт в finally") + except Exception as e: + logging.error(f"Ошибка закрытия ScoringManager в finally: {e}") + # Останавливаем метрики сервер при завершении try: await stop_metrics_server() diff --git a/helper_bot/services/scoring/__init__.py b/helper_bot/services/scoring/__init__.py index a56b7fe..b0eb339 100644 --- a/helper_bot/services/scoring/__init__.py +++ b/helper_bot/services/scoring/__init__.py @@ -2,10 +2,9 @@ Сервисы для ML-скоринга постов. Включает: -- RAGService - локальное векторное сравнение с ruBERT +- RagApiClient - HTTP клиент для внешнего RAG API сервиса - DeepSeekService - интеграция с DeepSeek API - ScoringManager - объединение всех сервисов скоринга -- VectorStore - in-memory хранилище векторов """ from .base import ScoringResult, ScoringServiceProtocol, CombinedScore @@ -17,8 +16,7 @@ from .exceptions import ( InsufficientExamplesError, TextTooShortError, ) -from .vector_store import VectorStore -from .rag_service import RAGService +from .rag_client import RagApiClient from .deepseek_service import DeepSeekService from .scoring_manager import ScoringManager @@ -35,8 +33,7 @@ __all__ = [ "InsufficientExamplesError", "TextTooShortError", # Сервисы - "VectorStore", - "RAGService", + "RagApiClient", "DeepSeekService", "ScoringManager", ] diff --git a/helper_bot/services/scoring/rag_client.py b/helper_bot/services/scoring/rag_client.py new file mode 100644 index 0000000..fc35a14 --- /dev/null +++ b/helper_bot/services/scoring/rag_client.py @@ -0,0 +1,311 @@ +""" +HTTP клиент для взаимодействия с внешним RAG сервисом. + +Использует REST API для получения скоров и отправки примеров. +""" + +from typing import Optional, Dict, Any +import httpx +from logs.custom_logger import logger +from helper_bot.utils.metrics import track_time, track_errors + +from .base import ScoringResult +from .exceptions import ScoringError, InsufficientExamplesError, TextTooShortError + + +class RagApiClient: + """ + HTTP клиент для взаимодействия с внешним RAG сервисом. + + Использует REST API для: + - Получения скоров постов (POST /api/v1/score) + - Отправки положительных примеров (POST /api/v1/examples/positive) + - Отправки отрицательных примеров (POST /api/v1/examples/negative) + - Получения статистики (GET /api/v1/stats) + + Attributes: + api_url: Базовый URL API сервиса + api_key: API ключ для аутентификации + timeout: Таймаут запросов в секундах + test_mode: Включен ли тестовый режим (добавляет заголовок X-Test-Mode: true) + enabled: Включен ли клиент + """ + + def __init__( + self, + api_url: str, + api_key: str, + timeout: int = 30, + test_mode: bool = False, + enabled: bool = True, + ): + """ + Инициализация клиента. + + Args: + api_url: Базовый URL API (например, http://хх.ххх.ххх.хх/api/v1) + api_key: API ключ для аутентификации + timeout: Таймаут запросов в секундах + test_mode: Включен ли тестовый режим (добавляет заголовок X-Test-Mode: true к запросам examples) + enabled: Включен ли клиент + """ + # Убираем trailing slash если есть + self.api_url = api_url.rstrip('/') + self.api_key = api_key + self.timeout = timeout + self.test_mode = test_mode + self._enabled = enabled + + # Создаем HTTP клиент + self._client = httpx.AsyncClient( + timeout=httpx.Timeout(timeout), + headers={ + "X-API-Key": api_key, + "Content-Type": "application/json", + } + ) + + logger.info(f"RagApiClient инициализирован (url={self.api_url}, enabled={enabled})") + + @property + def source_name(self) -> str: + """Имя источника для результатов.""" + return "rag" + + @property + def is_enabled(self) -> bool: + """Проверяет, включен ли клиент.""" + return self._enabled + + async def close(self) -> None: + """Закрывает HTTP клиент.""" + await self._client.aclose() + + @track_time("calculate_score", "rag_client") + @track_errors("rag_client", "calculate_score") + async def calculate_score(self, text: str) -> ScoringResult: + """ + Рассчитывает скор для текста поста через API. + + Args: + text: Текст поста для оценки + + Returns: + ScoringResult с оценкой + + Raises: + ScoringError: При ошибке расчета + InsufficientExamplesError: Если недостаточно примеров + TextTooShortError: Если текст слишком короткий + """ + if not self._enabled: + raise ScoringError("RAG API клиент отключен") + + if not text or not text.strip(): + raise TextTooShortError("Текст пустой") + + try: + response = await self._client.post( + f"{self.api_url}/score", + json={"text": text.strip()} + ) + + # Обрабатываем различные статусы + if response.status_code == 400: + try: + error_data = response.json() + error_msg = error_data.get("detail", "Неизвестная ошибка") + except Exception: + error_msg = response.text or "Неизвестная ошибка" + + logger.warning(f"RagApiClient: Ошибка валидации запроса: {error_msg}") + + if "недостаточно" in error_msg.lower() or "insufficient" in error_msg.lower(): + raise InsufficientExamplesError(error_msg) + if "коротк" in error_msg.lower() or "short" in error_msg.lower(): + raise TextTooShortError(error_msg) + raise ScoringError(f"Ошибка валидации: {error_msg}") + + if response.status_code == 401: + logger.error("RagApiClient: Ошибка аутентификации: неверный API ключ") + raise ScoringError("Ошибка аутентификации: неверный API ключ") + + if response.status_code == 404: + logger.error("RagApiClient: RAG API endpoint не найден") + raise ScoringError("RAG API endpoint не найден") + + if response.status_code >= 500: + logger.error(f"RagApiClient: Ошибка сервера RAG API: {response.status_code}") + raise ScoringError(f"Ошибка сервера RAG API: {response.status_code}") + + # Проверяем успешный статус + if response.status_code != 200: + response.raise_for_status() + + data = response.json() + + # Парсим ответ + score = float(data.get("rag_score", 0.0)) + confidence = float(data.get("rag_confidence", 0.0)) if data.get("rag_confidence") is not None else None + + # Форматируем confidence для логирования + confidence_str = f"{confidence:.4f}" if confidence is not None else "None" + + logger.info( + f"RagApiClient: Скор успешно получен " + f"(score={score:.4f}, confidence={confidence_str})" + ) + + return ScoringResult( + score=score, + source=self.source_name, + model=data.get("meta", {}).get("model", "rag-service"), + confidence=confidence, + metadata={ + "rag_score_pos_only": float(data.get("rag_score_pos_only", 0.0)) if data.get("rag_score_pos_only") is not None else None, + "positive_examples": data.get("meta", {}).get("positive_examples"), + "negative_examples": data.get("meta", {}).get("negative_examples"), + } + ) + + except httpx.TimeoutException: + logger.error(f"RagApiClient: Таймаут запроса к RAG API (>{self.timeout}с)") + raise ScoringError(f"Таймаут запроса к RAG API (>{self.timeout}с)") + except httpx.RequestError as e: + logger.error(f"RagApiClient: Ошибка подключения к RAG API: {e}") + raise ScoringError(f"Ошибка подключения к RAG API: {e}") + except (KeyError, ValueError, TypeError) as e: + logger.error(f"RagApiClient: Ошибка парсинга ответа: {e}, response: {response.text if 'response' in locals() else 'N/A'}") + raise ScoringError(f"Ошибка парсинга ответа от RAG API: {e}") + except InsufficientExamplesError: + raise + except TextTooShortError: + raise + except ScoringError: + # Уже залогированные ошибки (401, 404, 500, таймауты и т.д.) - просто пробрасываем + raise + except Exception as e: + # Только действительно неожиданные ошибки логируем здесь + logger.error(f"RagApiClient: Неожиданная ошибка при расчете скора: {e}", exc_info=True) + raise ScoringError(f"Неожиданная ошибка: {e}") + + @track_time("add_positive_example", "rag_client") + async def add_positive_example(self, text: str) -> None: + """ + Добавляет текст как положительный пример (опубликованный пост). + + Args: + text: Текст опубликованного поста + """ + if not self._enabled: + return + + if not text or not text.strip(): + return + + try: + # Формируем заголовки (добавляем X-Test-Mode если включен тестовый режим) + headers = {} + if self.test_mode: + headers["X-Test-Mode"] = "true" + + response = await self._client.post( + f"{self.api_url}/examples/positive", + json={"text": text.strip()}, + headers=headers + ) + + if response.status_code == 200 or response.status_code == 201: + logger.info("RagApiClient: Положительный пример успешно добавлен") + elif response.status_code == 400: + logger.warning(f"RagApiClient: Ошибка валидации при добавлении положительного примера: {response.text}") + else: + logger.warning(f"RagApiClient: Неожиданный статус при добавлении положительного примера: {response.status_code}") + + except httpx.TimeoutException: + logger.warning(f"RagApiClient: Таймаут при добавлении положительного примера") + except httpx.RequestError as e: + logger.warning(f"RagApiClient: Ошибка подключения при добавлении положительного примера: {e}") + except Exception as e: + logger.error(f"RagApiClient: Ошибка добавления положительного примера: {e}") + + @track_time("add_negative_example", "rag_client") + async def add_negative_example(self, text: str) -> None: + """ + Добавляет текст как отрицательный пример (отклоненный пост). + + Args: + text: Текст отклоненного поста + """ + if not self._enabled: + return + + if not text or not text.strip(): + return + + try: + # Формируем заголовки (добавляем X-Test-Mode если включен тестовый режим) + headers = {} + if self.test_mode: + headers["X-Test-Mode"] = "true" + + response = await self._client.post( + f"{self.api_url}/examples/negative", + json={"text": text.strip()}, + headers=headers + ) + + if response.status_code == 200 or response.status_code == 201: + logger.info("RagApiClient: Отрицательный пример успешно добавлен") + elif response.status_code == 400: + logger.warning(f"RagApiClient: Ошибка валидации при добавлении отрицательного примера: {response.text}") + else: + logger.warning(f"RagApiClient: Неожиданный статус при добавлении отрицательного примера: {response.status_code}") + + except httpx.TimeoutException: + logger.warning(f"RagApiClient: Таймаут при добавлении отрицательного примера") + except httpx.RequestError as e: + logger.warning(f"RagApiClient: Ошибка подключения при добавлении отрицательного примера: {e}") + except Exception as e: + logger.error(f"RagApiClient: Ошибка добавления отрицательного примера: {e}") + + async def get_stats(self) -> Dict[str, Any]: + """ + Получает статистику от RAG API. + + Returns: + Словарь со статистикой или пустой словарь при ошибке + """ + if not self._enabled: + return {} + + try: + response = await self._client.get(f"{self.api_url}/stats") + + if response.status_code == 200: + return response.json() + else: + logger.warning(f"RagApiClient: Неожиданный статус при получении статистики: {response.status_code}") + return {} + + except httpx.TimeoutException: + logger.warning(f"RagApiClient: Таймаут при получении статистики") + return {} + except httpx.RequestError as e: + logger.warning(f"RagApiClient: Ошибка подключения при получении статистики: {e}") + return {} + except Exception as e: + logger.error(f"RagApiClient: Ошибка получения статистики: {e}") + return {} + + def get_stats_sync(self) -> Dict[str, Any]: + """ + Синхронная версия get_stats для использования в get_stats() ScoringManager. + + Внимание: Это заглушка, реальная статистика будет получена асинхронно. + """ + return { + "enabled": self._enabled, + "api_url": self.api_url, + "timeout": self.timeout, + } diff --git a/helper_bot/services/scoring/rag_service.py b/helper_bot/services/scoring/rag_service.py deleted file mode 100644 index 0c02272..0000000 --- a/helper_bot/services/scoring/rag_service.py +++ /dev/null @@ -1,507 +0,0 @@ -""" -RAG сервис для скоринга постов с использованием ruBERT. - -Использует модель DeepPavlov/rubert-base-cased для создания эмбеддингов -и сравнивает их с эталонными примерами через VectorStore. -""" - -import asyncio -from typing import Optional, List - -import numpy as np - -from logs.custom_logger import logger -from helper_bot.utils.metrics import track_time, track_errors - -from .base import ScoringResult -from .vector_store import VectorStore -from .exceptions import ( - ModelNotLoadedError, - ScoringError, - InsufficientExamplesError, - TextTooShortError, -) - - -class RAGService: - """ - RAG сервис для оценки постов на основе векторного сходства. - - Использует ruBERT для создания эмбеддингов текста и сравнивает - их с эталонными примерами (опубликованные vs отклоненные посты). - - Attributes: - model_name: Название модели HuggingFace - vector_store: Хранилище векторов - min_text_length: Минимальная длина текста для обработки - """ - - # Название модели по умолчанию - DEFAULT_MODEL = "DeepPavlov/rubert-base-cased" - - def __init__( - self, - model_name: Optional[str] = None, - vector_store: Optional[VectorStore] = None, - cache_dir: Optional[str] = None, - enabled: bool = True, - min_text_length: int = 3, - ): - """ - Инициализация RAG сервиса. - - Args: - model_name: Название модели HuggingFace (по умолчанию ruBERT) - vector_store: Хранилище векторов (создается автоматически если не передано) - cache_dir: Директория для кеширования модели - enabled: Включен ли сервис - min_text_length: Минимальная длина текста для обработки - """ - self.model_name = model_name or self.DEFAULT_MODEL - self.cache_dir = cache_dir - self._enabled = enabled - self.min_text_length = min_text_length - - # Модель и токенизатор загружаются лениво - self._model = None - self._tokenizer = None - self._model_loaded = False - - # Хранилище векторов - self.vector_store = vector_store or VectorStore() - - logger.info(f"RAGService инициализирован (model={self.model_name}, enabled={enabled})") - - @property - def source_name(self) -> str: - """Имя источника для результатов.""" - return "rag" - - @property - def is_enabled(self) -> bool: - """Проверяет, включен ли сервис.""" - return self._enabled - - @property - def is_model_loaded(self) -> bool: - """Проверяет, загружена ли модель.""" - return self._model_loaded - - async def load_model(self) -> None: - """ - Загружает модель и токенизатор. - - Выполняется асинхронно в отдельном потоке чтобы не блокировать event loop. - """ - if self._model_loaded: - return - - if not self._enabled: - logger.warning("RAGService: Сервис отключен, модель не загружается") - return - - logger.info(f"RAGService: Загрузка модели {self.model_name}...") - - try: - # Загрузка в отдельном потоке - loop = asyncio.get_event_loop() - await loop.run_in_executor(None, self._load_model_sync) - - self._model_loaded = True - logger.info(f"RAGService: Модель {self.model_name} успешно загружена") - - except Exception as e: - logger.error(f"RAGService: Ошибка загрузки модели: {e}") - raise ModelNotLoadedError(f"Не удалось загрузить модель {self.model_name}: {e}") - - def _load_model_sync(self) -> None: - """Синхронная загрузка модели (вызывается в executor).""" - logger.info("RAGService: Начало _load_model_sync, импорт transformers...") - from transformers import AutoTokenizer, AutoModel - import torch - - # Определяем устройство - self._device = "cuda" if torch.cuda.is_available() else "cpu" - logger.info(f"RAGService: Устройство определено: {self._device}") - - # Загружаем токенизатор - logger.info(f"RAGService: Загрузка токенизатора из {self.model_name}...") - self._tokenizer = AutoTokenizer.from_pretrained( - self.model_name, - cache_dir=self.cache_dir, - ) - logger.info("RAGService: Токенизатор загружен") - - # Загружаем модель - logger.info(f"RAGService: Загрузка модели из {self.model_name} (это может занять несколько минут)...") - self._model = AutoModel.from_pretrained( - self.model_name, - cache_dir=self.cache_dir, - ) - logger.info("RAGService: Модель загружена, перенос на устройство...") - self._model.to(self._device) - self._model.eval() # Режим инференса - - logger.info(f"RAGService: Модель готова на устройстве: {self._device}") - - def _get_embedding_sync(self, text: str) -> np.ndarray: - """ - Получает эмбеддинг текста (синхронно). - - Использует [CLS] токен как представление всего текста. - - Args: - text: Текст для векторизации - - Returns: - Numpy массив с эмбеддингом (768 измерений для ruBERT) - """ - import torch - - # Токенизация с ограничением длины - inputs = self._tokenizer( - text, - return_tensors="pt", - truncation=True, - max_length=512, - padding=True, - ) - inputs = {k: v.to(self._device) for k, v in inputs.items()} - - # Получаем эмбеддинг - with torch.no_grad(): - outputs = self._model(**inputs) - # Используем [CLS] токен (первый токен) - embedding = outputs.last_hidden_state[:, 0, :].cpu().numpy() - - return embedding.flatten() - - def _get_embeddings_batch_sync(self, texts: List[str], batch_size: int = 16) -> List[np.ndarray]: - """ - Получает эмбеддинги для батча текстов (синхронно). - - Обрабатывает тексты пачками для эффективного использования GPU/CPU. - - Args: - texts: Список текстов для векторизации - batch_size: Размер батча (по умолчанию 16) - - Returns: - Список numpy массивов с эмбеддингами - """ - import torch - - all_embeddings = [] - - for i in range(0, len(texts), batch_size): - batch_texts = texts[i:i + batch_size] - - # Токенизация батча - inputs = self._tokenizer( - batch_texts, - return_tensors="pt", - truncation=True, - max_length=512, - padding=True, - ) - inputs = {k: v.to(self._device) for k, v in inputs.items()} - - # Получаем эмбеддинги - with torch.no_grad(): - outputs = self._model(**inputs) - # [CLS] токен для каждого текста в батче - batch_embeddings = outputs.last_hidden_state[:, 0, :].cpu().numpy() - - # Разбиваем на отдельные эмбеддинги - for j in range(len(batch_texts)): - all_embeddings.append(batch_embeddings[j]) - - if i > 0 and i % (batch_size * 10) == 0: - logger.info(f"RAGService: Обработано {i}/{len(texts)} текстов") - - return all_embeddings - - async def get_embeddings_batch(self, texts: List[str], batch_size: int = 16) -> List[np.ndarray]: - """ - Получает эмбеддинги для батча текстов (асинхронно). - - Args: - texts: Список текстов для векторизации - batch_size: Размер батча - - Returns: - Список numpy массивов с эмбеддингами - """ - if not self._model_loaded: - await self.load_model() - - if not self._model_loaded: - raise ModelNotLoadedError("Модель не загружена") - - # Очищаем тексты - clean_texts = [self._clean_text(text) for text in texts] - - # Выполняем батч-обработку в thread pool - loop = asyncio.get_event_loop() - embeddings = await loop.run_in_executor( - None, - self._get_embeddings_batch_sync, - clean_texts, - batch_size, - ) - - return embeddings - - async def get_embedding(self, text: str) -> np.ndarray: - """ - Получает эмбеддинг текста (асинхронно). - - Args: - text: Текст для векторизации - - Returns: - Numpy массив с эмбеддингом - - Raises: - ModelNotLoadedError: Если модель не загружена - TextTooShortError: Если текст слишком короткий - """ - if not self._model_loaded: - await self.load_model() - - if not self._model_loaded: - raise ModelNotLoadedError("Модель не загружена") - - # Очищаем текст - clean_text = self._clean_text(text) - - if len(clean_text) < self.min_text_length: - raise TextTooShortError( - f"Текст слишком короткий (минимум {self.min_text_length} символов)" - ) - - # Выполняем в отдельном потоке - loop = asyncio.get_event_loop() - embedding = await loop.run_in_executor( - None, - self._get_embedding_sync, - clean_text - ) - - return embedding - - def _clean_text(self, text: str) -> str: - """Очищает текст от лишних символов.""" - if not text: - return "" - - # Удаляем лишние пробелы и переносы строк - clean = " ".join(text.split()) - - # Удаляем служебные символы (например "^" для helper сообщений) - if clean == "^": - return "" - - return clean.strip() - - @track_time("calculate_score", "rag_service") - @track_errors("rag_service", "calculate_score") - async def calculate_score(self, text: str) -> ScoringResult: - """ - Рассчитывает скор для текста поста. - - Args: - text: Текст поста для оценки - - Returns: - ScoringResult с оценкой - - Raises: - ScoringError: При ошибке расчета - """ - if not self._enabled: - raise ScoringError("RAG сервис отключен") - - try: - # Получаем эмбеддинг текста - embedding = await self.get_embedding(text) - - # Логируем первые элементы вектора для отладки - logger.info( - f"RAGService: embedding[:3]={embedding[:3].tolist()}, " - f"text_preview='{text[:30]}'" - ) - - # Рассчитываем скор через VectorStore - score, confidence, score_pos_only = self.vector_store.calculate_similarity_score(embedding) - - return ScoringResult( - score=score, - source=self.source_name, - model=self.model_name, - confidence=confidence, - metadata={ - "positive_examples": self.vector_store.positive_count, - "negative_examples": self.vector_store.negative_count, - "score_pos_only": score_pos_only, # Для сравнения - }, - ) - - except InsufficientExamplesError: - # Не достаточно примеров - возвращаем нейтральный скор - logger.warning("RAGService: Недостаточно примеров для расчета скора") - raise - - except TextTooShortError: - logger.warning(f"RAGService: Текст слишком короткий для оценки") - raise - - except Exception as e: - logger.error(f"RAGService: Ошибка расчета скора: {e}") - raise ScoringError(f"Ошибка расчета скора: {e}") - - @track_time("add_positive_example", "rag_service") - async def add_positive_example(self, text: str) -> None: - """ - Добавляет текст как положительный пример (опубликованный пост). - - Args: - text: Текст опубликованного поста - """ - if not self._enabled: - return - - try: - clean_text = self._clean_text(text) - if len(clean_text) < self.min_text_length: - logger.debug("RAGService: Текст слишком короткий для примера, пропускаем") - return - - # Получаем эмбеддинг - embedding = await self.get_embedding(clean_text) - - # Вычисляем хеш для дедупликации - text_hash = VectorStore.compute_text_hash(clean_text) - - # Добавляем в хранилище - added = self.vector_store.add_positive(embedding, text_hash) - - if added: - logger.info(f"RAGService: Добавлен положительный пример") - - except Exception as e: - logger.error(f"RAGService: Ошибка добавления положительного примера: {e}") - - @track_time("add_negative_example", "rag_service") - async def add_negative_example(self, text: str) -> None: - """ - Добавляет текст как отрицательный пример (отклоненный пост). - - Args: - text: Текст отклоненного поста - """ - if not self._enabled: - return - - try: - clean_text = self._clean_text(text) - if len(clean_text) < self.min_text_length: - logger.debug("RAGService: Текст слишком короткий для примера, пропускаем") - return - - # Получаем эмбеддинг - embedding = await self.get_embedding(clean_text) - - # Вычисляем хеш для дедупликации - text_hash = VectorStore.compute_text_hash(clean_text) - - # Добавляем в хранилище - added = self.vector_store.add_negative(embedding, text_hash) - - if added: - logger.info(f"RAGService: Добавлен отрицательный пример") - - except Exception as e: - logger.error(f"RAGService: Ошибка добавления отрицательного примера: {e}") - - async def load_examples_from_db( - self, - positive_texts: list[str], - negative_texts: list[str], - batch_size: int = 16, - ) -> None: - """ - Загружает примеры из базы данных с батч-обработкой. - - Используется при запуске бота для восстановления VectorStore. - Батч-обработка ускоряет загрузку в 10-20 раз. - - Args: - positive_texts: Список текстов опубликованных постов - negative_texts: Список текстов отклоненных постов - batch_size: Размер батча для обработки (по умолчанию 16) - """ - if not self._enabled: - return - - logger.info( - f"RAGService: Загрузка примеров из БД с батч-обработкой " - f"(positive: {len(positive_texts)}, negative: {len(negative_texts)}, batch_size: {batch_size})" - ) - - # Убеждаемся что модель загружена - await self.load_model() - - import time - start_time = time.time() - - # Фильтруем и очищаем положительные тексты - if positive_texts: - clean_positive = [] - positive_hashes = [] - for text in positive_texts: - clean_text = self._clean_text(text) - if len(clean_text) >= self.min_text_length: - clean_positive.append(clean_text) - positive_hashes.append(VectorStore.compute_text_hash(clean_text)) - - if clean_positive: - logger.info(f"RAGService: Обработка {len(clean_positive)} положительных примеров батчами...") - positive_embeddings = await self.get_embeddings_batch(clean_positive, batch_size) - self.vector_store.add_positive_batch(positive_embeddings, positive_hashes) - - # Фильтруем и очищаем отрицательные тексты - if negative_texts: - clean_negative = [] - negative_hashes = [] - for text in negative_texts: - clean_text = self._clean_text(text) - if len(clean_text) >= self.min_text_length: - clean_negative.append(clean_text) - negative_hashes.append(VectorStore.compute_text_hash(clean_text)) - - if clean_negative: - logger.info(f"RAGService: Обработка {len(clean_negative)} отрицательных примеров батчами...") - negative_embeddings = await self.get_embeddings_batch(clean_negative, batch_size) - self.vector_store.add_negative_batch(negative_embeddings, negative_hashes) - - elapsed = time.time() - start_time - logger.info( - f"RAGService: Загрузка завершена за {elapsed:.1f} сек " - f"(positive: {self.vector_store.positive_count}, " - f"negative: {self.vector_store.negative_count})" - ) - - def save_vectors(self) -> None: - """Сохраняет векторы на диск.""" - if self.vector_store.storage_path: - self.vector_store.save_to_disk() - - def get_stats(self) -> dict: - """Возвращает статистику сервиса.""" - return { - "enabled": self._enabled, - "model_name": self.model_name, - "model_loaded": self._model_loaded, - "vector_store": self.vector_store.get_stats(), - } diff --git a/helper_bot/services/scoring/scoring_manager.py b/helper_bot/services/scoring/scoring_manager.py index 1a9b7b3..fa23dcb 100644 --- a/helper_bot/services/scoring/scoring_manager.py +++ b/helper_bot/services/scoring/scoring_manager.py @@ -1,20 +1,19 @@ """ Менеджер для объединения всех сервисов скоринга. -Координирует работу RAGService и DeepSeekService, +Координирует работу RagApiClient и DeepSeekService, выполняет параллельные запросы и агрегирует результаты. """ import asyncio -from typing import Optional, List +from typing import Optional from logs.custom_logger import logger from helper_bot.utils.metrics import track_time, track_errors from .base import CombinedScore, ScoringResult -from .rag_service import RAGService +from .rag_client import RagApiClient from .deepseek_service import DeepSeekService -from .vector_store import VectorStore from .exceptions import ScoringError, InsufficientExamplesError, TextTooShortError @@ -22,39 +21,39 @@ class ScoringManager: """ Менеджер для управления всеми сервисами скоринга. - Объединяет RAGService и DeepSeekService, выполняет параллельные + Объединяет RagApiClient и DeepSeekService, выполняет параллельные запросы и агрегирует результаты в единый CombinedScore. Attributes: - rag_service: Сервис RAG с ruBERT + rag_client: HTTP клиент для RAG API deepseek_service: Сервис DeepSeek API """ def __init__( self, - rag_service: Optional[RAGService] = None, + rag_client: Optional[RagApiClient] = None, deepseek_service: Optional[DeepSeekService] = None, ): """ Инициализация менеджера. Args: - rag_service: Сервис RAG (создается автоматически если не передан) + rag_client: HTTP клиент для RAG API (создается автоматически если не передан) deepseek_service: Сервис DeepSeek (создается автоматически если не передан) """ - self.rag_service = rag_service + self.rag_client = rag_client self.deepseek_service = deepseek_service logger.info( f"ScoringManager инициализирован " - f"(rag={rag_service is not None and rag_service.is_enabled}, " + f"(rag={rag_client is not None and rag_client.is_enabled}, " f"deepseek={deepseek_service is not None and deepseek_service.is_enabled})" ) @property def is_any_enabled(self) -> bool: """Проверяет, включен ли хотя бы один сервис.""" - rag_enabled = self.rag_service is not None and self.rag_service.is_enabled + rag_enabled = self.rag_client is not None and self.rag_client.is_enabled deepseek_enabled = self.deepseek_service is not None and self.deepseek_service.is_enabled return rag_enabled or deepseek_enabled @@ -82,8 +81,8 @@ class ScoringManager: tasks = [] task_names = [] - # RAG сервис - if self.rag_service and self.rag_service.is_enabled: + # RAG API клиент + if self.rag_client and self.rag_client.is_enabled: tasks.append(self._get_rag_score(text)) task_names.append("rag") @@ -104,6 +103,7 @@ class ScoringManager: if isinstance(res, Exception): error_msg = str(res) result.errors[name] = error_msg + # Ошибки уже залогированы в сервисах, здесь только предупреждение logger.warning(f"ScoringManager: Ошибка от {name}: {error_msg}") elif res is not None: if name == "rag": @@ -119,9 +119,9 @@ class ScoringManager: return result async def _get_rag_score(self, text: str) -> Optional[ScoringResult]: - """Получает скор от RAG сервиса.""" + """Получает скор от RAG API.""" try: - return await self.rag_service.calculate_score(text) + return await self.rag_client.calculate_score(text) except InsufficientExamplesError: # Недостаточно примеров - это не ошибка, просто нет данных logger.info("ScoringManager: RAG - недостаточно примеров") @@ -131,7 +131,7 @@ class ScoringManager: logger.debug("ScoringManager: RAG - текст слишком короткий") return None except Exception as e: - logger.error(f"ScoringManager: RAG ошибка: {e}") + # Ошибки уже залогированы в RagApiClient, здесь только пробрасываем raise async def _get_deepseek_score(self, text: str) -> Optional[ScoringResult]: @@ -143,7 +143,7 @@ class ScoringManager: logger.debug("ScoringManager: DeepSeek - текст слишком короткий") return None except Exception as e: - logger.error(f"ScoringManager: DeepSeek ошибка: {e}") + # Ошибки уже залогированы в DeepSeekService, здесь только пробрасываем raise @track_time("on_post_published", "scoring_manager") @@ -161,8 +161,8 @@ class ScoringManager: tasks = [] - if self.rag_service and self.rag_service.is_enabled: - tasks.append(self.rag_service.add_positive_example(text)) + if self.rag_client and self.rag_client.is_enabled: + tasks.append(self.rag_client.add_positive_example(text)) if self.deepseek_service and self.deepseek_service.is_enabled: tasks.append(self.deepseek_service.add_positive_example(text)) @@ -186,8 +186,8 @@ class ScoringManager: tasks = [] - if self.rag_service and self.rag_service.is_enabled: - tasks.append(self.rag_service.add_negative_example(text)) + if self.rag_client and self.rag_client.is_enabled: + tasks.append(self.rag_client.add_negative_example(text)) if self.deepseek_service and self.deepseek_service.is_enabled: tasks.append(self.deepseek_service.add_negative_example(text)) @@ -196,45 +196,25 @@ class ScoringManager: await asyncio.gather(*tasks, return_exceptions=True) logger.info("ScoringManager: Добавлен отрицательный пример") - async def load_examples_from_db( - self, - positive_texts: List[str], - negative_texts: List[str], - ) -> None: - """ - Загружает примеры из базы данных при запуске бота. - - Args: - positive_texts: Список текстов опубликованных постов - negative_texts: Список текстов отклоненных постов - """ - if self.rag_service and self.rag_service.is_enabled: - await self.rag_service.load_examples_from_db( - positive_texts, - negative_texts - ) - - def save_vectors(self) -> None: - """Сохраняет векторы RAG на диск.""" - if self.rag_service: - self.rag_service.save_vectors() async def close(self) -> None: """Закрывает ресурсы всех сервисов.""" if self.deepseek_service: await self.deepseek_service.close() - # Сохраняем векторы перед закрытием - self.save_vectors() + if self.rag_client: + await self.rag_client.close() - def get_stats(self) -> dict: + async def get_stats(self) -> dict: """Возвращает статистику всех сервисов.""" stats = { "any_enabled": self.is_any_enabled, } - if self.rag_service: - stats["rag"] = self.rag_service.get_stats() + if self.rag_client: + # Получаем статистику асинхронно от API + rag_stats = await self.rag_client.get_stats() + stats["rag"] = rag_stats if rag_stats else self.rag_client.get_stats_sync() if self.deepseek_service: stats["deepseek"] = self.deepseek_service.get_stats() diff --git a/helper_bot/services/scoring/vector_store.py b/helper_bot/services/scoring/vector_store.py deleted file mode 100644 index a0381b3..0000000 --- a/helper_bot/services/scoring/vector_store.py +++ /dev/null @@ -1,399 +0,0 @@ -""" -In-memory хранилище векторов на numpy. - -Хранит векторные представления постов для быстрого сравнения. -Поддерживает персистентность через сохранение/загрузку с диска. -""" - -import hashlib -import os -from pathlib import Path -from typing import Optional, Tuple, List -import threading - -import numpy as np - -from logs.custom_logger import logger -from .exceptions import VectorStoreError, InsufficientExamplesError - - -class VectorStore: - """ - In-memory хранилище векторов для RAG. - - Хранит отдельно положительные (опубликованные) и отрицательные (отклоненные) - примеры. Использует косинусное сходство для расчета скора. - - Attributes: - vector_dim: Размерность векторов (768 для ruBERT) - max_examples: Максимальное количество примеров каждого типа - """ - - def __init__( - self, - vector_dim: int = 768, - max_examples: int = 10000, - storage_path: Optional[str] = None, - score_multiplier: float = 5.0, - ): - """ - Инициализация хранилища. - - Args: - vector_dim: Размерность векторов - max_examples: Максимальное количество примеров каждого типа - storage_path: Путь для сохранения/загрузки векторов (опционально) - score_multiplier: Множитель для усиления разницы в скорах - """ - self.vector_dim = vector_dim - self.max_examples = max_examples - self.storage_path = storage_path - self.score_multiplier = score_multiplier - - # Инициализируем пустые массивы - # Используем список для динамического добавления, потом конвертируем в numpy - self._positive_vectors: list = [] - self._negative_vectors: list = [] - self._positive_hashes: list = [] # Хеши текстов для дедупликации - self._negative_hashes: list = [] - - # Lock для потокобезопасности - self._lock = threading.Lock() - - # Пытаемся загрузить сохраненные векторы - if storage_path and os.path.exists(storage_path): - self._load_from_disk() - - @property - def positive_count(self) -> int: - """Количество положительных примеров.""" - return len(self._positive_vectors) - - @property - def negative_count(self) -> int: - """Количество отрицательных примеров.""" - return len(self._negative_vectors) - - @property - def total_count(self) -> int: - """Общее количество примеров.""" - return self.positive_count + self.negative_count - - @staticmethod - def compute_text_hash(text: str) -> str: - """Вычисляет хеш текста для дедупликации.""" - return hashlib.md5(text.encode('utf-8')).hexdigest() - - def _normalize_vector(self, vector: np.ndarray) -> np.ndarray: - """Нормализует вектор для косинусного сходства.""" - norm = np.linalg.norm(vector) - if norm == 0: - return vector - return vector / norm - - def add_positive(self, vector: np.ndarray, text_hash: Optional[str] = None) -> bool: - """ - Добавляет положительный пример (опубликованный пост). - - Args: - vector: Векторное представление текста - text_hash: Хеш текста для дедупликации (опционально) - - Returns: - True если добавлен, False если дубликат или превышен лимит - """ - with self._lock: - # Проверяем дубликат по хешу - if text_hash and text_hash in self._positive_hashes: - logger.debug(f"VectorStore: Пропуск дубликата положительного примера") - return False - - # Проверяем лимит - if len(self._positive_vectors) >= self.max_examples: - # Удаляем самый старый пример (FIFO) - self._positive_vectors.pop(0) - self._positive_hashes.pop(0) - logger.debug("VectorStore: Удален старый положительный пример (лимит)") - - # Нормализуем и добавляем - normalized = self._normalize_vector(vector) - self._positive_vectors.append(normalized) - if text_hash: - self._positive_hashes.append(text_hash) - - logger.info(f"VectorStore: Добавлен положительный пример (всего: {self.positive_count})") - return True - - def add_positive_batch( - self, - vectors: List[np.ndarray], - text_hashes: Optional[List[str]] = None - ) -> int: - """ - Добавляет батч положительных примеров. - - Args: - vectors: Список векторов - text_hashes: Список хешей текстов для дедупликации - - Returns: - Количество добавленных примеров - """ - if text_hashes is None: - text_hashes = [None] * len(vectors) - - added = 0 - with self._lock: - for vector, text_hash in zip(vectors, text_hashes): - # Проверяем дубликат по хешу - if text_hash and text_hash in self._positive_hashes: - continue - - # Проверяем лимит - if len(self._positive_vectors) >= self.max_examples: - self._positive_vectors.pop(0) - self._positive_hashes.pop(0) - - # Нормализуем и добавляем - normalized = self._normalize_vector(vector) - self._positive_vectors.append(normalized) - if text_hash: - self._positive_hashes.append(text_hash) - added += 1 - - logger.info(f"VectorStore: Добавлено {added} положительных примеров батчем (всего: {self.positive_count})") - return added - - def add_negative(self, vector: np.ndarray, text_hash: Optional[str] = None) -> bool: - """ - Добавляет отрицательный пример (отклоненный пост). - - Args: - vector: Векторное представление текста - text_hash: Хеш текста для дедупликации (опционально) - - Returns: - True если добавлен, False если дубликат или превышен лимит - """ - with self._lock: - # Проверяем дубликат по хешу - if text_hash and text_hash in self._negative_hashes: - logger.debug(f"VectorStore: Пропуск дубликата отрицательного примера") - return False - - # Проверяем лимит - if len(self._negative_vectors) >= self.max_examples: - # Удаляем самый старый пример (FIFO) - self._negative_vectors.pop(0) - self._negative_hashes.pop(0) - logger.debug("VectorStore: Удален старый отрицательный пример (лимит)") - - # Нормализуем и добавляем - normalized = self._normalize_vector(vector) - self._negative_vectors.append(normalized) - if text_hash: - self._negative_hashes.append(text_hash) - - logger.info(f"VectorStore: Добавлен отрицательный пример (всего: {self.negative_count})") - return True - - def add_negative_batch( - self, - vectors: List[np.ndarray], - text_hashes: Optional[List[str]] = None - ) -> int: - """ - Добавляет батч отрицательных примеров. - - Args: - vectors: Список векторов - text_hashes: Список хешей текстов для дедупликации - - Returns: - Количество добавленных примеров - """ - if text_hashes is None: - text_hashes = [None] * len(vectors) - - added = 0 - with self._lock: - for vector, text_hash in zip(vectors, text_hashes): - # Проверяем дубликат по хешу - if text_hash and text_hash in self._negative_hashes: - continue - - # Проверяем лимит - if len(self._negative_vectors) >= self.max_examples: - self._negative_vectors.pop(0) - self._negative_hashes.pop(0) - - # Нормализуем и добавляем - normalized = self._normalize_vector(vector) - self._negative_vectors.append(normalized) - if text_hash: - self._negative_hashes.append(text_hash) - added += 1 - - logger.info(f"VectorStore: Добавлено {added} отрицательных примеров батчем (всего: {self.negative_count})") - return added - - def calculate_similarity_score(self, vector: np.ndarray) -> Tuple[float, float]: - """ - Рассчитывает скор на основе сходства с примерами. - - Алгоритм: - 1. Вычисляем среднее косинусное сходство с положительными примерами - 2. Вычисляем среднее косинусное сходство с отрицательными примерами - 3. Финальный скор = pos_sim / (pos_sim + neg_sim + eps) - - Args: - vector: Векторное представление нового поста - - Returns: - Tuple (score, confidence): - - score: Оценка от 0.0 до 1.0 - - confidence: Уверенность (зависит от количества примеров) - - Raises: - InsufficientExamplesError: Если недостаточно примеров - """ - with self._lock: - if self.positive_count == 0: - raise InsufficientExamplesError( - "Нет положительных примеров для сравнения" - ) - - # Нормализуем входной вектор - normalized = self._normalize_vector(vector) - - # Конвертируем в numpy массивы для быстрых вычислений - pos_matrix = np.array(self._positive_vectors) - - # Косинусное сходство с положительными примерами - # Для нормализованных векторов это просто скалярное произведение - pos_similarities = np.dot(pos_matrix, normalized) - pos_sim = float(np.mean(pos_similarities)) - - # Косинусное сходство с отрицательными примерами - if self.negative_count > 0: - neg_matrix = np.array(self._negative_vectors) - neg_similarities = np.dot(neg_matrix, normalized) - neg_sim = float(np.mean(neg_similarities)) - else: - # Если нет отрицательных примеров, используем нейтральное значение - neg_sim = pos_sim # Нейтральный скор = 0.5 - - # === Вариант 1: neg/pos (разница между положительными и отрицательными) === - diff = pos_sim - neg_sim - score_neg_pos = 0.5 + (diff * self.score_multiplier) - score_neg_pos = max(0.0, min(1.0, score_neg_pos)) - - # === Вариант 2: pos only (только положительные, топ-k ближайших) === - # Берём топ-5 ближайших положительных примеров - top_k = min(5, len(pos_similarities)) - top_k_sim = float(np.mean(np.sort(pos_similarities)[-top_k:])) - # Нормализуем: 0.85 -> 0.0, 0.95 -> 1.0 (типичный диапазон для BERT) - score_pos_only = (top_k_sim - 0.85) / 0.10 - score_pos_only = max(0.0, min(1.0, score_pos_only)) - - # Основной скор — neg/pos (можно будет переключить позже) - score = score_neg_pos - - # Confidence зависит от количества примеров (100% при 1000 примерах) - total_examples = self.positive_count + self.negative_count - confidence = min(1.0, total_examples / 1000) - - logger.info( - f"VectorStore: pos_sim={pos_sim:.4f}, neg_sim={neg_sim:.4f}, " - f"top_k_sim={top_k_sim:.4f}, score_neg_pos={score_neg_pos:.4f}, " - f"score_pos_only={score_pos_only:.4f}" - ) - - return score, confidence, score_pos_only - - def save_to_disk(self, path: Optional[str] = None) -> None: - """ - Сохраняет векторы на диск. - - Args: - path: Путь для сохранения (если не указан, используется storage_path) - """ - save_path = path or self.storage_path - if not save_path: - raise VectorStoreError("Путь для сохранения не указан") - - with self._lock: - # Создаем директорию если нужно - Path(save_path).parent.mkdir(parents=True, exist_ok=True) - - # Сохраняем в npz формате - np.savez_compressed( - save_path, - positive_vectors=np.array(self._positive_vectors) if self._positive_vectors else np.array([]), - negative_vectors=np.array(self._negative_vectors) if self._negative_vectors else np.array([]), - positive_hashes=np.array(self._positive_hashes, dtype=object), - negative_hashes=np.array(self._negative_hashes, dtype=object), - vector_dim=self.vector_dim, - max_examples=self.max_examples, - ) - - logger.info( - f"VectorStore: Сохранено на диск ({self.positive_count} pos, " - f"{self.negative_count} neg): {save_path}" - ) - - def _load_from_disk(self) -> None: - """Загружает векторы с диска.""" - if not self.storage_path or not os.path.exists(self.storage_path): - return - - try: - with self._lock: - data = np.load(self.storage_path, allow_pickle=True) - - # Загружаем векторы - pos_vectors = data.get('positive_vectors', np.array([])) - neg_vectors = data.get('negative_vectors', np.array([])) - - if pos_vectors.size > 0: - self._positive_vectors = list(pos_vectors) - if neg_vectors.size > 0: - self._negative_vectors = list(neg_vectors) - - # Загружаем хеши - pos_hashes = data.get('positive_hashes', np.array([])) - neg_hashes = data.get('negative_hashes', np.array([])) - - if pos_hashes.size > 0: - self._positive_hashes = list(pos_hashes) - if neg_hashes.size > 0: - self._negative_hashes = list(neg_hashes) - - logger.info( - f"VectorStore: Загружено с диска ({self.positive_count} pos, " - f"{self.negative_count} neg): {self.storage_path}" - ) - - except Exception as e: - logger.error(f"VectorStore: Ошибка загрузки с диска: {e}") - # Продолжаем с пустым хранилищем - - def clear(self) -> None: - """Очищает все векторы.""" - with self._lock: - self._positive_vectors.clear() - self._negative_vectors.clear() - self._positive_hashes.clear() - self._negative_hashes.clear() - logger.info("VectorStore: Хранилище очищено") - - def get_stats(self) -> dict: - """Возвращает статистику хранилища.""" - return { - "positive_count": self.positive_count, - "negative_count": self.negative_count, - "total_count": self.total_count, - "vector_dim": self.vector_dim, - "max_examples": self.max_examples, - "storage_path": self.storage_path, - } diff --git a/helper_bot/utils/base_dependency_factory.py b/helper_bot/utils/base_dependency_factory.py index 82a0660..db71fa0 100644 --- a/helper_bot/utils/base_dependency_factory.py +++ b/helper_bot/utils/base_dependency_factory.py @@ -67,13 +67,12 @@ class BaseDependencyFactory: # Настройки ML-скоринга self.settings['Scoring'] = { - # RAG (ruBERT) + # RAG API 'rag_enabled': self._parse_bool(os.getenv('RAG_ENABLED', 'false')), - 'rag_model': os.getenv('RAG_MODEL', 'DeepPavlov/rubert-base-cased'), - 'rag_cache_dir': os.getenv('RAG_CACHE_DIR', 'data/models'), - 'rag_vectors_path': os.getenv('RAG_VECTORS_PATH', 'data/vectors.npz'), - 'rag_max_examples': self._parse_int(os.getenv('RAG_MAX_EXAMPLES', '10000')), - 'rag_score_multiplier': self._parse_float(os.getenv('RAG_SCORE_MULTIPLIER', '5.0')), + 'rag_api_url': os.getenv('RAG_API_URL', ''), + 'rag_api_key': os.getenv('RAG_API_KEY', ''), + 'rag_api_timeout': self._parse_int(os.getenv('RAG_API_TIMEOUT', '30')), + 'rag_test_mode': self._parse_bool(os.getenv('RAG_TEST_MODE', 'false')), # DeepSeek 'deepseek_enabled': self._parse_bool(os.getenv('DEEPSEEK_ENABLED', 'false')), 'deepseek_api_key': os.getenv('DEEPSEEK_API_KEY', ''), @@ -127,53 +126,35 @@ class BaseDependencyFactory: def _init_scoring_manager(self): """ - Инициализирует ScoringManager с RAG и DeepSeek сервисами. + Инициализирует ScoringManager с RAG API клиентом и DeepSeek сервисом. Вызывается лениво при первом обращении к get_scoring_manager(). """ from helper_bot.services.scoring import ( ScoringManager, - RAGService, + RagApiClient, DeepSeekService, - VectorStore, ) scoring_config = self.settings['Scoring'] - # Инициализация RAG сервиса - rag_service = None + # Инициализация RAG API клиента + rag_client = None if scoring_config['rag_enabled']: - # Путь к векторам - vectors_path = scoring_config['rag_vectors_path'] - if not os.path.isabs(vectors_path): - vectors_path = os.path.join(self._project_dir, vectors_path) + api_url = scoring_config['rag_api_url'] + api_key = scoring_config['rag_api_key'] - # Путь к кешу моделей - cache_dir = scoring_config['rag_cache_dir'] - if not os.path.isabs(cache_dir): - cache_dir = os.path.join(self._project_dir, cache_dir) - - # Создаем директории если нужно - os.makedirs(os.path.dirname(vectors_path), exist_ok=True) - os.makedirs(cache_dir, exist_ok=True) - - # Создаем VectorStore - vector_store = VectorStore( - vector_dim=768, # ruBERT dimension - max_examples=scoring_config['rag_max_examples'], - storage_path=vectors_path, - score_multiplier=scoring_config['rag_score_multiplier'], - ) - - # Создаем RAGService - rag_service = RAGService( - model_name=scoring_config['rag_model'], - vector_store=vector_store, - cache_dir=cache_dir, - enabled=True, - ) - - logger.info(f"RAGService инициализирован: {scoring_config['rag_model']}") + if not api_url or not api_key: + logger.warning("RAG включен, но не указаны RAG_API_URL или RAG_API_KEY") + else: + rag_client = RagApiClient( + api_url=api_url, + api_key=api_key, + timeout=scoring_config['rag_api_timeout'], + test_mode=scoring_config['rag_test_mode'], + enabled=True, + ) + logger.info(f"RagApiClient инициализирован: {api_url} (test_mode={scoring_config['rag_test_mode']})") # Инициализация DeepSeek сервиса deepseek_service = None @@ -189,7 +170,7 @@ class BaseDependencyFactory: # Создаем менеджер self._scoring_manager = ScoringManager( - rag_service=rag_service, + rag_client=rag_client, deepseek_service=deepseek_service, ) diff --git a/requirements.txt b/requirements.txt index 968c3f3..c2a5b89 100644 --- a/requirements.txt +++ b/requirements.txt @@ -32,8 +32,5 @@ emoji~=2.8.0 # S3 Storage (для хранения медиафайлов опубликованных постов) aioboto3>=12.0.0 -# ML Scoring (для оценки вероятности публикации постов) -numpy>=1.24.0 -transformers>=4.30.0 -torch>=2.0.0 +# HTTP клиент для RAG API httpx>=0.24.0 \ No newline at end of file diff --git a/scripts/add_ml_scores_columns.py b/scripts/add_ml_scores_columns.py index a7c23ff..78e237e 100644 --- a/scripts/add_ml_scores_columns.py +++ b/scripts/add_ml_scores_columns.py @@ -1,10 +1,9 @@ #!/usr/bin/env python3 """ -Миграция: Добавление колонок для ML-скоринга постов. +Миграция: Добавление колонки для ML-скоринга постов. Добавляет: - ml_scores (TEXT/JSON) - JSON с результатами оценки от разных моделей -- vector_hash (TEXT) - хеш текста для кеширования векторов Структура ml_scores: { @@ -46,7 +45,7 @@ async def main(db_path: str) -> None: """ Основная функция миграции. - Добавляет колонки ml_scores и vector_hash в таблицу post_from_telegram_suggest. + Добавляет колонку ml_scores в таблицу post_from_telegram_suggest. Миграция идемпотентна - можно запускать повторно без ошибок. """ db_path = os.path.abspath(db_path) @@ -67,22 +66,13 @@ async def main(db_path: str) -> None: else: logger.info("Колонка ml_scores уже существует") - # Проверяем и добавляем колонку vector_hash - if not await column_exists(conn, "post_from_telegram_suggest", "vector_hash"): - await conn.execute( - "ALTER TABLE post_from_telegram_suggest ADD COLUMN vector_hash TEXT" - ) - logger.info("Колонка vector_hash добавлена в post_from_telegram_suggest") - else: - logger.info("Колонка vector_hash уже существует") - await conn.commit() logger.info("Миграция add_ml_scores_columns завершена успешно") if __name__ == "__main__": parser = argparse.ArgumentParser( - description="Добавление колонок ml_scores и vector_hash для ML-скоринга" + description="Добавление колонки ml_scores для ML-скоринга" ) parser.add_argument( "--db", diff --git a/scripts/drop_vector_hash_column.py b/scripts/drop_vector_hash_column.py new file mode 100644 index 0000000..4d0bdd5 --- /dev/null +++ b/scripts/drop_vector_hash_column.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +""" +Миграция: Удаление колонки vector_hash из таблицы post_from_telegram_suggest. + +Колонка больше не нужна, т.к. RAG сервис вынесен в отдельный микросервис +и хранит векторы самостоятельно. + +SQLite не поддерживает DROP COLUMN напрямую (до версии 3.35.0), +поэтому используем пересоздание таблицы. +""" +import argparse +import asyncio +import os +import sys +from pathlib import Path + +# Добавляем корень проекта в путь +project_root = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(project_root)) + +import aiosqlite + +# Пытаемся импортировать logger, если не получается - используем стандартный +try: + from logs.custom_logger import logger +except ImportError: + import logging + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + logger = logging.getLogger(__name__) + +DEFAULT_DB_PATH = "database/tg-bot-database.db" + + +async def column_exists(conn: aiosqlite.Connection, table: str, column: str) -> bool: + """Проверяет существование колонки в таблице.""" + cursor = await conn.execute(f"PRAGMA table_info({table})") + columns = await cursor.fetchall() + return any(col[1] == column for col in columns) + + +async def get_sqlite_version(conn: aiosqlite.Connection) -> tuple: + """Возвращает версию SQLite.""" + cursor = await conn.execute("SELECT sqlite_version()") + version_str = (await cursor.fetchone())[0] + return tuple(map(int, version_str.split('.'))) + + +async def main(db_path: str) -> None: + """ + Удаляет колонку vector_hash из таблицы post_from_telegram_suggest. + """ + db_path = os.path.abspath(db_path) + + if not os.path.exists(db_path): + logger.error(f"База данных не найдена: {db_path}") + return + + async with aiosqlite.connect(db_path) as conn: + # Проверяем существует ли колонка + if not await column_exists(conn, "post_from_telegram_suggest", "vector_hash"): + logger.info("Колонка vector_hash не существует, миграция не требуется") + return + + # Проверяем версию SQLite + version = await get_sqlite_version(conn) + logger.info(f"Версия SQLite: {'.'.join(map(str, version))}") + + # SQLite 3.35.0+ поддерживает DROP COLUMN + if version >= (3, 35, 0): + logger.info("Используем ALTER TABLE DROP COLUMN") + await conn.execute( + "ALTER TABLE post_from_telegram_suggest DROP COLUMN vector_hash" + ) + else: + # Для старых версий пересоздаём таблицу + logger.info("Используем пересоздание таблицы (SQLite < 3.35.0)") + + # Получаем список колонок без vector_hash + cursor = await conn.execute("PRAGMA table_info(post_from_telegram_suggest)") + columns = await cursor.fetchall() + column_names = [col[1] for col in columns if col[1] != "vector_hash"] + columns_str = ", ".join(column_names) + + logger.info(f"Колонки для сохранения: {columns_str}") + + # Пересоздаём таблицу + await conn.execute("BEGIN TRANSACTION") + try: + # Создаём временную таблицу + await conn.execute( + f"CREATE TABLE post_from_telegram_suggest_backup AS " + f"SELECT {columns_str} FROM post_from_telegram_suggest" + ) + + # Удаляем старую таблицу + await conn.execute("DROP TABLE post_from_telegram_suggest") + + # Переименовываем временную + await conn.execute( + "ALTER TABLE post_from_telegram_suggest_backup " + "RENAME TO post_from_telegram_suggest" + ) + + await conn.execute("COMMIT") + except Exception as e: + await conn.execute("ROLLBACK") + raise e + + await conn.commit() + logger.info("Колонка vector_hash успешно удалена") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Удаление колонки vector_hash из post_from_telegram_suggest" + ) + parser.add_argument( + "--db", + default=os.environ.get("DATABASE_PATH", DEFAULT_DB_PATH), + help="Путь к БД", + ) + args = parser.parse_args() + asyncio.run(main(args.db))