Files
telegram-helper-bot/helper_bot/utils/base_dependency_factory.py
2026-02-02 00:13:33 +03:00

229 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import sys
from typing import Optional
from dotenv import load_dotenv
from database.async_db import AsyncBotDB
from helper_bot.utils.s3_storage import S3StorageService
from logs.custom_logger import logger
class BaseDependencyFactory:
def __init__(self):
project_dir = os.path.dirname(
os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
env_path = os.path.join(project_dir, ".env")
if os.path.exists(env_path):
load_dotenv(env_path)
self.settings = {}
self._project_dir = project_dir
database_path = os.getenv("DATABASE_PATH", "database/tg-bot-database.db")
if not os.path.isabs(database_path):
database_path = os.path.join(project_dir, database_path)
self.database = AsyncBotDB(database_path)
self._load_settings_from_env()
self._init_s3_storage()
# ScoringManager инициализируется лениво
self._scoring_manager = None
def _load_settings_from_env(self):
"""Загружает настройки из переменных окружения."""
self.settings["Telegram"] = {
"bot_token": os.getenv("BOT_TOKEN", ""),
"listen_bot_token": os.getenv("LISTEN_BOT_TOKEN", ""),
"test_bot_token": os.getenv("TEST_BOT_TOKEN", ""),
"preview_link": self._parse_bool(os.getenv("PREVIEW_LINK", "false")),
"main_public": os.getenv("MAIN_PUBLIC", ""),
"group_for_posts": self._parse_int(os.getenv("GROUP_FOR_POSTS", "0")),
"group_for_message": self._parse_int(os.getenv("GROUP_FOR_MESSAGE", "0")),
"group_for_logs": self._parse_int(os.getenv("GROUP_FOR_LOGS", "0")),
"important_logs": self._parse_int(os.getenv("IMPORTANT_LOGS", "0")),
"archive": self._parse_int(os.getenv("ARCHIVE", "0")),
"test_group": self._parse_int(os.getenv("TEST_GROUP", "0")),
}
self.settings["Settings"] = {
"logs": self._parse_bool(os.getenv("LOGS", "false")),
"test": self._parse_bool(os.getenv("TEST", "false")),
}
self.settings["Metrics"] = {
"host": os.getenv("METRICS_HOST", "0.0.0.0"),
"port": self._parse_int(os.getenv("METRICS_PORT", "8080")),
}
self.settings["S3"] = {
"enabled": self._parse_bool(os.getenv("S3_ENABLED", "false")),
"endpoint_url": os.getenv("S3_ENDPOINT_URL", ""),
"access_key": os.getenv("S3_ACCESS_KEY", ""),
"secret_key": os.getenv("S3_SECRET_KEY", ""),
"bucket_name": os.getenv("S3_BUCKET_NAME", ""),
"region": os.getenv("S3_REGION", "us-east-1"),
}
# Настройки ML-скоринга
self.settings["Scoring"] = {
# RAG API
"rag_enabled": self._parse_bool(os.getenv("RAG_ENABLED", "false")),
"rag_api_url": os.getenv("RAG_API_URL", ""),
"rag_api_key": os.getenv("RAG_API_KEY", ""),
"rag_api_timeout": self._parse_int(os.getenv("RAG_API_TIMEOUT", "30")),
"rag_test_mode": self._parse_bool(os.getenv("RAG_TEST_MODE", "false")),
# DeepSeek
"deepseek_enabled": self._parse_bool(
os.getenv("DEEPSEEK_ENABLED", "false")
),
"deepseek_api_key": os.getenv("DEEPSEEK_API_KEY", ""),
"deepseek_api_url": os.getenv(
"DEEPSEEK_API_URL", "https://api.deepseek.com/v1/chat/completions"
),
"deepseek_model": os.getenv("DEEPSEEK_MODEL", "deepseek-chat"),
"deepseek_timeout": self._parse_int(os.getenv("DEEPSEEK_TIMEOUT", "30")),
}
def _init_s3_storage(self):
"""Инициализирует S3StorageService если S3 включен."""
self.s3_storage = None
if self.settings["S3"]["enabled"]:
s3_config = self.settings["S3"]
if (
s3_config["endpoint_url"]
and s3_config["access_key"]
and s3_config["secret_key"]
and s3_config["bucket_name"]
):
self.s3_storage = S3StorageService(
endpoint_url=s3_config["endpoint_url"],
access_key=s3_config["access_key"],
secret_key=s3_config["secret_key"],
bucket_name=s3_config["bucket_name"],
region=s3_config["region"],
)
def _parse_bool(self, value: str) -> bool:
"""Парсит строковое значение в boolean."""
return value.lower() in ("true", "1", "yes", "on")
def _parse_int(self, value: str) -> int:
"""Парсит строковое значение в integer."""
try:
return int(value)
except (ValueError, TypeError):
return 0
def _parse_float(self, value: str) -> float:
"""Парсит строковое значение в float."""
try:
return float(value)
except (ValueError, TypeError):
return 0.0
def get_settings(self):
return self.settings
def get_db(self) -> AsyncBotDB:
"""Возвращает подключение к базе данных."""
return self.database
def get_s3_storage(self) -> Optional[S3StorageService]:
"""Возвращает S3StorageService если S3 включен, иначе None."""
return self.s3_storage
def _init_scoring_manager(self):
"""
Инициализирует ScoringManager с RAG API клиентом и DeepSeek сервисом.
Вызывается лениво при первом обращении к get_scoring_manager().
"""
from helper_bot.services.scoring import (
DeepSeekService,
RagApiClient,
ScoringManager,
)
scoring_config = self.settings["Scoring"]
# Инициализация RAG API клиента
rag_client = None
if scoring_config["rag_enabled"]:
api_url = scoring_config["rag_api_url"]
api_key = scoring_config["rag_api_key"]
if not api_url or not api_key:
logger.warning("RAG включен, но не указаны RAG_API_URL или RAG_API_KEY")
else:
rag_client = RagApiClient(
api_url=api_url,
api_key=api_key,
timeout=scoring_config["rag_api_timeout"],
test_mode=scoring_config["rag_test_mode"],
enabled=True,
)
logger.info(
f"RagApiClient инициализирован: {api_url} (test_mode={scoring_config['rag_test_mode']})"
)
# Инициализация DeepSeek сервиса
deepseek_service = None
if scoring_config["deepseek_enabled"] and scoring_config["deepseek_api_key"]:
deepseek_service = DeepSeekService(
api_key=scoring_config["deepseek_api_key"],
api_url=scoring_config["deepseek_api_url"],
model=scoring_config["deepseek_model"],
timeout=scoring_config["deepseek_timeout"],
enabled=True,
)
logger.info(
f"DeepSeekService инициализирован: {scoring_config['deepseek_model']}"
)
# Создаем менеджер
self._scoring_manager = ScoringManager(
rag_client=rag_client,
deepseek_service=deepseek_service,
)
return self._scoring_manager
def get_scoring_manager(self):
"""
Возвращает ScoringManager для ML-скоринга постов.
Инициализируется лениво при первом вызове.
Returns:
ScoringManager или None если скоринг полностью отключен
"""
if self._scoring_manager is None:
scoring_config = self.settings.get("Scoring", {})
# Проверяем, включен ли хотя бы один сервис
rag_enabled = scoring_config.get("rag_enabled", False)
deepseek_enabled = scoring_config.get("deepseek_enabled", False)
if not rag_enabled and not deepseek_enabled:
logger.info("Scoring полностью отключен (RAG и DeepSeek disabled)")
return None
self._init_scoring_manager()
return self._scoring_manager
_global_instance = None
def get_global_instance():
"""Возвращает глобальный экземпляр BaseDependencyFactory."""
global _global_instance
if _global_instance is None:
_global_instance = BaseDependencyFactory()
return _global_instance