#!/usr/bin/env python3 """ Массовая загрузка фото и видео в Immich с прогресс-баром и возобновлением. Использование: 1. pip install requests tqdm 2. python immich_upload.py 3. Ctrl+C для остановки (прогресс сохраняется автоматически) 4. Повторный запуск продолжит с места остановки """ import hashlib import json import os import re import signal import sys import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass, field from datetime import datetime from pathlib import Path from threading import Lock from typing import Optional import requests from tqdm import tqdm # --------------------------------------------------------------------------- # Конфигурация # --------------------------------------------------------------------------- # Immich сервер (через Upload Optimizer прокси) IMMICH_URL: str = "http://YOUR_IMMICH_HOST:2283" # Прямой доступ к Immich (минуя оптимизатор) — fallback при ошибках оптимизатора IMMICH_DIRECT_URL: str = "http://YOUR_IMMICH_HOST:2284" API_KEY: str = "" # Папки для загрузки (рекурсивный поиск медиафайлов) UPLOAD_FOLDERS: list[str] = [ "downloads_video", ] # Файл прогресса (для resume) PROGRESS_FILE: str = "immich_upload_progress.json" # Лог файлов, которые оптимизатор не смог обработать (загружены напрямую) OPTIMIZER_FAILED_LOG: str = "optimizer_failed.txt" # Параллельные загрузки (8 потоков — оптимально для 1 Гбит LAN) MAX_WORKERS: int = 8 # Повторные попытки при ошибке MAX_RETRIES: int = 3 # Таймаут загрузки одного файла (секунды) UPLOAD_TIMEOUT: int = 300 # Расширения изображений IMAGE_EXTENSIONS: set[str] = { ".jpg", ".jpeg", ".png", ".heic", ".heif", ".webp", ".gif", ".avif", ".bmp", ".tiff", ".tif", ".jxl", ".raw", ".rw2", ".dng", ".nef", ".cr2", ".arw", ".orf", ".pef", ".raf", } # Расширения видео VIDEO_EXTENSIONS: set[str] = { ".mp4", ".mov", ".avi", ".mkv", ".webm", ".m4v", ".3gp", ".flv", ".wmv", ".mts", ".m2ts", ".mpg", ".mpeg", } # Все допустимые расширения медиафайлов MEDIA_EXTENSIONS: set[str] = IMAGE_EXTENSIONS | VIDEO_EXTENSIONS # Режим загрузки: "photos" — только фото (видео пропускаются и логируются), # "videos" — только видео, "all" — всё UPLOAD_MODE: str = "videos" # Лог пропущенных видео (при UPLOAD_MODE="photos") SKIPPED_VIDEOS_LOG: str = "skipped_videos.txt" # Идентификатор устройства (для Immich) DEVICE_ID: str = "python-bulk-uploader" # --------------------------------------------------------------------------- # Модели данных # --------------------------------------------------------------------------- @dataclass class UploadStats: """Статистика загрузки (потокобезопасная).""" uploaded: int = 0 skipped: int = 0 duplicates: int = 0 errors: int = 0 bytes_sent: int = 0 _lock: Lock = field(default_factory=Lock, repr=False) def inc_uploaded(self, size: int) -> None: with self._lock: self.uploaded += 1 self.bytes_sent += size def inc_skipped(self) -> None: with self._lock: self.skipped += 1 def inc_duplicate(self) -> None: with self._lock: self.duplicates += 1 def inc_error(self) -> None: with self._lock: self.errors += 1 # --------------------------------------------------------------------------- # Менеджер прогресса (resume) # --------------------------------------------------------------------------- class ProgressManager: """Управление файлом прогресса для механизма resume.""" def __init__(self, progress_file: str) -> None: self.progress_file: Path = Path(progress_file) self._lock: Lock = Lock() self.data: dict = self._load() # Множество для быстрого поиска уже загруженных файлов (по хешу пути) self._uploaded_hashes: set[str] = set(self.data.get("uploaded_hashes", [])) def _load(self) -> dict: """Загружает прогресс из файла или создаёт пустой.""" if self.progress_file.exists(): try: with open(self.progress_file, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError): pass return { "version": 1, "last_updated": "", "uploaded_hashes": [], "stats": { "uploaded": 0, "skipped": 0, "duplicates": 0, "errors": 0, "bytes_sent": 0, }, } def save(self) -> None: """Сохраняет текущее состояние прогресса в файл.""" with self._lock: self.data["last_updated"] = datetime.now().isoformat() self.data["uploaded_hashes"] = list(self._uploaded_hashes) tmp_path = self.progress_file.with_suffix(".tmp") with open(tmp_path, "w", encoding="utf-8") as f: json.dump(self.data, f, ensure_ascii=False) tmp_path.replace(self.progress_file) @staticmethod def _file_hash(filepath: Path) -> str: """Генерирует хеш на основе пути и размера файла (быстро, без чтения).""" stat = filepath.stat() key = f"{filepath.resolve()}|{stat.st_size}|{stat.st_mtime}" return hashlib.md5(key.encode()).hexdigest() def is_uploaded(self, filepath: Path) -> bool: """Проверяет, был ли файл уже загружен.""" return self._file_hash(filepath) in self._uploaded_hashes def mark_uploaded(self, filepath: Path) -> None: """Помечает файл как загруженный.""" with self._lock: self._uploaded_hashes.add(self._file_hash(filepath)) @property def uploaded_count(self) -> int: return len(self._uploaded_hashes) # --------------------------------------------------------------------------- # Сканер файлов # --------------------------------------------------------------------------- def scan_media_files(folders: list[str]) -> list[Path]: """Рекурсивно сканирует папки, возвращает отсортированный список медиафайлов.""" files: list[Path] = [] for folder in folders: root = Path(folder) if not root.exists(): tqdm.write(f" Папка не найдена: {folder}") continue for filepath in root.rglob("*"): if not filepath.is_file(): continue if filepath.suffix.lower() in MEDIA_EXTENSIONS: files.append(filepath) # Сортировка по имени для предсказуемого порядка files.sort(key=lambda p: p.name) return files # --------------------------------------------------------------------------- # Загрузчик # --------------------------------------------------------------------------- class ImmichUploader: """Загрузка файлов в Immich через API с многопоточностью.""" def __init__(self) -> None: self._stop_requested: bool = False self.progress: ProgressManager = ProgressManager(PROGRESS_FILE) self.stats: UploadStats = UploadStats() self._optimizer_fallbacks: int = 0 # Счётчик fallback-загрузок self._optimizer_log_lock: Lock = Lock() # HTTP-сессия с пулом соединений (keep-alive) self._session: requests.Session = requests.Session() self._session.headers.update({ "Accept": "application/json", "x-api-key": API_KEY, }) # Увеличиваем пул соединений для параллельных запросов adapter = requests.adapters.HTTPAdapter( pool_connections=MAX_WORKERS, pool_maxsize=MAX_WORKERS * 2, ) self._session.mount("http://", adapter) self._session.mount("https://", adapter) # Обработчики сигналов для graceful shutdown signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) def _signal_handler(self, _signum: int, _frame: object) -> None: """Graceful shutdown по Ctrl+C.""" if self._stop_requested: # Повторный Ctrl+C — тихо выходим без traceback os._exit(1) self._stop_requested = True tqdm.write( "\nПолучен сигнал остановки. " "Завершаю текущие загрузки и сохраняю прогресс..." ) def _check_connection(self) -> bool: """Проверяет подключение к Immich.""" try: resp = self._session.get( f"{IMMICH_URL}/api/server/version", timeout=10 ) data = resp.json() version = f"v{data['major']}.{data['minor']}.{data['patch']}" tqdm.write(f"Immich сервер: {version} ({IMMICH_URL})") return True except Exception as exc: tqdm.write(f"ОШИБКА: Не удалось подключиться к Immich: {exc}") return False def _check_auth(self) -> bool: """Проверяет API-ключ.""" try: resp = self._session.get( f"{IMMICH_URL}/api/users/me", timeout=10 ) if resp.status_code == 200: user = resp.json() tqdm.write(f"Авторизован как: {user.get('name', user.get('email', '?'))}") return True tqdm.write(f"ОШИБКА авторизации: HTTP {resp.status_code}") return False except Exception as exc: tqdm.write(f"ОШИБКА авторизации: {exc}") return False # Паттерны даты в именах файлов _FILENAME_DATE_PATTERNS: list[tuple[str, str]] = [ # "2019-12-01 15-30-00" (Яндекс Диск) (r"(\d{4}-\d{2}-\d{2})\s+(\d{2}-\d{2}-\d{2})", "%Y-%m-%d %H-%M-%S"), # "20191201_153000" (стандарт камер) (r"(\d{8}_\d{6})", "%Y%m%d_%H%M%S"), # "2019-12-01_15-30-00" (r"(\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2})", "%Y-%m-%d_%H-%M-%S"), # "IMG_20191201_153000" (r"IMG_(\d{8}_\d{6})", "%Y%m%d_%H%M%S"), # Только дата "2019-12-01" (r"(\d{4}-\d{2}-\d{2})", "%Y-%m-%d"), ] @staticmethod def _parse_date_from_filename(stem: str) -> Optional[datetime]: """Извлекает дату из имени файла (без расширения). Поддерживает форматы Яндекс Диска, камер и прочие. """ for pattern, fmt in ImmichUploader._FILENAME_DATE_PATTERNS: match = re.search(pattern, stem) if match: date_str = " ".join(match.groups()) if match.lastindex and match.lastindex > 1 else match.group(1) try: return datetime.strptime(date_str, fmt) except ValueError: continue return None @staticmethod def _read_google_sidecar(filepath: Path) -> Optional[dict]: """Читает JSON-сайдкар Google Фото (supplemental-metadata.json). Возвращает dict с метаданными или None, если сайдкар не найден. """ # Google Фото кладёт метаданные в файл вида: photo.jpg.supplemental-metadata.json sidecar = filepath.with_name(filepath.name + ".supplemental-metadata.json") if not sidecar.exists(): return None try: with open(sidecar, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError): return None def _log_optimizer_failed(self, filepath: Path) -> None: """Записывает файл в лог неудачных оптимизаций.""" with self._optimizer_log_lock: self._optimizer_fallbacks += 1 with open(OPTIMIZER_FAILED_LOG, "a", encoding="utf-8") as f: f.write(f"{filepath}\n") def _do_upload(self, filepath: Path, data: dict, upload_url: str) -> requests.Response: """Выполняет HTTP-загрузку файла на указанный URL.""" with open(filepath, "rb") as f: files = {"assetData": (filepath.name, f, "application/octet-stream")} return self._session.post( f"{upload_url}/api/assets", data=data, files=files, timeout=UPLOAD_TIMEOUT, ) def _handle_response(self, resp: requests.Response, file_size: int) -> Optional[str]: """Обрабатывает ответ Immich. Возвращает статус или None если нужен retry.""" if resp.status_code == 201: result = resp.json() status = result.get("status", "created") if status == "duplicate" or result.get("duplicate"): self.stats.inc_duplicate() return "duplicate" self.stats.inc_uploaded(file_size) return "created" elif resp.status_code == 200: # Дубликат (некоторые версии Immich) self.stats.inc_duplicate() return "duplicate" return None # Нужен retry или fallback def _upload_one(self, filepath: Path) -> Optional[str]: """Загружает один файл в Immich. Возвращает статус или None при ошибке. Логика: сначала через Upload Optimizer (порт 2283). Если оптимизатор вернул 400 — fallback на прямую загрузку (порт 2284). Статусы: 'created', 'duplicate', 'error' """ stat = filepath.stat() file_size = stat.st_size # Базовые метаданные из файловой системы created_at = datetime.fromtimestamp( stat.st_birthtime if hasattr(stat, "st_birthtime") else stat.st_mtime ) modified_at = datetime.fromtimestamp(stat.st_mtime) # 1) Пробуем парсить дату из имени файла (Яндекс Диск: "2019-12-01 15-30-00.JPG") parsed_date = self._parse_date_from_filename(filepath.stem) if parsed_date: created_at = parsed_date # 2) Google JSON-сайдкар перезаписывает — он точнее всего sidecar = self._read_google_sidecar(filepath) if sidecar: photo_taken = sidecar.get("photoTakenTime", {}).get("timestamp") if photo_taken: try: created_at = datetime.fromtimestamp(int(photo_taken)) except (ValueError, OSError): pass data = { "deviceAssetId": f"{filepath.name}-{stat.st_size}-{stat.st_mtime}", "deviceId": DEVICE_ID, "fileCreatedAt": created_at.isoformat(), "fileModifiedAt": modified_at.isoformat(), "isFavorite": "false", } # GPS из Google-сайдкара (если есть и не нулевые) if sidecar: geo = sidecar.get("geoData", {}) lat = geo.get("latitude", 0.0) lon = geo.get("longitude", 0.0) if lat != 0.0 or lon != 0.0: data["latitude"] = str(lat) data["longitude"] = str(lon) # Видео грузим напрямую — оптимизатор не умеет их обрабатывать is_video = filepath.suffix.lower() in VIDEO_EXTENSIONS if is_video: for attempt in range(MAX_RETRIES): try: resp = self._do_upload(filepath, data, IMMICH_DIRECT_URL) result = self._handle_response(resp, file_size) if result is not None: return result if attempt < MAX_RETRIES - 1: time.sleep(2 ** attempt) else: tqdm.write( f" Ошибка загрузки {filepath.name}: " f"HTTP {resp.status_code} — {resp.text[:200]}" ) self.stats.inc_error() return "error" except (requests.RequestException, OSError) as exc: if attempt < MAX_RETRIES - 1: time.sleep(2 ** attempt) else: tqdm.write(f" Ошибка загрузки {filepath.name}: {exc}") self.stats.inc_error() return "error" return "error" # --- Прямая загрузка в Immich (порт 2284), минуя оптимизатор --- for attempt in range(MAX_RETRIES): try: resp = self._do_upload(filepath, data, IMMICH_DIRECT_URL) result = self._handle_response(resp, file_size) if result is not None: return result if attempt < MAX_RETRIES - 1: time.sleep(2 ** attempt) else: tqdm.write( f" Ошибка загрузки {filepath.name}: " f"HTTP {resp.status_code} — {resp.text[:200]}" ) self.stats.inc_error() return "error" except (requests.RequestException, OSError) as exc: if attempt < MAX_RETRIES - 1: time.sleep(2 ** attempt) else: tqdm.write(f" Ошибка загрузки {filepath.name}: {exc}") self.stats.inc_error() return "error" return "error" @staticmethod def _format_size(size_bytes: int) -> str: """Форматирует размер в человекочитаемый вид.""" for unit in ("Б", "КБ", "МБ", "ГБ"): if size_bytes < 1024: return f"{size_bytes:.1f} {unit}" size_bytes /= 1024 return f"{size_bytes:.1f} ТБ" def run(self) -> None: """Запускает процесс загрузки.""" tqdm.write("=" * 60) tqdm.write(" Массовая загрузка в Immich") tqdm.write("=" * 60) # Проверяем подключение и авторизацию if not self._check_connection() or not self._check_auth(): sys.exit(1) # Сканируем файлы tqdm.write("\nСканирование папок...") all_files = scan_media_files(UPLOAD_FOLDERS) total_size = sum(f.stat().st_size for f in all_files) tqdm.write( f"Найдено медиафайлов: {len(all_files)} " f"({self._format_size(total_size)})" ) # Фильтрация по режиму (photos / videos / all) if UPLOAD_MODE == "photos": skipped_videos = [f for f in all_files if f.suffix.lower() in VIDEO_EXTENSIONS] if skipped_videos: with open(SKIPPED_VIDEOS_LOG, "w", encoding="utf-8") as vlog: for v in skipped_videos: vlog.write(f"{v}\n") tqdm.write( f"Режим: только фото. Пропущено видео: {len(skipped_videos)} " f"(см. {SKIPPED_VIDEOS_LOG})" ) all_files = [f for f in all_files if f.suffix.lower() in IMAGE_EXTENSIONS] elif UPLOAD_MODE == "videos": all_files = [f for f in all_files if f.suffix.lower() in VIDEO_EXTENSIONS] tqdm.write(f"Режим: только видео ({len(all_files)} файлов)") # Фильтруем уже загруженные pending_files = [f for f in all_files if not self.progress.is_uploaded(f)] pending_size = sum(f.stat().st_size for f in pending_files) already_done = len(all_files) - len(pending_files) tqdm.write( f"Уже загружено: {already_done} файлов\n" f"Осталось загрузить: {len(pending_files)} файлов " f"({self._format_size(pending_size)})" ) tqdm.write(f"Потоков: {MAX_WORKERS}") tqdm.write("-" * 60) if not pending_files: tqdm.write("\nВсе файлы уже загружены!") return # Прогресс-бар с размером и скоростью МБ/с bar = tqdm( total=len(all_files), initial=already_done, desc="Загрузка", unit=" файл", dynamic_ncols=True, bar_format=( "{l_bar}{bar}| {n_fmt}/{total_fmt} " "[{elapsed}<{remaining}, {rate_fmt}] {postfix}" ), ) # Счётчик для периодического сохранения прогресса save_counter = 0 save_lock = Lock() start_time = time.time() def _process_file(filepath: Path) -> Optional[str]: """Обёртка загрузки одного файла для ThreadPoolExecutor.""" if self._stop_requested: return None return self._upload_one(filepath) try: with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: # Отправляем все файлы в пул future_to_file = { executor.submit(_process_file, f): f for f in pending_files } for future in as_completed(future_to_file): if self._stop_requested: # Отменяем оставшиеся задачи for f in future_to_file: f.cancel() break filepath = future_to_file[future] try: status = future.result() except Exception as exc: tqdm.write(f" Неожиданная ошибка ({filepath.name}): {exc}") self.stats.inc_error() status = "error" if status and status != "error": self.progress.mark_uploaded(filepath) # Fallback-загрузки сохраняем в прогресс сразу, # чтобы не потерять при остановке скрипта if status.startswith("fallback_"): self.progress.save() bar.update(1) # Обновляем описание с текущей скоростью elapsed = time.time() - start_time if elapsed > 0: speed = self.stats.bytes_sent / elapsed if self.stats.bytes_sent > 0 else 0 speed_str = f"{self._format_size(speed)}/с" if speed > 0 else "..." fallback_str = ( f" FB:{self._optimizer_fallbacks}" if self._optimizer_fallbacks > 0 else "" ) bar.set_postfix_str( f"{speed_str} | " f"OK:{self.stats.uploaded} " f"DUP:{self.stats.duplicates} " f"ERR:{self.stats.errors}" f"{fallback_str}", refresh=False, ) # Сохраняем прогресс каждые 20 файлов with save_lock: save_counter += 1 if save_counter % 20 == 0: self.progress.save() finally: bar.close() self.progress.save() elapsed = time.time() - start_time avg_speed = self.stats.bytes_sent / elapsed if elapsed > 0 else 0 print("\n" + "=" * 60) print(" Итого:") print(f" Загружено: {self.stats.uploaded}") print(f" Дубликатов: {self.stats.duplicates}") print(f" Пропущено: {self.stats.skipped}") print(f" Ошибок: {self.stats.errors}") if self._optimizer_fallbacks > 0: print(f" Fallback (без оптимизации): {self._optimizer_fallbacks}") print(f" (см. {OPTIMIZER_FAILED_LOG})") print(f" Передано: {self._format_size(self.stats.bytes_sent)}") print(f" Средняя скорость: {self._format_size(avg_speed)}/с") print(f" Время: {elapsed:.0f} сек") if self._stop_requested: print( "\n Работа остановлена. " "Запусти скрипт снова для продолжения." ) else: print("\n Все файлы загружены!") print("=" * 60) # --------------------------------------------------------------------------- # Точка входа # --------------------------------------------------------------------------- def main() -> None: """Точка входа скрипта.""" uploader = ImmichUploader() uploader.run() if __name__ == "__main__": main()