Files
rag-service/app/main.py

207 lines
7.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
FastAPI приложение Embedding сервиса.
Сервис для векторного скоринга текстов с использованием sentence-transformers.
"""
import asyncio
import logging
import sys
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app import __version__
from app.api.routes import router
from app.config import get_settings
from app.services.rag_service import RAGService, get_rag_service
# Настройка логирования
def setup_logging() -> None:
"""Настраивает логирование для приложения."""
settings = get_settings()
logging.basicConfig(
level=getattr(logging, settings.log_level.upper()),
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(sys.stdout),
],
)
# Уменьшаем логи от библиотек
logging.getLogger("transformers").setLevel(logging.WARNING)
logging.getLogger("torch").setLevel(logging.WARNING)
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
# Глобальная задача автосохранения
_autosave_task: asyncio.Task | None = None
async def autosave_loop(service: RAGService, interval: int) -> None:
"""
Фоновая задача для периодического сохранения векторов.
Args:
service: RAG сервис
interval: Интервал сохранения в секундах
"""
logger.info(f"Автосохранение запущено (интервал: {interval} сек)")
while True:
try:
await asyncio.sleep(interval)
# Сохраняем только если есть данные
has_examples = service.vector_store.total_count > 0
has_submitted = service.vector_store.submitted_count > 0
if has_examples or has_submitted:
service.save_vectors()
parts = []
if has_examples:
parts.append(
f"{service.vector_store.positive_count} pos, "
f"{service.vector_store.negative_count} neg"
)
if has_submitted:
parts.append(f"{service.vector_store.submitted_count} submitted")
logger.info(f"Автосохранение: сохранено {', '.join(parts)}")
else:
logger.debug("Автосохранение: нет данных для сохранения")
except asyncio.CancelledError:
logger.info("Автосохранение остановлено")
break
except Exception as e:
logger.error(f"Ошибка автосохранения: {e}")
# Продолжаем работу даже при ошибке
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
"""
Lifespan контекст для FastAPI.
При запуске:
- Настраивает логирование
- Прогревает модель (опционально)
При остановке:
- Сохраняет векторы на диск
"""
global _autosave_task
setup_logging()
logger.info(f"Embedding Service v{__version__} запускается...")
settings = get_settings()
logger.info(f"Настройки: model={settings.model_name}, vectors_path={settings.vectors_path}")
# Получаем сервис (создается singleton)
service = get_rag_service()
# Запускаем автосохранение если включено
if settings.autosave_interval > 0:
_autosave_task = asyncio.create_task(autosave_loop(service, settings.autosave_interval))
logger.info(f"Автосохранение включено: каждые {settings.autosave_interval} сек")
else:
logger.info("Автосохранение отключено")
# Прогреваем модель при запуске (опционально)
# Можно раскомментировать если нужен автопрогрев
# logger.info("Прогрев модели при запуске...")
# await service.warmup()
logger.info("Embedding Service готов к работе")
yield
# Останавливаем автосохранение
if _autosave_task and not _autosave_task.done():
_autosave_task.cancel()
try:
await _autosave_task
except asyncio.CancelledError:
pass
# При остановке сохраняем векторы
logger.info("Embedding Service останавливается, финальное сохранение векторов...")
try:
service.save_vectors()
logger.info("Векторы сохранены")
except Exception as e:
logger.error(f"Ошибка сохранения векторов: {e}")
logger.info("Embedding Service остановлен")
# Создание приложения
app = FastAPI(
title="Embedding Service",
description="""
Сервис векторного скоринга текстов с использованием sentence-transformers.
## Возможности
* **Скоринг** - оценка текстов на основе векторного сходства с примерами
* **Примеры** - добавление положительных и отрицательных примеров
* **Статистика** - мониторинг состояния сервиса
* **Управление** - прогрев модели, настройка параметров формулы
## Алгоритм скоринга
1. Текст преобразуется в вектор через sentence-transformers/all-MiniLM-L12-v2 (384 измерения)
2. Вычисляется косинусное сходство с положительными примерами (топ-k ближайших)
3. Вычисляется косинусное сходство с отрицательными примерами (топ-k ближайших)
4. Финальный скор = (diff * multiplier + 1) / 2, где diff = avg_pos - avg_neg, нормализованный в [0, 1]
""",
version=__version__,
lifespan=lifespan,
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
swagger_ui_parameters={
"syntaxHighlight.theme": "agate",
"defaultModelsExpandDepth": 1,
"defaultModelExpandDepth": 1,
},
)
# CORS middleware (для возможных веб-клиентов)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # В продакшене ограничить
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Простой healthcheck endpoint без авторизации (для Docker healthcheck)
@app.get("/health")
async def simple_health_check():
"""Простая проверка здоровья без авторизации (для Docker healthcheck)."""
return {"status": "ok"}
# Подключение роутов
app.include_router(router, prefix="/api/v1")
if __name__ == "__main__":
import uvicorn
settings = get_settings()
uvicorn.run(
"app.main:app",
host=settings.api_host,
port=settings.api_port,
reload=True,
)