--- description: "Паттерны работы с базой данных, репозитории и модели" globs: ["database/**/*.py", "**/repositories/*.py"] --- # Паттерны работы с базой данных ## Repository Pattern Все операции с БД выполняются через репозитории. Каждый репозиторий: - Наследуется от `DatabaseConnection` из `database/base.py` - Работает с одной сущностью (User, Post, Blacklist, etc.) - Содержит методы для CRUD операций - Использует асинхронные методы `_execute_query()` и `_execute_query_with_result()` ### Структура репозитория ```python from database.base import DatabaseConnection from database.models import User class UserRepository(DatabaseConnection): """Репозиторий для работы с пользователями.""" async def create_tables(self): """Создание таблицы пользователей.""" query = ''' CREATE TABLE IF NOT EXISTS our_users ( user_id INTEGER NOT NULL PRIMARY KEY, ... ) ''' await self._execute_query(query) self.logger.info("Таблица пользователей создана") async def add_user(self, user: User) -> None: """Добавление нового пользователя.""" query = "INSERT OR IGNORE INTO our_users (...) VALUES (...)" params = (...) await self._execute_query(query, params) self.logger.info(f"Пользователь добавлен: {user.user_id}") async def get_user_by_id(self, user_id: int) -> Optional[User]: """Получение пользователя по ID.""" query = "SELECT * FROM our_users WHERE user_id = ?" rows = await self._execute_query_with_result(query, (user_id,)) # Преобразование row в модель User ... ``` ## Модели данных - Модели определены в `database/models.py` - Используются dataclasses или простые классы - Модели передаются между слоями (Repository → Service → Handler) ## Работа с БД ### AsyncBotDB - Основной интерфейс для работы с БД - Использует `RepositoryFactory` для доступа к репозиториям - Методы делегируют вызовы соответствующим репозиториям ### DatabaseConnection - Базовый класс для всех репозиториев - Предоставляет методы: - `_get_connection()` - получение соединения - `_execute_query()` - выполнение запроса без результата - `_execute_query_with_result()` - выполнение запроса с результатом - Автоматически управляет соединениями (открытие/закрытие) - Настраивает PRAGMA для оптимизации SQLite ### Важные правила 1. **Всегда используйте параметризованные запросы** для защиты от SQL injection: ```python # ✅ Правильно query = "SELECT * FROM users WHERE user_id = ?" await self._execute_query_with_result(query, (user_id,)) # ❌ Неправильно query = f"SELECT * FROM users WHERE user_id = {user_id}" ``` 2. **Логируйте важные операции**: ```python self.logger.info(f"Пользователь добавлен: {user_id}") self.logger.error(f"Ошибка при добавлении пользователя: {e}") ``` 3. **Используйте транзакции** для множественных операций (если нужно): ```python async with await self._get_connection() as conn: await conn.execute(...) await conn.execute(...) await conn.commit() ``` 4. **Обрабатывайте None** при получении данных: ```python rows = await self._execute_query_with_result(query, params) if not rows: return None row = rows[0] ``` ## RepositoryFactory - Создает и кэширует экземпляры репозиториев - Доступ через свойства: `factory.users`, `factory.posts`, etc. - Используется в `AsyncBotDB` для доступа к репозиториям ## Миграции ### Обзор Система миграций автоматически отслеживает и применяет изменения схемы БД. Миграции хранятся в `scripts/` и применяются автоматически при деплое. ### Создание миграции 1. **Создайте файл** в `scripts/` с понятным именем (например, `add_user_email_column.py`) 2. **Обязательные требования:** - Функция `async def main(db_path: str)` - Использует `aiosqlite` для работы с БД - **Идемпотентна** - можно запускать несколько раз без ошибок - Проверяет текущее состояние перед применением изменений 3. **Пример структуры:** ```python #!/usr/bin/env python3 import argparse import asyncio import os import sys from pathlib import Path project_root = Path(__file__).resolve().parent.parent sys.path.insert(0, str(project_root)) import aiosqlite from logs.custom_logger import logger DEFAULT_DB_PATH = "database/tg-bot-database.db" async def main(db_path: str) -> None: """Основная функция миграции.""" db_path = os.path.abspath(db_path) if not os.path.exists(db_path): logger.error(f"База данных не найдена: {db_path}") return async with aiosqlite.connect(db_path) as conn: await conn.execute("PRAGMA foreign_keys = ON") # Проверяем текущее состояние cursor = await conn.execute("PRAGMA table_info(users)") columns = await cursor.fetchall() # Проверяем, нужно ли применять изменения column_exists = any(col[1] == "email" for col in columns) if not column_exists: await conn.execute("ALTER TABLE users ADD COLUMN email TEXT") await conn.commit() logger.info("Колонка email добавлена") else: logger.info("Колонка email уже существует") if __name__ == "__main__": parser = argparse.ArgumentParser(description="Добавление колонки email") parser.add_argument( "--db", default=os.environ.get("DATABASE_PATH", DEFAULT_DB_PATH), help="Путь к БД", ) args = parser.parse_args() asyncio.run(main(args.db)) ``` ### Применение миграций **Локально:** ```bash python3 scripts/apply_migrations.py --dry-run # проверить python3 scripts/apply_migrations.py # применить ``` **В продакшене:** Применяются автоматически при деплое через CI/CD (перед перезапуском контейнера). ### Важные правила 1. **Идемпотентность** - всегда проверяйте состояние перед изменением: ```python # ✅ Правильно cursor = await conn.execute("PRAGMA table_info(users)") columns = await cursor.fetchall() if not any(col[1] == "email" for col in columns): await conn.execute("ALTER TABLE users ADD COLUMN email TEXT") # ❌ Неправильно - упадет при повторном запуске await conn.execute("ALTER TABLE users ADD COLUMN email TEXT") ``` 2. **Порядок применения** - миграции применяются в алфавитном порядке по имени файла 3. **Исключения** - следующие скрипты не считаются миграциями: - `apply_migrations.py`, `backfill_migrations.py`, `test_s3_connection.py`, `voice_cleanup.py` ### Регистрация существующих миграций Если миграции уже применены, но не зарегистрированы: ```bash python3 scripts/backfill_migrations.py # зарегистрировать все существующие ```