Files
vk-scripts/main_video.py
Andrey f760e94206 Initial commit: VK media tools
Скрипты для выгрузки фото и видео из диалогов ВКонтакте,
обработки (дедупликация + CLIP-классификация) и загрузки в Immich.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-16 21:14:50 +03:00

824 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Скрипт для выгрузки СВОИХ видео из личных диалогов ВКонтакте.
Скачивает только видео, где owner_id == ваш ID (загруженные вами).
Чужие видео, видео сообществ — пропускаются.
Использование:
1. Заполни VK_TOKEN в config.py
2. pip install -r requirements.txt
3. python main_video.py
4. Ctrl+C для остановки (прогресс сохраняется)
5. Повторный запуск продолжит с места остановки
"""
import json
import os
import shutil
import signal
import sys
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional
import requests
import vk_api
from tqdm import tqdm
import config
# Keys of the VK "files" mapping, ordered from the most to the least
# preferred video quality.
VIDEO_QUALITY_PRIORITY: list[str] = [
    "mp4_2160",
    "mp4_1440",
    "mp4_1080",
    "mp4_720",
    "mp4_480",
    "mp4_360",
    "mp4_240",
    "mp4_144",
]
# ---------------------------------------------------------------------------
# Модели данных
# ---------------------------------------------------------------------------
@dataclass
class VideoMeta:
    """Metadata describing one downloaded video.

    NOTE(review): the original docstring says this is meant for a JSON
    sidecar file; confirm against the code that consumes these objects.
    """

    # Identifier of the video and of the account that owns it.
    video_id: int
    owner_id: int
    # Title and duration as reported for the video.
    title: str
    duration: int
    # Unix timestamp associated with the attachment (0 when unknown).
    date: int
    # Who sent the message carrying the video, and their display name.
    sender_id: int
    sender_name: str
    # Text of the message the video was attached to.
    message_text: str
    # Key of the selected quality, e.g. "mp4_720".
    quality: str
# ---------------------------------------------------------------------------
# Менеджер прогресса
# ---------------------------------------------------------------------------
class ProgressManager:
    """Persist download progress to a JSON file so a run can be resumed.

    The state lives in ``self.data`` (a plain dict that is dumped as JSON)
    plus ``self._downloaded_ids``, a set mirror of the
    ``downloaded_video_ids`` list used for fast membership checks.
    Mutating methods take ``self._lock``; the lock is a non-reentrant
    ``threading.Lock``, so ``save()`` must never be called while it is held.
    """

    def __init__(self, progress_file: str) -> None:
        """Load existing progress from *progress_file* or start fresh."""
        self.progress_file: Path = Path(progress_file)
        self._lock = threading.Lock()
        self.data: dict = self._load()
        self._downloaded_ids: set[int] = set(
            self.data.get("downloaded_video_ids", [])
        )

    def _load(self) -> dict:
        """Return the saved state merged onto the default state.

        A missing, unreadable, undecodable or non-object file yields the
        default state.  Keys absent from the file (for example in a file
        written by an older version) are filled in from the defaults so
        later ``self.data["stats"][...]`` lookups cannot raise ``KeyError``.
        """
        state = self._default_state()
        try:
            with open(self.progress_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (ValueError, OSError):
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError; OSError covers a missing file.
            return state
        if not isinstance(loaded, dict):
            return state

        stats = state["stats"]
        loaded_stats = loaded.get("stats")
        if isinstance(loaded_stats, dict):
            stats.update(loaded_stats)
        state.update(loaded)
        state["stats"] = stats
        # These two are appended to / iterated as lists elsewhere.
        for key in ("dialogs_completed", "downloaded_video_ids"):
            if not isinstance(state[key], list):
                state[key] = []
        return state

    @staticmethod
    def _default_state() -> dict:
        """Return a fresh, empty progress state."""
        return {
            "version": 3,
            "last_updated": "",
            "dialogs_total": 0,
            "dialogs_completed": [],
            "current_dialog": None,
            "downloaded_video_ids": [],
            "stats": {
                "videos_downloaded": 0,
                "foreign_skipped": 0,
                "external_saved": 0,
                "no_files": 0,
                "errors": 0,
                "bytes_downloaded": 0,
            },
        }

    def save(self) -> None:
        """Write the state to disk via a temporary file and a rename."""
        with self._lock:
            self.data["last_updated"] = datetime.now().isoformat()
            self.data["downloaded_video_ids"] = list(self._downloaded_ids)
            tmp = self.progress_file.with_suffix(".tmp")
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(self.data, f, ensure_ascii=False, indent=2)
            # Replace only after the dump succeeded, so a failed write does
            # not clobber the previous progress file.
            tmp.replace(self.progress_file)

    def is_dialog_completed(self, peer_id: int) -> bool:
        """Return True if *peer_id* is recorded as fully processed."""
        return peer_id in self.data["dialogs_completed"]

    def mark_dialog_completed(self, peer_id: int) -> None:
        """Record *peer_id* as done, clear the current dialog and save."""
        with self._lock:
            if peer_id not in self.data["dialogs_completed"]:
                self.data["dialogs_completed"].append(peer_id)
            self.data["current_dialog"] = None
        # Outside the lock: save() acquires the same non-reentrant lock.
        self.save()

    def set_current_dialog(self, peer_id: int) -> None:
        """Record *peer_id* as the dialog being processed and save."""
        with self._lock:
            self.data["current_dialog"] = {"peer_id": peer_id}
        # Outside the lock: save() acquires the same non-reentrant lock.
        self.save()

    def get_current_dialog(self) -> Optional[dict]:
        """Return ``{"peer_id": ...}`` for the dialog in progress, or None."""
        return self.data.get("current_dialog")

    def is_video_downloaded(self, video_id: int) -> bool:
        """Return True if *video_id* was already recorded as downloaded."""
        return video_id in self._downloaded_ids

    def mark_video_downloaded(self, video_id: int, size: int) -> None:
        """Record a downloaded video and add *size* bytes to the totals.

        Does not write to disk; the caller is expected to call ``save()``.
        """
        with self._lock:
            self._downloaded_ids.add(video_id)
            self.data["stats"]["videos_downloaded"] += 1
            self.data["stats"]["bytes_downloaded"] += size

    def increment_foreign(self, count: int = 1) -> None:
        """Add *count* to the number of skipped videos owned by others."""
        with self._lock:
            self.data["stats"]["foreign_skipped"] += count

    def increment_external(self) -> None:
        """Count one externally hosted video."""
        with self._lock:
            self.data["stats"]["external_saved"] += 1

    def increment_no_files(self) -> None:
        """Count one video for which no downloadable file was offered."""
        with self._lock:
            self.data["stats"]["no_files"] += 1

    def increment_errors(self) -> None:
        """Count one failed video."""
        with self._lock:
            self.data["stats"]["errors"] += 1
# ---------------------------------------------------------------------------
# Основной загрузчик
# ---------------------------------------------------------------------------
class VKVideoDownloader:
    """Download the current user's own videos from VK private dialogs.

    Only attachments whose ``owner_id`` equals the authorised user's id are
    downloaded; everything else is counted as foreign and skipped.  Progress
    is tracked by a ``ProgressManager`` so an interrupted run can resume.
    """

    # Service keys of the ``files`` mapping that are not direct video URLs.
    _NON_VIDEO_FILE_KEYS = (
        "failover_host", "hls_ondemand", "dash_ondemand", "external",
    )

    def __init__(self) -> None:
        """Set up sessions, the download directory and signal handlers."""
        self._stop_requested: bool = False
        self.progress = ProgressManager(config.VIDEO_PROGRESS_FILE)
        self.download_dir = Path(config.VIDEO_DOWNLOAD_DIR)
        self.download_dir.mkdir(parents=True, exist_ok=True)
        # peer/user id -> display name
        self._user_cache: dict[int, str] = {}
        self._my_id: int = 0
        # HTTP session used to download the video files.
        self._http = requests.Session()
        # Separate session for video.get, kept free of a browser User-Agent:
        # per the original author's note VK omits the "files" field when it
        # sees a browser UA.
        self._video_api_session = requests.Session()
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)
        # vk_api is used for every call except video.get.
        self._vk_session = vk_api.VkApi(
            token=config.VK_TOKEN, api_version=config.API_VERSION,
        )
        self.api = self._vk_session.get_api()

    def _signal_handler(self, _signum: int, _frame: object) -> None:
        """Request a graceful stop; a second signal exits immediately."""
        if self._stop_requested:
            tqdm.write("\nПринудительная остановка!")
            sys.exit(1)
        self._stop_requested = True
        tqdm.write("\nОстановка... сохраняю прогресс.")

    # -- utilities --

    def _check_free_space(self) -> bool:
        """Return False (and log) when free disk space is below the minimum."""
        usage = shutil.disk_usage(str(self.download_dir))
        free_mb = usage.free / (1024 * 1024)
        if free_mb < config.VIDEO_MIN_FREE_SPACE_MB:
            tqdm.write(
                f"Мало места! Свободно: {free_mb:.0f} МБ, "
                f"минимум: {config.VIDEO_MIN_FREE_SPACE_MB} МБ"
            )
            return False
        return True

    @staticmethod
    def _safe_name(name: str) -> str:
        """Replace every character except alphanumerics, ``_`` and ``-``."""
        return "".join(
            c if c.isalnum() or c in ("_", "-") else "_" for c in name
        )

    @staticmethod
    def _format_size(size_bytes: int) -> str:
        """Format a byte count with a 1024-based unit and one decimal."""
        sz = float(size_bytes)
        for unit in ("Б", "КБ", "МБ", "ГБ"):
            if sz < 1024:
                return f"{sz:.1f} {unit}"
            sz /= 1024
        return f"{sz:.1f} ТБ"

    @staticmethod
    def _format_duration(seconds: int) -> str:
        """Format seconds as ``M:SS`` or, from one hour up, ``H:MM:SS``."""
        m, s = divmod(seconds, 60)
        h, m = divmod(m, 60)
        if h:
            return f"{h}:{m:02d}:{s:02d}"
        return f"{m}:{s:02d}"

    # -- VK API --

    def _execute(self, code: str) -> dict:
        """Run a VKScript snippet through the ``execute`` API method."""
        return self._vk_session.method("execute", {"code": code})

    def _video_get_raw(self, video_keys: list[str]) -> list[dict]:
        """Call ``video.get`` for *video_keys* and return the found items.

        Uses the dedicated session without a browser User-Agent.  Failed
        batches are logged and skipped; use ``_fetch_video_details`` to learn
        whether every batch was answered.
        """
        items, _complete = self._fetch_video_details(video_keys)
        return items

    def _fetch_video_details(
        self, video_keys: list[str],
    ) -> tuple[list[dict], bool]:
        """Call ``video.get`` in batches of 200 keys.

        Returns ``(items, complete)``.  ``complete`` is False when a stop was
        requested before all batches ran, or when any batch failed at the
        transport level or came back without a ``response`` object (for
        example a VK API ``error``).  Callers can then tell "VK has no files
        for this video" apart from "we could not ask VK".
        """
        result: list[dict] = []
        complete = True
        for i in range(0, len(video_keys), 200):
            if self._stop_requested:
                complete = False
                break
            batch = video_keys[i:i + 200]
            try:
                resp = self._video_api_session.post(
                    "https://api.vk.com/method/video.get",
                    data={
                        "videos": ",".join(batch),
                        "access_token": config.VK_TOKEN,
                        "v": config.API_VERSION,
                    },
                    timeout=30,
                )
                data = resp.json()
            except (requests.RequestException, ValueError) as exc:
                tqdm.write(f" video.get ошибка: {exc}")
                complete = False
            else:
                payload = data.get("response") if isinstance(data, dict) else None
                if isinstance(payload, dict):
                    result.extend(payload.get("items", []))
                else:
                    error = data.get("error") if isinstance(data, dict) else data
                    if isinstance(error, dict):
                        error = error.get("error_msg", error)
                    tqdm.write(f" video.get ошибка: {error}")
                    complete = False
            # Pause between API calls (presumably VK rate limiting).
            time.sleep(0.34)
        return result, complete

    def _prefetch_user_names(self, user_ids: list[int]) -> None:
        """Fill the name cache for *user_ids* that are not cached yet.

        Ids above 2_000_000_000 are labelled as group chats locally, positive
        ids are resolved through ``users.get`` and negative ids through
        ``groups.getById``.  Lookup failures are logged and otherwise
        ignored: names are best-effort and callers fall back to ``id<N>``.
        """
        need_users: list[int] = []
        need_groups: list[int] = []
        for uid in user_ids:
            if uid in self._user_cache or uid == 0:
                continue
            if uid > 2_000_000_000:
                self._user_cache[uid] = f"Беседа_{uid - 2_000_000_000}"
            elif uid > 0:
                need_users.append(uid)
            else:
                need_groups.append(abs(uid))

        for i in range(0, len(need_users), 1000):
            batch = need_users[i:i + 1000]
            try:
                ids_str = ",".join(str(x) for x in batch)
                users = self.api.users.get(user_ids=ids_str)
                for u in users:
                    self._user_cache[u["id"]] = (
                        f"{u['first_name']} {u['last_name']}"
                    )
            except Exception as exc:  # best-effort: never abort the run
                tqdm.write(f" users.get ошибка: {exc}")
            time.sleep(0.34)

        for i in range(0, len(need_groups), 500):
            batch = need_groups[i:i + 500]
            try:
                ids_str = ",".join(str(x) for x in batch)
                resp = self.api.groups.getById(group_ids=ids_str)
                # The response is either a bare list or an object that
                # carries the list under "groups".
                groups = resp if isinstance(resp, list) else resp.get("groups", [])
                for g in groups:
                    self._user_cache[-g["id"]] = g.get("name", f"group_{g['id']}")
            except Exception as exc:  # best-effort: never abort the run
                tqdm.write(f" groups.getById ошибка: {exc}")
            time.sleep(0.34)

    def _get_user_name(self, user_id: int) -> str:
        """Return the display name for *user_id*, or ``id<N>`` if unknown."""
        if user_id in self._user_cache:
            return self._user_cache[user_id]
        self._prefetch_user_names([user_id])
        return self._user_cache.get(user_id, f"id{user_id}")

    # -- conversations --

    @staticmethod
    def _append_private_peers(
        conversations: list[dict], items: list[dict],
    ) -> None:
        """Append every non-"chat" peer from *items* to *conversations*."""
        for item in items:
            peer = item["conversation"]["peer"]
            if peer["type"] == "chat":
                continue
            conversations.append({"peer_id": peer["id"], "type": peer["type"]})

    def _get_all_conversations(self) -> list[dict]:
        """Return all non-chat conversations as ``{"peer_id", "type"}`` dicts.

        Pages are fetched through ``execute`` (up to 25 pages of 200 per
        call).  If ``execute`` raises, the whole listing is redone with plain
        ``messages.getConversations`` calls.
        """
        conversations: list[dict] = []
        offset = 0
        tqdm.write("Получаю список диалогов...")
        while True:
            code = f"""
                var results = [];
                var offset = {offset};
                var i = 0;
                while (i < 25) {{
                    var resp = API.messages.getConversations({{
                        "offset": offset, "count": 200, "extended": 0
                    }});
                    results.push(resp);
                    offset = offset + 200;
                    if (offset >= resp.count || resp.items.length == 0) {{
                        return {{"results": results, "done": true}};
                    }}
                    i = i + 1;
                }}
                return {{"results": results, "done": false, "next_offset": offset}};
            """
            try:
                data = self._execute(code)
            except Exception as exc:
                tqdm.write(f" execute ошибка: {exc}, фоллбэк")
                return self._get_conversations_fallback()
            for page in data.get("results", []):
                if not page:
                    continue
                self._append_private_peers(conversations, page.get("items", []))
            if data.get("done", True):
                break
            offset = data.get("next_offset", offset + 5000)
            time.sleep(0.34)
        tqdm.write(f"Найдено личных диалогов: {len(conversations)}")
        return conversations

    def _get_conversations_fallback(self) -> list[dict]:
        """List non-chat conversations page by page without ``execute``."""
        conversations: list[dict] = []
        offset = 0
        while True:
            resp = self.api.messages.getConversations(
                offset=offset, count=200, extended=0,
            )
            items = resp.get("items", [])
            if not items:
                break
            self._append_private_peers(conversations, items)
            offset += 200
            if offset >= resp.get("count", 0):
                break
            time.sleep(0.34)
        return conversations

    # -- collecting video attachments --

    def _append_own_videos(self, collected: list[dict], items: list[dict]) -> int:
        """Append own video attachments from *items* to *collected*.

        Returns the number of video attachments skipped because their
        ``owner_id`` differs from the authorised user's id.
        """
        foreign = 0
        for item in items:
            att = item.get("attachment", {})
            if att.get("type") != "video":
                continue
            video = att["video"]
            if video.get("owner_id") != self._my_id:
                foreign += 1
                continue
            collected.append({
                "video": video,
                "from_id": item.get("from_id", 0),
                "date": item.get("date", video.get("date", 0)),
                "message_text": "",
            })
        return foreign

    def _collect_my_videos(self, peer_id: int) -> list[dict]:
        """Collect own video attachments (``owner_id == my id``) of a dialog.

        Uses ``execute`` to page through ``messages.getHistoryAttachments``;
        if that raises, continues from the current cursor with direct API
        calls.  May return a partial list when a stop was requested.
        """
        all_videos: list[dict] = []
        cursor = ""
        foreign_count = 0
        while not self._stop_requested:
            # json.dumps yields a correctly escaped string literal, so a
            # cursor containing quotes or backslashes cannot break the script.
            code = f"""
                var results = [];
                var cursor = {json.dumps(cursor)};
                var i = 0;
                while (i < 25) {{
                    var params = {{
                        "peer_id": {peer_id},
                        "media_type": "video",
                        "count": 200,
                        "preserve_order": 1
                    }};
                    if (cursor != "") {{
                        params.start_from = cursor;
                    }}
                    var resp = API.messages.getHistoryAttachments(params);
                    results.push(resp);
                    if (!resp.next_from || resp.items.length == 0) {{
                        return {{"results": results, "cursor": ""}};
                    }}
                    cursor = resp.next_from;
                    i = i + 1;
                }}
                return {{"results": results, "cursor": cursor}};
            """
            try:
                data = self._execute(code)
            except Exception as exc:
                tqdm.write(f" execute ошибка: {exc}, фоллбэк")
                # The fallback appends to all_videos in place.
                _, fb_foreign = self._collect_videos_fallback(
                    peer_id, all_videos, cursor,
                )
                foreign_count += fb_foreign
                break
            for page in data.get("results", []):
                if not page:
                    continue
                foreign_count += self._append_own_videos(
                    all_videos, page.get("items", []),
                )
            cursor = data.get("cursor", "")
            if not cursor:
                break
            time.sleep(0.34)
        if foreign_count:
            self.progress.increment_foreign(foreign_count)
        return all_videos

    def _collect_videos_fallback(
        self, peer_id: int, existing: list[dict], start_from: str,
    ) -> tuple[list[dict], int]:
        """Page through ``messages.getHistoryAttachments`` without ``execute``.

        Appends own videos to *existing* in place, starting at *start_from*
        (empty string means from the beginning).  Returns *existing* and the
        number of foreign videos skipped.
        """
        cursor: Optional[str] = start_from or None
        foreign_count = 0
        while not self._stop_requested:
            params: dict = {
                "peer_id": peer_id, "media_type": "video",
                "count": 200, "preserve_order": 1,
            }
            if cursor:
                params["start_from"] = cursor
            resp = self.api.messages.getHistoryAttachments(**params)
            items = resp.get("items", [])
            cursor = resp.get("next_from")
            if not items:
                break
            foreign_count += self._append_own_videos(existing, items)
            if not cursor:
                break
            time.sleep(0.34)
        return existing, foreign_count

    # -- quality selection --

    @staticmethod
    def _best_video_url(files: dict) -> Optional[tuple[str, str]]:
        """Return ``(url, quality)`` for the best available quality, or None."""
        for quality in VIDEO_QUALITY_PRIORITY:
            url = files.get(quality)
            if url:
                return (url, quality)
        return None

    # -- downloading one video --

    def _download_single(
        self, video_data: dict, files: dict,
    ) -> Optional["VideoMeta"]:
        """Download one video by direct URL; return its metadata or None.

        Thin wrapper over ``_download_with_size`` kept for callers that only
        need the metadata.
        """
        outcome = self._download_with_size(video_data, files)
        return outcome[0] if outcome is not None else None

    def _download_with_size(
        self, video_data: dict, files: dict,
    ) -> Optional[tuple["VideoMeta", int]]:
        """Download one video and return ``(metadata, size_in_bytes)``.

        Returns None when *files* offers no usable URL (counted via
        ``increment_no_files``), when a stop was requested mid-download, or
        when every retry failed.  A target file that already exists is
        treated as a finished download and reported with its current size,
        so it gets recorded instead of being counted as an error on every
        run.
        """
        video = video_data["video"]
        video_id = video.get("id", 0)
        date_ts: int = video_data.get("date", video.get("date", 0))
        title = video.get("title", "")
        duration = video.get("duration", 0)

        best = self._best_video_url(files)
        if not best:
            self.progress.increment_no_files()
            return None
        url, quality = best

        # File name: video_{id}_{date}_{title}.mp4 inside the download dir.
        dt = datetime.fromtimestamp(date_ts) if date_ts else datetime.now()
        safe_title = self._safe_name(title)[:50] if title else ""
        suffix = f"_{safe_title}" if safe_title else ""
        filename = f"video_{video_id}_{dt.strftime('%Y%m%d_%H%M%S')}{suffix}.mp4"
        filepath = self.download_dir / filename

        if filepath.exists():
            file_size = filepath.stat().st_size
        else:
            streamed = self._stream_to_file(url, filepath)
            if streamed is None:
                return None
            file_size = streamed
            # File timestamp = message date.
            if date_ts:
                os.utime(filepath, (date_ts, date_ts))

        sender_id = video_data.get("from_id", 0)
        sender_name = (
            self._user_cache.get(sender_id, f"id{sender_id}")
            if sender_id else ""
        )
        info = VideoMeta(
            video_id=video_id,
            owner_id=video.get("owner_id", 0),
            title=title,
            duration=duration,
            date=date_ts,
            sender_id=sender_id,
            sender_name=sender_name,
            message_text=video_data.get("message_text", ""),
            quality=quality,
        )
        return info, file_size

    def _stream_to_file(self, url: str, filepath: Path) -> Optional[int]:
        """Stream *url* into *filepath* with retries; return the size or None.

        Data is written to ``<name>.part`` and renamed only after the last
        chunk, so a file under its final name is always complete.  Returns
        None if a stop was requested while streaming or all
        ``config.MAX_RETRIES`` attempts failed; the partial file is removed
        in both cases.
        """
        partial = filepath.with_name(filepath.name + ".part")
        for attempt in range(config.MAX_RETRIES):
            interrupted = False
            try:
                # The context manager releases the connection even when we
                # stop reading early.
                with self._http.get(
                    url, timeout=config.VIDEO_DOWNLOAD_TIMEOUT, stream=True,
                ) as resp:
                    resp.raise_for_status()
                    with open(partial, "wb") as f:
                        # 128 KiB chunks
                        for chunk in resp.iter_content(chunk_size=131072):
                            if self._stop_requested:
                                interrupted = True
                                break
                            f.write(chunk)
                if interrupted:
                    partial.unlink(missing_ok=True)
                    return None
                partial.replace(filepath)
                return filepath.stat().st_size
            except (requests.RequestException, OSError) as exc:
                partial.unlink(missing_ok=True)
                if attempt < config.MAX_RETRIES - 1:
                    time.sleep(2 ** attempt)  # exponential back-off
                else:
                    tqdm.write(f" Ошибка скачивания ({filepath.name}): {exc}")
                    return None
        return None

    # -- processing one dialog --

    def _process_dialog(
        self, peer_id: int, dialog_name: str, bar: "tqdm",
    ) -> None:
        """Collect, de-duplicate and download own videos of one dialog.

        The dialog is marked completed only when no stop was requested and
        ``video.get`` answered for every batch; otherwise it stays pending
        and is picked up again by the next run.
        """
        saved = self.progress.get_current_dialog()
        resuming = saved is not None and saved.get("peer_id") == peer_id
        if not resuming:
            self.progress.set_current_dialog(peer_id)

        # Collect own videos only.
        raw_videos = self._collect_my_videos(peer_id)
        if not raw_videos:
            if not self._stop_requested:
                self.progress.mark_dialog_completed(peer_id)
            return

        # De-duplicate by video id.
        seen: set[int] = set()
        unique: list[dict] = []
        for v in raw_videos:
            vid = v["video"].get("id", 0)
            if vid and vid not in seen:
                seen.add(vid)
                unique.append(v)

        # Drop what was already downloaded in earlier runs.
        tasks = [
            v for v in unique
            if not self.progress.is_video_downloaded(v["video"].get("id", 0))
        ]
        skipped = len(unique) - len(tasks)
        tqdm.write(
            f" [{dialog_name}] Моих видео: {len(unique)}, "
            f"скачать: {len(tasks)}, пропустить: {skipped}"
        )
        if not tasks:
            if not self._stop_requested:
                self.progress.mark_dialog_completed(peer_id)
            return

        # Resolve direct file URLs through video.get (no browser UA).
        tqdm.write(f" [{dialog_name}] Получаю URL видеофайлов...")
        video_keys: list[str] = []
        for t in tasks:
            v = t["video"]
            key = f"{v.get('owner_id', 0)}_{v.get('id', 0)}"
            ak = v.get("access_key", "")
            if ak:
                key += f"_{ak}"
            video_keys.append(key)
        details, details_complete = self._fetch_video_details(video_keys)

        # Index: video id -> files mapping.
        files_map: dict[int, dict] = {
            d["id"]: (d.get("files") or {}) for d in details if "id" in d
        }

        # Preload sender names.
        sender_ids = list(
            {t.get("from_id", 0) for t in tasks if t.get("from_id", 0)}
        )
        if sender_ids:
            self._prefetch_user_names(sender_ids)

        # Progress bar starts at the number of already-downloaded videos.
        bar.reset(total=len(unique))
        bar.n = skipped
        bar.refresh()
        bar.set_description(f"Видео ({dialog_name[:25]})")

        # Sequential download.
        for task in tasks:
            if self._stop_requested:
                break
            if not self._check_free_space():
                tqdm.write("Остановка: мало места на диске.")
                self._stop_requested = True
                break
            video = task["video"]
            video_id = video.get("id", 0)

            if video_id not in files_map and not details_complete:
                # video.get could not be asked about this video; count it as
                # an error rather than "no files" and retry on the next run.
                self.progress.increment_errors()
                bar.update(1)
                continue

            files = files_map.get(video_id, {})
            # Externally hosted video (YouTube etc.): nothing to download.
            if "external" in files and not any(
                k.startswith("mp4_") for k in files
            ):
                self.progress.increment_external()
                bar.update(1)
                self.progress.save()
                continue
            # Strip service fields.
            for service_key in self._NON_VIDEO_FILE_KEYS:
                files.pop(service_key, None)

            has_source = self._best_video_url(files) is not None
            outcome = self._download_with_size(task, files)
            if outcome is not None:
                result, fsize = outcome
                self.progress.mark_video_downloaded(video_id, fsize)
                dur_str = self._format_duration(video.get("duration", 0))
                tqdm.write(
                    f"{video.get('title', '')[:40]} "
                    f"({dur_str}, {result.quality}, {self._format_size(fsize)})"
                )
            elif self._stop_requested:
                # Interrupted by the user: not an error, retried next run.
                break
            elif has_source:
                # Without a source, increment_no_files was already called
                # by the download helper.
                self.progress.increment_errors()
            bar.update(1)
            self.progress.save()

        if self._stop_requested:
            return
        if details_complete:
            self.progress.mark_dialog_completed(peer_id)
        else:
            tqdm.write(
                f" [{dialog_name}] video.get ответил не полностью, "
                f"диалог будет повторён при следующем запуске"
            )

    # -- main loop --

    def run(self) -> None:
        """Authorise, list dialogs, process the pending ones, print totals."""
        tqdm.write("=" * 60)
        tqdm.write(" Выгрузка СВОИХ видео из диалогов ВКонтакте")
        tqdm.write("=" * 60)
        if not config.VK_TOKEN:
            tqdm.write("ОШИБКА: Заполни VK_TOKEN в config.py!")
            sys.exit(1)
        try:
            me = self.api.users.get()[0]
            self._my_id = me["id"]
            my_name = f"{me['first_name']} {me['last_name']}"
            tqdm.write(f"Авторизован как: {my_name} (id{self._my_id})")
            tqdm.write(f"Скачиваю только видео с owner_id={self._my_id}")
            self._user_cache[me["id"]] = my_name
        except Exception as exc:
            tqdm.write(f"ОШИБКА авторизации: {exc}")
            sys.exit(1)

        conversations = self._get_all_conversations()
        self.progress.data["dialogs_total"] = len(conversations)
        self.progress.save()

        peer_ids = [c["peer_id"] for c in conversations]
        tqdm.write("Загружаю имена собеседников...")
        self._prefetch_user_names(peer_ids)

        completed_ids = set(self.progress.data["dialogs_completed"])
        remaining = [c for c in conversations if c["peer_id"] not in completed_ids]
        # Put the dialog that was in progress at the front of the queue.
        current = self.progress.get_current_dialog()
        if current:
            cur_pid = current["peer_id"]
            remaining = [c for c in remaining if c["peer_id"] != cur_pid]
            for c in conversations:
                if c["peer_id"] == cur_pid:
                    remaining.insert(0, c)
                    break

        stats = self.progress.data["stats"]
        tqdm.write(
            f"\nПрогресс: {len(completed_ids)}/{len(conversations)} диалогов, "
            f"{stats['videos_downloaded']} видео скачано, "
            f"{stats['foreign_skipped']} чужих пропущено"
        )
        tqdm.write(f"Осталось: {len(remaining)} диалогов")
        tqdm.write("-" * 60)

        dialogs_bar = tqdm(
            total=len(conversations),
            initial=len(completed_ids),
            desc="Диалоги",
            unit=" диал",
            position=0,
            dynamic_ncols=True,
        )
        videos_bar = tqdm(
            total=0,
            desc="Видео",
            unit=" видео",
            position=1,
            leave=False,
            dynamic_ncols=True,
        )
        try:
            for conv in remaining:
                if self._stop_requested:
                    break
                peer_id = conv["peer_id"]
                dialog_name = self._get_user_name(peer_id)
                self._process_dialog(peer_id, dialog_name, videos_bar)
                if not self._stop_requested:
                    dialogs_bar.update(1)
        finally:
            # Always close the bars and persist progress, even on errors.
            videos_bar.close()
            dialogs_bar.close()
            self.progress.save()

        stats = self.progress.data["stats"]
        completed_count = len(self.progress.data["dialogs_completed"])
        total_count = self.progress.data["dialogs_total"]
        print("\n" + "=" * 60)
        print(" Итого:")
        print(f" Моих видео скачано: {stats['videos_downloaded']}")
        print(f" Чужих видео пропущено: {stats['foreign_skipped']}")
        ext = stats.get("external_saved", 0)
        if ext:
            print(f" Внешних (YouTube и т.п.):{ext}")
        nf = stats.get("no_files", 0)
        if nf:
            print(f" Без файлов (удалены?): {nf}")
        print(f" Ошибок: {stats['errors']}")
        print(f" Скачано: {self._format_size(stats['bytes_downloaded'])}")
        print(f" Диалогов обработано: {completed_count}/{total_count}")
        if self._stop_requested:
            print("\n Остановлено. Запусти снова для продолжения.")
        else:
            print("\n Все диалоги обработаны!")
        print("=" * 60)
# ---------------------------------------------------------------------------
# Точка входа
# ---------------------------------------------------------------------------
def main() -> None:
    """Build the downloader and run it until done or interrupted."""
    VKVideoDownloader().run()


if __name__ == "__main__":
    main()