Files
AnonBot/database/crud.py
Andrey 50be010026 realize full project
all function is working
added test (empty files for plan)
database schema
business logic
rate limitting
logging decorators
2025-09-06 18:34:57 +03:00

791 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CRUD операции для работы с базой данных
"""
import asyncio
from contextlib import asynccontextmanager
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, List, Optional, Tuple
import aiosqlite
from config.constants import DEFAULT_CONNECTION_POOL_SIZE, DATABASE_TIMEOUT, SQLITE_CACHE_SIZE, EMPTY_VALUES
from models.question import Question, QuestionStatus
from models.user import User
from models.user_block import UserBlock
from models.user_settings import UserSettings
from services.infrastructure.logger import get_logger
logger = get_logger(__name__)
class ConnectionPool:
"""Пул соединений для SQLite"""
def __init__(self, db_path: str, pool_size: int = DEFAULT_CONNECTION_POOL_SIZE):
self.db_path = db_path
self.pool_size = pool_size
self._pool = asyncio.Queue(maxsize=pool_size)
self._created_connections = 0
self._lock = asyncio.Lock()
async def _create_connection(self):
"""Создание нового соединения"""
conn = await aiosqlite.connect(
self.db_path,
timeout=DATABASE_TIMEOUT, # Увеличиваем timeout
isolation_level=None # Автокоммит для лучшей производительности
)
# Настройки для лучшей производительности
await conn.execute("PRAGMA journal_mode=WAL")
await conn.execute("PRAGMA synchronous=NORMAL")
await conn.execute(f"PRAGMA cache_size={SQLITE_CACHE_SIZE}")
await conn.execute("PRAGMA temp_store=MEMORY")
return conn
async def get_connection(self):
"""Получение соединения из пула"""
try:
# Пытаемся получить соединение из пула
return self._pool.get_nowait()
except asyncio.QueueEmpty:
# Если пул пуст, создаем новое соединение
async with self._lock:
if self._created_connections < self.pool_size:
self._created_connections += 1
return await self._create_connection()
else:
# Ждем освобождения соединения
return await self._pool.get()
async def return_connection(self, conn):
"""Возврат соединения в пул"""
try:
self._pool.put_nowait(conn)
except asyncio.QueueFull:
# Если пул полон, закрываем соединение
await conn.close()
async with self._lock:
self._created_connections -= 1
async def close_all(self):
"""Закрытие всех соединений"""
while not self._pool.empty():
conn = await self._pool.get()
await conn.close()
self._created_connections = 0
# Глобальный пул соединений
_connection_pools = {}
def get_connection_pool(db_path: str, pool_size: int = DEFAULT_CONNECTION_POOL_SIZE) -> ConnectionPool:
"""Получение пула соединений для базы данных"""
if db_path not in _connection_pools:
_connection_pools[db_path] = ConnectionPool(db_path, pool_size)
return _connection_pools[db_path]
class BaseCRUD:
"""Базовый класс для CRUD операций"""
def __init__(self, db_path: str):
self.db_path = db_path
self.pool = get_connection_pool(db_path)
@asynccontextmanager
async def get_connection(self):
"""Контекстный менеджер для подключения к БД с использованием пула"""
conn = await self.pool.get_connection()
try:
yield conn
finally:
await self.pool.return_connection(conn)
def _parse_datetime(self, date_str) -> Optional[datetime]:
"""Безопасный парсинг datetime из строки"""
if not date_str or date_str in EMPTY_VALUES:
return None
try:
return datetime.fromisoformat(date_str)
except (ValueError, TypeError):
return None
class UserCRUD(BaseCRUD):
"""CRUD операции для пользователей"""
async def create(self, user: User) -> User:
"""Создание нового пользователя"""
logger.info(f"👤 Создание пользователя: {user.telegram_id} ({user.first_name})")
async with self.get_connection() as conn:
cursor = await conn.execute("""
INSERT INTO users
(telegram_id, username, first_name, last_name, chat_id, profile_link,
is_active, is_superuser, created_at, updated_at, banned_until, ban_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
user.telegram_id, user.username, user.first_name, user.last_name,
user.chat_id, user.profile_link, user.is_active, user.is_superuser,
user.created_at.isoformat() if user.created_at else None,
user.updated_at.isoformat() if user.updated_at else None,
user.banned_until.isoformat() if user.banned_until else None,
user.ban_reason
))
user.id = cursor.lastrowid
await conn.commit()
logger.info(f"✅ Пользователь создан с ID: {user.id}")
return user
async def create_batch(self, users: List[User]) -> List[User]:
"""Создание нескольких пользователей за одну транзакцию (batch операция)"""
if not users:
return []
logger.info(f"📦 Создание {len(users)} пользователей batch операцией")
async with self.get_connection() as conn:
try:
# Подготавливаем данные для batch вставки
batch_data = []
for user in users:
batch_data.append((
user.telegram_id, user.username, user.first_name, user.last_name,
user.chat_id, user.profile_link, user.is_active, user.is_superuser,
user.created_at.isoformat() if user.created_at else None,
user.updated_at.isoformat() if user.updated_at else None,
user.banned_until.isoformat() if user.banned_until else None,
user.ban_reason
))
# Выполняем batch вставку
cursor = await conn.executemany("""
INSERT INTO users
(telegram_id, username, first_name, last_name, chat_id, profile_link,
is_active, is_superuser, created_at, updated_at, banned_until, ban_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", batch_data)
# Обновляем ID для всех созданных пользователей
first_id = cursor.lastrowid - len(users) + 1
for i, user in enumerate(users):
user.id = first_id + i
await conn.commit()
logger.info(f"✅ Создано {len(users)} пользователей batch операцией")
return users
except Exception as e:
await conn.rollback()
logger.error(f"❌ Ошибка при batch создании пользователей: {e}")
raise
async def get_by_telegram_id(self, telegram_id: int) -> Optional[User]:
"""Получение пользователя по Telegram ID"""
async with self.get_connection() as conn:
async with conn.execute("""
SELECT * FROM users WHERE telegram_id = ?
""", (telegram_id,)) as cursor:
row = await cursor.fetchone()
if row:
return self._row_to_user(row)
return None
async def get_by_profile_link(self, profile_link: str) -> Optional[User]:
"""Получение пользователя по ссылке профиля"""
async with self.get_connection() as conn:
async with conn.execute("""
SELECT * FROM users WHERE profile_link = ?
""", (profile_link,)) as cursor:
row = await cursor.fetchone()
if row:
return self._row_to_user(row)
return None
async def update(self, user: User) -> User:
"""Обновление пользователя"""
async with self.get_connection() as conn:
await conn.execute("""
UPDATE users SET
username = ?, first_name = ?, last_name = ?, chat_id = ?,
profile_link = ?, is_active = ?, is_superuser = ?, updated_at = ?,
banned_until = ?, ban_reason = ?
WHERE telegram_id = ?
""", (
user.username, user.first_name, user.last_name, user.chat_id,
user.profile_link, user.is_active, user.is_superuser,
user.updated_at.isoformat() if user.updated_at else None,
user.banned_until.isoformat() if user.banned_until else None,
user.ban_reason, user.telegram_id
))
await conn.commit()
return user
async def delete(self, telegram_id: int) -> bool:
"""Удаление пользователя"""
async with self.get_connection() as conn:
cursor = await conn.execute("""
DELETE FROM users WHERE telegram_id = ?
""", (telegram_id,))
await conn.commit()
return cursor.rowcount > 0
async def get_all(self, limit: int = 100, offset: int = 0) -> List[User]:
"""Получение всех пользователей"""
async with self.get_connection() as conn:
async with conn.execute("""
SELECT * FROM users
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""", (limit, offset)) as cursor:
rows = await cursor.fetchall()
return [self._row_to_user(row) for row in rows]
async def get_all_users_cursor(
self,
last_id: int,
last_created_at: str,
limit: int,
direction: str = "desc"
) -> List[User]:
"""Получение пользователей с cursor-based пагинацией"""
async with self.get_connection() as conn:
if direction == "desc":
query = """
SELECT * FROM users
WHERE (created_at < ? OR (created_at = ? AND id < ?))
ORDER BY created_at DESC, id DESC
LIMIT ?
"""
params = [last_created_at, last_created_at, last_id, limit]
else:
query = """
SELECT * FROM users
WHERE (created_at > ? OR (created_at = ? AND id > ?))
ORDER BY created_at ASC, id ASC
LIMIT ?
"""
params = [last_created_at, last_created_at, last_id, limit]
async with conn.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [self._row_to_user(row) for row in rows]
async def get_all_users_asc(self, limit: int = 100, offset: int = 0) -> List[User]:
"""Получение всех пользователей в порядке возрастания"""
async with self.get_connection() as conn:
async with conn.execute("""
SELECT * FROM users
ORDER BY created_at ASC
LIMIT ? OFFSET ?
""", (limit, offset)) as cursor:
rows = await cursor.fetchall()
return [self._row_to_user(row) for row in rows]
async def get_stats(self) -> Dict[str, Any]:
"""Получение статистики пользователей"""
async with self.get_connection() as conn:
async with conn.execute("""
SELECT
COUNT(*) as total_users,
COUNT(CASE WHEN is_active = TRUE THEN 1 END) as active_users,
COUNT(CASE WHEN is_superuser = TRUE THEN 1 END) as superusers,
COUNT(CASE WHEN created_at > datetime('now', '-7 days') THEN 1 END) as new_users_week,
COUNT(CASE WHEN created_at > datetime('now', '-1 day') THEN 1 END) as new_users_today,
COUNT(CASE WHEN banned_until IS NOT NULL AND banned_until > datetime('now') THEN 1 END) as banned_users
FROM users
""") as cursor:
row = await cursor.fetchone()
return {
'total_users': row[0],
'active_users': row[1],
'superusers': row[2],
'new_users_week': row[3],
'new_users_today': row[4],
'banned_users': row[5]
}
def _row_to_user(self, row) -> User:
"""Преобразование строки БД в объект User"""
return User(
id=row[0],
telegram_id=row[1],
username=row[2],
first_name=row[3],
last_name=row[4],
chat_id=row[5],
profile_link=row[6],
is_active=bool(row[7]),
is_superuser=bool(row[12]), # Исправлено: is_superuser находится на позиции 12
created_at=self._parse_datetime(row[8]),
updated_at=self._parse_datetime(row[9]),
banned_until=self._parse_datetime(row[10]),
ban_reason=row[11]
)
class QuestionCRUD(BaseCRUD):
"""CRUD операции для вопросов"""
async def create(self, question: Question) -> Question:
"""Создание нового вопроса"""
logger.info(f"❓ Создание вопроса от {question.from_user_id} к {question.to_user_id}")
async with self.get_connection() as conn:
cursor = await conn.execute("""
INSERT INTO questions
(from_user_id, to_user_id, message_text, answer_text, is_anonymous,
message_id, created_at, answered_at, is_read, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
question.from_user_id, question.to_user_id, question.message_text,
question.answer_text, question.is_anonymous, question.message_id,
question.created_at.isoformat() if question.created_at else None,
question.answered_at.isoformat() if question.answered_at else None,
question.is_read, question.status.value
))
question.id = cursor.lastrowid
await conn.commit()
logger.info(f"✅ Вопрос создан с ID: {question.id}")
return question
async def create_batch(self, questions: List[Question]) -> List[Question]:
"""Создание нескольких вопросов за одну транзакцию (batch операция)"""
if not questions:
return []
logger.info(f"📦 Создание {len(questions)} вопросов batch операцией")
async with self.get_connection() as conn:
try:
# Подготавливаем данные для batch вставки
batch_data = []
for question in questions:
batch_data.append((
question.from_user_id, question.to_user_id, question.message_text,
question.answer_text, question.is_anonymous, question.message_id,
question.created_at.isoformat() if question.created_at else None,
question.answered_at.isoformat() if question.answered_at else None,
question.is_read, question.status.value
))
# Выполняем batch вставку
cursor = await conn.executemany("""
INSERT INTO questions
(from_user_id, to_user_id, message_text, answer_text, is_anonymous,
message_id, created_at, answered_at, is_read, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", batch_data)
# Обновляем ID для всех созданных вопросов
first_id = cursor.lastrowid - len(questions) + 1
for i, question in enumerate(questions):
question.id = first_id + i
await conn.commit()
logger.info(f"✅ Создано {len(questions)} вопросов batch операцией")
return questions
except Exception as e:
await conn.rollback()
logger.error(f"❌ Ошибка при batch создании вопросов: {e}")
raise
async def get_by_id(self, question_id: int) -> Optional[Question]:
"""Получение вопроса по ID"""
async with self.get_connection() as conn:
async with conn.execute("""
SELECT * FROM questions WHERE id = ?
""", (question_id,)) as cursor:
row = await cursor.fetchone()
if row:
return self._row_to_question(row)
return None
async def get_by_to_user(self, to_user_id: int, status: Optional[QuestionStatus] = None,
limit: int = 50, offset: int = 0) -> List[Question]:
"""Получение вопросов для пользователя (оптимизированная версия с JOIN)"""
async with self.get_connection() as conn:
query = """
SELECT
q.id, q.from_user_id, q.to_user_id, q.message_text, q.answer_text,
q.is_anonymous, q.message_id, q.created_at, q.answered_at,
q.is_read, q.status
FROM questions q
WHERE q.to_user_id = ?
"""
params = [to_user_id]
if status:
query += " AND q.status = ?"
params.append(status.value)
query += " ORDER BY q.created_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
async with conn.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [self._row_to_question(row) for row in rows]
async def get_by_to_user_with_authors(self, to_user_id: int, status: Optional[QuestionStatus] = None,
limit: int = 50, offset: int = 0) -> List[Tuple[Question, Optional[User]]]:
"""Получение вопросов для пользователя с информацией об авторах (оптимизированный запрос)"""
async with self.get_connection() as conn:
query = """
SELECT
q.*,
u.id as author_id,
u.telegram_id as author_telegram_id,
u.username as author_username,
u.first_name as author_first_name,
u.last_name as author_last_name,
u.chat_id as author_chat_id,
u.profile_link as author_profile_link,
u.is_active as author_is_active,
u.is_superuser as author_is_superuser,
u.created_at as author_created_at,
u.updated_at as author_updated_at,
u.banned_until as author_banned_until,
u.ban_reason as author_ban_reason
FROM questions q
LEFT JOIN users u ON q.from_user_id = u.telegram_id
WHERE q.to_user_id = ?
"""
params = [to_user_id]
if status:
query += " AND q.status = ?"
params.append(status.value)
query += " ORDER BY q.created_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
async with conn.execute(query, params) as cursor:
rows = await cursor.fetchall()
result = []
for row in rows:
try:
question = self._row_to_question(row[:11]) # Первые 11 колонок - это вопрос
if question is None:
print(f"Предупреждение: вопрос не создан для строки {row[:11]}")
continue
except Exception as e:
print(f"Ошибка при создании вопроса из строки {row[:11]}: {e}")
continue
author = None
if row[11]: # Если есть author_id
author = User(
id=row[11],
telegram_id=row[12],
username=row[13],
first_name=row[14] or "",
last_name=row[15],
chat_id=row[16],
profile_link=row[17] or "",
is_active=bool(row[18]),
is_superuser=bool(row[19]),
created_at=self._parse_datetime(row[20]),
updated_at=self._parse_datetime(row[21]),
banned_until=self._parse_datetime(row[22]),
ban_reason=row[23]
)
result.append((question, author))
return result
async def get_by_to_user_cursor(
self,
to_user_id: int,
last_id: int,
last_created_at: str,
limit: int,
direction: str = "desc"
) -> List[Question]:
"""Получение вопросов с cursor-based пагинацией"""
async with self.get_connection() as conn:
if direction == "desc":
query = """
SELECT
q.id, q.from_user_id, q.to_user_id, q.message_text, q.answer_text,
q.is_anonymous, q.message_id, q.created_at, q.answered_at,
q.is_read, q.status
FROM questions q
WHERE q.to_user_id = ?
AND (q.created_at < ? OR (q.created_at = ? AND q.id < ?))
ORDER BY q.created_at DESC, q.id DESC
LIMIT ?
"""
params = [to_user_id, last_created_at, last_created_at, last_id, limit]
else:
query = """
SELECT
q.id, q.from_user_id, q.to_user_id, q.message_text, q.answer_text,
q.is_anonymous, q.message_id, q.created_at, q.answered_at,
q.is_read, q.status
FROM questions q
WHERE q.to_user_id = ?
AND (q.created_at > ? OR (q.created_at = ? AND q.id > ?))
ORDER BY q.created_at ASC, q.id ASC
LIMIT ?
"""
params = [to_user_id, last_created_at, last_created_at, last_id, limit]
async with conn.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [self._row_to_question(row) for row in rows]
async def get_by_to_user_asc(
self,
to_user_id: int,
status: Optional[QuestionStatus] = None,
limit: int = 50,
offset: int = 0
) -> List[Question]:
"""Получение вопросов для пользователя в порядке возрастания"""
async with self.get_connection() as conn:
query = """
SELECT
q.id, q.from_user_id, q.to_user_id, q.message_text, q.answer_text,
q.is_anonymous, q.message_id, q.created_at, q.answered_at,
q.is_read, q.status
FROM questions q
WHERE q.to_user_id = ?
"""
params = [to_user_id]
if status:
query += " AND q.status = ?"
params.append(status.value)
query += " ORDER BY q.created_at ASC LIMIT ? OFFSET ?"
params.extend([limit, offset])
async with conn.execute(query, params) as cursor:
rows = await cursor.fetchall()
return [self._row_to_question(row) for row in rows]
async def update(self, question: Question) -> Question:
"""Обновление вопроса"""
logger.info(f"📝 Обновление вопроса {question.id} (статус: {question.status.value})")
async with self.get_connection() as conn:
await conn.execute("""
UPDATE questions SET
answer_text = ?, status = ?, answered_at = ?, is_read = ?
WHERE id = ?
""", (
question.answer_text, question.status.value,
question.answered_at.isoformat() if question.answered_at else None,
question.is_read, question.id
))
await conn.commit()
logger.info(f"✅ Вопрос {question.id} обновлен")
return question
async def delete(self, question_id: int) -> bool:
"""Удаление вопроса"""
async with self.get_connection() as conn:
cursor = await conn.execute("""
DELETE FROM questions WHERE id = ?
""", (question_id,))
await conn.commit()
return cursor.rowcount > 0
async def get_unread_count(self, to_user_id: int) -> int:
"""Получение количества непрочитанных вопросов"""
async with self.get_connection() as conn:
async with conn.execute("""
SELECT COUNT(*) FROM questions
WHERE to_user_id = ? AND is_read = FALSE AND status = 'pending'
""", (to_user_id,)) as cursor:
row = await cursor.fetchone()
return row[0]
async def get_count_by_to_user(self, to_user_id: int, status: Optional[QuestionStatus] = None) -> int:
"""Получение общего количества вопросов для пользователя"""
async with self.get_connection() as conn:
query = "SELECT COUNT(*) FROM questions WHERE to_user_id = ?"
params = [to_user_id]
if status:
query += " AND status = ?"
params.append(status.value)
async with conn.execute(query, params) as cursor:
row = await cursor.fetchone()
return row[0]
async def get_stats(self) -> Dict[str, Any]:
"""Получение статистики вопросов"""
async with self.get_connection() as conn:
async with conn.execute("""
SELECT
COUNT(*) as total_questions,
COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending_questions,
COUNT(CASE WHEN status = 'answered' THEN 1 END) as answered_questions,
COUNT(CASE WHEN status = 'rejected' THEN 1 END) as rejected_questions,
COUNT(CASE WHEN created_at > datetime('now', '-1 day') THEN 1 END) as questions_today,
COUNT(CASE WHEN created_at > datetime('now', '-7 days') THEN 1 END) as questions_week,
COUNT(CASE WHEN is_anonymous = TRUE THEN 1 END) as anonymous_questions
FROM questions
""") as cursor:
row = await cursor.fetchone()
return {
'total_questions': row[0],
'pending_questions': row[1],
'answered_questions': row[2],
'rejected_questions': row[3],
'questions_today': row[4],
'questions_week': row[5],
'anonymous_questions': row[6]
}
def _row_to_question(self, row) -> Question:
"""Преобразование строки БД в объект Question"""
# Проверяем, что все необходимые поля присутствуют
if len(row) < 11:
raise ValueError(f"Недостаточно данных в строке БД: {len(row)} колонок, ожидается 11")
# Проверяем статус
status_value = row[10]
if status_value is None:
status = QuestionStatus.PENDING # Значение по умолчанию
else:
try:
status = QuestionStatus(status_value)
except ValueError:
print(f"Неизвестный статус вопроса: {status_value}, используем PENDING")
status = QuestionStatus.PENDING
question = Question(
id=row[0],
from_user_id=row[1],
to_user_id=row[2],
message_text=row[3],
answer_text=row[4],
is_anonymous=bool(row[5]),
message_id=row[6],
created_at=self._parse_datetime(row[7]),
answered_at=self._parse_datetime(row[8]),
is_read=bool(row[9]),
status=status
)
return question
class UserBlockCRUD(BaseCRUD):
"""CRUD операции для блокировок пользователей"""
async def create(self, user_block: UserBlock) -> UserBlock:
"""Создание блокировки"""
async with self.get_connection() as conn:
cursor = await conn.execute("""
INSERT INTO user_blocks (blocker_id, blocked_id, created_at)
VALUES (?, ?, ?)
""", (
user_block.blocker_id, user_block.blocked_id,
user_block.created_at.isoformat() if user_block.created_at else None
))
user_block.id = cursor.lastrowid
await conn.commit()
return user_block
async def is_blocked(self, blocker_id: int, blocked_id: int) -> bool:
"""Проверка, заблокирован ли пользователь"""
async with self.get_connection() as conn:
async with conn.execute("""
SELECT COUNT(*) FROM user_blocks
WHERE blocker_id = ? AND blocked_id = ?
""", (blocker_id, blocked_id)) as cursor:
row = await cursor.fetchone()
return row[0] > 0
async def get_blocked_users(self, blocker_id: int) -> List[int]:
"""Получение списка заблокированных пользователей"""
async with self.get_connection() as conn:
async with conn.execute("""
SELECT blocked_id FROM user_blocks WHERE blocker_id = ?
""", (blocker_id,)) as cursor:
rows = await cursor.fetchall()
return [row[0] for row in rows]
async def delete(self, blocker_id: int, blocked_id: int) -> bool:
"""Удаление блокировки"""
async with self.get_connection() as conn:
cursor = await conn.execute("""
DELETE FROM user_blocks
WHERE blocker_id = ? AND blocked_id = ?
""", (blocker_id, blocked_id))
await conn.commit()
return cursor.rowcount > 0
class UserSettingsCRUD(BaseCRUD):
"""CRUD операции для настроек пользователей"""
async def create(self, settings: UserSettings) -> UserSettings:
"""Создание настроек пользователя"""
async with self.get_connection() as conn:
cursor = await conn.execute("""
INSERT INTO user_settings
(user_id, allow_questions, notify_new_questions,
notify_answers, language, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
settings.user_id, settings.allow_questions,
settings.notify_new_questions, settings.notify_answers, settings.language,
settings.created_at.isoformat() if settings.created_at else None,
settings.updated_at.isoformat() if settings.updated_at else None
))
settings.id = cursor.lastrowid
await conn.commit()
return settings
async def get_by_user_id(self, user_id: int) -> Optional[UserSettings]:
"""Получение настроек пользователя"""
async with self.get_connection() as conn:
async with conn.execute("""
SELECT * FROM user_settings WHERE user_id = ?
""", (user_id,)) as cursor:
row = await cursor.fetchone()
if row:
return self._row_to_settings(row)
return None
async def update(self, settings: UserSettings) -> UserSettings:
"""Обновление настроек пользователя"""
async with self.get_connection() as conn:
await conn.execute("""
UPDATE user_settings SET
allow_questions = ?, notify_new_questions = ?,
notify_answers = ?, language = ?, updated_at = ?
WHERE user_id = ?
""", (
settings.allow_questions,
settings.notify_new_questions, settings.notify_answers,
settings.language, settings.updated_at.isoformat() if settings.updated_at else None,
settings.user_id
))
await conn.commit()
return settings
async def delete(self, user_id: int) -> bool:
"""Удаление настроек пользователя"""
async with self.get_connection() as conn:
cursor = await conn.execute("""
DELETE FROM user_settings WHERE user_id = ?
""", (user_id,))
await conn.commit()
return cursor.rowcount > 0
def _row_to_settings(self, row) -> UserSettings:
"""Преобразование строки БД в объект UserSettings"""
return UserSettings(
id=row[0],
user_id=row[1],
allow_questions=bool(row[2]),
notify_new_questions=bool(row[3]),
notify_answers=bool(row[4]),
language=row[5],
created_at=self._parse_datetime(row[6]),
updated_at=self._parse_datetime(row[7])
)