#!/usr/bin/env python3
"""
Export YOUR OWN videos from personal VK (VKontakte) dialogs.

Only videos whose owner_id equals your user ID (i.e. uploaded by you) are
downloaded. Other users' videos and community videos are skipped.

Usage:
    1. Fill in VK_TOKEN in config.py
    2. pip install -r requirements.txt
    3. python main_video.py
    4. Press Ctrl+C to stop (progress is saved)
    5. Running the script again resumes where the previous run stopped
"""

import json
import os
import shutil
import signal
import sys
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional

import requests
import vk_api
from tqdm import tqdm

import config

# Video quality priority (best to worst)
VIDEO_QUALITY_PRIORITY: list[str] = [
    "mp4_2160",
    "mp4_1440",
    "mp4_1080",
    "mp4_720",
    "mp4_480",
    "mp4_360",
    "mp4_240",
    "mp4_144",
]


# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------

@dataclass
class VideoMeta:
    """Video metadata intended for a JSON sidecar."""

    video_id: int
    owner_id: int
    title: str
    duration: int
    date: int
    sender_id: int
    sender_name: str
    message_text: str
    quality: str


# ---------------------------------------------------------------------------
# Progress manager
# ---------------------------------------------------------------------------

class ProgressManager:
    """Persistent progress state with a resume mechanism.

    Mutations of the in-memory state and writes to disk are guarded by a
    non-reentrant ``threading.Lock``. ``save()`` takes that lock itself, so
    it must never be called while the lock is already held.
    """

    def __init__(self, progress_file: str) -> None:
        self.progress_file: Path = Path(progress_file)
        self._lock = threading.Lock()
        self.data: dict = self._load()
        # Set mirror of data["downloaded_video_ids"] for O(1) membership tests.
        self._downloaded_ids: set[int] = set(
            self.data.get("downloaded_video_ids", [])
        )

    def _load(self) -> dict:
        """Load the saved state, backfilled with defaults.

        An unreadable, corrupt or non-dict progress file yields the default
        state. Keys missing from the file (for example counters absent from
        a file written by an older version) are filled in from the defaults
        so that later ``self.data[...]`` lookups cannot raise ``KeyError``.
        """
        state = self._default_state()
        if not self.progress_file.exists():
            return state
        try:
            with open(self.progress_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (ValueError, OSError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            return state
        if not isinstance(loaded, dict):
            return state

        default_stats = state["stats"]
        state.update(loaded)

        # Merge counters key by key instead of replacing the whole mapping.
        saved_stats = loaded.get("stats")
        if isinstance(saved_stats, dict):
            default_stats.update(saved_stats)
        state["stats"] = default_stats

        # These are appended to / iterated later and must be lists.
        for key in ("dialogs_completed", "downloaded_video_ids"):
            if not isinstance(state.get(key), list):
                state[key] = []
        return state

    @staticmethod
    def _default_state() -> dict:
        """Return a fresh, empty progress state."""
        return {
            "version": 3,
            "last_updated": "",
            "dialogs_total": 0,
            "dialogs_completed": [],
            "current_dialog": None,
            "downloaded_video_ids": [],
            "stats": {
                "videos_downloaded": 0,
                "foreign_skipped": 0,
                "external_saved": 0,
                "no_files": 0,
                "errors": 0,
                "bytes_downloaded": 0,
            },
        }

    def save(self) -> None:
        """Write the state to disk via a temporary file and a rename."""
        with self._lock:
            self.data["last_updated"] = datetime.now().isoformat()
            self.data["downloaded_video_ids"] = list(self._downloaded_ids)
            tmp = self.progress_file.with_suffix(".tmp")
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(self.data, f, ensure_ascii=False, indent=2)
            # Replace in one step so the real file is never half-written.
            tmp.replace(self.progress_file)

    def is_dialog_completed(self, peer_id: int) -> bool:
        """Return True if the dialog was already fully processed."""
        return peer_id in self.data["dialogs_completed"]

    def mark_dialog_completed(self, peer_id: int) -> None:
        """Record the dialog as done, clear the current dialog and save."""
        with self._lock:
            if peer_id not in self.data["dialogs_completed"]:
                self.data["dialogs_completed"].append(peer_id)
            self.data["current_dialog"] = None
        # save() acquires the (non-reentrant) lock itself: call it outside.
        self.save()

    def set_current_dialog(self, peer_id: int) -> None:
        """Remember which dialog is being processed and save."""
        with self._lock:
            self.data["current_dialog"] = {"peer_id": peer_id}
        # save() acquires the (non-reentrant) lock itself: call it outside.
        self.save()

    def get_current_dialog(self) -> Optional[dict]:
        """Return the saved ``{"peer_id": ...}`` mapping, or None."""
        return self.data.get("current_dialog")

    def is_video_downloaded(self, video_id: int) -> bool:
        """Return True if the video id was recorded as downloaded."""
        return video_id in self._downloaded_ids

    def mark_video_downloaded(self, video_id: int, size: int) -> None:
        """Record a downloaded video and add its size to the byte counter.

        Does not write to disk; the caller is expected to call ``save()``.
        """
        with self._lock:
            self._downloaded_ids.add(video_id)
            self.data["stats"]["videos_downloaded"] += 1
            self.data["stats"]["bytes_downloaded"] += size

    def increment_foreign(self, count: int = 1) -> None:
        """Add ``count`` to the number of skipped videos owned by others."""
        with self._lock:
            self.data["stats"]["foreign_skipped"] += count

    def increment_external(self) -> None:
        """Count one externally hosted video."""
        with self._lock:
            self.data["stats"]["external_saved"] += 1

    def increment_no_files(self) -> None:
        """Count one video for which no downloadable file URL was found."""
        with self._lock:
            self.data["stats"]["no_files"] += 1

    def increment_errors(self) -> None:
        """Count one failed download."""
        with self._lock:
            self.data["stats"]["errors"] += 1


# ---------------------------------------------------------------------------
# Main downloader
# ---------------------------------------------------------------------------
class VKVideoDownloader:
    """Downloads the current user's own videos from VK dialogs."""

    def __init__(self) -> None:
        self._stop_requested: bool = False
        self.progress = ProgressManager(config.VIDEO_PROGRESS_FILE)
        self.download_dir = Path(config.VIDEO_DOWNLOAD_DIR)
        self.download_dir.mkdir(parents=True, exist_ok=True)
        # Cache: peer/user/group id -> display name.
        self._user_cache: dict[int, str] = {}
        self._my_id: int = 0

        # HTTP session used for downloading the video files.
        self._http = requests.Session()

        # Separate session for video.get WITHOUT a browser User-Agent
        # (original author's note: VK omits the "files" field when it sees
        # a browser UA).
        self._video_api_session = requests.Session()

        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

        # vk_api is used for everything except video.get.
        self._vk_session = vk_api.VkApi(
            token=config.VK_TOKEN,
            api_version=config.API_VERSION,
        )
        self.api = self._vk_session.get_api()

    def _signal_handler(self, _signum: int, _frame: object) -> None:
        """First signal requests a graceful stop; a second one exits."""
        if self._stop_requested:
            tqdm.write("\nПринудительная остановка!")
            sys.exit(1)
        self._stop_requested = True
        tqdm.write("\nОстановка... сохраняю прогресс.")

    # -- utilities --

    def _check_free_space(self) -> bool:
        """Return False (and report) when free disk space is below the limit."""
        usage = shutil.disk_usage(str(self.download_dir))
        free_mb = usage.free / (1024 * 1024)
        if free_mb < config.VIDEO_MIN_FREE_SPACE_MB:
            tqdm.write(
                f"Мало места! Свободно: {free_mb:.0f} МБ, "
                f"минимум: {config.VIDEO_MIN_FREE_SPACE_MB} МБ"
            )
            return False
        return True

    @staticmethod
    def _safe_name(name: str) -> str:
        """Replace every character except alphanumerics, "_" and "-" with "_"."""
        return "".join(c if c.isalnum() or c in ("_", "-") else "_" for c in name)

    @staticmethod
    def _format_size(size_bytes: int) -> str:
        """Format a byte count with a 1024-based unit and one decimal."""
        sz = float(size_bytes)
        for unit in ("Б", "КБ", "МБ", "ГБ"):
            if sz < 1024:
                return f"{sz:.1f} {unit}"
            sz /= 1024
        return f"{sz:.1f} ТБ"

    @staticmethod
    def _format_duration(seconds: int) -> str:
        """Format seconds as ``H:MM:SS`` (or ``M:SS`` when under an hour)."""
        m, s = divmod(seconds, 60)
        h, m = divmod(m, 60)
        if h:
            return f"{h}:{m:02d}:{s:02d}"
        return f"{m}:{s:02d}"

    # -- VK API --

    def _execute(self, code: str) -> dict:
        """Run a VKScript snippet through the ``execute`` API method."""
        return self._vk_session.method("execute", {"code": code})

    def _video_get_raw(self, video_keys: list[str]) -> list[dict]:
        """Call video.get through plain ``requests`` in batches of 200 keys.

        The dedicated session carries no browser User-Agent (original
        author's note: VK omits the "files" field otherwise). Failures are
        reported and the affected batch is skipped (best effort).
        """
        result: list[dict] = []
        for i in range(0, len(video_keys), 200):
            if self._stop_requested:
                break
            batch = video_keys[i:i + 200]
            try:
                resp = self._video_api_session.post(
                    "https://api.vk.com/method/video.get",
                    data={
                        "videos": ",".join(batch),
                        "access_token": config.VK_TOKEN,
                        "v": config.API_VERSION,
                    },
                    timeout=30,
                )
                data = resp.json()
                # API-level failures come back as an "error" object rather
                # than an exception; report them instead of silently
                # returning nothing for the batch.
                api_error = data.get("error")
                if api_error:
                    if isinstance(api_error, dict):
                        api_error = api_error.get("error_msg", api_error)
                    tqdm.write(f" video.get ошибка: {api_error}")
                items = data.get("response", {}).get("items", [])
                result.extend(items)
            except Exception as exc:
                tqdm.write(f" video.get ошибка: {exc}")
            time.sleep(0.34)
        return result

    def _prefetch_user_names(self, user_ids: list[int]) -> None:
        """Fill the name cache for the given peer ids.

        Ids above 2_000_000_000 are named as chats locally, positive ids are
        resolved with users.get and negative ids with groups.getById. API
        failures are reported and skipped, so some ids may stay uncached.
        """
        need_users: list[int] = []
        need_groups: list[int] = []
        for uid in user_ids:
            if uid in self._user_cache or uid == 0:
                continue
            if uid > 2_000_000_000:
                self._user_cache[uid] = f"Беседа_{uid - 2_000_000_000}"
            elif uid > 0:
                need_users.append(uid)
            else:
                need_groups.append(abs(uid))

        for i in range(0, len(need_users), 1000):
            batch = need_users[i:i + 1000]
            try:
                ids_str = ",".join(str(x) for x in batch)
                users = self.api.users.get(user_ids=ids_str)
                for u in users:
                    self._user_cache[u["id"]] = f"{u['first_name']} {u['last_name']}"
            except Exception as exc:
                # Best effort: names are cosmetic, but do not hide the failure.
                tqdm.write(f" users.get ошибка: {exc}")
            time.sleep(0.34)

        for i in range(0, len(need_groups), 500):
            batch = need_groups[i:i + 500]
            try:
                ids_str = ",".join(str(x) for x in batch)
                resp = self.api.groups.getById(group_ids=ids_str)
                # The response is handled both as a bare list and as a
                # mapping with a "groups" key.
                groups = resp if isinstance(resp, list) else resp.get("groups", [])
                for g in groups:
                    self._user_cache[-g["id"]] = g.get("name", f"group_{g['id']}")
            except Exception as exc:
                tqdm.write(f" groups.getById ошибка: {exc}")
            time.sleep(0.34)

    def _get_user_name(self, user_id: int) -> str:
        """Return the cached display name, fetching it on a cache miss."""
        if user_id in self._user_cache:
            return self._user_cache[user_id]
        self._prefetch_user_names([user_id])
        return self._user_cache.get(user_id, f"id{user_id}")

    # -- fetching dialogs --

    @staticmethod
    def _personal_peers(items: list[dict]) -> list[dict]:
        """Map conversation items to peer records, skipping group chats."""
        peers: list[dict] = []
        for item in items:
            peer = item["conversation"]["peer"]
            if peer["type"] == "chat":
                continue
            peers.append({"peer_id": peer["id"], "type": peer["type"]})
        return peers

    def _get_all_conversations(self) -> list[dict]:
        """List all non-chat conversations.

        Uses ``execute`` to fetch up to 25 pages of 200 per request and
        switches to the plain paged API if ``execute`` raises.
        """
        conversations: list[dict] = []
        offset = 0
        tqdm.write("Получаю список диалогов...")

        while True:
            code = f"""
            var results = [];
            var offset = {offset};
            var i = 0;
            while (i < 25) {{
                var resp = API.messages.getConversations({{
                    "offset": offset,
                    "count": 200,
                    "extended": 0
                }});
                results.push(resp);
                offset = offset + 200;
                if (offset >= resp.count || resp.items.length == 0) {{
                    return {{"results": results, "done": true}};
                }}
                i = i + 1;
            }}
            return {{"results": results, "done": false, "next_offset": offset}};
            """
            try:
                data = self._execute(code)
            except Exception as exc:
                tqdm.write(f" execute ошибка: {exc}, фоллбэк")
                # The fallback restarts from offset 0, so partial results
                # gathered so far are intentionally discarded.
                return self._get_conversations_fallback()

            for page in data.get("results", []):
                if not page:
                    continue
                conversations.extend(self._personal_peers(page.get("items", [])))

            if data.get("done", True):
                break
            offset = data.get("next_offset", offset + 5000)
            time.sleep(0.34)

        tqdm.write(f"Найдено личных диалогов: {len(conversations)}")
        return conversations

    def _get_conversations_fallback(self) -> list[dict]:
        """List all non-chat conversations, one page of 200 per request."""
        conversations: list[dict] = []
        offset = 0
        while True:
            resp = self.api.messages.getConversations(
                offset=offset, count=200, extended=0,
            )
            items = resp.get("items", [])
            if not items:
                break
            conversations.extend(self._personal_peers(items))
            offset += 200
            if offset >= resp.get("count", 0):
                break
            time.sleep(0.34)
        return conversations

    # -- collecting video attachments --

    def _filter_own_videos(self, items: list[dict], out: list[dict]) -> int:
        """Append own video attachments from ``items`` to ``out``.

        Returns the number of video attachments skipped because their
        owner_id differs from the authorised user's id.
        """
        foreign = 0
        for item in items:
            att = item.get("attachment", {})
            if att.get("type") != "video":
                continue
            video = att["video"]
            if video.get("owner_id") != self._my_id:
                foreign += 1
                continue
            out.append({
                "video": video,
                "from_id": item.get("from_id", 0),
                "date": item.get("date", video.get("date", 0)),
                "message_text": "",
            })
        return foreign

    def _collect_my_videos(self, peer_id: int) -> list[dict]:
        """Collect video attachments of a dialog, keeping only own videos.

        Own means ``owner_id == self._my_id``. Skipped foreign videos are
        added to the progress statistics.
        """
        all_videos: list[dict] = []
        cursor = ""
        foreign_count = 0

        while not self._stop_requested:
            # Embed the cursor as an escaped string literal so quotes or
            # backslashes in it cannot break (or inject into) the script.
            cursor_literal = json.dumps(cursor, ensure_ascii=False)
            code = f"""
            var results = [];
            var cursor = {cursor_literal};
            var i = 0;
            while (i < 25) {{
                var params = {{
                    "peer_id": {peer_id},
                    "media_type": "video",
                    "count": 200,
                    "preserve_order": 1
                }};
                if (cursor != "") {{
                    params.start_from = cursor;
                }}
                var resp = API.messages.getHistoryAttachments(params);
                results.push(resp);
                if (!resp.next_from || resp.items.length == 0) {{
                    return {{"results": results, "cursor": ""}};
                }}
                cursor = resp.next_from;
                i = i + 1;
            }}
            return {{"results": results, "cursor": cursor}};
            """
            try:
                data = self._execute(code)
            except Exception as exc:
                tqdm.write(f" execute ошибка: {exc}, фоллбэк")
                # Continue from the last known cursor; the fallback appends
                # straight into all_videos.
                _, fb_foreign = self._collect_videos_fallback(
                    peer_id, all_videos, cursor,
                )
                foreign_count += fb_foreign
                break

            for page in data.get("results", []):
                if not page:
                    continue
                foreign_count += self._filter_own_videos(
                    page.get("items", []), all_videos,
                )

            cursor = data.get("cursor", "")
            if not cursor:
                break
            time.sleep(0.34)

        if foreign_count:
            self.progress.increment_foreign(foreign_count)
        return all_videos

    def _collect_videos_fallback(
        self,
        peer_id: int,
        existing: list[dict],
        start_from: str,
    ) -> tuple[list[dict], int]:
        """Page through getHistoryAttachments without ``execute``.

        Own videos are appended to ``existing`` in place. Returns
        ``(existing, foreign_count)``.
        """
        cursor: Optional[str] = start_from or None
        foreign_count = 0

        while not self._stop_requested:
            params: dict = {
                "peer_id": peer_id,
                "media_type": "video",
                "count": 200,
                "preserve_order": 1,
            }
            if cursor:
                params["start_from"] = cursor

            resp = self.api.messages.getHistoryAttachments(**params)
            items = resp.get("items", [])
            cursor = resp.get("next_from")
            if not items:
                break

            foreign_count += self._filter_own_videos(items, existing)

            if not cursor:
                break
            time.sleep(0.34)

        return existing, foreign_count

    # -- choosing the best quality --

    @staticmethod
    def _best_video_url(files: dict) -> Optional[tuple[str, str]]:
        """Pick the URL of the highest available quality.

        Returns ``(url, quality)`` or None when no known quality is present.
        """
        for quality in VIDEO_QUALITY_PRIORITY:
            url = files.get(quality)
            if url:
                return (url, quality)
        return None

    # -- downloading a single video --

    def _download_single(
        self,
        video_data: dict,
        files: dict,
    ) -> Optional[VideoMeta]:
        """Download one video via its direct URL.

        Returns the VideoMeta on success or None when nothing was saved.
        """
        outcome = self._download_with_size(video_data, files)
        return outcome[0] if outcome is not None else None

    def _download_with_size(
        self,
        video_data: dict,
        files: dict,
    ) -> Optional[tuple[VideoMeta, int]]:
        """Download one video and report the size of the saved file.

        Returns ``(meta, size_in_bytes)`` on success. Returns None when no
        file URL is available (counted via ``increment_no_files``), when the
        target file already exists, when a stop was requested mid-download,
        or when every retry failed.
        """
        video = video_data["video"]
        video_id = video.get("id", 0)
        date_ts: int = video_data.get("date", video.get("date", 0))
        title = video.get("title", "")
        duration = video.get("duration", 0)

        best = self._best_video_url(files)
        if not best:
            self.progress.increment_no_files()
            return None
        url, quality = best

        # Path: <download_dir>/video_{id}_{date}_{title}.mp4
        dt = datetime.fromtimestamp(date_ts) if date_ts else datetime.now()
        safe_title = self._safe_name(title)[:50] if title else ""
        suffix = f"_{safe_title}" if safe_title else ""
        filename = f"video_{video_id}_{dt.strftime('%Y%m%d_%H%M%S')}{suffix}.mp4"
        filepath = self.download_dir / filename
        if filepath.exists():
            return None

        # Stream into a ".part" file and rename once complete, so a hard
        # kill can never leave a truncated file under the final name.
        part_path = filepath.with_name(filepath.name + ".part")

        file_size = 0
        for attempt in range(config.MAX_RETRIES):
            try:
                interrupted = False
                # The context manager releases the connection on every path.
                with self._http.get(
                    url, timeout=config.VIDEO_DOWNLOAD_TIMEOUT, stream=True,
                ) as resp:
                    resp.raise_for_status()
                    with open(part_path, "wb") as f:
                        # 128 KiB chunks
                        for chunk in resp.iter_content(chunk_size=131072):
                            if self._stop_requested:
                                interrupted = True
                                break
                            f.write(chunk)
                if interrupted:
                    part_path.unlink(missing_ok=True)
                    return None
                file_size = part_path.stat().st_size
                part_path.replace(filepath)
                break
            except (requests.RequestException, OSError) as exc:
                part_path.unlink(missing_ok=True)
                if attempt < config.MAX_RETRIES - 1:
                    time.sleep(2 ** attempt)  # exponential back-off
                else:
                    tqdm.write(f" Ошибка скачивания ({filename}): {exc}")
                    return None
        else:
            # No attempt was made at all (MAX_RETRIES <= 0).
            return None

        # Metadata
        sender_id = video_data.get("from_id", 0)
        sender_name = (
            self._user_cache.get(sender_id, f"id{sender_id}") if sender_id else ""
        )
        info = VideoMeta(
            video_id=video_id,
            owner_id=video.get("owner_id", 0),
            title=title,
            duration=duration,
            date=date_ts,
            sender_id=sender_id,
            sender_name=sender_name,
            message_text=video_data.get("message_text", ""),
            quality=quality,
        )

        # File timestamp = message date
        if date_ts:
            os.utime(filepath, (date_ts, date_ts))

        return info, file_size

    # -- processing a single dialog --

    def _process_dialog(
        self,
        peer_id: int,
        dialog_name: str,
        bar: tqdm,
    ) -> None:
        """Collect, de-duplicate and download own videos of one dialog.

        The dialog is marked completed unless a stop was requested.
        """
        saved = self.progress.get_current_dialog()
        resuming = saved is not None and saved.get("peer_id") == peer_id
        if not resuming:
            self.progress.set_current_dialog(peer_id)

        # Collect own videos only
        raw_videos = self._collect_my_videos(peer_id)
        if not raw_videos:
            if not self._stop_requested:
                self.progress.mark_dialog_completed(peer_id)
            return

        # De-duplicate by video id
        seen: set[int] = set()
        unique: list[dict] = []
        for v in raw_videos:
            vid = v["video"].get("id", 0)
            if vid and vid not in seen:
                seen.add(vid)
                unique.append(v)

        # Drop the ones that were already downloaded
        tasks = [
            v for v in unique
            if not self.progress.is_video_downloaded(v["video"].get("id", 0))
        ]
        skipped = len(unique) - len(tasks)

        tqdm.write(
            f" [{dialog_name}] Моих видео: {len(unique)}, "
            f"скачать: {len(tasks)}, пропустить: {skipped}"
        )

        if not tasks:
            if not self._stop_requested:
                self.progress.mark_dialog_completed(peer_id)
            return

        # Resolve direct file URLs through video.get (no browser UA)
        tqdm.write(f" [{dialog_name}] Получаю URL видеофайлов...")
        video_keys: list[str] = []
        for t in tasks:
            v = t["video"]
            key = f"{v.get('owner_id', 0)}_{v.get('id', 0)}"
            ak = v.get("access_key", "")
            if ak:
                key += f"_{ak}"
            video_keys.append(key)

        details = self._video_get_raw(video_keys)

        # Index: video id -> files mapping
        files_map: dict[int, dict] = {}
        for d in details:
            if "id" in d:
                files_map[d["id"]] = d.get("files") or {}

        # Prefetch sender names
        sender_ids = list({t.get("from_id", 0) for t in tasks if t.get("from_id", 0)})
        if sender_ids:
            self._prefetch_user_names(sender_ids)

        # Progress bar
        bar.reset(total=len(unique))
        bar.n = skipped
        bar.refresh()
        bar.set_description(f"Видео ({dialog_name[:25]})")

        # Sequential download
        for task in tasks:
            if self._stop_requested:
                break

            if not self._check_free_space():
                tqdm.write("Остановка: мало места на диске.")
                self._stop_requested = True
                break

            video_id = task["video"].get("id", 0)
            files = files_map.get(video_id, {})

            # External video (YouTube etc.): nothing to download
            if "external" in files and not any(
                k.startswith("mp4_") for k in files
            ):
                self.progress.increment_external()
                bar.update(1)
                self.progress.save()
                continue

            # Drop service fields
            files.pop("failover_host", None)
            files.pop("hls_ondemand", None)
            files.pop("dash_ondemand", None)
            files.pop("external", None)

            outcome = self._download_with_size(task, files)

            if outcome is not None:
                # Use the size measured by the download itself instead of
                # re-deriving the filename (which can differ when the date
                # falls back to "now").
                meta, fsize = outcome
                self.progress.mark_video_downloaded(video_id, fsize)
                dur_str = self._format_duration(task["video"].get("duration", 0))
                tqdm.write(
                    f" ✓ {task['video'].get('title', '')[:40]} "
                    f"({dur_str}, {meta.quality}, {self._format_size(fsize)})"
                )
            elif self._stop_requested:
                # Interrupted by the user: neither an error nor progress.
                self.progress.save()
                break
            elif files and self._best_video_url(files):
                # A URL existed but the download failed. The "no files"
                # case was already counted inside the download helper.
                self.progress.increment_errors()

            bar.update(1)
            self.progress.save()

        if not self._stop_requested:
            self.progress.mark_dialog_completed(peer_id)

    # -- main loop --

    def run(self) -> None:
        """Authorise, walk through the remaining dialogs and print a summary."""
        tqdm.write("=" * 60)
        tqdm.write(" Выгрузка СВОИХ видео из диалогов ВКонтакте")
        tqdm.write("=" * 60)

        if not config.VK_TOKEN:
            tqdm.write("ОШИБКА: Заполни VK_TOKEN в config.py!")
            sys.exit(1)

        try:
            me = self.api.users.get()[0]
            self._my_id = me["id"]
            my_name = f"{me['first_name']} {me['last_name']}"
            tqdm.write(f"Авторизован как: {my_name} (id{self._my_id})")
            tqdm.write(f"Скачиваю только видео с owner_id={self._my_id}")
            self._user_cache[me["id"]] = my_name
        except Exception as exc:
            tqdm.write(f"ОШИБКА авторизации: {exc}")
            sys.exit(1)

        conversations = self._get_all_conversations()
        self.progress.data["dialogs_total"] = len(conversations)
        self.progress.save()

        peer_ids = [c["peer_id"] for c in conversations]
        tqdm.write("Загружаю имена собеседников...")
        self._prefetch_user_names(peer_ids)

        completed_ids = set(self.progress.data["dialogs_completed"])
        remaining = [c for c in conversations if c["peer_id"] not in completed_ids]

        # Resume: move the interrupted dialog to the front of the queue.
        current = self.progress.get_current_dialog()
        if current:
            cur_pid = current["peer_id"]
            remaining = [c for c in remaining if c["peer_id"] != cur_pid]
            for c in conversations:
                if c["peer_id"] == cur_pid:
                    remaining.insert(0, c)
                    break

        stats = self.progress.data["stats"]
        tqdm.write(
            f"\nПрогресс: {len(completed_ids)}/{len(conversations)} диалогов, "
            f"{stats['videos_downloaded']} видео скачано, "
            f"{stats['foreign_skipped']} чужих пропущено"
        )
        tqdm.write(f"Осталось: {len(remaining)} диалогов")
        tqdm.write("-" * 60)

        dialogs_bar = tqdm(
            total=len(conversations),
            initial=len(completed_ids),
            desc="Диалоги",
            unit=" диал",
            position=0,
            dynamic_ncols=True,
        )
        videos_bar = tqdm(
            total=0,
            desc="Видео",
            unit=" видео",
            position=1,
            leave=False,
            dynamic_ncols=True,
        )

        try:
            for conv in remaining:
                if self._stop_requested:
                    break
                peer_id = conv["peer_id"]
                dialog_name = self._get_user_name(peer_id)
                self._process_dialog(peer_id, dialog_name, videos_bar)
                if not self._stop_requested:
                    dialogs_bar.update(1)
        finally:
            videos_bar.close()
            dialogs_bar.close()
            self.progress.save()

        stats = self.progress.data["stats"]
        completed_count = len(self.progress.data["dialogs_completed"])
        total_count = self.progress.data["dialogs_total"]

        print("\n" + "=" * 60)
        print(" Итого:")
        print(f" Моих видео скачано: {stats['videos_downloaded']}")
        print(f" Чужих видео пропущено: {stats['foreign_skipped']}")
        ext = stats.get("external_saved", 0)
        if ext:
            print(f" Внешних (YouTube и т.п.):{ext}")
        nf = stats.get("no_files", 0)
        if nf:
            print(f" Без файлов (удалены?): {nf}")
        print(f" Ошибок: {stats['errors']}")
        print(f" Скачано: {self._format_size(stats['bytes_downloaded'])}")
        print(f" Диалогов обработано: {completed_count}/{total_count}")
        if self._stop_requested:
            print("\n Остановлено. Запусти снова для продолжения.")
        else:
            print("\n Все диалоги обработаны!")
        print("=" * 60)


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------

def main() -> None:
    """Create the downloader and run it."""
    downloader = VKVideoDownloader()
    downloader.run()


if __name__ == "__main__":
    main()