197 lines
7.2 KiB
Python
197 lines
7.2 KiB
Python
from datetime import datetime, timedelta
|
||
|
||
from aiogram import types
|
||
from aiogram.types import InputMediaPhoto
|
||
|
||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||
|
||
bdf = BaseDependencyFactory()
|
||
|
||
BotDB = bdf.get_db()
|
||
|
||
|
||
def get_first_name(message: types.Message) -> str:
|
||
return message.from_user.first_name
|
||
|
||
|
||
def get_text_message(post_text: str, first_name: str, username: str):
|
||
"""
|
||
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
|
||
|
||
Args:
|
||
post_text: Текст сообщения
|
||
first_name: Имя автора поста
|
||
username: Юзернейм автора поста
|
||
|
||
Returns:
|
||
Кортеж из двух элементов:
|
||
- Сформированный текст сообщения.
|
||
- Флаг, указывающий, является ли пост анонимным (True - анонимный, False - не анонимный).
|
||
"""
|
||
if "неанон" in post_text or "не анон" in post_text:
|
||
is_anonymous = False
|
||
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous
|
||
elif "анон" in post_text:
|
||
is_anonymous = True
|
||
return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно', is_anonymous
|
||
else:
|
||
is_anonymous = False
|
||
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous
|
||
|
||
|
||
def process_photo_album(album, post_caption: str = ''):
|
||
"""
|
||
Создает список InputMediaPhoto для альбома.
|
||
|
||
Args:
|
||
album: Album объект из Telegram API.
|
||
post_caption: Текст подписи к первому фото.
|
||
|
||
Returns:
|
||
Список InputMediaPhoto.
|
||
"""
|
||
photo_media = []
|
||
for i, message in enumerate(album):
|
||
if i == 0:
|
||
photo_media.append(InputMediaPhoto(media=message.photo[-1].file_id, caption=post_caption))
|
||
else:
|
||
photo_media.append(InputMediaPhoto(media=message.photo[-1].file_id))
|
||
return photo_media
|
||
|
||
|
||
async def send_media_group_message(chat_id: int, message: types.Message, media_group: list[InputMediaPhoto]):
|
||
sent_message = await message.bot.send_media_group(
|
||
chat_id=chat_id,
|
||
media=media_group,
|
||
)
|
||
message_id = sent_message[-1].message_id
|
||
return message_id
|
||
|
||
|
||
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
|
||
if markup is None:
|
||
sent_message = await message.bot.send_message(
|
||
chat_id=chat_id,
|
||
text=post_text
|
||
)
|
||
message_id = sent_message.message_id
|
||
return message_id
|
||
else:
|
||
sent_message = await message.bot.send_message(
|
||
chat_id=chat_id,
|
||
text=post_text,
|
||
reply_markup=markup
|
||
)
|
||
message_id = sent_message.message_id
|
||
return message_id
|
||
|
||
|
||
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str, markup: types.ReplyKeyboardMarkup = None):
|
||
if markup is None:
|
||
await message.bot.send_photo(
|
||
chat_id=chat_id,
|
||
caption=post_text,
|
||
photo=photo
|
||
)
|
||
else:
|
||
await message.bot.send_photo(
|
||
chat_id=chat_id,
|
||
caption=post_text,
|
||
photo=photo,
|
||
reply_markup=markup
|
||
)
|
||
|
||
|
||
def check_access(user_id: int):
|
||
"""Проверка прав на совершение действий"""
|
||
return BotDB.is_admin(user_id)
|
||
|
||
|
||
def add_days_to_date(days: int):
|
||
"""Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY."""
|
||
current_date = datetime.now()
|
||
future_date = current_date + timedelta(days=days)
|
||
formatted_date = future_date.strftime("%d-%m-%Y")
|
||
return formatted_date
|
||
|
||
|
||
def get_banned_users_list(offset: int):
|
||
"""
|
||
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
|
||
|
||
Args:
|
||
offset: отступ для запроса в базу данных
|
||
|
||
Returns:
|
||
message - текст сообщения
|
||
user_ids - лист кортежей [(user_name: user_id)]
|
||
"""
|
||
users = BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset)
|
||
message = "Список заблокированных пользователей:\n"
|
||
|
||
for user in users:
|
||
message += f"Пользователь: {user[0]}\n"
|
||
message += f"Причина бана: {user[2]}\n"
|
||
message += f"Дата разбана: {user[3]}\n\n"
|
||
return message
|
||
|
||
|
||
def get_banned_users_buttons():
|
||
"""
|
||
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
|
||
|
||
Args:
|
||
offset: отступ для запроса в базу данных
|
||
|
||
Returns:
|
||
message - текст сообщения
|
||
user_ids - лист кортежей [(user_name: user_id)]
|
||
"""
|
||
users = BotDB.get_banned_users_from_db()
|
||
user_ids = []
|
||
|
||
for user in users:
|
||
user_ids.append((user[0], user[1]))
|
||
return user_ids
|
||
|
||
|
||
def get_help_message_id(media_group_message_id: int, data: dict) -> int:
|
||
"""
|
||
Получает идентификатор сообщения помощи по идентификатору сообщения группы.
|
||
|
||
Args:
|
||
media_group_message_id: Идентификатор сообщения группы
|
||
data: Словарь с данными.
|
||
|
||
Returns:
|
||
Идентификатор сообщения помощи.
|
||
"""
|
||
|
||
if 'help_message_id' in data and 'media_group_message_id' in data:
|
||
return data['media_group_message_id']
|
||
else:
|
||
return 0
|
||
|
||
|
||
def delete_user_blacklist(user_id: int):
|
||
return BotDB.delete_user_blacklist(user_id=user_id)
|
||
|
||
|
||
def check_username_and_full_name(user_id: int, username: str, full_name: str):
|
||
username_db, full_name_db = BotDB.get_username_and_full_name(user_id=user_id)
|
||
return username != username_db or full_name != full_name_db
|
||
|
||
|
||
def unban_notifier(self):
|
||
# Получение сегодняшней даты в формате DD-MM-YYYY
|
||
current_date = datetime.now()
|
||
print('Мы в функции unban_notifier')
|
||
today = current_date.strftime("%d-%m-%Y")
|
||
# Получение списка разблокированных пользователей
|
||
unblocked_users = self.BotDB.get_users_for_unblock_today(today)
|
||
message = "Разблокированные пользователи:\n"
|
||
for user_id, user_name in unblocked_users.items():
|
||
message += f"ID: {user_id}, Имя: {user_name}\n"
|
||
|
||
# Отправка сообщения в канал
|
||
self.bot.send_message(self.GROUP_FOR_MESSAGE, message) |