import os import sys from typing import Optional from database.async_db import AsyncBotDB from dotenv import load_dotenv from helper_bot.utils.s3_storage import S3StorageService from logs.custom_logger import logger class BaseDependencyFactory: def __init__(self): project_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) env_path = os.path.join(project_dir, '.env') if os.path.exists(env_path): load_dotenv(env_path) self.settings = {} self._project_dir = project_dir database_path = os.getenv('DATABASE_PATH', 'database/tg-bot-database.db') if not os.path.isabs(database_path): database_path = os.path.join(project_dir, database_path) self.database = AsyncBotDB(database_path) self._load_settings_from_env() self._init_s3_storage() # ScoringManager инициализируется лениво self._scoring_manager = None def _load_settings_from_env(self): """Загружает настройки из переменных окружения.""" self.settings['Telegram'] = { 'bot_token': os.getenv('BOT_TOKEN', ''), 'listen_bot_token': os.getenv('LISTEN_BOT_TOKEN', ''), 'test_bot_token': os.getenv('TEST_BOT_TOKEN', ''), 'preview_link': self._parse_bool(os.getenv('PREVIEW_LINK', 'false')), 'main_public': os.getenv('MAIN_PUBLIC', ''), 'group_for_posts': self._parse_int(os.getenv('GROUP_FOR_POSTS', '0')), 'group_for_message': self._parse_int(os.getenv('GROUP_FOR_MESSAGE', '0')), 'group_for_logs': self._parse_int(os.getenv('GROUP_FOR_LOGS', '0')), 'important_logs': self._parse_int(os.getenv('IMPORTANT_LOGS', '0')), 'archive': self._parse_int(os.getenv('ARCHIVE', '0')), 'test_group': self._parse_int(os.getenv('TEST_GROUP', '0')) } self.settings['Settings'] = { 'logs': self._parse_bool(os.getenv('LOGS', 'false')), 'test': self._parse_bool(os.getenv('TEST', 'false')) } self.settings['Metrics'] = { 'host': os.getenv('METRICS_HOST', '0.0.0.0'), 'port': self._parse_int(os.getenv('METRICS_PORT', '8080')) } self.settings['S3'] = { 'enabled': self._parse_bool(os.getenv('S3_ENABLED', 'false')), 'endpoint_url': os.getenv('S3_ENDPOINT_URL', ''), 'access_key': os.getenv('S3_ACCESS_KEY', ''), 'secret_key': os.getenv('S3_SECRET_KEY', ''), 'bucket_name': os.getenv('S3_BUCKET_NAME', ''), 'region': os.getenv('S3_REGION', 'us-east-1') } # Настройки ML-скоринга self.settings['Scoring'] = { # RAG API 'rag_enabled': self._parse_bool(os.getenv('RAG_ENABLED', 'false')), 'rag_api_url': os.getenv('RAG_API_URL', ''), 'rag_api_key': os.getenv('RAG_API_KEY', ''), 'rag_api_timeout': self._parse_int(os.getenv('RAG_API_TIMEOUT', '30')), 'rag_test_mode': self._parse_bool(os.getenv('RAG_TEST_MODE', 'false')), # DeepSeek 'deepseek_enabled': self._parse_bool(os.getenv('DEEPSEEK_ENABLED', 'false')), 'deepseek_api_key': os.getenv('DEEPSEEK_API_KEY', ''), 'deepseek_api_url': os.getenv('DEEPSEEK_API_URL', 'https://api.deepseek.com/v1/chat/completions'), 'deepseek_model': os.getenv('DEEPSEEK_MODEL', 'deepseek-chat'), 'deepseek_timeout': self._parse_int(os.getenv('DEEPSEEK_TIMEOUT', '30')), } def _init_s3_storage(self): """Инициализирует S3StorageService если S3 включен.""" self.s3_storage = None if self.settings['S3']['enabled']: s3_config = self.settings['S3'] if s3_config['endpoint_url'] and s3_config['access_key'] and s3_config['secret_key'] and s3_config['bucket_name']: self.s3_storage = S3StorageService( endpoint_url=s3_config['endpoint_url'], access_key=s3_config['access_key'], secret_key=s3_config['secret_key'], bucket_name=s3_config['bucket_name'], region=s3_config['region'] ) def _parse_bool(self, value: str) -> bool: """Парсит строковое значение в boolean.""" return value.lower() in ('true', '1', 'yes', 'on') def _parse_int(self, value: str) -> int: """Парсит строковое значение в integer.""" try: return int(value) except (ValueError, TypeError): return 0 def _parse_float(self, value: str) -> float: """Парсит строковое значение в float.""" try: return float(value) except (ValueError, TypeError): return 0.0 def get_settings(self): return self.settings def get_db(self) -> AsyncBotDB: """Возвращает подключение к базе данных.""" return self.database def get_s3_storage(self) -> Optional[S3StorageService]: """Возвращает S3StorageService если S3 включен, иначе None.""" return self.s3_storage def _init_scoring_manager(self): """ Инициализирует ScoringManager с RAG API клиентом и DeepSeek сервисом. Вызывается лениво при первом обращении к get_scoring_manager(). """ from helper_bot.services.scoring import (DeepSeekService, RagApiClient, ScoringManager) scoring_config = self.settings['Scoring'] # Инициализация RAG API клиента rag_client = None if scoring_config['rag_enabled']: api_url = scoring_config['rag_api_url'] api_key = scoring_config['rag_api_key'] if not api_url or not api_key: logger.warning("RAG включен, но не указаны RAG_API_URL или RAG_API_KEY") else: rag_client = RagApiClient( api_url=api_url, api_key=api_key, timeout=scoring_config['rag_api_timeout'], test_mode=scoring_config['rag_test_mode'], enabled=True, ) logger.info(f"RagApiClient инициализирован: {api_url} (test_mode={scoring_config['rag_test_mode']})") # Инициализация DeepSeek сервиса deepseek_service = None if scoring_config['deepseek_enabled'] and scoring_config['deepseek_api_key']: deepseek_service = DeepSeekService( api_key=scoring_config['deepseek_api_key'], api_url=scoring_config['deepseek_api_url'], model=scoring_config['deepseek_model'], timeout=scoring_config['deepseek_timeout'], enabled=True, ) logger.info(f"DeepSeekService инициализирован: {scoring_config['deepseek_model']}") # Создаем менеджер self._scoring_manager = ScoringManager( rag_client=rag_client, deepseek_service=deepseek_service, ) return self._scoring_manager def get_scoring_manager(self): """ Возвращает ScoringManager для ML-скоринга постов. Инициализируется лениво при первом вызове. Returns: ScoringManager или None если скоринг полностью отключен """ if self._scoring_manager is None: scoring_config = self.settings.get('Scoring', {}) # Проверяем, включен ли хотя бы один сервис rag_enabled = scoring_config.get('rag_enabled', False) deepseek_enabled = scoring_config.get('deepseek_enabled', False) if not rag_enabled and not deepseek_enabled: logger.info("Scoring полностью отключен (RAG и DeepSeek disabled)") return None self._init_scoring_manager() return self._scoring_manager _global_instance = None def get_global_instance(): """Возвращает глобальный экземпляр BaseDependencyFactory.""" global _global_instance if _global_instance is None: _global_instance = BaseDependencyFactory() return _global_instance