Implement user-specific question numbering and update database schema. Added triggers for automatic question numbering and adjustments upon deletion. Enhanced CRUD operations to manage user_question_number effectively.

This commit is contained in:
2025-09-06 18:35:12 +03:00
parent 50be010026
commit 596a2fa813
111 changed files with 16847 additions and 65 deletions

View File

@@ -0,0 +1,20 @@
"""
Система разрешений для AnonBot
Соблюдает принцип открытости/закрытости (OCP)
"""
from .base import Permission, PermissionChecker, PermissionRegistry
from .decorators import require_permission, require_admin, require_superuser
from .registry import get_permission_registry, get_permission_checker, init_permission_checker
__all__ = [
'Permission',
'PermissionChecker',
'PermissionRegistry',
'require_permission',
'require_admin',
'require_superuser',
'get_permission_registry',
'get_permission_checker',
'init_permission_checker'
]

View File

@@ -0,0 +1,165 @@
"""
Базовые классы для системы разрешений
"""
from abc import ABC, abstractmethod
from typing import Dict, Type, Optional, Any
from services.infrastructure.database import DatabaseService
from services.infrastructure.logger import get_logger
logger = get_logger(__name__)
class Permission(ABC):
"""
Абстрактный базовый класс для всех разрешений.
Соблюдает принцип открытости/закрытости (OCP).
"""
def __init__(self, name: str, description: str = ""):
self.name = name
self.description = description
@abstractmethod
async def check(self, user_id: int, database: DatabaseService, config: Any) -> bool:
"""
Проверка разрешения для пользователя
Args:
user_id: ID пользователя в Telegram
database: Сервис базы данных
config: Конфигурация приложения
Returns:
True если у пользователя есть разрешение, False иначе
"""
pass
def __str__(self) -> str:
return f"Permission({self.name})"
def __repr__(self) -> str:
return f"Permission(name='{self.name}', description='{self.description}')"
class PermissionRegistry:
"""
Реестр разрешений. Позволяет регистрировать и получать разрешения.
"""
def __init__(self):
self._permissions: Dict[str, Permission] = {}
def register(self, permission: Permission) -> None:
"""
Регистрация разрешения
Args:
permission: Разрешение для регистрации
"""
if permission.name in self._permissions:
logger.warning(f"Разрешение '{permission.name}' уже зарегистрировано. Перезаписываем.")
self._permissions[permission.name] = permission
logger.debug(f"Зарегистрировано разрешение: {permission}")
def get(self, name: str) -> Optional[Permission]:
"""
Получение разрешения по имени
Args:
name: Имя разрешения
Returns:
Разрешение или None если не найдено
"""
return self._permissions.get(name)
def get_all(self) -> Dict[str, Permission]:
"""
Получение всех зарегистрированных разрешений
Returns:
Словарь всех разрешений
"""
return self._permissions.copy()
def is_registered(self, name: str) -> bool:
"""
Проверка, зарегистрировано ли разрешение
Args:
name: Имя разрешения
Returns:
True если разрешение зарегистрировано, False иначе
"""
return name in self._permissions
class PermissionChecker:
"""
Сервис для проверки разрешений пользователей.
Использует реестр разрешений для получения логики проверки.
"""
def __init__(self, registry: PermissionRegistry, database: DatabaseService, config: Any):
self.registry = registry
self.database = database
self.config = config
async def has_permission(self, user_id: int, permission_name: str) -> bool:
"""
Проверка наличия разрешения у пользователя
Args:
user_id: ID пользователя в Telegram
permission_name: Имя разрешения
Returns:
True если у пользователя есть разрешение, False иначе
"""
try:
permission = self.registry.get(permission_name)
if not permission:
logger.warning(f"Разрешение '{permission_name}' не найдено в реестре")
return False
result = await permission.check(user_id, self.database, self.config)
logger.debug(f"Проверка разрешения '{permission_name}' для пользователя {user_id}: {result}")
return result
except Exception as e:
logger.error(f"Ошибка при проверке разрешения '{permission_name}' для пользователя {user_id}: {e}")
return False
async def has_any_permission(self, user_id: int, permission_names: list[str]) -> bool:
"""
Проверка наличия хотя бы одного из разрешений у пользователя
Args:
user_id: ID пользователя в Telegram
permission_names: Список имен разрешений
Returns:
True если у пользователя есть хотя бы одно разрешение, False иначе
"""
for permission_name in permission_names:
if await self.has_permission(user_id, permission_name):
return True
return False
async def has_all_permissions(self, user_id: int, permission_names: list[str]) -> bool:
"""
Проверка наличия всех разрешений у пользователя
Args:
user_id: ID пользователя в Telegram
permission_names: Список имен разрешений
Returns:
True если у пользователя есть все разрешения, False иначе
"""
for permission_name in permission_names:
if not await self.has_permission(user_id, permission_name):
return False
return True

