Files
telegram-helper-bot/helper_bot/services/scoring/rag_client.py
Andrey feee7f010c refactor: обновление системы ML-скоринга и переход на RAG API
- Обновлен Dockerfile для использования Alpine вместо Slim, улучшая размер образа.
- Удален устаревший RAGService и добавлен RagApiClient для работы с внешним RAG API.
- Обновлены переменные окружения в env.example для настройки нового RAG API.
- Обновлен ScoringManager для интеграции с RagApiClient.
- Упрощена структура проекта, удалены ненужные файлы и зависимости, связанные с векторным хранилищем.
- Обновлены обработчики и функции для работы с новым API, включая получение статистики и обработку ошибок.
2026-01-26 22:03:15 +03:00

312 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
HTTP клиент для взаимодействия с внешним RAG сервисом.
Использует REST API для получения скоров и отправки примеров.
"""
from typing import Optional, Dict, Any
import httpx
from logs.custom_logger import logger
from helper_bot.utils.metrics import track_time, track_errors
from .base import ScoringResult
from .exceptions import ScoringError, InsufficientExamplesError, TextTooShortError
class RagApiClient:
"""
HTTP клиент для взаимодействия с внешним RAG сервисом.
Использует REST API для:
- Получения скоров постов (POST /api/v1/score)
- Отправки положительных примеров (POST /api/v1/examples/positive)
- Отправки отрицательных примеров (POST /api/v1/examples/negative)
- Получения статистики (GET /api/v1/stats)
Attributes:
api_url: Базовый URL API сервиса
api_key: API ключ для аутентификации
timeout: Таймаут запросов в секундах
test_mode: Включен ли тестовый режим (добавляет заголовок X-Test-Mode: true)
enabled: Включен ли клиент
"""
def __init__(
self,
api_url: str,
api_key: str,
timeout: int = 30,
test_mode: bool = False,
enabled: bool = True,
):
"""
Инициализация клиента.
Args:
api_url: Базовый URL API (например, http://хх.ххх.ххх.хх/api/v1)
api_key: API ключ для аутентификации
timeout: Таймаут запросов в секундах
test_mode: Включен ли тестовый режим (добавляет заголовок X-Test-Mode: true к запросам examples)
enabled: Включен ли клиент
"""
# Убираем trailing slash если есть
self.api_url = api_url.rstrip('/')
self.api_key = api_key
self.timeout = timeout
self.test_mode = test_mode
self._enabled = enabled
# Создаем HTTP клиент
self._client = httpx.AsyncClient(
timeout=httpx.Timeout(timeout),
headers={
"X-API-Key": api_key,
"Content-Type": "application/json",
}
)
logger.info(f"RagApiClient инициализирован (url={self.api_url}, enabled={enabled})")
@property
def source_name(self) -> str:
"""Имя источника для результатов."""
return "rag"
@property
def is_enabled(self) -> bool:
"""Проверяет, включен ли клиент."""
return self._enabled
async def close(self) -> None:
"""Закрывает HTTP клиент."""
await self._client.aclose()
@track_time("calculate_score", "rag_client")
@track_errors("rag_client", "calculate_score")
async def calculate_score(self, text: str) -> ScoringResult:
"""
Рассчитывает скор для текста поста через API.
Args:
text: Текст поста для оценки
Returns:
ScoringResult с оценкой
Raises:
ScoringError: При ошибке расчета
InsufficientExamplesError: Если недостаточно примеров
TextTooShortError: Если текст слишком короткий
"""
if not self._enabled:
raise ScoringError("RAG API клиент отключен")
if not text or not text.strip():
raise TextTooShortError("Текст пустой")
try:
response = await self._client.post(
f"{self.api_url}/score",
json={"text": text.strip()}
)
# Обрабатываем различные статусы
if response.status_code == 400:
try:
error_data = response.json()
error_msg = error_data.get("detail", "Неизвестная ошибка")
except Exception:
error_msg = response.text or "Неизвестная ошибка"
logger.warning(f"RagApiClient: Ошибка валидации запроса: {error_msg}")
if "недостаточно" in error_msg.lower() or "insufficient" in error_msg.lower():
raise InsufficientExamplesError(error_msg)
if "коротк" in error_msg.lower() or "short" in error_msg.lower():
raise TextTooShortError(error_msg)
raise ScoringError(f"Ошибка валидации: {error_msg}")
if response.status_code == 401:
logger.error("RagApiClient: Ошибка аутентификации: неверный API ключ")
raise ScoringError("Ошибка аутентификации: неверный API ключ")
if response.status_code == 404:
logger.error("RagApiClient: RAG API endpoint не найден")
raise ScoringError("RAG API endpoint не найден")
if response.status_code >= 500:
logger.error(f"RagApiClient: Ошибка сервера RAG API: {response.status_code}")
raise ScoringError(f"Ошибка сервера RAG API: {response.status_code}")
# Проверяем успешный статус
if response.status_code != 200:
response.raise_for_status()
data = response.json()
# Парсим ответ
score = float(data.get("rag_score", 0.0))
confidence = float(data.get("rag_confidence", 0.0)) if data.get("rag_confidence") is not None else None
# Форматируем confidence для логирования
confidence_str = f"{confidence:.4f}" if confidence is not None else "None"
logger.info(
f"RagApiClient: Скор успешно получен "
f"(score={score:.4f}, confidence={confidence_str})"
)
return ScoringResult(
score=score,
source=self.source_name,
model=data.get("meta", {}).get("model", "rag-service"),
confidence=confidence,
metadata={
"rag_score_pos_only": float(data.get("rag_score_pos_only", 0.0)) if data.get("rag_score_pos_only") is not None else None,
"positive_examples": data.get("meta", {}).get("positive_examples"),
"negative_examples": data.get("meta", {}).get("negative_examples"),
}
)
except httpx.TimeoutException:
logger.error(f"RagApiClient: Таймаут запроса к RAG API (>{self.timeout}с)")
raise ScoringError(f"Таймаут запроса к RAG API (>{self.timeout}с)")
except httpx.RequestError as e:
logger.error(f"RagApiClient: Ошибка подключения к RAG API: {e}")
raise ScoringError(f"Ошибка подключения к RAG API: {e}")
except (KeyError, ValueError, TypeError) as e:
logger.error(f"RagApiClient: Ошибка парсинга ответа: {e}, response: {response.text if 'response' in locals() else 'N/A'}")
raise ScoringError(f"Ошибка парсинга ответа от RAG API: {e}")
except InsufficientExamplesError:
raise
except TextTooShortError:
raise
except ScoringError:
# Уже залогированные ошибки (401, 404, 500, таймауты и т.д.) - просто пробрасываем
raise
except Exception as e:
# Только действительно неожиданные ошибки логируем здесь
logger.error(f"RagApiClient: Неожиданная ошибка при расчете скора: {e}", exc_info=True)
raise ScoringError(f"Неожиданная ошибка: {e}")
@track_time("add_positive_example", "rag_client")
async def add_positive_example(self, text: str) -> None:
"""
Добавляет текст как положительный пример (опубликованный пост).
Args:
text: Текст опубликованного поста
"""
if not self._enabled:
return
if not text or not text.strip():
return
try:
# Формируем заголовки (добавляем X-Test-Mode если включен тестовый режим)
headers = {}
if self.test_mode:
headers["X-Test-Mode"] = "true"
response = await self._client.post(
f"{self.api_url}/examples/positive",
json={"text": text.strip()},
headers=headers
)
if response.status_code == 200 or response.status_code == 201:
logger.info("RagApiClient: Положительный пример успешно добавлен")
elif response.status_code == 400:
logger.warning(f"RagApiClient: Ошибка валидации при добавлении положительного примера: {response.text}")
else:
logger.warning(f"RagApiClient: Неожиданный статус при добавлении положительного примера: {response.status_code}")
except httpx.TimeoutException:
logger.warning(f"RagApiClient: Таймаут при добавлении положительного примера")
except httpx.RequestError as e:
logger.warning(f"RagApiClient: Ошибка подключения при добавлении положительного примера: {e}")
except Exception as e:
logger.error(f"RagApiClient: Ошибка добавления положительного примера: {e}")
@track_time("add_negative_example", "rag_client")
async def add_negative_example(self, text: str) -> None:
"""
Добавляет текст как отрицательный пример (отклоненный пост).
Args:
text: Текст отклоненного поста
"""
if not self._enabled:
return
if not text or not text.strip():
return
try:
# Формируем заголовки (добавляем X-Test-Mode если включен тестовый режим)
headers = {}
if self.test_mode:
headers["X-Test-Mode"] = "true"
response = await self._client.post(
f"{self.api_url}/examples/negative",
json={"text": text.strip()},
headers=headers
)
if response.status_code == 200 or response.status_code == 201:
logger.info("RagApiClient: Отрицательный пример успешно добавлен")
elif response.status_code == 400:
logger.warning(f"RagApiClient: Ошибка валидации при добавлении отрицательного примера: {response.text}")
else:
logger.warning(f"RagApiClient: Неожиданный статус при добавлении отрицательного примера: {response.status_code}")
except httpx.TimeoutException:
logger.warning(f"RagApiClient: Таймаут при добавлении отрицательного примера")
except httpx.RequestError as e:
logger.warning(f"RagApiClient: Ошибка подключения при добавлении отрицательного примера: {e}")
except Exception as e:
logger.error(f"RagApiClient: Ошибка добавления отрицательного примера: {e}")
async def get_stats(self) -> Dict[str, Any]:
"""
Получает статистику от RAG API.
Returns:
Словарь со статистикой или пустой словарь при ошибке
"""
if not self._enabled:
return {}
try:
response = await self._client.get(f"{self.api_url}/stats")
if response.status_code == 200:
return response.json()
else:
logger.warning(f"RagApiClient: Неожиданный статус при получении статистики: {response.status_code}")
return {}
except httpx.TimeoutException:
logger.warning(f"RagApiClient: Таймаут при получении статистики")
return {}
except httpx.RequestError as e:
logger.warning(f"RagApiClient: Ошибка подключения при получении статистики: {e}")
return {}
except Exception as e:
logger.error(f"RagApiClient: Ошибка получения статистики: {e}")
return {}
def get_stats_sync(self) -> Dict[str, Any]:
"""
Синхронная версия get_stats для использования в get_stats() ScoringManager.
Внимание: Это заглушка, реальная статистика будет получена асинхронно.
"""
return {
"enabled": self._enabled,
"api_url": self.api_url,
"timeout": self.timeout,
}