Files
AnonBot/services/utils.py

402 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Сервис утилит для бота
"""
import asyncio
import hashlib
import secrets
from datetime import datetime
from typing import Optional, Tuple
from config.constants import ANONYMOUS_TOKEN_LENGTH, DEFAULT_QUESTION_PREVIEW_LENGTH, DEFAULT_TEXT_TRUNCATE_LENGTH, MIN_QUESTION_LENGTH, MIN_ANSWER_LENGTH, SEPARATOR_LENGTH
from models.question import Question
from models.user import User
from services.infrastructure.database import DatabaseService
from services.infrastructure.logger import get_logger
logger = get_logger(__name__)
class UtilsService:
"""Сервис утилит для форматирования и валидации"""
def __init__(self, database: DatabaseService):
self.database = database
def generate_referral_link(self, bot_username: str, user_id: int) -> str:
"""
Генерация уникальной реферальной ссылки для пользователя
Args:
bot_username: Имя бота (без @)
user_id: ID пользователя
Returns:
Ссылка формата: t.me/bot_username?start=ref_{user_id}
"""
return f"https://t.me/{bot_username}?start=ref_{user_id}"
def generate_anonymous_id(self) -> str:
"""
Генерация анонимного ID для отправителя вопроса
Returns:
Случайная строка для идентификации анонимного пользователя
"""
return secrets.token_hex(ANONYMOUS_TOKEN_LENGTH)
def format_user_info(self, user: User, show_stats: bool = False) -> str:
"""
Форматирование информации о пользователе
Args:
user: Объект пользователя
show_stats: Показывать ли статистику
Returns:
Отформатированная строка с информацией о пользователе
"""
info = f"👤 <b>{user.display_name}</b>\n"
if user.full_name and user.username:
info += f"📝 {user.full_name}\n"
if hasattr(user, 'is_premium') and user.is_premium:
info += "⭐ Premium пользователь\n"
if hasattr(user, 'language_code') and user.language_code:
info += f"🌐 Язык: {user.language_code.upper()}\n"
if user.created_at:
info += f"📅 Регистрация: {user.created_at.strftime('%d.%m.%Y %H:%M')}\n"
if user.is_active:
info += "✅ Активен\n"
else:
info += "❌ Неактивен\n"
if show_stats:
# Здесь можно добавить статистику пользователя
pass
return info
def format_user_display_name(self, user: User) -> str:
"""
Форматирование отображаемого имени пользователя для суперпользователей
Args:
user: Объект пользователя
Returns:
Строка в формате: {@username} {first_name} {last_name}
"""
parts = []
# Добавляем username если есть
if user.username:
parts.append(f"@{user.username}")
# Добавляем имя
if user.first_name:
parts.append(user.first_name)
# Добавляем фамилию если есть
if user.last_name:
parts.append(user.last_name)
return " ".join(parts) if parts else "Неизвестный пользователь"
def format_question_info(self, question: Question, show_answer: bool = False) -> str:
"""
Форматирование информации о вопросе
Args:
question: Объект вопроса
show_answer: Показывать ли ответ
Returns:
Отформатированная строка с информацией о вопросе
"""
# Используем user_question_number для отображения, если он есть
display_number = question.user_question_number if question.user_question_number is not None else question.id
info = f"❓ <b>Вопрос #{display_number}</b>\n\n"
if question.is_anonymous:
info += "👤 <i>Анонимный вопрос</i>\n"
else:
info += f"👤 От: {question.from_user_id}\n"
info += f"📝 <b>Вопрос:</b>\n{question.message_text}\n\n"
if question.created_at:
info += f"📅 {question.created_at.strftime('%d.%m.%Y %H:%M')}\n"
# Статус вопроса
status_emoji = {
'pending': '',
'answered': '',
'rejected': '',
'deleted': '🗑️'
}
status_text = {
'pending': 'Ожидает ответа',
'answered': 'Отвечен',
'rejected': 'Отклонен',
'deleted': 'Удален'
}
info += f"{status_emoji.get(question.status.value, '')} {status_text.get(question.status.value, 'Неизвестно')}\n"
if show_answer and question.answer_text:
info += f"\n💬 <b>Ответ:</b>\n{question.answer_text}\n"
if question.answered_at:
info += f"\n📅 Ответ дан: {question.answered_at.strftime('%d.%m.%Y %H:%M')}"
return info
def format_questions_list(self, questions: list, show_answers: bool = False) -> str:
"""
Форматирование списка вопросов
Args:
questions: Список вопросов
show_answers: Показывать ли ответы
Returns:
Отформатированная строка со списком вопросов
"""
if not questions:
return "📭 Вопросов пока нет"
info = f"📋 <b>Список вопросов ({len(questions)}):</b>\n\n"
for i, question in enumerate(questions, 1):
info += f"{i}. {self.format_question_info(question, show_answers)}\n"
if i < len(questions):
info += "" * SEPARATOR_LENGTH + "\n\n"
return info
def format_stats(self, stats: dict) -> str:
"""
Форматирование статистики
Args:
stats: Словарь со статистикой
Returns:
Отформатированная строка со статистикой
"""
info = "📊 <b>Статистика бота:</b>\n\n"
# Статистика пользователей
if 'total_users' in stats:
info += "👥 <b>Пользователи:</b>\n"
info += f"Всего: {stats.get('total_users', 0)}\n"
info += f"• Активных за неделю: {stats.get('active_week', 0)}\n"
info += f"• Активных сегодня: {stats.get('active_today', 0)}\n\n"
# Статистика вопросов
if 'total_questions' in stats:
info += "❓ <b>Вопросы:</b>\n"
info += f"Всего: {stats.get('total_questions', 0)}\n"
info += f"• Ожидают ответа: {stats.get('pending_questions', 0)}\n"
info += f"• Отвечено: {stats.get('answered_questions', 0)}\n"
info += f"За сегодня: {stats.get('questions_today', 0)}\n"
info += f"За неделю: {stats.get('questions_week', 0)}\n\n"
# Дополнительная статистика
if 'total_questions_received' in stats:
info += "📈 <b>Активность:</b>\n"
info += f"• Получено вопросов: {stats.get('total_questions_received', 0)}\n"
info += f"• Отвечено вопросов: {stats.get('total_questions_answered', 0)}\n"
return info
def escape_html(self, text: str) -> str:
"""
Экранирование HTML символов
Args:
text: Исходный текст
Returns:
Экранированный текст
"""
return (text
.replace('&', '&amp;')
.replace('<', '&lt;')
.replace('>', '&gt;')
.replace('"', '&quot;')
.replace("'", '&#x27;'))
def is_valid_question_text(self, text: str, max_length: int = 1000) -> Tuple[bool, str]:
"""
Проверка валидности текста вопроса
Args:
text: Текст вопроса
max_length: Максимальная длина
Returns:
Кортеж (валидность, сообщение об ошибке)
"""
if not text or not text.strip():
return False, "Вопрос не может быть пустым"
if len(text.strip()) < MIN_QUESTION_LENGTH:
return False, f"Вопрос должен содержать минимум {MIN_QUESTION_LENGTH} символов"
if len(text) > max_length:
return False, f"Вопрос слишком длинный (максимум {max_length} символов)"
return True, ""
def is_valid_answer_text(self, text: str, max_length: int = 2000) -> Tuple[bool, str]:
"""
Проверка валидности текста ответа
Args:
text: Текст ответа
max_length: Максимальная длина
Returns:
Кортеж (валидность, сообщение об ошибке)
"""
if not text or not text.strip():
return False, "Ответ не может быть пустым"
if len(text.strip()) < MIN_ANSWER_LENGTH:
return False, f"Ответ должен содержать минимум {MIN_ANSWER_LENGTH} символов"
if len(text) > max_length:
return False, f"Ответ слишком длинный (максимум {max_length} символов)"
return True, ""
async def send_answer_to_author(self, bot, question: Question, answer_text: str):
"""
Отправляет ответ автору вопроса
Args:
bot: Экземпляр бота
question: Объект вопроса
answer_text: Текст ответа
"""
try:
logger.info(f"send_answer_to_author вызвана для вопроса {question.id}, from_user_id: {question.from_user_id}")
# Проверяем, есть ли ID автора (для анонимных вопросов может быть None)
if not question.from_user_id:
logger.warning(f"Нельзя отправить ответ автору вопроса {question.id}: from_user_id = None (анонимный вопрос)")
return
# Формируем сообщение для автора
message_text = f"💬 <b>Получен ответ на ваш вопрос!</b>\n\n"
message_text += f"❓ <b>Ваш вопрос:</b>\n{question.message_text}\n\n"
message_text += f"✅ <b>Ответ:</b>\n{answer_text}\n\n"
# Обрабатываем дату ответа (асинхронно)
if question.answered_at:
if isinstance(question.answered_at, str):
# Если дата пришла как строка, конвертируем в datetime
try:
# Используем asyncio для неблокирующего парсинга
loop = asyncio.get_event_loop()
answered_at = await loop.run_in_executor(
None,
lambda: datetime.fromisoformat(question.answered_at.replace('Z', '+00:00'))
)
date_str = answered_at.strftime('%d.%m.%Y %H:%M')
except:
date_str = str(question.answered_at)
else:
date_str = question.answered_at.strftime('%d.%m.%Y %H:%M')
else:
date_str = "Неизвестно"
message_text += f"📅 <b>Дата ответа:</b> {date_str}"
# Отправляем сообщение автору
logger.info(f"Попытка отправить сообщение пользователю {question.from_user_id}")
await bot.send_message(
chat_id=question.from_user_id,
text=message_text,
parse_mode="HTML"
)
logger.info(f"✅ Ответ успешно отправлен автору вопроса {question.id} (пользователь {question.from_user_id})")
except Exception as e:
logger.error(f"Ошибка при отправке ответа автору вопроса {question.id}: {e}")
# Не поднимаем исключение, чтобы не прерывать основной процесс
# Функции для обратной совместимости
def generate_referral_link(bot_username: str, user_id: int) -> str:
"""Генерация уникальной реферальной ссылки для пользователя"""
return f"https://t.me/{bot_username}?start=ref_{user_id}"
def generate_anonymous_id() -> str:
"""Генерация анонимного ID для отправителя вопроса"""
return secrets.token_hex(ANONYMOUS_TOKEN_LENGTH)
def format_user_info(user: User, show_stats: bool = False) -> str:
"""Форматирование информации о пользователе"""
utils = UtilsService(None) # Временное решение для обратной совместимости
return utils.format_user_info(user, show_stats)
def format_user_display_name(user: User) -> str:
"""Форматирование отображаемого имени пользователя для суперпользователей"""
utils = UtilsService(None) # Временное решение для обратной совместимости
return utils.format_user_display_name(user)
def format_question_info(question: Question, show_answer: bool = False) -> str:
"""Форматирование информации о вопросе"""
utils = UtilsService(None) # Временное решение для обратной совместимости
return utils.format_question_info(question, show_answer)
def format_stats(stats: dict) -> str:
"""Форматирование статистики"""
utils = UtilsService(None) # Временное решение для обратной совместимости
return utils.format_stats(stats)
def escape_html(text: str) -> str:
"""Экранирование HTML символов"""
return (text
.replace('&', '&amp;')
.replace('<', '&lt;')
.replace('>', '&gt;')
.replace('"', '&quot;')
.replace("'", '&#x27;'))
def is_valid_question_text(text: str, max_length: int = 1000) -> Tuple[bool, str]:
"""Проверка валидности текста вопроса"""
utils = UtilsService(None) # Временное решение для обратной совместимости
return utils.is_valid_question_text(text, max_length)
def is_valid_answer_text(text: str, max_length: int = 2000) -> Tuple[bool, str]:
"""Проверка валидности текста ответа"""
utils = UtilsService(None) # Временное решение для обратной совместимости
return utils.is_valid_answer_text(text, max_length)
async def send_answer_to_author(bot, question: Question, answer_text: str):
"""Отправляет ответ автору вопроса"""
utils = UtilsService(None) # Временное решение для обратной совместимости
await utils.send_answer_to_author(bot, question, answer_text)