#!/usr/bin/env python3 """ Скрипт для выгрузки всех фотографий из личных диалогов ВКонтакте. Использование: 1. Заполни VK_TOKEN в config.py (инструкция в файле) 2. pip install -r requirements.txt 3. python main.py 4. Ctrl+C для остановки (прогресс сохраняется автоматически) 5. Повторный запуск продолжит с места остановки """ import json import os import shutil import signal import sys import threading import time from concurrent.futures import ThreadPoolExecutor, as_completed from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Optional import piexif import piexif.helper import requests import vk_api from tqdm import tqdm import config # Приоритет размеров фото ВК (от лучшего к худшему) PHOTO_SIZE_PRIORITY: list[str] = ["w", "z", "y", "x", "r", "q", "p", "o", "m", "s"] # --------------------------------------------------------------------------- # Модели данных # --------------------------------------------------------------------------- @dataclass class PhotoInfo: """Информация о фотографии для скачивания и записи EXIF.""" photo_id: int owner_id: int url: str date: int # Unix timestamp сообщения / фото sender_id: int sender_name: str message_text: str photo_text: str lat: Optional[float] = None long: Optional[float] = None # --------------------------------------------------------------------------- # Менеджер прогресса (resume) # --------------------------------------------------------------------------- class ProgressManager: """Управление файлом прогресса для механизма resume (потокобезопасный).""" def __init__(self, progress_file: str) -> None: self.progress_file: Path = Path(progress_file) self._lock = threading.Lock() self.data: dict = self._load() self._downloaded_ids: set[int] = set(self.data.get("downloaded_photo_ids", [])) def _load(self) -> dict: """Загружает прогресс из файла или создаёт пустой.""" if self.progress_file.exists(): try: with open(self.progress_file, "r", encoding="utf-8") as f: return json.load(f) except (json.JSONDecodeError, OSError): pass return self._default_state() @staticmethod def _default_state() -> dict: """Возвращает пустое состояние прогресса.""" return { "version": 1, "last_updated": "", "dialogs_total": 0, "dialogs_completed": [], "current_dialog": None, "downloaded_photo_ids": [], "stats": { "photos_downloaded": 0, "photos_skipped": 0, "errors": 0, }, } def save(self) -> None: """Сохраняет текущее состояние прогресса в файл.""" with self._lock: self.data["last_updated"] = datetime.now().isoformat() self.data["downloaded_photo_ids"] = list(self._downloaded_ids) tmp_path = self.progress_file.with_suffix(".tmp") with open(tmp_path, "w", encoding="utf-8") as f: json.dump(self.data, f, ensure_ascii=False, indent=2) tmp_path.replace(self.progress_file) def is_dialog_completed(self, peer_id: int) -> bool: """Проверяет, завершён ли диалог.""" return peer_id in self.data["dialogs_completed"] def mark_dialog_completed(self, peer_id: int) -> None: """Помечает диалог как полностью обработанный.""" with self._lock: if peer_id not in self.data["dialogs_completed"]: self.data["dialogs_completed"].append(peer_id) self.data["current_dialog"] = None self.save() def set_current_dialog(self, peer_id: int) -> None: """Устанавливает текущий обрабатываемый диалог.""" with self._lock: self.data["current_dialog"] = {"peer_id": peer_id} self.save() def get_current_dialog(self) -> Optional[dict]: """Возвращает текущий обрабатываемый диалог или None.""" return self.data.get("current_dialog") def is_photo_downloaded(self, photo_id: int) -> bool: """Проверяет, было ли фото уже скачано.""" return photo_id in self._downloaded_ids def mark_photo_downloaded(self, photo_id: int) -> None: """Отмечает фото как скачанное (потокобезопасно).""" with self._lock: self._downloaded_ids.add(photo_id) self.data["stats"]["photos_downloaded"] += 1 def increment_skipped(self) -> None: """Увеличивает счётчик пропущенных фото (потокобезопасно).""" with self._lock: self.data["stats"]["photos_skipped"] += 1 def increment_errors(self) -> None: """Увеличивает счётчик ошибок (потокобезопасно).""" with self._lock: self.data["stats"]["errors"] += 1 # --------------------------------------------------------------------------- # Запись EXIF метаданных # --------------------------------------------------------------------------- class ExifWriter: """Запись EXIF метаданных в JPEG файлы.""" @staticmethod def _decimal_to_dms(decimal_degrees: float) -> tuple[tuple, bool]: """Конвертирует десятичные градусы в формат DMS для EXIF GPS.""" is_negative = decimal_degrees < 0 decimal_degrees = abs(decimal_degrees) degrees = int(decimal_degrees) minutes_float = (decimal_degrees - degrees) * 60 minutes = int(minutes_float) seconds = round((minutes_float - minutes) * 60 * 10000) dms = ( (degrees, 1), (minutes, 1), (seconds, 10000), ) return dms, is_negative @staticmethod def write_exif(filepath: Path, photo_info: PhotoInfo) -> None: """Записывает EXIF метаданные в файл изображения.""" if filepath.suffix.lower() not in (".jpg", ".jpeg"): ExifWriter._write_json_meta(filepath, photo_info) return try: try: exif_dict = piexif.load(str(filepath)) except Exception: exif_dict = { "0th": {}, "Exif": {}, "GPS": {}, "1st": {}, "Interop": {}, } # Дата отправки сообщения if photo_info.date: dt = datetime.fromtimestamp(photo_info.date) date_bytes = dt.strftime("%Y:%m:%d %H:%M:%S").encode("ascii") exif_dict["Exif"][piexif.ExifIFD.DateTimeOriginal] = date_bytes exif_dict["Exif"][piexif.ExifIFD.DateTimeDigitized] = date_bytes exif_dict["0th"][piexif.ImageIFD.DateTime] = date_bytes # Автор if photo_info.sender_name: artist = photo_info.sender_name.encode("utf-8") exif_dict["0th"][piexif.ImageIFD.Artist] = artist exif_dict["0th"][piexif.ImageIFD.Copyright] = artist # Описание фото из ВК if photo_info.photo_text: exif_dict["0th"][piexif.ImageIFD.ImageDescription] = ( photo_info.photo_text.encode("utf-8") ) # Текст сообщения → UserComment if photo_info.message_text: user_comment = piexif.helper.UserComment.dump( photo_info.message_text, encoding="unicode" ) exif_dict["Exif"][piexif.ExifIFD.UserComment] = user_comment # GPS if photo_info.lat is not None and photo_info.long is not None: lat_dms, lat_neg = ExifWriter._decimal_to_dms(photo_info.lat) lon_dms, lon_neg = ExifWriter._decimal_to_dms(photo_info.long) exif_dict["GPS"] = { piexif.GPSIFD.GPSVersionID: (2, 3, 0, 0), piexif.GPSIFD.GPSLatitude: lat_dms, piexif.GPSIFD.GPSLatitudeRef: b"S" if lat_neg else b"N", piexif.GPSIFD.GPSLongitude: lon_dms, piexif.GPSIFD.GPSLongitudeRef: b"W" if lon_neg else b"E", } exif_bytes = piexif.dump(exif_dict) piexif.insert(exif_bytes, str(filepath)) except Exception as exc: tqdm.write(f" EXIF ошибка ({filepath.name}): {exc}. Сохраняю в JSON.") ExifWriter._write_json_meta(filepath, photo_info) @staticmethod def _write_json_meta(filepath: Path, photo_info: PhotoInfo) -> None: """Сохраняет метаданные в JSON файл рядом с изображением.""" meta_path = filepath.with_suffix(filepath.suffix + ".meta.json") meta: dict = { "photo_id": photo_info.photo_id, "date": datetime.fromtimestamp(photo_info.date).isoformat() if photo_info.date else None, "sender_id": photo_info.sender_id, "sender": photo_info.sender_name, "message_text": photo_info.message_text, "photo_text": photo_info.photo_text, } if photo_info.lat is not None: meta["gps"] = {"lat": photo_info.lat, "long": photo_info.long} with open(meta_path, "w", encoding="utf-8") as f: json.dump(meta, f, ensure_ascii=False, indent=2) # --------------------------------------------------------------------------- # Основной загрузчик # --------------------------------------------------------------------------- class VKPhotoDownloader: """Скачивание фото из всех личных диалогов ВКонтакте.""" def __init__(self) -> None: self._stop_requested: bool = False self.progress: ProgressManager = ProgressManager(config.PROGRESS_FILE) self.download_dir: Path = Path(config.DOWNLOAD_DIR) self.download_dir.mkdir(parents=True, exist_ok=True) self._user_cache: dict[int, str] = {} self._http_session: requests.Session = requests.Session() # Обработчики сигналов для graceful shutdown signal.signal(signal.SIGINT, self._signal_handler) signal.signal(signal.SIGTERM, self._signal_handler) # Авторизация VK API self._vk_session = vk_api.VkApi(token=config.VK_TOKEN, api_version=config.API_VERSION) self.api = self._vk_session.get_api() # -- сигналы -- def _signal_handler(self, _signum: int, _frame: object) -> None: """Graceful shutdown по Ctrl+C / SIGTERM.""" if self._stop_requested: tqdm.write("\nПринудительная остановка!") sys.exit(1) self._stop_requested = True tqdm.write( "\nПолучен сигнал остановки. " "Завершаю текущие загрузки и сохраняю прогресс..." ) # -- утилиты -- def _check_free_space(self) -> bool: """Проверяет, достаточно ли свободного места на диске.""" usage = shutil.disk_usage(str(self.download_dir)) free_mb = usage.free / (1024 * 1024) if free_mb < config.MIN_FREE_SPACE_MB: tqdm.write( f"Недостаточно места на диске! " f"Свободно: {free_mb:.0f} МБ, минимум: {config.MIN_FREE_SPACE_MB} МБ" ) return False return True @staticmethod def _safe_name(name: str) -> str: """Делает строку безопасной для имени папки/файла.""" return "".join(c if c.isalnum() or c in ("_", "-") else "_" for c in name) # -- VK API execute (до 25 вызовов за 1 запрос) -- def _execute(self, code: str) -> dict: """Выполняет VKScript через метод execute.""" return self._vk_session.method("execute", {"code": code}) # -- батч-загрузка имён пользователей -- def _prefetch_user_names(self, user_ids: list[int]) -> None: """Загружает имена пользователей пакетами (до 1000 за запрос).""" # Разделяем на пользователей и сообщества need_users: list[int] = [] need_groups: list[int] = [] for uid in user_ids: if uid in self._user_cache or uid == 0: continue if uid > 2_000_000_000: self._user_cache[uid] = f"Беседа_{uid - 2_000_000_000}" elif uid > 0: need_users.append(uid) else: need_groups.append(abs(uid)) # Пакетная загрузка пользователей (до 1000 за запрос) for i in range(0, len(need_users), 1000): batch = need_users[i:i + 1000] try: ids_str = ",".join(str(x) for x in batch) users = self.api.users.get(user_ids=ids_str) for u in users: self._user_cache[u["id"]] = f"{u['first_name']} {u['last_name']}" except Exception: pass time.sleep(0.34) # Пакетная загрузка сообществ (до 500 за запрос) for i in range(0, len(need_groups), 500): batch = need_groups[i:i + 500] try: ids_str = ",".join(str(x) for x in batch) resp = self.api.groups.getById(group_ids=ids_str) groups = resp if isinstance(resp, list) else resp.get("groups", []) for g in groups: gid = -g["id"] self._user_cache[gid] = g.get("name", f"group_{g['id']}") except Exception: pass time.sleep(0.34) def _get_user_name(self, user_id: int) -> str: """Получает имя из кэша. Если нет — дозагружает.""" if user_id in self._user_cache: return self._user_cache[user_id] # Фоллбэк на единичный запрос self._prefetch_user_names([user_id]) return self._user_cache.get(user_id, f"id{user_id}") # -- получение списка диалогов через execute -- def _get_all_conversations(self) -> list[dict]: """Получает полный список диалогов (через execute — 25 страниц за запрос).""" conversations: list[dict] = [] offset = 0 tqdm.write("Получаю список диалогов...") while True: # VKScript: за один execute получаем до 25 страниц по 200 диалогов code = f""" var results = []; var offset = {offset}; var i = 0; while (i < 25) {{ var resp = API.messages.getConversations({{ "offset": offset, "count": 200, "extended": 0 }}); results.push(resp); offset = offset + 200; if (offset >= resp.count || resp.items.length == 0) {{ return {{"results": results, "done": true}}; }} i = i + 1; }} return {{"results": results, "done": false, "next_offset": offset}}; """ try: data = self._execute(code) except Exception as exc: tqdm.write(f" execute ошибка (conversations): {exc}, пробую обычный метод") return self._get_all_conversations_fallback() for page in data.get("results", []): if not page: continue for item in page.get("items", []): peer = item["conversation"]["peer"] if peer["type"] == "chat": continue conversations.append( {"peer_id": peer["id"], "type": peer["type"]} ) if data.get("done", True): break offset = data.get("next_offset", offset + 5000) time.sleep(0.34) tqdm.write(f"Найдено личных диалогов: {len(conversations)} (беседы пропущены)") return conversations def _get_all_conversations_fallback(self) -> list[dict]: """Фоллбэк: получение диалогов обычным методом (без execute).""" conversations: list[dict] = [] offset = 0 while True: resp = self.api.messages.getConversations( offset=offset, count=200, extended=0, ) items = resp.get("items", []) if not items: break for item in items: peer = item["conversation"]["peer"] if peer["type"] == "chat": continue conversations.append( {"peer_id": peer["id"], "type": peer["type"]} ) offset += 200 if offset >= resp.get("count", 0): break time.sleep(0.34) return conversations # -- выбор лучшего размера фото -- @staticmethod def _best_photo_url(photo: dict) -> Optional[str]: """Выбирает URL фото максимального доступного размера.""" orig = photo.get("orig_photo") if orig and orig.get("url"): return orig["url"] sizes = photo.get("sizes") if sizes: size_map = {s["type"]: s["url"] for s in sizes} for prio in PHOTO_SIZE_PRIORITY: if prio in size_map: return size_map[prio] best = max(sizes, key=lambda s: s.get("width", 0) * s.get("height", 0)) return best.get("url") for key in ("photo_2560", "photo_1280", "photo_807", "photo_604", "photo_130", "photo_75"): if key in photo: return photo[key] return None # -- извлечение фото из сообщений -- def _extract_photos_recursive(self, message: dict) -> list[dict]: """Рекурсивно извлекает все фото из сообщения (вложения + пересланные).""" result: list[dict] = [] for att in message.get("attachments", []): if att.get("type") == "photo": photo = att["photo"] url = self._best_photo_url(photo) if url: result.append({ "photo": photo, "url": url, "from_id": message.get("from_id", 0), "date": message.get("date", photo.get("date", 0)), "message_text": message.get("text", ""), }) for fwd in message.get("fwd_messages", []): result.extend(self._extract_photos_recursive(fwd)) reply = message.get("reply_message") if reply: result.extend(self._extract_photos_recursive(reply)) return result # -- сбор фото через execute (getHistoryAttachments, до 25 страниц за запрос) -- def _collect_all_attachment_photos(self, peer_id: int) -> list[dict]: """Собирает все фото-вложения через execute (25 страниц за 1 API вызов).""" all_photos: list[dict] = [] cursor = "" while not self._stop_requested: # VKScript: до 25 вызовов getHistoryAttachments за один execute start_from_clause = ( f'"start_from": "{cursor}",' if cursor else "" ) code = f""" var results = []; var cursor = "{cursor}"; var i = 0; while (i < 25) {{ var params = {{ "peer_id": {peer_id}, "media_type": "photo", "count": 200, "preserve_order": 1 }}; if (cursor != "") {{ params.start_from = cursor; }} var resp = API.messages.getHistoryAttachments(params); results.push(resp); if (!resp.next_from || resp.items.length == 0) {{ return {{"results": results, "cursor": ""}}; }} cursor = resp.next_from; i = i + 1; }} return {{"results": results, "cursor": cursor}}; """ try: data = self._execute(code) except Exception as exc: tqdm.write(f" execute ошибка (attachments): {exc}, фоллбэк") return self._collect_attachments_fallback(peer_id, all_photos, cursor) for page in data.get("results", []): if not page: continue for item in page.get("items", []): att = item.get("attachment", {}) if att.get("type") != "photo": continue photo = att["photo"] url = self._best_photo_url(photo) if url: all_photos.append({ "photo": photo, "url": url, "from_id": item.get("from_id", 0), "date": item.get("date", photo.get("date", 0)), "message_text": "", }) cursor = data.get("cursor", "") if not cursor: break time.sleep(0.34) return all_photos def _collect_attachments_fallback( self, peer_id: int, existing: list[dict], start_from: str ) -> list[dict]: """Фоллбэк: обычная пагинация getHistoryAttachments.""" cursor: Optional[str] = start_from or None while not self._stop_requested: params: dict = { "peer_id": peer_id, "media_type": "photo", "count": 200, "preserve_order": 1, } if cursor: params["start_from"] = cursor resp = self.api.messages.getHistoryAttachments(**params) items = resp.get("items", []) cursor = resp.get("next_from") if not items: break for item in items: att = item.get("attachment", {}) if att.get("type") != "photo": continue photo = att["photo"] url = self._best_photo_url(photo) if url: existing.append({ "photo": photo, "url": url, "from_id": item.get("from_id", 0), "date": item.get("date", photo.get("date", 0)), "message_text": "", }) if not cursor: break time.sleep(0.34) return existing # -- сбор фото из пересланных через execute (getHistory) -- def _collect_forwarded_photos( self, peer_id: int, known_ids: set[int], ) -> list[dict]: """Сканирует историю сообщений через execute (25 страниц за запрос).""" found: list[dict] = [] offset = 0 while not self._stop_requested: code = f""" var results = []; var offset = {offset}; var i = 0; while (i < 25) {{ var resp = API.messages.getHistory({{ "peer_id": {peer_id}, "offset": offset, "count": 200 }}); results.push(resp); offset = offset + 200; if (offset >= resp.count || resp.items.length == 0) {{ return {{"results": results, "done": true}}; }} i = i + 1; }} return {{"results": results, "done": false, "next_offset": offset}}; """ try: data = self._execute(code) except vk_api.exceptions.ApiError as exc: tqdm.write(f" API ошибка при getHistory: {exc}") break except Exception as exc: tqdm.write(f" execute ошибка (history): {exc}, прерываю сканирование") break for page in data.get("results", []): if not page: continue for msg in page.get("items", []): sources: list[dict] = [] for fwd in msg.get("fwd_messages", []): sources.extend(self._extract_photos_recursive(fwd)) reply = msg.get("reply_message") if reply: sources.extend(self._extract_photos_recursive(reply)) for item in sources: pid = item["photo"].get("id", 0) if pid and pid not in known_ids: known_ids.add(pid) if not item.get("message_text"): item["message_text"] = msg.get("text", "") found.append(item) if data.get("done", True): break offset = data.get("next_offset", offset + 5000) time.sleep(0.34) return found # -- скачивание одного фото (для потока) -- def _download_single( self, photo_data: dict, dialog_dir: Path ) -> Optional[PhotoInfo]: """Скачивает одно фото, записывает EXIF и utime. Возвращает PhotoInfo или None.""" photo = photo_data["photo"] photo_id = photo.get("id", 0) url = photo_data["url"] date_ts: int = photo_data.get("date", photo.get("date", 0)) # Формируем путь: dialog_dir / YYYY / photo_{id}_{date}.jpg dt = datetime.fromtimestamp(date_ts) if date_ts else datetime.now() subfolder = dt.strftime("%Y") filename = f"photo_{photo_id}_{dt.strftime('%Y%m%d_%H%M%S')}.jpg" filepath = dialog_dir / subfolder / filename # Скачиваем с retry for attempt in range(config.MAX_RETRIES): try: resp = self._http_session.get( url, timeout=config.DOWNLOAD_TIMEOUT, stream=True ) resp.raise_for_status() filepath.parent.mkdir(parents=True, exist_ok=True) with open(filepath, "wb") as f: for chunk in resp.iter_content(chunk_size=8192): f.write(chunk) break except (requests.RequestException, OSError) as exc: if attempt < config.MAX_RETRIES - 1: time.sleep(2 ** attempt) else: tqdm.write(f" Ошибка скачивания ({filename}): {exc}") return None # Метаданные sender_id = photo_data.get("from_id", 0) sender_name = self._user_cache.get(sender_id, f"id{sender_id}") if sender_id else "" info = PhotoInfo( photo_id=photo_id, owner_id=photo.get("owner_id", 0), url=url, date=date_ts, sender_id=sender_id, sender_name=sender_name, message_text=photo_data.get("message_text", ""), photo_text=photo.get("text", ""), lat=photo.get("lat"), long=photo.get("long"), ) # EXIF ExifWriter.write_exif(filepath, info) # Дата файла = дата сообщения if date_ts: os.utime(filepath, (date_ts, date_ts)) return info # -- обработка одного диалога -- def _process_dialog( self, peer_id: int, dialog_name: str, photos_bar: tqdm ) -> None: """Обрабатывает один диалог: собирает и скачивает все фото.""" saved = self.progress.get_current_dialog() resuming = saved is not None and saved.get("peer_id") == peer_id if not resuming: self.progress.set_current_dialog(peer_id) known_ids: set[int] = set() all_photos: list[dict] = [] # Фаза 1: вложения через execute + getHistoryAttachments tqdm.write(f" [{dialog_name}] Сбор фото-вложений...") att_photos = self._collect_all_attachment_photos(peer_id) for p in att_photos: pid = p["photo"].get("id", 0) if pid and pid not in known_ids: known_ids.add(pid) all_photos.append(p) # Фаза 2: пересланные сообщения через execute + getHistory if not self._stop_requested: tqdm.write(f" [{dialog_name}] Поиск фото в пересланных сообщениях...") fwd_photos = self._collect_forwarded_photos(peer_id, known_ids) all_photos.extend(fwd_photos) if not all_photos: tqdm.write(f" [{dialog_name}] Нет фото") if not self._stop_requested: self.progress.mark_dialog_completed(peer_id) return # Фильтруем уже скачанные tasks = [ p for p in all_photos if not self.progress.is_photo_downloaded(p["photo"].get("id", 0)) ] skipped = len(all_photos) - len(tasks) tqdm.write( f" [{dialog_name}] Всего: {len(all_photos)}, " f"скачать: {len(tasks)}, пропустить: {skipped}" ) if not tasks: if not self._stop_requested: self.progress.mark_dialog_completed(peer_id) return # Предзагрузка имён отправителей пакетом sender_ids = list({t.get("from_id", 0) for t in tasks if t.get("from_id", 0)}) if sender_ids: self._prefetch_user_names(sender_ids) # Настройка прогресс-бара photos_bar.reset(total=len(all_photos)) photos_bar.n = skipped photos_bar.refresh() photos_bar.set_description(f"Фото ({dialog_name[:25]})") safe_dialog = self._safe_name(dialog_name) dialog_dir = self.download_dir / f"{safe_dialog}_id{peer_id}" # -- Параллельное скачивание через ThreadPoolExecutor -- completed_count = 0 with ThreadPoolExecutor(max_workers=config.DOWNLOAD_WORKERS) as executor: futures: dict = {} for task in tasks: if self._stop_requested: break if not self._check_free_space(): tqdm.write("Остановка из-за нехватки места на диске.") self._stop_requested = True break future = executor.submit(self._download_single, task, dialog_dir) futures[future] = task for future in as_completed(futures): task = futures[future] photo_id = task["photo"].get("id", 0) try: result = future.result() if result is not None: self.progress.mark_photo_downloaded(photo_id) else: self.progress.increment_errors() except Exception: self.progress.increment_errors() photos_bar.update(1) completed_count += 1 # Сохраняем прогресс каждые 50 фото if completed_count % 50 == 0: self.progress.save() if self._stop_requested: # Отменяем ещё не запущенные задачи for f in futures: f.cancel() break self.progress.save() if not self._stop_requested: self.progress.mark_dialog_completed(peer_id) # -- главный цикл -- def run(self) -> None: """Запускает процесс скачивания фотографий.""" tqdm.write("=" * 60) tqdm.write(" Выгрузка фото из диалогов ВКонтакте") tqdm.write(f" Потоков скачивания: {config.DOWNLOAD_WORKERS}") tqdm.write("=" * 60) if not config.VK_TOKEN: tqdm.write( "ОШИБКА: Заполни VK_TOKEN в config.py!\n" "Инструкция по получению токена — в комментарии в config.py" ) sys.exit(1) try: me = self.api.users.get()[0] my_name = f"{me['first_name']} {me['last_name']}" tqdm.write(f"Авторизован как: {my_name}") self._user_cache[me["id"]] = my_name except Exception as exc: tqdm.write(f"ОШИБКА авторизации: {exc}") tqdm.write("Проверь VK_TOKEN в config.py") sys.exit(1) conversations = self._get_all_conversations() self.progress.data["dialogs_total"] = len(conversations) self.progress.save() # Предзагрузка имён для всех собеседников peer_ids = [c["peer_id"] for c in conversations] tqdm.write("Загружаю имена собеседников...") self._prefetch_user_names(peer_ids) completed_ids: set[int] = set(self.progress.data["dialogs_completed"]) remaining = [c for c in conversations if c["peer_id"] not in completed_ids] current = self.progress.get_current_dialog() if current: cur_pid = current["peer_id"] remaining = [c for c in remaining if c["peer_id"] != cur_pid] for c in conversations: if c["peer_id"] == cur_pid: remaining.insert(0, c) break stats = self.progress.data["stats"] tqdm.write( f"\nПрогресс: {len(completed_ids)}/{len(conversations)} диалогов, " f"{stats['photos_downloaded']} фото уже скачано" ) tqdm.write(f"Осталось обработать: {len(remaining)} диалогов") tqdm.write("-" * 60) dialogs_bar = tqdm( total=len(conversations), initial=len(completed_ids), desc="Диалоги", unit=" диал", position=0, dynamic_ncols=True, ) photos_bar = tqdm( total=0, desc="Фото", unit=" фото", position=1, leave=False, dynamic_ncols=True, ) try: for conv in remaining: if self._stop_requested: break peer_id = conv["peer_id"] dialog_name = self._get_user_name(peer_id) self._process_dialog(peer_id, dialog_name, photos_bar) if not self._stop_requested: dialogs_bar.update(1) finally: photos_bar.close() dialogs_bar.close() self.progress.save() stats = self.progress.data["stats"] completed_count = len(self.progress.data["dialogs_completed"]) total_count = self.progress.data["dialogs_total"] print("\n" + "=" * 60) print(" Итого:") print(f" Фото скачано: {stats['photos_downloaded']}") print(f" Фото пропущено: {stats['photos_skipped']}") print(f" Ошибок: {stats['errors']}") print(f" Диалогов обработано: {completed_count}/{total_count}") if self._stop_requested: print("\n Работа остановлена. Запусти скрипт снова для продолжения.") else: print("\n Все диалоги обработаны!") print("=" * 60) # --------------------------------------------------------------------------- # Точка входа # --------------------------------------------------------------------------- def main() -> None: """Точка входа скрипта.""" downloader = VKPhotoDownloader() downloader.run() if __name__ == "__main__": main()