274 lines
11 KiB
Python
274 lines
11 KiB
Python
from datetime import datetime, timedelta
|
||
|
||
from aiogram import types
|
||
from aiogram.types import InputMediaPhoto, FSInputFile
|
||
|
||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||
from logs.custom_logger import logger
|
||
|
||
bdf = BaseDependencyFactory()
|
||
|
||
BotDB = bdf.get_db()
|
||
|
||
|
||
def get_first_name(message: types.Message) -> str:
|
||
return message.from_user.first_name
|
||
|
||
|
||
def get_text_message(post_text: str, first_name: str, username: str):
|
||
"""
|
||
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
|
||
|
||
Args:
|
||
post_text: Текст сообщения
|
||
first_name: Имя автора поста
|
||
username: Юзернейм автора поста
|
||
|
||
Returns:
|
||
Кортеж из двух элементов:
|
||
- Сформированный текст сообщения.
|
||
- Флаг, указывающий, является ли пост анонимным (True - анонимный, False - не анонимный).
|
||
"""
|
||
if "неанон" in post_text or "не анон" in post_text:
|
||
is_anonymous = False
|
||
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous
|
||
elif "анон" in post_text:
|
||
is_anonymous = True
|
||
return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно', is_anonymous
|
||
else:
|
||
is_anonymous = False
|
||
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous
|
||
|
||
|
||
async def download_photo(message: types.Message, file_id: str):
|
||
"""
|
||
Скачивает фото по file_id из Telegram.
|
||
|
||
Args:
|
||
message: сообщение
|
||
file_id: File ID фотографии.
|
||
filename: Имя файла, под которым будет сохранено фото.
|
||
|
||
Returns:
|
||
Путь к сохраненному файлу, если файл был скачан успешно, иначе None.
|
||
"""
|
||
try:
|
||
file = await message.bot.get_file(file_id)
|
||
file_path = file.file_path
|
||
await message.bot.download_file(file_path=file_path, destination=file_path)
|
||
return file_path
|
||
except Exception as e:
|
||
logger.error(f"Ошибка скачивания фотографии: {e}")
|
||
return None
|
||
|
||
|
||
async def process_photo_album(album, post_caption: str = ''):
|
||
"""
|
||
Создает список InputMediaPhoto для альбома.
|
||
|
||
Args:
|
||
album: Album объект из Telegram API.
|
||
post_caption: Текст подписи к первому фото.
|
||
|
||
Returns:
|
||
Список InputMediaPhoto.
|
||
"""
|
||
photo_media = []
|
||
#Циклом проходимся по собранному миддлварью объекту album
|
||
for i, message in enumerate(album):
|
||
file_id = message.photo[-1].file_id
|
||
file_path = await download_photo(message, file_id=file_id)
|
||
# Если это последняя фото в массиве, то добавляем подпись. Остальные фото просто преобразуем в InputMediaPhoto
|
||
# и формируем объект MediaGroup
|
||
if i == len(album) - 1:
|
||
photo_media.append(InputMediaPhoto(media=message.photo[-1].file_id, caption=post_caption))
|
||
else:
|
||
photo_media.append(InputMediaPhoto(media=message.photo[-1].file_id))
|
||
return photo_media # Возвращаем идентификатор медиа-группы
|
||
|
||
|
||
async def add_in_db_media(sent_message, post_caption: str = ''):
|
||
"""
|
||
Идентификатор медиа-группы
|
||
|
||
Args:
|
||
sent_message: sent_message объект из Telegram API.
|
||
post_caption: Текст подписи к первому фото.
|
||
|
||
Returns:
|
||
Список InputMediaPhoto.
|
||
"""
|
||
media_group_message_id = sent_message[0].message_id # Получаем идентификатор медиа-группы
|
||
for i, message in enumerate(sent_message):
|
||
file_id = message.photo[-1].file_id
|
||
file_path = await download_photo(message, file_id=file_id)
|
||
if i == 0:
|
||
BotDB.add_post_from_telegram_in_db(media_group_message_id, message.message_id, 0, file_path, post_caption)
|
||
elif i == len(sent_message) - 1:
|
||
BotDB.add_post_from_telegram_in_db(media_group_message_id, message.message_id, 1, file_path, post_caption)
|
||
else:
|
||
BotDB.add_post_from_telegram_in_db(media_group_message_id, message.message_id, 0, file_path, post_caption)
|
||
return media_group_message_id
|
||
|
||
|
||
async def send_media_group_message(chat_id: int, message: types.Message, media_group: list[InputMediaPhoto]):
|
||
sent_message = await message.bot.send_media_group(
|
||
chat_id=chat_id,
|
||
media=media_group,
|
||
)
|
||
await add_in_db_media(sent_message, post_caption=sent_message[-1].caption)
|
||
message_id = sent_message[-1].message_id
|
||
return message_id
|
||
|
||
|
||
async def send_media_group_with_caption(bot, chat_id: int, post_content: list[tuple[str]], post_text: str):
|
||
"""
|
||
Отправляет медиа-группу с подписью к последнему файлу.
|
||
|
||
Args:
|
||
bot: Экземпляр бота aiogram.
|
||
chat_id: ID чата для отправки.
|
||
post_content: Список кортежей с путями к файлам.
|
||
post_text: Текст подписи.
|
||
"""
|
||
media = []
|
||
for file_path in post_content:
|
||
try:
|
||
file = FSInputFile(path=file_path[0])
|
||
media.append(types.InputMediaPhoto(media=file))
|
||
except FileNotFoundError:
|
||
logger.error(f"Файл не найден: {file_path[0]}")
|
||
return
|
||
|
||
# Добавляем подпись к последнему файлу
|
||
if media:
|
||
media[-1].caption = post_text
|
||
|
||
await bot.send_media_group(chat_id=chat_id, media=media)
|
||
|
||
|
||
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
|
||
if markup is None:
|
||
sent_message = await message.bot.send_message(
|
||
chat_id=chat_id,
|
||
text=post_text
|
||
)
|
||
message_id = sent_message.message_id
|
||
return message_id
|
||
else:
|
||
sent_message = await message.bot.send_message(
|
||
chat_id=chat_id,
|
||
text=post_text,
|
||
reply_markup=markup
|
||
)
|
||
message_id = sent_message.message_id
|
||
return message_id
|
||
|
||
|
||
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str, markup: types.ReplyKeyboardMarkup = None):
|
||
if markup is None:
|
||
await message.bot.send_photo(
|
||
chat_id=chat_id,
|
||
caption=post_text,
|
||
photo=photo
|
||
)
|
||
else:
|
||
await message.bot.send_photo(
|
||
chat_id=chat_id,
|
||
caption=post_text,
|
||
photo=photo,
|
||
reply_markup=markup
|
||
)
|
||
|
||
|
||
def check_access(user_id: int):
|
||
"""Проверка прав на совершение действий"""
|
||
return BotDB.is_admin(user_id)
|
||
|
||
|
||
def add_days_to_date(days: int):
|
||
"""Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY."""
|
||
current_date = datetime.now()
|
||
future_date = current_date + timedelta(days=days)
|
||
formatted_date = future_date.strftime("%d-%m-%Y")
|
||
return formatted_date
|
||
|
||
|
||
def get_banned_users_list(offset: int):
|
||
"""
|
||
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
|
||
|
||
Args:
|
||
offset: отступ для запроса в базу данных
|
||
|
||
Returns:
|
||
message - текст сообщения
|
||
user_ids - лист кортежей [(user_name: user_id)]
|
||
"""
|
||
users = BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset)
|
||
message = "Список заблокированных пользователей:\n"
|
||
|
||
for user in users:
|
||
message += f"Пользователь: {user[0]}\n"
|
||
message += f"Причина бана: {user[2]}\n"
|
||
message += f"Дата разбана: {user[3]}\n\n"
|
||
return message
|
||
|
||
|
||
def get_banned_users_buttons():
|
||
"""
|
||
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
|
||
|
||
Args:
|
||
offset: отступ для запроса в базу данных
|
||
|
||
Returns:
|
||
message - текст сообщения
|
||
user_ids - лист кортежей [(user_name: user_id)]
|
||
"""
|
||
users = BotDB.get_banned_users_from_db()
|
||
user_ids = []
|
||
|
||
for user in users:
|
||
user_ids.append((user[0], user[1]))
|
||
return user_ids
|
||
|
||
|
||
def get_help_message_id(media_group_message_id: int, data: dict) -> int:
|
||
"""
|
||
Получает идентификатор текстового сообщения по идентификатору сообщения группы.
|
||
|
||
Args:
|
||
media_group_message_id: Идентификатор сообщения группы
|
||
data: Словарь с данными.
|
||
|
||
Returns:
|
||
Идентификатор сообщения помощи.
|
||
"""
|
||
if 'help_message_id' in data:
|
||
return data['help_message_id'] # Возвращаем help_message_id
|
||
else:
|
||
return 0
|
||
|
||
|
||
def delete_user_blacklist(user_id: int):
|
||
return BotDB.delete_user_blacklist(user_id=user_id)
|
||
|
||
|
||
def check_username_and_full_name(user_id: int, username: str, full_name: str):
|
||
username_db, full_name_db = BotDB.get_username_and_full_name(user_id=user_id)
|
||
return username != username_db or full_name != full_name_db
|
||
|
||
|
||
def unban_notifier(self):
|
||
# Получение сегодняшней даты в формате DD-MM-YYYY
|
||
current_date = datetime.now()
|
||
today = current_date.strftime("%d-%m-%Y")
|
||
# Получение списка разблокированных пользователей
|
||
unblocked_users = self.BotDB.get_users_for_unblock_today(today)
|
||
message = "Разблокированные пользователи:\n"
|
||
for user_id, user_name in unblocked_users.items():
|
||
message += f"ID: {user_id}, Имя: {user_name}\n"
|
||
|
||
# Отправка сообщения в канал
|
||
self.bot.send_message(self.GROUP_FOR_MESSAGE, message) |