#!/usr/bin/env python3 """ Скрипт миграции для добавления поддержки опубликованных постов: 1. Добавляет колонку published_message_id в таблицу post_from_telegram_suggest 2. Создает таблицу published_post_content для хранения медиафайлов опубликованных постов 3. Создает индексы для производительности """ import argparse import asyncio import os import sys from pathlib import Path project_root = Path(__file__).resolve().parent.parent sys.path.insert(0, str(project_root)) import aiosqlite from logs.custom_logger import logger DEFAULT_DB_PATH = "database/tg-bot-database.db" def _column_exists(rows: list, name: str) -> bool: """Проверяет существование колонки в таблице. PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk).""" for row in rows: if row[1] == name: return True return False async def main(db_path: str, dry_run: bool = False) -> None: """Выполняет миграцию БД для поддержки опубликованных постов.""" db_path = os.path.abspath(db_path) if not os.path.exists(db_path): logger.error("База данных не найдена: %s", db_path) print(f"Ошибка: база данных не найдена: {db_path}") return async with aiosqlite.connect(db_path) as conn: await conn.execute("PRAGMA foreign_keys = ON") changes_made = [] # 1. Проверяем и добавляем колонку published_message_id cursor = await conn.execute("PRAGMA table_info(post_from_telegram_suggest)") rows = await cursor.fetchall() await cursor.close() if not _column_exists(rows, "published_message_id"): if dry_run: print( "DRY RUN: Будет добавлена колонка published_message_id в post_from_telegram_suggest" ) changes_made.append("Добавление колонки published_message_id") else: logger.info( "Добавление колонки published_message_id в post_from_telegram_suggest" ) await conn.execute( "ALTER TABLE post_from_telegram_suggest " "ADD COLUMN published_message_id INTEGER" ) await conn.commit() print( "✓ Колонка published_message_id добавлена в post_from_telegram_suggest" ) changes_made.append("Добавлена колонка published_message_id") else: print( "✓ Колонка published_message_id уже существует в post_from_telegram_suggest" ) # 2. Проверяем и создаем таблицу published_post_content cursor = await conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='published_post_content'" ) table_exists = await cursor.fetchone() await cursor.close() if not table_exists: if dry_run: print("DRY RUN: Будет создана таблица published_post_content") changes_made.append("Создание таблицы published_post_content") else: logger.info("Создание таблицы published_post_content") await conn.execute(""" CREATE TABLE IF NOT EXISTS published_post_content ( published_message_id INTEGER NOT NULL, content_name TEXT NOT NULL, content_type TEXT, published_at INTEGER NOT NULL, PRIMARY KEY (published_message_id, content_name) ) """) await conn.commit() print("✓ Таблица published_post_content создана") changes_made.append("Создана таблица published_post_content") else: print("✓ Таблица published_post_content уже существует") # 3. Проверяем и создаем индексы indexes = [ ( "idx_published_post_content_message_id", "CREATE INDEX IF NOT EXISTS idx_published_post_content_message_id " "ON published_post_content(published_message_id)", ), ( "idx_post_from_telegram_suggest_published", "CREATE INDEX IF NOT EXISTS idx_post_from_telegram_suggest_published " "ON post_from_telegram_suggest(published_message_id)", ), ] for index_name, index_sql in indexes: cursor = await conn.execute( "SELECT name FROM sqlite_master WHERE type='index' AND name=?", (index_name,), ) index_exists = await cursor.fetchone() await cursor.close() if not index_exists: if dry_run: print(f"DRY RUN: Будет создан индекс {index_name}") changes_made.append(f"Создание индекса {index_name}") else: logger.info(f"Создание индекса {index_name}") await conn.execute(index_sql) await conn.commit() print(f"✓ Индекс {index_name} создан") changes_made.append(f"Создан индекс {index_name}") else: print(f"✓ Индекс {index_name} уже существует") # Финальная статистика if dry_run: if changes_made: print("\n" + "=" * 60) print("DRY RUN: Следующие изменения будут выполнены:") for change in changes_made: print(f" - {change}") print("=" * 60) else: print( "\n✓ Все необходимые изменения уже применены. Ничего делать не нужно." ) else: if changes_made: logger.info( f"Миграция завершена. Выполнено изменений: {len(changes_made)}" ) print(f"\n✓ Миграция завершена успешно!") print(f"Выполнено изменений: {len(changes_made)}") for change in changes_made: print(f" - {change}") else: print( "\n✓ Все необходимые изменения уже применены. Ничего делать не нужно." ) if __name__ == "__main__": parser = argparse.ArgumentParser( description="Добавление поддержки опубликованных постов в БД" ) parser.add_argument( "--db", default=os.environ.get("DB_PATH", DEFAULT_DB_PATH), help="Путь к БД (или переменная окружения DB_PATH)", ) parser.add_argument( "--dry-run", action="store_true", help="Показать что будет сделано без выполнения изменений", ) args = parser.parse_args() asyncio.run(main(args.db, dry_run=args.dry_run))