Files
homelab-docs/homelab/vpn-route-check/check_routes.py
2026-02-23 16:47:17 +03:00

380 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Проверяет, идёт ли трафик к заданным доменам через VPN (10.8.1.0) или через Ethernet.
Запускать с хоста в той же LAN, что и роутер (например контейнер 100 или рабочий Mac).
Требуется: traceroute, dig (dnsutils) или резолв через Python.
Выход: JSON + HTML в указанную директорию для просмотра в Homepage (iframe).
"""
import json
import os
import re
import subprocess
import sys
try:
import telnetlib
except ImportError:
telnetlib = None # удалён в Python 3.13+
from datetime import datetime, timezone
from ipaddress import IPv4Address, ip_network
from pathlib import Path
# Второй прыжок traceroute = 10.8.1.0 означает туннель AmneziaWG (VPN)
VPN_HOP_IP = "10.8.1.0"
VPN_IN_FIRST_N_HOPS = 10 # в первых N прыжках ищем VPN-шлюз
TRACEROUTE_MAX_HOPS = 15 # больше прыжков — видим VPN до того, как хост перестаёт отвечать (Яндекс, Mail и др.)
TRACEROUTE_TIMEOUT = 25 # секунд на один домен
# Роутер: откуда брать реальный маршрут (VPN vs провайдер). Либо SSH, либо Telnet (NDMS/Keenetic).
ROUTER_HOST = os.environ.get("ROUTER_HOST", "").strip() # 192.168.1.1
ROUTER_SSH_USER = os.environ.get("ROUTER_SSH_USER", "root")
ROUTER_SSH_TIMEOUT = 8
# Telnet (если на роутере нет SSH, только telnet)
ROUTER_TELNET_HOST = os.environ.get("ROUTER_TELNET_HOST", "").strip()
ROUTER_TELNET_USER = os.environ.get("ROUTER_TELNET_USER", "admin")
ROUTER_TELNET_PASSWORD = os.environ.get("ROUTER_TELNET_PASSWORD", "")
ROUTER_TELNET_TIMEOUT = 15
# По какой настройке опрашивать роутер (если задан TELNET — используем telnet)
ROUTER_USE_TELNET = bool(ROUTER_TELNET_HOST and ROUTER_TELNET_PASSWORD)
def load_domains_grouped(path: Path) -> list[tuple[str, str]]:
"""
Читает domains.txt с секциями [Россия] и [Зарубеж].
Возвращает список пар (название_группы, домен).
"""
out: list[tuple[str, str]] = []
current_group = "Прочее"
if not path.exists():
return out
for line in path.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if line.startswith("[") and line.endswith("]"):
current_group = line[1:-1].strip()
continue
out.append((current_group, line))
return out
def resolve_domain(domain: str) -> str | None:
"""Возвращает один IPv4-адрес для домена или None."""
try:
r = subprocess.run(
["dig", "+short", "-4", domain],
capture_output=True,
text=True,
timeout=10,
)
if r.returncode != 0:
return None
lines = [s.strip() for s in r.stdout.splitlines() if s.strip()]
for line in lines:
# первый похожий на IPv4
if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", line):
return line
return None
except (FileNotFoundError, subprocess.TimeoutExpired):
return None
def traceroute_hop2(ip: str) -> tuple[str, list[str]]:
"""
Запускает traceroute до ip с max TRACEROUTE_MAX_HOPS прыжками.
Возвращает ("VPN" | "Ethernet" | "unknown", список IP прыжков).
"""
hops: list[str] = []
try:
r = subprocess.run(
["traceroute", "-m", str(TRACEROUTE_MAX_HOPS), "-n", ip],
capture_output=True,
text=True,
timeout=TRACEROUTE_TIMEOUT,
)
except (FileNotFoundError, subprocess.TimeoutExpired):
return "unknown", []
# Парсим строки вида " 2 10.8.1.0 20.5 ms" или " 2 * * *"
for line in r.stdout.splitlines():
# номер_прыжка IP время...
m = re.match(r"\s*\d+\s+([*\d.]+)", line)
if m:
hop = m.group(1).strip()
if hop != "*":
hops.append(hop)
first_hops = hops[:VPN_IN_FIRST_N_HOPS]
if VPN_HOP_IP in first_hops:
return "VPN", hops
if len(hops) >= 2:
return "Ethernet", hops
# Один прыжок: хост (часто Google/CDN) не отвечает на traceroute по пути — виден только последний узел
if len(hops) == 1:
return "few_hops", hops
return "unknown", hops
def route_get_path(ip: str) -> str | None:
"""
Определяет маршрут до IP через «ip route get» на этом хосте.
Возвращает "VPN", "Ethernet" или None при ошибке.
На LAN-клиенте всегда виден только шлюз 192.168.1.1 — для реального маршрута настрой ROUTER_HOST.
"""
for ip_bin in ("/usr/sbin/ip", "/sbin/ip", "ip"):
try:
r = subprocess.run(
[ip_bin, "route", "get", ip],
capture_output=True,
text=True,
timeout=5,
)
out = (r.stdout or "") + (r.stderr or "")
if not out.strip():
continue
if "via " + VPN_HOP_IP in out or "via " + VPN_HOP_IP + " " in out:
return "VPN"
return "Ethernet"
except (FileNotFoundError, subprocess.TimeoutExpired):
continue
return None
def route_get_path_via_router_ssh(ip: str) -> str | None:
"""Выполняет на роутере «ip route get <ip» по SSH. Возвращает "VPN", "Ethernet" или None."""
if not ROUTER_HOST:
return None
try:
r = subprocess.run(
[
"ssh",
"-o", "BatchMode=yes",
"-o", "ConnectTimeout=" + str(ROUTER_SSH_TIMEOUT),
"-o", "StrictHostKeyChecking=accept-new",
f"{ROUTER_SSH_USER}@{ROUTER_HOST}",
f"ip route get {ip}",
],
capture_output=True,
text=True,
timeout=ROUTER_SSH_TIMEOUT + 2,
)
out = (r.stdout or "") + (r.stderr or "")
if not out.strip():
return None
if "via " + VPN_HOP_IP in out or "via " + VPN_HOP_IP + " " in out:
return "VPN"
return "Ethernet"
except (FileNotFoundError, subprocess.TimeoutExpired):
return None
def fetch_router_routes_telnet() -> list[tuple[str, str, str]] | None:
"""
Подключается к роутеру по Telnet (NDMS/Keenetic), логин, выполняет «show ip route»,
парсит таблицу. Возвращает список (network_cidr, gateway, interface) или None.
"""
if not ROUTER_USE_TELNET or telnetlib is None:
return None
try:
tn = telnetlib.Telnet(ROUTER_TELNET_HOST, 23, timeout=ROUTER_TELNET_TIMEOUT)
tn.read_until(b"Login:", timeout=8)
tn.write(ROUTER_TELNET_USER.encode() + b"\n")
tn.read_until(b"Password:", timeout=5)
tn.write(ROUTER_TELNET_PASSWORD.encode() + b"\n")
tn.read_until(b">", timeout=8)
tn.write(b"show ip route\n")
raw = tn.read_until(b"(config)>", timeout=15)
tn.close()
except (OSError, EOFError) as e:
return None
text = raw.decode("utf-8", errors="replace")
routes: list[tuple[str, str, str]] = []
for line in text.splitlines():
line = line.strip()
# Строка вида "216.58.192.0/20 10.8.1.2 Wireguard0 ..."
if "/" not in line or line.startswith("("):
continue
parts = line.split()
if len(parts) >= 3:
net, gw, iface = parts[0], parts[1], parts[2]
if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d{1,2}$", net):
routes.append((net, gw, iface))
return routes if routes else None
def find_route_for_ip(router_routes: list[tuple[str, str, str]], ip: str) -> str | None:
"""
По таблице маршрутов роутера (network, gateway, interface) находит маршрут для IP
(longest prefix match). Возвращает "VPN" если шлюз 10.8.1.x или интерфейс Wireguard0, иначе "Ethernet".
"""
try:
addr = IPv4Address(ip)
except ValueError:
return None
best: tuple[int, str, str] | None = None # (prefix_len, gateway, interface)
for net_cidr, gw, iface in router_routes:
try:
net = ip_network(net_cidr, strict=False)
if addr in net:
plen = net.prefixlen
if best is None or plen > best[0]:
best = (plen, gw, iface)
except ValueError:
continue
if best is None:
return None
_plen, gateway, interface = best
if gateway.startswith("10.8.1.") or "Wireguard" in interface:
return "VPN"
return "Ethernet"
def check_domain(domain: str, router_routes: list[tuple[str, str, str]] | None = None) -> dict:
ip = resolve_domain(domain)
if not ip:
return {
"domain": domain,
"ip": None,
"path": "error",
"path_label": "Ошибка резолва",
"hops": [],
}
path, hops = traceroute_hop2(ip)
# Если traceroute дал мало данных или неизвестно — смотрим маршрут: таблица с роутера (telnet) или SSH/локальный ip route get.
if path in ("few_hops", "unknown"):
if router_routes is not None:
route_path = find_route_for_ip(router_routes, ip)
hop_note = "(router)"
elif ROUTER_HOST:
route_path = route_get_path_via_router_ssh(ip)
hop_note = "(router)"
else:
route_path = route_get_path(ip)
hop_note = "(ip route get)"
if route_path is not None:
path = route_path
hops = hops + [hop_note] if hops else [hop_note]
path_labels = {
"VPN": "VPN",
"Ethernet": "Ethernet",
"few_hops": "Мало данных (1 прыжок)",
"unknown": "Неизвестно",
}
return {
"domain": domain,
"ip": ip,
"path": path,
"path_label": path_labels.get(path, path),
"hops": hops,
}
def _row_html(r: dict) -> str:
path = r.get("path") or "unknown"
if path == "VPN":
badge = '<span class="badge vpn">VPN</span>'
elif path == "Ethernet":
badge = '<span class="badge ethernet">Ethernet</span>'
elif path == "error":
badge = '<span class="badge error">Ошибка</span>'
elif path == "few_hops":
badge = '<span class="badge unknown" title="Хост не отвечает на traceroute по пути, виден только 1 прыжок">Мало данных</span>'
else:
badge = '<span class="badge unknown">?</span>'
ip = r.get("ip") or ""
hops_s = ", ".join(r.get("hops") or []) or ""
return f"<tr><td>{r['domain']}</td><td>{ip}</td><td>{badge}</td><td class=\"hops\">{hops_s}</td></tr>"
def build_html(grouped_results: dict[str, list[dict]], gen_time: str, out_path: Path | None) -> str:
sections_html = []
for group_name, results in grouped_results.items():
if not results:
continue
rows = [_row_html(r) for r in results]
table_body = "\n".join(rows)
sections_html.append(
f" <h2 class=\"group-title\">{group_name}</h2>\n"
f" <table>\n"
f" <thead><tr><th>Домен</th><th>IP</th><th>Маршрут</th><th>Прыжки</th></tr></thead>\n"
f" <tbody>\n{table_body}\n </tbody>\n </table>"
)
blocks = "\n\n".join(sections_html)
html = f"""<!DOCTYPE html>
<html lang="ru">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta http-equiv="refresh" content="900">
<title>Маршрут к доменам (VPN / Ethernet)</title>
<style>
body {{ font-family: system-ui, sans-serif; margin: 1rem; background: #1a1a2e; color: #eee; }}
h1 {{ font-size: 1.2rem; }}
.meta {{ color: #888; font-size: 0.85rem; margin-bottom: 1rem; }}
.group-title {{ font-size: 1.1rem; margin: 1.5rem 0 0.5rem; color: #b8d4e3; }}
.group-title:first-of-type {{ margin-top: 0; }}
table {{ border-collapse: collapse; width: 100%; font-size: 0.9rem; max-width: 720px; margin-bottom: 1rem; }}
th, td {{ border: 1px solid #444; padding: 0.4rem 0.6rem; text-align: left; }}
th {{ background: #16213e; }}
tr:nth-child(even) {{ background: #0f0f1a; }}
.badge {{ display: inline-block; padding: 0.2rem 0.5rem; border-radius: 4px; font-weight: 600; font-size: 0.8rem; }}
.badge.vpn {{ background: #0d7377; color: #fff; }}
.badge.ethernet {{ background: #3d5a80; color: #fff; }}
.badge.unknown {{ background: #555; color: #ccc; }}
.badge.error {{ background: #9e2b2b; color: #fff; }}
.hops {{ font-size: 0.8rem; color: #aaa; }}
</style>
</head>
<body>
<h1>Маршрут к доменам</h1>
<p class="meta">Трафик через VPN (10.8.1.0) или напрямую (Ethernet). Обновлено: {gen_time}</p>
{blocks}
</body>
</html>"""
if out_path:
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(html, encoding="utf-8")
return html
def main():
script_dir = Path(__file__).resolve().parent
domains_path = script_dir / "domains.txt"
out_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else script_dir / "output"
out_dir = Path(out_dir)
grouped_domains = load_domains_grouped(domains_path)
if not grouped_domains:
grouped_domains = [("Зарубеж", "youtube.com"), ("Зарубеж", "instagram.com"), ("Зарубеж", "google.com")]
router_routes: list[tuple[str, str, str]] | None = None
if ROUTER_USE_TELNET:
router_routes = fetch_router_routes_telnet()
grouped_results: dict[str, list[dict]] = {}
all_results = []
for group_name, domain in grouped_domains:
r = check_domain(domain, router_routes)
r["group"] = group_name
all_results.append(r)
grouped_results.setdefault(group_name, []).append(r)
gen_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
payload = {"updated": gen_time, "results": all_results}
json_path = out_dir / "results.json"
out_dir.mkdir(parents=True, exist_ok=True)
json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
html_path = out_dir / "index.html"
build_html(grouped_results, gen_time, html_path)
total = len(all_results)
print(f"OK: {total} доменов → {html_path} / {json_path}")
if __name__ == "__main__":
main()