- Создана система отслеживания миграций (MigrationRepository, таблица migrations) - Добавлен скрипт apply_migrations.py для автоматического применения миграций - Созданы CI/CD пайплайны (.github/workflows/ci.yml, deploy.yml) - Обновлена документация по миграциям в database-patterns.md - Миграции применяются автоматически при деплое в продакшн
8.7 KiB
description, globs
| description | globs | ||
|---|---|---|---|
| Паттерны работы с базой данных, репозитории и модели |
|
Паттерны работы с базой данных
Repository Pattern
Все операции с БД выполняются через репозитории. Каждый репозиторий:
- Наследуется от
DatabaseConnectionизdatabase/base.py - Работает с одной сущностью (User, Post, Blacklist, etc.)
- Содержит методы для CRUD операций
- Использует асинхронные методы
_execute_query()и_execute_query_with_result()
Структура репозитория
from database.base import DatabaseConnection
from database.models import User
class UserRepository(DatabaseConnection):
"""Репозиторий для работы с пользователями."""
async def create_tables(self):
"""Создание таблицы пользователей."""
query = '''
CREATE TABLE IF NOT EXISTS our_users (
user_id INTEGER NOT NULL PRIMARY KEY,
...
)
'''
await self._execute_query(query)
self.logger.info("Таблица пользователей создана")
async def add_user(self, user: User) -> None:
"""Добавление нового пользователя."""
query = "INSERT OR IGNORE INTO our_users (...) VALUES (...)"
params = (...)
await self._execute_query(query, params)
self.logger.info(f"Пользователь добавлен: {user.user_id}")
async def get_user_by_id(self, user_id: int) -> Optional[User]:
"""Получение пользователя по ID."""
query = "SELECT * FROM our_users WHERE user_id = ?"
rows = await self._execute_query_with_result(query, (user_id,))
# Преобразование row в модель User
...
Модели данных
- Модели определены в
database/models.py - Используются dataclasses или простые классы
- Модели передаются между слоями (Repository → Service → Handler)
Работа с БД
AsyncBotDB
- Основной интерфейс для работы с БД
- Использует
RepositoryFactoryдля доступа к репозиториям - Методы делегируют вызовы соответствующим репозиториям
DatabaseConnection
- Базовый класс для всех репозиториев
- Предоставляет методы:
_get_connection()- получение соединения_execute_query()- выполнение запроса без результата_execute_query_with_result()- выполнение запроса с результатом
- Автоматически управляет соединениями (открытие/закрытие)
- Настраивает PRAGMA для оптимизации SQLite
Важные правила
-
Всегда используйте параметризованные запросы для защиты от SQL injection:
# ✅ Правильно query = "SELECT * FROM users WHERE user_id = ?" await self._execute_query_with_result(query, (user_id,)) # ❌ Неправильно query = f"SELECT * FROM users WHERE user_id = {user_id}" -
Логируйте важные операции:
self.logger.info(f"Пользователь добавлен: {user_id}") self.logger.error(f"Ошибка при добавлении пользователя: {e}") -
Используйте транзакции для множественных операций (если нужно):
async with await self._get_connection() as conn: await conn.execute(...) await conn.execute(...) await conn.commit() -
Обрабатывайте None при получении данных:
rows = await self._execute_query_with_result(query, params) if not rows: return None row = rows[0]
RepositoryFactory
- Создает и кэширует экземпляры репозиториев
- Доступ через свойства:
factory.users,factory.posts, etc. - Используется в
AsyncBotDBдля доступа к репозиториям
Миграции
Обзор
Система миграций автоматически отслеживает и применяет изменения схемы БД. Миграции хранятся в scripts/ и применяются автоматически при деплое.
Создание миграции
-
Создайте файл в
scripts/с понятным именем (например,add_user_email_column.py) -
Обязательные требования:
- Функция
async def main(db_path: str) - Использует
aiosqliteдля работы с БД - Идемпотентна - можно запускать несколько раз без ошибок
- Проверяет текущее состояние перед применением изменений
- Функция
-
Пример структуры:
#!/usr/bin/env python3
import argparse
import asyncio
import os
import sys
from pathlib import Path
project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))
import aiosqlite
from logs.custom_logger import logger
DEFAULT_DB_PATH = "database/tg-bot-database.db"
async def main(db_path: str) -> None:
"""Основная функция миграции."""
db_path = os.path.abspath(db_path)
if not os.path.exists(db_path):
logger.error(f"База данных не найдена: {db_path}")
return
async with aiosqlite.connect(db_path) as conn:
await conn.execute("PRAGMA foreign_keys = ON")
# Проверяем текущее состояние
cursor = await conn.execute("PRAGMA table_info(users)")
columns = await cursor.fetchall()
# Проверяем, нужно ли применять изменения
column_exists = any(col[1] == "email" for col in columns)
if not column_exists:
await conn.execute("ALTER TABLE users ADD COLUMN email TEXT")
await conn.commit()
logger.info("Колонка email добавлена")
else:
logger.info("Колонка email уже существует")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Добавление колонки email")
parser.add_argument(
"--db",
default=os.environ.get("DATABASE_PATH", DEFAULT_DB_PATH),
help="Путь к БД",
)
args = parser.parse_args()
asyncio.run(main(args.db))
Применение миграций
Локально:
python3 scripts/apply_migrations.py --dry-run # проверить
python3 scripts/apply_migrations.py # применить
В продакшене: Применяются автоматически при деплое через CI/CD (перед перезапуском контейнера).
Важные правила
-
Идемпотентность - всегда проверяйте состояние перед изменением:
# ✅ Правильно cursor = await conn.execute("PRAGMA table_info(users)") columns = await cursor.fetchall() if not any(col[1] == "email" for col in columns): await conn.execute("ALTER TABLE users ADD COLUMN email TEXT") # ❌ Неправильно - упадет при повторном запуске await conn.execute("ALTER TABLE users ADD COLUMN email TEXT") -
Порядок применения - миграции применяются в алфавитном порядке по имени файла
-
Исключения - следующие скрипты не считаются миграциями:
apply_migrations.py,backfill_migrations.py,test_s3_connection.py,voice_cleanup.py
Регистрация существующих миграций
Если миграции уже применены, но не зарегистрированы:
python3 scripts/backfill_migrations.py # зарегистрировать все существующие