""" Базовые классы для системы разрешений """ from abc import ABC, abstractmethod from typing import Dict, Type, Optional, Any from services.infrastructure.database import DatabaseService from services.infrastructure.logger import get_logger logger = get_logger(__name__) class Permission(ABC): """ Абстрактный базовый класс для всех разрешений. Соблюдает принцип открытости/закрытости (OCP). """ def __init__(self, name: str, description: str = ""): self.name = name self.description = description @abstractmethod async def check(self, user_id: int, database: DatabaseService, config: Any) -> bool: """ Проверка разрешения для пользователя Args: user_id: ID пользователя в Telegram database: Сервис базы данных config: Конфигурация приложения Returns: True если у пользователя есть разрешение, False иначе """ pass def __str__(self) -> str: return f"Permission({self.name})" def __repr__(self) -> str: return f"Permission(name='{self.name}', description='{self.description}')" class PermissionRegistry: """ Реестр разрешений. Позволяет регистрировать и получать разрешения. """ def __init__(self): self._permissions: Dict[str, Permission] = {} def register(self, permission: Permission) -> None: """ Регистрация разрешения Args: permission: Разрешение для регистрации """ if permission.name in self._permissions: logger.warning(f"Разрешение '{permission.name}' уже зарегистрировано. Перезаписываем.") self._permissions[permission.name] = permission logger.debug(f"Зарегистрировано разрешение: {permission}") def get(self, name: str) -> Optional[Permission]: """ Получение разрешения по имени Args: name: Имя разрешения Returns: Разрешение или None если не найдено """ return self._permissions.get(name) def get_all(self) -> Dict[str, Permission]: """ Получение всех зарегистрированных разрешений Returns: Словарь всех разрешений """ return self._permissions.copy() def is_registered(self, name: str) -> bool: """ Проверка, зарегистрировано ли разрешение Args: name: Имя разрешения Returns: True если разрешение зарегистрировано, False иначе """ return name in self._permissions class PermissionChecker: """ Сервис для проверки разрешений пользователей. Использует реестр разрешений для получения логики проверки. """ def __init__(self, registry: PermissionRegistry, database: DatabaseService, config: Any): self.registry = registry self.database = database self.config = config async def has_permission(self, user_id: int, permission_name: str) -> bool: """ Проверка наличия разрешения у пользователя Args: user_id: ID пользователя в Telegram permission_name: Имя разрешения Returns: True если у пользователя есть разрешение, False иначе """ try: permission = self.registry.get(permission_name) if not permission: logger.warning(f"Разрешение '{permission_name}' не найдено в реестре") return False result = await permission.check(user_id, self.database, self.config) logger.debug(f"Проверка разрешения '{permission_name}' для пользователя {user_id}: {result}") return result except Exception as e: logger.error(f"Ошибка при проверке разрешения '{permission_name}' для пользователя {user_id}: {e}") return False async def has_any_permission(self, user_id: int, permission_names: list[str]) -> bool: """ Проверка наличия хотя бы одного из разрешений у пользователя Args: user_id: ID пользователя в Telegram permission_names: Список имен разрешений Returns: True если у пользователя есть хотя бы одно разрешение, False иначе """ for permission_name in permission_names: if await self.has_permission(user_id, permission_name): return True return False async def has_all_permissions(self, user_id: int, permission_names: list[str]) -> bool: """ Проверка наличия всех разрешений у пользователя Args: user_id: ID пользователя в Telegram permission_names: Список имен разрешений Returns: True если у пользователя есть все разрешения, False иначе """ for permission_name in permission_names: if not await self.has_permission(user_id, permission_name): return False return True