#!/usr/bin/env python3 """ Скрипт для автоматического применения миграций базы данных. Сканирует папку scripts/ и применяет все новые миграции, которые еще не были применены. """ import argparse import asyncio import importlib.util import os import subprocess import sys from pathlib import Path from typing import List, Tuple # Исключаем служебные скрипты из миграций EXCLUDED_SCRIPTS = { 'apply_migrations.py', 'test_s3_connection.py', 'voice_cleanup.py', } DEFAULT_DB_PATH = "database/tg-bot-database.db" def get_migration_scripts(scripts_dir: Path) -> List[Tuple[str, Path]]: """ Получает список скриптов миграций из папки scripts. Возвращает список кортежей (имя_файла, путь_к_файлу), отсортированный по имени файла. """ scripts = [] for script_file in sorted(scripts_dir.glob("*.py")): if script_file.name not in EXCLUDED_SCRIPTS: scripts.append((script_file.name, script_file)) return scripts async def is_migration_script(script_path: Path) -> bool: """ Проверяет, является ли скрипт миграцией. Миграция должна иметь функцию main() с параметром db_path. """ try: spec = importlib.util.spec_from_file_location("migration_script", script_path) if spec is None or spec.loader is None: return False module = importlib.util.module_from_spec(spec) spec.loader.exec_module(module) # Проверяем наличие функции main if hasattr(module, 'main'): import inspect sig = inspect.signature(module.main) # Проверяем, что функция принимает db_path params = list(sig.parameters.keys()) return 'db_path' in params return False except Exception: # Если не удалось проверить, считаем что это не миграция return False async def apply_migration(script_path: Path, db_path: str) -> bool: """ Применяет миграцию, запуская скрипт. Returns: True если миграция применена успешно, False в противном случае. """ script_name = script_path.name try: # Запускаем скрипт как отдельный процесс result = subprocess.run( [sys.executable, str(script_path), "--db", db_path], cwd=script_path.parent.parent, capture_output=True, text=True, timeout=300 # 5 минут максимум на миграцию ) if result.returncode == 0: if result.stdout: print(f" {result.stdout.strip()}") return True else: print(f" ❌ Ошибка:") if result.stdout: print(f" STDOUT: {result.stdout}") if result.stderr: print(f" STDERR: {result.stderr}") return False except subprocess.TimeoutExpired: print(f" ❌ Превышен лимит времени (5 минут)") return False except Exception as e: print(f" ❌ Ошибка: {e}") return False async def main(db_path: str, dry_run: bool = False) -> None: """ Основная функция для применения миграций. Args: db_path: Путь к базе данных dry_run: Если True, только показывает какие миграции будут применены """ # Импортируем зависимости только когда они действительно нужны project_root = Path(__file__).resolve().parent.parent sys.path.insert(0, str(project_root)) # Проверяем наличие необходимых зависимостей try: import aiosqlite except ImportError: print("❌ Ошибка: модуль aiosqlite не установлен.") print("💡 Установите зависимости: pip install -r requirements.txt") sys.exit(1) # Импортируем logger try: from logs.custom_logger import logger except ImportError: import logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Импортируем MigrationRepository напрямую из файла migration_repo_path = project_root / "database" / "repositories" / "migration_repository.py" if not migration_repo_path.exists(): print(f"❌ Файл migration_repository.py не найден: {migration_repo_path}") sys.exit(1) spec = importlib.util.spec_from_file_location("migration_repository", migration_repo_path) if spec is None or spec.loader is None: print("❌ Не удалось загрузить модуль migration_repository") sys.exit(1) migration_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(migration_module) MigrationRepository = migration_module.MigrationRepository db_path = os.path.abspath(db_path) if not os.path.exists(db_path): logger.error(f"База данных не найдена: {db_path}") print(f"❌ Ошибка: база данных не найдена: {db_path}") return scripts_dir = project_root / "scripts" if not scripts_dir.exists(): logger.error(f"Папка scripts не найдена: {scripts_dir}") print(f"❌ Ошибка: папка scripts не найдена: {scripts_dir}") return # Инициализируем репозиторий миграций напрямую migration_repo = MigrationRepository(db_path) await migration_repo.create_table() # Получаем список примененных миграций applied_migrations = await migration_repo.get_applied_migrations() logger.info(f"Примененных миграций: {len(applied_migrations)}") # Получаем все скрипты миграций all_scripts = get_migration_scripts(scripts_dir) # Фильтруем только миграции migration_scripts = [] for script_name, script_path in all_scripts: if await is_migration_script(script_path): migration_scripts.append((script_name, script_path)) else: logger.debug(f"Скрипт {script_name} не является миграцией, пропускаем") # Находим новые миграции new_migrations = [ (name, path) for name, path in migration_scripts if name not in applied_migrations ] if not new_migrations: print("✅ Все миграции уже применены") logger.info("Новых миграций не найдено") return print(f"📋 Найдено новых миграций: {len(new_migrations)}") for name, _ in new_migrations: print(f" - {name}") if dry_run: print("\n🔍 DRY RUN: миграции не будут применены") return # Применяем миграции по порядку print("\n🚀 Применение миграций...") failed_migrations = [] for script_name, script_path in new_migrations: print(f"📝 {script_name}...", end=" ", flush=True) success = await apply_migration(script_path, db_path) if success: # Отмечаем миграцию как примененную await migration_repo.mark_migration_applied(script_name) print("✅") else: failed_migrations.append(script_name) print("❌") logger.error(f"Не удалось применить миграцию: {script_name}") # Прерываем выполнение при ошибке print(f"\n⚠️ Прерывание: миграция {script_name} завершилась с ошибкой") break if failed_migrations: print(f"\n❌ Не удалось применить {len(failed_migrations)} миграций:") for name in failed_migrations: print(f" - {name}") sys.exit(1) else: print(f"\n✅ Все миграции применены успешно ({len(new_migrations)} шт.)") if __name__ == "__main__": parser = argparse.ArgumentParser( description="Применение миграций базы данных" ) parser.add_argument( "--db", default=os.environ.get("DATABASE_PATH", DEFAULT_DB_PATH), help="Путь к БД (или DATABASE_PATH из env)", ) parser.add_argument( "--dry-run", action="store_true", help="Показать какие миграции будут применены без фактического применения", ) args = parser.parse_args() asyncio.run(main(args.db, args.dry_run))