159 lines
6.7 KiB
Python
159 lines
6.7 KiB
Python
from typing import Dict, List, Optional
|
||
|
||
from database.base import DatabaseConnection
|
||
from database.models import BlacklistUser
|
||
|
||
|
||
class BlacklistRepository(DatabaseConnection):
|
||
"""Репозиторий для работы с черным списком."""
|
||
|
||
async def create_tables(self):
|
||
"""Создание таблицы черного списка."""
|
||
query = """
|
||
CREATE TABLE IF NOT EXISTS blacklist (
|
||
user_id INTEGER NOT NULL PRIMARY KEY,
|
||
message_for_user TEXT,
|
||
date_to_unban INTEGER,
|
||
created_at INTEGER DEFAULT (strftime('%s', 'now')),
|
||
ban_author INTEGER,
|
||
FOREIGN KEY (user_id) REFERENCES our_users (user_id) ON DELETE CASCADE,
|
||
FOREIGN KEY (ban_author) REFERENCES our_users (user_id) ON DELETE SET NULL
|
||
)
|
||
"""
|
||
await self._execute_query(query)
|
||
self.logger.info("Таблица черного списка создана")
|
||
|
||
async def add_user(self, blacklist_user: BlacklistUser) -> None:
|
||
"""Добавляет пользователя в черный список."""
|
||
query = """
|
||
INSERT INTO blacklist (user_id, message_for_user, date_to_unban, ban_author)
|
||
VALUES (?, ?, ?, ?)
|
||
"""
|
||
params = (
|
||
blacklist_user.user_id,
|
||
blacklist_user.message_for_user,
|
||
blacklist_user.date_to_unban,
|
||
blacklist_user.ban_author,
|
||
)
|
||
|
||
await self._execute_query(query, params)
|
||
self.logger.info(
|
||
f"Пользователь добавлен в черный список: user_id={blacklist_user.user_id}"
|
||
)
|
||
|
||
async def remove_user(self, user_id: int) -> bool:
|
||
"""Удаляет пользователя из черного списка."""
|
||
try:
|
||
query = "DELETE FROM blacklist WHERE user_id = ?"
|
||
await self._execute_query(query, (user_id,))
|
||
self.logger.info(
|
||
f"Пользователь с идентификатором {user_id} успешно удален из черного списка."
|
||
)
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(
|
||
f"Ошибка удаления пользователя с идентификатором {user_id} "
|
||
f"из таблицы blacklist. Ошибка: {str(e)}"
|
||
)
|
||
return False
|
||
|
||
async def user_exists(self, user_id: int) -> bool:
|
||
"""Проверяет, существует ли запись с данным user_id в blacklist."""
|
||
query = "SELECT 1 FROM blacklist WHERE user_id = ?"
|
||
rows = await self._execute_query_with_result(query, (user_id,))
|
||
self.logger.info(f"Существует ли пользователь: user_id={user_id} Итог: {rows}")
|
||
return bool(rows)
|
||
|
||
async def get_user(self, user_id: int) -> Optional[BlacklistUser]:
|
||
"""Возвращает информацию о пользователе в черном списке по user_id."""
|
||
query = """
|
||
SELECT user_id, message_for_user, date_to_unban, created_at, ban_author
|
||
FROM blacklist
|
||
WHERE user_id = ?
|
||
"""
|
||
rows = await self._execute_query_with_result(query, (user_id,))
|
||
row = rows[0] if rows else None
|
||
|
||
if row:
|
||
return BlacklistUser(
|
||
user_id=row[0],
|
||
message_for_user=row[1],
|
||
date_to_unban=row[2],
|
||
created_at=row[3],
|
||
ban_author=row[4] if len(row) > 4 else None,
|
||
)
|
||
return None
|
||
|
||
async def get_all_users(
|
||
self, offset: int = 0, limit: int = 10
|
||
) -> List[BlacklistUser]:
|
||
"""Возвращает список пользователей в черном списке, отсортированных по дате бана (новые первые)."""
|
||
query = """
|
||
SELECT user_id, message_for_user, date_to_unban, created_at, ban_author
|
||
FROM blacklist
|
||
ORDER BY created_at DESC
|
||
LIMIT ? OFFSET ?
|
||
"""
|
||
rows = await self._execute_query_with_result(query, (limit, offset))
|
||
|
||
users = []
|
||
for row in rows:
|
||
users.append(
|
||
BlacklistUser(
|
||
user_id=row[0],
|
||
message_for_user=row[1],
|
||
date_to_unban=row[2],
|
||
created_at=row[3],
|
||
ban_author=row[4] if len(row) > 4 else None,
|
||
)
|
||
)
|
||
|
||
self.logger.info(
|
||
f"Получен список пользователей в черном списке (offset={offset}, limit={limit}): {len(users)}"
|
||
)
|
||
return users
|
||
|
||
async def get_all_users_no_limit(self) -> List[BlacklistUser]:
|
||
"""Возвращает список всех пользователей в черном списке без лимитов, отсортированных по дате бана (новые первые)."""
|
||
query = """
|
||
SELECT user_id, message_for_user, date_to_unban, created_at, ban_author
|
||
FROM blacklist
|
||
ORDER BY created_at DESC
|
||
"""
|
||
rows = await self._execute_query_with_result(query)
|
||
|
||
users = []
|
||
for row in rows:
|
||
users.append(
|
||
BlacklistUser(
|
||
user_id=row[0],
|
||
message_for_user=row[1],
|
||
date_to_unban=row[2],
|
||
created_at=row[3],
|
||
ban_author=row[4] if len(row) > 4 else None,
|
||
)
|
||
)
|
||
|
||
self.logger.info(
|
||
f"Получен список всех пользователей в черном списке: {len(users)}"
|
||
)
|
||
return users
|
||
|
||
async def get_users_for_unblock_today(
|
||
self, current_timestamp: int
|
||
) -> Dict[int, int]:
|
||
"""Возвращает список пользователей, у которых истек срок блокировки."""
|
||
query = "SELECT user_id FROM blacklist WHERE date_to_unban IS NOT NULL AND date_to_unban <= ?"
|
||
rows = await self._execute_query_with_result(query, (current_timestamp,))
|
||
|
||
users = {user_id: user_id for user_id, in rows}
|
||
self.logger.info(f"Получен список пользователей для разблокировки: {users}")
|
||
return users
|
||
|
||
async def get_count(self) -> int:
|
||
"""Получение количества пользователей в черном списке."""
|
||
query = "SELECT COUNT(*) FROM blacklist"
|
||
rows = await self._execute_query_with_result(query)
|
||
row = rows[0] if rows else None
|
||
return row[0] if row else 0
|