- Added `ca-certificates` installation to Dockerfile for improved network security. - Updated health check command in Dockerfile to include better timeout handling. - Refactored `run_helper.py` to implement proper signal handling and logging during shutdown. - Transitioned database operations to an asynchronous model in `async_db.py`, improving performance and responsiveness. - Updated database schema to support new foreign key relationships and optimized indexing for better query performance. - Enhanced various bot handlers to utilize async database methods, improving overall efficiency and user experience. - Removed obsolete database and fix scripts to streamline the project structure.
75 lines
3.1 KiB
Python
75 lines
3.1 KiB
Python
from typing import Optional
|
||
from database.base import DatabaseConnection
|
||
from database.models import Admin
|
||
|
||
|
||
class AdminRepository(DatabaseConnection):
|
||
"""Репозиторий для работы с администраторами."""
|
||
|
||
async def create_tables(self):
|
||
"""Создание таблицы администраторов."""
|
||
# Включаем поддержку внешних ключей
|
||
await self._execute_query("PRAGMA foreign_keys = ON")
|
||
|
||
query = '''
|
||
CREATE TABLE IF NOT EXISTS admins (
|
||
user_id INTEGER NOT NULL PRIMARY KEY,
|
||
role TEXT DEFAULT 'admin',
|
||
created_at INTEGER DEFAULT (strftime('%s', 'now')),
|
||
FOREIGN KEY (user_id) REFERENCES our_users (user_id) ON DELETE CASCADE
|
||
)
|
||
'''
|
||
await self._execute_query(query)
|
||
self.logger.info("Таблица администраторов создана")
|
||
|
||
async def add_admin(self, admin: Admin) -> None:
|
||
"""Добавление администратора."""
|
||
query = "INSERT INTO admins (user_id, role) VALUES (?, ?)"
|
||
params = (admin.user_id, admin.role)
|
||
|
||
await self._execute_query(query, params)
|
||
self.logger.info(f"Администратор добавлен: user_id={admin.user_id}, role={admin.role}")
|
||
|
||
async def remove_admin(self, user_id: int) -> None:
|
||
"""Удаление администратора."""
|
||
query = "DELETE FROM admins WHERE user_id = ?"
|
||
await self._execute_query(query, (user_id,))
|
||
self.logger.info(f"Администратор удален: user_id={user_id}")
|
||
|
||
async def is_admin(self, user_id: int) -> bool:
|
||
"""Проверка, является ли пользователь администратором."""
|
||
query = "SELECT 1 FROM admins WHERE user_id = ?"
|
||
rows = await self._execute_query_with_result(query, (user_id,))
|
||
row = rows[0] if rows else None
|
||
return bool(row)
|
||
|
||
async def get_admin(self, user_id: int) -> Optional[Admin]:
|
||
"""Получение информации об администраторе."""
|
||
query = "SELECT user_id, role, created_at FROM admins WHERE user_id = ?"
|
||
rows = await self._execute_query_with_result(query, (user_id,))
|
||
row = rows[0] if rows else None
|
||
|
||
if row:
|
||
return Admin(
|
||
user_id=row[0],
|
||
role=row[1],
|
||
created_at=row[2] if len(row) > 2 else None
|
||
)
|
||
return None
|
||
|
||
async def get_all_admins(self) -> list[Admin]:
|
||
"""Получение всех администраторов."""
|
||
query = "SELECT user_id, role, created_at FROM admins ORDER BY created_at DESC"
|
||
rows = await self._execute_query_with_result(query)
|
||
|
||
admins = []
|
||
for row in rows:
|
||
admin = Admin(
|
||
user_id=row[0],
|
||
role=row[1],
|
||
created_at=row[2] if len(row) > 2 else None
|
||
)
|
||
admins.append(admin)
|
||
|
||
return admins
|