""" HTTP клиент для взаимодействия с внешним RAG сервисом. Использует REST API для получения скоров и отправки примеров. """ from typing import Any, Dict, Optional import httpx from helper_bot.utils.metrics import track_errors, track_time from logs.custom_logger import logger from .base import ScoringResult from .exceptions import (InsufficientExamplesError, ScoringError, TextTooShortError) class RagApiClient: """ HTTP клиент для взаимодействия с внешним RAG сервисом. Использует REST API для: - Получения скоров постов (POST /api/v1/score) - Отправки положительных примеров (POST /api/v1/examples/positive) - Отправки отрицательных примеров (POST /api/v1/examples/negative) - Получения статистики (GET /api/v1/stats) Attributes: api_url: Базовый URL API сервиса api_key: API ключ для аутентификации timeout: Таймаут запросов в секундах test_mode: Включен ли тестовый режим (добавляет заголовок X-Test-Mode: true) enabled: Включен ли клиент """ def __init__( self, api_url: str, api_key: str, timeout: int = 30, test_mode: bool = False, enabled: bool = True, ): """ Инициализация клиента. Args: api_url: Базовый URL API (например, http://хх.ххх.ххх.хх/api/v1) api_key: API ключ для аутентификации timeout: Таймаут запросов в секундах test_mode: Включен ли тестовый режим (добавляет заголовок X-Test-Mode: true к запросам examples) enabled: Включен ли клиент """ # Убираем trailing slash если есть self.api_url = api_url.rstrip('/') self.api_key = api_key self.timeout = timeout self.test_mode = test_mode self._enabled = enabled # Создаем HTTP клиент self._client = httpx.AsyncClient( timeout=httpx.Timeout(timeout), headers={ "X-API-Key": api_key, "Content-Type": "application/json", } ) logger.info(f"RagApiClient инициализирован (url={self.api_url}, enabled={enabled})") @property def source_name(self) -> str: """Имя источника для результатов.""" return "rag" @property def is_enabled(self) -> bool: """Проверяет, включен ли клиент.""" return self._enabled async def close(self) -> None: """Закрывает HTTP клиент.""" await self._client.aclose() @track_time("calculate_score", "rag_client") @track_errors("rag_client", "calculate_score") async def calculate_score(self, text: str) -> ScoringResult: """ Рассчитывает скор для текста поста через API. Args: text: Текст поста для оценки Returns: ScoringResult с оценкой Raises: ScoringError: При ошибке расчета InsufficientExamplesError: Если недостаточно примеров TextTooShortError: Если текст слишком короткий """ if not self._enabled: raise ScoringError("RAG API клиент отключен") if not text or not text.strip(): raise TextTooShortError("Текст пустой") try: response = await self._client.post( f"{self.api_url}/score", json={"text": text.strip()} ) # Обрабатываем различные статусы if response.status_code == 400: try: error_data = response.json() error_msg = error_data.get("detail", "Неизвестная ошибка") except Exception: error_msg = response.text or "Неизвестная ошибка" logger.warning(f"RagApiClient: Ошибка валидации запроса: {error_msg}") if "недостаточно" in error_msg.lower() or "insufficient" in error_msg.lower(): raise InsufficientExamplesError(error_msg) if "коротк" in error_msg.lower() or "short" in error_msg.lower(): raise TextTooShortError(error_msg) raise ScoringError(f"Ошибка валидации: {error_msg}") if response.status_code == 401: logger.error("RagApiClient: Ошибка аутентификации: неверный API ключ") raise ScoringError("Ошибка аутентификации: неверный API ключ") if response.status_code == 404: logger.error("RagApiClient: RAG API endpoint не найден") raise ScoringError("RAG API endpoint не найден") if response.status_code >= 500: logger.error(f"RagApiClient: Ошибка сервера RAG API: {response.status_code}") raise ScoringError(f"Ошибка сервера RAG API: {response.status_code}") # Проверяем успешный статус if response.status_code != 200: response.raise_for_status() data = response.json() # Парсим ответ score = float(data.get("rag_score", 0.0)) confidence = float(data.get("rag_confidence", 0.0)) if data.get("rag_confidence") is not None else None rag_score_pos_only_raw = data.get("rag_score_pos_only") rag_score_pos_only = float(rag_score_pos_only_raw) if rag_score_pos_only_raw is not None else None # Форматируем confidence для логирования confidence_str = f"{confidence:.4f}" if confidence is not None else "None" rag_score_pos_only_str = f"{rag_score_pos_only:.4f}" if rag_score_pos_only is not None else "None" logger.info( f"RagApiClient: Скор успешно получен из API - " f"rag_score={score:.4f} (type: {type(score).__name__}), " f"rag_confidence={confidence_str}, " f"rag_score_pos_only={rag_score_pos_only_str}, " f"raw_response_rag_score={data.get('rag_score')}, " f"raw_response_rag_score_pos_only={rag_score_pos_only_raw}" ) return ScoringResult( score=score, source=self.source_name, model=data.get("meta", {}).get("model", "rag-service"), confidence=confidence, metadata={ "rag_score_pos_only": rag_score_pos_only, "positive_examples": data.get("meta", {}).get("positive_examples"), "negative_examples": data.get("meta", {}).get("negative_examples"), } ) except httpx.TimeoutException: logger.error(f"RagApiClient: Таймаут запроса к RAG API (>{self.timeout}с)") raise ScoringError(f"Таймаут запроса к RAG API (>{self.timeout}с)") except httpx.RequestError as e: logger.error(f"RagApiClient: Ошибка подключения к RAG API: {e}") raise ScoringError(f"Ошибка подключения к RAG API: {e}") except (KeyError, ValueError, TypeError) as e: logger.error(f"RagApiClient: Ошибка парсинга ответа: {e}, response: {response.text if 'response' in locals() else 'N/A'}") raise ScoringError(f"Ошибка парсинга ответа от RAG API: {e}") except InsufficientExamplesError: raise except TextTooShortError: raise except ScoringError: # Уже залогированные ошибки (401, 404, 500, таймауты и т.д.) - просто пробрасываем raise except Exception as e: # Только действительно неожиданные ошибки логируем здесь logger.error(f"RagApiClient: Неожиданная ошибка при расчете скора: {e}", exc_info=True) raise ScoringError(f"Неожиданная ошибка: {e}") @track_time("add_positive_example", "rag_client") async def add_positive_example(self, text: str) -> None: """ Добавляет текст как положительный пример (опубликованный пост). Args: text: Текст опубликованного поста """ if not self._enabled: return if not text or not text.strip(): return try: # Формируем заголовки (добавляем X-Test-Mode если включен тестовый режим) headers = {} if self.test_mode: headers["X-Test-Mode"] = "true" response = await self._client.post( f"{self.api_url}/examples/positive", json={"text": text.strip()}, headers=headers ) if response.status_code == 200 or response.status_code == 201: logger.info("RagApiClient: Положительный пример успешно добавлен") elif response.status_code == 400: logger.warning(f"RagApiClient: Ошибка валидации при добавлении положительного примера: {response.text}") else: logger.warning(f"RagApiClient: Неожиданный статус при добавлении положительного примера: {response.status_code}") except httpx.TimeoutException: logger.warning(f"RagApiClient: Таймаут при добавлении положительного примера") except httpx.RequestError as e: logger.warning(f"RagApiClient: Ошибка подключения при добавлении положительного примера: {e}") except Exception as e: logger.error(f"RagApiClient: Ошибка добавления положительного примера: {e}") @track_time("add_negative_example", "rag_client") async def add_negative_example(self, text: str) -> None: """ Добавляет текст как отрицательный пример (отклоненный пост). Args: text: Текст отклоненного поста """ if not self._enabled: return if not text or not text.strip(): return try: # Формируем заголовки (добавляем X-Test-Mode если включен тестовый режим) headers = {} if self.test_mode: headers["X-Test-Mode"] = "true" response = await self._client.post( f"{self.api_url}/examples/negative", json={"text": text.strip()}, headers=headers ) if response.status_code == 200 or response.status_code == 201: logger.info("RagApiClient: Отрицательный пример успешно добавлен") elif response.status_code == 400: logger.warning(f"RagApiClient: Ошибка валидации при добавлении отрицательного примера: {response.text}") else: logger.warning(f"RagApiClient: Неожиданный статус при добавлении отрицательного примера: {response.status_code}") except httpx.TimeoutException: logger.warning(f"RagApiClient: Таймаут при добавлении отрицательного примера") except httpx.RequestError as e: logger.warning(f"RagApiClient: Ошибка подключения при добавлении отрицательного примера: {e}") except Exception as e: logger.error(f"RagApiClient: Ошибка добавления отрицательного примера: {e}") async def get_stats(self) -> Dict[str, Any]: """ Получает статистику от RAG API через endpoint /stats. Returns: Словарь со статистикой или пустой словарь при ошибке """ if not self._enabled: return {} try: response = await self._client.get(f"{self.api_url}/stats") if response.status_code == 200: return response.json() else: logger.warning(f"RagApiClient: Неожиданный статус при получении статистики: {response.status_code}") return {} except httpx.TimeoutException: logger.warning(f"RagApiClient: Таймаут при получении статистики") return {} except httpx.RequestError as e: logger.warning(f"RagApiClient: Ошибка подключения при получении статистики: {e}") return {} except Exception as e: logger.error(f"RagApiClient: Ошибка получения статистики: {e}") return {} def get_stats_sync(self) -> Dict[str, Any]: """ Синхронная версия get_stats для использования в get_stats() ScoringManager. Внимание: Это заглушка, реальная статистика будет получена асинхронно. """ return { "enabled": self._enabled, "api_url": self.api_url, "timeout": self.timeout, }