Files
telegram-helper-bot/helper_bot/utils/helper_func.py
2024-07-20 16:54:43 +03:00

274 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime, timedelta
from aiogram import types
from aiogram.types import InputMediaPhoto, FSInputFile
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from logs.custom_logger import logger
bdf = BaseDependencyFactory()
BotDB = bdf.get_db()
def get_first_name(message: types.Message) -> str:
return message.from_user.first_name
def get_text_message(post_text: str, first_name: str, username: str):
"""
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
Args:
post_text: Текст сообщения
first_name: Имя автора поста
username: Юзернейм автора поста
Returns:
Кортеж из двух элементов:
- Сформированный текст сообщения.
- Флаг, указывающий, является ли пост анонимным (True - анонимный, False - не анонимный)
"""
if "неанон" in post_text or "не анон" in post_text:
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}'
elif "анон" in post_text:
return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно'
else:
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}'
async def download_photo(message: types.Message, file_id: str):
"""
Скачивает фото по file_id из Telegram.
Args:
message: сообщение
file_id: File ID фотографии
filename: Имя файла, под которым будет сохранено фото
Returns:
Путь к сохраненному файлу, если файл был скачан успешно, иначе None
"""
try:
file = await message.bot.get_file(file_id)
file_path = file.file_path
await message.bot.download_file(file_path=file_path, destination=file_path)
return file_path
except Exception as e:
logger.error(f"Ошибка скачивания фотографии: {e}")
return None
async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
"""
Создает MediaGroup.
Args:
album: Album объект из Telegram API.
post_caption: Текст подписи к первому фото.
Returns:
Список InputMediaPhoto (MediaGroup).
"""
media_group = []
# Циклом проходимся по собранному миддлварью объекту album
for i, message in enumerate(album):
file_id = message.photo[-1].file_id
# Если это последняя фото в массиве, то добавляем подпись. Остальные фото просто преобразуем в InputMediaPhoto
# и формируем объект MediaGroup
if i == len(album) - 1:
media_group.append(InputMediaPhoto(media=file_id, caption=post_caption))
else:
media_group.append(InputMediaPhoto(media=file_id))
return media_group # Возвращаем MediaGroup
async def add_in_db_media(sent_message):
"""
Идентификатор медиа-группы
Args:
sent_message: sent_message объект из Telegram API
Returns:
Список InputMediaPhoto.
"""
media_group_message_id = sent_message[-1].message_id # Получаем идентификатор медиа-группы
for i, message in enumerate(sent_message):
file_id = message.photo[-1].file_id
file_path = await download_photo(message, file_id=file_id)
if i == 0:
BotDB.add_post_content_in_db(media_group_message_id, message.message_id, file_path)
elif i == len(sent_message) - 1:
BotDB.add_post_content_in_db(media_group_message_id, message.message_id, file_path)
else:
BotDB.add_post_content_in_db(media_group_message_id, message.message_id, file_path)
async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message,
media_group: list[InputMediaPhoto]):
sent_message = await message.bot.send_media_group(
chat_id=chat_id,
media=media_group,
)
BotDB.add_post_in_db(sent_message[-1].message_id, sent_message[-1].caption, message.from_user.id)
await add_in_db_media(sent_message)
message_id = sent_message[-1].message_id
return message_id
async def send_media_group_to_channel(bot, chat_id: int, post_content: list[tuple[str]], post_text: str):
"""
Отправляет медиа-группу с подписью к последнему файлу.
Args:
bot: Экземпляр бота aiogram.
chat_id: ID чата для отправки.
post_content: Список кортежей с путями к файлам.
post_text: Текст подписи.
"""
media = []
for file_path in post_content:
try:
file = FSInputFile(path=file_path[0])
media.append(types.InputMediaPhoto(media=file))
except FileNotFoundError:
logger.error(f"Файл не найден: {file_path[0]}")
return
# Добавляем подпись к последнему файлу
if media:
media[-1].caption = post_text
await bot.send_media_group(chat_id=chat_id, media=media)
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_message(
chat_id=chat_id,
text=post_text
)
message_id = sent_message.message_id
return message_id
else:
sent_message = await message.bot.send_message(
chat_id=chat_id,
text=post_text,
reply_markup=markup
)
message_id = sent_message.message_id
return message_id
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
await message.bot.send_photo(
chat_id=chat_id,
caption=post_text,
photo=photo
)
else:
await message.bot.send_photo(
chat_id=chat_id,
caption=post_text,
photo=photo,
reply_markup=markup
)
def check_access(user_id: int):
"""Проверка прав на совершение действий"""
return BotDB.is_admin(user_id)
def add_days_to_date(days: int):
"""Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY."""
current_date = datetime.now()
future_date = current_date + timedelta(days=days)
formatted_date = future_date.strftime("%d-%m-%Y")
return formatted_date
def get_banned_users_list(offset: int):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset)
message = "Список заблокированных пользователей:\n"
for user in users:
message += f"Пользователь: {user[0]}\n"
message += f"Причина бана: {user[2]}\n"
message += f"Дата разбана: {user[3]}\n\n"
return message
def get_banned_users_buttons():
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = BotDB.get_banned_users_from_db()
user_ids = []
for user in users:
user_ids.append((user[0], user[1]))
return user_ids
def get_help_message_id(media_group_message_id: int, data: dict) -> int:
"""
Получает идентификатор текстового сообщения по идентификатору сообщения группы.
Args:
media_group_message_id: Идентификатор сообщения группы
data: Словарь с данными.
Returns:
Идентификатор сообщения помощи.
"""
if 'help_message_id' in data:
return data['help_message_id'] # Возвращаем help_message_id
else:
return 0
def delete_user_blacklist(user_id: int):
return BotDB.delete_user_blacklist(user_id=user_id)
def check_username_and_full_name(user_id: int, username: str, full_name: str):
username_db, full_name_db = BotDB.get_username_and_full_name(user_id=user_id)
return username != username_db or full_name != full_name_db
def unban_notifier(self):
# Получение сегодняшней даты в формате DD-MM-YYYY
current_date = datetime.now()
today = current_date.strftime("%d-%m-%Y")
# Получение списка разблокированных пользователей
unblocked_users = self.BotDB.get_users_for_unblock_today(today)
message = "Разблокированные пользователи:\n"
for user_id, user_name in unblocked_users.items():
message += f"ID: {user_id}, Имя: {user_name}\n"
# Отправка сообщения в канал
self.bot.send_message(self.GROUP_FOR_MESSAGE, message)