""" Утилиты для очистки и диагностики проблем с голосовыми файлами """ import asyncio import os from pathlib import Path from typing import List, Tuple from helper_bot.handlers.voice.constants import VOICE_USERS_DIR from logs.custom_logger import logger class VoiceFileCleanupUtils: """Утилиты для очистки и диагностики голосовых файлов""" def __init__(self, bot_db): self.bot_db = bot_db async def find_orphaned_db_records(self) -> List[Tuple[str, int]]: """Найти записи в БД, для которых нет соответствующих файлов""" try: # Получаем все записи из БД all_audio_records = await self.bot_db.get_all_audio_records() orphaned_records = [] for record in all_audio_records: file_name = record.get("file_name", "") user_id = record.get("author_id", 0) file_path = f"{VOICE_USERS_DIR}/{file_name}.ogg" if not os.path.exists(file_path): orphaned_records.append((file_name, user_id)) logger.warning( f"Найдена запись в БД без файла: {file_name} (user_id: {user_id})" ) logger.info( f"Найдено {len(orphaned_records)} записей в БД без соответствующих файлов" ) return orphaned_records except Exception as e: logger.error(f"Ошибка при поиске orphaned записей: {e}") return [] async def find_orphaned_files(self) -> List[str]: """Найти файлы на диске, для которых нет записей в БД""" try: if not os.path.exists(VOICE_USERS_DIR): logger.warning(f"Директория {VOICE_USERS_DIR} не существует") return [] # Получаем все файлы .ogg в директории ogg_files = list(Path(VOICE_USERS_DIR).glob("*.ogg")) orphaned_files = [] # Получаем все записи из БД all_audio_records = await self.bot_db.get_all_audio_records() db_file_names = { record.get("file_name", "") for record in all_audio_records } for file_path in ogg_files: file_name = file_path.stem # Имя файла без расширения if file_name not in db_file_names: orphaned_files.append(str(file_path)) logger.warning(f"Найден файл без записи в БД: {file_path}") logger.info(f"Найдено {len(orphaned_files)} файлов без записей в БД") return orphaned_files except Exception as e: logger.error(f"Ошибка при поиске orphaned файлов: {e}") return [] async def cleanup_orphaned_db_records(self, dry_run: bool = True) -> int: """Удалить записи в БД, для которых нет файлов""" try: orphaned_records = await self.find_orphaned_db_records() if not orphaned_records: logger.info("Нет orphaned записей для удаления") return 0 if dry_run: logger.info( f"DRY RUN: Найдено {len(orphaned_records)} записей для удаления" ) for file_name, user_id in orphaned_records: logger.info( f"DRY RUN: Будет удалена запись: {file_name} (user_id: {user_id})" ) return len(orphaned_records) # Удаляем записи deleted_count = 0 for file_name, user_id in orphaned_records: try: await self.bot_db.delete_audio_record_by_file_name(file_name) deleted_count += 1 logger.info( f"Удалена запись в БД: {file_name} (user_id: {user_id})" ) except Exception as e: logger.error(f"Ошибка при удалении записи {file_name}: {e}") logger.info(f"Удалено {deleted_count} orphaned записей из БД") return deleted_count except Exception as e: logger.error(f"Ошибка при очистке orphaned записей: {e}") return 0 async def cleanup_orphaned_files(self, dry_run: bool = True) -> int: """Удалить файлы на диске, для которых нет записей в БД""" try: orphaned_files = await self.find_orphaned_files() if not orphaned_files: logger.info("Нет orphaned файлов для удаления") return 0 if dry_run: logger.info( f"DRY RUN: Найдено {len(orphaned_files)} файлов для удаления" ) for file_path in orphaned_files: logger.info(f"DRY RUN: Будет удален файл: {file_path}") return len(orphaned_files) # Удаляем файлы deleted_count = 0 for file_path in orphaned_files: try: os.remove(file_path) deleted_count += 1 logger.info(f"Удален файл: {file_path}") except Exception as e: logger.error(f"Ошибка при удалении файла {file_path}: {e}") logger.info(f"Удалено {deleted_count} orphaned файлов") return deleted_count except Exception as e: logger.error(f"Ошибка при очистке orphaned файлов: {e}") return 0 async def get_disk_usage_stats(self) -> dict: """Получить статистику использования диска""" try: if not os.path.exists(VOICE_USERS_DIR): return {"error": f"Директория {VOICE_USERS_DIR} не существует"} total_size = 0 file_count = 0 for file_path in Path(VOICE_USERS_DIR).glob("*.ogg"): if file_path.is_file(): total_size += file_path.stat().st_size file_count += 1 return { "total_files": file_count, "total_size_bytes": total_size, "total_size_mb": round(total_size / (1024 * 1024), 2), "directory": VOICE_USERS_DIR, } except Exception as e: logger.error(f"Ошибка при получении статистики диска: {e}") return {"error": str(e)} async def run_full_diagnostic(self) -> dict: """Запустить полную диагностику""" try: logger.info("Запуск полной диагностики голосовых файлов...") # Статистика диска disk_stats = await self.get_disk_usage_stats() # Orphaned записи в БД orphaned_db_records = await self.find_orphaned_db_records() # Orphaned файлы orphaned_files = await self.find_orphaned_files() # Количество записей в БД all_audio_records = await self.bot_db.get_all_audio_records() db_records_count = len(all_audio_records) diagnostic_result = { "disk_stats": disk_stats, "db_records_count": db_records_count, "orphaned_db_records_count": len(orphaned_db_records), "orphaned_files_count": len(orphaned_files), "orphaned_db_records": orphaned_db_records[ :10 ], # Первые 10 для примера "orphaned_files": orphaned_files[:10], # Первые 10 для примера "status": ( "healthy" if len(orphaned_db_records) == 0 and len(orphaned_files) == 0 else "issues_found" ), } logger.info(f"Диагностика завершена. Статус: {diagnostic_result['status']}") return diagnostic_result except Exception as e: logger.error(f"Ошибка при диагностике: {e}") return {"error": str(e)}