From f760e94206b7f44de1e14ee489219a285cb1ebd8 Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 16 Feb 2026 21:14:50 +0300 Subject: [PATCH] Initial commit: VK media tools MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Скрипты для выгрузки фото и видео из диалогов ВКонтакте, обработки (дедупликация + CLIP-классификация) и загрузки в Immich. Co-authored-by: Cursor --- .gitignore | 35 ++ config.py | 85 ++++ immich_upload.py | 647 +++++++++++++++++++++++++++++++ main.py | 957 ++++++++++++++++++++++++++++++++++++++++++++++ main_video.py | 823 +++++++++++++++++++++++++++++++++++++++ process_photos.py | 797 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 3344 insertions(+) create mode 100644 .gitignore create mode 100644 config.py create mode 100644 immich_upload.py create mode 100644 main.py create mode 100644 main_video.py create mode 100644 process_photos.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9012b23 --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +# Виртуальное окружение +.venv/ +venv/ +env/ + +# Python кеш +__pycache__/ +*.py[cod] +*$py.class +*.pyo + +# Файлы прогресса (генерируются скриптами) +progress.json +process_progress.json +video_progress.json +immich_upload_progress.json +rollback_log.json + +# Логи +optimizer_failed.txt +skipped_videos.txt + +# Загруженные медиафайлы +downloads/ +downloads_video/ +output/ + +# macOS +.DS_Store + +# IDE +.idea/ +.vscode/ +*.swp +*.swo diff --git a/config.py b/config.py new file mode 100644 index 0000000..ee2eb10 --- /dev/null +++ b/config.py @@ -0,0 +1,85 @@ +""" +Конфигурация скрипта для выгрузки фото из диалогов ВКонтакте. + +Как получить токен: +1. Перейди по ссылке в браузере: + https://oauth.vk.com/authorize?client_id=2685278&scope=messages,photos,offline&redirect_uri=https://oauth.vk.com/blank.html&display=page&response_type=token&v=5.199 +2. Авторизуйся и разреши доступ +3. Скопируй access_token из адресной строки (значение между access_token= и &expires_in) +4. Вставь его ниже в переменную VK_TOKEN +""" + +# Токен доступа VK API (обязательно заполнить) +VK_TOKEN: str = "" + +# Папка для сохранения фото (относительный или абсолютный путь) +DOWNLOAD_DIR: str = "downloads" + +# Папка для результатов обработки (дубликаты, мусор, review) — рядом с downloads +OUTPUT_DIR: str = "output" + +# Файл прогресса для механизма resume +PROGRESS_FILE: str = "progress.json" + +# Версия VK API +API_VERSION: str = "5.199" + +# Минимум свободного места на диске (в МБ), при котором скрипт остановится +MIN_FREE_SPACE_MB: int = 500 + +# Задержка между скачиваниями файлов (секунды) — чтобы не перегружать сеть +DOWNLOAD_DELAY: float = 0.1 + +# Количество попыток при сетевых ошибках +MAX_RETRIES: int = 3 + +# Таймаут для скачивания одного фото (секунды) +DOWNLOAD_TIMEOUT: int = 30 + +# Количество параллельных потоков для скачивания фото +DOWNLOAD_WORKERS: int = 8 + +# --------------------------------------------------------------------------- +# Настройки обработки фото (process_photos.py) +# --------------------------------------------------------------------------- + +# Файл прогресса обработки +PROCESS_PROGRESS_FILE: str = "process_progress.json" + +# Файл лога перемещений (для отката) +ROLLBACK_LOG_FILE: str = "rollback_log.json" + +# Размер хеша (hash_size x hash_size бит, 8 = 64 бита) +HASH_SIZE: int = 8 + +# Порог расстояния Хэмминга для near-дубликатов (0 = только точные, 8 = средний) +DEDUP_THRESHOLD: int = 8 + +# Потоки для параллельного хеширования +HASH_WORKERS: int = 8 + +# Размер батча для CLIP-классификации +CLIP_BATCH_SIZE: int = 16 + +# Минимальный порог уверенности CLIP (ниже → папка _review) +# Для CLIP ViT-B-32 cosine similarity обычно в диапазоне 0.12-0.35 +CLIP_CONFIDENCE_MIN: float = 0.15 + +# --------------------------------------------------------------------------- +# Настройки скачивания видео (main_video.py) +# --------------------------------------------------------------------------- + +# Папка для сохранения видео +VIDEO_DOWNLOAD_DIR: str = "downloads_video" + +# Файл прогресса для видео +VIDEO_PROGRESS_FILE: str = "video_progress.json" + +# Потоки скачивания (меньше чем для фото — видео тяжёлые) +VIDEO_DOWNLOAD_WORKERS: int = 4 + +# Таймаут скачивания одного видео (секунды, видео крупнее фото) +VIDEO_DOWNLOAD_TIMEOUT: int = 300 + +# Минимум свободного места для видео (МБ) +VIDEO_MIN_FREE_SPACE_MB: int = 2000 diff --git a/immich_upload.py b/immich_upload.py new file mode 100644 index 0000000..a4c7195 --- /dev/null +++ b/immich_upload.py @@ -0,0 +1,647 @@ +#!/usr/bin/env python3 +""" +Массовая загрузка фото и видео в Immich с прогресс-баром и возобновлением. + +Использование: + 1. pip install requests tqdm + 2. python immich_upload.py + 3. Ctrl+C для остановки (прогресс сохраняется автоматически) + 4. Повторный запуск продолжит с места остановки +""" + +import hashlib +import json +import os +import re +import signal +import sys +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from threading import Lock +from typing import Optional + +import requests +from tqdm import tqdm + +# --------------------------------------------------------------------------- +# Конфигурация +# --------------------------------------------------------------------------- + +# Immich сервер (через Upload Optimizer прокси) +IMMICH_URL: str = "http://YOUR_IMMICH_HOST:2283" +# Прямой доступ к Immich (минуя оптимизатор) — fallback при ошибках оптимизатора +IMMICH_DIRECT_URL: str = "http://YOUR_IMMICH_HOST:2284" +API_KEY: str = "" + +# Папки для загрузки (рекурсивный поиск медиафайлов) +UPLOAD_FOLDERS: list[str] = [ + "downloads_video", +] + +# Файл прогресса (для resume) +PROGRESS_FILE: str = "immich_upload_progress.json" + +# Лог файлов, которые оптимизатор не смог обработать (загружены напрямую) +OPTIMIZER_FAILED_LOG: str = "optimizer_failed.txt" + +# Параллельные загрузки (8 потоков — оптимально для 1 Гбит LAN) +MAX_WORKERS: int = 8 + +# Повторные попытки при ошибке +MAX_RETRIES: int = 3 + +# Таймаут загрузки одного файла (секунды) +UPLOAD_TIMEOUT: int = 300 + +# Расширения изображений +IMAGE_EXTENSIONS: set[str] = { + ".jpg", ".jpeg", ".png", ".heic", ".heif", ".webp", ".gif", + ".avif", ".bmp", ".tiff", ".tif", ".jxl", ".raw", ".rw2", + ".dng", ".nef", ".cr2", ".arw", ".orf", ".pef", ".raf", +} + +# Расширения видео +VIDEO_EXTENSIONS: set[str] = { + ".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v", ".3gp", + ".flv", ".wmv", ".mts", ".m2ts", ".mpg", ".mpeg", +} + +# Все допустимые расширения медиафайлов +MEDIA_EXTENSIONS: set[str] = IMAGE_EXTENSIONS | VIDEO_EXTENSIONS + + +# Режим загрузки: "photos" — только фото (видео пропускаются и логируются), +# "videos" — только видео, "all" — всё +UPLOAD_MODE: str = "videos" + +# Лог пропущенных видео (при UPLOAD_MODE="photos") +SKIPPED_VIDEOS_LOG: str = "skipped_videos.txt" + +# Идентификатор устройства (для Immich) +DEVICE_ID: str = "python-bulk-uploader" + + +# --------------------------------------------------------------------------- +# Модели данных +# --------------------------------------------------------------------------- + +@dataclass +class UploadStats: + """Статистика загрузки (потокобезопасная).""" + + uploaded: int = 0 + skipped: int = 0 + duplicates: int = 0 + errors: int = 0 + bytes_sent: int = 0 + _lock: Lock = field(default_factory=Lock, repr=False) + + def inc_uploaded(self, size: int) -> None: + with self._lock: + self.uploaded += 1 + self.bytes_sent += size + + def inc_skipped(self) -> None: + with self._lock: + self.skipped += 1 + + def inc_duplicate(self) -> None: + with self._lock: + self.duplicates += 1 + + def inc_error(self) -> None: + with self._lock: + self.errors += 1 + + +# --------------------------------------------------------------------------- +# Менеджер прогресса (resume) +# --------------------------------------------------------------------------- + +class ProgressManager: + """Управление файлом прогресса для механизма resume.""" + + def __init__(self, progress_file: str) -> None: + self.progress_file: Path = Path(progress_file) + self._lock: Lock = Lock() + self.data: dict = self._load() + # Множество для быстрого поиска уже загруженных файлов (по хешу пути) + self._uploaded_hashes: set[str] = set(self.data.get("uploaded_hashes", [])) + + def _load(self) -> dict: + """Загружает прогресс из файла или создаёт пустой.""" + if self.progress_file.exists(): + try: + with open(self.progress_file, "r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + pass + return { + "version": 1, + "last_updated": "", + "uploaded_hashes": [], + "stats": { + "uploaded": 0, + "skipped": 0, + "duplicates": 0, + "errors": 0, + "bytes_sent": 0, + }, + } + + def save(self) -> None: + """Сохраняет текущее состояние прогресса в файл.""" + with self._lock: + self.data["last_updated"] = datetime.now().isoformat() + self.data["uploaded_hashes"] = list(self._uploaded_hashes) + tmp_path = self.progress_file.with_suffix(".tmp") + with open(tmp_path, "w", encoding="utf-8") as f: + json.dump(self.data, f, ensure_ascii=False) + tmp_path.replace(self.progress_file) + + @staticmethod + def _file_hash(filepath: Path) -> str: + """Генерирует хеш на основе пути и размера файла (быстро, без чтения).""" + stat = filepath.stat() + key = f"{filepath.resolve()}|{stat.st_size}|{stat.st_mtime}" + return hashlib.md5(key.encode()).hexdigest() + + def is_uploaded(self, filepath: Path) -> bool: + """Проверяет, был ли файл уже загружен.""" + return self._file_hash(filepath) in self._uploaded_hashes + + def mark_uploaded(self, filepath: Path) -> None: + """Помечает файл как загруженный.""" + with self._lock: + self._uploaded_hashes.add(self._file_hash(filepath)) + + @property + def uploaded_count(self) -> int: + return len(self._uploaded_hashes) + + +# --------------------------------------------------------------------------- +# Сканер файлов +# --------------------------------------------------------------------------- + +def scan_media_files(folders: list[str]) -> list[Path]: + """Рекурсивно сканирует папки, возвращает отсортированный список медиафайлов.""" + files: list[Path] = [] + for folder in folders: + root = Path(folder) + if not root.exists(): + tqdm.write(f" Папка не найдена: {folder}") + continue + for filepath in root.rglob("*"): + if not filepath.is_file(): + continue + if filepath.suffix.lower() in MEDIA_EXTENSIONS: + files.append(filepath) + # Сортировка по имени для предсказуемого порядка + files.sort(key=lambda p: p.name) + return files + + +# --------------------------------------------------------------------------- +# Загрузчик +# --------------------------------------------------------------------------- + +class ImmichUploader: + """Загрузка файлов в Immich через API с многопоточностью.""" + + def __init__(self) -> None: + self._stop_requested: bool = False + self.progress: ProgressManager = ProgressManager(PROGRESS_FILE) + self.stats: UploadStats = UploadStats() + self._optimizer_fallbacks: int = 0 # Счётчик fallback-загрузок + self._optimizer_log_lock: Lock = Lock() + + # HTTP-сессия с пулом соединений (keep-alive) + self._session: requests.Session = requests.Session() + self._session.headers.update({ + "Accept": "application/json", + "x-api-key": API_KEY, + }) + # Увеличиваем пул соединений для параллельных запросов + adapter = requests.adapters.HTTPAdapter( + pool_connections=MAX_WORKERS, + pool_maxsize=MAX_WORKERS * 2, + ) + self._session.mount("http://", adapter) + self._session.mount("https://", adapter) + + # Обработчики сигналов для graceful shutdown + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + def _signal_handler(self, _signum: int, _frame: object) -> None: + """Graceful shutdown по Ctrl+C.""" + if self._stop_requested: + # Повторный Ctrl+C — тихо выходим без traceback + os._exit(1) + self._stop_requested = True + tqdm.write( + "\nПолучен сигнал остановки. " + "Завершаю текущие загрузки и сохраняю прогресс..." + ) + + def _check_connection(self) -> bool: + """Проверяет подключение к Immich.""" + try: + resp = self._session.get( + f"{IMMICH_URL}/api/server/version", timeout=10 + ) + data = resp.json() + version = f"v{data['major']}.{data['minor']}.{data['patch']}" + tqdm.write(f"Immich сервер: {version} ({IMMICH_URL})") + return True + except Exception as exc: + tqdm.write(f"ОШИБКА: Не удалось подключиться к Immich: {exc}") + return False + + def _check_auth(self) -> bool: + """Проверяет API-ключ.""" + try: + resp = self._session.get( + f"{IMMICH_URL}/api/users/me", timeout=10 + ) + if resp.status_code == 200: + user = resp.json() + tqdm.write(f"Авторизован как: {user.get('name', user.get('email', '?'))}") + return True + tqdm.write(f"ОШИБКА авторизации: HTTP {resp.status_code}") + return False + except Exception as exc: + tqdm.write(f"ОШИБКА авторизации: {exc}") + return False + + # Паттерны даты в именах файлов + _FILENAME_DATE_PATTERNS: list[tuple[str, str]] = [ + # "2019-12-01 15-30-00" (Яндекс Диск) + (r"(\d{4}-\d{2}-\d{2})\s+(\d{2}-\d{2}-\d{2})", "%Y-%m-%d %H-%M-%S"), + # "20191201_153000" (стандарт камер) + (r"(\d{8}_\d{6})", "%Y%m%d_%H%M%S"), + # "2019-12-01_15-30-00" + (r"(\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2})", "%Y-%m-%d_%H-%M-%S"), + # "IMG_20191201_153000" + (r"IMG_(\d{8}_\d{6})", "%Y%m%d_%H%M%S"), + # Только дата "2019-12-01" + (r"(\d{4}-\d{2}-\d{2})", "%Y-%m-%d"), + ] + + @staticmethod + def _parse_date_from_filename(stem: str) -> Optional[datetime]: + """Извлекает дату из имени файла (без расширения). + + Поддерживает форматы Яндекс Диска, камер и прочие. + """ + for pattern, fmt in ImmichUploader._FILENAME_DATE_PATTERNS: + match = re.search(pattern, stem) + if match: + date_str = " ".join(match.groups()) if match.lastindex and match.lastindex > 1 else match.group(1) + try: + return datetime.strptime(date_str, fmt) + except ValueError: + continue + return None + + @staticmethod + def _read_google_sidecar(filepath: Path) -> Optional[dict]: + """Читает JSON-сайдкар Google Фото (supplemental-metadata.json). + + Возвращает dict с метаданными или None, если сайдкар не найден. + """ + # Google Фото кладёт метаданные в файл вида: photo.jpg.supplemental-metadata.json + sidecar = filepath.with_name(filepath.name + ".supplemental-metadata.json") + if not sidecar.exists(): + return None + try: + with open(sidecar, "r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + return None + + def _log_optimizer_failed(self, filepath: Path) -> None: + """Записывает файл в лог неудачных оптимизаций.""" + with self._optimizer_log_lock: + self._optimizer_fallbacks += 1 + with open(OPTIMIZER_FAILED_LOG, "a", encoding="utf-8") as f: + f.write(f"{filepath}\n") + + def _do_upload(self, filepath: Path, data: dict, upload_url: str) -> requests.Response: + """Выполняет HTTP-загрузку файла на указанный URL.""" + with open(filepath, "rb") as f: + files = {"assetData": (filepath.name, f, "application/octet-stream")} + return self._session.post( + f"{upload_url}/api/assets", + data=data, + files=files, + timeout=UPLOAD_TIMEOUT, + ) + + def _handle_response(self, resp: requests.Response, file_size: int) -> Optional[str]: + """Обрабатывает ответ Immich. Возвращает статус или None если нужен retry.""" + if resp.status_code == 201: + result = resp.json() + status = result.get("status", "created") + if status == "duplicate" or result.get("duplicate"): + self.stats.inc_duplicate() + return "duplicate" + self.stats.inc_uploaded(file_size) + return "created" + elif resp.status_code == 200: + # Дубликат (некоторые версии Immich) + self.stats.inc_duplicate() + return "duplicate" + return None # Нужен retry или fallback + + def _upload_one(self, filepath: Path) -> Optional[str]: + """Загружает один файл в Immich. Возвращает статус или None при ошибке. + + Логика: сначала через Upload Optimizer (порт 2283). + Если оптимизатор вернул 400 — fallback на прямую загрузку (порт 2284). + + Статусы: 'created', 'duplicate', 'error' + """ + stat = filepath.stat() + file_size = stat.st_size + + # Базовые метаданные из файловой системы + created_at = datetime.fromtimestamp( + stat.st_birthtime if hasattr(stat, "st_birthtime") else stat.st_mtime + ) + modified_at = datetime.fromtimestamp(stat.st_mtime) + + # 1) Пробуем парсить дату из имени файла (Яндекс Диск: "2019-12-01 15-30-00.JPG") + parsed_date = self._parse_date_from_filename(filepath.stem) + if parsed_date: + created_at = parsed_date + + # 2) Google JSON-сайдкар перезаписывает — он точнее всего + sidecar = self._read_google_sidecar(filepath) + if sidecar: + photo_taken = sidecar.get("photoTakenTime", {}).get("timestamp") + if photo_taken: + try: + created_at = datetime.fromtimestamp(int(photo_taken)) + except (ValueError, OSError): + pass + + data = { + "deviceAssetId": f"{filepath.name}-{stat.st_size}-{stat.st_mtime}", + "deviceId": DEVICE_ID, + "fileCreatedAt": created_at.isoformat(), + "fileModifiedAt": modified_at.isoformat(), + "isFavorite": "false", + } + + # GPS из Google-сайдкара (если есть и не нулевые) + if sidecar: + geo = sidecar.get("geoData", {}) + lat = geo.get("latitude", 0.0) + lon = geo.get("longitude", 0.0) + if lat != 0.0 or lon != 0.0: + data["latitude"] = str(lat) + data["longitude"] = str(lon) + + # Видео грузим напрямую — оптимизатор не умеет их обрабатывать + is_video = filepath.suffix.lower() in VIDEO_EXTENSIONS + if is_video: + for attempt in range(MAX_RETRIES): + try: + resp = self._do_upload(filepath, data, IMMICH_DIRECT_URL) + result = self._handle_response(resp, file_size) + if result is not None: + return result + if attempt < MAX_RETRIES - 1: + time.sleep(2 ** attempt) + else: + tqdm.write( + f" Ошибка загрузки {filepath.name}: " + f"HTTP {resp.status_code} — {resp.text[:200]}" + ) + self.stats.inc_error() + return "error" + except (requests.RequestException, OSError) as exc: + if attempt < MAX_RETRIES - 1: + time.sleep(2 ** attempt) + else: + tqdm.write(f" Ошибка загрузки {filepath.name}: {exc}") + self.stats.inc_error() + return "error" + return "error" + + # --- Прямая загрузка в Immich (порт 2284), минуя оптимизатор --- + for attempt in range(MAX_RETRIES): + try: + resp = self._do_upload(filepath, data, IMMICH_DIRECT_URL) + result = self._handle_response(resp, file_size) + if result is not None: + return result + + if attempt < MAX_RETRIES - 1: + time.sleep(2 ** attempt) + else: + tqdm.write( + f" Ошибка загрузки {filepath.name}: " + f"HTTP {resp.status_code} — {resp.text[:200]}" + ) + self.stats.inc_error() + return "error" + + except (requests.RequestException, OSError) as exc: + if attempt < MAX_RETRIES - 1: + time.sleep(2 ** attempt) + else: + tqdm.write(f" Ошибка загрузки {filepath.name}: {exc}") + self.stats.inc_error() + return "error" + + return "error" + + @staticmethod + def _format_size(size_bytes: int) -> str: + """Форматирует размер в человекочитаемый вид.""" + for unit in ("Б", "КБ", "МБ", "ГБ"): + if size_bytes < 1024: + return f"{size_bytes:.1f} {unit}" + size_bytes /= 1024 + return f"{size_bytes:.1f} ТБ" + + def run(self) -> None: + """Запускает процесс загрузки.""" + tqdm.write("=" * 60) + tqdm.write(" Массовая загрузка в Immich") + tqdm.write("=" * 60) + + # Проверяем подключение и авторизацию + if not self._check_connection() or not self._check_auth(): + sys.exit(1) + + # Сканируем файлы + tqdm.write("\nСканирование папок...") + all_files = scan_media_files(UPLOAD_FOLDERS) + total_size = sum(f.stat().st_size for f in all_files) + tqdm.write( + f"Найдено медиафайлов: {len(all_files)} " + f"({self._format_size(total_size)})" + ) + + # Фильтрация по режиму (photos / videos / all) + if UPLOAD_MODE == "photos": + skipped_videos = [f for f in all_files if f.suffix.lower() in VIDEO_EXTENSIONS] + if skipped_videos: + with open(SKIPPED_VIDEOS_LOG, "w", encoding="utf-8") as vlog: + for v in skipped_videos: + vlog.write(f"{v}\n") + tqdm.write( + f"Режим: только фото. Пропущено видео: {len(skipped_videos)} " + f"(см. {SKIPPED_VIDEOS_LOG})" + ) + all_files = [f for f in all_files if f.suffix.lower() in IMAGE_EXTENSIONS] + elif UPLOAD_MODE == "videos": + all_files = [f for f in all_files if f.suffix.lower() in VIDEO_EXTENSIONS] + tqdm.write(f"Режим: только видео ({len(all_files)} файлов)") + + # Фильтруем уже загруженные + pending_files = [f for f in all_files if not self.progress.is_uploaded(f)] + pending_size = sum(f.stat().st_size for f in pending_files) + already_done = len(all_files) - len(pending_files) + + tqdm.write( + f"Уже загружено: {already_done} файлов\n" + f"Осталось загрузить: {len(pending_files)} файлов " + f"({self._format_size(pending_size)})" + ) + tqdm.write(f"Потоков: {MAX_WORKERS}") + tqdm.write("-" * 60) + + if not pending_files: + tqdm.write("\nВсе файлы уже загружены!") + return + + # Прогресс-бар с размером и скоростью МБ/с + bar = tqdm( + total=len(all_files), + initial=already_done, + desc="Загрузка", + unit=" файл", + dynamic_ncols=True, + bar_format=( + "{l_bar}{bar}| {n_fmt}/{total_fmt} " + "[{elapsed}<{remaining}, {rate_fmt}] {postfix}" + ), + ) + + # Счётчик для периодического сохранения прогресса + save_counter = 0 + save_lock = Lock() + start_time = time.time() + + def _process_file(filepath: Path) -> Optional[str]: + """Обёртка загрузки одного файла для ThreadPoolExecutor.""" + if self._stop_requested: + return None + return self._upload_one(filepath) + + try: + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + # Отправляем все файлы в пул + future_to_file = { + executor.submit(_process_file, f): f + for f in pending_files + } + + for future in as_completed(future_to_file): + if self._stop_requested: + # Отменяем оставшиеся задачи + for f in future_to_file: + f.cancel() + break + + filepath = future_to_file[future] + try: + status = future.result() + except Exception as exc: + tqdm.write(f" Неожиданная ошибка ({filepath.name}): {exc}") + self.stats.inc_error() + status = "error" + + if status and status != "error": + self.progress.mark_uploaded(filepath) + # Fallback-загрузки сохраняем в прогресс сразу, + # чтобы не потерять при остановке скрипта + if status.startswith("fallback_"): + self.progress.save() + + bar.update(1) + + # Обновляем описание с текущей скоростью + elapsed = time.time() - start_time + if elapsed > 0: + speed = self.stats.bytes_sent / elapsed if self.stats.bytes_sent > 0 else 0 + speed_str = f"{self._format_size(speed)}/с" if speed > 0 else "..." + fallback_str = ( + f" FB:{self._optimizer_fallbacks}" + if self._optimizer_fallbacks > 0 + else "" + ) + bar.set_postfix_str( + f"{speed_str} | " + f"OK:{self.stats.uploaded} " + f"DUP:{self.stats.duplicates} " + f"ERR:{self.stats.errors}" + f"{fallback_str}", + refresh=False, + ) + + # Сохраняем прогресс каждые 20 файлов + with save_lock: + save_counter += 1 + if save_counter % 20 == 0: + self.progress.save() + + finally: + bar.close() + self.progress.save() + + elapsed = time.time() - start_time + avg_speed = self.stats.bytes_sent / elapsed if elapsed > 0 else 0 + + print("\n" + "=" * 60) + print(" Итого:") + print(f" Загружено: {self.stats.uploaded}") + print(f" Дубликатов: {self.stats.duplicates}") + print(f" Пропущено: {self.stats.skipped}") + print(f" Ошибок: {self.stats.errors}") + if self._optimizer_fallbacks > 0: + print(f" Fallback (без оптимизации): {self._optimizer_fallbacks}") + print(f" (см. {OPTIMIZER_FAILED_LOG})") + print(f" Передано: {self._format_size(self.stats.bytes_sent)}") + print(f" Средняя скорость: {self._format_size(avg_speed)}/с") + print(f" Время: {elapsed:.0f} сек") + if self._stop_requested: + print( + "\n Работа остановлена. " + "Запусти скрипт снова для продолжения." + ) + else: + print("\n Все файлы загружены!") + print("=" * 60) + + +# --------------------------------------------------------------------------- +# Точка входа +# --------------------------------------------------------------------------- + +def main() -> None: + """Точка входа скрипта.""" + uploader = ImmichUploader() + uploader.run() + + +if __name__ == "__main__": + main() diff --git a/main.py b/main.py new file mode 100644 index 0000000..9d4be7e --- /dev/null +++ b/main.py @@ -0,0 +1,957 @@ +#!/usr/bin/env python3 +""" +Скрипт для выгрузки всех фотографий из личных диалогов ВКонтакте. + +Использование: + 1. Заполни VK_TOKEN в config.py (инструкция в файле) + 2. pip install -r requirements.txt + 3. python main.py + 4. Ctrl+C для остановки (прогресс сохраняется автоматически) + 5. Повторный запуск продолжит с места остановки +""" + +import json +import os +import shutil +import signal +import sys +import threading +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Optional + +import piexif +import piexif.helper +import requests +import vk_api +from tqdm import tqdm + +import config + +# Приоритет размеров фото ВК (от лучшего к худшему) +PHOTO_SIZE_PRIORITY: list[str] = ["w", "z", "y", "x", "r", "q", "p", "o", "m", "s"] + + +# --------------------------------------------------------------------------- +# Модели данных +# --------------------------------------------------------------------------- + +@dataclass +class PhotoInfo: + """Информация о фотографии для скачивания и записи EXIF.""" + + photo_id: int + owner_id: int + url: str + date: int # Unix timestamp сообщения / фото + sender_id: int + sender_name: str + message_text: str + photo_text: str + lat: Optional[float] = None + long: Optional[float] = None + + +# --------------------------------------------------------------------------- +# Менеджер прогресса (resume) +# --------------------------------------------------------------------------- + +class ProgressManager: + """Управление файлом прогресса для механизма resume (потокобезопасный).""" + + def __init__(self, progress_file: str) -> None: + self.progress_file: Path = Path(progress_file) + self._lock = threading.Lock() + self.data: dict = self._load() + self._downloaded_ids: set[int] = set(self.data.get("downloaded_photo_ids", [])) + + def _load(self) -> dict: + """Загружает прогресс из файла или создаёт пустой.""" + if self.progress_file.exists(): + try: + with open(self.progress_file, "r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + pass + return self._default_state() + + @staticmethod + def _default_state() -> dict: + """Возвращает пустое состояние прогресса.""" + return { + "version": 1, + "last_updated": "", + "dialogs_total": 0, + "dialogs_completed": [], + "current_dialog": None, + "downloaded_photo_ids": [], + "stats": { + "photos_downloaded": 0, + "photos_skipped": 0, + "errors": 0, + }, + } + + def save(self) -> None: + """Сохраняет текущее состояние прогресса в файл.""" + with self._lock: + self.data["last_updated"] = datetime.now().isoformat() + self.data["downloaded_photo_ids"] = list(self._downloaded_ids) + tmp_path = self.progress_file.with_suffix(".tmp") + with open(tmp_path, "w", encoding="utf-8") as f: + json.dump(self.data, f, ensure_ascii=False, indent=2) + tmp_path.replace(self.progress_file) + + def is_dialog_completed(self, peer_id: int) -> bool: + """Проверяет, завершён ли диалог.""" + return peer_id in self.data["dialogs_completed"] + + def mark_dialog_completed(self, peer_id: int) -> None: + """Помечает диалог как полностью обработанный.""" + with self._lock: + if peer_id not in self.data["dialogs_completed"]: + self.data["dialogs_completed"].append(peer_id) + self.data["current_dialog"] = None + self.save() + + def set_current_dialog(self, peer_id: int) -> None: + """Устанавливает текущий обрабатываемый диалог.""" + with self._lock: + self.data["current_dialog"] = {"peer_id": peer_id} + self.save() + + def get_current_dialog(self) -> Optional[dict]: + """Возвращает текущий обрабатываемый диалог или None.""" + return self.data.get("current_dialog") + + def is_photo_downloaded(self, photo_id: int) -> bool: + """Проверяет, было ли фото уже скачано.""" + return photo_id in self._downloaded_ids + + def mark_photo_downloaded(self, photo_id: int) -> None: + """Отмечает фото как скачанное (потокобезопасно).""" + with self._lock: + self._downloaded_ids.add(photo_id) + self.data["stats"]["photos_downloaded"] += 1 + + def increment_skipped(self) -> None: + """Увеличивает счётчик пропущенных фото (потокобезопасно).""" + with self._lock: + self.data["stats"]["photos_skipped"] += 1 + + def increment_errors(self) -> None: + """Увеличивает счётчик ошибок (потокобезопасно).""" + with self._lock: + self.data["stats"]["errors"] += 1 + + +# --------------------------------------------------------------------------- +# Запись EXIF метаданных +# --------------------------------------------------------------------------- + +class ExifWriter: + """Запись EXIF метаданных в JPEG файлы.""" + + @staticmethod + def _decimal_to_dms(decimal_degrees: float) -> tuple[tuple, bool]: + """Конвертирует десятичные градусы в формат DMS для EXIF GPS.""" + is_negative = decimal_degrees < 0 + decimal_degrees = abs(decimal_degrees) + degrees = int(decimal_degrees) + minutes_float = (decimal_degrees - degrees) * 60 + minutes = int(minutes_float) + seconds = round((minutes_float - minutes) * 60 * 10000) + dms = ( + (degrees, 1), + (minutes, 1), + (seconds, 10000), + ) + return dms, is_negative + + @staticmethod + def write_exif(filepath: Path, photo_info: PhotoInfo) -> None: + """Записывает EXIF метаданные в файл изображения.""" + if filepath.suffix.lower() not in (".jpg", ".jpeg"): + ExifWriter._write_json_meta(filepath, photo_info) + return + + try: + try: + exif_dict = piexif.load(str(filepath)) + except Exception: + exif_dict = { + "0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "Interop": {}, + } + + # Дата отправки сообщения + if photo_info.date: + dt = datetime.fromtimestamp(photo_info.date) + date_bytes = dt.strftime("%Y:%m:%d %H:%M:%S").encode("ascii") + exif_dict["Exif"][piexif.ExifIFD.DateTimeOriginal] = date_bytes + exif_dict["Exif"][piexif.ExifIFD.DateTimeDigitized] = date_bytes + exif_dict["0th"][piexif.ImageIFD.DateTime] = date_bytes + + # Автор + if photo_info.sender_name: + artist = photo_info.sender_name.encode("utf-8") + exif_dict["0th"][piexif.ImageIFD.Artist] = artist + exif_dict["0th"][piexif.ImageIFD.Copyright] = artist + + # Описание фото из ВК + if photo_info.photo_text: + exif_dict["0th"][piexif.ImageIFD.ImageDescription] = ( + photo_info.photo_text.encode("utf-8") + ) + + # Текст сообщения → UserComment + if photo_info.message_text: + user_comment = piexif.helper.UserComment.dump( + photo_info.message_text, encoding="unicode" + ) + exif_dict["Exif"][piexif.ExifIFD.UserComment] = user_comment + + # GPS + if photo_info.lat is not None and photo_info.long is not None: + lat_dms, lat_neg = ExifWriter._decimal_to_dms(photo_info.lat) + lon_dms, lon_neg = ExifWriter._decimal_to_dms(photo_info.long) + exif_dict["GPS"] = { + piexif.GPSIFD.GPSVersionID: (2, 3, 0, 0), + piexif.GPSIFD.GPSLatitude: lat_dms, + piexif.GPSIFD.GPSLatitudeRef: b"S" if lat_neg else b"N", + piexif.GPSIFD.GPSLongitude: lon_dms, + piexif.GPSIFD.GPSLongitudeRef: b"W" if lon_neg else b"E", + } + + exif_bytes = piexif.dump(exif_dict) + piexif.insert(exif_bytes, str(filepath)) + + except Exception as exc: + tqdm.write(f" EXIF ошибка ({filepath.name}): {exc}. Сохраняю в JSON.") + ExifWriter._write_json_meta(filepath, photo_info) + + @staticmethod + def _write_json_meta(filepath: Path, photo_info: PhotoInfo) -> None: + """Сохраняет метаданные в JSON файл рядом с изображением.""" + meta_path = filepath.with_suffix(filepath.suffix + ".meta.json") + meta: dict = { + "photo_id": photo_info.photo_id, + "date": datetime.fromtimestamp(photo_info.date).isoformat() if photo_info.date else None, + "sender_id": photo_info.sender_id, + "sender": photo_info.sender_name, + "message_text": photo_info.message_text, + "photo_text": photo_info.photo_text, + } + if photo_info.lat is not None: + meta["gps"] = {"lat": photo_info.lat, "long": photo_info.long} + + with open(meta_path, "w", encoding="utf-8") as f: + json.dump(meta, f, ensure_ascii=False, indent=2) + + +# --------------------------------------------------------------------------- +# Основной загрузчик +# --------------------------------------------------------------------------- + +class VKPhotoDownloader: + """Скачивание фото из всех личных диалогов ВКонтакте.""" + + def __init__(self) -> None: + self._stop_requested: bool = False + self.progress: ProgressManager = ProgressManager(config.PROGRESS_FILE) + self.download_dir: Path = Path(config.DOWNLOAD_DIR) + self.download_dir.mkdir(parents=True, exist_ok=True) + self._user_cache: dict[int, str] = {} + self._http_session: requests.Session = requests.Session() + + # Обработчики сигналов для graceful shutdown + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + # Авторизация VK API + self._vk_session = vk_api.VkApi(token=config.VK_TOKEN, api_version=config.API_VERSION) + self.api = self._vk_session.get_api() + + # -- сигналы -- + + def _signal_handler(self, _signum: int, _frame: object) -> None: + """Graceful shutdown по Ctrl+C / SIGTERM.""" + if self._stop_requested: + tqdm.write("\nПринудительная остановка!") + sys.exit(1) + self._stop_requested = True + tqdm.write( + "\nПолучен сигнал остановки. " + "Завершаю текущие загрузки и сохраняю прогресс..." + ) + + # -- утилиты -- + + def _check_free_space(self) -> bool: + """Проверяет, достаточно ли свободного места на диске.""" + usage = shutil.disk_usage(str(self.download_dir)) + free_mb = usage.free / (1024 * 1024) + if free_mb < config.MIN_FREE_SPACE_MB: + tqdm.write( + f"Недостаточно места на диске! " + f"Свободно: {free_mb:.0f} МБ, минимум: {config.MIN_FREE_SPACE_MB} МБ" + ) + return False + return True + + @staticmethod + def _safe_name(name: str) -> str: + """Делает строку безопасной для имени папки/файла.""" + return "".join(c if c.isalnum() or c in ("_", "-") else "_" for c in name) + + # -- VK API execute (до 25 вызовов за 1 запрос) -- + + def _execute(self, code: str) -> dict: + """Выполняет VKScript через метод execute.""" + return self._vk_session.method("execute", {"code": code}) + + # -- батч-загрузка имён пользователей -- + + def _prefetch_user_names(self, user_ids: list[int]) -> None: + """Загружает имена пользователей пакетами (до 1000 за запрос).""" + # Разделяем на пользователей и сообщества + need_users: list[int] = [] + need_groups: list[int] = [] + + for uid in user_ids: + if uid in self._user_cache or uid == 0: + continue + if uid > 2_000_000_000: + self._user_cache[uid] = f"Беседа_{uid - 2_000_000_000}" + elif uid > 0: + need_users.append(uid) + else: + need_groups.append(abs(uid)) + + # Пакетная загрузка пользователей (до 1000 за запрос) + for i in range(0, len(need_users), 1000): + batch = need_users[i:i + 1000] + try: + ids_str = ",".join(str(x) for x in batch) + users = self.api.users.get(user_ids=ids_str) + for u in users: + self._user_cache[u["id"]] = f"{u['first_name']} {u['last_name']}" + except Exception: + pass + time.sleep(0.34) + + # Пакетная загрузка сообществ (до 500 за запрос) + for i in range(0, len(need_groups), 500): + batch = need_groups[i:i + 500] + try: + ids_str = ",".join(str(x) for x in batch) + resp = self.api.groups.getById(group_ids=ids_str) + groups = resp if isinstance(resp, list) else resp.get("groups", []) + for g in groups: + gid = -g["id"] + self._user_cache[gid] = g.get("name", f"group_{g['id']}") + except Exception: + pass + time.sleep(0.34) + + def _get_user_name(self, user_id: int) -> str: + """Получает имя из кэша. Если нет — дозагружает.""" + if user_id in self._user_cache: + return self._user_cache[user_id] + # Фоллбэк на единичный запрос + self._prefetch_user_names([user_id]) + return self._user_cache.get(user_id, f"id{user_id}") + + # -- получение списка диалогов через execute -- + + def _get_all_conversations(self) -> list[dict]: + """Получает полный список диалогов (через execute — 25 страниц за запрос).""" + conversations: list[dict] = [] + offset = 0 + + tqdm.write("Получаю список диалогов...") + + while True: + # VKScript: за один execute получаем до 25 страниц по 200 диалогов + code = f""" + var results = []; + var offset = {offset}; + var i = 0; + while (i < 25) {{ + var resp = API.messages.getConversations({{ + "offset": offset, "count": 200, "extended": 0 + }}); + results.push(resp); + offset = offset + 200; + if (offset >= resp.count || resp.items.length == 0) {{ + return {{"results": results, "done": true}}; + }} + i = i + 1; + }} + return {{"results": results, "done": false, "next_offset": offset}}; + """ + try: + data = self._execute(code) + except Exception as exc: + tqdm.write(f" execute ошибка (conversations): {exc}, пробую обычный метод") + return self._get_all_conversations_fallback() + + for page in data.get("results", []): + if not page: + continue + for item in page.get("items", []): + peer = item["conversation"]["peer"] + if peer["type"] == "chat": + continue + conversations.append( + {"peer_id": peer["id"], "type": peer["type"]} + ) + + if data.get("done", True): + break + offset = data.get("next_offset", offset + 5000) + time.sleep(0.34) + + tqdm.write(f"Найдено личных диалогов: {len(conversations)} (беседы пропущены)") + return conversations + + def _get_all_conversations_fallback(self) -> list[dict]: + """Фоллбэк: получение диалогов обычным методом (без execute).""" + conversations: list[dict] = [] + offset = 0 + + while True: + resp = self.api.messages.getConversations( + offset=offset, count=200, extended=0, + ) + items = resp.get("items", []) + if not items: + break + for item in items: + peer = item["conversation"]["peer"] + if peer["type"] == "chat": + continue + conversations.append( + {"peer_id": peer["id"], "type": peer["type"]} + ) + offset += 200 + if offset >= resp.get("count", 0): + break + time.sleep(0.34) + + return conversations + + # -- выбор лучшего размера фото -- + + @staticmethod + def _best_photo_url(photo: dict) -> Optional[str]: + """Выбирает URL фото максимального доступного размера.""" + orig = photo.get("orig_photo") + if orig and orig.get("url"): + return orig["url"] + + sizes = photo.get("sizes") + if sizes: + size_map = {s["type"]: s["url"] for s in sizes} + for prio in PHOTO_SIZE_PRIORITY: + if prio in size_map: + return size_map[prio] + best = max(sizes, key=lambda s: s.get("width", 0) * s.get("height", 0)) + return best.get("url") + + for key in ("photo_2560", "photo_1280", "photo_807", "photo_604", "photo_130", "photo_75"): + if key in photo: + return photo[key] + + return None + + # -- извлечение фото из сообщений -- + + def _extract_photos_recursive(self, message: dict) -> list[dict]: + """Рекурсивно извлекает все фото из сообщения (вложения + пересланные).""" + result: list[dict] = [] + + for att in message.get("attachments", []): + if att.get("type") == "photo": + photo = att["photo"] + url = self._best_photo_url(photo) + if url: + result.append({ + "photo": photo, + "url": url, + "from_id": message.get("from_id", 0), + "date": message.get("date", photo.get("date", 0)), + "message_text": message.get("text", ""), + }) + + for fwd in message.get("fwd_messages", []): + result.extend(self._extract_photos_recursive(fwd)) + + reply = message.get("reply_message") + if reply: + result.extend(self._extract_photos_recursive(reply)) + + return result + + # -- сбор фото через execute (getHistoryAttachments, до 25 страниц за запрос) -- + + def _collect_all_attachment_photos(self, peer_id: int) -> list[dict]: + """Собирает все фото-вложения через execute (25 страниц за 1 API вызов).""" + all_photos: list[dict] = [] + cursor = "" + + while not self._stop_requested: + # VKScript: до 25 вызовов getHistoryAttachments за один execute + start_from_clause = ( + f'"start_from": "{cursor}",' if cursor else "" + ) + code = f""" + var results = []; + var cursor = "{cursor}"; + var i = 0; + while (i < 25) {{ + var params = {{ + "peer_id": {peer_id}, + "media_type": "photo", + "count": 200, + "preserve_order": 1 + }}; + if (cursor != "") {{ + params.start_from = cursor; + }} + var resp = API.messages.getHistoryAttachments(params); + results.push(resp); + if (!resp.next_from || resp.items.length == 0) {{ + return {{"results": results, "cursor": ""}}; + }} + cursor = resp.next_from; + i = i + 1; + }} + return {{"results": results, "cursor": cursor}}; + """ + try: + data = self._execute(code) + except Exception as exc: + tqdm.write(f" execute ошибка (attachments): {exc}, фоллбэк") + return self._collect_attachments_fallback(peer_id, all_photos, cursor) + + for page in data.get("results", []): + if not page: + continue + for item in page.get("items", []): + att = item.get("attachment", {}) + if att.get("type") != "photo": + continue + photo = att["photo"] + url = self._best_photo_url(photo) + if url: + all_photos.append({ + "photo": photo, + "url": url, + "from_id": item.get("from_id", 0), + "date": item.get("date", photo.get("date", 0)), + "message_text": "", + }) + + cursor = data.get("cursor", "") + if not cursor: + break + time.sleep(0.34) + + return all_photos + + def _collect_attachments_fallback( + self, peer_id: int, existing: list[dict], start_from: str + ) -> list[dict]: + """Фоллбэк: обычная пагинация getHistoryAttachments.""" + cursor: Optional[str] = start_from or None + while not self._stop_requested: + params: dict = { + "peer_id": peer_id, "media_type": "photo", + "count": 200, "preserve_order": 1, + } + if cursor: + params["start_from"] = cursor + resp = self.api.messages.getHistoryAttachments(**params) + items = resp.get("items", []) + cursor = resp.get("next_from") + if not items: + break + for item in items: + att = item.get("attachment", {}) + if att.get("type") != "photo": + continue + photo = att["photo"] + url = self._best_photo_url(photo) + if url: + existing.append({ + "photo": photo, "url": url, + "from_id": item.get("from_id", 0), + "date": item.get("date", photo.get("date", 0)), + "message_text": "", + }) + if not cursor: + break + time.sleep(0.34) + return existing + + # -- сбор фото из пересланных через execute (getHistory) -- + + def _collect_forwarded_photos( + self, peer_id: int, known_ids: set[int], + ) -> list[dict]: + """Сканирует историю сообщений через execute (25 страниц за запрос).""" + found: list[dict] = [] + offset = 0 + + while not self._stop_requested: + code = f""" + var results = []; + var offset = {offset}; + var i = 0; + while (i < 25) {{ + var resp = API.messages.getHistory({{ + "peer_id": {peer_id}, "offset": offset, "count": 200 + }}); + results.push(resp); + offset = offset + 200; + if (offset >= resp.count || resp.items.length == 0) {{ + return {{"results": results, "done": true}}; + }} + i = i + 1; + }} + return {{"results": results, "done": false, "next_offset": offset}}; + """ + try: + data = self._execute(code) + except vk_api.exceptions.ApiError as exc: + tqdm.write(f" API ошибка при getHistory: {exc}") + break + except Exception as exc: + tqdm.write(f" execute ошибка (history): {exc}, прерываю сканирование") + break + + for page in data.get("results", []): + if not page: + continue + for msg in page.get("items", []): + sources: list[dict] = [] + for fwd in msg.get("fwd_messages", []): + sources.extend(self._extract_photos_recursive(fwd)) + reply = msg.get("reply_message") + if reply: + sources.extend(self._extract_photos_recursive(reply)) + + for item in sources: + pid = item["photo"].get("id", 0) + if pid and pid not in known_ids: + known_ids.add(pid) + if not item.get("message_text"): + item["message_text"] = msg.get("text", "") + found.append(item) + + if data.get("done", True): + break + offset = data.get("next_offset", offset + 5000) + time.sleep(0.34) + + return found + + # -- скачивание одного фото (для потока) -- + + def _download_single( + self, photo_data: dict, dialog_dir: Path + ) -> Optional[PhotoInfo]: + """Скачивает одно фото, записывает EXIF и utime. Возвращает PhotoInfo или None.""" + photo = photo_data["photo"] + photo_id = photo.get("id", 0) + url = photo_data["url"] + date_ts: int = photo_data.get("date", photo.get("date", 0)) + + # Формируем путь: dialog_dir / YYYY / photo_{id}_{date}.jpg + dt = datetime.fromtimestamp(date_ts) if date_ts else datetime.now() + subfolder = dt.strftime("%Y") + filename = f"photo_{photo_id}_{dt.strftime('%Y%m%d_%H%M%S')}.jpg" + filepath = dialog_dir / subfolder / filename + + # Скачиваем с retry + for attempt in range(config.MAX_RETRIES): + try: + resp = self._http_session.get( + url, timeout=config.DOWNLOAD_TIMEOUT, stream=True + ) + resp.raise_for_status() + filepath.parent.mkdir(parents=True, exist_ok=True) + with open(filepath, "wb") as f: + for chunk in resp.iter_content(chunk_size=8192): + f.write(chunk) + break + except (requests.RequestException, OSError) as exc: + if attempt < config.MAX_RETRIES - 1: + time.sleep(2 ** attempt) + else: + tqdm.write(f" Ошибка скачивания ({filename}): {exc}") + return None + + # Метаданные + sender_id = photo_data.get("from_id", 0) + sender_name = self._user_cache.get(sender_id, f"id{sender_id}") if sender_id else "" + + info = PhotoInfo( + photo_id=photo_id, + owner_id=photo.get("owner_id", 0), + url=url, + date=date_ts, + sender_id=sender_id, + sender_name=sender_name, + message_text=photo_data.get("message_text", ""), + photo_text=photo.get("text", ""), + lat=photo.get("lat"), + long=photo.get("long"), + ) + + # EXIF + ExifWriter.write_exif(filepath, info) + + # Дата файла = дата сообщения + if date_ts: + os.utime(filepath, (date_ts, date_ts)) + + return info + + # -- обработка одного диалога -- + + def _process_dialog( + self, peer_id: int, dialog_name: str, photos_bar: tqdm + ) -> None: + """Обрабатывает один диалог: собирает и скачивает все фото.""" + + saved = self.progress.get_current_dialog() + resuming = saved is not None and saved.get("peer_id") == peer_id + + if not resuming: + self.progress.set_current_dialog(peer_id) + + known_ids: set[int] = set() + all_photos: list[dict] = [] + + # Фаза 1: вложения через execute + getHistoryAttachments + tqdm.write(f" [{dialog_name}] Сбор фото-вложений...") + att_photos = self._collect_all_attachment_photos(peer_id) + for p in att_photos: + pid = p["photo"].get("id", 0) + if pid and pid not in known_ids: + known_ids.add(pid) + all_photos.append(p) + + # Фаза 2: пересланные сообщения через execute + getHistory + if not self._stop_requested: + tqdm.write(f" [{dialog_name}] Поиск фото в пересланных сообщениях...") + fwd_photos = self._collect_forwarded_photos(peer_id, known_ids) + all_photos.extend(fwd_photos) + + if not all_photos: + tqdm.write(f" [{dialog_name}] Нет фото") + if not self._stop_requested: + self.progress.mark_dialog_completed(peer_id) + return + + # Фильтруем уже скачанные + tasks = [ + p for p in all_photos + if not self.progress.is_photo_downloaded(p["photo"].get("id", 0)) + ] + skipped = len(all_photos) - len(tasks) + + tqdm.write( + f" [{dialog_name}] Всего: {len(all_photos)}, " + f"скачать: {len(tasks)}, пропустить: {skipped}" + ) + + if not tasks: + if not self._stop_requested: + self.progress.mark_dialog_completed(peer_id) + return + + # Предзагрузка имён отправителей пакетом + sender_ids = list({t.get("from_id", 0) for t in tasks if t.get("from_id", 0)}) + if sender_ids: + self._prefetch_user_names(sender_ids) + + # Настройка прогресс-бара + photos_bar.reset(total=len(all_photos)) + photos_bar.n = skipped + photos_bar.refresh() + photos_bar.set_description(f"Фото ({dialog_name[:25]})") + + safe_dialog = self._safe_name(dialog_name) + dialog_dir = self.download_dir / f"{safe_dialog}_id{peer_id}" + + # -- Параллельное скачивание через ThreadPoolExecutor -- + completed_count = 0 + + with ThreadPoolExecutor(max_workers=config.DOWNLOAD_WORKERS) as executor: + futures: dict = {} + + for task in tasks: + if self._stop_requested: + break + if not self._check_free_space(): + tqdm.write("Остановка из-за нехватки места на диске.") + self._stop_requested = True + break + future = executor.submit(self._download_single, task, dialog_dir) + futures[future] = task + + for future in as_completed(futures): + task = futures[future] + photo_id = task["photo"].get("id", 0) + + try: + result = future.result() + if result is not None: + self.progress.mark_photo_downloaded(photo_id) + else: + self.progress.increment_errors() + except Exception: + self.progress.increment_errors() + + photos_bar.update(1) + completed_count += 1 + + # Сохраняем прогресс каждые 50 фото + if completed_count % 50 == 0: + self.progress.save() + + if self._stop_requested: + # Отменяем ещё не запущенные задачи + for f in futures: + f.cancel() + break + + self.progress.save() + + if not self._stop_requested: + self.progress.mark_dialog_completed(peer_id) + + # -- главный цикл -- + + def run(self) -> None: + """Запускает процесс скачивания фотографий.""" + tqdm.write("=" * 60) + tqdm.write(" Выгрузка фото из диалогов ВКонтакте") + tqdm.write(f" Потоков скачивания: {config.DOWNLOAD_WORKERS}") + tqdm.write("=" * 60) + + if not config.VK_TOKEN: + tqdm.write( + "ОШИБКА: Заполни VK_TOKEN в config.py!\n" + "Инструкция по получению токена — в комментарии в config.py" + ) + sys.exit(1) + + try: + me = self.api.users.get()[0] + my_name = f"{me['first_name']} {me['last_name']}" + tqdm.write(f"Авторизован как: {my_name}") + self._user_cache[me["id"]] = my_name + except Exception as exc: + tqdm.write(f"ОШИБКА авторизации: {exc}") + tqdm.write("Проверь VK_TOKEN в config.py") + sys.exit(1) + + conversations = self._get_all_conversations() + self.progress.data["dialogs_total"] = len(conversations) + self.progress.save() + + # Предзагрузка имён для всех собеседников + peer_ids = [c["peer_id"] for c in conversations] + tqdm.write("Загружаю имена собеседников...") + self._prefetch_user_names(peer_ids) + + completed_ids: set[int] = set(self.progress.data["dialogs_completed"]) + remaining = [c for c in conversations if c["peer_id"] not in completed_ids] + + current = self.progress.get_current_dialog() + if current: + cur_pid = current["peer_id"] + remaining = [c for c in remaining if c["peer_id"] != cur_pid] + for c in conversations: + if c["peer_id"] == cur_pid: + remaining.insert(0, c) + break + + stats = self.progress.data["stats"] + tqdm.write( + f"\nПрогресс: {len(completed_ids)}/{len(conversations)} диалогов, " + f"{stats['photos_downloaded']} фото уже скачано" + ) + tqdm.write(f"Осталось обработать: {len(remaining)} диалогов") + tqdm.write("-" * 60) + + dialogs_bar = tqdm( + total=len(conversations), + initial=len(completed_ids), + desc="Диалоги", + unit=" диал", + position=0, + dynamic_ncols=True, + ) + photos_bar = tqdm( + total=0, + desc="Фото", + unit=" фото", + position=1, + leave=False, + dynamic_ncols=True, + ) + + try: + for conv in remaining: + if self._stop_requested: + break + + peer_id = conv["peer_id"] + dialog_name = self._get_user_name(peer_id) + + self._process_dialog(peer_id, dialog_name, photos_bar) + + if not self._stop_requested: + dialogs_bar.update(1) + + finally: + photos_bar.close() + dialogs_bar.close() + self.progress.save() + + stats = self.progress.data["stats"] + completed_count = len(self.progress.data["dialogs_completed"]) + total_count = self.progress.data["dialogs_total"] + + print("\n" + "=" * 60) + print(" Итого:") + print(f" Фото скачано: {stats['photos_downloaded']}") + print(f" Фото пропущено: {stats['photos_skipped']}") + print(f" Ошибок: {stats['errors']}") + print(f" Диалогов обработано: {completed_count}/{total_count}") + if self._stop_requested: + print("\n Работа остановлена. Запусти скрипт снова для продолжения.") + else: + print("\n Все диалоги обработаны!") + print("=" * 60) + + +# --------------------------------------------------------------------------- +# Точка входа +# --------------------------------------------------------------------------- + +def main() -> None: + """Точка входа скрипта.""" + downloader = VKPhotoDownloader() + downloader.run() + + +if __name__ == "__main__": + main() diff --git a/main_video.py b/main_video.py new file mode 100644 index 0000000..a6193e1 --- /dev/null +++ b/main_video.py @@ -0,0 +1,823 @@ +#!/usr/bin/env python3 +""" +Скрипт для выгрузки СВОИХ видео из личных диалогов ВКонтакте. + +Скачивает только видео, где owner_id == ваш ID (загруженные вами). +Чужие видео, видео сообществ — пропускаются. + +Использование: + 1. Заполни VK_TOKEN в config.py + 2. pip install -r requirements.txt + 3. python main_video.py + 4. Ctrl+C для остановки (прогресс сохраняется) + 5. Повторный запуск продолжит с места остановки +""" + +import json +import os +import shutil +import signal +import sys +import threading +import time +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Optional + +import requests +import vk_api +from tqdm import tqdm + +import config + +# Приоритет качества видео (от лучшего к худшему) +VIDEO_QUALITY_PRIORITY: list[str] = [ + "mp4_2160", "mp4_1440", "mp4_1080", "mp4_720", "mp4_480", "mp4_360", "mp4_240", "mp4_144", +] + + +# --------------------------------------------------------------------------- +# Модели данных +# --------------------------------------------------------------------------- + +@dataclass +class VideoMeta: + """Метаданные видео для JSON-сайдкара.""" + + video_id: int + owner_id: int + title: str + duration: int + date: int + sender_id: int + sender_name: str + message_text: str + quality: str + + +# --------------------------------------------------------------------------- +# Менеджер прогресса +# --------------------------------------------------------------------------- + +class ProgressManager: + """Прогресс с resume-механизмом (потокобезопасный).""" + + def __init__(self, progress_file: str) -> None: + self.progress_file: Path = Path(progress_file) + self._lock = threading.Lock() + self.data: dict = self._load() + self._downloaded_ids: set[int] = set( + self.data.get("downloaded_video_ids", []) + ) + + def _load(self) -> dict: + if self.progress_file.exists(): + try: + with open(self.progress_file, "r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + pass + return self._default_state() + + @staticmethod + def _default_state() -> dict: + return { + "version": 3, + "last_updated": "", + "dialogs_total": 0, + "dialogs_completed": [], + "current_dialog": None, + "downloaded_video_ids": [], + "stats": { + "videos_downloaded": 0, + "foreign_skipped": 0, + "external_saved": 0, + "no_files": 0, + "errors": 0, + "bytes_downloaded": 0, + }, + } + + def save(self) -> None: + with self._lock: + self.data["last_updated"] = datetime.now().isoformat() + self.data["downloaded_video_ids"] = list(self._downloaded_ids) + tmp = self.progress_file.with_suffix(".tmp") + with open(tmp, "w", encoding="utf-8") as f: + json.dump(self.data, f, ensure_ascii=False, indent=2) + tmp.replace(self.progress_file) + + def is_dialog_completed(self, peer_id: int) -> bool: + return peer_id in self.data["dialogs_completed"] + + def mark_dialog_completed(self, peer_id: int) -> None: + with self._lock: + if peer_id not in self.data["dialogs_completed"]: + self.data["dialogs_completed"].append(peer_id) + self.data["current_dialog"] = None + self.save() + + def set_current_dialog(self, peer_id: int) -> None: + with self._lock: + self.data["current_dialog"] = {"peer_id": peer_id} + self.save() + + def get_current_dialog(self) -> Optional[dict]: + return self.data.get("current_dialog") + + def is_video_downloaded(self, video_id: int) -> bool: + return video_id in self._downloaded_ids + + def mark_video_downloaded(self, video_id: int, size: int) -> None: + with self._lock: + self._downloaded_ids.add(video_id) + self.data["stats"]["videos_downloaded"] += 1 + self.data["stats"]["bytes_downloaded"] += size + + def increment_foreign(self, count: int = 1) -> None: + with self._lock: + self.data["stats"]["foreign_skipped"] += count + + def increment_external(self) -> None: + with self._lock: + self.data["stats"]["external_saved"] += 1 + + def increment_no_files(self) -> None: + with self._lock: + self.data["stats"]["no_files"] += 1 + + def increment_errors(self) -> None: + with self._lock: + self.data["stats"]["errors"] += 1 + + +# --------------------------------------------------------------------------- +# Основной загрузчик +# --------------------------------------------------------------------------- + +class VKVideoDownloader: + """Скачивание своих видео из диалогов ВК.""" + + def __init__(self) -> None: + self._stop_requested: bool = False + self.progress = ProgressManager(config.VIDEO_PROGRESS_FILE) + self.download_dir = Path(config.VIDEO_DOWNLOAD_DIR) + self.download_dir.mkdir(parents=True, exist_ok=True) + self._user_cache: dict[int, str] = {} + self._my_id: int = 0 + + # HTTP-сессия для скачивания файлов + self._http = requests.Session() + + # Отдельная сессия для video.get БЕЗ User-Agent браузера + # (VK не отдаёт поле files если видит браузерный UA) + self._video_api_session = requests.Session() + + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + # vk_api для всего кроме video.get + self._vk_session = vk_api.VkApi( + token=config.VK_TOKEN, api_version=config.API_VERSION, + ) + self.api = self._vk_session.get_api() + + def _signal_handler(self, _signum: int, _frame: object) -> None: + if self._stop_requested: + tqdm.write("\nПринудительная остановка!") + sys.exit(1) + self._stop_requested = True + tqdm.write("\nОстановка... сохраняю прогресс.") + + # -- утилиты -- + + def _check_free_space(self) -> bool: + usage = shutil.disk_usage(str(self.download_dir)) + free_mb = usage.free / (1024 * 1024) + if free_mb < config.VIDEO_MIN_FREE_SPACE_MB: + tqdm.write( + f"Мало места! Свободно: {free_mb:.0f} МБ, " + f"минимум: {config.VIDEO_MIN_FREE_SPACE_MB} МБ" + ) + return False + return True + + @staticmethod + def _safe_name(name: str) -> str: + return "".join(c if c.isalnum() or c in ("_", "-") else "_" for c in name) + + @staticmethod + def _format_size(size_bytes: int) -> str: + sz = float(size_bytes) + for unit in ("Б", "КБ", "МБ", "ГБ"): + if sz < 1024: + return f"{sz:.1f} {unit}" + sz /= 1024 + return f"{sz:.1f} ТБ" + + @staticmethod + def _format_duration(seconds: int) -> str: + m, s = divmod(seconds, 60) + h, m = divmod(m, 60) + if h: + return f"{h}:{m:02d}:{s:02d}" + return f"{m}:{s:02d}" + + # -- VK API -- + + def _execute(self, code: str) -> dict: + return self._vk_session.method("execute", {"code": code}) + + def _video_get_raw(self, video_keys: list[str]) -> list[dict]: + """Вызов video.get через requests БЕЗ браузерного User-Agent. + + VK не отдаёт поле files если видит User-Agent браузера. + """ + result: list[dict] = [] + for i in range(0, len(video_keys), 200): + if self._stop_requested: + break + batch = video_keys[i:i + 200] + try: + resp = self._video_api_session.post( + "https://api.vk.com/method/video.get", + data={ + "videos": ",".join(batch), + "access_token": config.VK_TOKEN, + "v": config.API_VERSION, + }, + timeout=30, + ) + data = resp.json() + items = data.get("response", {}).get("items", []) + result.extend(items) + except Exception as exc: + tqdm.write(f" video.get ошибка: {exc}") + time.sleep(0.34) + return result + + def _prefetch_user_names(self, user_ids: list[int]) -> None: + need_users: list[int] = [] + need_groups: list[int] = [] + + for uid in user_ids: + if uid in self._user_cache or uid == 0: + continue + if uid > 2_000_000_000: + self._user_cache[uid] = f"Беседа_{uid - 2_000_000_000}" + elif uid > 0: + need_users.append(uid) + else: + need_groups.append(abs(uid)) + + for i in range(0, len(need_users), 1000): + batch = need_users[i:i + 1000] + try: + ids_str = ",".join(str(x) for x in batch) + users = self.api.users.get(user_ids=ids_str) + for u in users: + self._user_cache[u["id"]] = f"{u['first_name']} {u['last_name']}" + except Exception: + pass + time.sleep(0.34) + + for i in range(0, len(need_groups), 500): + batch = need_groups[i:i + 500] + try: + ids_str = ",".join(str(x) for x in batch) + resp = self.api.groups.getById(group_ids=ids_str) + groups = resp if isinstance(resp, list) else resp.get("groups", []) + for g in groups: + self._user_cache[-g["id"]] = g.get("name", f"group_{g['id']}") + except Exception: + pass + time.sleep(0.34) + + def _get_user_name(self, user_id: int) -> str: + if user_id in self._user_cache: + return self._user_cache[user_id] + self._prefetch_user_names([user_id]) + return self._user_cache.get(user_id, f"id{user_id}") + + # -- получение диалогов -- + + def _get_all_conversations(self) -> list[dict]: + conversations: list[dict] = [] + offset = 0 + + tqdm.write("Получаю список диалогов...") + while True: + code = f""" + var results = []; + var offset = {offset}; + var i = 0; + while (i < 25) {{ + var resp = API.messages.getConversations({{ + "offset": offset, "count": 200, "extended": 0 + }}); + results.push(resp); + offset = offset + 200; + if (offset >= resp.count || resp.items.length == 0) {{ + return {{"results": results, "done": true}}; + }} + i = i + 1; + }} + return {{"results": results, "done": false, "next_offset": offset}}; + """ + try: + data = self._execute(code) + except Exception as exc: + tqdm.write(f" execute ошибка: {exc}, фоллбэк") + return self._get_conversations_fallback() + + for page in data.get("results", []): + if not page: + continue + for item in page.get("items", []): + peer = item["conversation"]["peer"] + if peer["type"] == "chat": + continue + conversations.append( + {"peer_id": peer["id"], "type": peer["type"]} + ) + + if data.get("done", True): + break + offset = data.get("next_offset", offset + 5000) + time.sleep(0.34) + + tqdm.write(f"Найдено личных диалогов: {len(conversations)}") + return conversations + + def _get_conversations_fallback(self) -> list[dict]: + conversations: list[dict] = [] + offset = 0 + while True: + resp = self.api.messages.getConversations( + offset=offset, count=200, extended=0, + ) + items = resp.get("items", []) + if not items: + break + for item in items: + peer = item["conversation"]["peer"] + if peer["type"] == "chat": + continue + conversations.append( + {"peer_id": peer["id"], "type": peer["type"]} + ) + offset += 200 + if offset >= resp.get("count", 0): + break + time.sleep(0.34) + return conversations + + # -- сбор видео-вложений -- + + def _collect_my_videos(self, peer_id: int) -> list[dict]: + """Собирает видео-вложения, фильтруя только свои (owner_id == my_id).""" + all_videos: list[dict] = [] + cursor = "" + foreign_count = 0 + + while not self._stop_requested: + code = f""" + var results = []; + var cursor = "{cursor}"; + var i = 0; + while (i < 25) {{ + var params = {{ + "peer_id": {peer_id}, + "media_type": "video", + "count": 200, + "preserve_order": 1 + }}; + if (cursor != "") {{ + params.start_from = cursor; + }} + var resp = API.messages.getHistoryAttachments(params); + results.push(resp); + if (!resp.next_from || resp.items.length == 0) {{ + return {{"results": results, "cursor": ""}}; + }} + cursor = resp.next_from; + i = i + 1; + }} + return {{"results": results, "cursor": cursor}}; + """ + try: + data = self._execute(code) + except Exception as exc: + tqdm.write(f" execute ошибка: {exc}, фоллбэк") + fb_result, fb_foreign = self._collect_videos_fallback( + peer_id, all_videos, cursor, + ) + foreign_count += fb_foreign + break + + for page in data.get("results", []): + if not page: + continue + for item in page.get("items", []): + att = item.get("attachment", {}) + if att.get("type") != "video": + continue + video = att["video"] + if video.get("owner_id") != self._my_id: + foreign_count += 1 + continue + all_videos.append({ + "video": video, + "from_id": item.get("from_id", 0), + "date": item.get("date", video.get("date", 0)), + "message_text": "", + }) + + cursor = data.get("cursor", "") + if not cursor: + break + time.sleep(0.34) + + if foreign_count: + self.progress.increment_foreign(foreign_count) + + return all_videos + + def _collect_videos_fallback( + self, peer_id: int, existing: list[dict], start_from: str, + ) -> tuple[list[dict], int]: + cursor: Optional[str] = start_from or None + foreign_count = 0 + while not self._stop_requested: + params: dict = { + "peer_id": peer_id, "media_type": "video", + "count": 200, "preserve_order": 1, + } + if cursor: + params["start_from"] = cursor + resp = self.api.messages.getHistoryAttachments(**params) + items = resp.get("items", []) + cursor = resp.get("next_from") + if not items: + break + for item in items: + att = item.get("attachment", {}) + if att.get("type") != "video": + continue + video = att["video"] + if video.get("owner_id") != self._my_id: + foreign_count += 1 + continue + existing.append({ + "video": video, + "from_id": item.get("from_id", 0), + "date": item.get("date", video.get("date", 0)), + "message_text": "", + }) + if not cursor: + break + time.sleep(0.34) + return existing, foreign_count + + # -- выбор лучшего качества -- + + @staticmethod + def _best_video_url(files: dict) -> Optional[tuple[str, str]]: + """Выбирает URL видео максимального качества. + + Возвращает (url, quality) или None. + """ + for quality in VIDEO_QUALITY_PRIORITY: + url = files.get(quality) + if url: + return (url, quality) + return None + + # -- скачивание одного видео -- + + def _download_single( + self, video_data: dict, files: dict, + ) -> Optional[VideoMeta]: + """Скачивает одно видео через прямой URL. Возвращает VideoMeta или None.""" + video = video_data["video"] + video_id = video.get("id", 0) + date_ts: int = video_data.get("date", video.get("date", 0)) + title = video.get("title", "") + duration = video.get("duration", 0) + + best = self._best_video_url(files) + if not best: + self.progress.increment_no_files() + return None + + url, quality = best + + # Путь: downloads_video/video_{id}_{date}_{title}.mp4 + dt = datetime.fromtimestamp(date_ts) if date_ts else datetime.now() + safe_title = self._safe_name(title)[:50] if title else "" + suffix = f"_{safe_title}" if safe_title else "" + filename = f"video_{video_id}_{dt.strftime('%Y%m%d_%H%M%S')}{suffix}.mp4" + filepath = self.download_dir / filename + + if filepath.exists(): + return None + + # Скачиваем с retry и стримингом (128 КБ чанки) + file_size = 0 + for attempt in range(config.MAX_RETRIES): + try: + resp = self._http.get( + url, timeout=config.VIDEO_DOWNLOAD_TIMEOUT, stream=True, + ) + resp.raise_for_status() + + with open(filepath, "wb") as f: + for chunk in resp.iter_content(chunk_size=131072): + if self._stop_requested: + f.close() + filepath.unlink(missing_ok=True) + return None + f.write(chunk) + + file_size = filepath.stat().st_size + break + except (requests.RequestException, OSError) as exc: + filepath.unlink(missing_ok=True) + if attempt < config.MAX_RETRIES - 1: + time.sleep(2 ** attempt) + else: + tqdm.write(f" Ошибка скачивания ({filename}): {exc}") + return None + + # Метаданные + sender_id = video_data.get("from_id", 0) + sender_name = ( + self._user_cache.get(sender_id, f"id{sender_id}") + if sender_id else "" + ) + + info = VideoMeta( + video_id=video_id, + owner_id=video.get("owner_id", 0), + title=title, + duration=duration, + date=date_ts, + sender_id=sender_id, + sender_name=sender_name, + message_text=video_data.get("message_text", ""), + quality=quality, + ) + + # Дата файла = дата сообщения + if date_ts: + os.utime(filepath, (date_ts, date_ts)) + + return info + + # -- обработка одного диалога -- + + def _process_dialog( + self, peer_id: int, dialog_name: str, bar: tqdm, + ) -> None: + saved = self.progress.get_current_dialog() + resuming = saved is not None and saved.get("peer_id") == peer_id + + if not resuming: + self.progress.set_current_dialog(peer_id) + + # Сбор только моих видео + raw_videos = self._collect_my_videos(peer_id) + if not raw_videos: + if not self._stop_requested: + self.progress.mark_dialog_completed(peer_id) + return + + # Дедупликация по video_id + seen: set[int] = set() + unique: list[dict] = [] + for v in raw_videos: + vid = v["video"].get("id", 0) + if vid and vid not in seen: + seen.add(vid) + unique.append(v) + + # Фильтруем уже скачанные + tasks = [ + v for v in unique + if not self.progress.is_video_downloaded(v["video"].get("id", 0)) + ] + skipped = len(unique) - len(tasks) + + tqdm.write( + f" [{dialog_name}] Моих видео: {len(unique)}, " + f"скачать: {len(tasks)}, пропустить: {skipped}" + ) + + if not tasks: + if not self._stop_requested: + self.progress.mark_dialog_completed(peer_id) + return + + # Получаем прямые URL через video.get (без браузерного UA) + tqdm.write(f" [{dialog_name}] Получаю URL видеофайлов...") + video_keys: list[str] = [] + for t in tasks: + v = t["video"] + key = f"{v.get('owner_id', 0)}_{v.get('id', 0)}" + ak = v.get("access_key", "") + if ak: + key += f"_{ak}" + video_keys.append(key) + + details = self._video_get_raw(video_keys) + # Индекс: video_id → files + files_map: dict[int, dict] = {} + for d in details: + files_map[d["id"]] = d.get("files", {}) + + # Предзагрузка имён + sender_ids = list({t.get("from_id", 0) for t in tasks if t.get("from_id", 0)}) + if sender_ids: + self._prefetch_user_names(sender_ids) + + # Прогресс-бар + bar.reset(total=len(unique)) + bar.n = skipped + bar.refresh() + bar.set_description(f"Видео ({dialog_name[:25]})") + + # Последовательное скачивание + for task in tasks: + if self._stop_requested: + break + if not self._check_free_space(): + tqdm.write("Остановка: мало места на диске.") + self._stop_requested = True + break + + video_id = task["video"].get("id", 0) + files = files_map.get(video_id, {}) + + # Проверяем: внешнее видео (YouTube и т.д.)? + if "external" in files and not any( + k.startswith("mp4_") for k in files + ): + self.progress.increment_external() + bar.update(1) + self.progress.save() + continue + + # Убираем служебные поля + files.pop("failover_host", None) + files.pop("hls_ondemand", None) + files.pop("dash_ondemand", None) + files.pop("external", None) + + result = self._download_single(task, files) + + if result is not None: + fsize = 0 + dt = datetime.fromtimestamp(task.get("date", 0) or time.time()) + safe_title = self._safe_name(task["video"].get("title", ""))[:50] + sfx = f"_{safe_title}" if safe_title else "" + fname = f"video_{video_id}_{dt.strftime('%Y%m%d_%H%M%S')}{sfx}.mp4" + fpath = self.download_dir / fname + if fpath.exists(): + fsize = fpath.stat().st_size + + self.progress.mark_video_downloaded(video_id, fsize) + dur_str = self._format_duration(task["video"].get("duration", 0)) + tqdm.write( + f" ✓ {task['video'].get('title', '')[:40]} " + f"({dur_str}, {result.quality}, {self._format_size(fsize)})" + ) + else: + if not files or not self._best_video_url(files): + pass # increment_no_files уже вызван в _download_single + else: + self.progress.increment_errors() + + bar.update(1) + self.progress.save() + + if not self._stop_requested: + self.progress.mark_dialog_completed(peer_id) + + # -- главный цикл -- + + def run(self) -> None: + tqdm.write("=" * 60) + tqdm.write(" Выгрузка СВОИХ видео из диалогов ВКонтакте") + tqdm.write("=" * 60) + + if not config.VK_TOKEN: + tqdm.write("ОШИБКА: Заполни VK_TOKEN в config.py!") + sys.exit(1) + + try: + me = self.api.users.get()[0] + self._my_id = me["id"] + my_name = f"{me['first_name']} {me['last_name']}" + tqdm.write(f"Авторизован как: {my_name} (id{self._my_id})") + tqdm.write(f"Скачиваю только видео с owner_id={self._my_id}") + self._user_cache[me["id"]] = my_name + except Exception as exc: + tqdm.write(f"ОШИБКА авторизации: {exc}") + sys.exit(1) + + conversations = self._get_all_conversations() + self.progress.data["dialogs_total"] = len(conversations) + self.progress.save() + + peer_ids = [c["peer_id"] for c in conversations] + tqdm.write("Загружаю имена собеседников...") + self._prefetch_user_names(peer_ids) + + completed_ids = set(self.progress.data["dialogs_completed"]) + remaining = [c for c in conversations if c["peer_id"] not in completed_ids] + + current = self.progress.get_current_dialog() + if current: + cur_pid = current["peer_id"] + remaining = [c for c in remaining if c["peer_id"] != cur_pid] + for c in conversations: + if c["peer_id"] == cur_pid: + remaining.insert(0, c) + break + + stats = self.progress.data["stats"] + tqdm.write( + f"\nПрогресс: {len(completed_ids)}/{len(conversations)} диалогов, " + f"{stats['videos_downloaded']} видео скачано, " + f"{stats['foreign_skipped']} чужих пропущено" + ) + tqdm.write(f"Осталось: {len(remaining)} диалогов") + tqdm.write("-" * 60) + + dialogs_bar = tqdm( + total=len(conversations), + initial=len(completed_ids), + desc="Диалоги", + unit=" диал", + position=0, + dynamic_ncols=True, + ) + videos_bar = tqdm( + total=0, + desc="Видео", + unit=" видео", + position=1, + leave=False, + dynamic_ncols=True, + ) + + try: + for conv in remaining: + if self._stop_requested: + break + peer_id = conv["peer_id"] + dialog_name = self._get_user_name(peer_id) + self._process_dialog(peer_id, dialog_name, videos_bar) + if not self._stop_requested: + dialogs_bar.update(1) + finally: + videos_bar.close() + dialogs_bar.close() + self.progress.save() + + stats = self.progress.data["stats"] + completed_count = len(self.progress.data["dialogs_completed"]) + total_count = self.progress.data["dialogs_total"] + + print("\n" + "=" * 60) + print(" Итого:") + print(f" Моих видео скачано: {stats['videos_downloaded']}") + print(f" Чужих видео пропущено: {stats['foreign_skipped']}") + ext = stats.get("external_saved", 0) + if ext: + print(f" Внешних (YouTube и т.п.):{ext}") + nf = stats.get("no_files", 0) + if nf: + print(f" Без файлов (удалены?): {nf}") + print(f" Ошибок: {stats['errors']}") + print(f" Скачано: {self._format_size(stats['bytes_downloaded'])}") + print(f" Диалогов обработано: {completed_count}/{total_count}") + if self._stop_requested: + print("\n Остановлено. Запусти снова для продолжения.") + else: + print("\n Все диалоги обработаны!") + print("=" * 60) + + +# --------------------------------------------------------------------------- +# Точка входа +# --------------------------------------------------------------------------- + +def main() -> None: + downloader = VKVideoDownloader() + downloader.run() + + +if __name__ == "__main__": + main() diff --git a/process_photos.py b/process_photos.py new file mode 100644 index 0000000..fa30ee3 --- /dev/null +++ b/process_photos.py @@ -0,0 +1,797 @@ +#!/usr/bin/env python3 +""" +Обработка скачанных фото: сквозная дедупликация и классификация. + +Все фото собираются из downloads/ (из всех диалогов), сортируются +хронологически (самые старые первыми) и раскладываются в output/: + output/personal/ — личные фото людей + output/travel/ — путешествия, места + output/food/ — еда + output/screenshots/ — скриншоты переписок + output/_duplicates/ — дубликаты (оригинал = самое раннее фото) + output/_junk/ — мемы, стикеры + output/_review/ — неуверенная классификация, art, document + +Использование: + python process_photos.py run # Полная обработка + python process_photos.py run --limit 200 # Тест на 200 фото + python process_photos.py run --dry-run # Без перемещения + python process_photos.py rollback # Откатить всё назад + python process_photos.py stats # Статистика +""" + +import argparse +import json +import os +import shutil +import signal +import sys +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed +from datetime import datetime +from pathlib import Path +from typing import Optional + +import imagehash +import numpy as np +from PIL import Image +from tqdm import tqdm + +import config + +# --------------------------------------------------------------------------- +# Константы +# --------------------------------------------------------------------------- + +# Папки-категории в output/ +DEDUP_DIR: str = "_duplicates" +JUNK_DIR: str = "_junk" +REVIEW_DIR: str = "_review" + +# Описания категорий для CLIP +CATEGORIES: dict[str, list[str]] = { + "personal": [ + "a personal photograph of real people", + "a selfie photo of a person", + "a group photo of friends or family", + "a portrait photograph of a person", + "a candid photo of people at an event or party", + ], + "travel": [ + "a landscape photograph of nature or scenery", + "a travel photograph of a famous place or landmark", + "a photograph of architecture or buildings", + "a cityscape or street photograph", + "a photograph from a vacation or trip", + ], + "food": [ + "a photograph of food or a dish on a plate", + "a restaurant or cafe photograph", + "a cooking or baking photograph", + ], + "screenshots": [ + "a screenshot of a mobile phone chat or text messages", + "a screenshot of a computer screen with interface", + "a screenshot of a social media post or webpage", + ], + "meme": [ + "a meme image with text overlay and funny picture", + "a demotivational poster image with black border", + "a comic strip or cartoon panel with speech bubbles", + "an internet joke image with caption text", + ], + "sticker": [ + "a small cartoon sticker on plain background", + "a simple emoji or emoticon image", + "a cartoon character sticker with transparent background", + ], + "art": [ + "a digital art or illustration drawing", + "a hand-drawn painting or artwork", + "abstract colorful art image", + ], + "document": [ + "a scanned printed document or text page", + "a photograph of a paper document", + "a handwritten note or letter photograph", + ], +} + +# Куда перемещать каждую категорию +KEEP_CATEGORIES: set[str] = {"personal", "travel", "food", "screenshots"} +JUNK_CATEGORIES: set[str] = {"meme", "sticker"} +REVIEW_CATEGORIES: set[str] = {"art", "document"} + + +# --------------------------------------------------------------------------- +# Утилиты +# --------------------------------------------------------------------------- + +def popcount64(arr: np.ndarray) -> np.ndarray: + """Vectorized подсчёт единичных бит в массиве uint64.""" + x = arr.astype(np.uint64) + x = x - ((x >> np.uint64(1)) & np.uint64(0x5555555555555555)) + x = (x & np.uint64(0x3333333333333333)) + ( + (x >> np.uint64(2)) & np.uint64(0x3333333333333333) + ) + x = (x + (x >> np.uint64(4))) & np.uint64(0x0F0F0F0F0F0F0F0F) + return ((x * np.uint64(0x0101010101010101)) >> np.uint64(56)).astype( + np.int32 + ) + + +def unique_dest(dest: Path) -> Path: + """Если файл с таким именем уже существует, добавляет суффикс _2, _3 и т.д.""" + if not dest.exists(): + return dest + stem = dest.stem + suffix = dest.suffix + parent = dest.parent + counter = 2 + while True: + candidate = parent / f"{stem}_{counter}{suffix}" + if not candidate.exists(): + return candidate + counter += 1 + + +# --------------------------------------------------------------------------- +# Трекер прогресса +# --------------------------------------------------------------------------- + +class ProgressTracker: + """Прогресс обработки с поддержкой возобновления.""" + + def __init__(self, filepath: str = config.PROCESS_PROGRESS_FILE) -> None: + self.filepath: Path = Path(filepath) + self.data: dict = self._load() + + def _load(self) -> dict: + """Загружает прогресс из файла.""" + if self.filepath.exists(): + try: + with open(self.filepath, "r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + pass + return { + "version": 2, + "hashed_files": {}, + "classified_files": {}, + "processed_files": [], + } + + def save(self) -> None: + """Сохраняет прогресс (атомарная запись).""" + self.data["last_updated"] = datetime.now().isoformat() + tmp = self.filepath.with_suffix(".tmp") + with open(tmp, "w", encoding="utf-8") as f: + json.dump(self.data, f, ensure_ascii=False, indent=2) + tmp.replace(self.filepath) + + def reset(self) -> None: + """Полный сброс прогресса.""" + self.data = { + "version": 2, + "hashed_files": {}, + "classified_files": {}, + "processed_files": [], + } + self.save() + + +# --------------------------------------------------------------------------- +# Лог отката +# --------------------------------------------------------------------------- + +class RollbackLog: + """Журнал перемещений для отката.""" + + def __init__(self, filepath: str = config.ROLLBACK_LOG_FILE) -> None: + self.filepath: Path = Path(filepath) + self.moves: list[dict] = self._load() + + def _load(self) -> list[dict]: + if self.filepath.exists(): + try: + with open(self.filepath, "r", encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + pass + return [] + + def log_move(self, src: str, dst: str, reason: str) -> None: + """Записывает перемещение.""" + self.moves.append({"src": src, "dst": dst, "reason": reason}) + self._save() + + def _save(self) -> None: + with open(self.filepath, "w", encoding="utf-8") as f: + json.dump(self.moves, f, ensure_ascii=False, indent=2) + + def rollback(self) -> int: + """Откатить все перемещения (LIFO). Возвращает кол-во восстановленных.""" + restored = 0 + for move in tqdm( + reversed(self.moves), total=len(self.moves), + desc="Откат", unit=" файлов", + ): + current = Path(move["dst"]) + original = Path(move["src"]) + if current.exists(): + try: + original.parent.mkdir(parents=True, exist_ok=True) + shutil.move(str(current), str(original)) + restored += 1 + except OSError as exc: + tqdm.write(f" Ошибка: {current.name}: {exc}") + self.moves.clear() + self._save() + return restored + + @property + def count(self) -> int: + return len(self.moves) + + +# --------------------------------------------------------------------------- +# Основной обработчик +# --------------------------------------------------------------------------- + +class PhotoProcessor: + """Сквозная обработка: сбор → сортировка → хеширование → деdup → CLIP → организация.""" + + def __init__(self, args: argparse.Namespace) -> None: + self.source_dir: Path = Path(args.source) + self.output_dir: Path = Path(args.output) + self.limit: Optional[int] = args.limit + self.threshold: int = args.threshold + self.dry_run: bool = args.dry_run + self._stop: bool = False + + signal.signal(signal.SIGINT, self._signal_handler) + signal.signal(signal.SIGTERM, self._signal_handler) + + self.progress = ProgressTracker() + self.rollback = RollbackLog() + + def _signal_handler(self, _sig: int, _frame: object) -> None: + if self._stop: + tqdm.write("\nПринудительная остановка!") + sys.exit(1) + self._stop = True + tqdm.write("\nОстановка... сохраняю прогресс.") + + # -- 1. Сбор и сортировка -- + + def _scan_photos(self) -> list[Path]: + """Собирает все фото из downloads/, сортирует по дате файла (oldest first).""" + photos: list[Path] = [] + for root, _dirs, files in os.walk(self.source_dir): + for f in files: + if f.lower().endswith((".jpg", ".jpeg", ".png")): + photos.append(Path(root) / f) + + # Сортировка по mtime (= дата сообщения ВК, установлена через os.utime) + photos.sort(key=lambda p: p.stat().st_mtime) + + if self.limit: + photos = photos[: self.limit] + + return photos + + # -- 2. Хеширование -- + + @staticmethod + def _compute_one_hash(photo_path: Path) -> Optional[tuple[str, str]]: + """Вычисляет pHash одного фото.""" + try: + with Image.open(photo_path) as img: + h = imagehash.phash(img, hash_size=config.HASH_SIZE) + return (str(photo_path), str(h)) + except Exception: + return None + + def _compute_hashes(self, photos: list[Path]) -> dict[str, str]: + """Вычисляет хеши для всех фото (с кешем и многопоточностью).""" + cached: dict[str, str] = self.progress.data.get("hashed_files", {}) + result: dict[str, str] = {} + to_hash: list[Path] = [] + + for p in photos: + key = str(p) + if key in cached: + result[key] = cached[key] + else: + to_hash.append(p) + + if not to_hash: + tqdm.write(f" Все {len(result)} хешей из кеша.") + return result + + tqdm.write(f" Хеширование: {len(to_hash)} новых + {len(result)} из кеша") + + bar = tqdm(total=len(to_hash), desc="Хеширование", unit=" фото") + with ThreadPoolExecutor(max_workers=config.HASH_WORKERS) as executor: + futures = { + executor.submit(self._compute_one_hash, p): p + for p in to_hash + } + done_count = 0 + for future in as_completed(futures): + if self._stop: + for f in futures: + f.cancel() + break + res = future.result() + if res: + path_str, hash_hex = res + result[path_str] = hash_hex + cached[path_str] = hash_hex + bar.update(1) + done_count += 1 + if done_count % 500 == 0: + self.progress.data["hashed_files"] = cached + self.progress.save() + bar.close() + + self.progress.data["hashed_files"] = cached + self.progress.save() + return result + + # -- 3. Хронологическая дедупликация -- + + def _dedup_chronological( + self, + sorted_photos: list[Path], + hashes: dict[str, str], + ) -> tuple[list[Path], list[Path]]: + """Проходит фото от старых к новым. Первое вхождение — оригинал, остальные — дубликаты. + + Возвращает (originals, duplicates). + """ + # Массив уникальных хешей (int) для vectorized сравнения + unique_hash_ints: list[int] = [] + unique_hash_np: Optional[np.ndarray] = None + + originals: list[Path] = [] + duplicates: list[Path] = [] + + tqdm.write(f" Сквозная дедупликация (порог: {self.threshold})...") + + for photo in tqdm( + sorted_photos, desc="Дедупликация", unit=" фото", leave=False, + ): + if self._stop: + break + + key = str(photo) + hash_hex = hashes.get(key) + if hash_hex is None: + # Не удалось вычислить хеш — пропускаем + continue + + hash_int = int(hash_hex, 16) + + is_duplicate = False + + if unique_hash_ints: + # Vectorized сравнение с уже встреченными уникальными хешами + if unique_hash_np is None or len(unique_hash_np) != len(unique_hash_ints): + unique_hash_np = np.array(unique_hash_ints, dtype=np.uint64) + + xor = np.bitwise_xor(np.uint64(hash_int), unique_hash_np) + distances = popcount64(xor) + if np.any(distances <= self.threshold): + is_duplicate = True + + if is_duplicate: + duplicates.append(photo) + else: + originals.append(photo) + unique_hash_ints.append(hash_int) + unique_hash_np = None # Инвалидируем кеш numpy-массива + + tqdm.write( + f" Оригиналов: {len(originals)}, дубликатов: {len(duplicates)}" + ) + return originals, duplicates + + # -- 4. CLIP классификация -- + + def _classify_photos( + self, photos: list[Path], + ) -> dict[str, tuple[str, float]]: + """Классифицирует фото через CLIP. Возвращает {path_str: (category, score)}.""" + import torch + import open_clip + + cached: dict[str, list] = self.progress.data.get("classified_files", {}) + result: dict[str, tuple[str, float]] = {} + to_classify: list[Path] = [] + + for p in photos: + key = str(p) + if key in cached: + result[key] = tuple(cached[key]) + elif p.exists(): + to_classify.append(p) + + if not to_classify: + tqdm.write(f" Все {len(result)} классификаций из кеша.") + return result + + tqdm.write(f" Классификация: {len(to_classify)} новых + {len(result)} из кеша") + + # Устройство + if torch.backends.mps.is_available(): + device = torch.device("mps") + tqdm.write(" Устройство: Apple Silicon MPS") + elif torch.cuda.is_available(): + device = torch.device("cuda") + else: + device = torch.device("cpu") + tqdm.write(" Устройство: CPU") + + # Загрузка модели + tqdm.write(" Загрузка CLIP ViT-B-32...") + model, _, preprocess = open_clip.create_model_and_transforms( + "ViT-B-32", pretrained="laion2b_s34b_b79k", + ) + tokenizer = open_clip.get_tokenizer("ViT-B-32") + model = model.to(device) + model.eval() + + # Подготовка text-эмбеддингов + all_prompts: list[str] = [] + category_mapping: list[str] = [] + for cat_name, prompts in CATEGORIES.items(): + for prompt in prompts: + all_prompts.append(prompt) + category_mapping.append(cat_name) + + tokens = tokenizer(all_prompts).to(device) + with torch.no_grad(): + text_features = model.encode_text(tokens) + text_features /= text_features.norm(dim=-1, keepdim=True) + + tqdm.write(" Модель загружена.") + + # Классификация батчами + batch_size = config.CLIP_BATCH_SIZE + bar = tqdm(total=len(to_classify), desc="Классификация", unit=" фото") + + for i in range(0, len(to_classify), batch_size): + if self._stop: + break + + batch_paths = to_classify[i: i + batch_size] + tensors: list = [] + valid_paths: list[Path] = [] + + for p in batch_paths: + try: + img = Image.open(p).convert("RGB") + tensors.append(preprocess(img)) + valid_paths.append(p) + except Exception: + pass + + if tensors: + image_batch = torch.stack(tensors).to(device) + with torch.no_grad(): + image_features = model.encode_image(image_batch) + image_features /= image_features.norm(dim=-1, keepdim=True) + similarities = image_features @ text_features.T + + for j, path in enumerate(valid_paths): + sims = similarities[j] + cat_scores: dict[str, list[float]] = defaultdict(list) + for idx, cat_name in enumerate(category_mapping): + cat_scores[cat_name].append(sims[idx].item()) + + cat_avg = { + cat: sum(scores) / len(scores) + for cat, scores in cat_scores.items() + } + best_cat = max(cat_avg, key=lambda k: cat_avg[k]) + best_score = cat_avg[best_cat] + + key = str(path) + result[key] = (best_cat, best_score) + cached[key] = [best_cat, best_score] + + bar.update(len(batch_paths)) + + if (i // batch_size) % 10 == 0: + self.progress.data["classified_files"] = cached + self.progress.save() + + bar.close() + self.progress.data["classified_files"] = cached + self.progress.save() + return result + + # -- 5. Организация (перемещение) -- + + def _move_photo( + self, photo: Path, dest_dir: Path, reason: str, + ) -> Optional[Path]: + """Перемещает фото в dest_dir (плоско, без подпапок диалогов). + + Возвращает путь назначения или None при dry-run. + """ + dest = unique_dest(dest_dir / photo.name) + + if self.dry_run: + tqdm.write(f" [DRY-RUN] {photo.name} → {dest_dir.name}/") + return None + + dest_dir.mkdir(parents=True, exist_ok=True) + shutil.move(str(photo), str(dest)) + self.rollback.log_move(str(photo), str(dest), reason) + return dest + + def _organize_all( + self, + originals: list[Path], + duplicates: list[Path], + classifications: dict[str, tuple[str, float]], + ) -> dict[str, int]: + """Перемещает все фото в output/ по категориям (плоская структура).""" + stats: dict[str, int] = defaultdict(int) + confidence_min = config.CLIP_CONFIDENCE_MIN + + # Дубликаты → output/_duplicates/ + tqdm.write("Перемещение дубликатов...") + for photo in tqdm(duplicates, desc="Дубликаты", unit=" фото", leave=False): + if self._stop: + break + self._move_photo(photo, self.output_dir / DEDUP_DIR, "duplicate") + stats["duplicates"] += 1 + + # Оригиналы → по категориям + tqdm.write("Перемещение оригиналов по категориям...") + for photo in tqdm(originals, desc="Организация", unit=" фото", leave=False): + if self._stop: + break + + key = str(photo) + cat_info = classifications.get(key) + + if cat_info is None: + # Нет классификации — в review + self._move_photo( + photo, + self.output_dir / REVIEW_DIR / "unclassified", + "unclassified", + ) + stats["review_unclassified"] += 1 + continue + + category, score = cat_info + + if score < confidence_min: + dest = self.output_dir / REVIEW_DIR / "low_confidence" + stats["review_low_conf"] += 1 + reason = f"low_conf:{category}" + elif category in JUNK_CATEGORIES: + dest = self.output_dir / JUNK_DIR + stats[f"junk_{category}"] += 1 + reason = f"junk:{category}" + elif category in REVIEW_CATEGORIES: + dest = self.output_dir / REVIEW_DIR / category + stats[f"review_{category}"] += 1 + reason = f"review:{category}" + elif category in KEEP_CATEGORIES: + dest = self.output_dir / category + stats[f"keep_{category}"] += 1 + reason = f"keep:{category}" + else: + dest = self.output_dir / REVIEW_DIR / "other" + stats["review_other"] += 1 + reason = f"other:{category}" + + self._move_photo(photo, dest, reason) + + return dict(stats) + + # -- Главная команда: run -- + + def run(self) -> None: + """Полная обработка: сбор → деdup → classify → организация.""" + self.output_dir.mkdir(parents=True, exist_ok=True) + + tqdm.write("=" * 60) + tqdm.write(" Обработка фото: дедупликация + классификация") + tqdm.write(f" Источник: {self.source_dir}/") + tqdm.write(f" Выход: {self.output_dir}/") + tqdm.write("=" * 60) + + # 1. Сбор фото, отсортированных по дате (oldest first) + tqdm.write("\n[1/5] Сбор и сортировка фото по дате...") + photos = self._scan_photos() + if not photos: + tqdm.write("Фото не найдено.") + return + + oldest = datetime.fromtimestamp(photos[0].stat().st_mtime) + newest = datetime.fromtimestamp(photos[-1].stat().st_mtime) + tqdm.write( + f" Найдено: {len(photos)} фото " + f"({oldest.strftime('%Y-%m-%d')} → {newest.strftime('%Y-%m-%d')})" + ) + + if self._stop: + return + + # 2. Хеширование + tqdm.write("\n[2/5] Вычисление перцептивных хешей...") + hashes = self._compute_hashes(photos) + if self._stop: + return + + # 3. Хронологическая дедупликация (от старых к новым) + tqdm.write("\n[3/5] Сквозная дедупликация (oldest = оригинал)...") + originals, duplicates = self._dedup_chronological(photos, hashes) + if self._stop: + return + + # 4. Классификация оригиналов через CLIP + tqdm.write(f"\n[4/5] Классификация {len(originals)} оригиналов...") + classifications = self._classify_photos(originals) + if self._stop: + return + + # 5. Перемещение в output/ + tqdm.write(f"\n[5/5] Организация файлов в {self.output_dir}/...") + stats = self._organize_all(originals, duplicates, classifications) + self.progress.save() + + # Итого + tqdm.write("\n" + "=" * 60) + tqdm.write(" Итого:") + total_keep = sum(v for k, v in stats.items() if k.startswith("keep_")) + total_junk = sum(v for k, v in stats.items() if k.startswith("junk_")) + total_review = sum(v for k, v in stats.items() if k.startswith("review_")) + total_dup = stats.get("duplicates", 0) + + tqdm.write(f" Оригиналов (keep): {total_keep}") + for k, v in sorted(stats.items()): + if k.startswith("keep_"): + tqdm.write(f" {k.replace('keep_', '')}: {v}") + tqdm.write(f" Дубликатов: {total_dup}") + tqdm.write(f" Мусор (junk): {total_junk}") + tqdm.write(f" На проверку (review): {total_review}") + tqdm.write(f" Всего обработано: {sum(stats.values())}") + tqdm.write("=" * 60) + + # -- Откат -- + + def run_rollback(self) -> None: + """Откатывает все перемещения обратно в downloads/.""" + tqdm.write("=" * 60) + tqdm.write(" Откат всех перемещений") + tqdm.write("=" * 60) + + if self.rollback.count == 0: + tqdm.write("Нет перемещений для отката.") + return + + tqdm.write(f"Записей в журнале: {self.rollback.count}") + restored = self.rollback.rollback() + tqdm.write(f"Восстановлено файлов: {restored}") + + # Сбрасываем прогресс + self.progress.reset() + + # Удаляем пустые папки в output/ + if self.output_dir.exists(): + self._remove_empty_dirs(self.output_dir) + if self.output_dir.exists() and not any(self.output_dir.iterdir()): + self.output_dir.rmdir() + tqdm.write(f" Удалена пустая папка: {self.output_dir.name}/") + + # -- Статистика -- + + def run_stats(self) -> None: + """Показывает статистику из отчёта.""" + report_path = self.output_dir / "classification_report.json" + if not report_path.exists(): + tqdm.write("Отчёт не найден. Сначала запусти run.") + return + + with open(report_path, "r", encoding="utf-8") as f: + report = json.load(f) + + tqdm.write("=" * 60) + tqdm.write(" Статистика обработки") + tqdm.write("=" * 60) + tqdm.write(f"Всего фото: {report.get('total_photos', '?')}") + tqdm.write(f"Дубликатов: {report.get('duplicates', '?')}") + tqdm.write(f"\nКатегории оригиналов:") + for cat, count in report.get("categories", {}).items(): + marker = "" + if cat in KEEP_CATEGORIES: + marker = " [ОСТАВИТЬ]" + elif cat in JUNK_CATEGORIES: + marker = " [МУСОР]" + elif cat in REVIEW_CATEGORIES: + marker = " [ПРОВЕРИТЬ]" + total = report.get("originals", report.get("total_photos", 1)) + pct = count * 100 / total if total else 0 + tqdm.write(f" {cat}: {count} ({pct:.1f}%){marker}") + + # -- Утилиты -- + + @staticmethod + def _remove_empty_dirs(path: Path) -> None: + """Рекурсивно удаляет пустые подпапки.""" + for child in sorted(path.rglob("*"), reverse=True): + if child.is_dir() and not any(child.iterdir()): + child.rmdir() + + +# --------------------------------------------------------------------------- +# Точка входа +# --------------------------------------------------------------------------- + +def main() -> None: + parser = argparse.ArgumentParser( + description="Сквозная дедупликация и классификация фото из ВК", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Примеры: + python process_photos.py run # Полная обработка + python process_photos.py run --limit 200 # Тест на 200 фото + python process_photos.py run --dry-run # Без перемещения + python process_photos.py rollback # Откат + python process_photos.py stats # Статистика + """, + ) + parser.add_argument( + "command", + choices=["run", "rollback", "stats"], + help="Команда: run | rollback | stats", + ) + parser.add_argument( + "--source", + default=config.DOWNLOAD_DIR, + help=f"Папка с фото (по умолчанию: {config.DOWNLOAD_DIR})", + ) + parser.add_argument( + "--output", + default=config.OUTPUT_DIR, + help=f"Папка для результатов (по умолчанию: {config.OUTPUT_DIR})", + ) + parser.add_argument( + "--limit", + type=int, + default=None, + help="Обработать только первые N фото (для тестирования)", + ) + parser.add_argument( + "--threshold", + type=int, + default=config.DEDUP_THRESHOLD, + help=f"Порог Хэмминга (по умолчанию: {config.DEDUP_THRESHOLD})", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Показать без перемещения файлов", + ) + args = parser.parse_args() + + processor = PhotoProcessor(args) + + if args.command == "run": + processor.run() + elif args.command == "rollback": + processor.run_rollback() + elif args.command == "stats": + processor.run_stats() + + +if __name__ == "__main__": + main()