WIP: Temporary commit for branch move
This commit is contained in:
@@ -1,389 +1,394 @@
|
||||
import html
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from aiogram import types
|
||||
from aiogram.types import InputMediaPhoto, FSInputFile, InputMediaVideo, InputMediaAudio
|
||||
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||||
from logs.custom_logger import logger
|
||||
|
||||
bdf = BaseDependencyFactory()
|
||||
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
def get_first_name(message: types.Message) -> str:
|
||||
first_name = html.escape(message.from_user.first_name)
|
||||
return first_name
|
||||
|
||||
|
||||
def get_text_message(post_text: str, first_name: str, username: str):
|
||||
"""
|
||||
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
|
||||
|
||||
Args:
|
||||
post_text: Текст сообщения
|
||||
first_name: Имя автора поста
|
||||
username: Юзернейм автора поста
|
||||
|
||||
Returns:
|
||||
str: - Сформированный текст сообщения.
|
||||
"""
|
||||
if "неанон" in post_text or "не анон" in post_text:
|
||||
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}'
|
||||
elif "анон" in post_text:
|
||||
return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно'
|
||||
else:
|
||||
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}'
|
||||
|
||||
|
||||
async def download_file(message: types.Message, file_id: str):
|
||||
"""
|
||||
Скачивает файл по file_id из Telegram.
|
||||
|
||||
Args:
|
||||
message: сообщение
|
||||
file_id: File ID фотографии
|
||||
filename: Имя файла, под которым будет сохранено фото
|
||||
|
||||
Returns:
|
||||
Путь к сохраненному файлу, если файл был скачан успешно, иначе None
|
||||
"""
|
||||
try:
|
||||
os.makedirs("files", exist_ok=True)
|
||||
os.makedirs("files/photos", exist_ok=True)
|
||||
os.makedirs("files/videos", exist_ok=True)
|
||||
os.makedirs("files/music", exist_ok=True)
|
||||
os.makedirs("files/voice", exist_ok=True)
|
||||
os.makedirs("files/video_notes", exist_ok=True)
|
||||
file = await message.bot.get_file(file_id)
|
||||
file_path = os.path.join("files", file.file_path)
|
||||
await message.bot.download_file(file_path=file.file_path, destination=file_path)
|
||||
return file_path
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка скачивания фотографии: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
|
||||
"""
|
||||
Создает MediaGroup.
|
||||
|
||||
Args:
|
||||
album: Album объект из Telegram API.
|
||||
post_caption: Текст подписи к первому фото.
|
||||
|
||||
Returns:
|
||||
Список InputMediaPhoto (MediaGroup).
|
||||
"""
|
||||
media_group = []
|
||||
|
||||
for i, message in enumerate(album):
|
||||
if message.photo:
|
||||
file_id = message.photo[-1].file_id
|
||||
media_type = 'photo'
|
||||
elif message.video:
|
||||
file_id = message.video.file_id
|
||||
media_type = 'video'
|
||||
elif message.audio:
|
||||
file_id = message.audio.file_id
|
||||
media_type = 'audio'
|
||||
else:
|
||||
# Если нет фото, видео или аудио, пропускаем сообщение
|
||||
continue
|
||||
|
||||
# Формируем объект MediaGroup с учетом типа медиа
|
||||
if i == len(album) - 1:
|
||||
if media_type == 'photo':
|
||||
media_group.append(InputMediaPhoto(media=file_id, caption=post_caption))
|
||||
elif media_type == 'video':
|
||||
media_group.append(InputMediaVideo(media=file_id, caption=post_caption))
|
||||
elif media_type == 'audio':
|
||||
media_group.append(InputMediaAudio(media=file_id, caption=post_caption))
|
||||
else:
|
||||
if media_type == 'photo':
|
||||
media_group.append(InputMediaPhoto(media=file_id))
|
||||
elif media_type == 'video':
|
||||
media_group.append(InputMediaVideo(media=file_id))
|
||||
elif media_type == 'audio':
|
||||
media_group.append(InputMediaAudio(media=file_id))
|
||||
|
||||
return media_group # Возвращаем MediaGroup
|
||||
|
||||
|
||||
async def add_in_db_media_mediagroup(sent_message):
|
||||
"""
|
||||
Идентификатор медиа-группы
|
||||
|
||||
Args:
|
||||
sent_message: sent_message объект из Telegram API
|
||||
|
||||
Returns:
|
||||
Список InputFile (FSInputFile).
|
||||
"""
|
||||
media_group_message_id = sent_message[-1].message_id # Получаем идентификатор медиа-группы
|
||||
for i, message in enumerate(sent_message):
|
||||
if message.photo:
|
||||
file_id = message.photo[-1].file_id
|
||||
file_path = await download_file(message, file_id=file_id)
|
||||
BotDB.add_post_content_in_db(media_group_message_id, message.message_id, file_path, 'photo')
|
||||
elif message.video:
|
||||
file_id = message.video.file_id
|
||||
file_path = await download_file(message, file_id=file_id)
|
||||
BotDB.add_post_content_in_db(media_group_message_id, message.message_id, file_path, 'video')
|
||||
else:
|
||||
# Если нет фото, видео или аудио, или другой контент, пропускаем сообщение
|
||||
continue
|
||||
|
||||
|
||||
async def add_in_db_media(sent_message):
|
||||
"""
|
||||
Args:
|
||||
sent_message: sent_message объект из Telegram API
|
||||
|
||||
Returns:
|
||||
Список InputFile (FSInputFile).
|
||||
"""
|
||||
if sent_message.photo:
|
||||
file_id = sent_message.photo[-1].file_id
|
||||
file_path = await download_file(sent_message, file_id=file_id)
|
||||
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'photo')
|
||||
elif sent_message.video:
|
||||
file_id = sent_message.video.file_id
|
||||
file_path = await download_file(sent_message, file_id=file_id)
|
||||
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'video')
|
||||
elif sent_message.voice:
|
||||
file_id = sent_message.voice.file_id
|
||||
file_path = await download_file(sent_message, file_id=file_id)
|
||||
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'voice')
|
||||
elif sent_message.audio:
|
||||
file_id = sent_message.audio.file_id
|
||||
file_path = await download_file(sent_message, file_id=file_id)
|
||||
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'audio')
|
||||
elif sent_message.video_note:
|
||||
file_id = sent_message.video_note.file_id
|
||||
file_path = await download_file(sent_message, file_id=file_id)
|
||||
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'video_note')
|
||||
|
||||
|
||||
async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message,
|
||||
media_group: list[InputMediaPhoto]):
|
||||
sent_message = await message.bot.send_media_group(
|
||||
chat_id=chat_id,
|
||||
media=media_group,
|
||||
)
|
||||
BotDB.add_post_in_db(sent_message[-1].message_id, sent_message[-1].caption, message.from_user.id)
|
||||
await add_in_db_media_mediagroup(sent_message)
|
||||
message_id = sent_message[-1].message_id
|
||||
return message_id
|
||||
|
||||
|
||||
async def send_media_group_to_channel(bot, chat_id: int, post_content: list[tuple[str]], post_text: str):
|
||||
"""
|
||||
Отправляет медиа-группу с подписью к последнему файлу.
|
||||
|
||||
Args:
|
||||
bot: Экземпляр бота aiogram.
|
||||
chat_id: ID чата для отправки.
|
||||
post_content: Список кортежей с путями к файлам.
|
||||
post_text: Текст подписи.
|
||||
"""
|
||||
media = []
|
||||
for file_path in post_content:
|
||||
try:
|
||||
file = FSInputFile(path=file_path[0])
|
||||
type = file_path[1]
|
||||
if type == 'video':
|
||||
media.append(types.InputMediaVideo(media=file))
|
||||
if type == 'photo':
|
||||
media.append(types.InputMediaPhoto(media=file))
|
||||
except FileNotFoundError:
|
||||
logger.error(f"Файл не найден: {file_path[0]}")
|
||||
return
|
||||
|
||||
# Добавляем подпись к последнему файлу
|
||||
if media:
|
||||
media[-1].caption = post_text
|
||||
|
||||
await bot.send_media_group(chat_id=chat_id, media=media)
|
||||
|
||||
|
||||
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
|
||||
if markup is None:
|
||||
sent_message = await message.bot.send_message(
|
||||
chat_id=chat_id,
|
||||
text=post_text
|
||||
)
|
||||
message_id = sent_message.message_id
|
||||
return message_id
|
||||
else:
|
||||
sent_message = await message.bot.send_message(
|
||||
chat_id=chat_id,
|
||||
text=post_text,
|
||||
reply_markup=markup
|
||||
)
|
||||
message_id = sent_message.message_id
|
||||
return message_id
|
||||
|
||||
|
||||
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str,
|
||||
markup: types.ReplyKeyboardMarkup = None):
|
||||
if markup is None:
|
||||
sent_message = await message.bot.send_photo(
|
||||
chat_id=chat_id,
|
||||
caption=post_text,
|
||||
photo=photo
|
||||
)
|
||||
else:
|
||||
sent_message = await message.bot.send_photo(
|
||||
chat_id=chat_id,
|
||||
caption=post_text,
|
||||
photo=photo,
|
||||
reply_markup=markup
|
||||
)
|
||||
return sent_message
|
||||
|
||||
|
||||
async def send_video_message(chat_id, message: types.Message, video: str, post_text: str = "",
|
||||
markup: types.ReplyKeyboardMarkup = None):
|
||||
if markup is None:
|
||||
sent_message = await message.bot.send_video(
|
||||
chat_id=chat_id,
|
||||
caption=post_text,
|
||||
video=video
|
||||
)
|
||||
else:
|
||||
sent_message = await message.bot.send_video(
|
||||
chat_id=chat_id,
|
||||
caption=post_text,
|
||||
video=video,
|
||||
reply_markup=markup
|
||||
)
|
||||
return sent_message
|
||||
|
||||
|
||||
async def send_video_note_message(chat_id, message: types.Message, video_note: str,
|
||||
markup: types.ReplyKeyboardMarkup = None):
|
||||
if markup is None:
|
||||
sent_message = await message.bot.send_video_note(
|
||||
chat_id=chat_id,
|
||||
video_note=video_note
|
||||
)
|
||||
else:
|
||||
sent_message = await message.bot.send_video_note(
|
||||
chat_id=chat_id,
|
||||
video_note=video_note,
|
||||
reply_markup=markup
|
||||
)
|
||||
return sent_message
|
||||
|
||||
|
||||
async def send_audio_message(chat_id, message: types.Message, audio: str, post_text: str,
|
||||
markup: types.ReplyKeyboardMarkup = None):
|
||||
if markup is None:
|
||||
sent_message = await message.bot.send_audio(
|
||||
chat_id=chat_id,
|
||||
caption=post_text,
|
||||
audio=audio
|
||||
)
|
||||
else:
|
||||
sent_message = await message.bot.send_audio(
|
||||
chat_id=chat_id,
|
||||
caption=post_text,
|
||||
audio=audio,
|
||||
reply_markup=markup
|
||||
)
|
||||
return sent_message
|
||||
|
||||
|
||||
async def send_voice_message(chat_id, message: types.Message, voice: str,
|
||||
markup: types.ReplyKeyboardMarkup = None):
|
||||
if markup is None:
|
||||
sent_message = await message.bot.send_voice(
|
||||
chat_id=chat_id,
|
||||
voice=voice
|
||||
)
|
||||
else:
|
||||
sent_message = await message.bot.send_voice(
|
||||
chat_id=chat_id,
|
||||
voice=voice,
|
||||
reply_markup=markup
|
||||
)
|
||||
return sent_message
|
||||
|
||||
|
||||
def check_access(user_id: int):
|
||||
"""Проверка прав на совершение действий"""
|
||||
return BotDB.is_admin(user_id)
|
||||
|
||||
|
||||
def add_days_to_date(days: int):
|
||||
"""Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY."""
|
||||
current_date = datetime.now()
|
||||
future_date = current_date + timedelta(days=days)
|
||||
formatted_date = future_date.strftime("%d-%m-%Y")
|
||||
return formatted_date
|
||||
|
||||
|
||||
def get_banned_users_list(offset: int):
|
||||
"""
|
||||
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
|
||||
|
||||
Args:
|
||||
offset: отступ для запроса в базу данных
|
||||
|
||||
Returns:
|
||||
message - текст сообщения
|
||||
user_ids - лист кортежей [(user_name: user_id)]
|
||||
"""
|
||||
users = BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset)
|
||||
message = "Список заблокированных пользователей:\n"
|
||||
|
||||
for user in users:
|
||||
message += f"Пользователь: {user[0]}\n"
|
||||
message += f"Причина бана: {user[2]}\n"
|
||||
message += f"Дата разбана: {user[3]}\n\n"
|
||||
return message
|
||||
|
||||
|
||||
def get_banned_users_buttons():
|
||||
"""
|
||||
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
|
||||
|
||||
Args:
|
||||
offset: отступ для запроса в базу данных
|
||||
|
||||
Returns:
|
||||
message - текст сообщения
|
||||
user_ids - лист кортежей [(user_name: user_id)]
|
||||
"""
|
||||
users = BotDB.get_banned_users_from_db()
|
||||
user_ids = []
|
||||
|
||||
for user in users:
|
||||
user_ids.append((user[0], user[1]))
|
||||
return user_ids
|
||||
|
||||
|
||||
def delete_user_blacklist(user_id: int):
|
||||
return BotDB.delete_user_blacklist(user_id=user_id)
|
||||
|
||||
|
||||
def check_username_and_full_name(user_id: int, username: str, full_name: str):
|
||||
username_db, full_name_db = BotDB.get_username_and_full_name(user_id=user_id)
|
||||
return username != username_db or full_name != full_name_db
|
||||
|
||||
|
||||
def unban_notifier(self):
|
||||
# Получение сегодняшней даты в формате DD-MM-YYYY
|
||||
current_date = datetime.now()
|
||||
today = current_date.strftime("%d-%m-%Y")
|
||||
# Получение списка разблокированных пользователей
|
||||
unblocked_users = self.BotDB.get_users_for_unblock_today(today)
|
||||
message = "Разблокированные пользователи:\n"
|
||||
for user_id, user_name in unblocked_users.items():
|
||||
message += f"ID: {user_id}, Имя: {user_name}\n"
|
||||
|
||||
# Отправка сообщения в канал
|
||||
self.bot.send_message(self.GROUP_FOR_MESSAGE, message)
|
||||
import html
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from aiogram import types
|
||||
from aiogram.types import InputMediaPhoto, FSInputFile, InputMediaVideo, InputMediaAudio
|
||||
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||||
from logs.custom_logger import logger
|
||||
|
||||
|
||||
def get_first_name(message: types.Message) -> str:
|
||||
first_name = html.escape(message.from_user.first_name)
|
||||
return first_name
|
||||
|
||||
|
||||
def get_text_message(post_text: str, first_name: str, username: str = None):
|
||||
"""
|
||||
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
|
||||
|
||||
Args:
|
||||
post_text: Текст сообщения
|
||||
first_name: Имя автора поста
|
||||
username: Юзернейм автора поста (может быть None)
|
||||
|
||||
Returns:
|
||||
str: - Сформированный текст сообщения.
|
||||
"""
|
||||
# Формируем строку с информацией об авторе
|
||||
if username:
|
||||
author_info = f"{first_name} @{username}"
|
||||
else:
|
||||
author_info = f"{first_name} (Ник не указан)"
|
||||
|
||||
if "неанон" in post_text or "не анон" in post_text:
|
||||
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {author_info}'
|
||||
elif "анон" in post_text:
|
||||
return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно'
|
||||
else:
|
||||
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {author_info}'
|
||||
|
||||
|
||||
async def download_file(message: types.Message, file_id: str):
|
||||
"""
|
||||
Скачивает файл по file_id из Telegram.
|
||||
|
||||
Args:
|
||||
message: сообщение
|
||||
file_id: File ID фотографии
|
||||
filename: Имя файла, под которым будет сохранено фото
|
||||
|
||||
Returns:
|
||||
Путь к сохраненному файлу, если файл был скачан успешно, иначе None
|
||||
"""
|
||||
try:
|
||||
os.makedirs("files", exist_ok=True)
|
||||
os.makedirs("files/photos", exist_ok=True)
|
||||
os.makedirs("files/videos", exist_ok=True)
|
||||
os.makedirs("files/music", exist_ok=True)
|
||||
os.makedirs("files/voice", exist_ok=True)
|
||||
os.makedirs("files/video_notes", exist_ok=True)
|
||||
file = await message.bot.get_file(file_id)
|
||||
file_path = os.path.join("files", file.file_path)
|
||||
await message.bot.download_file(file_path=file.file_path, destination=file_path)
|
||||
return file_path
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка скачивания фотографии: {e}")
|
||||
return None
|
||||
|
||||
|
||||
async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
|
||||
"""
|
||||
Создает MediaGroup.
|
||||
|
||||
Args:
|
||||
album: Album объект из Telegram API.
|
||||
post_caption: Текст подписи к первому фото.
|
||||
|
||||
Returns:
|
||||
Список InputMediaPhoto (MediaGroup).
|
||||
"""
|
||||
media_group = []
|
||||
|
||||
for i, message in enumerate(album):
|
||||
if message.photo:
|
||||
file_id = message.photo[-1].file_id
|
||||
media_type = 'photo'
|
||||
elif message.video:
|
||||
file_id = message.video.file_id
|
||||
media_type = 'video'
|
||||
elif message.audio:
|
||||
file_id = message.audio.file_id
|
||||
media_type = 'audio'
|
||||
else:
|
||||
# Если нет фото, видео или аудио, пропускаем сообщение
|
||||
continue
|
||||
|
||||
# Формируем объект MediaGroup с учетом типа медиа
|
||||
if i == len(album) - 1:
|
||||
if media_type == 'photo':
|
||||
media_group.append(InputMediaPhoto(media=file_id, caption=post_caption))
|
||||
elif media_type == 'video':
|
||||
media_group.append(InputMediaVideo(media=file_id, caption=post_caption))
|
||||
elif media_type == 'audio':
|
||||
media_group.append(InputMediaAudio(media=file_id, caption=post_caption))
|
||||
else:
|
||||
if media_type == 'photo':
|
||||
media_group.append(InputMediaPhoto(media=file_id))
|
||||
elif media_type == 'video':
|
||||
media_group.append(InputMediaVideo(media=file_id))
|
||||
elif media_type == 'audio':
|
||||
media_group.append(InputMediaAudio(media=file_id))
|
||||
|
||||
return media_group # Возвращаем MediaGroup
|
||||
|
||||
|
||||
async def add_in_db_media_mediagroup(sent_message, bot_db):
|
||||
"""
|
||||
Идентификатор медиа-группы
|
||||
|
||||
Args:
|
||||
sent_message: sent_message объект из Telegram API
|
||||
bot_db: Экземпляр базы данных
|
||||
|
||||
Returns:
|
||||
Список InputFile (FSInputFile).
|
||||
"""
|
||||
media_group_message_id = sent_message[-1].message_id # Получаем идентификатор медиа-группы
|
||||
for i, message in enumerate(sent_message):
|
||||
if message.photo:
|
||||
file_id = message.photo[-1].file_id
|
||||
file_path = await download_file(message, file_id=file_id)
|
||||
bot_db.add_post_content_in_db(media_group_message_id, message.message_id, file_path, 'photo')
|
||||
elif message.video:
|
||||
file_id = message.video.file_id
|
||||
file_path = await download_file(message, file_id=file_id)
|
||||
bot_db.add_post_content_in_db(media_group_message_id, message.message_id, file_path, 'video')
|
||||
else:
|
||||
# Если нет фото, видео или аудио, или другой контент, пропускаем сообщение
|
||||
continue
|
||||
|
||||
|
||||
async def add_in_db_media(sent_message, bot_db):
|
||||
"""
|
||||
Args:
|
||||
sent_message: sent_message объект из Telegram API
|
||||
bot_db: Экземпляр базы данных
|
||||
|
||||
Returns:
|
||||
Список InputFile (FSInputFile).
|
||||
"""
|
||||
if sent_message.photo:
|
||||
file_id = sent_message.photo[-1].file_id
|
||||
file_path = await download_file(sent_message, file_id=file_id)
|
||||
bot_db.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'photo')
|
||||
elif sent_message.video:
|
||||
file_id = sent_message.video.file_id
|
||||
file_path = await download_file(sent_message, file_id=file_id)
|
||||
bot_db.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'video')
|
||||
elif sent_message.voice:
|
||||
file_id = sent_message.voice.file_id
|
||||
file_path = await download_file(sent_message, file_id=file_id)
|
||||
bot_db.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'voice')
|
||||
elif sent_message.audio:
|
||||
file_id = sent_message.audio.file_id
|
||||
file_path = await download_file(sent_message, file_id=file_id)
|
||||
bot_db.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'audio')
|
||||
elif sent_message.video_note:
|
||||
file_id = sent_message.video_note.file_id
|
||||
file_path = await download_file(sent_message, file_id=file_id)
|
||||
bot_db.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'video_note')
|
||||
|
||||
|
||||
async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message,
|
||||
media_group: list[InputMediaPhoto], bot_db):
|
||||
sent_message = await message.bot.send_media_group(
|
||||
chat_id=chat_id,
|
||||
media=media_group,
|
||||
)
|
||||
bot_db.add_post_in_db(sent_message[-1].message_id, sent_message[-1].caption, message.from_user.id)
|
||||
await add_in_db_media_mediagroup(sent_message, bot_db)
|
||||
message_id = sent_message[-1].message_id
|
||||
return message_id
|
||||
|
||||
|
||||
async def send_media_group_to_channel(bot, chat_id: int, post_content: list[tuple[str]], post_text: str):
|
||||
"""
|
||||
Отправляет медиа-группу с подписью к последнему файлу.
|
||||
|
||||
Args:
|
||||
bot: Экземпляр бота aiogram.
|
||||
chat_id: ID чата для отправки.
|
||||
post_content: Список кортежей с путями к файлам.
|
||||
post_text: Текст подписи.
|
||||
"""
|
||||
media = []
|
||||
for file_path in post_content:
|
||||
try:
|
||||
file = FSInputFile(path=file_path[0])
|
||||
type = file_path[1]
|
||||
if type == 'video':
|
||||
media.append(types.InputMediaVideo(media=file))
|
||||
if type == 'photo':
|
||||
media.append(types.InputMediaPhoto(media=file))
|
||||
except FileNotFoundError:
|
||||
logger.error(f"Файл не найден: {file_path[0]}")
|
||||
return
|
||||
|
||||
# Добавляем подпись к последнему файлу
|
||||
if media:
|
||||
media[-1].caption = post_text
|
||||
|
||||
await bot.send_media_group(chat_id=chat_id, media=media)
|
||||
|
||||
|
||||
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
|
||||
if markup is None:
|
||||
sent_message = await message.bot.send_message(
|
||||
chat_id=chat_id,
|
||||
text=post_text
|
||||
)
|
||||
message_id = sent_message.message_id
|
||||
return message_id
|
||||
else:
|
||||
sent_message = await message.bot.send_message(
|
||||
chat_id=chat_id,
|
||||
text=post_text,
|
||||
reply_markup=markup
|
||||
)
|
||||
message_id = sent_message.message_id
|
||||
return message_id
|
||||
|
||||
|
||||
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str,
|
||||
markup: types.ReplyKeyboardMarkup = None):
|
||||
if markup is None:
|
||||
sent_message = await message.bot.send_photo(
|
||||
chat_id=chat_id,
|
||||
caption=post_text,
|
||||
photo=photo
|
||||
)
|
||||
else:
|
||||
sent_message = await message.bot.send_photo(
|
||||
chat_id=chat_id,
|
||||
caption=post_text,
|
||||
photo=photo,
|
||||
reply_markup=markup
|
||||
)
|
||||
return sent_message
|
||||
|
||||
|
||||
async def send_video_message(chat_id, message: types.Message, video: str, post_text: str = "",
|
||||
markup: types.ReplyKeyboardMarkup = None):
|
||||
if markup is None:
|
||||
sent_message = await message.bot.send_video(
|
||||
chat_id=chat_id,
|
||||
caption=post_text,
|
||||
video=video
|
||||
)
|
||||
else:
|
||||
sent_message = await message.bot.send_video(
|
||||
chat_id=chat_id,
|
||||
caption=post_text,
|
||||
video=video,
|
||||
reply_markup=markup
|
||||
)
|
||||
return sent_message
|
||||
|
||||
|
||||
async def send_video_note_message(chat_id, message: types.Message, video_note: str,
|
||||
markup: types.ReplyKeyboardMarkup = None):
|
||||
if markup is None:
|
||||
sent_message = await message.bot.send_video_note(
|
||||
chat_id=chat_id,
|
||||
video_note=video_note
|
||||
)
|
||||
else:
|
||||
sent_message = await message.bot.send_video_note(
|
||||
chat_id=chat_id,
|
||||
video_note=video_note,
|
||||
reply_markup=markup
|
||||
)
|
||||
return sent_message
|
||||
|
||||
|
||||
async def send_audio_message(chat_id, message: types.Message, audio: str, post_text: str,
|
||||
markup: types.ReplyKeyboardMarkup = None):
|
||||
if markup is None:
|
||||
sent_message = await message.bot.send_audio(
|
||||
chat_id=chat_id,
|
||||
caption=post_text,
|
||||
audio=audio
|
||||
)
|
||||
else:
|
||||
sent_message = await message.bot.send_audio(
|
||||
chat_id=chat_id,
|
||||
caption=post_text,
|
||||
audio=audio,
|
||||
reply_markup=markup
|
||||
)
|
||||
return sent_message
|
||||
|
||||
|
||||
async def send_voice_message(chat_id, message: types.Message, voice: str,
|
||||
markup: types.ReplyKeyboardMarkup = None):
|
||||
if markup is None:
|
||||
sent_message = await message.bot.send_voice(
|
||||
chat_id=chat_id,
|
||||
voice=voice
|
||||
)
|
||||
else:
|
||||
sent_message = await message.bot.send_voice(
|
||||
chat_id=chat_id,
|
||||
voice=voice,
|
||||
reply_markup=markup
|
||||
)
|
||||
return sent_message
|
||||
|
||||
|
||||
def check_access(user_id: int, bot_db):
|
||||
"""Проверка прав на совершение действий"""
|
||||
return bot_db.is_admin(user_id)
|
||||
|
||||
|
||||
def add_days_to_date(days: int):
|
||||
"""Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY."""
|
||||
current_date = datetime.now()
|
||||
future_date = current_date + timedelta(days=days)
|
||||
formatted_date = future_date.strftime("%d-%m-%Y")
|
||||
return formatted_date
|
||||
|
||||
|
||||
def get_banned_users_list(offset: int, bot_db):
|
||||
"""
|
||||
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
|
||||
|
||||
Args:
|
||||
offset: отступ для запроса в базу данных
|
||||
bot_db: Экземпляр базы данных
|
||||
|
||||
Returns:
|
||||
message - текст сообщения
|
||||
user_ids - лист кортежей [(user_name: user_id)]
|
||||
"""
|
||||
users = bot_db.get_banned_users_from_db_with_limits(limit=7, offset=offset)
|
||||
message = "Список заблокированных пользователей:\n"
|
||||
|
||||
for user in users:
|
||||
message += f"Пользователь: {user[0]}\n"
|
||||
message += f"Причина бана: {user[2]}\n"
|
||||
message += f"Дата разбана: {user[3]}\n\n"
|
||||
return message
|
||||
|
||||
|
||||
def get_banned_users_buttons(bot_db):
|
||||
"""
|
||||
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
|
||||
|
||||
Args:
|
||||
bot_db: Экземпляр базы данных
|
||||
|
||||
Returns:
|
||||
message - текст сообщения
|
||||
user_ids - лист кортежей [(user_name: user_id)]
|
||||
"""
|
||||
users = bot_db.get_banned_users_from_db()
|
||||
user_ids = []
|
||||
|
||||
for user in users:
|
||||
user_ids.append((user[0], user[1]))
|
||||
return user_ids
|
||||
|
||||
|
||||
def delete_user_blacklist(user_id: int, bot_db):
|
||||
return bot_db.delete_user_blacklist(user_id=user_id)
|
||||
|
||||
|
||||
def check_username_and_full_name(user_id: int, username: str, full_name: str, bot_db):
|
||||
username_db, full_name_db = bot_db.get_username_and_full_name(user_id=user_id)
|
||||
return username != username_db or full_name != full_name_db
|
||||
|
||||
|
||||
def unban_notifier(self):
|
||||
# Получение сегодняшней даты в формате DD-MM-YYYY
|
||||
current_date = datetime.now()
|
||||
today = current_date.strftime("%d-%m-%Y")
|
||||
# Получение списка разблокированных пользователей
|
||||
unblocked_users = self.BotDB.get_users_for_unblock_today(today)
|
||||
message = "Разблокированные пользователи:\n"
|
||||
for user_id, user_name in unblocked_users.items():
|
||||
message += f"ID: {user_id}, Имя: {user_name}\n"
|
||||
|
||||
# Отправка сообщения в канал
|
||||
self.bot.send_message(self.GROUP_FOR_MESSAGE, message)
|
||||
|
||||
Reference in New Issue
Block a user