#!/usr/bin/env python3 """ Проверяет, идёт ли трафик к заданным доменам через VPN (10.8.1.0) или через Ethernet. Запускать с хоста в той же LAN, что и роутер (например контейнер 100 или рабочий Mac). Требуется: traceroute, dig (dnsutils) или резолв через Python. Выход: JSON + HTML в указанную директорию для просмотра в Homepage (iframe). """ import json import os import re import subprocess import sys try: import telnetlib except ImportError: telnetlib = None # удалён в Python 3.13+ from datetime import datetime, timezone from ipaddress import IPv4Address, ip_network from pathlib import Path # Второй прыжок traceroute = 10.8.1.0 означает туннель AmneziaWG (VPN) VPN_HOP_IP = "10.8.1.0" VPN_IN_FIRST_N_HOPS = 10 # в первых N прыжках ищем VPN-шлюз TRACEROUTE_MAX_HOPS = 15 # больше прыжков — видим VPN до того, как хост перестаёт отвечать (Яндекс, Mail и др.) TRACEROUTE_TIMEOUT = 25 # секунд на один домен # Роутер: откуда брать реальный маршрут (VPN vs провайдер). Либо SSH, либо Telnet (NDMS/Keenetic). ROUTER_HOST = os.environ.get("ROUTER_HOST", "").strip() # 192.168.1.1 ROUTER_SSH_USER = os.environ.get("ROUTER_SSH_USER", "root") ROUTER_SSH_TIMEOUT = 8 # Telnet (если на роутере нет SSH, только telnet) ROUTER_TELNET_HOST = os.environ.get("ROUTER_TELNET_HOST", "").strip() ROUTER_TELNET_USER = os.environ.get("ROUTER_TELNET_USER", "admin") ROUTER_TELNET_PASSWORD = os.environ.get("ROUTER_TELNET_PASSWORD", "") ROUTER_TELNET_TIMEOUT = 15 # По какой настройке опрашивать роутер (если задан TELNET — используем telnet) ROUTER_USE_TELNET = bool(ROUTER_TELNET_HOST and ROUTER_TELNET_PASSWORD) def load_domains_grouped(path: Path) -> list[tuple[str, str]]: """ Читает domains.txt с секциями [Россия] и [Зарубеж]. Возвращает список пар (название_группы, домен). """ out: list[tuple[str, str]] = [] current_group = "Прочее" if not path.exists(): return out for line in path.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line or line.startswith("#"): continue if line.startswith("[") and line.endswith("]"): current_group = line[1:-1].strip() continue out.append((current_group, line)) return out def resolve_domain(domain: str) -> str | None: """Возвращает один IPv4-адрес для домена или None.""" try: r = subprocess.run( ["dig", "+short", "-4", domain], capture_output=True, text=True, timeout=10, ) if r.returncode != 0: return None lines = [s.strip() for s in r.stdout.splitlines() if s.strip()] for line in lines: # первый похожий на IPv4 if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", line): return line return None except (FileNotFoundError, subprocess.TimeoutExpired): return None def traceroute_hop2(ip: str) -> tuple[str, list[str]]: """ Запускает traceroute до ip с max TRACEROUTE_MAX_HOPS прыжками. Возвращает ("VPN" | "Ethernet" | "unknown", список IP прыжков). """ hops: list[str] = [] try: r = subprocess.run( ["traceroute", "-m", str(TRACEROUTE_MAX_HOPS), "-n", ip], capture_output=True, text=True, timeout=TRACEROUTE_TIMEOUT, ) except (FileNotFoundError, subprocess.TimeoutExpired): return "unknown", [] # Парсим строки вида " 2 10.8.1.0 20.5 ms" или " 2 * * *" for line in r.stdout.splitlines(): # номер_прыжка IP время... m = re.match(r"\s*\d+\s+([*\d.]+)", line) if m: hop = m.group(1).strip() if hop != "*": hops.append(hop) first_hops = hops[:VPN_IN_FIRST_N_HOPS] if VPN_HOP_IP in first_hops: return "VPN", hops if len(hops) >= 2: return "Ethernet", hops # Один прыжок: хост (часто Google/CDN) не отвечает на traceroute по пути — виден только последний узел if len(hops) == 1: return "few_hops", hops return "unknown", hops def route_get_path(ip: str) -> str | None: """ Определяет маршрут до IP через «ip route get» на этом хосте. Возвращает "VPN", "Ethernet" или None при ошибке. На LAN-клиенте всегда виден только шлюз 192.168.1.1 — для реального маршрута настрой ROUTER_HOST. """ for ip_bin in ("/usr/sbin/ip", "/sbin/ip", "ip"): try: r = subprocess.run( [ip_bin, "route", "get", ip], capture_output=True, text=True, timeout=5, ) out = (r.stdout or "") + (r.stderr or "") if not out.strip(): continue if "via " + VPN_HOP_IP in out or "via " + VPN_HOP_IP + " " in out: return "VPN" return "Ethernet" except (FileNotFoundError, subprocess.TimeoutExpired): continue return None def route_get_path_via_router_ssh(ip: str) -> str | None: """Выполняет на роутере «ip route get list[tuple[str, str, str]] | None: """ Подключается к роутеру по Telnet (NDMS/Keenetic), логин, выполняет «show ip route», парсит таблицу. Возвращает список (network_cidr, gateway, interface) или None. """ if not ROUTER_USE_TELNET or telnetlib is None: return None try: tn = telnetlib.Telnet(ROUTER_TELNET_HOST, 23, timeout=ROUTER_TELNET_TIMEOUT) tn.read_until(b"Login:", timeout=8) tn.write(ROUTER_TELNET_USER.encode() + b"\n") tn.read_until(b"Password:", timeout=5) tn.write(ROUTER_TELNET_PASSWORD.encode() + b"\n") tn.read_until(b">", timeout=8) tn.write(b"show ip route\n") raw = tn.read_until(b"(config)>", timeout=15) tn.close() except (OSError, EOFError) as e: return None text = raw.decode("utf-8", errors="replace") routes: list[tuple[str, str, str]] = [] for line in text.splitlines(): line = line.strip() # Строка вида "216.58.192.0/20 10.8.1.2 Wireguard0 ..." if "/" not in line or line.startswith("("): continue parts = line.split() if len(parts) >= 3: net, gw, iface = parts[0], parts[1], parts[2] if re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/\d{1,2}$", net): routes.append((net, gw, iface)) return routes if routes else None def find_route_for_ip(router_routes: list[tuple[str, str, str]], ip: str) -> str | None: """ По таблице маршрутов роутера (network, gateway, interface) находит маршрут для IP (longest prefix match). Возвращает "VPN" если шлюз 10.8.1.x или интерфейс Wireguard0, иначе "Ethernet". """ try: addr = IPv4Address(ip) except ValueError: return None best: tuple[int, str, str] | None = None # (prefix_len, gateway, interface) for net_cidr, gw, iface in router_routes: try: net = ip_network(net_cidr, strict=False) if addr in net: plen = net.prefixlen if best is None or plen > best[0]: best = (plen, gw, iface) except ValueError: continue if best is None: return None _plen, gateway, interface = best if gateway.startswith("10.8.1.") or "Wireguard" in interface: return "VPN" return "Ethernet" def check_domain(domain: str, router_routes: list[tuple[str, str, str]] | None = None) -> dict: ip = resolve_domain(domain) if not ip: return { "domain": domain, "ip": None, "path": "error", "path_label": "Ошибка резолва", "hops": [], } path, hops = traceroute_hop2(ip) # Если traceroute дал мало данных или неизвестно — смотрим маршрут: таблица с роутера (telnet) или SSH/локальный ip route get. if path in ("few_hops", "unknown"): if router_routes is not None: route_path = find_route_for_ip(router_routes, ip) hop_note = "(router)" elif ROUTER_HOST: route_path = route_get_path_via_router_ssh(ip) hop_note = "(router)" else: route_path = route_get_path(ip) hop_note = "(ip route get)" if route_path is not None: path = route_path hops = hops + [hop_note] if hops else [hop_note] path_labels = { "VPN": "VPN", "Ethernet": "Ethernet", "few_hops": "Мало данных (1 прыжок)", "unknown": "Неизвестно", } return { "domain": domain, "ip": ip, "path": path, "path_label": path_labels.get(path, path), "hops": hops, } def _row_html(r: dict) -> str: path = r.get("path") or "unknown" if path == "VPN": badge = 'VPN' elif path == "Ethernet": badge = 'Ethernet' elif path == "error": badge = 'Ошибка' elif path == "few_hops": badge = 'Мало данных' else: badge = '?' ip = r.get("ip") or "—" hops_s = ", ".join(r.get("hops") or []) or "—" return f"{r['domain']}{ip}{badge}{hops_s}" def build_html(grouped_results: dict[str, list[dict]], gen_time: str, out_path: Path | None) -> str: sections_html = [] for group_name, results in grouped_results.items(): if not results: continue rows = [_row_html(r) for r in results] table_body = "\n".join(rows) sections_html.append( f"