View File

@@ -0,0 +1,141 @@
"""
Декораторы для проверки разрешений
"""
from functools import wraps
from typing import Callable, Any, Union
from aiogram.types import Message, CallbackQuery
from services.infrastructure.logger import get_logger
from .registry import get_permission_checker
logger = get_logger(__name__)
def require_permission(permission_name: str, error_message: str = "У вас нет прав для выполнения этой команды."):
"""
Декоратор для проверки разрешения пользователя
Args:
permission_name: Имя разрешения для проверки
error_message: Сообщение об ошибке при отсутствии разрешения
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
# Извлекаем объект события (Message или CallbackQuery)
event = None
for arg in args:
if isinstance(arg, (Message, CallbackQuery)):
event = arg
break
if not event:
logger.error("Не удалось найти объект события для проверки разрешения")
return await func(*args, **kwargs)
# Получаем проверщик разрешений
checker = get_permission_checker()
if not checker:
logger.error("Проверщик разрешений не инициализирован")
return await func(*args, **kwargs)
# Проверяем разрешение
user_id = event.from_user.id
has_permission = await checker.has_permission(user_id, permission_name)
if not has_permission:
if isinstance(event, Message):
await event.answer(error_message)
elif isinstance(event, CallbackQuery):
await event.answer(error_message, show_alert=True)
return
# Выполняем оригинальную функцию
return await func(*args, **kwargs)
return wrapper
return decorator
def require_admin(error_message: str = "У вас нет прав администратора."):
"""
Декоратор для проверки прав администратора
Args:
error_message: Сообщение об ошибке при отсутствии прав администратора
"""
return require_permission("admin", error_message)
def require_superuser(error_message: str = "У вас нет прав суперпользователя."):
"""
Декоратор для проверки прав суперпользователя
Args:
error_message: Сообщение об ошибке при отсутствии прав суперпользователя
"""
return require_permission("superuser", error_message)
def require_admin_or_superuser(error_message: str = "У вас нет прав для выполнения этой команды."):
"""
Декоратор для проверки прав администратора или суперпользователя
Args:
error_message: Сообщение об ошибке при отсутствии прав
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def wrapper(*args, **kwargs):
# Извлекаем объект события (Message или CallbackQuery)
event = None
for arg in args:
if isinstance(arg, (Message, CallbackQuery)):
event = arg
break
if not event:
logger.error("Не удалось найти объект события для проверки разрешения")
return await func(*args, **kwargs)
# Получаем проверщик разрешений
checker = get_permission_checker()
if not checker:
logger.error("Проверщик разрешений не инициализирован")
return await func(*args, **kwargs)
# Проверяем права администратора или суперпользователя
user_id = event.from_user.id
has_permission = await checker.has_any_permission(user_id, ["admin", "superuser"])
if not has_permission:
if isinstance(event, Message):
await event.answer(error_message)
elif isinstance(event, CallbackQuery):
await event.answer(error_message, show_alert=True)
return
# Выполняем оригинальную функцию
return await func(*args, **kwargs)
return wrapper
return decorator
def require_active_user(error_message: str = "❌ Ваш аккаунт неактивен."):
"""
Декоратор для проверки активности пользователя
Args:
error_message: Сообщение об ошибке при неактивном аккаунте
"""
return require_permission("view_questions", error_message)
def require_unbanned_user(error_message: str = "❌ Ваш аккаунт заблокирован."):
"""
Декоратор для проверки, что пользователь не забанен
Args:
error_message: Сообщение об ошибке при заблокированном аккаунте
"""
return require_permission("ask_questions", error_message)

View File

