Files
vk-scripts/process_photos.py
Andrey f760e94206 Initial commit: VK media tools
Скрипты для выгрузки фото и видео из диалогов ВКонтакте,
обработки (дедупликация + CLIP-классификация) и загрузки в Immich.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-16 21:14:50 +03:00

798 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Обработка скачанных фото: сквозная дедупликация и классификация.
Все фото собираются из downloads/ (из всех диалогов), сортируются
хронологически (самые старые первыми) и раскладываются в output/:
output/personal/ — личные фото людей
output/travel/ — путешествия, места
output/food/ — еда
output/screenshots/ — скриншоты переписок
output/_duplicates/ — дубликаты (оригинал = самое раннее фото)
output/_junk/ — мемы, стикеры
output/_review/ — неуверенная классификация, art, document
Использование:
python process_photos.py run # Полная обработка
python process_photos.py run --limit 200 # Тест на 200 фото
python process_photos.py run --dry-run # Без перемещения
python process_photos.py rollback # Откатить всё назад
python process_photos.py stats # Статистика
"""
import argparse
import json
import os
import shutil
import signal
import sys
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path
from typing import Optional
import imagehash
import numpy as np
from PIL import Image
from tqdm import tqdm
import config
# ---------------------------------------------------------------------------
# Константы
# ---------------------------------------------------------------------------
# Папки-категории в output/
DEDUP_DIR: str = "_duplicates"
JUNK_DIR: str = "_junk"
REVIEW_DIR: str = "_review"
# Описания категорий для CLIP
CATEGORIES: dict[str, list[str]] = {
"personal": [
"a personal photograph of real people",
"a selfie photo of a person",
"a group photo of friends or family",
"a portrait photograph of a person",
"a candid photo of people at an event or party",
],
"travel": [
"a landscape photograph of nature or scenery",
"a travel photograph of a famous place or landmark",
"a photograph of architecture or buildings",
"a cityscape or street photograph",
"a photograph from a vacation or trip",
],
"food": [
"a photograph of food or a dish on a plate",
"a restaurant or cafe photograph",
"a cooking or baking photograph",
],
"screenshots": [
"a screenshot of a mobile phone chat or text messages",
"a screenshot of a computer screen with interface",
"a screenshot of a social media post or webpage",
],
"meme": [
"a meme image with text overlay and funny picture",
"a demotivational poster image with black border",
"a comic strip or cartoon panel with speech bubbles",
"an internet joke image with caption text",
],
"sticker": [
"a small cartoon sticker on plain background",
"a simple emoji or emoticon image",
"a cartoon character sticker with transparent background",
],
"art": [
"a digital art or illustration drawing",
"a hand-drawn painting or artwork",
"abstract colorful art image",
],
"document": [
"a scanned printed document or text page",
"a photograph of a paper document",
"a handwritten note or letter photograph",
],
}
# Куда перемещать каждую категорию
KEEP_CATEGORIES: set[str] = {"personal", "travel", "food", "screenshots"}
JUNK_CATEGORIES: set[str] = {"meme", "sticker"}
REVIEW_CATEGORIES: set[str] = {"art", "document"}
# ---------------------------------------------------------------------------
# Утилиты
# ---------------------------------------------------------------------------
def popcount64(arr: np.ndarray) -> np.ndarray:
"""Vectorized подсчёт единичных бит в массиве uint64."""
x = arr.astype(np.uint64)
x = x - ((x >> np.uint64(1)) & np.uint64(0x5555555555555555))
x = (x & np.uint64(0x3333333333333333)) + (
(x >> np.uint64(2)) & np.uint64(0x3333333333333333)
)
x = (x + (x >> np.uint64(4))) & np.uint64(0x0F0F0F0F0F0F0F0F)
return ((x * np.uint64(0x0101010101010101)) >> np.uint64(56)).astype(
np.int32
)
def unique_dest(dest: Path) -> Path:
"""Если файл с таким именем уже существует, добавляет суффикс _2, _3 и т.д."""
if not dest.exists():
return dest
stem = dest.stem
suffix = dest.suffix
parent = dest.parent
counter = 2
while True:
candidate = parent / f"{stem}_{counter}{suffix}"
if not candidate.exists():
return candidate
counter += 1
# ---------------------------------------------------------------------------
# Трекер прогресса
# ---------------------------------------------------------------------------
class ProgressTracker:
"""Прогресс обработки с поддержкой возобновления."""
def __init__(self, filepath: str = config.PROCESS_PROGRESS_FILE) -> None:
self.filepath: Path = Path(filepath)
self.data: dict = self._load()
def _load(self) -> dict:
"""Загружает прогресс из файла."""
if self.filepath.exists():
try:
with open(self.filepath, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return {
"version": 2,
"hashed_files": {},
"classified_files": {},
"processed_files": [],
}
def save(self) -> None:
"""Сохраняет прогресс (атомарная запись)."""
self.data["last_updated"] = datetime.now().isoformat()
tmp = self.filepath.with_suffix(".tmp")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(self.data, f, ensure_ascii=False, indent=2)
tmp.replace(self.filepath)
def reset(self) -> None:
"""Полный сброс прогресса."""
self.data = {
"version": 2,
"hashed_files": {},
"classified_files": {},
"processed_files": [],
}
self.save()
# ---------------------------------------------------------------------------
# Лог отката
# ---------------------------------------------------------------------------
class RollbackLog:
"""Журнал перемещений для отката."""
def __init__(self, filepath: str = config.ROLLBACK_LOG_FILE) -> None:
self.filepath: Path = Path(filepath)
self.moves: list[dict] = self._load()
def _load(self) -> list[dict]:
if self.filepath.exists():
try:
with open(self.filepath, "r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
pass
return []
def log_move(self, src: str, dst: str, reason: str) -> None:
"""Записывает перемещение."""
self.moves.append({"src": src, "dst": dst, "reason": reason})
self._save()
def _save(self) -> None:
with open(self.filepath, "w", encoding="utf-8") as f:
json.dump(self.moves, f, ensure_ascii=False, indent=2)
def rollback(self) -> int:
"""Откатить все перемещения (LIFO). Возвращает кол-во восстановленных."""
restored = 0
for move in tqdm(
reversed(self.moves), total=len(self.moves),
desc="Откат", unit=" файлов",
):
current = Path(move["dst"])
original = Path(move["src"])
if current.exists():
try:
original.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(current), str(original))
restored += 1
except OSError as exc:
tqdm.write(f" Ошибка: {current.name}: {exc}")
self.moves.clear()
self._save()
return restored
@property
def count(self) -> int:
return len(self.moves)
# ---------------------------------------------------------------------------
# Основной обработчик
# ---------------------------------------------------------------------------
class PhotoProcessor:
"""Сквозная обработка: сбор → сортировка → хеширование → деdup → CLIP → организация."""
def __init__(self, args: argparse.Namespace) -> None:
self.source_dir: Path = Path(args.source)
self.output_dir: Path = Path(args.output)
self.limit: Optional[int] = args.limit
self.threshold: int = args.threshold
self.dry_run: bool = args.dry_run
self._stop: bool = False
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
self.progress = ProgressTracker()
self.rollback = RollbackLog()
def _signal_handler(self, _sig: int, _frame: object) -> None:
if self._stop:
tqdm.write("\nПринудительная остановка!")
sys.exit(1)
self._stop = True
tqdm.write("\nОстановка... сохраняю прогресс.")
# -- 1. Сбор и сортировка --
def _scan_photos(self) -> list[Path]:
"""Собирает все фото из downloads/, сортирует по дате файла (oldest first)."""
photos: list[Path] = []
for root, _dirs, files in os.walk(self.source_dir):
for f in files:
if f.lower().endswith((".jpg", ".jpeg", ".png")):
photos.append(Path(root) / f)
# Сортировка по mtime (= дата сообщения ВК, установлена через os.utime)
photos.sort(key=lambda p: p.stat().st_mtime)
if self.limit:
photos = photos[: self.limit]
return photos
# -- 2. Хеширование --
@staticmethod
def _compute_one_hash(photo_path: Path) -> Optional[tuple[str, str]]:
"""Вычисляет pHash одного фото."""
try:
with Image.open(photo_path) as img:
h = imagehash.phash(img, hash_size=config.HASH_SIZE)
return (str(photo_path), str(h))
except Exception:
return None
def _compute_hashes(self, photos: list[Path]) -> dict[str, str]:
"""Вычисляет хеши для всех фото (с кешем и многопоточностью)."""
cached: dict[str, str] = self.progress.data.get("hashed_files", {})
result: dict[str, str] = {}
to_hash: list[Path] = []
for p in photos:
key = str(p)
if key in cached:
result[key] = cached[key]
else:
to_hash.append(p)
if not to_hash:
tqdm.write(f" Все {len(result)} хешей из кеша.")
return result
tqdm.write(f" Хеширование: {len(to_hash)} новых + {len(result)} из кеша")
bar = tqdm(total=len(to_hash), desc="Хеширование", unit=" фото")
with ThreadPoolExecutor(max_workers=config.HASH_WORKERS) as executor:
futures = {
executor.submit(self._compute_one_hash, p): p
for p in to_hash
}
done_count = 0
for future in as_completed(futures):
if self._stop:
for f in futures:
f.cancel()
break
res = future.result()
if res:
path_str, hash_hex = res
result[path_str] = hash_hex
cached[path_str] = hash_hex
bar.update(1)
done_count += 1
if done_count % 500 == 0:
self.progress.data["hashed_files"] = cached
self.progress.save()
bar.close()
self.progress.data["hashed_files"] = cached
self.progress.save()
return result
# -- 3. Хронологическая дедупликация --
def _dedup_chronological(
self,
sorted_photos: list[Path],
hashes: dict[str, str],
) -> tuple[list[Path], list[Path]]:
"""Проходит фото от старых к новым. Первое вхождение — оригинал, остальные — дубликаты.
Возвращает (originals, duplicates).
"""
# Массив уникальных хешей (int) для vectorized сравнения
unique_hash_ints: list[int] = []
unique_hash_np: Optional[np.ndarray] = None
originals: list[Path] = []
duplicates: list[Path] = []
tqdm.write(f" Сквозная дедупликация (порог: {self.threshold})...")
for photo in tqdm(
sorted_photos, desc="Дедупликация", unit=" фото", leave=False,
):
if self._stop:
break
key = str(photo)
hash_hex = hashes.get(key)
if hash_hex is None:
# Не удалось вычислить хеш — пропускаем
continue
hash_int = int(hash_hex, 16)
is_duplicate = False
if unique_hash_ints:
# Vectorized сравнение с уже встреченными уникальными хешами
if unique_hash_np is None or len(unique_hash_np) != len(unique_hash_ints):
unique_hash_np = np.array(unique_hash_ints, dtype=np.uint64)
xor = np.bitwise_xor(np.uint64(hash_int), unique_hash_np)
distances = popcount64(xor)
if np.any(distances <= self.threshold):
is_duplicate = True
if is_duplicate:
duplicates.append(photo)
else:
originals.append(photo)
unique_hash_ints.append(hash_int)
unique_hash_np = None # Инвалидируем кеш numpy-массива
tqdm.write(
f" Оригиналов: {len(originals)}, дубликатов: {len(duplicates)}"
)
return originals, duplicates
# -- 4. CLIP классификация --
def _classify_photos(
self, photos: list[Path],
) -> dict[str, tuple[str, float]]:
"""Классифицирует фото через CLIP. Возвращает {path_str: (category, score)}."""
import torch
import open_clip
cached: dict[str, list] = self.progress.data.get("classified_files", {})
result: dict[str, tuple[str, float]] = {}
to_classify: list[Path] = []
for p in photos:
key = str(p)
if key in cached:
result[key] = tuple(cached[key])
elif p.exists():
to_classify.append(p)
if not to_classify:
tqdm.write(f" Все {len(result)} классификаций из кеша.")
return result
tqdm.write(f" Классификация: {len(to_classify)} новых + {len(result)} из кеша")
# Устройство
if torch.backends.mps.is_available():
device = torch.device("mps")
tqdm.write(" Устройство: Apple Silicon MPS")
elif torch.cuda.is_available():
device = torch.device("cuda")
else:
device = torch.device("cpu")
tqdm.write(" Устройство: CPU")
# Загрузка модели
tqdm.write(" Загрузка CLIP ViT-B-32...")
model, _, preprocess = open_clip.create_model_and_transforms(
"ViT-B-32", pretrained="laion2b_s34b_b79k",
)
tokenizer = open_clip.get_tokenizer("ViT-B-32")
model = model.to(device)
model.eval()
# Подготовка text-эмбеддингов
all_prompts: list[str] = []
category_mapping: list[str] = []
for cat_name, prompts in CATEGORIES.items():
for prompt in prompts:
all_prompts.append(prompt)
category_mapping.append(cat_name)
tokens = tokenizer(all_prompts).to(device)
with torch.no_grad():
text_features = model.encode_text(tokens)
text_features /= text_features.norm(dim=-1, keepdim=True)
tqdm.write(" Модель загружена.")
# Классификация батчами
batch_size = config.CLIP_BATCH_SIZE
bar = tqdm(total=len(to_classify), desc="Классификация", unit=" фото")
for i in range(0, len(to_classify), batch_size):
if self._stop:
break
batch_paths = to_classify[i: i + batch_size]
tensors: list = []
valid_paths: list[Path] = []
for p in batch_paths:
try:
img = Image.open(p).convert("RGB")
tensors.append(preprocess(img))
valid_paths.append(p)
except Exception:
pass
if tensors:
image_batch = torch.stack(tensors).to(device)
with torch.no_grad():
image_features = model.encode_image(image_batch)
image_features /= image_features.norm(dim=-1, keepdim=True)
similarities = image_features @ text_features.T
for j, path in enumerate(valid_paths):
sims = similarities[j]
cat_scores: dict[str, list[float]] = defaultdict(list)
for idx, cat_name in enumerate(category_mapping):
cat_scores[cat_name].append(sims[idx].item())
cat_avg = {
cat: sum(scores) / len(scores)
for cat, scores in cat_scores.items()
}
best_cat = max(cat_avg, key=lambda k: cat_avg[k])
best_score = cat_avg[best_cat]
key = str(path)
result[key] = (best_cat, best_score)
cached[key] = [best_cat, best_score]
bar.update(len(batch_paths))
if (i // batch_size) % 10 == 0:
self.progress.data["classified_files"] = cached
self.progress.save()
bar.close()
self.progress.data["classified_files"] = cached
self.progress.save()
return result
# -- 5. Организация (перемещение) --
def _move_photo(
self, photo: Path, dest_dir: Path, reason: str,
) -> Optional[Path]:
"""Перемещает фото в dest_dir (плоско, без подпапок диалогов).
Возвращает путь назначения или None при dry-run.
"""
dest = unique_dest(dest_dir / photo.name)
if self.dry_run:
tqdm.write(f" [DRY-RUN] {photo.name}{dest_dir.name}/")
return None
dest_dir.mkdir(parents=True, exist_ok=True)
shutil.move(str(photo), str(dest))
self.rollback.log_move(str(photo), str(dest), reason)
return dest
def _organize_all(
self,
originals: list[Path],
duplicates: list[Path],
classifications: dict[str, tuple[str, float]],
) -> dict[str, int]:
"""Перемещает все фото в output/ по категориям (плоская структура)."""
stats: dict[str, int] = defaultdict(int)
confidence_min = config.CLIP_CONFIDENCE_MIN
# Дубликаты → output/_duplicates/
tqdm.write("Перемещение дубликатов...")
for photo in tqdm(duplicates, desc="Дубликаты", unit=" фото", leave=False):
if self._stop:
break
self._move_photo(photo, self.output_dir / DEDUP_DIR, "duplicate")
stats["duplicates"] += 1
# Оригиналы → по категориям
tqdm.write("Перемещение оригиналов по категориям...")
for photo in tqdm(originals, desc="Организация", unit=" фото", leave=False):
if self._stop:
break
key = str(photo)
cat_info = classifications.get(key)
if cat_info is None:
# Нет классификации — в review
self._move_photo(
photo,
self.output_dir / REVIEW_DIR / "unclassified",
"unclassified",
)
stats["review_unclassified"] += 1
continue
category, score = cat_info
if score < confidence_min:
dest = self.output_dir / REVIEW_DIR / "low_confidence"
stats["review_low_conf"] += 1
reason = f"low_conf:{category}"
elif category in JUNK_CATEGORIES:
dest = self.output_dir / JUNK_DIR
stats[f"junk_{category}"] += 1
reason = f"junk:{category}"
elif category in REVIEW_CATEGORIES:
dest = self.output_dir / REVIEW_DIR / category
stats[f"review_{category}"] += 1
reason = f"review:{category}"
elif category in KEEP_CATEGORIES:
dest = self.output_dir / category
stats[f"keep_{category}"] += 1
reason = f"keep:{category}"
else:
dest = self.output_dir / REVIEW_DIR / "other"
stats["review_other"] += 1
reason = f"other:{category}"
self._move_photo(photo, dest, reason)
return dict(stats)
# -- Главная команда: run --
def run(self) -> None:
"""Полная обработка: сбор → деdup → classify → организация."""
self.output_dir.mkdir(parents=True, exist_ok=True)
tqdm.write("=" * 60)
tqdm.write(" Обработка фото: дедупликация + классификация")
tqdm.write(f" Источник: {self.source_dir}/")
tqdm.write(f" Выход: {self.output_dir}/")
tqdm.write("=" * 60)
# 1. Сбор фото, отсортированных по дате (oldest first)
tqdm.write("\n[1/5] Сбор и сортировка фото по дате...")
photos = self._scan_photos()
if not photos:
tqdm.write("Фото не найдено.")
return
oldest = datetime.fromtimestamp(photos[0].stat().st_mtime)
newest = datetime.fromtimestamp(photos[-1].stat().st_mtime)
tqdm.write(
f" Найдено: {len(photos)} фото "
f"({oldest.strftime('%Y-%m-%d')}{newest.strftime('%Y-%m-%d')})"
)
if self._stop:
return
# 2. Хеширование
tqdm.write("\n[2/5] Вычисление перцептивных хешей...")
hashes = self._compute_hashes(photos)
if self._stop:
return
# 3. Хронологическая дедупликация (от старых к новым)
tqdm.write("\n[3/5] Сквозная дедупликация (oldest = оригинал)...")
originals, duplicates = self._dedup_chronological(photos, hashes)
if self._stop:
return
# 4. Классификация оригиналов через CLIP
tqdm.write(f"\n[4/5] Классификация {len(originals)} оригиналов...")
classifications = self._classify_photos(originals)
if self._stop:
return
# 5. Перемещение в output/
tqdm.write(f"\n[5/5] Организация файлов в {self.output_dir}/...")
stats = self._organize_all(originals, duplicates, classifications)
self.progress.save()
# Итого
tqdm.write("\n" + "=" * 60)
tqdm.write(" Итого:")
total_keep = sum(v for k, v in stats.items() if k.startswith("keep_"))
total_junk = sum(v for k, v in stats.items() if k.startswith("junk_"))
total_review = sum(v for k, v in stats.items() if k.startswith("review_"))
total_dup = stats.get("duplicates", 0)
tqdm.write(f" Оригиналов (keep): {total_keep}")
for k, v in sorted(stats.items()):
if k.startswith("keep_"):
tqdm.write(f" {k.replace('keep_', '')}: {v}")
tqdm.write(f" Дубликатов: {total_dup}")
tqdm.write(f" Мусор (junk): {total_junk}")
tqdm.write(f" На проверку (review): {total_review}")
tqdm.write(f" Всего обработано: {sum(stats.values())}")
tqdm.write("=" * 60)
# -- Откат --
def run_rollback(self) -> None:
"""Откатывает все перемещения обратно в downloads/."""
tqdm.write("=" * 60)
tqdm.write(" Откат всех перемещений")
tqdm.write("=" * 60)
if self.rollback.count == 0:
tqdm.write("Нет перемещений для отката.")
return
tqdm.write(f"Записей в журнале: {self.rollback.count}")
restored = self.rollback.rollback()
tqdm.write(f"Восстановлено файлов: {restored}")
# Сбрасываем прогресс
self.progress.reset()
# Удаляем пустые папки в output/
if self.output_dir.exists():
self._remove_empty_dirs(self.output_dir)
if self.output_dir.exists() and not any(self.output_dir.iterdir()):
self.output_dir.rmdir()
tqdm.write(f" Удалена пустая папка: {self.output_dir.name}/")
# -- Статистика --
def run_stats(self) -> None:
"""Показывает статистику из отчёта."""
report_path = self.output_dir / "classification_report.json"
if not report_path.exists():
tqdm.write("Отчёт не найден. Сначала запусти run.")
return
with open(report_path, "r", encoding="utf-8") as f:
report = json.load(f)
tqdm.write("=" * 60)
tqdm.write(" Статистика обработки")
tqdm.write("=" * 60)
tqdm.write(f"Всего фото: {report.get('total_photos', '?')}")
tqdm.write(f"Дубликатов: {report.get('duplicates', '?')}")
tqdm.write(f"\nКатегории оригиналов:")
for cat, count in report.get("categories", {}).items():
marker = ""
if cat in KEEP_CATEGORIES:
marker = " [ОСТАВИТЬ]"
elif cat in JUNK_CATEGORIES:
marker = " [МУСОР]"
elif cat in REVIEW_CATEGORIES:
marker = " [ПРОВЕРИТЬ]"
total = report.get("originals", report.get("total_photos", 1))
pct = count * 100 / total if total else 0
tqdm.write(f" {cat}: {count} ({pct:.1f}%){marker}")
# -- Утилиты --
@staticmethod
def _remove_empty_dirs(path: Path) -> None:
"""Рекурсивно удаляет пустые подпапки."""
for child in sorted(path.rglob("*"), reverse=True):
if child.is_dir() and not any(child.iterdir()):
child.rmdir()
# ---------------------------------------------------------------------------
# Точка входа
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description="Сквозная дедупликация и классификация фото из ВК",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Примеры:
python process_photos.py run # Полная обработка
python process_photos.py run --limit 200 # Тест на 200 фото
python process_photos.py run --dry-run # Без перемещения
python process_photos.py rollback # Откат
python process_photos.py stats # Статистика
""",
)
parser.add_argument(
"command",
choices=["run", "rollback", "stats"],
help="Команда: run | rollback | stats",
)
parser.add_argument(
"--source",
default=config.DOWNLOAD_DIR,
help=f"Папка с фото (по умолчанию: {config.DOWNLOAD_DIR})",
)
parser.add_argument(
"--output",
default=config.OUTPUT_DIR,
help=f"Папка для результатов (по умолчанию: {config.OUTPUT_DIR})",
)
parser.add_argument(
"--limit",
type=int,
default=None,
help="Обработать только первые N фото (для тестирования)",
)
parser.add_argument(
"--threshold",
type=int,
default=config.DEDUP_THRESHOLD,
help=f"Порог Хэмминга (по умолчанию: {config.DEDUP_THRESHOLD})",
)
parser.add_argument(
"--dry-run",
action="store_true",
help="Показать без перемещения файлов",
)
args = parser.parse_args()
processor = PhotoProcessor(args)
if args.command == "run":
processor.run()
elif args.command == "rollback":
processor.run_rollback()
elif args.command == "stats":
processor.run_stats()
if __name__ == "__main__":
main()