Files
telegram-helper-bot/helper_bot/utils/helper_func.py
2024-10-31 21:58:46 +03:00

390 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import html
import os
from datetime import datetime, timedelta
from aiogram import types
from aiogram.types import InputMediaPhoto, FSInputFile, InputMediaVideo, InputMediaAudio
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from logs.custom_logger import logger
bdf = BaseDependencyFactory()
BotDB = bdf.get_db()
def get_first_name(message: types.Message) -> str:
first_name = html.escape(message.from_user.first_name)
return first_name
def get_text_message(post_text: str, first_name: str, username: str):
"""
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
Args:
post_text: Текст сообщения
first_name: Имя автора поста
username: Юзернейм автора поста
Returns:
str: - Сформированный текст сообщения.
"""
if "неанон" in post_text or "не анон" in post_text:
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}'
elif "анон" in post_text:
return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно'
else:
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}'
async def download_file(message: types.Message, file_id: str):
"""
Скачивает файл по file_id из Telegram.
Args:
message: сообщение
file_id: File ID фотографии
filename: Имя файла, под которым будет сохранено фото
Returns:
Путь к сохраненному файлу, если файл был скачан успешно, иначе None
"""
try:
os.makedirs("files", exist_ok=True)
os.makedirs("files/photos", exist_ok=True)
os.makedirs("files/videos", exist_ok=True)
os.makedirs("files/music", exist_ok=True)
os.makedirs("files/voice", exist_ok=True)
os.makedirs("files/video_notes", exist_ok=True)
file = await message.bot.get_file(file_id)
file_path = os.path.join("files", file.file_path)
await message.bot.download_file(file_path=file.file_path, destination=file_path)
return file_path
except Exception as e:
logger.error(f"Ошибка скачивания фотографии: {e}")
return None
async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
"""
Создает MediaGroup.
Args:
album: Album объект из Telegram API.
post_caption: Текст подписи к первому фото.
Returns:
Список InputMediaPhoto (MediaGroup).
"""
media_group = []
for i, message in enumerate(album):
if message.photo:
file_id = message.photo[-1].file_id
media_type = 'photo'
elif message.video:
file_id = message.video.file_id
media_type = 'video'
elif message.audio:
file_id = message.audio.file_id
media_type = 'audio'
else:
# Если нет фото, видео или аудио, пропускаем сообщение
continue
# Формируем объект MediaGroup с учетом типа медиа
if i == len(album) - 1:
if media_type == 'photo':
media_group.append(InputMediaPhoto(media=file_id, caption=post_caption))
elif media_type == 'video':
media_group.append(InputMediaVideo(media=file_id, caption=post_caption))
elif media_type == 'audio':
media_group.append(InputMediaAudio(media=file_id, caption=post_caption))
else:
if media_type == 'photo':
media_group.append(InputMediaPhoto(media=file_id))
elif media_type == 'video':
media_group.append(InputMediaVideo(media=file_id))
elif media_type == 'audio':
media_group.append(InputMediaAudio(media=file_id))
return media_group # Возвращаем MediaGroup
async def add_in_db_media_mediagroup(sent_message):
"""
Идентификатор медиа-группы
Args:
sent_message: sent_message объект из Telegram API
Returns:
Список InputFile (FSInputFile).
"""
media_group_message_id = sent_message[-1].message_id # Получаем идентификатор медиа-группы
for i, message in enumerate(sent_message):
if message.photo:
file_id = message.photo[-1].file_id
file_path = await download_file(message, file_id=file_id)
BotDB.add_post_content_in_db(media_group_message_id, message.message_id, file_path, 'photo')
elif message.video:
file_id = message.video.file_id
file_path = await download_file(message, file_id=file_id)
BotDB.add_post_content_in_db(media_group_message_id, message.message_id, file_path, 'video')
else:
# Если нет фото, видео или аудио, или другой контент, пропускаем сообщение
continue
async def add_in_db_media(sent_message):
"""
Args:
sent_message: sent_message объект из Telegram API
Returns:
Список InputFile (FSInputFile).
"""
if sent_message.photo:
file_id = sent_message.photo[-1].file_id
file_path = await download_file(sent_message, file_id=file_id)
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'photo')
elif sent_message.video:
file_id = sent_message.video.file_id
file_path = await download_file(sent_message, file_id=file_id)
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'video')
elif sent_message.voice:
file_id = sent_message.voice.file_id
file_path = await download_file(sent_message, file_id=file_id)
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'voice')
elif sent_message.audio:
file_id = sent_message.audio.file_id
file_path = await download_file(sent_message, file_id=file_id)
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'audio')
elif sent_message.video_note:
file_id = sent_message.video_note.file_id
file_path = await download_file(sent_message, file_id=file_id)
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'video_note')
async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message,
media_group: list[InputMediaPhoto]):
sent_message = await message.bot.send_media_group(
chat_id=chat_id,
media=media_group,
)
BotDB.add_post_in_db(sent_message[-1].message_id, sent_message[-1].caption, message.from_user.id)
await add_in_db_media_mediagroup(sent_message)
message_id = sent_message[-1].message_id
return message_id
async def send_media_group_to_channel(bot, chat_id: int, post_content: list[tuple[str]], post_text: str):
"""
Отправляет медиа-группу с подписью к последнему файлу.
Args:
bot: Экземпляр бота aiogram.
chat_id: ID чата для отправки.
post_content: Список кортежей с путями к файлам.
post_text: Текст подписи.
"""
media = []
for file_path in post_content:
try:
file = FSInputFile(path=file_path[0])
type = file_path[1]
if type == 'video':
media.append(types.InputMediaVideo(media=file))
if type == 'photo':
media.append(types.InputMediaPhoto(media=file))
except FileNotFoundError:
logger.error(f"Файл не найден: {file_path[0]}")
return
# Добавляем подпись к последнему файлу
if media:
media[-1].caption = post_text
await bot.send_media_group(chat_id=chat_id, media=media)
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_message(
chat_id=chat_id,
text=post_text
)
message_id = sent_message.message_id
return message_id
else:
sent_message = await message.bot.send_message(
chat_id=chat_id,
text=post_text,
reply_markup=markup
)
message_id = sent_message.message_id
return message_id
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_photo(
chat_id=chat_id,
caption=post_text,
photo=photo
)
else:
sent_message = await message.bot.send_photo(
chat_id=chat_id,
caption=post_text,
photo=photo,
reply_markup=markup
)
return sent_message
async def send_video_message(chat_id, message: types.Message, video: str, post_text: str = "",
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_video(
chat_id=chat_id,
caption=post_text,
video=video
)
else:
sent_message = await message.bot.send_video(
chat_id=chat_id,
caption=post_text,
video=video,
reply_markup=markup
)
return sent_message
async def send_video_note_message(chat_id, message: types.Message, video_note: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_video_note(
chat_id=chat_id,
video_note=video_note
)
else:
sent_message = await message.bot.send_video_note(
chat_id=chat_id,
video_note=video_note,
reply_markup=markup
)
return sent_message
async def send_audio_message(chat_id, message: types.Message, audio: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_audio(
chat_id=chat_id,
caption=post_text,
audio=audio
)
else:
sent_message = await message.bot.send_audio(
chat_id=chat_id,
caption=post_text,
audio=audio,
reply_markup=markup
)
return sent_message
async def send_voice_message(chat_id, message: types.Message, voice: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_voice(
chat_id=chat_id,
voice=voice
)
else:
sent_message = await message.bot.send_voice(
chat_id=chat_id,
voice=voice,
reply_markup=markup
)
return sent_message
def check_access(user_id: int):
"""Проверка прав на совершение действий"""
return BotDB.is_admin(user_id)
def add_days_to_date(days: int):
"""Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY."""
current_date = datetime.now()
future_date = current_date + timedelta(days=days)
formatted_date = future_date.strftime("%d-%m-%Y")
return formatted_date
def get_banned_users_list(offset: int):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset)
message = "Список заблокированных пользователей:\n"
for user in users:
message += f"Пользователь: {user[0]}\n"
message += f"Причина бана: {user[2]}\n"
message += f"Дата разбана: {user[3]}\n\n"
return message
def get_banned_users_buttons():
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = BotDB.get_banned_users_from_db()
user_ids = []
for user in users:
user_ids.append((user[0], user[1]))
return user_ids
def delete_user_blacklist(user_id: int):
return BotDB.delete_user_blacklist(user_id=user_id)
def check_username_and_full_name(user_id: int, username: str, full_name: str):
username_db, full_name_db = BotDB.get_username_and_full_name(user_id=user_id)
return username != username_db or full_name != full_name_db
def unban_notifier(self):
# Получение сегодняшней даты в формате DD-MM-YYYY
current_date = datetime.now()
today = current_date.strftime("%d-%m-%Y")
# Получение списка разблокированных пользователей
unblocked_users = self.BotDB.get_users_for_unblock_today(today)
message = "Разблокированные пользователи:\n"
for user_id, user_name in unblocked_users.items():
message += f"ID: {user_id}, Имя: {user_name}\n"
# Отправка сообщения в канал
self.bot.send_message(self.GROUP_FOR_MESSAGE, message)