402 lines
17 KiB
Python
402 lines
17 KiB
Python
"""
|
||
Сервис утилит для бота
|
||
"""
|
||
import asyncio
|
||
import hashlib
|
||
import secrets
|
||
from datetime import datetime
|
||
from typing import Optional, Tuple
|
||
|
||
from config.constants import ANONYMOUS_TOKEN_LENGTH, DEFAULT_QUESTION_PREVIEW_LENGTH, DEFAULT_TEXT_TRUNCATE_LENGTH, MIN_QUESTION_LENGTH, MIN_ANSWER_LENGTH, SEPARATOR_LENGTH
|
||
from models.question import Question
|
||
from models.user import User
|
||
from services.infrastructure.database import DatabaseService
|
||
from services.infrastructure.logger import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
|
||
class UtilsService:
|
||
"""Сервис утилит для форматирования и валидации"""
|
||
|
||
def __init__(self, database: DatabaseService):
|
||
self.database = database
|
||
|
||
def generate_referral_link(self, bot_username: str, user_id: int) -> str:
|
||
"""
|
||
Генерация уникальной реферальной ссылки для пользователя
|
||
|
||
Args:
|
||
bot_username: Имя бота (без @)
|
||
user_id: ID пользователя
|
||
|
||
Returns:
|
||
Ссылка формата: t.me/bot_username?start=ref_{user_id}
|
||
"""
|
||
return f"https://t.me/{bot_username}?start=ref_{user_id}"
|
||
|
||
def generate_anonymous_id(self) -> str:
|
||
"""
|
||
Генерация анонимного ID для отправителя вопроса
|
||
|
||
Returns:
|
||
Случайная строка для идентификации анонимного пользователя
|
||
"""
|
||
return secrets.token_hex(ANONYMOUS_TOKEN_LENGTH)
|
||
|
||
def format_user_info(self, user: User, show_stats: bool = False) -> str:
|
||
"""
|
||
Форматирование информации о пользователе
|
||
|
||
Args:
|
||
user: Объект пользователя
|
||
show_stats: Показывать ли статистику
|
||
|
||
Returns:
|
||
Отформатированная строка с информацией о пользователе
|
||
"""
|
||
info = f"👤 <b>{user.display_name}</b>\n"
|
||
|
||
if user.full_name and user.username:
|
||
info += f"📝 {user.full_name}\n"
|
||
|
||
if hasattr(user, 'is_premium') and user.is_premium:
|
||
info += "⭐ Premium пользователь\n"
|
||
|
||
if hasattr(user, 'language_code') and user.language_code:
|
||
info += f"🌐 Язык: {user.language_code.upper()}\n"
|
||
|
||
if user.created_at:
|
||
info += f"📅 Регистрация: {user.created_at.strftime('%d.%m.%Y %H:%M')}\n"
|
||
|
||
if user.is_active:
|
||
info += "✅ Активен\n"
|
||
else:
|
||
info += "❌ Неактивен\n"
|
||
|
||
if show_stats:
|
||
# Здесь можно добавить статистику пользователя
|
||
pass
|
||
|
||
return info
|
||
|
||
def format_user_display_name(self, user: User) -> str:
|
||
"""
|
||
Форматирование отображаемого имени пользователя для суперпользователей
|
||
|
||
Args:
|
||
user: Объект пользователя
|
||
|
||
Returns:
|
||
Строка в формате: {@username} {first_name} {last_name}
|
||
"""
|
||
parts = []
|
||
|
||
# Добавляем username если есть
|
||
if user.username:
|
||
parts.append(f"@{user.username}")
|
||
|
||
# Добавляем имя
|
||
if user.first_name:
|
||
parts.append(user.first_name)
|
||
|
||
# Добавляем фамилию если есть
|
||
if user.last_name:
|
||
parts.append(user.last_name)
|
||
|
||
return " ".join(parts) if parts else "Неизвестный пользователь"
|
||
|
||
def format_question_info(self, question: Question, show_answer: bool = False) -> str:
|
||
"""
|
||
Форматирование информации о вопросе
|
||
|
||
Args:
|
||
question: Объект вопроса
|
||
show_answer: Показывать ли ответ
|
||
|
||
Returns:
|
||
Отформатированная строка с информацией о вопросе
|
||
"""
|
||
# Используем user_question_number для отображения, если он есть
|
||
display_number = question.user_question_number if question.user_question_number is not None else question.id
|
||
info = f"❓ <b>Вопрос #{display_number}</b>\n\n"
|
||
|
||
if question.is_anonymous:
|
||
info += "👤 <i>Анонимный вопрос</i>\n"
|
||
else:
|
||
info += f"👤 От: {question.from_user_id}\n"
|
||
|
||
info += f"📝 <b>Вопрос:</b>\n{question.message_text}\n\n"
|
||
|
||
if question.created_at:
|
||
info += f"📅 {question.created_at.strftime('%d.%m.%Y %H:%M')}\n"
|
||
|
||
# Статус вопроса
|
||
status_emoji = {
|
||
'pending': '⏳',
|
||
'answered': '✅',
|
||
'rejected': '❌',
|
||
'deleted': '🗑️'
|
||
}
|
||
|
||
status_text = {
|
||
'pending': 'Ожидает ответа',
|
||
'answered': 'Отвечен',
|
||
'rejected': 'Отклонен',
|
||
'deleted': 'Удален'
|
||
}
|
||
|
||
info += f"{status_emoji.get(question.status.value, '❓')} {status_text.get(question.status.value, 'Неизвестно')}\n"
|
||
|
||
if show_answer and question.answer_text:
|
||
info += f"\n💬 <b>Ответ:</b>\n{question.answer_text}\n"
|
||
|
||
if question.answered_at:
|
||
info += f"\n📅 Ответ дан: {question.answered_at.strftime('%d.%m.%Y %H:%M')}"
|
||
|
||
return info
|
||
|
||
def format_questions_list(self, questions: list, show_answers: bool = False) -> str:
|
||
"""
|
||
Форматирование списка вопросов
|
||
|
||
Args:
|
||
questions: Список вопросов
|
||
show_answers: Показывать ли ответы
|
||
|
||
Returns:
|
||
Отформатированная строка со списком вопросов
|
||
"""
|
||
if not questions:
|
||
return "📭 Вопросов пока нет"
|
||
|
||
info = f"📋 <b>Список вопросов ({len(questions)}):</b>\n\n"
|
||
|
||
for i, question in enumerate(questions, 1):
|
||
info += f"{i}. {self.format_question_info(question, show_answers)}\n"
|
||
if i < len(questions):
|
||
info += "─" * SEPARATOR_LENGTH + "\n\n"
|
||
|
||
return info
|
||
|
||
def format_stats(self, stats: dict) -> str:
|
||
"""
|
||
Форматирование статистики
|
||
|
||
Args:
|
||
stats: Словарь со статистикой
|
||
|
||
Returns:
|
||
Отформатированная строка со статистикой
|
||
"""
|
||
info = "📊 <b>Статистика бота:</b>\n\n"
|
||
|
||
# Статистика пользователей
|
||
if 'total_users' in stats:
|
||
info += "👥 <b>Пользователи:</b>\n"
|
||
info += f"• Всего: {stats.get('total_users', 0)}\n"
|
||
info += f"• Активных за неделю: {stats.get('active_week', 0)}\n"
|
||
info += f"• Активных сегодня: {stats.get('active_today', 0)}\n\n"
|
||
|
||
# Статистика вопросов
|
||
if 'total_questions' in stats:
|
||
info += "❓ <b>Вопросы:</b>\n"
|
||
info += f"• Всего: {stats.get('total_questions', 0)}\n"
|
||
info += f"• Ожидают ответа: {stats.get('pending_questions', 0)}\n"
|
||
info += f"• Отвечено: {stats.get('answered_questions', 0)}\n"
|
||
info += f"• За сегодня: {stats.get('questions_today', 0)}\n"
|
||
info += f"• За неделю: {stats.get('questions_week', 0)}\n\n"
|
||
|
||
# Дополнительная статистика
|
||
if 'total_questions_received' in stats:
|
||
info += "📈 <b>Активность:</b>\n"
|
||
info += f"• Получено вопросов: {stats.get('total_questions_received', 0)}\n"
|
||
info += f"• Отвечено вопросов: {stats.get('total_questions_answered', 0)}\n"
|
||
|
||
return info
|
||
|
||
def escape_html(self, text: str) -> str:
|
||
"""
|
||
Экранирование HTML символов
|
||
|
||
Args:
|
||
text: Исходный текст
|
||
|
||
Returns:
|
||
Экранированный текст
|
||
"""
|
||
return (text
|
||
.replace('&', '&')
|
||
.replace('<', '<')
|
||
.replace('>', '>')
|
||
.replace('"', '"')
|
||
.replace("'", '''))
|
||
|
||
|
||
def is_valid_question_text(self, text: str, max_length: int = 1000) -> Tuple[bool, str]:
|
||
"""
|
||
Проверка валидности текста вопроса
|
||
|
||
Args:
|
||
text: Текст вопроса
|
||
max_length: Максимальная длина
|
||
|
||
Returns:
|
||
Кортеж (валидность, сообщение об ошибке)
|
||
"""
|
||
if not text or not text.strip():
|
||
return False, "Вопрос не может быть пустым"
|
||
|
||
if len(text.strip()) < MIN_QUESTION_LENGTH:
|
||
return False, f"Вопрос должен содержать минимум {MIN_QUESTION_LENGTH} символов"
|
||
|
||
if len(text) > max_length:
|
||
return False, f"Вопрос слишком длинный (максимум {max_length} символов)"
|
||
|
||
return True, ""
|
||
|
||
def is_valid_answer_text(self, text: str, max_length: int = 2000) -> Tuple[bool, str]:
|
||
"""
|
||
Проверка валидности текста ответа
|
||
|
||
Args:
|
||
text: Текст ответа
|
||
max_length: Максимальная длина
|
||
|
||
Returns:
|
||
Кортеж (валидность, сообщение об ошибке)
|
||
"""
|
||
if not text or not text.strip():
|
||
return False, "Ответ не может быть пустым"
|
||
|
||
if len(text.strip()) < MIN_ANSWER_LENGTH:
|
||
return False, f"Ответ должен содержать минимум {MIN_ANSWER_LENGTH} символов"
|
||
|
||
if len(text) > max_length:
|
||
return False, f"Ответ слишком длинный (максимум {max_length} символов)"
|
||
|
||
return True, ""
|
||
|
||
async def send_answer_to_author(self, bot, question: Question, answer_text: str):
|
||
"""
|
||
Отправляет ответ автору вопроса
|
||
|
||
Args:
|
||
bot: Экземпляр бота
|
||
question: Объект вопроса
|
||
answer_text: Текст ответа
|
||
"""
|
||
try:
|
||
logger.info(f"send_answer_to_author вызвана для вопроса {question.id}, from_user_id: {question.from_user_id}")
|
||
|
||
# Проверяем, есть ли ID автора (для анонимных вопросов может быть None)
|
||
if not question.from_user_id:
|
||
logger.warning(f"Нельзя отправить ответ автору вопроса {question.id}: from_user_id = None (анонимный вопрос)")
|
||
return
|
||
|
||
# Формируем сообщение для автора
|
||
message_text = f"💬 <b>Получен ответ на ваш вопрос!</b>\n\n"
|
||
message_text += f"❓ <b>Ваш вопрос:</b>\n{question.message_text}\n\n"
|
||
message_text += f"✅ <b>Ответ:</b>\n{answer_text}\n\n"
|
||
|
||
# Обрабатываем дату ответа (асинхронно)
|
||
if question.answered_at:
|
||
if isinstance(question.answered_at, str):
|
||
# Если дата пришла как строка, конвертируем в datetime
|
||
try:
|
||
# Используем asyncio для неблокирующего парсинга
|
||
loop = asyncio.get_event_loop()
|
||
answered_at = await loop.run_in_executor(
|
||
None,
|
||
lambda: datetime.fromisoformat(question.answered_at.replace('Z', '+00:00'))
|
||
)
|
||
date_str = answered_at.strftime('%d.%m.%Y %H:%M')
|
||
except:
|
||
date_str = str(question.answered_at)
|
||
else:
|
||
date_str = question.answered_at.strftime('%d.%m.%Y %H:%M')
|
||
else:
|
||
date_str = "Неизвестно"
|
||
|
||
message_text += f"📅 <b>Дата ответа:</b> {date_str}"
|
||
|
||
# Отправляем сообщение автору
|
||
logger.info(f"Попытка отправить сообщение пользователю {question.from_user_id}")
|
||
await bot.send_message(
|
||
chat_id=question.from_user_id,
|
||
text=message_text,
|
||
parse_mode="HTML"
|
||
)
|
||
|
||
logger.info(f"✅ Ответ успешно отправлен автору вопроса {question.id} (пользователь {question.from_user_id})")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при отправке ответа автору вопроса {question.id}: {e}")
|
||
# Не поднимаем исключение, чтобы не прерывать основной процесс
|
||
|
||
|
||
# Функции для обратной совместимости
|
||
def generate_referral_link(bot_username: str, user_id: int) -> str:
|
||
"""Генерация уникальной реферальной ссылки для пользователя"""
|
||
return f"https://t.me/{bot_username}?start=ref_{user_id}"
|
||
|
||
|
||
def generate_anonymous_id() -> str:
|
||
"""Генерация анонимного ID для отправителя вопроса"""
|
||
return secrets.token_hex(ANONYMOUS_TOKEN_LENGTH)
|
||
|
||
|
||
def format_user_info(user: User, show_stats: bool = False) -> str:
|
||
"""Форматирование информации о пользователе"""
|
||
utils = UtilsService(None) # Временное решение для обратной совместимости
|
||
return utils.format_user_info(user, show_stats)
|
||
|
||
|
||
def format_user_display_name(user: User) -> str:
|
||
"""Форматирование отображаемого имени пользователя для суперпользователей"""
|
||
utils = UtilsService(None) # Временное решение для обратной совместимости
|
||
return utils.format_user_display_name(user)
|
||
|
||
|
||
def format_question_info(question: Question, show_answer: bool = False) -> str:
|
||
"""Форматирование информации о вопросе"""
|
||
utils = UtilsService(None) # Временное решение для обратной совместимости
|
||
return utils.format_question_info(question, show_answer)
|
||
|
||
|
||
|
||
|
||
def format_stats(stats: dict) -> str:
|
||
"""Форматирование статистики"""
|
||
utils = UtilsService(None) # Временное решение для обратной совместимости
|
||
return utils.format_stats(stats)
|
||
|
||
|
||
def escape_html(text: str) -> str:
|
||
"""Экранирование HTML символов"""
|
||
return (text
|
||
.replace('&', '&')
|
||
.replace('<', '<')
|
||
.replace('>', '>')
|
||
.replace('"', '"')
|
||
.replace("'", '''))
|
||
|
||
|
||
|
||
|
||
def is_valid_question_text(text: str, max_length: int = 1000) -> Tuple[bool, str]:
|
||
"""Проверка валидности текста вопроса"""
|
||
utils = UtilsService(None) # Временное решение для обратной совместимости
|
||
return utils.is_valid_question_text(text, max_length)
|
||
|
||
|
||
def is_valid_answer_text(text: str, max_length: int = 2000) -> Tuple[bool, str]:
|
||
"""Проверка валидности текста ответа"""
|
||
utils = UtilsService(None) # Временное решение для обратной совместимости
|
||
return utils.is_valid_answer_text(text, max_length)
|
||
|
||
|
||
async def send_answer_to_author(bot, question: Question, answer_text: str):
|
||
"""Отправляет ответ автору вопроса"""
|
||
utils = UtilsService(None) # Временное решение для обратной совместимости
|
||
await utils.send_answer_to_author(bot, question, answer_text) |