"""
Сервис утилит для бота
"""
import asyncio
import hashlib
import secrets
from datetime import datetime
from typing import Optional, Tuple
from config.constants import ANONYMOUS_TOKEN_LENGTH, DEFAULT_QUESTION_PREVIEW_LENGTH, DEFAULT_TEXT_TRUNCATE_LENGTH, MIN_QUESTION_LENGTH, MIN_ANSWER_LENGTH, SEPARATOR_LENGTH
from models.question import Question
from models.user import User
from services.infrastructure.database import DatabaseService
from services.infrastructure.logger import get_logger
logger = get_logger(__name__)
class UtilsService:
"""Сервис утилит для форматирования и валидации"""
def __init__(self, database: DatabaseService):
self.database = database
def generate_referral_link(self, bot_username: str, user_id: int) -> str:
"""
Генерация уникальной реферальной ссылки для пользователя
Args:
bot_username: Имя бота (без @)
user_id: ID пользователя
Returns:
Ссылка формата: t.me/bot_username?start=ref_{user_id}
"""
return f"https://t.me/{bot_username}?start=ref_{user_id}"
def generate_anonymous_id(self) -> str:
"""
Генерация анонимного ID для отправителя вопроса
Returns:
Случайная строка для идентификации анонимного пользователя
"""
return secrets.token_hex(ANONYMOUS_TOKEN_LENGTH)
def format_user_info(self, user: User, show_stats: bool = False) -> str:
"""
Форматирование информации о пользователе
Args:
user: Объект пользователя
show_stats: Показывать ли статистику
Returns:
Отформатированная строка с информацией о пользователе
"""
info = f"👤 {user.display_name}\n"
if user.full_name and user.username:
info += f"📝 {user.full_name}\n"
if hasattr(user, 'is_premium') and user.is_premium:
info += "⭐ Premium пользователь\n"
if hasattr(user, 'language_code') and user.language_code:
info += f"🌐 Язык: {user.language_code.upper()}\n"
if user.created_at:
info += f"📅 Регистрация: {user.created_at.strftime('%d.%m.%Y %H:%M')}\n"
if user.is_active:
info += "✅ Активен\n"
else:
info += "❌ Неактивен\n"
if show_stats:
# Здесь можно добавить статистику пользователя
pass
return info
def format_user_display_name(self, user: User) -> str:
"""
Форматирование отображаемого имени пользователя для суперпользователей
Args:
user: Объект пользователя
Returns:
Строка в формате: {@username} {first_name} {last_name}
"""
parts = []
# Добавляем username если есть
if user.username:
parts.append(f"@{user.username}")
# Добавляем имя
if user.first_name:
parts.append(user.first_name)
# Добавляем фамилию если есть
if user.last_name:
parts.append(user.last_name)
return " ".join(parts) if parts else "Неизвестный пользователь"
def format_question_info(self, question: Question, show_answer: bool = False) -> str:
"""
Форматирование информации о вопросе
Args:
question: Объект вопроса
show_answer: Показывать ли ответ
Returns:
Отформатированная строка с информацией о вопросе
"""
# Используем user_question_number для отображения, если он есть
display_number = question.user_question_number if question.user_question_number is not None else question.id
info = f"❓ Вопрос #{display_number}\n\n"
if question.is_anonymous:
info += "👤 Анонимный вопрос\n"
else:
info += f"👤 От: {question.from_user_id}\n"
info += f"📝 Вопрос:\n{question.message_text}\n\n"
if question.created_at:
info += f"📅 {question.created_at.strftime('%d.%m.%Y %H:%M')}\n"
# Статус вопроса
status_emoji = {
'pending': '⏳',
'answered': '✅',
'rejected': '❌',
'deleted': '🗑️'
}
status_text = {
'pending': 'Ожидает ответа',
'answered': 'Отвечен',
'rejected': 'Отклонен',
'deleted': 'Удален'
}
info += f"{status_emoji.get(question.status.value, '❓')} {status_text.get(question.status.value, 'Неизвестно')}\n"
if show_answer and question.answer_text:
info += f"\n💬 Ответ:\n{question.answer_text}\n"
if question.answered_at:
info += f"\n📅 Ответ дан: {question.answered_at.strftime('%d.%m.%Y %H:%M')}"
return info
def format_questions_list(self, questions: list, show_answers: bool = False) -> str:
"""
Форматирование списка вопросов
Args:
questions: Список вопросов
show_answers: Показывать ли ответы
Returns:
Отформатированная строка со списком вопросов
"""
if not questions:
return "📭 Вопросов пока нет"
info = f"📋 Список вопросов ({len(questions)}):\n\n"
for i, question in enumerate(questions, 1):
info += f"{i}. {self.format_question_info(question, show_answers)}\n"
if i < len(questions):
info += "─" * SEPARATOR_LENGTH + "\n\n"
return info
def format_stats(self, stats: dict) -> str:
"""
Форматирование статистики
Args:
stats: Словарь со статистикой
Returns:
Отформатированная строка со статистикой
"""
info = "📊 Статистика бота:\n\n"
# Статистика пользователей
if 'total_users' in stats:
info += "👥 Пользователи:\n"
info += f"• Всего: {stats.get('total_users', 0)}\n"
info += f"• Активных за неделю: {stats.get('active_week', 0)}\n"
info += f"• Активных сегодня: {stats.get('active_today', 0)}\n\n"
# Статистика вопросов
if 'total_questions' in stats:
info += "❓ Вопросы:\n"
info += f"• Всего: {stats.get('total_questions', 0)}\n"
info += f"• Ожидают ответа: {stats.get('pending_questions', 0)}\n"
info += f"• Отвечено: {stats.get('answered_questions', 0)}\n"
info += f"• За сегодня: {stats.get('questions_today', 0)}\n"
info += f"• За неделю: {stats.get('questions_week', 0)}\n\n"
# Дополнительная статистика
if 'total_questions_received' in stats:
info += "📈 Активность:\n"
info += f"• Получено вопросов: {stats.get('total_questions_received', 0)}\n"
info += f"• Отвечено вопросов: {stats.get('total_questions_answered', 0)}\n"
return info
def escape_html(self, text: str) -> str:
"""
Экранирование HTML символов
Args:
text: Исходный текст
Returns:
Экранированный текст
"""
return (text
.replace('&', '&')
.replace('<', '<')
.replace('>', '>')
.replace('"', '"')
.replace("'", '''))
def is_valid_question_text(self, text: str, max_length: int = 1000) -> Tuple[bool, str]:
"""
Проверка валидности текста вопроса
Args:
text: Текст вопроса
max_length: Максимальная длина
Returns:
Кортеж (валидность, сообщение об ошибке)
"""
if not text or not text.strip():
return False, "Вопрос не может быть пустым"
if len(text.strip()) < MIN_QUESTION_LENGTH:
return False, f"Вопрос должен содержать минимум {MIN_QUESTION_LENGTH} символов"
if len(text) > max_length:
return False, f"Вопрос слишком длинный (максимум {max_length} символов)"
return True, ""
def is_valid_answer_text(self, text: str, max_length: int = 2000) -> Tuple[bool, str]:
"""
Проверка валидности текста ответа
Args:
text: Текст ответа
max_length: Максимальная длина
Returns:
Кортеж (валидность, сообщение об ошибке)
"""
if not text or not text.strip():
return False, "Ответ не может быть пустым"
if len(text.strip()) < MIN_ANSWER_LENGTH:
return False, f"Ответ должен содержать минимум {MIN_ANSWER_LENGTH} символов"
if len(text) > max_length:
return False, f"Ответ слишком длинный (максимум {max_length} символов)"
return True, ""
async def send_answer_to_author(self, bot, question: Question, answer_text: str):
"""
Отправляет ответ автору вопроса
Args:
bot: Экземпляр бота
question: Объект вопроса
answer_text: Текст ответа
"""
try:
logger.info(f"send_answer_to_author вызвана для вопроса {question.id}, from_user_id: {question.from_user_id}")
# Проверяем, есть ли ID автора (для анонимных вопросов может быть None)
if not question.from_user_id:
logger.warning(f"Нельзя отправить ответ автору вопроса {question.id}: from_user_id = None (анонимный вопрос)")
return
# Формируем сообщение для автора
message_text = f"💬 Получен ответ на ваш вопрос!\n\n"
message_text += f"❓ Ваш вопрос:\n{question.message_text}\n\n"
message_text += f"✅ Ответ:\n{answer_text}\n\n"
# Обрабатываем дату ответа (асинхронно)
if question.answered_at:
if isinstance(question.answered_at, str):
# Если дата пришла как строка, конвертируем в datetime
try:
# Используем asyncio для неблокирующего парсинга
loop = asyncio.get_event_loop()
answered_at = await loop.run_in_executor(
None,
lambda: datetime.fromisoformat(question.answered_at.replace('Z', '+00:00'))
)
date_str = answered_at.strftime('%d.%m.%Y %H:%M')
except:
date_str = str(question.answered_at)
else:
date_str = question.answered_at.strftime('%d.%m.%Y %H:%M')
else:
date_str = "Неизвестно"
message_text += f"📅 Дата ответа: {date_str}"
# Отправляем сообщение автору
logger.info(f"Попытка отправить сообщение пользователю {question.from_user_id}")
await bot.send_message(
chat_id=question.from_user_id,
text=message_text,
parse_mode="HTML"
)
logger.info(f"✅ Ответ успешно отправлен автору вопроса {question.id} (пользователь {question.from_user_id})")
except Exception as e:
logger.error(f"Ошибка при отправке ответа автору вопроса {question.id}: {e}")
# Не поднимаем исключение, чтобы не прерывать основной процесс
# Функции для обратной совместимости
def generate_referral_link(bot_username: str, user_id: int) -> str:
"""Генерация уникальной реферальной ссылки для пользователя"""
return f"https://t.me/{bot_username}?start=ref_{user_id}"
def generate_anonymous_id() -> str:
"""Генерация анонимного ID для отправителя вопроса"""
return secrets.token_hex(ANONYMOUS_TOKEN_LENGTH)
def format_user_info(user: User, show_stats: bool = False) -> str:
"""Форматирование информации о пользователе"""
utils = UtilsService(None) # Временное решение для обратной совместимости
return utils.format_user_info(user, show_stats)
def format_user_display_name(user: User) -> str:
"""Форматирование отображаемого имени пользователя для суперпользователей"""
utils = UtilsService(None) # Временное решение для обратной совместимости
return utils.format_user_display_name(user)
def format_question_info(question: Question, show_answer: bool = False) -> str:
"""Форматирование информации о вопросе"""
utils = UtilsService(None) # Временное решение для обратной совместимости
return utils.format_question_info(question, show_answer)
def format_stats(stats: dict) -> str:
"""Форматирование статистики"""
utils = UtilsService(None) # Временное решение для обратной совместимости
return utils.format_stats(stats)
def escape_html(text: str) -> str:
"""Экранирование HTML символов"""
return (text
.replace('&', '&')
.replace('<', '<')
.replace('>', '>')
.replace('"', '"')
.replace("'", '''))
def is_valid_question_text(text: str, max_length: int = 1000) -> Tuple[bool, str]:
"""Проверка валидности текста вопроса"""
utils = UtilsService(None) # Временное решение для обратной совместимости
return utils.is_valid_question_text(text, max_length)
def is_valid_answer_text(text: str, max_length: int = 2000) -> Tuple[bool, str]:
"""Проверка валидности текста ответа"""
utils = UtilsService(None) # Временное решение для обратной совместимости
return utils.is_valid_answer_text(text, max_length)
async def send_answer_to_author(bot, question: Question, answer_text: str):
"""Отправляет ответ автору вопроса"""
utils = UtilsService(None) # Временное решение для обратной совместимости
await utils.send_answer_to_author(bot, question, answer_text)