Files
AnonBot/services/validation/input_validator.py

360 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Централизованный валидатор входных данных для AnonBot
"""
import re
import html
from typing import Optional, Tuple, List
from dataclasses import dataclass
from services.infrastructure.logger import get_logger
from config.constants import MIN_QUESTION_LENGTH, MIN_ANSWER_LENGTH
logger = get_logger(__name__)
@dataclass
class ValidationResult:
"""Результат валидации"""
is_valid: bool
error_message: str = ""
sanitized_value: Optional[str] = None
def __bool__(self) -> bool:
return self.is_valid
class InputValidator:
"""Централизованный валидатор входных данных"""
# Константы для валидации
MIN_TELEGRAM_ID = 1
MAX_TELEGRAM_ID = 2**63 - 1
MIN_USERNAME_LENGTH = 1
MAX_USERNAME_LENGTH = 32
MAX_CALLBACK_DATA_LENGTH = 64
MAX_TEXT_LENGTH = 4000 # Telegram limit
MAX_HTML_ENTITIES = 100 # Защита от HTML-спама
# Регулярные выражения
USERNAME_PATTERN = re.compile(r'^[a-zA-Z0-9_]{1,32}$')
DEEP_LINK_PATTERN = re.compile(r'^[a-zA-Z0-9_-]{1,64}$')
CALLBACK_DATA_PATTERN = re.compile(r'^[a-zA-Z0-9_:-]{1,64}$')
# Опасные HTML теги и атрибуты
DANGEROUS_TAGS = {'script', 'iframe', 'object', 'embed', 'form', 'input', 'button'}
DANGEROUS_ATTRIBUTES = {'onclick', 'onload', 'onerror', 'onmouseover', 'href', 'src'}
def __init__(self):
logger.info("🔍 InputValidator инициализирован")
def validate_telegram_id(self, user_id: int) -> ValidationResult:
"""
Валидация Telegram ID пользователя
Args:
user_id: ID пользователя Telegram
Returns:
ValidationResult с результатом валидации
"""
try:
if not isinstance(user_id, int):
return ValidationResult(
False,
f"Telegram ID должен быть числом, получен: {type(user_id).__name__}"
)
if user_id < self.MIN_TELEGRAM_ID:
return ValidationResult(
False,
f"Telegram ID должен быть больше {self.MIN_TELEGRAM_ID}"
)
if user_id > self.MAX_TELEGRAM_ID:
return ValidationResult(
False,
f"Telegram ID должен быть меньше {self.MAX_TELEGRAM_ID}"
)
logger.debug(f"✅ Telegram ID {user_id} прошел валидацию")
return ValidationResult(True, sanitized_value=str(user_id))
except Exception as e:
logger.error(f"❌ Ошибка валидации Telegram ID {user_id}: {e}")
return ValidationResult(False, f"Ошибка валидации Telegram ID: {str(e)}")
def validate_username(self, username: str) -> ValidationResult:
"""
Валидация username пользователя
Args:
username: Username пользователя (без @)
Returns:
ValidationResult с результатом валидации
"""
try:
if not username:
return ValidationResult(True, sanitized_value="") # Username может быть пустым
# Убираем @ если есть
username = username.lstrip('@')
if len(username) < self.MIN_USERNAME_LENGTH:
return ValidationResult(
False,
f"Username должен содержать минимум {self.MIN_USERNAME_LENGTH} символ"
)
if len(username) > self.MAX_USERNAME_LENGTH:
return ValidationResult(
False,
f"Username не должен превышать {self.MAX_USERNAME_LENGTH} символов"
)
if not self.USERNAME_PATTERN.match(username):
return ValidationResult(
False,
"Username может содержать только латинские буквы, цифры и подчеркивания"
)
logger.debug(f"✅ Username '{username}' прошел валидацию")
return ValidationResult(True, sanitized_value=username)
except Exception as e:
logger.error(f"❌ Ошибка валидации username '{username}': {e}")
return ValidationResult(False, f"Ошибка валидации username: {str(e)}")
def validate_text_content(
self,
text: str,
min_length: int = 1,
max_length: int = MAX_TEXT_LENGTH,
content_type: str = "текст"
) -> ValidationResult:
"""
Валидация текстового контента
Args:
text: Текст для валидации
min_length: Минимальная длина
max_length: Максимальная длина
content_type: Тип контента для сообщений об ошибках
Returns:
ValidationResult с результатом валидации
"""
try:
if not text:
return ValidationResult(
False,
f"{content_type.capitalize()} не может быть пустым"
)
# Проверяем длину до санитизации
if len(text) > max_length:
return ValidationResult(
False,
f"{content_type.capitalize()} слишком длинный (максимум {max_length} символов)"
)
# Санитизируем HTML
sanitized_text = self.sanitize_html(text)
# Проверяем длину после санитизации
if len(sanitized_text.strip()) < min_length:
return ValidationResult(
False,
f"{content_type.capitalize()} должен содержать минимум {min_length} символов"
)
# Проверяем на спам (повторяющиеся символы)
if self._is_spam_text(sanitized_text):
return ValidationResult(
False,
f"{content_type.capitalize()} содержит слишком много повторяющихся символов"
)
logger.debug(f"{content_type} прошел валидацию (длина: {len(sanitized_text)})")
return ValidationResult(True, sanitized_value=sanitized_text)
except Exception as e:
logger.error(f"❌ Ошибка валидации {content_type}: {e}")
return ValidationResult(False, f"Ошибка валидации {content_type}: {str(e)}")
def validate_question_text(self, text: str, max_length: int = 1000) -> ValidationResult:
"""
Валидация текста вопроса
Args:
text: Текст вопроса
max_length: Максимальная длина вопроса
Returns:
ValidationResult с результатом валидации
"""
return self.validate_text_content(
text,
min_length=MIN_QUESTION_LENGTH,
max_length=max_length,
content_type="вопрос"
)
def validate_answer_text(self, text: str, max_length: int = 2000) -> ValidationResult:
"""
Валидация текста ответа
Args:
text: Текст ответа
max_length: Максимальная длина ответа
Returns:
ValidationResult с результатом валидации
"""
return self.validate_text_content(
text,
min_length=MIN_ANSWER_LENGTH,
max_length=max_length,
content_type="ответ"
)
def validate_callback_data(self, data: str) -> ValidationResult:
"""
Валидация callback data
Args:
data: Callback data для валидации
Returns:
ValidationResult с результатом валидации
"""
try:
if not data:
return ValidationResult(False, "Callback data не может быть пустым")
if len(data) > self.MAX_CALLBACK_DATA_LENGTH:
return ValidationResult(
False,
f"Callback data не должен превышать {self.MAX_CALLBACK_DATA_LENGTH} символов"
)
if not self.CALLBACK_DATA_PATTERN.match(data):
return ValidationResult(
False,
"Callback data содержит недопустимые символы"
)
logger.debug(f"✅ Callback data '{data}' прошел валидацию")
return ValidationResult(True, sanitized_value=data)
except Exception as e:
logger.error(f"❌ Ошибка валидации callback data '{data}': {e}")
return ValidationResult(False, f"Ошибка валидации callback data: {str(e)}")
def validate_deep_link(self, link: str) -> ValidationResult:
"""
Валидация deep link
Args:
link: Deep link для валидации
Returns:
ValidationResult с результатом валидации
"""
try:
if not link:
return ValidationResult(False, "Deep link не может быть пустым")
if len(link) > 64: # Telegram deep link limit
return ValidationResult(
False,
"Deep link не должен превышать 64 символа"
)
if not self.DEEP_LINK_PATTERN.match(link):
return ValidationResult(
False,
"Deep link содержит недопустимые символы"
)
logger.debug(f"✅ Deep link '{link}' прошел валидацию")
return ValidationResult(True, sanitized_value=link)
except Exception as e:
logger.error(f"❌ Ошибка валидации deep link '{link}': {e}")
return ValidationResult(False, f"Ошибка валидации deep link: {str(e)}")
def sanitize_html(self, text: str) -> str:
"""
Санитизация HTML в тексте
Args:
text: Текст для санитизации
Returns:
Санитизированный текст
"""
try:
if not text:
return ""
# Экранируем HTML сущности
sanitized = html.escape(text, quote=True)
# Проверяем количество HTML сущностей (защита от спама)
html_entities_count = len(re.findall(r'&[a-zA-Z0-9#]+;', sanitized))
if html_entities_count > self.MAX_HTML_ENTITIES:
logger.warning(f"⚠️ Обнаружено много HTML сущностей в тексте: {html_entities_count}")
# Убираем лишние HTML сущности
sanitized = re.sub(r'&[a-zA-Z0-9#]+;', '', sanitized)
return sanitized.strip()
except Exception as e:
logger.error(f"❌ Ошибка санитизации HTML: {e}")
return text.strip() # Возвращаем исходный текст в случае ошибки
def _is_spam_text(self, text: str) -> bool:
"""
Проверка текста на спам (повторяющиеся символы)
Args:
text: Текст для проверки
Returns:
True если текст похож на спам
"""
try:
if len(text) < 10:
return False
# Проверяем повторяющиеся символы
char_counts = {}
for char in text:
char_counts[char] = char_counts.get(char, 0) + 1
# Если какой-то символ повторяется более 50% от длины текста
max_count = max(char_counts.values())
if max_count > len(text) * 0.5:
return True
# Проверяем повторяющиеся слова
words = text.split()
if len(words) > 3:
word_counts = {}
for word in words:
word_counts[word] = word_counts.get(word, 0) + 1
max_word_count = max(word_counts.values())
if max_word_count > len(words) * 0.6:
return True
return False
except Exception as e:
logger.error(f"❌ Ошибка проверки на спам: {e}")
return False