Files
telegram-helper-bot/helper_bot/handlers/voice/services.py
Andrey 2d40f4496e Update voice bot functionality and clean up project structure
- Added voice message handling capabilities, including saving and deleting audio messages via callback queries.
- Refactored audio record management in the database to remove unnecessary fields and streamline operations.
- Introduced new keyboard options for voice interactions in the bot.
- Updated `.gitignore` to include voice user files for better project organization.
- Removed obsolete voice bot handler files to simplify the codebase.
2025-09-01 19:17:05 +03:00

264 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
import asyncio
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple
from aiogram.types import FSInputFile
from helper_bot.handlers.voice.exceptions import VoiceMessageError, AudioProcessingError, DatabaseError, FileOperationError
from helper_bot.handlers.voice.constants import (
VOICE_USERS_DIR, STICK_DIR, STICK_PATTERN, STICKER_DELAY,
MESSAGE_DELAY_1, MESSAGE_DELAY_2, MESSAGE_DELAY_3, MESSAGE_DELAY_4
)
from logs.custom_logger import logger
class VoiceMessage:
"""Модель голосового сообщения"""
def __init__(self, file_name: str, user_id: int, date_added: datetime, file_id: int):
self.file_name = file_name
self.user_id = user_id
self.date_added = date_added
self.file_id = file_id
class VoiceBotService:
"""Сервис для работы с голосовыми сообщениями"""
def __init__(self, bot_db, settings):
self.bot_db = bot_db
self.settings = settings
async def get_welcome_sticker(self) -> Optional[FSInputFile]:
"""Получить случайный приветственный стикер"""
try:
name_stick_hello = list(Path(STICK_DIR).rglob(STICK_PATTERN))
if not name_stick_hello:
return None
random_stick_hello = random.choice(name_stick_hello)
random_stick_hello = FSInputFile(path=random_stick_hello)
logger.info(f"Стикер успешно получен. Наименование стикера: {random_stick_hello}")
return random_stick_hello
except Exception as e:
logger.error(f"Ошибка при получении стикера: {e}")
if self.settings['Settings']['logs']:
await self._send_error_to_logs(f'Отправка приветственных стикеров лажает. Ошибка: {e}')
return None
async def send_welcome_messages(self, message, user_emoji: str):
"""Отправить приветственные сообщения"""
try:
# Отправляем стикер
sticker = await self.get_welcome_sticker()
if sticker:
await message.answer_sticker(sticker)
await asyncio.sleep(STICKER_DELAY)
# Отправляем приветственное сообщение
markup = self._get_main_keyboard()
await message.answer(
text="<b>Привет.</b>",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(STICKER_DELAY)
# Отправляем описание
await message.answer(
text="<i>Здесь можно послушать голосовые сообщения от совершенно незнакомых людей из Бийска</i>",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_1)
# Отправляем аналогию
await message.answer(
text="Это почти как написать письмо, положить его в бутылку и швырнуть в океан. Никогда не узнаешь, послушал его кто-то или нет и ответить тоже не получится..",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_2)
# Отправляем правила
await message.answer(
text="Записывать можно всё что угодно — никаких правил нет. Главное — твой голос, <i>хотя бы на 5-10 секунд</i>",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_3)
# Отправляем информацию об анонимности
await message.answer(
text="Здесь всё анонимно: тот, кому я отправлю твое сообщение, не узнает ни твое имя, ни твой аккаунт (так что можно не стесняться говорить то, что не стал(а) бы выкладывать в собственные соцсети)",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_4)
# Отправляем предложения
await message.answer(
text="Если не знаешь, что сказать, можешь просто прочитать любое текстовое сообщение из недавно полученных или отправленных (или спеть, рассказать стихотворенье)",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_4)
# Отправляем информацию об эмодзи
await message.answer(
text=f"Любые войсы будут помечены эмоджи. <b>Твой эмоджи - </b>{user_emoji}Таким эмоджи будут помечены твои сообщения для других Но другие люди не узнают кто за каким эмоджи скрывается:)",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_4)
# Отправляем информацию о помощи
await message.answer(
text="Так же можешь ознакомиться с инструкцией к боту по команде /help",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
await asyncio.sleep(MESSAGE_DELAY_4)
# Отправляем финальное сообщение
await message.answer(
text="<b>Ну всё, достаточно инструкций. записывайся! Микрофон твой - </b> 🎤",
parse_mode='html',
reply_markup=markup,
disable_web_page_preview=not self.settings['Telegram']['preview_link']
)
except Exception as e:
logger.error(f"Ошибка при отправке приветственных сообщений: {e}")
raise VoiceMessageError(f"Не удалось отправить приветственные сообщения: {e}")
def get_random_audio(self, user_id: int) -> Optional[Tuple[str, str, str]]:
"""Получить случайное аудио для прослушивания"""
try:
check_audio = self.bot_db.check_listen_audio(user_id=user_id)
list_audio = list(check_audio)
if not list_audio:
return None
# Получаем случайное аудио
number_element = random.randint(0, len(list_audio) - 1)
audio_for_user = check_audio[number_element]
# Получаем информацию об авторе
user_id_author = self.bot_db.get_user_id_by_file_name(audio_for_user)
date_added = self.bot_db.get_date_by_file_name(audio_for_user)
user_emoji = self.bot_db.check_emoji_for_user(user_id_author)
return audio_for_user, date_added, user_emoji
except Exception as e:
logger.error(f"Ошибка при получении случайного аудио: {e}")
raise AudioProcessingError(f"Не удалось получить случайное аудио: {e}")
def mark_audio_as_listened(self, file_name: str, user_id: int) -> None:
"""Пометить аудио как прослушанное"""
try:
self.bot_db.mark_listened_audio(file_name, user_id=user_id)
except Exception as e:
logger.error(f"Ошибка при пометке аудио как прослушанного: {e}")
raise DatabaseError(f"Не удалось пометить аудио как прослушанное: {e}")
def clear_user_listenings(self, user_id: int) -> None:
"""Очистить прослушивания пользователя"""
try:
self.bot_db.delete_listen_count_for_user(user_id)
except Exception as e:
logger.error(f"Ошибка при очистке прослушиваний: {e}")
raise DatabaseError(f"Не удалось очистить прослушивания: {e}")
def get_remaining_audio_count(self, user_id: int) -> int:
"""Получить количество оставшихся непрослушанных аудио"""
try:
check_audio = self.bot_db.check_listen_audio(user_id=user_id)
return len(list(check_audio))
except Exception as e:
logger.error(f"Ошибка при получении количества аудио: {e}")
raise DatabaseError(f"Не удалось получить количество аудио: {e}")
def _get_main_keyboard(self):
"""Получить основную клавиатуру"""
from helper_bot.keyboards.keyboards import get_main_keyboard
return get_main_keyboard()
async def _send_error_to_logs(self, message: str) -> None:
"""Отправить ошибку в логи"""
try:
from helper_bot.utils.helper_func import send_voice_message
await send_voice_message(
self.settings['Telegram']['important_logs'],
None,
None,
None
)
except Exception as e:
logger.error(f"Не удалось отправить ошибку в логи: {e}")
class AudioFileService:
"""Сервис для работы с аудио файлами"""
def __init__(self, bot_db):
self.bot_db = bot_db
def generate_file_name(self, user_id: int) -> str:
"""Сгенерировать имя файла для аудио"""
try:
# Проверяем есть ли запись о файле в базе данных
is_having_audio_from_user = self.bot_db.get_last_user_audio_record(user_id=user_id)
if is_having_audio_from_user is False:
# Если нет, то генерируем имя файла
file_name = f'message_from_{user_id}_number_1'
else:
# Иначе берем последнюю запись из БД, добавляем к ней 1
file_name = self.bot_db.get_path_for_audio_record(user_id=user_id)
file_id = self.bot_db.get_id_for_audio_record(user_id) + 1
path = Path(f'voice_users/{file_name}.ogg')
if path.exists():
file_name = f'message_from_{user_id}_number_{file_id}'
return file_name
except Exception as e:
logger.error(f"Ошибка при генерации имени файла: {e}")
raise FileOperationError(f"Не удалось сгенерировать имя файла: {e}")
def save_audio_file(self, file_name: str, user_id: int, date_added: datetime) -> None:
"""Сохранить информацию об аудио файле в базу данных"""
try:
self.bot_db.add_audio_record(file_name, user_id, date_added)
except Exception as e:
logger.error(f"Ошибка при сохранении аудио файла в БД: {e}")
raise DatabaseError(f"Не удалось сохранить аудио файл в БД: {e}")
async def download_and_save_audio(self, bot, message_id: int, file_name: str) -> None:
"""Скачать и сохранить аудио файл"""
try:
# Получаем информацию о файле
file_info = await bot.get_file(file_id=bot.get_message(message_id).voice.file_id)
downloaded_file = await bot.download_file(file_path=file_info.file_path)
# Сохраняем файл
with open(f'voice_users/{file_name}.ogg', 'wb') as new_file:
new_file.write(downloaded_file.read())
except Exception as e:
logger.error(f"Ошибка при скачивании и сохранении аудио: {e}")
raise FileOperationError(f"Не удалось скачать и сохранить аудио: {e}")