@@ -0,0 +1,55 @@
"""
Инициализация системы разрешений
Автоматически регистрирует все доступные разрешения
"""
from .registry import get_permission_registry, register_permission
from .permissions import (
AdminPermission,
SuperuserPermission,
ViewStatsPermission,
AdminPanelPermission,
ManageUsersPermission,
BroadcastPermission,
SuperuserOnlyPermission,
ViewQuestionsPermission,
AskQuestionsPermission,
AnswerQuestionsPermission
)
from services.infrastructure.logger import get_logger
logger = get_logger(__name__)
def init_all_permissions():
"""
Инициализация всех разрешений в системе
"""
logger.info("Начинаем инициализацию системы разрешений...")
# Список всех разрешений для регистрации
permissions = [
AdminPermission(),
SuperuserPermission(),
ViewStatsPermission(),
AdminPanelPermission(),
ManageUsersPermission(),
BroadcastPermission(),
SuperuserOnlyPermission(),
ViewQuestionsPermission(),
AskQuestionsPermission(),
AnswerQuestionsPermission(),
]
# Регистрируем все разрешения
for permission in permissions:
register_permission(permission)
logger.debug(f"Зарегистрировано разрешение: {permission.name}")
registry = get_permission_registry()
total_permissions = len(registry.get_all())
logger.info(f"✅ Система разрешений инициализирована. Зарегистрировано {total_permissions} разрешений")
return registry

View File

@@ -0,0 +1,196 @@
"""
Конкретные реализации разрешений
Каждое разрешение - отдельный класс, что позволяет легко добавлять новые без изменения существующего кода
"""
from .base import Permission
from services.infrastructure.database import DatabaseService
from services.infrastructure.logger import get_logger
logger = get_logger(__name__)
class AdminPermission(Permission):
"""Разрешение для администраторов"""
def __init__(self):
super().__init__(
name="admin",
description="Права администратора"
)
async def check(self, user_id: int, database: DatabaseService, config) -> bool:
"""Проверка, является ли пользователь администратором"""
return user_id in config.ADMINS
class SuperuserPermission(Permission):
"""Разрешение для суперпользователей"""
def __init__(self):
super().__init__(
name="superuser",
description="Права суперпользователя"
)
async def check(self, user_id: int, database: DatabaseService, config) -> bool:
"""Проверка, является ли пользователь суперпользователем"""
try:
user = await database.get_user(user_id)
return user.is_superuser if user else False
except Exception as e:
logger.error(f"Ошибка при проверке суперпользователя {user_id}: {e}")
return False
class ViewStatsPermission(Permission):
"""Разрешение на просмотр статистики"""
def __init__(self):
super().__init__(
name="view_stats",
description="Просмотр статистики"
)
async def check(self, user_id: int, database: DatabaseService, config) -> bool:
"""Проверка права на просмотр статистики"""
# Администраторы и суперпользователи могут просматривать статистику
admin_permission = AdminPermission()
superuser_permission = SuperuserPermission()
is_admin = await admin_permission.check(user_id, database, config)
is_superuser = await superuser_permission.check(user_id, database, config)
return is_admin or is_superuser
class AdminPanelPermission(Permission):
"""Разрешение на доступ к админ панели"""
def __init__(self):
super().__init__(
name="admin_panel",
description="Доступ к админ панели"
)
async def check(self, user_id: int, database: DatabaseService, config) -> bool:
"""Проверка права на доступ к админ панели"""
# Администраторы и суперпользователи могут использовать админ панель
admin_permission = AdminPermission()
superuser_permission = SuperuserPermission()
is_admin = await admin_permission.check(user_id, database, config)
is_superuser = await superuser_permission.check(user_id, database, config)
return is_admin or is_superuser
class ManageUsersPermission(Permission):
"""Разрешение на управление пользователями"""
def __init__(self):
super().__init__(
name="manage_users",
description="Управление пользователями"
)
async def check(self, user_id: int, database: DatabaseService, config) -> bool:
"""Проверка права на управление пользователями"""
# Администраторы и суперпользователи могут управлять пользователями
admin_permission = AdminPermission()
superuser_permission = SuperuserPermission()
is_admin = await admin_permission.check(user_id, database, config)
is_superuser = await superuser_permission.check(user_id, database, config)
return is_admin or is_superuser
class BroadcastPermission(Permission):
"""Разрешение на рассылку сообщений"""
def __init__(self):
super().__init__(
name="broadcast",
description="Рассылка сообщений"
)
async def check(self, user_id: int, database: DatabaseService, config) -> bool:
"""Проверка права на рассылку"""
# Только администраторы могут делать рассылку
admin_permission = AdminPermission()
return await admin_permission.check(user_id, database, config)
class SuperuserOnlyPermission(Permission):
"""Разрешение только для суперпользователей"""
def __init__(self):
super().__init__(
name="superuser_only",
description="Только для суперпользователей"
)
async def check(self, user_id: int, database: DatabaseService, config) -> bool:
"""Проверка права только для суперпользователей"""
superuser_permission = SuperuserPermission()
return await superuser_permission.check(user_id, database, config)
class ViewQuestionsPermission(Permission):
"""Разрешение на просмотр вопросов"""
def __init__(self):
super().__init__(
name="view_questions",
description="Просмотр вопросов"
)
async def check(self, user_id: int, database: DatabaseService, config) -> bool:
"""Проверка права на просмотр вопросов"""
# Все активные пользователи могут просматривать вопросы
try:
user = await database.get_user(user_id)
return user.is_active if user else False
except Exception as e:
logger.error(f"Ошибка при проверке активности пользователя {user_id}: {e}")
return False
class AskQuestionsPermission(Permission):
"""Разрешение на задавание вопросов"""
def __init__(self):
super().__init__(
name="ask_questions",
description="Задавание вопросов"
)
async def check(self, user_id: int, database: DatabaseService, config) -> bool:
"""Проверка права на задавание вопросов"""
# Все активные пользователи могут задавать вопросы
try:
user = await database.get_user(user_id)
return user.is_active and not user.is_banned if user else False
except Exception as e:
logger.error(f"Ошибка при проверке права задавать вопросы для пользователя {user_id}: {e}")
return False
class AnswerQuestionsPermission(Permission):
"""Разрешение на ответы на вопросы"""
def __init__(self):
super().__init__(
name="answer_questions",
description="Ответы на вопросы"
)
async def check(self, user_id: int, database: DatabaseService, config) -> bool:
"""Проверка права на ответы на вопросы"""
# Все активные пользователи могут отвечать на вопросы
try:
user = await database.get_user(user_id)
return user.is_active and not user.is_banned if user else False
except Exception as e:
logger.error(f"Ошибка при проверке права отвечать на вопросы для пользователя {user_id}: {e}")
return False

