import os import sys from typing import Optional from dotenv import load_dotenv from database.async_db import AsyncBotDB from helper_bot.utils.s3_storage import S3StorageService from logs.custom_logger import logger class BaseDependencyFactory: def __init__(self): project_dir = os.path.dirname( os.path.dirname(os.path.dirname(os.path.abspath(__file__))) ) env_path = os.path.join(project_dir, ".env") if os.path.exists(env_path): load_dotenv(env_path) self.settings = {} self._project_dir = project_dir database_path = os.getenv("DATABASE_PATH", "database/tg-bot-database.db") if not os.path.isabs(database_path): database_path = os.path.join(project_dir, database_path) self.database = AsyncBotDB(database_path) self._load_settings_from_env() self._init_s3_storage() # ScoringManager инициализируется лениво self._scoring_manager = None def _load_settings_from_env(self): """Загружает настройки из переменных окружения.""" self.settings["Telegram"] = { "bot_token": os.getenv("BOT_TOKEN", ""), "listen_bot_token": os.getenv("LISTEN_BOT_TOKEN", ""), "test_bot_token": os.getenv("TEST_BOT_TOKEN", ""), "preview_link": self._parse_bool(os.getenv("PREVIEW_LINK", "false")), "main_public": os.getenv("MAIN_PUBLIC", ""), "group_for_posts": self._parse_int(os.getenv("GROUP_FOR_POSTS", "0")), "group_for_message": self._parse_int(os.getenv("GROUP_FOR_MESSAGE", "0")), "group_for_logs": self._parse_int(os.getenv("GROUP_FOR_LOGS", "0")), "important_logs": self._parse_int(os.getenv("IMPORTANT_LOGS", "0")), "archive": self._parse_int(os.getenv("ARCHIVE", "0")), "test_group": self._parse_int(os.getenv("TEST_GROUP", "0")), } self.settings["Settings"] = { "logs": self._parse_bool(os.getenv("LOGS", "false")), "test": self._parse_bool(os.getenv("TEST", "false")), } self.settings["Metrics"] = { "host": os.getenv("METRICS_HOST", "0.0.0.0"), "port": self._parse_int(os.getenv("METRICS_PORT", "8080")), } self.settings["S3"] = { "enabled": self._parse_bool(os.getenv("S3_ENABLED", "false")), "endpoint_url": os.getenv("S3_ENDPOINT_URL", ""), "access_key": os.getenv("S3_ACCESS_KEY", ""), "secret_key": os.getenv("S3_SECRET_KEY", ""), "bucket_name": os.getenv("S3_BUCKET_NAME", ""), "region": os.getenv("S3_REGION", "us-east-1"), } # Настройки ML-скоринга self.settings["Scoring"] = { # RAG API "rag_enabled": self._parse_bool(os.getenv("RAG_ENABLED", "false")), "rag_api_url": os.getenv("RAG_API_URL", ""), "rag_api_key": os.getenv("RAG_API_KEY", ""), "rag_api_timeout": self._parse_int(os.getenv("RAG_API_TIMEOUT", "30")), "rag_test_mode": self._parse_bool(os.getenv("RAG_TEST_MODE", "false")), # DeepSeek "deepseek_enabled": self._parse_bool( os.getenv("DEEPSEEK_ENABLED", "false") ), "deepseek_api_key": os.getenv("DEEPSEEK_API_KEY", ""), "deepseek_api_url": os.getenv( "DEEPSEEK_API_URL", "https://api.deepseek.com/v1/chat/completions" ), "deepseek_model": os.getenv("DEEPSEEK_MODEL", "deepseek-chat"), "deepseek_timeout": self._parse_int(os.getenv("DEEPSEEK_TIMEOUT", "30")), } def _init_s3_storage(self): """Инициализирует S3StorageService если S3 включен.""" self.s3_storage = None if self.settings["S3"]["enabled"]: s3_config = self.settings["S3"] if ( s3_config["endpoint_url"] and s3_config["access_key"] and s3_config["secret_key"] and s3_config["bucket_name"] ): self.s3_storage = S3StorageService( endpoint_url=s3_config["endpoint_url"], access_key=s3_config["access_key"], secret_key=s3_config["secret_key"], bucket_name=s3_config["bucket_name"], region=s3_config["region"], ) def _parse_bool(self, value: str) -> bool: """Парсит строковое значение в boolean.""" return value.lower() in ("true", "1", "yes", "on") def _parse_int(self, value: str) -> int: """Парсит строковое значение в integer.""" try: return int(value) except (ValueError, TypeError): return 0 def _parse_float(self, value: str) -> float: """Парсит строковое значение в float.""" try: return float(value) except (ValueError, TypeError): return 0.0 def get_settings(self): return self.settings def get_db(self) -> AsyncBotDB: """Возвращает подключение к базе данных.""" return self.database def get_s3_storage(self) -> Optional[S3StorageService]: """Возвращает S3StorageService если S3 включен, иначе None.""" return self.s3_storage def _init_scoring_manager(self): """ Инициализирует ScoringManager с RAG API клиентом и DeepSeek сервисом. Вызывается лениво при первом обращении к get_scoring_manager(). """ from helper_bot.services.scoring import ( DeepSeekService, RagApiClient, ScoringManager, ) scoring_config = self.settings["Scoring"] # Инициализация RAG API клиента rag_client = None if scoring_config["rag_enabled"]: api_url = scoring_config["rag_api_url"] api_key = scoring_config["rag_api_key"] if not api_url or not api_key: logger.warning("RAG включен, но не указаны RAG_API_URL или RAG_API_KEY") else: rag_client = RagApiClient( api_url=api_url, api_key=api_key, timeout=scoring_config["rag_api_timeout"], test_mode=scoring_config["rag_test_mode"], enabled=True, ) logger.info( f"RagApiClient инициализирован: {api_url} (test_mode={scoring_config['rag_test_mode']})" ) # Инициализация DeepSeek сервиса deepseek_service = None if scoring_config["deepseek_enabled"] and scoring_config["deepseek_api_key"]: deepseek_service = DeepSeekService( api_key=scoring_config["deepseek_api_key"], api_url=scoring_config["deepseek_api_url"], model=scoring_config["deepseek_model"], timeout=scoring_config["deepseek_timeout"], enabled=True, ) logger.info( f"DeepSeekService инициализирован: {scoring_config['deepseek_model']}" ) # Создаем менеджер self._scoring_manager = ScoringManager( rag_client=rag_client, deepseek_service=deepseek_service, ) return self._scoring_manager def get_scoring_manager(self): """ Возвращает ScoringManager для ML-скоринга постов. Инициализируется лениво при первом вызове. Returns: ScoringManager или None если скоринг полностью отключен """ if self._scoring_manager is None: scoring_config = self.settings.get("Scoring", {}) # Проверяем, включен ли хотя бы один сервис rag_enabled = scoring_config.get("rag_enabled", False) deepseek_enabled = scoring_config.get("deepseek_enabled", False) if not rag_enabled and not deepseek_enabled: logger.info("Scoring полностью отключен (RAG и DeepSeek disabled)") return None self._init_scoring_manager() return self._scoring_manager _global_instance = None def get_global_instance(): """Возвращает глобальный экземпляр BaseDependencyFactory.""" global _global_instance if _global_instance is None: _global_instance = BaseDependencyFactory() return _global_instance