#!/usr/bin/env python3 """ Обработка скачанных фото: сквозная дедупликация и классификация. Все фото собираются из downloads/ (из всех диалогов), сортируются хронологически (самые старые первыми) и раскладываются в output/: output/personal/ — личные фото людей output/travel/ — путешествия, места output/food/ — еда output/screenshots/ — скриншоты переписок output/_duplicates/ — дубликаты (оригинал = самое раннее фото) output/_junk/ — мемы, стикеры output/_review/ — неуверенная классификация, art, document Использование: python process_photos.py run # Полная обработка python process_photos.py run --limit 200 # Тест на 200 фото python process_photos.py run --dry-run # Без перемещения python process_photos.py rollback # Откатить всё назад python process_photos.py stats # Статистика """ import argparse import json import os import shutil import signal import sys from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from pathlib import Path from typing import Optional import imagehash import numpy as np from PIL import Image from tqdm import tqdm import config # --------------------------------------------------------------------------- # Константы # --------------------------------------------------------------------------- # Папки-категории в output/ DEDUP_DIR: str = "_duplicates" JUNK_DIR: str = "_junk" REVIEW_DIR: str = "_review" # Описания категорий для CLIP CATEGORIES: dict[str, list[str]] = { "personal": [ "a personal photograph of real people", "a selfie photo of a person", "a group photo of friends or family", "a portrait photograph of a person", "a candid photo of people at an event or party", ], "travel": [ "a landscape photograph of nature or scenery", "a travel photograph of a famous place or landmark", "a photograph of architecture or buildings", "a cityscape or street photograph", "a photograph from a vacation or trip", ], "food": [ "a photograph of food or a dish on a plate", "a restaurant or cafe photograph", "a cooking or baking photograph", ], "screenshots": [ "a screenshot of a mobile phone chat or text messages", "a screenshot of a computer screen with interface", "a screenshot of a social media post or webpage", ], "meme": [ "a meme image with text overlay and funny picture", "a demotivational poster image with black border", "a comic strip or cartoon panel with speech bubbles", "an internet joke image with caption text", ], "sticker": [ "a small cartoon sticker on plain background", "a simple emoji or emoticon image", "a cartoon character sticker with transparent background", ], "art": [ "a digital art or illustration drawing", "a hand-drawn painting or artwork", "abstract colorful art image", ], "document": [ "a scanned printed document or text page", "a photograph of a paper document", "a handwritten note or letter photograph", ], } # Куда перемещать каждую категорию KEEP_CATEGORIES: set[str] = {"personal", "travel", "food", "screenshots"} JUNK_CATEGORIES: set[str] = {"meme", "sticker"} REVIEW_CATEGORIES: set[str] = {"art", "document"} # --------------------------------------------------------------------------- # Утилиты # --------------------------------------------------------------------------- def popcount64(arr: np.ndarray) -> np.ndarray: """Vectorized подсчёт единичных бит в массиве uint64.""" x = arr.astype(np.uint64) x = x - ((x >> np.uint64(1)) & np.uint64(0x5555555555555555)) x = (x & np.uint64(0x3333333333333333)) + ( (x >> np.uint64(2)) & np.uint64(0x3333333333333333) ) x = (x + (x >> np.uint64(4))) & np.uint64(0x0F0F0F0F0F0F0F0F) return ((x * np.uint64(0x0101010101010101)) >> np.uint64(56)).astype( np.int32 ) def unique_dest(dest: Path) -> Path: """Если файл с таким именем уже существует, добавляет суффикс _2, _3 и т.д.""" if not dest.exists(): return dest stem = dest.stem suffix = dest.suffix parent = dest.parent counter = 2 while True: candidate = parent / f"{stem}_{counter}{suffix}" if not candidate.exists(): return candidate counter += 1 # --------------------------------------------------------------------------- # Трекер прогресса # --------------------------------------------------------------------------- class ProgressTracker: """Прогресс обработки с поддержкой возобновления.""" def __init__(self, filepath: str = config.PROCESS_PROGRESS_FILE) -> None: self.filepath: Path = Path(filepath) self.data: dict = self._load() def _load(self) -> dict: """Загружает прогресс из файла.""" if self.filepath.exists(): try: with open(self.filepath, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError): pass return { "version": 2, "hashed_files": {}, "classified_files": {}, "processed_files": [], } def save(self) -> None: """Сохраняет прогресс (атомарная запись).""" self.data["last_updated"] = datetime.now().isoformat() tmp = self.filepath.with_suffix(".tmp") with open(tmp, "w", encoding="utf-8") as f: json.dump(self.data, f, ensure_ascii=False, indent=2) tmp.replace(self.filepath) def reset(self) -> None: """Полный сброс прогресса.""" self.data = { "version": 2, "hashed_files": {}, "classified_files": {}, "processed_files": [], } self.save() # --------------------------------------------------------------------------- # Лог отката # --------------------------------------------------------------------------- class RollbackLog: """Журнал перемещений для отката.""" def __init__(self, filepath: str = config.ROLLBACK_LOG_FILE) -> None: self.filepath: Path = Path(filepath) self.moves: list[dict] = self._load() def _load(self) -> list[dict]: if self.filepath.exists(): try: with open(self.filepath, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError): pass return [] def log_move(self, src: str, dst: str, reason: str) -> None: """Записывает перемещение.""" self.moves.append({"src": src, "dst": dst, "reason": reason}) self._save() def _save(self) -> None: with open(self.filepath, "w", encoding="utf-8") as f: json.dump(self.moves, f, ensure_ascii=False, indent=2) def rollback(self) -> int: """Откатить все перемещения (LIFO). Возвращает кол-во восстановленных.""" restored = 0 for move in tqdm( reversed(self.moves), total=len(self.moves), desc="Откат", unit=" файлов", ): current = Path(move["dst"]) original = Path(move["src"]) if current.exists(): try: original.parent.mkdir(parents=True, exist_ok=True) shutil.move(str(current), str(original)) restored += 1 except OSError as exc: tqdm.write(f" Ошибка: {current.name}: {exc}") self.moves.clear() self._save() return restored @property def count(self) -> int: return len(self.moves) # --------------------------------------------------------------------------- # Основной обработчик # --------------------------------------------------------------------------- class PhotoProcessor: """Сквозная обработка: сбор → сортировка → хеширование → деdup → CLIP → организация.""" def __init__(self, args: argparse.Namespace) -> None: self.source_dir: Path = Path(args.source) self.output_dir: Path = Path(args.output) self.limit: Optional[int] = args.limit self.threshold: int = args.threshold self.dry_run: bool = args.dry_run self._stop: bool = False signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) self.progress = ProgressTracker() self.rollback = RollbackLog() def _signal_handler(self, _sig: int, _frame: object) -> None: if self._stop: tqdm.write("\nПринудительная остановка!") sys.exit(1) self._stop = True tqdm.write("\nОстановка... сохраняю прогресс.") # -- 1. Сбор и сортировка -- def _scan_photos(self) -> list[Path]: """Собирает все фото из downloads/, сортирует по дате файла (oldest first).""" photos: list[Path] = [] for root, _dirs, files in os.walk(self.source_dir): for f in files: if f.lower().endswith((".jpg", ".jpeg", ".png")): photos.append(Path(root) / f) # Сортировка по mtime (= дата сообщения ВК, установлена через os.utime) photos.sort(key=lambda p: p.stat().st_mtime) if self.limit: photos = photos[: self.limit] return photos # -- 2. Хеширование -- @staticmethod def _compute_one_hash(photo_path: Path) -> Optional[tuple[str, str]]: """Вычисляет pHash одного фото.""" try: with Image.open(photo_path) as img: h = imagehash.phash(img, hash_size=config.HASH_SIZE) return (str(photo_path), str(h)) except Exception: return None def _compute_hashes(self, photos: list[Path]) -> dict[str, str]: """Вычисляет хеши для всех фото (с кешем и многопоточностью).""" cached: dict[str, str] = self.progress.data.get("hashed_files", {}) result: dict[str, str] = {} to_hash: list[Path] = [] for p in photos: key = str(p) if key in cached: result[key] = cached[key] else: to_hash.append(p) if not to_hash: tqdm.write(f" Все {len(result)} хешей из кеша.") return result tqdm.write(f" Хеширование: {len(to_hash)} новых + {len(result)} из кеша") bar = tqdm(total=len(to_hash), desc="Хеширование", unit=" фото") with ThreadPoolExecutor(max_workers=config.HASH_WORKERS) as executor: futures = { executor.submit(self._compute_one_hash, p): p for p in to_hash } done_count = 0 for future in as_completed(futures): if self._stop: for f in futures: f.cancel() break res = future.result() if res: path_str, hash_hex = res result[path_str] = hash_hex cached[path_str] = hash_hex bar.update(1) done_count += 1 if done_count % 500 == 0: self.progress.data["hashed_files"] = cached self.progress.save() bar.close() self.progress.data["hashed_files"] = cached self.progress.save() return result # -- 3. Хронологическая дедупликация -- def _dedup_chronological( self, sorted_photos: list[Path], hashes: dict[str, str], ) -> tuple[list[Path], list[Path]]: """Проходит фото от старых к новым. Первое вхождение — оригинал, остальные — дубликаты. Возвращает (originals, duplicates). """ # Массив уникальных хешей (int) для vectorized сравнения unique_hash_ints: list[int] = [] unique_hash_np: Optional[np.ndarray] = None originals: list[Path] = [] duplicates: list[Path] = [] tqdm.write(f" Сквозная дедупликация (порог: {self.threshold})...") for photo in tqdm( sorted_photos, desc="Дедупликация", unit=" фото", leave=False, ): if self._stop: break key = str(photo) hash_hex = hashes.get(key) if hash_hex is None: # Не удалось вычислить хеш — пропускаем continue hash_int = int(hash_hex, 16) is_duplicate = False if unique_hash_ints: # Vectorized сравнение с уже встреченными уникальными хешами if unique_hash_np is None or len(unique_hash_np) != len(unique_hash_ints): unique_hash_np = np.array(unique_hash_ints, dtype=np.uint64) xor = np.bitwise_xor(np.uint64(hash_int), unique_hash_np) distances = popcount64(xor) if np.any(distances <= self.threshold): is_duplicate = True if is_duplicate: duplicates.append(photo) else: originals.append(photo) unique_hash_ints.append(hash_int) unique_hash_np = None # Инвалидируем кеш numpy-массива tqdm.write( f" Оригиналов: {len(originals)}, дубликатов: {len(duplicates)}" ) return originals, duplicates # -- 4. CLIP классификация -- def _classify_photos( self, photos: list[Path], ) -> dict[str, tuple[str, float]]: """Классифицирует фото через CLIP. Возвращает {path_str: (category, score)}.""" import torch import open_clip cached: dict[str, list] = self.progress.data.get("classified_files", {}) result: dict[str, tuple[str, float]] = {} to_classify: list[Path] = [] for p in photos: key = str(p) if key in cached: result[key] = tuple(cached[key]) elif p.exists(): to_classify.append(p) if not to_classify: tqdm.write(f" Все {len(result)} классификаций из кеша.") return result tqdm.write(f" Классификация: {len(to_classify)} новых + {len(result)} из кеша") # Устройство if torch.backends.mps.is_available(): device = torch.device("mps") tqdm.write(" Устройство: Apple Silicon MPS") elif torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") tqdm.write(" Устройство: CPU") # Загрузка модели tqdm.write(" Загрузка CLIP ViT-B-32...") model, _, preprocess = open_clip.create_model_and_transforms( "ViT-B-32", pretrained="laion2b_s34b_b79k", ) tokenizer = open_clip.get_tokenizer("ViT-B-32") model = model.to(device) model.eval() # Подготовка text-эмбеддингов all_prompts: list[str] = [] category_mapping: list[str] = [] for cat_name, prompts in CATEGORIES.items(): for prompt in prompts: all_prompts.append(prompt) category_mapping.append(cat_name) tokens = tokenizer(all_prompts).to(device) with torch.no_grad(): text_features = model.encode_text(tokens) text_features /= text_features.norm(dim=-1, keepdim=True) tqdm.write(" Модель загружена.") # Классификация батчами batch_size = config.CLIP_BATCH_SIZE bar = tqdm(total=len(to_classify), desc="Классификация", unit=" фото") for i in range(0, len(to_classify), batch_size): if self._stop: break batch_paths = to_classify[i: i + batch_size] tensors: list = [] valid_paths: list[Path] = [] for p in batch_paths: try: img = Image.open(p).convert("RGB") tensors.append(preprocess(img)) valid_paths.append(p) except Exception: pass if tensors: image_batch = torch.stack(tensors).to(device) with torch.no_grad(): image_features = model.encode_image(image_batch) image_features /= image_features.norm(dim=-1, keepdim=True) similarities = image_features @ text_features.T for j, path in enumerate(valid_paths): sims = similarities[j] cat_scores: dict[str, list[float]] = defaultdict(list) for idx, cat_name in enumerate(category_mapping): cat_scores[cat_name].append(sims[idx].item()) cat_avg = { cat: sum(scores) / len(scores) for cat, scores in cat_scores.items() } best_cat = max(cat_avg, key=lambda k: cat_avg[k]) best_score = cat_avg[best_cat] key = str(path) result[key] = (best_cat, best_score) cached[key] = [best_cat, best_score] bar.update(len(batch_paths)) if (i // batch_size) % 10 == 0: self.progress.data["classified_files"] = cached self.progress.save() bar.close() self.progress.data["classified_files"] = cached self.progress.save() return result # -- 5. Организация (перемещение) -- def _move_photo( self, photo: Path, dest_dir: Path, reason: str, ) -> Optional[Path]: """Перемещает фото в dest_dir (плоско, без подпапок диалогов). Возвращает путь назначения или None при dry-run. """ dest = unique_dest(dest_dir / photo.name) if self.dry_run: tqdm.write(f" [DRY-RUN] {photo.name} → {dest_dir.name}/") return None dest_dir.mkdir(parents=True, exist_ok=True) shutil.move(str(photo), str(dest)) self.rollback.log_move(str(photo), str(dest), reason) return dest def _organize_all( self, originals: list[Path], duplicates: list[Path], classifications: dict[str, tuple[str, float]], ) -> dict[str, int]: """Перемещает все фото в output/ по категориям (плоская структура).""" stats: dict[str, int] = defaultdict(int) confidence_min = config.CLIP_CONFIDENCE_MIN # Дубликаты → output/_duplicates/ tqdm.write("Перемещение дубликатов...") for photo in tqdm(duplicates, desc="Дубликаты", unit=" фото", leave=False): if self._stop: break self._move_photo(photo, self.output_dir / DEDUP_DIR, "duplicate") stats["duplicates"] += 1 # Оригиналы → по категориям tqdm.write("Перемещение оригиналов по категориям...") for photo in tqdm(originals, desc="Организация", unit=" фото", leave=False): if self._stop: break key = str(photo) cat_info = classifications.get(key) if cat_info is None: # Нет классификации — в review self._move_photo( photo, self.output_dir / REVIEW_DIR / "unclassified", "unclassified", ) stats["review_unclassified"] += 1 continue category, score = cat_info if score < confidence_min: dest = self.output_dir / REVIEW_DIR / "low_confidence" stats["review_low_conf"] += 1 reason = f"low_conf:{category}" elif category in JUNK_CATEGORIES: dest = self.output_dir / JUNK_DIR stats[f"junk_{category}"] += 1 reason = f"junk:{category}" elif category in REVIEW_CATEGORIES: dest = self.output_dir / REVIEW_DIR / category stats[f"review_{category}"] += 1 reason = f"review:{category}" elif category in KEEP_CATEGORIES: dest = self.output_dir / category stats[f"keep_{category}"] += 1 reason = f"keep:{category}" else: dest = self.output_dir / REVIEW_DIR / "other" stats["review_other"] += 1 reason = f"other:{category}" self._move_photo(photo, dest, reason) return dict(stats) # -- Главная команда: run -- def run(self) -> None: """Полная обработка: сбор → деdup → classify → организация.""" self.output_dir.mkdir(parents=True, exist_ok=True) tqdm.write("=" * 60) tqdm.write(" Обработка фото: дедупликация + классификация") tqdm.write(f" Источник: {self.source_dir}/") tqdm.write(f" Выход: {self.output_dir}/") tqdm.write("=" * 60) # 1. Сбор фото, отсортированных по дате (oldest first) tqdm.write("\n[1/5] Сбор и сортировка фото по дате...") photos = self._scan_photos() if not photos: tqdm.write("Фото не найдено.") return oldest = datetime.fromtimestamp(photos[0].stat().st_mtime) newest = datetime.fromtimestamp(photos[-1].stat().st_mtime) tqdm.write( f" Найдено: {len(photos)} фото " f"({oldest.strftime('%Y-%m-%d')} → {newest.strftime('%Y-%m-%d')})" ) if self._stop: return # 2. Хеширование tqdm.write("\n[2/5] Вычисление перцептивных хешей...") hashes = self._compute_hashes(photos) if self._stop: return # 3. Хронологическая дедупликация (от старых к новым) tqdm.write("\n[3/5] Сквозная дедупликация (oldest = оригинал)...") originals, duplicates = self._dedup_chronological(photos, hashes) if self._stop: return # 4. Классификация оригиналов через CLIP tqdm.write(f"\n[4/5] Классификация {len(originals)} оригиналов...") classifications = self._classify_photos(originals) if self._stop: return # 5. Перемещение в output/ tqdm.write(f"\n[5/5] Организация файлов в {self.output_dir}/...") stats = self._organize_all(originals, duplicates, classifications) self.progress.save() # Итого tqdm.write("\n" + "=" * 60) tqdm.write(" Итого:") total_keep = sum(v for k, v in stats.items() if k.startswith("keep_")) total_junk = sum(v for k, v in stats.items() if k.startswith("junk_")) total_review = sum(v for k, v in stats.items() if k.startswith("review_")) total_dup = stats.get("duplicates", 0) tqdm.write(f" Оригиналов (keep): {total_keep}") for k, v in sorted(stats.items()): if k.startswith("keep_"): tqdm.write(f" {k.replace('keep_', '')}: {v}") tqdm.write(f" Дубликатов: {total_dup}") tqdm.write(f" Мусор (junk): {total_junk}") tqdm.write(f" На проверку (review): {total_review}") tqdm.write(f" Всего обработано: {sum(stats.values())}") tqdm.write("=" * 60) # -- Откат -- def run_rollback(self) -> None: """Откатывает все перемещения обратно в downloads/.""" tqdm.write("=" * 60) tqdm.write(" Откат всех перемещений") tqdm.write("=" * 60) if self.rollback.count == 0: tqdm.write("Нет перемещений для отката.") return tqdm.write(f"Записей в журнале: {self.rollback.count}") restored = self.rollback.rollback() tqdm.write(f"Восстановлено файлов: {restored}") # Сбрасываем прогресс self.progress.reset() # Удаляем пустые папки в output/ if self.output_dir.exists(): self._remove_empty_dirs(self.output_dir) if self.output_dir.exists() and not any(self.output_dir.iterdir()): self.output_dir.rmdir() tqdm.write(f" Удалена пустая папка: {self.output_dir.name}/") # -- Статистика -- def run_stats(self) -> None: """Показывает статистику из отчёта.""" report_path = self.output_dir / "classification_report.json" if not report_path.exists(): tqdm.write("Отчёт не найден. Сначала запусти run.") return with open(report_path, "r", encoding="utf-8") as f: report = json.load(f) tqdm.write("=" * 60) tqdm.write(" Статистика обработки") tqdm.write("=" * 60) tqdm.write(f"Всего фото: {report.get('total_photos', '?')}") tqdm.write(f"Дубликатов: {report.get('duplicates', '?')}") tqdm.write(f"\nКатегории оригиналов:") for cat, count in report.get("categories", {}).items(): marker = "" if cat in KEEP_CATEGORIES: marker = " [ОСТАВИТЬ]" elif cat in JUNK_CATEGORIES: marker = " [МУСОР]" elif cat in REVIEW_CATEGORIES: marker = " [ПРОВЕРИТЬ]" total = report.get("originals", report.get("total_photos", 1)) pct = count * 100 / total if total else 0 tqdm.write(f" {cat}: {count} ({pct:.1f}%){marker}") # -- Утилиты -- @staticmethod def _remove_empty_dirs(path: Path) -> None: """Рекурсивно удаляет пустые подпапки.""" for child in sorted(path.rglob("*"), reverse=True): if child.is_dir() and not any(child.iterdir()): child.rmdir() # --------------------------------------------------------------------------- # Точка входа # --------------------------------------------------------------------------- def main() -> None: parser = argparse.ArgumentParser( description="Сквозная дедупликация и классификация фото из ВК", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Примеры: python process_photos.py run # Полная обработка python process_photos.py run --limit 200 # Тест на 200 фото python process_photos.py run --dry-run # Без перемещения python process_photos.py rollback # Откат python process_photos.py stats # Статистика """, ) parser.add_argument( "command", choices=["run", "rollback", "stats"], help="Команда: run | rollback | stats", ) parser.add_argument( "--source", default=config.DOWNLOAD_DIR, help=f"Папка с фото (по умолчанию: {config.DOWNLOAD_DIR})", ) parser.add_argument( "--output", default=config.OUTPUT_DIR, help=f"Папка для результатов (по умолчанию: {config.OUTPUT_DIR})", ) parser.add_argument( "--limit", type=int, default=None, help="Обработать только первые N фото (для тестирования)", ) parser.add_argument( "--threshold", type=int, default=config.DEDUP_THRESHOLD, help=f"Порог Хэмминга (по умолчанию: {config.DEDUP_THRESHOLD})", ) parser.add_argument( "--dry-run", action="store_true", help="Показать без перемещения файлов", ) args = parser.parse_args() processor = PhotoProcessor(args) if args.command == "run": processor.run() elif args.command == "rollback": processor.run_rollback() elif args.command == "stats": processor.run_stats() if __name__ == "__main__": main()