{group_name}

\n" f" \n" f" \n" f" \n{table_body}\n \n
ДоменIPМаршрутПрыжки
" ) blocks = "\n\n".join(sections_html) html = f""" Маршрут к доменам (VPN / Ethernet)

Маршрут к доменам

Трафик через VPN (10.8.1.0) или напрямую (Ethernet). Обновлено: {gen_time}

{blocks} """ if out_path: out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(html, encoding="utf-8") return html def main(): script_dir = Path(__file__).resolve().parent domains_path = script_dir / "domains.txt" out_dir = Path(sys.argv[1]) if len(sys.argv) > 1 else script_dir / "output" out_dir = Path(out_dir) grouped_domains = load_domains_grouped(domains_path) if not grouped_domains: grouped_domains = [("Зарубеж", "youtube.com"), ("Зарубеж", "instagram.com"), ("Зарубеж", "google.com")] router_routes: list[tuple[str, str, str]] | None = None if ROUTER_USE_TELNET: router_routes = fetch_router_routes_telnet() grouped_results: dict[str, list[dict]] = {} all_results = [] for group_name, domain in grouped_domains: r = check_domain(domain, router_routes) r["group"] = group_name all_results.append(r) grouped_results.setdefault(group_name, []).append(r) gen_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC") payload = {"updated": gen_time, "results": all_results} json_path = out_dir / "results.json" out_dir.mkdir(parents=True, exist_ok=True) json_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") html_path = out_dir / "index.html" build_html(grouped_results, gen_time, html_path) total = len(all_results) print(f"OK: {total} доменов → {html_path} / {json_path}") if __name__ == "__main__": main()