Files
vk-scripts/immich_upload.py
Andrey f760e94206 Initial commit: VK media tools
Скрипты для выгрузки фото и видео из диалогов ВКонтакте,
обработки (дедупликация + CLIP-классификация) и загрузки в Immich.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-16 21:14:50 +03:00

648 lines
27 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Массовая загрузка фото и видео в Immich с прогресс-баром и возобновлением.
Использование:
1. pip install requests tqdm
2. python immich_upload.py
3. Ctrl+C для остановки (прогресс сохраняется автоматически)
4. Повторный запуск продолжит с места остановки
"""
import hashlib
import json
import os
import re
import signal
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from threading import Lock
from typing import Optional
import requests
from tqdm import tqdm
# ---------------------------------------------------------------------------
# Конфигурация
# ---------------------------------------------------------------------------
# Immich сервер (через Upload Optimizer прокси)
IMMICH_URL: str = "http://YOUR_IMMICH_HOST:2283"
# Прямой доступ к Immich (минуя оптимизатор) — fallback при ошибках оптимизатора
IMMICH_DIRECT_URL: str = "http://YOUR_IMMICH_HOST:2284"
API_KEY: str = ""
# Папки для загрузки (рекурсивный поиск медиафайлов)
UPLOAD_FOLDERS: list[str] = [
"downloads_video",
]
# Файл прогресса (для resume)
PROGRESS_FILE: str = "immich_upload_progress.json"
# Лог файлов, которые оптимизатор не смог обработать (загружены напрямую)
OPTIMIZER_FAILED_LOG: str = "optimizer_failed.txt"
# Параллельные загрузки (8 потоков — оптимально для 1 Гбит LAN)
MAX_WORKERS: int = 8
# Повторные попытки при ошибке
MAX_RETRIES: int = 3
# Таймаут загрузки одного файла (секунды)
UPLOAD_TIMEOUT: int = 300
# Расширения изображений
IMAGE_EXTENSIONS: set[str] = {
".jpg", ".jpeg", ".png", ".heic", ".heif", ".webp", ".gif",
".avif", ".bmp", ".tiff", ".tif", ".jxl", ".raw", ".rw2",
".dng", ".nef", ".cr2", ".arw", ".orf", ".pef", ".raf",
}
# Расширения видео
VIDEO_EXTENSIONS: set[str] = {
".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v", ".3gp",
".flv", ".wmv", ".mts", ".m2ts", ".mpg", ".mpeg",
}
# Все допустимые расширения медиафайлов
MEDIA_EXTENSIONS: set[str] = IMAGE_EXTENSIONS | VIDEO_EXTENSIONS
# Режим загрузки: "photos" — только фото (видео пропускаются и логируются),
# "videos" — только видео, "all" — всё
UPLOAD_MODE: str = "videos"
# Лог пропущенных видео (при UPLOAD_MODE="photos")
SKIPPED_VIDEOS_LOG: str = "skipped_videos.txt"
# Идентификатор устройства (для Immich)
DEVICE_ID: str = "python-bulk-uploader"
# ---------------------------------------------------------------------------
# Модели данных
# ---------------------------------------------------------------------------
@dataclass
class UploadStats:
"""Статистика загрузки (потокобезопасная)."""
uploaded: int = 0
skipped: int = 0
duplicates: int = 0
errors: int = 0
bytes_sent: int = 0
_lock: Lock = field(default_factory=Lock, repr=False)
def inc_uploaded(self, size: int) -> None:
with self._lock:
self.uploaded += 1
self.bytes_sent += size
def inc_skipped(self) -> None:
with self._lock:
self.skipped += 1
def inc_duplicate(self) -> None:
with self._lock:
self.duplicates += 1
def inc_error(self) -> None:
with self._lock:
self.errors += 1
# ---------------------------------------------------------------------------
# Менеджер прогресса (resume)
# ---------------------------------------------------------------------------
class ProgressManager:
"""Управление файлом прогресса для механизма resume."""
def __init__(self, progress_file: str) -> None:
self.progress_file: Path = Path(progress_file)
self._lock: Lock = Lock()
self.data: dict = self._load()
# Множество для быстрого поиска уже загруженных файлов (по хешу пути)
self._uploaded_hashes: set[str] = set(self.data.get("uploaded_hashes", []))
def _load(self) -> dict:
"""Загружает прогресс из файла или создаёт пустой."""
if self.progress_file.exists():
try:
with open(self.progress_file, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {
"version": 1,
"last_updated": "",
"uploaded_hashes": [],
"stats": {
"uploaded": 0,
"skipped": 0,
"duplicates": 0,
"errors": 0,
"bytes_sent": 0,
},
}
def save(self) -> None:
"""Сохраняет текущее состояние прогресса в файл."""
with self._lock:
self.data["last_updated"] = datetime.now().isoformat()
self.data["uploaded_hashes"] = list(self._uploaded_hashes)
tmp_path = self.progress_file.with_suffix(".tmp")
with open(tmp_path, "w", encoding="utf-8") as f:
json.dump(self.data, f, ensure_ascii=False)
tmp_path.replace(self.progress_file)
@staticmethod
def _file_hash(filepath: Path) -> str:
"""Генерирует хеш на основе пути и размера файла (быстро, без чтения)."""
stat = filepath.stat()
key = f"{filepath.resolve()}|{stat.st_size}|{stat.st_mtime}"
return hashlib.md5(key.encode()).hexdigest()
def is_uploaded(self, filepath: Path) -> bool:
"""Проверяет, был ли файл уже загружен."""
return self._file_hash(filepath) in self._uploaded_hashes
def mark_uploaded(self, filepath: Path) -> None:
"""Помечает файл как загруженный."""
with self._lock:
self._uploaded_hashes.add(self._file_hash(filepath))
@property
def uploaded_count(self) -> int:
return len(self._uploaded_hashes)
# ---------------------------------------------------------------------------
# Сканер файлов
# ---------------------------------------------------------------------------
def scan_media_files(folders: list[str]) -> list[Path]:
"""Рекурсивно сканирует папки, возвращает отсортированный список медиафайлов."""
files: list[Path] = []
for folder in folders:
root = Path(folder)
if not root.exists():
tqdm.write(f" Папка не найдена: {folder}")
continue
for filepath in root.rglob("*"):
if not filepath.is_file():
continue
if filepath.suffix.lower() in MEDIA_EXTENSIONS:
files.append(filepath)
# Сортировка по имени для предсказуемого порядка
files.sort(key=lambda p: p.name)
return files
# ---------------------------------------------------------------------------
# Загрузчик
# ---------------------------------------------------------------------------
class ImmichUploader:
"""Загрузка файлов в Immich через API с многопоточностью."""
def __init__(self) -> None:
self._stop_requested: bool = False
self.progress: ProgressManager = ProgressManager(PROGRESS_FILE)
self.stats: UploadStats = UploadStats()
self._optimizer_fallbacks: int = 0 # Счётчик fallback-загрузок
self._optimizer_log_lock: Lock = Lock()
# HTTP-сессия с пулом соединений (keep-alive)
self._session: requests.Session = requests.Session()
self._session.headers.update({
"Accept": "application/json",
"x-api-key": API_KEY,
})
# Увеличиваем пул соединений для параллельных запросов
adapter = requests.adapters.HTTPAdapter(
pool_connections=MAX_WORKERS,
pool_maxsize=MAX_WORKERS * 2,
)
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)
# Обработчики сигналов для graceful shutdown
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def _signal_handler(self, _signum: int, _frame: object) -> None:
"""Graceful shutdown по Ctrl+C."""
if self._stop_requested:
# Повторный Ctrl+C — тихо выходим без traceback
os._exit(1)
self._stop_requested = True
tqdm.write(
"\nПолучен сигнал остановки. "
"Завершаю текущие загрузки и сохраняю прогресс..."
)
def _check_connection(self) -> bool:
"""Проверяет подключение к Immich."""
try:
resp = self._session.get(
f"{IMMICH_URL}/api/server/version", timeout=10
)
data = resp.json()
version = f"v{data['major']}.{data['minor']}.{data['patch']}"
tqdm.write(f"Immich сервер: {version} ({IMMICH_URL})")
return True
except Exception as exc:
tqdm.write(f"ОШИБКА: Не удалось подключиться к Immich: {exc}")
return False
def _check_auth(self) -> bool:
"""Проверяет API-ключ."""
try:
resp = self._session.get(
f"{IMMICH_URL}/api/users/me", timeout=10
)
if resp.status_code == 200:
user = resp.json()
tqdm.write(f"Авторизован как: {user.get('name', user.get('email', '?'))}")
return True
tqdm.write(f"ОШИБКА авторизации: HTTP {resp.status_code}")
return False
except Exception as exc:
tqdm.write(f"ОШИБКА авторизации: {exc}")
return False
# Паттерны даты в именах файлов
_FILENAME_DATE_PATTERNS: list[tuple[str, str]] = [
# "2019-12-01 15-30-00" (Яндекс Диск)
(r"(\d{4}-\d{2}-\d{2})\s+(\d{2}-\d{2}-\d{2})", "%Y-%m-%d %H-%M-%S"),
# "20191201_153000" (стандарт камер)
(r"(\d{8}_\d{6})", "%Y%m%d_%H%M%S"),
# "2019-12-01_15-30-00"
(r"(\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2})", "%Y-%m-%d_%H-%M-%S"),
# "IMG_20191201_153000"
(r"IMG_(\d{8}_\d{6})", "%Y%m%d_%H%M%S"),
# Только дата "2019-12-01"
(r"(\d{4}-\d{2}-\d{2})", "%Y-%m-%d"),
]
@staticmethod
def _parse_date_from_filename(stem: str) -> Optional[datetime]:
"""Извлекает дату из имени файла (без расширения).
Поддерживает форматы Яндекс Диска, камер и прочие.
"""
for pattern, fmt in ImmichUploader._FILENAME_DATE_PATTERNS:
match = re.search(pattern, stem)
if match:
date_str = " ".join(match.groups()) if match.lastindex and match.lastindex > 1 else match.group(1)
try:
return datetime.strptime(date_str, fmt)
except ValueError:
continue
return None
@staticmethod
def _read_google_sidecar(filepath: Path) -> Optional[dict]:
"""Читает JSON-сайдкар Google Фото (supplemental-metadata.json).
Возвращает dict с метаданными или None, если сайдкар не найден.
"""
# Google Фото кладёт метаданные в файл вида: photo.jpg.supplemental-metadata.json
sidecar = filepath.with_name(filepath.name + ".supplemental-metadata.json")
if not sidecar.exists():
return None
try:
with open(sidecar, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return None
def _log_optimizer_failed(self, filepath: Path) -> None:
"""Записывает файл в лог неудачных оптимизаций."""
with self._optimizer_log_lock:
self._optimizer_fallbacks += 1
with open(OPTIMIZER_FAILED_LOG, "a", encoding="utf-8") as f:
f.write(f"{filepath}\n")
def _do_upload(self, filepath: Path, data: dict, upload_url: str) -> requests.Response:
"""Выполняет HTTP-загрузку файла на указанный URL."""
with open(filepath, "rb") as f:
files = {"assetData": (filepath.name, f, "application/octet-stream")}
return self._session.post(
f"{upload_url}/api/assets",
data=data,
files=files,
timeout=UPLOAD_TIMEOUT,
)
def _handle_response(self, resp: requests.Response, file_size: int) -> Optional[str]:
"""Обрабатывает ответ Immich. Возвращает статус или None если нужен retry."""
if resp.status_code == 201:
result = resp.json()
status = result.get("status", "created")
if status == "duplicate" or result.get("duplicate"):
self.stats.inc_duplicate()
return "duplicate"
self.stats.inc_uploaded(file_size)
return "created"
elif resp.status_code == 200:
# Дубликат (некоторые версии Immich)
self.stats.inc_duplicate()
return "duplicate"
return None # Нужен retry или fallback
def _upload_one(self, filepath: Path) -> Optional[str]:
"""Загружает один файл в Immich. Возвращает статус или None при ошибке.
Логика: сначала через Upload Optimizer (порт 2283).
Если оптимизатор вернул 400 — fallback на прямую загрузку (порт 2284).
Статусы: 'created', 'duplicate', 'error'
"""
stat = filepath.stat()
file_size = stat.st_size
# Базовые метаданные из файловой системы
created_at = datetime.fromtimestamp(
stat.st_birthtime if hasattr(stat, "st_birthtime") else stat.st_mtime
)
modified_at = datetime.fromtimestamp(stat.st_mtime)
# 1) Пробуем парсить дату из имени файла (Яндекс Диск: "2019-12-01 15-30-00.JPG")
parsed_date = self._parse_date_from_filename(filepath.stem)
if parsed_date:
created_at = parsed_date
# 2) Google JSON-сайдкар перезаписывает — он точнее всего
sidecar = self._read_google_sidecar(filepath)
if sidecar:
photo_taken = sidecar.get("photoTakenTime", {}).get("timestamp")
if photo_taken:
try:
created_at = datetime.fromtimestamp(int(photo_taken))
except (ValueError, OSError):
pass
data = {
"deviceAssetId": f"{filepath.name}-{stat.st_size}-{stat.st_mtime}",
"deviceId": DEVICE_ID,
"fileCreatedAt": created_at.isoformat(),
"fileModifiedAt": modified_at.isoformat(),
"isFavorite": "false",
}
# GPS из Google-сайдкара (если есть и не нулевые)
if sidecar:
geo = sidecar.get("geoData", {})
lat = geo.get("latitude", 0.0)
lon = geo.get("longitude", 0.0)
if lat != 0.0 or lon != 0.0:
data["latitude"] = str(lat)
data["longitude"] = str(lon)
# Видео грузим напрямую — оптимизатор не умеет их обрабатывать
is_video = filepath.suffix.lower() in VIDEO_EXTENSIONS
if is_video:
for attempt in range(MAX_RETRIES):
try:
resp = self._do_upload(filepath, data, IMMICH_DIRECT_URL)
result = self._handle_response(resp, file_size)
if result is not None:
return result
if attempt < MAX_RETRIES - 1:
time.sleep(2 ** attempt)
else:
tqdm.write(
f" Ошибка загрузки {filepath.name}: "
f"HTTP {resp.status_code}{resp.text[:200]}"
)
self.stats.inc_error()
return "error"
except (requests.RequestException, OSError) as exc:
if attempt < MAX_RETRIES - 1:
time.sleep(2 ** attempt)
else:
tqdm.write(f" Ошибка загрузки {filepath.name}: {exc}")
self.stats.inc_error()
return "error"
return "error"
# --- Прямая загрузка в Immich (порт 2284), минуя оптимизатор ---
for attempt in range(MAX_RETRIES):
try:
resp = self._do_upload(filepath, data, IMMICH_DIRECT_URL)
result = self._handle_response(resp, file_size)
if result is not None:
return result
if attempt < MAX_RETRIES - 1:
time.sleep(2 ** attempt)
else:
tqdm.write(
f" Ошибка загрузки {filepath.name}: "
f"HTTP {resp.status_code}{resp.text[:200]}"
)
self.stats.inc_error()
return "error"
except (requests.RequestException, OSError) as exc:
if attempt < MAX_RETRIES - 1:
time.sleep(2 ** attempt)
else:
tqdm.write(f" Ошибка загрузки {filepath.name}: {exc}")
self.stats.inc_error()
return "error"
return "error"
@staticmethod
def _format_size(size_bytes: int) -> str:
"""Форматирует размер в человекочитаемый вид."""
for unit in ("Б", "КБ", "МБ", "ГБ"):
if size_bytes < 1024:
return f"{size_bytes:.1f} {unit}"
size_bytes /= 1024
return f"{size_bytes:.1f} ТБ"
def run(self) -> None:
"""Запускает процесс загрузки."""
tqdm.write("=" * 60)
tqdm.write(" Массовая загрузка в Immich")
tqdm.write("=" * 60)
# Проверяем подключение и авторизацию
if not self._check_connection() or not self._check_auth():
sys.exit(1)
# Сканируем файлы
tqdm.write("\nСканирование папок...")
all_files = scan_media_files(UPLOAD_FOLDERS)
total_size = sum(f.stat().st_size for f in all_files)
tqdm.write(
f"Найдено медиафайлов: {len(all_files)} "
f"({self._format_size(total_size)})"
)
# Фильтрация по режиму (photos / videos / all)
if UPLOAD_MODE == "photos":
skipped_videos = [f for f in all_files if f.suffix.lower() in VIDEO_EXTENSIONS]
if skipped_videos:
with open(SKIPPED_VIDEOS_LOG, "w", encoding="utf-8") as vlog:
for v in skipped_videos:
vlog.write(f"{v}\n")
tqdm.write(
f"Режим: только фото. Пропущено видео: {len(skipped_videos)} "
f"(см. {SKIPPED_VIDEOS_LOG})"
)
all_files = [f for f in all_files if f.suffix.lower() in IMAGE_EXTENSIONS]
elif UPLOAD_MODE == "videos":
all_files = [f for f in all_files if f.suffix.lower() in VIDEO_EXTENSIONS]
tqdm.write(f"Режим: только видео ({len(all_files)} файлов)")
# Фильтруем уже загруженные
pending_files = [f for f in all_files if not self.progress.is_uploaded(f)]
pending_size = sum(f.stat().st_size for f in pending_files)
already_done = len(all_files) - len(pending_files)
tqdm.write(
f"Уже загружено: {already_done} файлов\n"
f"Осталось загрузить: {len(pending_files)} файлов "
f"({self._format_size(pending_size)})"
)
tqdm.write(f"Потоков: {MAX_WORKERS}")
tqdm.write("-" * 60)
if not pending_files:
tqdm.write("\nВсе файлы уже загружены!")
return
# Прогресс-бар с размером и скоростью МБ/с
bar = tqdm(
total=len(all_files),
initial=already_done,
desc="Загрузка",
unit=" файл",
dynamic_ncols=True,
bar_format=(
"{l_bar}{bar}| {n_fmt}/{total_fmt} "
"[{elapsed}<{remaining}, {rate_fmt}] {postfix}"
),
)
# Счётчик для периодического сохранения прогресса
save_counter = 0
save_lock = Lock()
start_time = time.time()
def _process_file(filepath: Path) -> Optional[str]:
"""Обёртка загрузки одного файла для ThreadPoolExecutor."""
if self._stop_requested:
return None
return self._upload_one(filepath)
try:
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# Отправляем все файлы в пул
future_to_file = {
executor.submit(_process_file, f): f
for f in pending_files
}
for future in as_completed(future_to_file):
if self._stop_requested:
# Отменяем оставшиеся задачи
for f in future_to_file:
f.cancel()
break
filepath = future_to_file[future]
try:
status = future.result()
except Exception as exc:
tqdm.write(f" Неожиданная ошибка ({filepath.name}): {exc}")
self.stats.inc_error()
status = "error"
if status and status != "error":
self.progress.mark_uploaded(filepath)
# Fallback-загрузки сохраняем в прогресс сразу,
# чтобы не потерять при остановке скрипта
if status.startswith("fallback_"):
self.progress.save()
bar.update(1)
# Обновляем описание с текущей скоростью
elapsed = time.time() - start_time
if elapsed > 0:
speed = self.stats.bytes_sent / elapsed if self.stats.bytes_sent > 0 else 0
speed_str = f"{self._format_size(speed)}/с" if speed > 0 else "..."
fallback_str = (
f" FB:{self._optimizer_fallbacks}"
if self._optimizer_fallbacks > 0
else ""
)
bar.set_postfix_str(
f"{speed_str} | "
f"OK:{self.stats.uploaded} "
f"DUP:{self.stats.duplicates} "
f"ERR:{self.stats.errors}"
f"{fallback_str}",
refresh=False,
)
# Сохраняем прогресс каждые 20 файлов
with save_lock:
save_counter += 1
if save_counter % 20 == 0:
self.progress.save()
finally:
bar.close()
self.progress.save()
elapsed = time.time() - start_time
avg_speed = self.stats.bytes_sent / elapsed if elapsed > 0 else 0
print("\n" + "=" * 60)
print(" Итого:")
print(f" Загружено: {self.stats.uploaded}")
print(f" Дубликатов: {self.stats.duplicates}")
print(f" Пропущено: {self.stats.skipped}")
print(f" Ошибок: {self.stats.errors}")
if self._optimizer_fallbacks > 0:
print(f" Fallback (без оптимизации): {self._optimizer_fallbacks}")
print(f" (см. {OPTIMIZER_FAILED_LOG})")
print(f" Передано: {self._format_size(self.stats.bytes_sent)}")
print(f" Средняя скорость: {self._format_size(avg_speed)}/с")
print(f" Время: {elapsed:.0f} сек")
if self._stop_requested:
print(
"\n Работа остановлена. "
"Запусти скрипт снова для продолжения."
)
else:
print("\n Все файлы загружены!")
print("=" * 60)
# ---------------------------------------------------------------------------
# Точка входа
# ---------------------------------------------------------------------------
def main() -> None:
"""Точка входа скрипта."""
uploader = ImmichUploader()
uploader.run()
if __name__ == "__main__":
main()