import random
import asyncio
import traceback
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple
from aiogram.types import FSInputFile
from helper_bot.handlers.voice.exceptions import VoiceMessageError, AudioProcessingError, DatabaseError, FileOperationError
from helper_bot.handlers.voice.constants import (
VOICE_USERS_DIR, STICK_DIR, STICK_PATTERN, STICKER_DELAY,
MESSAGE_DELAY_1, MESSAGE_DELAY_2, MESSAGE_DELAY_3, MESSAGE_DELAY_4
)
from logs.custom_logger import logger
# Local imports - metrics
from helper_bot.utils.metrics import (
metrics,
track_time,
track_errors,
db_query_time
)
class VoiceMessage:
"""Модель голосового сообщения"""
def __init__(self, file_name: str, user_id: int, date_added: datetime, file_id: int):
self.file_name = file_name
self.user_id = user_id
self.date_added = date_added
self.file_id = file_id
class VoiceBotService:
"""Сервис для работы с голосовыми сообщениями"""
def __init__(self, bot_db, settings):
self.bot_db = bot_db
self.settings = settings
@track_time("get_welcome_sticker", "voice_bot_service")
@track_errors("voice_bot_service", "get_welcome_sticker")
async def get_welcome_sticker(self) -> Optional[FSInputFile]:
"""Получить случайный приветственный стикер"""
try:
name_stick_hello = list(Path(STICK_DIR).rglob(STICK_PATTERN))
if not name_stick_hello:
return None
random_stick_hello = random.choice(name_stick_hello)
random_stick_hello = FSInputFile(path=random_stick_hello)
logger.info(f"Стикер успешно получен. Наименование стикера: {random_stick_hello}")
return random_stick_hello
except Exception as e:
logger.error(f"Ошибка при получении стикера: {e}")
if self.settings['Settings']['logs']:
await self._send_error_to_logs(f'Отправка приветственных стикеров лажает. Ошибка: {e}')
return None
@track_time("send_welcome_messages", "voice_bot_service")
@track_errors("voice_bot_service", "send_welcome_messages")
async def send_welcome_messages(self, message, user_emoji: str):
"""Отправить приветственные сообщения"""
try:
# Отправляем стикер
sticker = await self.get_welcome_sticker()
if sticker:
await message.answer_sticker(sticker)
await asyncio.sleep(STICKER_DELAY)
# Отправляем приветственное сообщение
markup = self._get_main_keyboard()
await message.answer(
text="Привет.",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(STICKER_DELAY)
# Отправляем описание
await message.answer(
text="Здесь можно послушать голосовые сообщения от совершенно незнакомых людей из Бийска",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_1)
# Отправляем аналогию
await message.answer(
text="Это почти как написать письмо, положить его в бутылку и швырнуть в океан. Никогда не узнаешь, послушал его кто-то или нет и ответить тоже не получится..",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_2)
# Отправляем правила
await message.answer(
text="Записывать можно всё что угодно — никаких правил нет. Главное — твой голос, хотя бы на 5-10 секунд",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_3)
# Отправляем информацию об анонимности
await message.answer(
text="Здесь всё анонимно: тот, кому я отправлю твое сообщение, не узнает ни твое имя, ни твой аккаунт (так что можно не стесняться говорить то, что не стал(а) бы выкладывать в собственные соцсети)",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_4)
# Отправляем предложения
await message.answer(
text="Если не знаешь, что сказать, можешь просто прочитать любое текстовое сообщение из недавно полученных или отправленных (или спеть, рассказать стихотворенье)",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_4)
# Отправляем информацию об эмодзи
await message.answer(
text=f"Любые войсы будут помечены эмоджи. Твой эмоджи - {user_emoji}Таким эмоджи будут помечены твои сообщения для других Но другие люди не узнают кто за каким эмоджи скрывается:)",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_4)
# Отправляем информацию о помощи
await message.answer(
text="Так же можешь ознакомиться с инструкцией к боту по команде /help",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_4)
# Отправляем финальное сообщение
await message.answer(
text="Ну всё, достаточно инструкций. записывайся! Микрофон твой - 🎤",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
except Exception as e:
logger.error(f"Ошибка при отправке приветственных сообщений: {e}")
raise VoiceMessageError(f"Не удалось отправить приветственные сообщения: {e}")
@track_time("get_random_audio", "voice_bot_service")
@track_errors("voice_bot_service", "get_random_audio")
async def get_random_audio(self, user_id: int) -> Optional[Tuple[str, str, str]]:
"""Получить случайное аудио для прослушивания"""
try:
check_audio = await self.bot_db.check_listen_audio(user_id=user_id)
list_audio = list(check_audio)
if not list_audio:
return None
# Получаем случайное аудио
number_element = random.randint(0, len(list_audio) - 1)
audio_for_user = check_audio[number_element]
# Получаем информацию об авторе
user_id_author = await self.bot_db.get_user_id_by_file_name(audio_for_user)
date_added = await self.bot_db.get_date_by_file_name(audio_for_user)
user_emoji = await self.bot_db.get_user_emoji(user_id_author)
return audio_for_user, date_added, user_emoji
except Exception as e:
logger.error(f"Ошибка при получении случайного аудио: {e}")
raise AudioProcessingError(f"Не удалось получить случайное аудио: {e}")
@track_time("mark_audio_as_listened", "voice_bot_service")
@track_errors("voice_bot_service", "mark_audio_as_listened")
async def mark_audio_as_listened(self, file_name: str, user_id: int) -> None:
"""Пометить аудио как прослушанное"""
try:
await self.bot_db.mark_listened_audio(file_name, user_id=user_id)
except Exception as e:
logger.error(f"Ошибка при пометке аудио как прослушанного: {e}")
raise DatabaseError(f"Не удалось пометить аудио как прослушанное: {e}")
@track_time("clear_user_listenings", "voice_bot_service")
@track_errors("voice_bot_service", "clear_user_listenings")
async def clear_user_listenings(self, user_id: int) -> None:
"""Очистить прослушивания пользователя"""
try:
await self.bot_db.delete_listen_count_for_user(user_id)
except Exception as e:
logger.error(f"Ошибка при очистке прослушиваний: {e}")
raise DatabaseError(f"Не удалось очистить прослушивания: {e}")
@track_time("get_remaining_audio_count", "voice_bot_service")
@track_errors("voice_bot_service", "get_remaining_audio_count")
async def get_remaining_audio_count(self, user_id: int) -> int:
"""Получить количество оставшихся непрослушанных аудио"""
try:
check_audio = await self.bot_db.check_listen_audio(user_id=user_id)
return len(list(check_audio))
except Exception as e:
logger.error(f"Ошибка при получении количества аудио: {e}")
raise DatabaseError(f"Не удалось получить количество аудио: {e}")
@track_time("get_main_keyboard", "voice_bot_service")
@track_errors("voice_bot_service", "get_main_keyboard")
def _get_main_keyboard(self):
"""Получить основную клавиатуру"""
from helper_bot.keyboards.keyboards import get_main_keyboard
return get_main_keyboard()
@track_time("send_error_to_logs", "voice_bot_service")
@track_errors("voice_bot_service", "send_error_to_logs")
async def _send_error_to_logs(self, message: str) -> None:
"""Отправить ошибку в логи"""
try:
from helper_bot.utils.helper_func import send_voice_message
await send_voice_message(
self.settings['Telegram']['important_logs'],
None,
None,
None
)
except Exception as e:
logger.error(f"Не удалось отправить ошибку в логи: {e}")
class AudioFileService:
"""Сервис для работы с аудио файлами"""
def __init__(self, bot_db):
self.bot_db = bot_db
@track_time("generate_file_name", "audio_file_service")
@track_errors("audio_file_service", "generate_file_name")
async def generate_file_name(self, user_id: int) -> str:
"""Сгенерировать имя файла для аудио"""
try:
# Проверяем есть ли запись о файле в базе данных
user_audio_count = await self.bot_db.get_user_audio_records_count(user_id=user_id)
if user_audio_count == 0:
# Если нет, то генерируем имя файла
file_name = f'message_from_{user_id}_number_1'
else:
# Иначе берем последнюю запись из БД, добавляем к ней 1
file_name = await self.bot_db.get_path_for_audio_record(user_id=user_id)
if file_name:
# Извлекаем номер из имени файла и увеличиваем на 1
try:
current_number = int(file_name.split('_')[-1])
new_number = current_number + 1
except (ValueError, IndexError):
new_number = user_audio_count + 1
else:
new_number = user_audio_count + 1
file_name = f'message_from_{user_id}_number_{new_number}'
return file_name
except Exception as e:
logger.error(f"Ошибка при генерации имени файла: {e}")
raise FileOperationError(f"Не удалось сгенерировать имя файла: {e}")
@track_time("save_audio_file", "audio_file_service")
@track_errors("audio_file_service", "save_audio_file")
async def save_audio_file(self, file_name: str, user_id: int, date_added: datetime, file_id: str) -> None:
"""Сохранить информацию об аудио файле в базу данных"""
try:
await self.bot_db.add_audio_record_simple(file_name, user_id, date_added)
except Exception as e:
logger.error(f"Ошибка при сохранении аудио файла в БД: {e}")
raise DatabaseError(f"Не удалось сохранить аудио файл в БД: {e}")
@track_time("download_and_save_audio", "audio_file_service")
@track_errors("audio_file_service", "download_and_save_audio")
async def download_and_save_audio(self, bot, message, file_name: str) -> None:
"""Скачать и сохранить аудио файл"""
try:
logger.info(f"Начинаем скачивание и сохранение аудио: {file_name}")
# Проверяем наличие голосового сообщения
if not message or not message.voice:
logger.error("Сообщение или голосовое сообщение не найдено")
raise FileOperationError("Сообщение или голосовое сообщение не найдено")
file_id = message.voice.file_id
logger.info(f"Получен file_id: {file_id}")
file_info = await bot.get_file(file_id=file_id)
logger.info(f"Получена информация о файле: {file_info.file_path}")
downloaded_file = await bot.download_file(file_path=file_info.file_path)
# Проверяем что файл успешно скачан
if not downloaded_file:
logger.error("Не удалось скачать файл")
raise FileOperationError("Не удалось скачать файл")
# Получаем размер файла без изменения позиции
current_pos = downloaded_file.tell()
downloaded_file.seek(0, 2) # Переходим в конец файла
file_size = downloaded_file.tell()
downloaded_file.seek(current_pos) # Возвращаемся в исходную позицию
logger.info(f"Файл скачан, размер: {file_size} bytes")
# Создаем директорию если она не существует
import os
os.makedirs(VOICE_USERS_DIR, exist_ok=True)
logger.info(f"Директория {VOICE_USERS_DIR} создана/проверена")
file_path = f'{VOICE_USERS_DIR}/{file_name}.ogg'
logger.info(f"Сохраняем файл по пути: {file_path}")
# Сбрасываем позицию в файле перед сохранением
downloaded_file.seek(0)
# Сохраняем файл
with open(file_path, 'wb') as new_file:
new_file.write(downloaded_file.read())
logger.info(f"Файл успешно сохранен: {file_path}")
except Exception as e:
logger.error(f"Ошибка при скачивании и сохранении аудио: {e}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise FileOperationError(f"Не удалось скачать и сохранить аудио: {e}")