Скрипты для выгрузки фото и видео из диалогов ВКонтакте, обработки (дедупликация + CLIP-классификация) и загрузки в Immich. Co-authored-by: Cursor <cursoragent@cursor.com>
958 lines
38 KiB
Python
958 lines
38 KiB
Python
#!/usr/bin/env python3
|
||
"""
Скрипт для выгрузки всех фотографий из личных диалогов ВКонтакте.

Использование:
1. Заполни VK_TOKEN в config.py (инструкция в файле)
2. pip install -r requirements.txt
3. python main.py
4. Ctrl+C для остановки (прогресс сохраняется автоматически)
5. Повторный запуск продолжит с места остановки
"""
|
||
|
||
import json
|
||
import os
|
||
import shutil
|
||
import signal
|
||
import sys
|
||
import threading
|
||
import time
|
||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from typing import Optional
|
||
|
||
import piexif
|
||
import piexif.helper
|
||
import requests
|
||
import vk_api
|
||
from tqdm import tqdm
|
||
|
||
import config
|
||
|
||
# Приоритет размеров фото ВК (от лучшего к худшему)
|
||
# Size "type" codes of a VK photo, ordered from most to least preferred.
# The first code present in a photo's ``sizes`` list wins.
PHOTO_SIZE_PRIORITY: list[str] = [
    "w",
    "z",
    "y",
    "x",
    "r",
    "q",
    "p",
    "o",
    "m",
    "s",
]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Модели данных
|
||
# ---------------------------------------------------------------------------
|
||
|
||
@dataclass
class PhotoInfo:
    """Metadata of one photo, used for downloading it and writing its EXIF.

    Instances may be created positionally, so the field order below is part
    of the interface; only the two GPS fields are optional.
    """

    photo_id: int  # ``id`` of the VK photo object
    owner_id: int  # ``owner_id`` of the VK photo object
    url: str  # download URL of the selected size
    date: int  # Unix timestamp of the message / photo
    sender_id: int  # ``from_id`` of the message that carried the photo
    sender_name: str  # display name resolved for ``sender_id``
    message_text: str  # text of the message the photo came with
    photo_text: str  # caption stored on the photo itself
    lat: Optional[float] = None  # latitude in decimal degrees, when available
    long: Optional[float] = None  # longitude in decimal degrees, when available
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Менеджер прогресса (resume)
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class ProgressManager:
    """Thread-safe resume state, persisted as a JSON file.

    The state lives in ``self.data`` (a plain dict that is dumped to disk) and
    in ``self._downloaded_ids`` (a set mirror of ``downloaded_photo_ids`` used
    for fast membership tests).  All mutations go through ``self._lock``.
    """

    def __init__(self, progress_file: str) -> None:
        """Load the state from *progress_file*, or start from an empty state."""
        self.progress_file: Path = Path(progress_file)
        self._lock = threading.Lock()
        self.data: dict = self._load()
        self._downloaded_ids: set[int] = set(self.data.get("downloaded_photo_ids", []))

    def _load(self) -> dict:
        """Return the saved state merged onto the default state.

        Missing keys are filled from ``_default_state()`` so that a partial or
        older progress file cannot cause ``KeyError`` later on.  An unreadable,
        undecodable or non-object file yields the pristine default state.
        """
        state = self._default_state()
        if not self.progress_file.exists():
            return state

        try:
            with open(self.progress_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (ValueError, OSError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            return state

        if not isinstance(loaded, dict):
            return state

        for key, value in loaded.items():
            if key == "stats":
                # Merge counters individually so absent ones keep their zero.
                if isinstance(value, dict):
                    state["stats"].update(value)
            elif isinstance(state.get(key), list) and not isinstance(value, list):
                # A list-typed field holding something else (e.g. null) is
                # ignored; the empty default list stays in place.
                continue
            else:
                state[key] = value
        return state

    @staticmethod
    def _default_state() -> dict:
        """Return a fresh, empty progress state."""
        return {
            "version": 1,
            "last_updated": "",
            "dialogs_total": 0,
            "dialogs_completed": [],
            "current_dialog": None,
            "downloaded_photo_ids": [],
            "stats": {
                "photos_downloaded": 0,
                "photos_skipped": 0,
                "errors": 0,
            },
        }

    def save(self) -> None:
        """Write the current state to disk.

        The JSON is written to a sibling ``.tmp`` file first and then moved
        over the real file, so an interrupted write does not leave a truncated
        progress file behind.
        """
        with self._lock:
            self.data["last_updated"] = datetime.now().isoformat()
            self.data["downloaded_photo_ids"] = list(self._downloaded_ids)
            tmp_path = self.progress_file.with_suffix(".tmp")
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self.data, f, ensure_ascii=False, indent=2)
            tmp_path.replace(self.progress_file)

    def is_dialog_completed(self, peer_id: int) -> bool:
        """Return True when the dialog *peer_id* was fully processed."""
        return peer_id in self.data["dialogs_completed"]

    def mark_dialog_completed(self, peer_id: int) -> None:
        """Record *peer_id* as finished, clear the current dialog and save."""
        with self._lock:
            if peer_id not in self.data["dialogs_completed"]:
                self.data["dialogs_completed"].append(peer_id)
            self.data["current_dialog"] = None
        # save() takes the same non-reentrant lock, so it must run after the
        # lock above has been released.
        self.save()

    def set_current_dialog(self, peer_id: int) -> None:
        """Record *peer_id* as the dialog being processed and save."""
        with self._lock:
            self.data["current_dialog"] = {"peer_id": peer_id}
        # See mark_dialog_completed(): save() acquires the lock itself.
        self.save()

    def get_current_dialog(self) -> Optional[dict]:
        """Return ``{"peer_id": ...}`` of the dialog in progress, or None."""
        return self.data.get("current_dialog")

    def is_photo_downloaded(self, photo_id: int) -> bool:
        """Return True when *photo_id* was already downloaded."""
        return photo_id in self._downloaded_ids

    def mark_photo_downloaded(self, photo_id: int) -> None:
        """Remember *photo_id* as downloaded and bump the download counter."""
        with self._lock:
            self._downloaded_ids.add(photo_id)
            self.data["stats"]["photos_downloaded"] += 1

    def increment_skipped(self) -> None:
        """Bump the counter of skipped photos."""
        with self._lock:
            self.data["stats"]["photos_skipped"] += 1

    def increment_errors(self) -> None:
        """Bump the error counter."""
        with self._lock:
            self.data["stats"]["errors"] += 1
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Запись EXIF метаданных
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class ExifWriter:
    """Write photo metadata into JPEG EXIF, with a JSON sidecar as fallback."""

    @staticmethod
    def _decimal_to_dms(decimal_degrees: float) -> tuple[tuple, bool]:
        """Convert decimal degrees to EXIF GPS rationals.

        Returns ``(dms, is_negative)`` where ``dms`` is
        ``((deg, 1), (min, 1), (sec * 10000, 10000))`` for the absolute value
        and ``is_negative`` tells whether the input was below zero.
        """
        is_negative = decimal_degrees < 0
        units_per_second = 10000
        # Round once, on the whole angle expressed in 1/10000 arc-seconds, and
        # split afterwards.  Rounding the seconds part on its own could yield
        # 60.0000 seconds (e.g. for 0.9999999999 degrees) instead of carrying
        # into the minutes and degrees.
        total_units = round(abs(decimal_degrees) * 3600 * units_per_second)
        degrees, remainder = divmod(total_units, 3600 * units_per_second)
        minutes, seconds = divmod(remainder, 60 * units_per_second)
        dms = (
            (degrees, 1),
            (minutes, 1),
            (seconds, units_per_second),
        )
        return dms, is_negative

    @staticmethod
    def write_exif(filepath: Path, photo_info: "PhotoInfo") -> None:
        """Store *photo_info* in the image at *filepath*.

        Files whose suffix is not ``.jpg``/``.jpeg`` get a JSON sidecar
        instead.  Any failure while building or inserting the EXIF block is
        reported via ``tqdm.write`` and also falls back to the JSON sidecar.
        """
        if filepath.suffix.lower() not in (".jpg", ".jpeg"):
            ExifWriter._write_json_meta(filepath, photo_info)
            return

        try:
            try:
                exif_dict = piexif.load(str(filepath))
            except Exception:
                # No readable EXIF in the file: start from an empty structure.
                exif_dict = {
                    "0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "Interop": {},
                }

            # Message date -> the three EXIF date fields.
            if photo_info.date:
                dt = datetime.fromtimestamp(photo_info.date)
                date_bytes = dt.strftime("%Y:%m:%d %H:%M:%S").encode("ascii")
                exif_dict["Exif"][piexif.ExifIFD.DateTimeOriginal] = date_bytes
                exif_dict["Exif"][piexif.ExifIFD.DateTimeDigitized] = date_bytes
                exif_dict["0th"][piexif.ImageIFD.DateTime] = date_bytes

            # Sender name -> Artist and Copyright.
            if photo_info.sender_name:
                artist = photo_info.sender_name.encode("utf-8")
                exif_dict["0th"][piexif.ImageIFD.Artist] = artist
                exif_dict["0th"][piexif.ImageIFD.Copyright] = artist

            # Photo caption from VK -> ImageDescription.
            if photo_info.photo_text:
                exif_dict["0th"][piexif.ImageIFD.ImageDescription] = (
                    photo_info.photo_text.encode("utf-8")
                )

            # Message text -> UserComment.
            if photo_info.message_text:
                user_comment = piexif.helper.UserComment.dump(
                    photo_info.message_text, encoding="unicode"
                )
                exif_dict["Exif"][piexif.ExifIFD.UserComment] = user_comment

            # GPS block, only when both coordinates are known.
            if photo_info.lat is not None and photo_info.long is not None:
                lat_dms, lat_neg = ExifWriter._decimal_to_dms(photo_info.lat)
                lon_dms, lon_neg = ExifWriter._decimal_to_dms(photo_info.long)
                exif_dict["GPS"] = {
                    piexif.GPSIFD.GPSVersionID: (2, 3, 0, 0),
                    piexif.GPSIFD.GPSLatitude: lat_dms,
                    piexif.GPSIFD.GPSLatitudeRef: b"S" if lat_neg else b"N",
                    piexif.GPSIFD.GPSLongitude: lon_dms,
                    piexif.GPSIFD.GPSLongitudeRef: b"W" if lon_neg else b"E",
                }

            exif_bytes = piexif.dump(exif_dict)
            piexif.insert(exif_bytes, str(filepath))

        except Exception as exc:
            tqdm.write(f" EXIF ошибка ({filepath.name}): {exc}. Сохраняю в JSON.")
            ExifWriter._write_json_meta(filepath, photo_info)

    @staticmethod
    def _write_json_meta(filepath: Path, photo_info: "PhotoInfo") -> None:
        """Write the metadata to ``<filepath>.meta.json`` next to the image."""
        meta_path = filepath.with_suffix(filepath.suffix + ".meta.json")
        meta: dict = {
            "photo_id": photo_info.photo_id,
            "date": datetime.fromtimestamp(photo_info.date).isoformat() if photo_info.date else None,
            "sender_id": photo_info.sender_id,
            "sender": photo_info.sender_name,
            "message_text": photo_info.message_text,
            "photo_text": photo_info.photo_text,
        }
        if photo_info.lat is not None:
            meta["gps"] = {"lat": photo_info.lat, "long": photo_info.long}

        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Основной загрузчик
|
||
# ---------------------------------------------------------------------------
|
||
|
||
class VKPhotoDownloader:
    """Download photos from all private VK dialogs (group chats are skipped).

    For every dialog the photos are collected in two passes (attachments,
    then photos nested in forwarded/replied messages), downloaded in a thread
    pool, stamped with metadata and recorded in the progress file so that an
    interrupted run can be resumed.
    """

    def __init__(self) -> None:
        """Prepare progress state, output directory, signal handlers and VK API."""
        self._stop_requested: bool = False
        self.progress: ProgressManager = ProgressManager(config.PROGRESS_FILE)
        self.download_dir: Path = Path(config.DOWNLOAD_DIR)
        self.download_dir.mkdir(parents=True, exist_ok=True)
        self._user_cache: dict[int, str] = {}
        self._http_session: requests.Session = requests.Session()

        # Signal handlers for a graceful shutdown.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

        # VK API session.
        self._vk_session = vk_api.VkApi(token=config.VK_TOKEN, api_version=config.API_VERSION)
        self.api = self._vk_session.get_api()

    # -- signals --

    def _signal_handler(self, _signum: int, _frame: object) -> None:
        """Request a graceful stop; a second signal exits immediately."""
        if self._stop_requested:
            tqdm.write("\nПринудительная остановка!")
            sys.exit(1)
        self._stop_requested = True
        tqdm.write(
            "\nПолучен сигнал остановки. "
            "Завершаю текущие загрузки и сохраняю прогресс..."
        )

    # -- utilities --

    def _check_free_space(self) -> bool:
        """Return False (and log) when free disk space is below the configured minimum."""
        usage = shutil.disk_usage(str(self.download_dir))
        free_mb = usage.free / (1024 * 1024)
        if free_mb < config.MIN_FREE_SPACE_MB:
            tqdm.write(
                f"Недостаточно места на диске! "
                f"Свободно: {free_mb:.0f} МБ, минимум: {config.MIN_FREE_SPACE_MB} МБ"
            )
            return False
        return True

    @staticmethod
    def _safe_name(name: str) -> str:
        """Replace every character that is not alphanumeric, ``_`` or ``-`` with ``_``."""
        return "".join(c if c.isalnum() or c in ("_", "-") else "_" for c in name)

    # -- VK API execute --

    def _execute(self, code: str) -> dict:
        """Run a VKScript snippet through the ``execute`` API method."""
        return self._vk_session.method("execute", {"code": code})

    # -- batched loading of user names --

    def _prefetch_user_names(self, user_ids: list[int]) -> None:
        """Fill the name cache for *user_ids* using batched API requests.

        Ids above 2_000_000_000 are named locally as group chats, positive ids
        are resolved through ``users.get`` (1000 per request) and negative ids
        through ``groups.getById`` (500 per request).  A failed batch is
        logged and skipped; callers fall back to ``id<N>`` style names.
        """
        need_users: list[int] = []
        need_groups: list[int] = []
        seen: set[int] = set()

        for uid in user_ids:
            if uid == 0 or uid in self._user_cache or uid in seen:
                continue
            seen.add(uid)  # avoid requesting the same id twice
            if uid > 2_000_000_000:
                self._user_cache[uid] = f"Беседа_{uid - 2_000_000_000}"
            elif uid > 0:
                need_users.append(uid)
            else:
                need_groups.append(abs(uid))

        # Users, up to 1000 per request.
        for i in range(0, len(need_users), 1000):
            batch = need_users[i:i + 1000]
            try:
                ids_str = ",".join(str(x) for x in batch)
                users = self.api.users.get(user_ids=ids_str)
                for u in users:
                    self._user_cache[u["id"]] = f"{u['first_name']} {u['last_name']}"
            except Exception as exc:
                # Best effort: names are cosmetic, so report and carry on.
                tqdm.write(f" Не удалось загрузить имена пользователей: {exc}")
            time.sleep(0.34)

        # Communities, up to 500 per request.
        for i in range(0, len(need_groups), 500):
            batch = need_groups[i:i + 500]
            try:
                ids_str = ",".join(str(x) for x in batch)
                resp = self.api.groups.getById(group_ids=ids_str)
                # The response is either a bare list or a dict with "groups".
                groups = resp if isinstance(resp, list) else resp.get("groups", [])
                for g in groups:
                    gid = -g["id"]
                    self._user_cache[gid] = g.get("name", f"group_{g['id']}")
            except Exception as exc:
                tqdm.write(f" Не удалось загрузить названия сообществ: {exc}")
            time.sleep(0.34)

    def _get_user_name(self, user_id: int) -> str:
        """Return the cached name for *user_id*, fetching it when missing."""
        if user_id in self._user_cache:
            return self._user_cache[user_id]
        # Fall back to a single-id request.
        self._prefetch_user_names([user_id])
        return self._user_cache.get(user_id, f"id{user_id}")

    # -- listing dialogs --

    @staticmethod
    def _non_chat_peers(items: list[dict]) -> list[dict]:
        """Turn ``getConversations`` items into ``{"peer_id", "type"}`` dicts, dropping group chats."""
        peers = (item["conversation"]["peer"] for item in items)
        return [
            {"peer_id": peer["id"], "type": peer["type"]}
            for peer in peers
            if peer["type"] != "chat"
        ]

    def _get_all_conversations(self) -> list[dict]:
        """Return all non-chat dialogs, paging via ``execute`` (25 pages per call).

        On any ``execute`` failure the whole listing is redone with the plain
        paginated method.
        """
        conversations: list[dict] = []
        offset = 0

        tqdm.write("Получаю список диалогов...")

        while True:
            # VKScript: up to 25 pages of 200 dialogs in one execute call.
            code = f"""
                var results = [];
                var offset = {offset};
                var i = 0;
                while (i < 25) {{
                    var resp = API.messages.getConversations({{
                        "offset": offset, "count": 200, "extended": 0
                    }});
                    results.push(resp);
                    offset = offset + 200;
                    if (offset >= resp.count || resp.items.length == 0) {{
                        return {{"results": results, "done": true}};
                    }}
                    i = i + 1;
                }}
                return {{"results": results, "done": false, "next_offset": offset}};
            """
            try:
                data = self._execute(code)
            except Exception as exc:
                tqdm.write(f" execute ошибка (conversations): {exc}, пробую обычный метод")
                return self._get_all_conversations_fallback()

            for page in data.get("results", []):
                if not page:
                    continue
                conversations.extend(self._non_chat_peers(page.get("items", [])))

            if data.get("done", True):
                break
            offset = data.get("next_offset", offset + 5000)
            time.sleep(0.34)

        tqdm.write(f"Найдено личных диалогов: {len(conversations)} (беседы пропущены)")
        return conversations

    def _get_all_conversations_fallback(self) -> list[dict]:
        """Return all non-chat dialogs using plain ``getConversations`` paging."""
        conversations: list[dict] = []
        offset = 0

        while True:
            resp = self.api.messages.getConversations(
                offset=offset, count=200, extended=0,
            )
            items = resp.get("items", [])
            if not items:
                break
            conversations.extend(self._non_chat_peers(items))
            offset += 200
            if offset >= resp.get("count", 0):
                break
            time.sleep(0.34)

        return conversations

    # -- choosing the best photo size --

    @staticmethod
    def _best_photo_url(photo: dict) -> Optional[str]:
        """Return the URL of the largest available rendition of *photo*.

        Order of preference: ``orig_photo``; the first ``PHOTO_SIZE_PRIORITY``
        code found in ``sizes`` (otherwise the size with the largest area);
        legacy ``photo_<N>`` keys.  Returns None when nothing is found.
        """
        orig = photo.get("orig_photo")
        if orig and orig.get("url"):
            return orig["url"]

        sizes = photo.get("sizes")
        if sizes:
            # .get() keeps a malformed size entry from raising KeyError.
            size_map = {s.get("type"): s.get("url") for s in sizes}
            for prio in PHOTO_SIZE_PRIORITY:
                if size_map.get(prio):
                    return size_map[prio]
            best = max(sizes, key=lambda s: s.get("width", 0) * s.get("height", 0))
            return best.get("url")

        for key in ("photo_2560", "photo_1280", "photo_807", "photo_604", "photo_130", "photo_75"):
            if key in photo:
                return photo[key]

        return None

    # -- extracting photos from messages --

    def _extract_photos_recursive(self, message: dict) -> list[dict]:
        """Collect photos from *message*, its forwarded messages and its reply target.

        Each entry has the keys ``photo``, ``url``, ``from_id``, ``date`` and
        ``message_text``, taken from the message that directly carries the photo.
        """
        result: list[dict] = []

        for att in message.get("attachments", []):
            if att.get("type") != "photo":
                continue
            photo = att["photo"]
            url = self._best_photo_url(photo)
            if url:
                result.append({
                    "photo": photo,
                    "url": url,
                    "from_id": message.get("from_id", 0),
                    "date": message.get("date", photo.get("date", 0)),
                    "message_text": message.get("text", ""),
                })

        for fwd in message.get("fwd_messages", []):
            result.extend(self._extract_photos_recursive(fwd))

        reply = message.get("reply_message")
        if reply:
            result.extend(self._extract_photos_recursive(reply))

        return result

    # -- collecting attachments (getHistoryAttachments) --

    def _photos_from_attachment_items(self, items: list[dict]) -> list[dict]:
        """Convert ``getHistoryAttachments`` items into photo entries (empty message text)."""
        found: list[dict] = []
        for item in items:
            att = item.get("attachment", {})
            if att.get("type") != "photo":
                continue
            photo = att["photo"]
            url = self._best_photo_url(photo)
            if url:
                found.append({
                    "photo": photo,
                    "url": url,
                    "from_id": item.get("from_id", 0),
                    "date": item.get("date", photo.get("date", 0)),
                    "message_text": "",
                })
        return found

    def _collect_all_attachment_photos(self, peer_id: int) -> list[dict]:
        """Collect all photo attachments of a dialog via ``execute`` (25 pages per call).

        Stops early when a shutdown was requested.  On an ``execute`` failure
        the collection continues with plain pagination from the last cursor.
        """
        all_photos: list[dict] = []
        cursor = ""

        while not self._stop_requested:
            # The cursor comes from the API response; emit it as a quoted,
            # escaped literal so a quote or backslash cannot break the script.
            # NOTE(review): assumes VKScript accepts JSON-style string escapes.
            cursor_literal = json.dumps(cursor, ensure_ascii=False)
            # VKScript: up to 25 getHistoryAttachments calls in one execute.
            code = f"""
                var results = [];
                var cursor = {cursor_literal};
                var i = 0;
                while (i < 25) {{
                    var params = {{
                        "peer_id": {peer_id},
                        "media_type": "photo",
                        "count": 200,
                        "preserve_order": 1
                    }};
                    if (cursor != "") {{
                        params.start_from = cursor;
                    }}
                    var resp = API.messages.getHistoryAttachments(params);
                    results.push(resp);
                    if (!resp.next_from || resp.items.length == 0) {{
                        return {{"results": results, "cursor": ""}};
                    }}
                    cursor = resp.next_from;
                    i = i + 1;
                }}
                return {{"results": results, "cursor": cursor}};
            """
            try:
                data = self._execute(code)
            except Exception as exc:
                tqdm.write(f" execute ошибка (attachments): {exc}, фоллбэк")
                return self._collect_attachments_fallback(peer_id, all_photos, cursor)

            for page in data.get("results", []):
                if not page:
                    continue
                all_photos.extend(self._photos_from_attachment_items(page.get("items", [])))

            cursor = data.get("cursor", "")
            if not cursor:
                break
            time.sleep(0.34)

        return all_photos

    def _collect_attachments_fallback(
        self, peer_id: int, existing: list[dict], start_from: str
    ) -> list[dict]:
        """Continue attachment collection with plain ``getHistoryAttachments`` paging.

        New entries are appended to *existing*, which is also the return value.
        """
        cursor: Optional[str] = start_from or None
        while not self._stop_requested:
            params: dict = {
                "peer_id": peer_id, "media_type": "photo",
                "count": 200, "preserve_order": 1,
            }
            if cursor:
                params["start_from"] = cursor
            resp = self.api.messages.getHistoryAttachments(**params)
            items = resp.get("items", [])
            cursor = resp.get("next_from")
            if not items:
                break
            existing.extend(self._photos_from_attachment_items(items))
            if not cursor:
                break
            time.sleep(0.34)
        return existing

    # -- collecting photos from forwarded messages (getHistory) --

    def _collect_forwarded_photos(
        self, peer_id: int, known_ids: set[int],
    ) -> list[dict]:
        """Scan the message history for photos inside forwarded/replied messages.

        Photo ids already in *known_ids* are skipped; newly found ids are added
        to it.  Scanning stops on a shutdown request or on the first API error,
        returning whatever was found so far.
        """
        found: list[dict] = []
        offset = 0

        while not self._stop_requested:
            code = f"""
                var results = [];
                var offset = {offset};
                var i = 0;
                while (i < 25) {{
                    var resp = API.messages.getHistory({{
                        "peer_id": {peer_id}, "offset": offset, "count": 200
                    }});
                    results.push(resp);
                    offset = offset + 200;
                    if (offset >= resp.count || resp.items.length == 0) {{
                        return {{"results": results, "done": true}};
                    }}
                    i = i + 1;
                }}
                return {{"results": results, "done": false, "next_offset": offset}};
            """
            try:
                data = self._execute(code)
            except vk_api.exceptions.ApiError as exc:
                tqdm.write(f" API ошибка при getHistory: {exc}")
                break
            except Exception as exc:
                tqdm.write(f" execute ошибка (history): {exc}, прерываю сканирование")
                break

            for page in data.get("results", []):
                if not page:
                    continue
                for msg in page.get("items", []):
                    # Only nested messages are inspected here; the caller
                    # gathers direct attachments in a separate pass.
                    sources: list[dict] = []
                    for fwd in msg.get("fwd_messages", []):
                        sources.extend(self._extract_photos_recursive(fwd))
                    reply = msg.get("reply_message")
                    if reply:
                        sources.extend(self._extract_photos_recursive(reply))

                    for item in sources:
                        pid = item["photo"].get("id", 0)
                        if pid and pid not in known_ids:
                            known_ids.add(pid)
                            # Nested message without text: use the outer text.
                            if not item.get("message_text"):
                                item["message_text"] = msg.get("text", "")
                            found.append(item)

            if data.get("done", True):
                break
            offset = data.get("next_offset", offset + 5000)
            time.sleep(0.34)

        return found

    # -- downloading one photo (runs in a worker thread) --

    def _download_single(
        self, photo_data: dict, dialog_dir: Path
    ) -> Optional["PhotoInfo"]:
        """Download one photo, write its metadata and set the file times.

        The file is stored as ``dialog_dir/YYYY/photo_<id>_<timestamp>.jpg``.
        Returns the ``PhotoInfo`` on success, or None when every attempt failed.
        """
        photo = photo_data["photo"]
        photo_id = photo.get("id", 0)
        url = photo_data["url"]
        date_ts: int = photo_data.get("date", photo.get("date", 0))

        dt = datetime.fromtimestamp(date_ts) if date_ts else datetime.now()
        subfolder = dt.strftime("%Y")
        filename = f"photo_{photo_id}_{dt.strftime('%Y%m%d_%H%M%S')}.jpg"
        filepath = dialog_dir / subfolder / filename
        # Download into a temporary sibling so that a failed or interrupted
        # transfer never leaves a truncated .jpg behind.
        part_path = filepath.with_name(filepath.name + ".part")

        for attempt in range(config.MAX_RETRIES):
            try:
                # The context manager releases the streamed connection even
                # when the transfer fails midway.
                with self._http_session.get(
                    url, timeout=config.DOWNLOAD_TIMEOUT, stream=True
                ) as resp:
                    resp.raise_for_status()
                    filepath.parent.mkdir(parents=True, exist_ok=True)
                    with open(part_path, "wb") as f:
                        for chunk in resp.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                part_path.replace(filepath)
                break
            except (requests.RequestException, OSError) as exc:
                try:
                    part_path.unlink(missing_ok=True)
                except OSError:
                    pass  # cleanup is best effort; the next attempt overwrites it
                if attempt < config.MAX_RETRIES - 1:
                    time.sleep(2 ** attempt)  # exponential backoff
                else:
                    tqdm.write(f" Ошибка скачивания ({filename}): {exc}")
                    return None
        else:
            # MAX_RETRIES <= 0: nothing was attempted, so nothing was downloaded.
            return None

        # Metadata
        sender_id = photo_data.get("from_id", 0)
        sender_name = self._user_cache.get(sender_id, f"id{sender_id}") if sender_id else ""

        info = PhotoInfo(
            photo_id=photo_id,
            owner_id=photo.get("owner_id", 0),
            url=url,
            date=date_ts,
            sender_id=sender_id,
            sender_name=sender_name,
            message_text=photo_data.get("message_text", ""),
            photo_text=photo.get("text", ""),
            lat=photo.get("lat"),
            long=photo.get("long"),
        )

        ExifWriter.write_exif(filepath, info)

        # File access/modification time = message date.
        if date_ts:
            os.utime(filepath, (date_ts, date_ts))

        return info

    # -- processing one dialog --

    def _process_dialog(
        self, peer_id: int, dialog_name: str, photos_bar: "tqdm"
    ) -> None:
        """Collect and download every photo of one dialog.

        The dialog is marked completed only when processing was not
        interrupted by a stop request (signal or low disk space).
        """
        saved = self.progress.get_current_dialog()
        resuming = saved is not None and saved.get("peer_id") == peer_id
        if not resuming:
            self.progress.set_current_dialog(peer_id)

        known_ids: set[int] = set()
        all_photos: list[dict] = []

        # Phase 1: direct attachments.
        tqdm.write(f" [{dialog_name}] Сбор фото-вложений...")
        for p in self._collect_all_attachment_photos(peer_id):
            pid = p["photo"].get("id", 0)
            if pid and pid not in known_ids:
                known_ids.add(pid)
                all_photos.append(p)

        # Phase 2: photos inside forwarded / replied messages.
        if not self._stop_requested:
            tqdm.write(f" [{dialog_name}] Поиск фото в пересланных сообщениях...")
            all_photos.extend(self._collect_forwarded_photos(peer_id, known_ids))

        if not all_photos:
            tqdm.write(f" [{dialog_name}] Нет фото")
            if not self._stop_requested:
                self.progress.mark_dialog_completed(peer_id)
            return

        # Drop photos that were downloaded in an earlier run.
        tasks = [
            p for p in all_photos
            if not self.progress.is_photo_downloaded(p["photo"].get("id", 0))
        ]
        skipped = len(all_photos) - len(tasks)
        # Keep the "skipped" statistic (printed in the final summary) up to date.
        for _ in range(skipped):
            self.progress.increment_skipped()

        tqdm.write(
            f" [{dialog_name}] Всего: {len(all_photos)}, "
            f"скачать: {len(tasks)}, пропустить: {skipped}"
        )

        if not tasks:
            if not self._stop_requested:
                self.progress.mark_dialog_completed(peer_id)
            return

        # Resolve sender names in one batch before the workers need them.
        sender_ids = list({t.get("from_id", 0) for t in tasks if t.get("from_id", 0)})
        if sender_ids:
            self._prefetch_user_names(sender_ids)

        # Progress bar setup.
        photos_bar.reset(total=len(all_photos))
        photos_bar.n = skipped
        photos_bar.refresh()
        photos_bar.set_description(f"Фото ({dialog_name[:25]})")

        safe_dialog = self._safe_name(dialog_name)
        dialog_dir = self.download_dir / f"{safe_dialog}_id{peer_id}"

        completed_count = 0

        with ThreadPoolExecutor(max_workers=config.DOWNLOAD_WORKERS) as executor:
            futures: dict = {}

            # Submitting is instantaneous, so free space is checked once here
            # and then again at every checkpoint while downloads complete.
            if not self._stop_requested:
                if self._check_free_space():
                    futures = {
                        executor.submit(self._download_single, task, dialog_dir): task
                        for task in tasks
                    }
                else:
                    tqdm.write("Остановка из-за нехватки места на диске.")
                    self._stop_requested = True

            for future in as_completed(futures):
                task = futures[future]
                photo_id = task["photo"].get("id", 0)

                try:
                    result = future.result()
                    if result is not None:
                        self.progress.mark_photo_downloaded(photo_id)
                    else:
                        self.progress.increment_errors()
                except Exception:
                    self.progress.increment_errors()

                photos_bar.update(1)
                completed_count += 1

                # Checkpoint every 50 photos: persist progress, re-check disk.
                if completed_count % 50 == 0:
                    self.progress.save()
                    if not self._stop_requested and not self._check_free_space():
                        tqdm.write("Остановка из-за нехватки места на диске.")
                        self._stop_requested = True

                if self._stop_requested:
                    # Cancel what has not started; running downloads finish
                    # while the executor shuts down.
                    for f in futures:
                        f.cancel()
                    break

        self.progress.save()

        if not self._stop_requested:
            self.progress.mark_dialog_completed(peer_id)

    # -- main loop --

    def run(self) -> None:
        """Run the whole export: authorize, list dialogs, process them, print a summary."""
        tqdm.write("=" * 60)
        tqdm.write(" Выгрузка фото из диалогов ВКонтакте")
        tqdm.write(f" Потоков скачивания: {config.DOWNLOAD_WORKERS}")
        tqdm.write("=" * 60)

        if not config.VK_TOKEN:
            tqdm.write(
                "ОШИБКА: Заполни VK_TOKEN в config.py!\n"
                "Инструкция по получению токена — в комментарии в config.py"
            )
            sys.exit(1)

        try:
            me = self.api.users.get()[0]
            my_name = f"{me['first_name']} {me['last_name']}"
            tqdm.write(f"Авторизован как: {my_name}")
            self._user_cache[me["id"]] = my_name
        except Exception as exc:
            tqdm.write(f"ОШИБКА авторизации: {exc}")
            tqdm.write("Проверь VK_TOKEN в config.py")
            sys.exit(1)

        conversations = self._get_all_conversations()
        self.progress.data["dialogs_total"] = len(conversations)
        self.progress.save()

        # Resolve the names of all dialog partners up front.
        peer_ids = [c["peer_id"] for c in conversations]
        tqdm.write("Загружаю имена собеседников...")
        self._prefetch_user_names(peer_ids)

        completed_ids: set[int] = set(self.progress.data["dialogs_completed"])
        remaining = [c for c in conversations if c["peer_id"] not in completed_ids]
        # Count only completed dialogs that are still listed, so the bar's
        # initial value can never exceed its total.
        already_done = len(conversations) - len(remaining)

        current = self.progress.get_current_dialog()
        if current:
            cur_pid = current.get("peer_id")
            # Stable sort: the interrupted dialog moves to the front, the
            # order of all other dialogs is preserved.
            remaining.sort(key=lambda c: c["peer_id"] != cur_pid)

        stats = self.progress.data["stats"]
        tqdm.write(
            f"\nПрогресс: {already_done}/{len(conversations)} диалогов, "
            f"{stats['photos_downloaded']} фото уже скачано"
        )
        tqdm.write(f"Осталось обработать: {len(remaining)} диалогов")
        tqdm.write("-" * 60)

        dialogs_bar = tqdm(
            total=len(conversations),
            initial=already_done,
            desc="Диалоги",
            unit=" диал",
            position=0,
            dynamic_ncols=True,
        )
        photos_bar = tqdm(
            total=0,
            desc="Фото",
            unit=" фото",
            position=1,
            leave=False,
            dynamic_ncols=True,
        )

        try:
            for conv in remaining:
                if self._stop_requested:
                    break

                peer_id = conv["peer_id"]
                dialog_name = self._get_user_name(peer_id)

                self._process_dialog(peer_id, dialog_name, photos_bar)

                if not self._stop_requested:
                    dialogs_bar.update(1)
        finally:
            photos_bar.close()
            dialogs_bar.close()
            self.progress.save()

        stats = self.progress.data["stats"]
        completed_count = len(self.progress.data["dialogs_completed"])
        total_count = self.progress.data["dialogs_total"]

        print("\n" + "=" * 60)
        print(" Итого:")
        print(f" Фото скачано: {stats['photos_downloaded']}")
        print(f" Фото пропущено: {stats['photos_skipped']}")
        print(f" Ошибок: {stats['errors']}")
        print(f" Диалогов обработано: {completed_count}/{total_count}")
        if self._stop_requested:
            print("\n Работа остановлена. Запусти скрипт снова для продолжения.")
        else:
            print("\n Все диалоги обработаны!")
        print("=" * 60)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Точка входа
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def main() -> None:
    """Script entry point: create the downloader and run it."""
    VKPhotoDownloader().run()


# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()
|