Скрипты для выгрузки фото и видео из диалогов ВКонтакте, обработки (дедупликация + CLIP-классификация) и загрузки в Immich. Co-authored-by: Cursor <cursoragent@cursor.com>
798 lines
30 KiB
Python
798 lines
30 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Обработка скачанных фото: сквозная дедупликация и классификация.
|
||
|
||
Все фото собираются из downloads/ (из всех диалогов), сортируются
|
||
хронологически (самые старые первыми) и раскладываются в output/:
|
||
output/personal/ — личные фото людей
|
||
output/travel/ — путешествия, места
|
||
output/food/ — еда
|
||
output/screenshots/ — скриншоты переписок
|
||
output/_duplicates/ — дубликаты (оригинал = самое раннее фото)
|
||
output/_junk/ — мемы, стикеры
|
||
output/_review/ — неуверенная классификация, art, document
|
||
|
||
Использование:
|
||
python process_photos.py run # Полная обработка
|
||
python process_photos.py run --limit 200 # Тест на 200 фото
|
||
python process_photos.py run --dry-run # Без перемещения
|
||
python process_photos.py rollback # Откатить всё назад
|
||
python process_photos.py stats # Статистика
|
||
"""
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import shutil
|
||
import signal
|
||
import sys
|
||
from collections import defaultdict
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
import imagehash
|
||
import numpy as np
|
||
from PIL import Image
|
||
from tqdm import tqdm
|
||
|
||
import config
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Константы
|
||
# ---------------------------------------------------------------------------
|
||
|
||
# Папки-категории в output/
|
||
DEDUP_DIR: str = "_duplicates"
|
||
JUNK_DIR: str = "_junk"
|
||
REVIEW_DIR: str = "_review"
|
||
|
||
# Описания категорий для CLIP
|
||
CATEGORIES: dict[str, list[str]] = {
|
||
"personal": [
|
||
"a personal photograph of real people",
|
||
"a selfie photo of a person",
|
||
"a group photo of friends or family",
|
||
"a portrait photograph of a person",
|
||
"a candid photo of people at an event or party",
|
||
],
|
||
"travel": [
|
||
"a landscape photograph of nature or scenery",
|
||
"a travel photograph of a famous place or landmark",
|
||
"a photograph of architecture or buildings",
|
||
"a cityscape or street photograph",
|
||
"a photograph from a vacation or trip",
|
||
],
|
||
"food": [
|
||
"a photograph of food or a dish on a plate",
|
||
"a restaurant or cafe photograph",
|
||
"a cooking or baking photograph",
|
||
],
|
||
"screenshots": [
|
||
"a screenshot of a mobile phone chat or text messages",
|
||
"a screenshot of a computer screen with interface",
|
||
"a screenshot of a social media post or webpage",
|
||
],
|
||
"meme": [
|
||
"a meme image with text overlay and funny picture",
|
||
"a demotivational poster image with black border",
|
||
"a comic strip or cartoon panel with speech bubbles",
|
||
"an internet joke image with caption text",
|
||
],
|
||
"sticker": [
|
||
"a small cartoon sticker on plain background",
|
||
"a simple emoji or emoticon image",
|
||
"a cartoon character sticker with transparent background",
|
||
],
|
||
"art": [
|
||
"a digital art or illustration drawing",
|
||
"a hand-drawn painting or artwork",
|
||
"abstract colorful art image",
|
||
],
|
||
"document": [
|
||
"a scanned printed document or text page",
|
||
"a photograph of a paper document",
|
||
"a handwritten note or letter photograph",
|
||
],
|
||
}
|
||
|
||
# Куда перемещать каждую категорию
|
||
KEEP_CATEGORIES: set[str] = {"personal", "travel", "food", "screenshots"}
|
||
JUNK_CATEGORIES: set[str] = {"meme", "sticker"}
|
||
REVIEW_CATEGORIES: set[str] = {"art", "document"}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Утилиты
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def popcount64(arr: np.ndarray) -> np.ndarray:
|
||
"""Vectorized подсчёт единичных бит в массиве uint64."""
|
||
x = arr.astype(np.uint64)
|
||
x = x - ((x >> np.uint64(1)) & np.uint64(0x5555555555555555))
|
||
x = (x & np.uint64(0x3333333333333333)) + (
|
||
(x >> np.uint64(2)) & np.uint64(0x3333333333333333)
|
||
)
|
||
x = (x + (x >> np.uint64(4))) & np.uint64(0x0F0F0F0F0F0F0F0F)
|
||
return ((x * np.uint64(0x0101010101010101)) >> np.uint64(56)).astype(
|
||
np.int32
|
||
)
|
||
|
||
|
||
def unique_dest(dest: Path) -> Path:
|
||
"""Если файл с таким именем уже существует, добавляет суффикс _2, _3 и т.д."""
|
||
if not dest.exists():
|
||
return dest
|
||
stem = dest.stem
|
||
suffix = dest.suffix
|
||
parent = dest.parent
|
||
counter = 2
|
||
while True:
|
||
candidate = parent / f"{stem}_{counter}{suffix}"
|
||
if not candidate.exists():
|
||
return candidate
|
||
counter += 1
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Трекер прогресса
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class ProgressTracker:
|
||
"""Прогресс обработки с поддержкой возобновления."""
|
||
|
||
def __init__(self, filepath: str = config.PROCESS_PROGRESS_FILE) -> None:
|
||
self.filepath: Path = Path(filepath)
|
||
self.data: dict = self._load()
|
||
|
||
def _load(self) -> dict:
|
||
"""Загружает прогресс из файла."""
|
||
if self.filepath.exists():
|
||
try:
|
||
with open(self.filepath, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
except (json.JSONDecodeError, OSError):
|
||
pass
|
||
return {
|
||
"version": 2,
|
||
"hashed_files": {},
|
||
"classified_files": {},
|
||
"processed_files": [],
|
||
}
|
||
|
||
def save(self) -> None:
|
||
"""Сохраняет прогресс (атомарная запись)."""
|
||
self.data["last_updated"] = datetime.now().isoformat()
|
||
tmp = self.filepath.with_suffix(".tmp")
|
||
with open(tmp, "w", encoding="utf-8") as f:
|
||
json.dump(self.data, f, ensure_ascii=False, indent=2)
|
||
tmp.replace(self.filepath)
|
||
|
||
def reset(self) -> None:
|
||
"""Полный сброс прогресса."""
|
||
self.data = {
|
||
"version": 2,
|
||
"hashed_files": {},
|
||
"classified_files": {},
|
||
"processed_files": [],
|
||
}
|
||
self.save()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Лог отката
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class RollbackLog:
|
||
"""Журнал перемещений для отката."""
|
||
|
||
def __init__(self, filepath: str = config.ROLLBACK_LOG_FILE) -> None:
|
||
self.filepath: Path = Path(filepath)
|
||
self.moves: list[dict] = self._load()
|
||
|
||
def _load(self) -> list[dict]:
|
||
if self.filepath.exists():
|
||
try:
|
||
with open(self.filepath, "r", encoding="utf-8") as f:
|
||
return json.load(f)
|
||
except (json.JSONDecodeError, OSError):
|
||
pass
|
||
return []
|
||
|
||
def log_move(self, src: str, dst: str, reason: str) -> None:
|
||
"""Записывает перемещение."""
|
||
self.moves.append({"src": src, "dst": dst, "reason": reason})
|
||
self._save()
|
||
|
||
def _save(self) -> None:
|
||
with open(self.filepath, "w", encoding="utf-8") as f:
|
||
json.dump(self.moves, f, ensure_ascii=False, indent=2)
|
||
|
||
def rollback(self) -> int:
|
||
"""Откатить все перемещения (LIFO). Возвращает кол-во восстановленных."""
|
||
restored = 0
|
||
for move in tqdm(
|
||
reversed(self.moves), total=len(self.moves),
|
||
desc="Откат", unit=" файлов",
|
||
):
|
||
current = Path(move["dst"])
|
||
original = Path(move["src"])
|
||
if current.exists():
|
||
try:
|
||
original.parent.mkdir(parents=True, exist_ok=True)
|
||
shutil.move(str(current), str(original))
|
||
restored += 1
|
||
except OSError as exc:
|
||
tqdm.write(f" Ошибка: {current.name}: {exc}")
|
||
self.moves.clear()
|
||
self._save()
|
||
return restored
|
||
|
||
@property
|
||
def count(self) -> int:
|
||
return len(self.moves)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Основной обработчик
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class PhotoProcessor:
|
||
"""Сквозная обработка: сбор → сортировка → хеширование → деdup → CLIP → организация."""
|
||
|
||
def __init__(self, args: argparse.Namespace) -> None:
|
||
self.source_dir: Path = Path(args.source)
|
||
self.output_dir: Path = Path(args.output)
|
||
self.limit: Optional[int] = args.limit
|
||
self.threshold: int = args.threshold
|
||
self.dry_run: bool = args.dry_run
|
||
self._stop: bool = False
|
||
|
||
signal.signal(signal.SIGINT, self._signal_handler)
|
||
signal.signal(signal.SIGTERM, self._signal_handler)
|
||
|
||
self.progress = ProgressTracker()
|
||
self.rollback = RollbackLog()
|
||
|
||
def _signal_handler(self, _sig: int, _frame: object) -> None:
|
||
if self._stop:
|
||
tqdm.write("\nПринудительная остановка!")
|
||
sys.exit(1)
|
||
self._stop = True
|
||
tqdm.write("\nОстановка... сохраняю прогресс.")
|
||
|
||
# -- 1. Сбор и сортировка --
|
||
|
||
def _scan_photos(self) -> list[Path]:
|
||
"""Собирает все фото из downloads/, сортирует по дате файла (oldest first)."""
|
||
photos: list[Path] = []
|
||
for root, _dirs, files in os.walk(self.source_dir):
|
||
for f in files:
|
||
if f.lower().endswith((".jpg", ".jpeg", ".png")):
|
||
photos.append(Path(root) / f)
|
||
|
||
# Сортировка по mtime (= дата сообщения ВК, установлена через os.utime)
|
||
photos.sort(key=lambda p: p.stat().st_mtime)
|
||
|
||
if self.limit:
|
||
photos = photos[: self.limit]
|
||
|
||
return photos
|
||
|
||
# -- 2. Хеширование --
|
||
|
||
@staticmethod
|
||
def _compute_one_hash(photo_path: Path) -> Optional[tuple[str, str]]:
|
||
"""Вычисляет pHash одного фото."""
|
||
try:
|
||
with Image.open(photo_path) as img:
|
||
h = imagehash.phash(img, hash_size=config.HASH_SIZE)
|
||
return (str(photo_path), str(h))
|
||
except Exception:
|
||
return None
|
||
|
||
def _compute_hashes(self, photos: list[Path]) -> dict[str, str]:
|
||
"""Вычисляет хеши для всех фото (с кешем и многопоточностью)."""
|
||
cached: dict[str, str] = self.progress.data.get("hashed_files", {})
|
||
result: dict[str, str] = {}
|
||
to_hash: list[Path] = []
|
||
|
||
for p in photos:
|
||
key = str(p)
|
||
if key in cached:
|
||
result[key] = cached[key]
|
||
else:
|
||
to_hash.append(p)
|
||
|
||
if not to_hash:
|
||
tqdm.write(f" Все {len(result)} хешей из кеша.")
|
||
return result
|
||
|
||
tqdm.write(f" Хеширование: {len(to_hash)} новых + {len(result)} из кеша")
|
||
|
||
bar = tqdm(total=len(to_hash), desc="Хеширование", unit=" фото")
|
||
with ThreadPoolExecutor(max_workers=config.HASH_WORKERS) as executor:
|
||
futures = {
|
||
executor.submit(self._compute_one_hash, p): p
|
||
for p in to_hash
|
||
}
|
||
done_count = 0
|
||
for future in as_completed(futures):
|
||
if self._stop:
|
||
for f in futures:
|
||
f.cancel()
|
||
break
|
||
res = future.result()
|
||
if res:
|
||
path_str, hash_hex = res
|
||
result[path_str] = hash_hex
|
||
cached[path_str] = hash_hex
|
||
bar.update(1)
|
||
done_count += 1
|
||
if done_count % 500 == 0:
|
||
self.progress.data["hashed_files"] = cached
|
||
self.progress.save()
|
||
bar.close()
|
||
|
||
self.progress.data["hashed_files"] = cached
|
||
self.progress.save()
|
||
return result
|
||
|
||
# -- 3. Хронологическая дедупликация --
|
||
|
||
def _dedup_chronological(
|
||
self,
|
||
sorted_photos: list[Path],
|
||
hashes: dict[str, str],
|
||
) -> tuple[list[Path], list[Path]]:
|
||
"""Проходит фото от старых к новым. Первое вхождение — оригинал, остальные — дубликаты.
|
||
|
||
Возвращает (originals, duplicates).
|
||
"""
|
||
# Массив уникальных хешей (int) для vectorized сравнения
|
||
unique_hash_ints: list[int] = []
|
||
unique_hash_np: Optional[np.ndarray] = None
|
||
|
||
originals: list[Path] = []
|
||
duplicates: list[Path] = []
|
||
|
||
tqdm.write(f" Сквозная дедупликация (порог: {self.threshold})...")
|
||
|
||
for photo in tqdm(
|
||
sorted_photos, desc="Дедупликация", unit=" фото", leave=False,
|
||
):
|
||
if self._stop:
|
||
break
|
||
|
||
key = str(photo)
|
||
hash_hex = hashes.get(key)
|
||
if hash_hex is None:
|
||
# Не удалось вычислить хеш — пропускаем
|
||
continue
|
||
|
||
hash_int = int(hash_hex, 16)
|
||
|
||
is_duplicate = False
|
||
|
||
if unique_hash_ints:
|
||
# Vectorized сравнение с уже встреченными уникальными хешами
|
||
if unique_hash_np is None or len(unique_hash_np) != len(unique_hash_ints):
|
||
unique_hash_np = np.array(unique_hash_ints, dtype=np.uint64)
|
||
|
||
xor = np.bitwise_xor(np.uint64(hash_int), unique_hash_np)
|
||
distances = popcount64(xor)
|
||
if np.any(distances <= self.threshold):
|
||
is_duplicate = True
|
||
|
||
if is_duplicate:
|
||
duplicates.append(photo)
|
||
else:
|
||
originals.append(photo)
|
||
unique_hash_ints.append(hash_int)
|
||
unique_hash_np = None # Инвалидируем кеш numpy-массива
|
||
|
||
tqdm.write(
|
||
f" Оригиналов: {len(originals)}, дубликатов: {len(duplicates)}"
|
||
)
|
||
return originals, duplicates
|
||
|
||
# -- 4. CLIP классификация --
|
||
|
||
def _classify_photos(
|
||
self, photos: list[Path],
|
||
) -> dict[str, tuple[str, float]]:
|
||
"""Классифицирует фото через CLIP. Возвращает {path_str: (category, score)}."""
|
||
import torch
|
||
import open_clip
|
||
|
||
cached: dict[str, list] = self.progress.data.get("classified_files", {})
|
||
result: dict[str, tuple[str, float]] = {}
|
||
to_classify: list[Path] = []
|
||
|
||
for p in photos:
|
||
key = str(p)
|
||
if key in cached:
|
||
result[key] = tuple(cached[key])
|
||
elif p.exists():
|
||
to_classify.append(p)
|
||
|
||
if not to_classify:
|
||
tqdm.write(f" Все {len(result)} классификаций из кеша.")
|
||
return result
|
||
|
||
tqdm.write(f" Классификация: {len(to_classify)} новых + {len(result)} из кеша")
|
||
|
||
# Устройство
|
||
if torch.backends.mps.is_available():
|
||
device = torch.device("mps")
|
||
tqdm.write(" Устройство: Apple Silicon MPS")
|
||
elif torch.cuda.is_available():
|
||
device = torch.device("cuda")
|
||
else:
|
||
device = torch.device("cpu")
|
||
tqdm.write(" Устройство: CPU")
|
||
|
||
# Загрузка модели
|
||
tqdm.write(" Загрузка CLIP ViT-B-32...")
|
||
model, _, preprocess = open_clip.create_model_and_transforms(
|
||
"ViT-B-32", pretrained="laion2b_s34b_b79k",
|
||
)
|
||
tokenizer = open_clip.get_tokenizer("ViT-B-32")
|
||
model = model.to(device)
|
||
model.eval()
|
||
|
||
# Подготовка text-эмбеддингов
|
||
all_prompts: list[str] = []
|
||
category_mapping: list[str] = []
|
||
for cat_name, prompts in CATEGORIES.items():
|
||
for prompt in prompts:
|
||
all_prompts.append(prompt)
|
||
category_mapping.append(cat_name)
|
||
|
||
tokens = tokenizer(all_prompts).to(device)
|
||
with torch.no_grad():
|
||
text_features = model.encode_text(tokens)
|
||
text_features /= text_features.norm(dim=-1, keepdim=True)
|
||
|
||
tqdm.write(" Модель загружена.")
|
||
|
||
# Классификация батчами
|
||
batch_size = config.CLIP_BATCH_SIZE
|
||
bar = tqdm(total=len(to_classify), desc="Классификация", unit=" фото")
|
||
|
||
for i in range(0, len(to_classify), batch_size):
|
||
if self._stop:
|
||
break
|
||
|
||
batch_paths = to_classify[i: i + batch_size]
|
||
tensors: list = []
|
||
valid_paths: list[Path] = []
|
||
|
||
for p in batch_paths:
|
||
try:
|
||
img = Image.open(p).convert("RGB")
|
||
tensors.append(preprocess(img))
|
||
valid_paths.append(p)
|
||
except Exception:
|
||
pass
|
||
|
||
if tensors:
|
||
image_batch = torch.stack(tensors).to(device)
|
||
with torch.no_grad():
|
||
image_features = model.encode_image(image_batch)
|
||
image_features /= image_features.norm(dim=-1, keepdim=True)
|
||
similarities = image_features @ text_features.T
|
||
|
||
for j, path in enumerate(valid_paths):
|
||
sims = similarities[j]
|
||
cat_scores: dict[str, list[float]] = defaultdict(list)
|
||
for idx, cat_name in enumerate(category_mapping):
|
||
cat_scores[cat_name].append(sims[idx].item())
|
||
|
||
cat_avg = {
|
||
cat: sum(scores) / len(scores)
|
||
for cat, scores in cat_scores.items()
|
||
}
|
||
best_cat = max(cat_avg, key=lambda k: cat_avg[k])
|
||
best_score = cat_avg[best_cat]
|
||
|
||
key = str(path)
|
||
result[key] = (best_cat, best_score)
|
||
cached[key] = [best_cat, best_score]
|
||
|
||
bar.update(len(batch_paths))
|
||
|
||
if (i // batch_size) % 10 == 0:
|
||
self.progress.data["classified_files"] = cached
|
||
self.progress.save()
|
||
|
||
bar.close()
|
||
self.progress.data["classified_files"] = cached
|
||
self.progress.save()
|
||
return result
|
||
|
||
# -- 5. Организация (перемещение) --
|
||
|
||
def _move_photo(
|
||
self, photo: Path, dest_dir: Path, reason: str,
|
||
) -> Optional[Path]:
|
||
"""Перемещает фото в dest_dir (плоско, без подпапок диалогов).
|
||
|
||
Возвращает путь назначения или None при dry-run.
|
||
"""
|
||
dest = unique_dest(dest_dir / photo.name)
|
||
|
||
if self.dry_run:
|
||
tqdm.write(f" [DRY-RUN] {photo.name} → {dest_dir.name}/")
|
||
return None
|
||
|
||
dest_dir.mkdir(parents=True, exist_ok=True)
|
||
shutil.move(str(photo), str(dest))
|
||
self.rollback.log_move(str(photo), str(dest), reason)
|
||
return dest
|
||
|
||
def _organize_all(
|
||
self,
|
||
originals: list[Path],
|
||
duplicates: list[Path],
|
||
classifications: dict[str, tuple[str, float]],
|
||
) -> dict[str, int]:
|
||
"""Перемещает все фото в output/ по категориям (плоская структура)."""
|
||
stats: dict[str, int] = defaultdict(int)
|
||
confidence_min = config.CLIP_CONFIDENCE_MIN
|
||
|
||
# Дубликаты → output/_duplicates/
|
||
tqdm.write("Перемещение дубликатов...")
|
||
for photo in tqdm(duplicates, desc="Дубликаты", unit=" фото", leave=False):
|
||
if self._stop:
|
||
break
|
||
self._move_photo(photo, self.output_dir / DEDUP_DIR, "duplicate")
|
||
stats["duplicates"] += 1
|
||
|
||
# Оригиналы → по категориям
|
||
tqdm.write("Перемещение оригиналов по категориям...")
|
||
for photo in tqdm(originals, desc="Организация", unit=" фото", leave=False):
|
||
if self._stop:
|
||
break
|
||
|
||
key = str(photo)
|
||
cat_info = classifications.get(key)
|
||
|
||
if cat_info is None:
|
||
# Нет классификации — в review
|
||
self._move_photo(
|
||
photo,
|
||
self.output_dir / REVIEW_DIR / "unclassified",
|
||
"unclassified",
|
||
)
|
||
stats["review_unclassified"] += 1
|
||
continue
|
||
|
||
category, score = cat_info
|
||
|
||
if score < confidence_min:
|
||
dest = self.output_dir / REVIEW_DIR / "low_confidence"
|
||
stats["review_low_conf"] += 1
|
||
reason = f"low_conf:{category}"
|
||
elif category in JUNK_CATEGORIES:
|
||
dest = self.output_dir / JUNK_DIR
|
||
stats[f"junk_{category}"] += 1
|
||
reason = f"junk:{category}"
|
||
elif category in REVIEW_CATEGORIES:
|
||
dest = self.output_dir / REVIEW_DIR / category
|
||
stats[f"review_{category}"] += 1
|
||
reason = f"review:{category}"
|
||
elif category in KEEP_CATEGORIES:
|
||
dest = self.output_dir / category
|
||
stats[f"keep_{category}"] += 1
|
||
reason = f"keep:{category}"
|
||
else:
|
||
dest = self.output_dir / REVIEW_DIR / "other"
|
||
stats["review_other"] += 1
|
||
reason = f"other:{category}"
|
||
|
||
self._move_photo(photo, dest, reason)
|
||
|
||
return dict(stats)
|
||
|
||
# -- Главная команда: run --
|
||
|
||
def run(self) -> None:
|
||
"""Полная обработка: сбор → деdup → classify → организация."""
|
||
self.output_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
tqdm.write("=" * 60)
|
||
tqdm.write(" Обработка фото: дедупликация + классификация")
|
||
tqdm.write(f" Источник: {self.source_dir}/")
|
||
tqdm.write(f" Выход: {self.output_dir}/")
|
||
tqdm.write("=" * 60)
|
||
|
||
# 1. Сбор фото, отсортированных по дате (oldest first)
|
||
tqdm.write("\n[1/5] Сбор и сортировка фото по дате...")
|
||
photos = self._scan_photos()
|
||
if not photos:
|
||
tqdm.write("Фото не найдено.")
|
||
return
|
||
|
||
oldest = datetime.fromtimestamp(photos[0].stat().st_mtime)
|
||
newest = datetime.fromtimestamp(photos[-1].stat().st_mtime)
|
||
tqdm.write(
|
||
f" Найдено: {len(photos)} фото "
|
||
f"({oldest.strftime('%Y-%m-%d')} → {newest.strftime('%Y-%m-%d')})"
|
||
)
|
||
|
||
if self._stop:
|
||
return
|
||
|
||
# 2. Хеширование
|
||
tqdm.write("\n[2/5] Вычисление перцептивных хешей...")
|
||
hashes = self._compute_hashes(photos)
|
||
if self._stop:
|
||
return
|
||
|
||
# 3. Хронологическая дедупликация (от старых к новым)
|
||
tqdm.write("\n[3/5] Сквозная дедупликация (oldest = оригинал)...")
|
||
originals, duplicates = self._dedup_chronological(photos, hashes)
|
||
if self._stop:
|
||
return
|
||
|
||
# 4. Классификация оригиналов через CLIP
|
||
tqdm.write(f"\n[4/5] Классификация {len(originals)} оригиналов...")
|
||
classifications = self._classify_photos(originals)
|
||
if self._stop:
|
||
return
|
||
|
||
# 5. Перемещение в output/
|
||
tqdm.write(f"\n[5/5] Организация файлов в {self.output_dir}/...")
|
||
stats = self._organize_all(originals, duplicates, classifications)
|
||
self.progress.save()
|
||
|
||
# Итого
|
||
tqdm.write("\n" + "=" * 60)
|
||
tqdm.write(" Итого:")
|
||
total_keep = sum(v for k, v in stats.items() if k.startswith("keep_"))
|
||
total_junk = sum(v for k, v in stats.items() if k.startswith("junk_"))
|
||
total_review = sum(v for k, v in stats.items() if k.startswith("review_"))
|
||
total_dup = stats.get("duplicates", 0)
|
||
|
||
tqdm.write(f" Оригиналов (keep): {total_keep}")
|
||
for k, v in sorted(stats.items()):
|
||
if k.startswith("keep_"):
|
||
tqdm.write(f" {k.replace('keep_', '')}: {v}")
|
||
tqdm.write(f" Дубликатов: {total_dup}")
|
||
tqdm.write(f" Мусор (junk): {total_junk}")
|
||
tqdm.write(f" На проверку (review): {total_review}")
|
||
tqdm.write(f" Всего обработано: {sum(stats.values())}")
|
||
tqdm.write("=" * 60)
|
||
|
||
# -- Откат --
|
||
|
||
def run_rollback(self) -> None:
|
||
"""Откатывает все перемещения обратно в downloads/."""
|
||
tqdm.write("=" * 60)
|
||
tqdm.write(" Откат всех перемещений")
|
||
tqdm.write("=" * 60)
|
||
|
||
if self.rollback.count == 0:
|
||
tqdm.write("Нет перемещений для отката.")
|
||
return
|
||
|
||
tqdm.write(f"Записей в журнале: {self.rollback.count}")
|
||
restored = self.rollback.rollback()
|
||
tqdm.write(f"Восстановлено файлов: {restored}")
|
||
|
||
# Сбрасываем прогресс
|
||
self.progress.reset()
|
||
|
||
# Удаляем пустые папки в output/
|
||
if self.output_dir.exists():
|
||
self._remove_empty_dirs(self.output_dir)
|
||
if self.output_dir.exists() and not any(self.output_dir.iterdir()):
|
||
self.output_dir.rmdir()
|
||
tqdm.write(f" Удалена пустая папка: {self.output_dir.name}/")
|
||
|
||
# -- Статистика --
|
||
|
||
def run_stats(self) -> None:
|
||
"""Показывает статистику из отчёта."""
|
||
report_path = self.output_dir / "classification_report.json"
|
||
if not report_path.exists():
|
||
tqdm.write("Отчёт не найден. Сначала запусти run.")
|
||
return
|
||
|
||
with open(report_path, "r", encoding="utf-8") as f:
|
||
report = json.load(f)
|
||
|
||
tqdm.write("=" * 60)
|
||
tqdm.write(" Статистика обработки")
|
||
tqdm.write("=" * 60)
|
||
tqdm.write(f"Всего фото: {report.get('total_photos', '?')}")
|
||
tqdm.write(f"Дубликатов: {report.get('duplicates', '?')}")
|
||
tqdm.write(f"\nКатегории оригиналов:")
|
||
for cat, count in report.get("categories", {}).items():
|
||
marker = ""
|
||
if cat in KEEP_CATEGORIES:
|
||
marker = " [ОСТАВИТЬ]"
|
||
elif cat in JUNK_CATEGORIES:
|
||
marker = " [МУСОР]"
|
||
elif cat in REVIEW_CATEGORIES:
|
||
marker = " [ПРОВЕРИТЬ]"
|
||
total = report.get("originals", report.get("total_photos", 1))
|
||
pct = count * 100 / total if total else 0
|
||
tqdm.write(f" {cat}: {count} ({pct:.1f}%){marker}")
|
||
|
||
# -- Утилиты --
|
||
|
||
@staticmethod
|
||
def _remove_empty_dirs(path: Path) -> None:
|
||
"""Рекурсивно удаляет пустые подпапки."""
|
||
for child in sorted(path.rglob("*"), reverse=True):
|
||
if child.is_dir() and not any(child.iterdir()):
|
||
child.rmdir()
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Точка входа
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
|
||
parser = argparse.ArgumentParser(
|
||
description="Сквозная дедупликация и классификация фото из ВК",
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
epilog="""
|
||
Примеры:
|
||
python process_photos.py run # Полная обработка
|
||
python process_photos.py run --limit 200 # Тест на 200 фото
|
||
python process_photos.py run --dry-run # Без перемещения
|
||
python process_photos.py rollback # Откат
|
||
python process_photos.py stats # Статистика
|
||
""",
|
||
)
|
||
parser.add_argument(
|
||
"command",
|
||
choices=["run", "rollback", "stats"],
|
||
help="Команда: run | rollback | stats",
|
||
)
|
||
parser.add_argument(
|
||
"--source",
|
||
default=config.DOWNLOAD_DIR,
|
||
help=f"Папка с фото (по умолчанию: {config.DOWNLOAD_DIR})",
|
||
)
|
||
parser.add_argument(
|
||
"--output",
|
||
default=config.OUTPUT_DIR,
|
||
help=f"Папка для результатов (по умолчанию: {config.OUTPUT_DIR})",
|
||
)
|
||
parser.add_argument(
|
||
"--limit",
|
||
type=int,
|
||
default=None,
|
||
help="Обработать только первые N фото (для тестирования)",
|
||
)
|
||
parser.add_argument(
|
||
"--threshold",
|
||
type=int,
|
||
default=config.DEDUP_THRESHOLD,
|
||
help=f"Порог Хэмминга (по умолчанию: {config.DEDUP_THRESHOLD})",
|
||
)
|
||
parser.add_argument(
|
||
"--dry-run",
|
||
action="store_true",
|
||
help="Показать без перемещения файлов",
|
||
)
|
||
args = parser.parse_args()
|
||
|
||
processor = PhotoProcessor(args)
|
||
|
||
if args.command == "run":
|
||
processor.run()
|
||
elif args.command == "rollback":
|
||
processor.run_rollback()
|
||
elif args.command == "stats":
|
||
processor.run_stats()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|