Files
telegram-helper-bot/.cursor/rules/database-patterns.md
Andrey e2b1353408 feat: добавлена система миграций БД и CI/CD пайплайны
- Создана система отслеживания миграций (MigrationRepository, таблица migrations)
- Добавлен скрипт apply_migrations.py для автоматического применения миграций
- Созданы CI/CD пайплайны (.github/workflows/ci.yml, deploy.yml)
- Обновлена документация по миграциям в database-patterns.md
- Миграции применяются автоматически при деплое в продакшн
2026-01-25 23:17:09 +03:00

8.7 KiB
Raw Permalink Blame History

description, globs
description globs
Паттерны работы с базой данных, репозитории и модели
database/**/*.py
**/repositories/*.py

Паттерны работы с базой данных

Repository Pattern

Все операции с БД выполняются через репозитории. Каждый репозиторий:

  • Наследуется от DatabaseConnection из database/base.py
  • Работает с одной сущностью (User, Post, Blacklist, etc.)
  • Содержит методы для CRUD операций
  • Использует асинхронные методы _execute_query() и _execute_query_with_result()

Структура репозитория

from database.base import DatabaseConnection
from database.models import User

class UserRepository(DatabaseConnection):
    """Репозиторий для работы с пользователями."""
    
    async def create_tables(self):
        """Создание таблицы пользователей."""
        query = '''
            CREATE TABLE IF NOT EXISTS our_users (
                user_id INTEGER NOT NULL PRIMARY KEY,
                ...
            )
        '''
        await self._execute_query(query)
        self.logger.info("Таблица пользователей создана")
    
    async def add_user(self, user: User) -> None:
        """Добавление нового пользователя."""
        query = "INSERT OR IGNORE INTO our_users (...) VALUES (...)"
        params = (...)
        await self._execute_query(query, params)
        self.logger.info(f"Пользователь добавлен: {user.user_id}")
    
    async def get_user_by_id(self, user_id: int) -> Optional[User]:
        """Получение пользователя по ID."""
        query = "SELECT * FROM our_users WHERE user_id = ?"
        rows = await self._execute_query_with_result(query, (user_id,))
        # Преобразование row в модель User
        ...

Модели данных

  • Модели определены в database/models.py
  • Используются dataclasses или простые классы
  • Модели передаются между слоями (Repository → Service → Handler)

Работа с БД

AsyncBotDB

  • Основной интерфейс для работы с БД
  • Использует RepositoryFactory для доступа к репозиториям
  • Методы делегируют вызовы соответствующим репозиториям

DatabaseConnection

  • Базовый класс для всех репозиториев
  • Предоставляет методы:
    • _get_connection() - получение соединения
    • _execute_query() - выполнение запроса без результата
    • _execute_query_with_result() - выполнение запроса с результатом
  • Автоматически управляет соединениями (открытие/закрытие)
  • Настраивает PRAGMA для оптимизации SQLite

Важные правила

  1. Всегда используйте параметризованные запросы для защиты от SQL injection:

    # ✅ Правильно
    query = "SELECT * FROM users WHERE user_id = ?"
    await self._execute_query_with_result(query, (user_id,))
    
    # ❌ Неправильно
    query = f"SELECT * FROM users WHERE user_id = {user_id}"
    
  2. Логируйте важные операции:

    self.logger.info(f"Пользователь добавлен: {user_id}")
    self.logger.error(f"Ошибка при добавлении пользователя: {e}")
    
  3. Используйте транзакции для множественных операций (если нужно):

    async with await self._get_connection() as conn:
        await conn.execute(...)
        await conn.execute(...)
        await conn.commit()
    
  4. Обрабатывайте None при получении данных:

    rows = await self._execute_query_with_result(query, params)
    if not rows:
        return None
    row = rows[0]
    

RepositoryFactory

  • Создает и кэширует экземпляры репозиториев
  • Доступ через свойства: factory.users, factory.posts, etc.
  • Используется в AsyncBotDB для доступа к репозиториям

Миграции

Обзор

Система миграций автоматически отслеживает и применяет изменения схемы БД. Миграции хранятся в scripts/ и применяются автоматически при деплое.

Создание миграции

  1. Создайте файл в scripts/ с понятным именем (например, add_user_email_column.py)

  2. Обязательные требования:

    • Функция async def main(db_path: str)
    • Использует aiosqlite для работы с БД
    • Идемпотентна - можно запускать несколько раз без ошибок
    • Проверяет текущее состояние перед применением изменений
  3. Пример структуры:

#!/usr/bin/env python3
import argparse
import asyncio
import os
import sys
from pathlib import Path

project_root = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(project_root))

import aiosqlite
from logs.custom_logger import logger

DEFAULT_DB_PATH = "database/tg-bot-database.db"


async def main(db_path: str) -> None:
    """Основная функция миграции."""
    db_path = os.path.abspath(db_path)
    if not os.path.exists(db_path):
        logger.error(f"База данных не найдена: {db_path}")
        return
    
    async with aiosqlite.connect(db_path) as conn:
        await conn.execute("PRAGMA foreign_keys = ON")
        
        # Проверяем текущее состояние
        cursor = await conn.execute("PRAGMA table_info(users)")
        columns = await cursor.fetchall()
        
        # Проверяем, нужно ли применять изменения
        column_exists = any(col[1] == "email" for col in columns)
        
        if not column_exists:
            await conn.execute("ALTER TABLE users ADD COLUMN email TEXT")
            await conn.commit()
            logger.info("Колонка email добавлена")
        else:
            logger.info("Колонка email уже существует")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Добавление колонки email")
    parser.add_argument(
        "--db",
        default=os.environ.get("DATABASE_PATH", DEFAULT_DB_PATH),
        help="Путь к БД",
    )
    args = parser.parse_args()
    asyncio.run(main(args.db))

Применение миграций

Локально:

python3 scripts/apply_migrations.py --dry-run  # проверить
python3 scripts/apply_migrations.py           # применить

В продакшене: Применяются автоматически при деплое через CI/CD (перед перезапуском контейнера).

Важные правила

  1. Идемпотентность - всегда проверяйте состояние перед изменением:

    # ✅ Правильно
    cursor = await conn.execute("PRAGMA table_info(users)")
    columns = await cursor.fetchall()
    if not any(col[1] == "email" for col in columns):
        await conn.execute("ALTER TABLE users ADD COLUMN email TEXT")
    
    # ❌ Неправильно - упадет при повторном запуске
    await conn.execute("ALTER TABLE users ADD COLUMN email TEXT")
    
  2. Порядок применения - миграции применяются в алфавитном порядке по имени файла

  3. Исключения - следующие скрипты не считаются миграциями:

    • apply_migrations.py, backfill_migrations.py, test_s3_connection.py, voice_cleanup.py

Регистрация существующих миграций

Если миграции уже применены, но не зарегистрированы:

python3 scripts/backfill_migrations.py  # зарегистрировать все существующие