View File

@@ -0,0 +1,66 @@
"""
Глобальный реестр разрешений и фабричные функции
"""
from typing import Optional
from .base import PermissionRegistry, PermissionChecker, Permission
from services.infrastructure.database import DatabaseService
from services.infrastructure.logger import get_logger
logger = get_logger(__name__)
# Глобальный реестр разрешений
_permission_registry: Optional[PermissionRegistry] = None
_permission_checker: Optional[PermissionChecker] = None
def get_permission_registry() -> PermissionRegistry:
"""
Получение глобального реестра разрешений
Returns:
Глобальный экземпляр реестра разрешений
"""
global _permission_registry
if _permission_registry is None:
_permission_registry = PermissionRegistry()
logger.info("Создан глобальный реестр разрешений")
return _permission_registry
def get_permission_checker() -> Optional[PermissionChecker]:
"""
Получение глобального проверщика разрешений
Returns:
Глобальный экземпляр проверщика разрешений или None если не инициализирован
"""
return _permission_checker
def init_permission_checker(database: DatabaseService, config) -> PermissionChecker:
"""
Инициализация глобального проверщика разрешений
Args:
database: Сервис базы данных
config: Конфигурация приложения
Returns:
Инициализированный проверщик разрешений
"""
global _permission_checker
registry = get_permission_registry()
_permission_checker = PermissionChecker(registry, database, config)
logger.info("Инициализирован глобальный проверщик разрешений")
return _permission_checker
def register_permission(permission: Permission) -> None:
"""
Регистрация разрешения в глобальном реестре
Args:
permission: Разрешение для регистрации
"""
registry = get_permission_registry()
registry.register(permission)