250 lines
9.3 KiB
Python
250 lines
9.3 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Скрипт для автоматического применения миграций базы данных.
|
||
|
||
Сканирует папку scripts/ и применяет все новые миграции, которые еще не были применены.
|
||
"""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import importlib.util
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
from pathlib import Path
|
||
from typing import List, Tuple
|
||
|
||
# Исключаем служебные скрипты из миграций
|
||
EXCLUDED_SCRIPTS = {
|
||
"apply_migrations.py",
|
||
"test_s3_connection.py",
|
||
"voice_cleanup.py",
|
||
}
|
||
|
||
DEFAULT_DB_PATH = "database/tg-bot-database.db"
|
||
|
||
|
||
def get_migration_scripts(scripts_dir: Path) -> List[Tuple[str, Path]]:
|
||
"""
|
||
Получает список скриптов миграций из папки scripts.
|
||
|
||
Возвращает список кортежей (имя_файла, путь_к_файлу), отсортированный по имени файла.
|
||
"""
|
||
scripts = []
|
||
for script_file in sorted(scripts_dir.glob("*.py")):
|
||
if script_file.name not in EXCLUDED_SCRIPTS:
|
||
scripts.append((script_file.name, script_file))
|
||
return scripts
|
||
|
||
|
||
async def is_migration_script(script_path: Path) -> bool:
|
||
"""
|
||
Проверяет, является ли скрипт миграцией.
|
||
|
||
Миграция должна иметь функцию main() с параметром db_path.
|
||
"""
|
||
try:
|
||
spec = importlib.util.spec_from_file_location("migration_script", script_path)
|
||
if spec is None or spec.loader is None:
|
||
return False
|
||
|
||
module = importlib.util.module_from_spec(spec)
|
||
spec.loader.exec_module(module)
|
||
|
||
# Проверяем наличие функции main
|
||
if hasattr(module, "main"):
|
||
import inspect
|
||
|
||
sig = inspect.signature(module.main)
|
||
# Проверяем, что функция принимает db_path
|
||
params = list(sig.parameters.keys())
|
||
return "db_path" in params
|
||
return False
|
||
except Exception:
|
||
# Если не удалось проверить, считаем что это не миграция
|
||
return False
|
||
|
||
|
||
async def apply_migration(script_path: Path, db_path: str) -> bool:
|
||
"""
|
||
Применяет миграцию, запуская скрипт.
|
||
|
||
Returns:
|
||
True если миграция применена успешно, False в противном случае.
|
||
"""
|
||
script_name = script_path.name
|
||
|
||
try:
|
||
# Запускаем скрипт как отдельный процесс
|
||
result = subprocess.run(
|
||
[sys.executable, str(script_path), "--db", db_path],
|
||
cwd=script_path.parent.parent,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=300, # 5 минут максимум на миграцию
|
||
)
|
||
|
||
if result.returncode == 0:
|
||
if result.stdout:
|
||
print(f" {result.stdout.strip()}")
|
||
return True
|
||
else:
|
||
print(f" ❌ Ошибка:")
|
||
if result.stdout:
|
||
print(f" STDOUT: {result.stdout}")
|
||
if result.stderr:
|
||
print(f" STDERR: {result.stderr}")
|
||
return False
|
||
|
||
except subprocess.TimeoutExpired:
|
||
print(f" ❌ Превышен лимит времени (5 минут)")
|
||
return False
|
||
except Exception as e:
|
||
print(f" ❌ Ошибка: {e}")
|
||
return False
|
||
|
||
|
||
async def main(db_path: str, dry_run: bool = False) -> None:
|
||
"""
|
||
Основная функция для применения миграций.
|
||
|
||
Args:
|
||
db_path: Путь к базе данных
|
||
dry_run: Если True, только показывает какие миграции будут применены
|
||
"""
|
||
# Импортируем зависимости только когда они действительно нужны
|
||
project_root = Path(__file__).resolve().parent.parent
|
||
sys.path.insert(0, str(project_root))
|
||
|
||
# Проверяем наличие необходимых зависимостей
|
||
try:
|
||
import aiosqlite
|
||
except ImportError:
|
||
print("❌ Ошибка: модуль aiosqlite не установлен.")
|
||
print("💡 Установите зависимости: pip install -r requirements.txt")
|
||
sys.exit(1)
|
||
|
||
# Импортируем logger
|
||
try:
|
||
from logs.custom_logger import logger
|
||
except ImportError:
|
||
import logging
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Импортируем MigrationRepository напрямую из файла
|
||
migration_repo_path = (
|
||
project_root / "database" / "repositories" / "migration_repository.py"
|
||
)
|
||
if not migration_repo_path.exists():
|
||
print(f"❌ Файл migration_repository.py не найден: {migration_repo_path}")
|
||
sys.exit(1)
|
||
|
||
spec = importlib.util.spec_from_file_location(
|
||
"migration_repository", migration_repo_path
|
||
)
|
||
if spec is None or spec.loader is None:
|
||
print("❌ Не удалось загрузить модуль migration_repository")
|
||
sys.exit(1)
|
||
|
||
migration_module = importlib.util.module_from_spec(spec)
|
||
spec.loader.exec_module(migration_module)
|
||
MigrationRepository = migration_module.MigrationRepository
|
||
|
||
db_path = os.path.abspath(db_path)
|
||
if not os.path.exists(db_path):
|
||
logger.error(f"База данных не найдена: {db_path}")
|
||
print(f"❌ Ошибка: база данных не найдена: {db_path}")
|
||
return
|
||
|
||
scripts_dir = project_root / "scripts"
|
||
if not scripts_dir.exists():
|
||
logger.error(f"Папка scripts не найдена: {scripts_dir}")
|
||
print(f"❌ Ошибка: папка scripts не найдена: {scripts_dir}")
|
||
return
|
||
|
||
# Инициализируем репозиторий миграций напрямую
|
||
migration_repo = MigrationRepository(db_path)
|
||
await migration_repo.create_table()
|
||
|
||
# Получаем список примененных миграций
|
||
applied_migrations = await migration_repo.get_applied_migrations()
|
||
logger.info(f"Примененных миграций: {len(applied_migrations)}")
|
||
|
||
# Получаем все скрипты миграций
|
||
all_scripts = get_migration_scripts(scripts_dir)
|
||
|
||
# Фильтруем только миграции
|
||
migration_scripts = []
|
||
for script_name, script_path in all_scripts:
|
||
if await is_migration_script(script_path):
|
||
migration_scripts.append((script_name, script_path))
|
||
else:
|
||
logger.debug(f"Скрипт {script_name} не является миграцией, пропускаем")
|
||
|
||
# Находим новые миграции
|
||
new_migrations = [
|
||
(name, path)
|
||
for name, path in migration_scripts
|
||
if name not in applied_migrations
|
||
]
|
||
|
||
if not new_migrations:
|
||
print("✅ Все миграции уже применены")
|
||
logger.info("Новых миграций не найдено")
|
||
return
|
||
|
||
print(f"📋 Найдено новых миграций: {len(new_migrations)}")
|
||
for name, _ in new_migrations:
|
||
print(f" - {name}")
|
||
|
||
if dry_run:
|
||
print("\n🔍 DRY RUN: миграции не будут применены")
|
||
return
|
||
|
||
# Применяем миграции по порядку
|
||
print("\n🚀 Применение миграций...")
|
||
failed_migrations = []
|
||
|
||
for script_name, script_path in new_migrations:
|
||
print(f"📝 {script_name}...", end=" ", flush=True)
|
||
success = await apply_migration(script_path, db_path)
|
||
if success:
|
||
# Отмечаем миграцию как примененную
|
||
await migration_repo.mark_migration_applied(script_name)
|
||
print("✅")
|
||
else:
|
||
failed_migrations.append(script_name)
|
||
print("❌")
|
||
logger.error(f"Не удалось применить миграцию: {script_name}")
|
||
# Прерываем выполнение при ошибке
|
||
print(f"\n⚠️ Прерывание: миграция {script_name} завершилась с ошибкой")
|
||
break
|
||
|
||
if failed_migrations:
|
||
print(f"\n❌ Не удалось применить {len(failed_migrations)} миграций:")
|
||
for name in failed_migrations:
|
||
print(f" - {name}")
|
||
sys.exit(1)
|
||
else:
|
||
print(f"\n✅ Все миграции применены успешно ({len(new_migrations)} шт.)")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
parser = argparse.ArgumentParser(description="Применение миграций базы данных")
|
||
parser.add_argument(
|
||
"--db",
|
||
default=os.environ.get("DATABASE_PATH", DEFAULT_DB_PATH),
|
||
help="Путь к БД (или DATABASE_PATH из env)",
|
||
)
|
||
parser.add_argument(
|
||
"--dry-run",
|
||
action="store_true",
|
||
help="Показать какие миграции будут применены без фактического применения",
|
||
)
|
||
args = parser.parse_args()
|
||
asyncio.run(main(args.db, args.dry_run))
|