mediaGroup start work
This commit is contained in:
Binary file not shown.
Binary file not shown.
@@ -1,139 +1,139 @@
|
||||
import traceback
|
||||
|
||||
from aiogram import Router, types, F
|
||||
from aiogram.filters import Command, StateFilter
|
||||
from aiogram.fsm.context import FSMContext
|
||||
|
||||
from helper_bot.filters.main import ChatTypeFilter
|
||||
from helper_bot.keyboards.main import get_reply_keyboard_admin, create_keyboard_with_pagination, \
|
||||
create_keyboard_for_ban_days, create_keyboard_for_approve_ban
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||||
from helper_bot.utils.helper_func import check_access, add_days_to_date, get_banned_users_buttons, get_banned_users_list
|
||||
from logs.custom_logger import logger
|
||||
|
||||
|
||||
admin_router = Router()
|
||||
|
||||
|
||||
bdf = BaseDependencyFactory()
|
||||
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
|
||||
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
|
||||
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
|
||||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||||
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
|
||||
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
|
||||
LOGS = bdf.settings['Settings']['logs']
|
||||
TEST = bdf.settings['Settings']['test']
|
||||
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
Command('admin')
|
||||
)
|
||||
async def admin_panel(message: types.Message, state: FSMContext):
|
||||
try:
|
||||
if check_access(message.from_user.id):
|
||||
await state.set_state("ADMIN")
|
||||
logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}")
|
||||
markup = get_reply_keyboard_admin()
|
||||
await message.answer("Добро пожаловать в админку. Выбери что хочешь:",
|
||||
reply_markup=markup)
|
||||
else:
|
||||
await message.answer('Доступ запрещен, досвидания!')
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при запуске админ панели: {e}")
|
||||
await message.bot.send_message(IMPORTANT_LOGS,
|
||||
f'Ошибка в функции admin_panel {e}. Traceback: {traceback.format_exc()}')
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("ADMIN"),
|
||||
F.text == 'Бан (Список)'
|
||||
)
|
||||
async def get_last_users(message: types.Message):
|
||||
logger.info(
|
||||
f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
|
||||
list_users = BotDB.get_last_users_from_db()
|
||||
keyboard = create_keyboard_with_pagination(1, len(list_users), list_users, 'ban')
|
||||
await message.answer(text="Список пользователей которые последними обращались к боту",
|
||||
reply_markup=keyboard)
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("ADMIN"),
|
||||
F.text == 'Разбан (список)'
|
||||
)
|
||||
async def get_banned_users(message):
|
||||
logger.info(
|
||||
f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
|
||||
message_text = get_banned_users_list(0)
|
||||
buttons_list = get_banned_users_buttons()
|
||||
if buttons_list:
|
||||
k = create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unlock')
|
||||
await message.answer(text=message_text, reply_markup=k)
|
||||
else:
|
||||
await message.answer(text="В списке забанненых пользователей никого нет")
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("BAN_2")
|
||||
)
|
||||
async def ban_user_step_2(message: types.Message, state: FSMContext):
|
||||
user_data = await state.get_data()
|
||||
logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {user_data})")
|
||||
await state.update_data(message_for_user=message.text)
|
||||
markup = create_keyboard_for_ban_days()
|
||||
await message.answer(f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши "
|
||||
f"его в чат", reply_markup=markup)
|
||||
await state.set_state("BAN_3")
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("BAN_3")
|
||||
)
|
||||
async def ban_user_step_3(message: types.Message, state: FSMContext):
|
||||
logger.info(f"ban_user_step_3. Расчет даты разбана. Входные данные {message.text}")
|
||||
if message.text != 'Навсегда':
|
||||
count_days = int(message.text)
|
||||
date_to_unban = add_days_to_date(count_days)
|
||||
else:
|
||||
date_to_unban = None
|
||||
logger.info(f"ban_user_step_3. Расчет даты разбана. date_to_unban: {date_to_unban}")
|
||||
await state.update_data(date_to_unban=date_to_unban)
|
||||
user_data = await state.get_data()
|
||||
markup = create_keyboard_for_approve_ban()
|
||||
await message.answer(
|
||||
f"Необходимо подтверждение:\nПользователь:{user_data['user_id']}\nПричина бана:{user_data['message_for_user']}\nСрок бана:{user_data['date_to_unban']}",
|
||||
reply_markup=markup)
|
||||
await state.set_state("BAN_FINAL")
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("BAN_FINAL")
|
||||
)
|
||||
async def approve_ban(message: types.Message, state: FSMContext):
|
||||
user_data = await state.get_data()
|
||||
logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {user_data})")
|
||||
if message.text == 'Подтвердить':
|
||||
exists = BotDB.check_user_in_blacklist(user_data['user_id'])
|
||||
if exists:
|
||||
await message.reply(f"Пользователь уже был заблокирован ранее.")
|
||||
logger.info(f"Пользователь: {user_data['user_id']} был заблокирован ранее)")
|
||||
await state.set_state('ADMIN')
|
||||
else:
|
||||
BotDB.set_user_blacklist(user_data['user_id'],
|
||||
user_data['user_name'],
|
||||
user_data['message_for_user'],
|
||||
user_data['date_to_unban'])
|
||||
await message.reply(f"Пользователь {user_data['user_name']} успешно заблокирован.")
|
||||
logger.info(f"Пользователь: {user_data['user_id']} успешно заблокирован)")
|
||||
await state.set_state('ADMIN')
|
||||
markup = get_reply_keyboard_admin()
|
||||
await message.answer('Вернулись в меню', reply_markup=markup)
|
||||
import traceback
|
||||
|
||||
from aiogram import Router, types, F
|
||||
from aiogram.filters import Command, StateFilter
|
||||
from aiogram.fsm.context import FSMContext
|
||||
|
||||
from helper_bot.filters.main import ChatTypeFilter
|
||||
from helper_bot.keyboards.main import get_reply_keyboard_admin, create_keyboard_with_pagination, \
|
||||
create_keyboard_for_ban_days, create_keyboard_for_approve_ban
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||||
from helper_bot.utils.helper_func import check_access, add_days_to_date, get_banned_users_buttons, get_banned_users_list
|
||||
from logs.custom_logger import logger
|
||||
|
||||
|
||||
admin_router = Router()
|
||||
|
||||
|
||||
bdf = BaseDependencyFactory()
|
||||
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
|
||||
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
|
||||
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
|
||||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||||
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
|
||||
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
|
||||
LOGS = bdf.settings['Settings']['logs']
|
||||
TEST = bdf.settings['Settings']['test']
|
||||
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
Command('admin')
|
||||
)
|
||||
async def admin_panel(message: types.Message, state: FSMContext):
|
||||
try:
|
||||
if check_access(message.from_user.id):
|
||||
await state.set_state("ADMIN")
|
||||
logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}")
|
||||
markup = get_reply_keyboard_admin()
|
||||
await message.answer("Добро пожаловать в админку. Выбери что хочешь:",
|
||||
reply_markup=markup)
|
||||
else:
|
||||
await message.answer('Доступ запрещен, досвидания!')
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при запуске админ панели: {e}")
|
||||
await message.bot.send_message(IMPORTANT_LOGS,
|
||||
f'Ошибка в функции admin_panel {e}. Traceback: {traceback.format_exc()}')
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("ADMIN"),
|
||||
F.text == 'Бан (Список)'
|
||||
)
|
||||
async def get_last_users(message: types.Message):
|
||||
logger.info(
|
||||
f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
|
||||
list_users = BotDB.get_last_users_from_db()
|
||||
keyboard = create_keyboard_with_pagination(1, len(list_users), list_users, 'ban')
|
||||
await message.answer(text="Список пользователей которые последними обращались к боту",
|
||||
reply_markup=keyboard)
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("ADMIN"),
|
||||
F.text == 'Разбан (список)'
|
||||
)
|
||||
async def get_banned_users(message):
|
||||
logger.info(
|
||||
f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
|
||||
message_text = get_banned_users_list(0)
|
||||
buttons_list = get_banned_users_buttons()
|
||||
if buttons_list:
|
||||
k = create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unlock')
|
||||
await message.answer(text=message_text, reply_markup=k)
|
||||
else:
|
||||
await message.answer(text="В списке забанненых пользователей никого нет")
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("BAN_2")
|
||||
)
|
||||
async def ban_user_step_2(message: types.Message, state: FSMContext):
|
||||
user_data = await state.get_data()
|
||||
logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {user_data})")
|
||||
await state.update_data(message_for_user=message.text)
|
||||
markup = create_keyboard_for_ban_days()
|
||||
await message.answer(f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши "
|
||||
f"его в чат", reply_markup=markup)
|
||||
await state.set_state("BAN_3")
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("BAN_3")
|
||||
)
|
||||
async def ban_user_step_3(message: types.Message, state: FSMContext):
|
||||
logger.info(f"ban_user_step_3. Расчет даты разбана. Входные данные {message.text}")
|
||||
if message.text != 'Навсегда':
|
||||
count_days = int(message.text)
|
||||
date_to_unban = add_days_to_date(count_days)
|
||||
else:
|
||||
date_to_unban = None
|
||||
logger.info(f"ban_user_step_3. Расчет даты разбана. date_to_unban: {date_to_unban}")
|
||||
await state.update_data(date_to_unban=date_to_unban)
|
||||
user_data = await state.get_data()
|
||||
markup = create_keyboard_for_approve_ban()
|
||||
await message.answer(
|
||||
f"Необходимо подтверждение:\nПользователь:{user_data['user_id']}\nПричина бана:{user_data['message_for_user']}\nСрок бана:{user_data['date_to_unban']}",
|
||||
reply_markup=markup)
|
||||
await state.set_state("BAN_FINAL")
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("BAN_FINAL")
|
||||
)
|
||||
async def approve_ban(message: types.Message, state: FSMContext):
|
||||
user_data = await state.get_data()
|
||||
logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {user_data})")
|
||||
if message.text == 'Подтвердить':
|
||||
exists = BotDB.check_user_in_blacklist(user_data['user_id'])
|
||||
if exists:
|
||||
await message.reply(f"Пользователь уже был заблокирован ранее.")
|
||||
logger.info(f"Пользователь: {user_data['user_id']} был заблокирован ранее)")
|
||||
await state.set_state('ADMIN')
|
||||
else:
|
||||
BotDB.set_user_blacklist(user_data['user_id'],
|
||||
user_data['user_name'],
|
||||
user_data['message_for_user'],
|
||||
user_data['date_to_unban'])
|
||||
await message.reply(f"Пользователь {user_data['user_name']} успешно заблокирован.")
|
||||
logger.info(f"Пользователь: {user_data['user_id']} успешно заблокирован)")
|
||||
await state.set_state('ADMIN')
|
||||
markup = get_reply_keyboard_admin()
|
||||
await message.answer('Вернулись в меню', reply_markup=markup)
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -1,158 +1,162 @@
|
||||
import traceback
|
||||
|
||||
from aiogram import Router, F
|
||||
from aiogram.fsm.context import FSMContext
|
||||
from aiogram.types import CallbackQuery
|
||||
|
||||
from helper_bot.keyboards.main import create_keyboard_with_pagination, get_reply_keyboard_admin, \
|
||||
create_keyboard_for_ban_reason
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||||
from helper_bot.utils.helper_func import send_text_message, send_photo_message, get_banned_users_list, \
|
||||
get_banned_users_buttons, delete_user_blacklist, get_help_message_id, send_media_group_message
|
||||
from logs.custom_logger import logger
|
||||
|
||||
callback_router = Router()
|
||||
|
||||
bdf = BaseDependencyFactory()
|
||||
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
|
||||
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
|
||||
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
|
||||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||||
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
|
||||
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
|
||||
LOGS = bdf.settings['Settings']['logs']
|
||||
TEST = bdf.settings['Settings']['test']
|
||||
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == "publish"
|
||||
)
|
||||
async def post_for_group(call: CallbackQuery, state: FSMContext):
|
||||
logger.info(
|
||||
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
|
||||
if call.data == 'publish' and call.message.content_type == 'text' and call.message.text != "^":
|
||||
try:
|
||||
await send_text_message(MAIN_PUBLIC, call.message, call.message.text)
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Текст сообщения опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', show_alert=True, cache_time=3)
|
||||
except Exception as e:
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации текста в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.data == 'publish' and call.message.content_type == 'photo':
|
||||
try:
|
||||
await send_photo_message(MAIN_PUBLIC, call.message, call.message.photo[-1].file_id, call.message.caption)
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Пост с фото опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', show_alert=True, cache_time=3)
|
||||
except Exception as e:
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации фотографии в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.data == 'publish' and call.message.text == "^":
|
||||
user_data = await state.get_data()
|
||||
media_group_message_id = get_help_message_id(call.message.message_id, user_data)
|
||||
await call.bot.copy_message(chat_id=MAIN_PUBLIC, from_chat_id=GROUP_FOR_POST,message_id=media_group_message_id, reply_markup=None)
|
||||
await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id)
|
||||
print(user_data['media_group_message_id'])
|
||||
print(user_data['help_message_id'])
|
||||
await call.answer(text='Выложено!', show_alert=True, cache_time=3)
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == "decline"
|
||||
)
|
||||
async def decline_post_for_group(call: CallbackQuery, state: FSMContext):
|
||||
logger.info(
|
||||
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
|
||||
try:
|
||||
if call.message.content_type == 'text' and call.message.text != "^" or call.message.content_type == 'photo':
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(
|
||||
f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).')
|
||||
await call.answer(text='Отклонено!', show_alert=True, cache_time=3)
|
||||
if call.message.text == '^':
|
||||
user_data = await state.get_data()
|
||||
media_group_message_id = get_help_message_id(call.message.message_id, user_data)
|
||||
await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id)
|
||||
except Exception as e:
|
||||
await call.bot.send_message(IMPORTANT_LOGS,
|
||||
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при удалении сообщения в группе {GROUP_FOR_POST}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data.contains('ban')
|
||||
)
|
||||
async def process_ban_user(call: CallbackQuery, state: FSMContext):
|
||||
user_id = call.data[4:]
|
||||
logger.info(
|
||||
f"Вызов функции process_ban_user. Данные callback: {call.data} пользователь: {user_id}")
|
||||
user_name = BotDB.get_username(user_id=user_id)
|
||||
if user_name:
|
||||
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
|
||||
date_to_unban=None)
|
||||
markup = create_keyboard_for_ban_reason()
|
||||
await call.message.answer(
|
||||
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name}\nИмя:{call.message.from_user.full_name}\nВыбери причину бана из списка или напиши ее в чат",
|
||||
reply_markup=markup)
|
||||
await state.set_state('BAN_2')
|
||||
else:
|
||||
markup = get_reply_keyboard_admin()
|
||||
await call.message.answer(text='Пользователь с таким ID не найден в базе', markup=markup)
|
||||
await state.set_state('ADMIN')
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data.contains('unlock')
|
||||
)
|
||||
async def process_unlock_user(call: CallbackQuery):
|
||||
user_id = call.data[7:]
|
||||
user_name = BotDB.get_username(user_id=user_id)
|
||||
delete_user_blacklist(user_id)
|
||||
logger.info(f"Разблокирован пользователь с ID: {user_id} username:{user_name}")
|
||||
username = BotDB.get_username(user_id)
|
||||
await call.answer(f'Пользователь разблокирован {username}', show_alert=True)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == 'return'
|
||||
)
|
||||
async def return_to_main_menu(call: CallbackQuery):
|
||||
await call.message.delete()
|
||||
logger.info(f"Запуск админ панели для пользователя: {call.message.from_user.id}")
|
||||
markup = get_reply_keyboard_admin()
|
||||
await call.message.answer("Добро пожаловать в админку. Выбери что хочешь:",
|
||||
reply_markup=markup)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data.contains('page')
|
||||
)
|
||||
async def change_page(call: CallbackQuery):
|
||||
page_number = int(call.data[5:])
|
||||
logger.info(f"Переход на страницу {page_number}")
|
||||
if call.message.text == 'Список пользователей которые последними обращались к боту':
|
||||
list_users = BotDB.get_last_users_from_db()
|
||||
#TODO: Здесь где-то надо добавить обработку ошибки IndexError: list index out of range
|
||||
keyboard = create_keyboard_with_pagination(int(page_number), len(list_users), list_users,
|
||||
'ban')
|
||||
|
||||
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
|
||||
reply_markup=keyboard)
|
||||
else:
|
||||
#Готовим сообщения
|
||||
message_user = get_banned_users_list(int(page_number) * 7 - 7)
|
||||
await call.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
|
||||
text=message_user)
|
||||
|
||||
#Готовим клавиатуру
|
||||
buttons = get_banned_users_buttons()
|
||||
keyboard = create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unlock')
|
||||
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
|
||||
reply_markup=keyboard)
|
||||
import traceback
|
||||
|
||||
from aiogram import Router, F
|
||||
from aiogram.fsm.context import FSMContext
|
||||
from aiogram.types import CallbackQuery
|
||||
|
||||
from helper_bot.keyboards.main import create_keyboard_with_pagination, get_reply_keyboard_admin, \
|
||||
create_keyboard_for_ban_reason
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||||
from helper_bot.utils.helper_func import send_text_message, send_photo_message, get_banned_users_list, \
|
||||
get_banned_users_buttons, delete_user_blacklist, get_help_message_id, send_media_group_message
|
||||
from logs.custom_logger import logger
|
||||
|
||||
callback_router = Router()
|
||||
|
||||
bdf = BaseDependencyFactory()
|
||||
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
|
||||
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
|
||||
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
|
||||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||||
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
|
||||
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
|
||||
LOGS = bdf.settings['Settings']['logs']
|
||||
TEST = bdf.settings['Settings']['test']
|
||||
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == "publish"
|
||||
)
|
||||
async def post_for_group(call: CallbackQuery, state: FSMContext):
|
||||
logger.info(
|
||||
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
|
||||
if call.data == 'publish' and call.message.content_type == 'text' and call.message.text != "^":
|
||||
try:
|
||||
await send_text_message(MAIN_PUBLIC, call.message, call.message.text)
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Текст сообщения опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', show_alert=True, cache_time=3)
|
||||
except Exception as e:
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации текста в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.data == 'publish' and call.message.content_type == 'photo':
|
||||
try:
|
||||
await send_photo_message(MAIN_PUBLIC, call.message, call.message.photo[-1].file_id, call.message.caption)
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Пост с фото опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', show_alert=True, cache_time=3)
|
||||
except Exception as e:
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации фотографии в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.data == 'publish' and call.message.text == "^":
|
||||
print('Попали куда надо')
|
||||
user_data = await state.get_data()
|
||||
print(f'В CALLBACK ГЕТ ДАТА: {user_data}')
|
||||
media_group_message_id = get_help_message_id(call.message.message_id, user_data)
|
||||
print(f'ПОЛУЧАЮ МЕССАГ ГРУПП АЙДИ ИЗ ГЕТ ДАТЫ: {media_group_message_id}')
|
||||
print(f'ИДЕНТИФИКАТОР СООБЩЕНИЯ У КОТОРОГО ЖМУ КОЛЛБЭК: {call.message.message_id}')
|
||||
await call.bot.copy_message(chat_id=MAIN_PUBLIC, from_chat_id=GROUP_FOR_POST, message_id=media_group_message_id, reply_markup=None)
|
||||
await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id)
|
||||
await call.answer(text='Выложено!', show_alert=True, cache_time=3)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == "decline"
|
||||
)
|
||||
async def decline_post_for_group(call: CallbackQuery, state: FSMContext):
|
||||
|
||||
logger.info(
|
||||
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
|
||||
try:
|
||||
if call.message.content_type == 'text' and call.message.text != "^" or call.message.content_type == 'photo':
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(
|
||||
f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).')
|
||||
await call.answer(text='Отклонено!', show_alert=True, cache_time=3)
|
||||
if call.message.text == '^':
|
||||
user_data = await state.get_data()
|
||||
media_group_message_id = get_help_message_id(call.message.message_id, user_data)
|
||||
await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id)
|
||||
except Exception as e:
|
||||
await call.bot.send_message(IMPORTANT_LOGS,
|
||||
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при удалении сообщения в группе {GROUP_FOR_POST}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data.contains('ban')
|
||||
)
|
||||
async def process_ban_user(call: CallbackQuery, state: FSMContext):
|
||||
user_id = call.data[4:]
|
||||
logger.info(
|
||||
f"Вызов функции process_ban_user. Данные callback: {call.data} пользователь: {user_id}")
|
||||
user_name = BotDB.get_username(user_id=user_id)
|
||||
if user_name:
|
||||
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
|
||||
date_to_unban=None)
|
||||
markup = create_keyboard_for_ban_reason()
|
||||
await call.message.answer(
|
||||
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name}\nИмя:{call.message.from_user.full_name}\nВыбери причину бана из списка или напиши ее в чат",
|
||||
reply_markup=markup)
|
||||
await state.set_state('BAN_2')
|
||||
else:
|
||||
markup = get_reply_keyboard_admin()
|
||||
await call.message.answer(text='Пользователь с таким ID не найден в базе', markup=markup)
|
||||
await state.set_state('ADMIN')
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data.contains('unlock')
|
||||
)
|
||||
async def process_unlock_user(call: CallbackQuery):
|
||||
user_id = call.data[7:]
|
||||
user_name = BotDB.get_username(user_id=user_id)
|
||||
delete_user_blacklist(user_id)
|
||||
logger.info(f"Разблокирован пользователь с ID: {user_id} username:{user_name}")
|
||||
username = BotDB.get_username(user_id)
|
||||
await call.answer(f'Пользователь разблокирован {username}', show_alert=True)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == 'return'
|
||||
)
|
||||
async def return_to_main_menu(call: CallbackQuery):
|
||||
await call.message.delete()
|
||||
logger.info(f"Запуск админ панели для пользователя: {call.message.from_user.id}")
|
||||
markup = get_reply_keyboard_admin()
|
||||
await call.message.answer("Добро пожаловать в админку. Выбери что хочешь:",
|
||||
reply_markup=markup)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data.contains('page')
|
||||
)
|
||||
async def change_page(call: CallbackQuery):
|
||||
page_number = int(call.data[5:])
|
||||
logger.info(f"Переход на страницу {page_number}")
|
||||
if call.message.text == 'Список пользователей которые последними обращались к боту':
|
||||
list_users = BotDB.get_last_users_from_db()
|
||||
#TODO: Здесь где-то надо добавить обработку ошибки IndexError: list index out of range
|
||||
keyboard = create_keyboard_with_pagination(int(page_number), len(list_users), list_users,
|
||||
'ban')
|
||||
|
||||
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
|
||||
reply_markup=keyboard)
|
||||
else:
|
||||
#Готовим сообщения
|
||||
message_user = get_banned_users_list(int(page_number) * 7 - 7)
|
||||
await call.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
|
||||
text=message_user)
|
||||
|
||||
#Готовим клавиатуру
|
||||
buttons = get_banned_users_buttons()
|
||||
keyboard = create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unlock')
|
||||
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
|
||||
reply_markup=keyboard)
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -1,50 +1,50 @@
|
||||
from aiogram import Router, types
|
||||
from aiogram.fsm.context import FSMContext
|
||||
|
||||
from helper_bot.filters.main import ChatTypeFilter
|
||||
from helper_bot.keyboards.main import get_reply_keyboard_leave_chat
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||||
from helper_bot.utils.helper_func import send_text_message
|
||||
from logs.custom_logger import logger
|
||||
|
||||
group_router = Router()
|
||||
|
||||
|
||||
bdf = BaseDependencyFactory()
|
||||
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
|
||||
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
|
||||
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
|
||||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||||
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
|
||||
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
|
||||
LOGS = bdf.settings['Settings']['logs']
|
||||
TEST = bdf.settings['Settings']['test']
|
||||
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
@group_router.message(
|
||||
ChatTypeFilter(chat_type=["group", "supergroup"]),
|
||||
)
|
||||
async def handle_message(message: types.Message, state: FSMContext):
|
||||
"""Функция ответа админа пользователю через закрытый чат"""
|
||||
logger.info(
|
||||
f'Получено сообщение в группе {message.chat.title} (ID: {message.chat.id}) от пользователя {message.from_user.full_name} (ID: {message.from_user.id}): "{message.text}"')
|
||||
markup = get_reply_keyboard_leave_chat()
|
||||
message_id = 0
|
||||
try:
|
||||
message_id = message.reply_to_message.message_id
|
||||
except AttributeError as e:
|
||||
await message.answer('Блять, выдели сообщение!')
|
||||
logger.warning(
|
||||
f'В группе {message.chat.title} (ID: {message.chat.id}) админ не выделил сообщение для ответа. Ошибка {str(e)}')
|
||||
message_from_admin = message.text
|
||||
try:
|
||||
chat_id = BotDB.get_user_by_message_id(message_id)
|
||||
await send_text_message(chat_id, message, message_from_admin, markup)
|
||||
await state.set_state("CHAT")
|
||||
logger.info(f'Ответ админа "{message.text}" отправлен пользователю с ID: {chat_id} на сообщение {message_id}')
|
||||
except TypeError as e:
|
||||
await message.answer('Не могу найти кому ответить в базе, проебали сообщение.')
|
||||
logger.error(
|
||||
f'Ошибка при поиске пользователя в базе для ответа на сообщение: {message.text} в группе {message.chat.title} (ID сообщения: {message.message_id}) Ошибка: {str(e)}')
|
||||
from aiogram import Router, types
|
||||
from aiogram.fsm.context import FSMContext
|
||||
|
||||
from helper_bot.filters.main import ChatTypeFilter
|
||||
from helper_bot.keyboards.main import get_reply_keyboard_leave_chat
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||||
from helper_bot.utils.helper_func import send_text_message
|
||||
from logs.custom_logger import logger
|
||||
|
||||
group_router = Router()
|
||||
|
||||
|
||||
bdf = BaseDependencyFactory()
|
||||
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
|
||||
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
|
||||
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
|
||||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||||
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
|
||||
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
|
||||
LOGS = bdf.settings['Settings']['logs']
|
||||
TEST = bdf.settings['Settings']['test']
|
||||
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
@group_router.message(
|
||||
ChatTypeFilter(chat_type=["group", "supergroup"]),
|
||||
)
|
||||
async def handle_message(message: types.Message, state: FSMContext):
|
||||
"""Функция ответа админа пользователю через закрытый чат"""
|
||||
logger.info(
|
||||
f'Получено сообщение в группе {message.chat.title} (ID: {message.chat.id}) от пользователя {message.from_user.full_name} (ID: {message.from_user.id}): "{message.text}"')
|
||||
markup = get_reply_keyboard_leave_chat()
|
||||
message_id = 0
|
||||
try:
|
||||
message_id = message.reply_to_message.message_id
|
||||
except AttributeError as e:
|
||||
await message.answer('Блять, выдели сообщение!')
|
||||
logger.warning(
|
||||
f'В группе {message.chat.title} (ID: {message.chat.id}) админ не выделил сообщение для ответа. Ошибка {str(e)}')
|
||||
message_from_admin = message.text
|
||||
try:
|
||||
chat_id = BotDB.get_user_by_message_id(message_id)
|
||||
await send_text_message(chat_id, message, message_from_admin, markup)
|
||||
await state.set_state("CHAT")
|
||||
logger.info(f'Ответ админа "{message.text}" отправлен пользователю с ID: {chat_id} на сообщение {message_id}')
|
||||
except TypeError as e:
|
||||
await message.answer('Не могу найти кому ответить в базе, проебали сообщение.')
|
||||
logger.error(
|
||||
f'Ошибка при поиске пользователя в базе для ответа на сообщение: {message.text} в группе {message.chat.title} (ID сообщения: {message.message_id}) Ошибка: {str(e)}')
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -1,282 +1,284 @@
|
||||
import random
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from time import sleep
|
||||
|
||||
from aiogram import types, Router, F
|
||||
from aiogram.filters import Command, StateFilter
|
||||
from aiogram.fsm.context import FSMContext
|
||||
from aiogram.types import FSInputFile
|
||||
|
||||
from helper_bot.filters.main import ChatTypeFilter
|
||||
from helper_bot.keyboards import get_reply_keyboard, get_reply_keyboard_for_post
|
||||
from helper_bot.keyboards.main import get_reply_keyboard_leave_chat
|
||||
from helper_bot.middlewares.album_middleware import AlbumMiddleware
|
||||
from helper_bot.middlewares.blacklist_middleware import BlacklistMiddleware
|
||||
from helper_bot.utils import messages
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||||
from helper_bot.utils.helper_func import get_first_name, get_text_message, send_text_message, send_photo_message, \
|
||||
process_photo_album, send_media_group_message, check_username_and_full_name
|
||||
from logs.custom_logger import logger
|
||||
|
||||
private_router = Router()
|
||||
|
||||
private_router.message.middleware(AlbumMiddleware())
|
||||
private_router.message.middleware(BlacklistMiddleware())
|
||||
|
||||
bdf = BaseDependencyFactory()
|
||||
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
|
||||
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
|
||||
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
|
||||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||||
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
|
||||
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
|
||||
LOGS = bdf.settings['Settings']['logs']
|
||||
TEST = bdf.settings['Settings']['test']
|
||||
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
@private_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
Command("start")
|
||||
)
|
||||
@private_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
F.text == 'Вернуться в бота'
|
||||
)
|
||||
async def handle_start_message(message: types.Message, state: FSMContext):
|
||||
try:
|
||||
await message.forward(chat_id=GROUP_FOR_LOGS)
|
||||
full_name = message.from_user.full_name
|
||||
username = message.from_user.username
|
||||
first_name = message.from_user.first_name
|
||||
is_bot = message.from_user.is_bot
|
||||
language_code = message.from_user.language_code
|
||||
user_id = message.from_user.id
|
||||
current_date = datetime.now()
|
||||
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
|
||||
if not BotDB.user_exists(user_id):
|
||||
BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date,
|
||||
date)
|
||||
else:
|
||||
is_need_update = check_username_and_full_name(user_id, username, full_name)
|
||||
if is_need_update:
|
||||
BotDB.update_username_and_full_name(user_id, username, full_name)
|
||||
await message.answer(f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name} и ник @{username}")
|
||||
await message.bot.send_message(chat_id=GROUP_FOR_LOGS, text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}')
|
||||
sleep(1)
|
||||
BotDB.update_date_for_user(date, user_id)
|
||||
await state.set_state("START")
|
||||
logger.info(
|
||||
f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} "
|
||||
f"Имя автора сообщения: {message.from_user.full_name})")
|
||||
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
|
||||
random_stick_hello = random.choice(name_stick_hello)
|
||||
random_stick_hello = FSInputFile(path=random_stick_hello)
|
||||
logger.info(f"Стикер успешно получен из БД")
|
||||
await message.answer_sticker(random_stick_hello)
|
||||
sleep(0.3)
|
||||
except Exception as e:
|
||||
logger.error(f"Произошла ошибка handle_start_message при получении стикеров. Ошибка:{str(e)}")
|
||||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка при получении стикеров: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
try:
|
||||
|
||||
markup = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
hello_message = messages.get_message(get_first_name(message), 'HELLO_MESSAGE')
|
||||
await message.answer(hello_message, reply_markup=markup)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}")
|
||||
await message.bot.send_message(IMPORTANT_LOGS,
|
||||
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
|
||||
|
||||
@private_router.message(
|
||||
StateFilter("START"),
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
F.text == '📢Предложить свой пост'
|
||||
)
|
||||
async def suggest_post(message: types.Message, state: FSMContext):
|
||||
try:
|
||||
await message.forward(chat_id=GROUP_FOR_LOGS)
|
||||
await state.set_state("SUGGEST")
|
||||
current_state = await state.get_state()
|
||||
logger.info(
|
||||
f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}")
|
||||
markup = types.ReplyKeyboardRemove()
|
||||
suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS')
|
||||
await message.answer(suggest_news)
|
||||
sleep(0.3)
|
||||
suggest_news_2 = messages.get_message(get_first_name(message), 'SUGGEST_NEWS_2')
|
||||
await message.answer(suggest_news_2, reply_markup=markup)
|
||||
except Exception as e:
|
||||
await message.bot.send_message(IMPORTANT_LOGS,
|
||||
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
|
||||
|
||||
@private_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
F.text == '👋🏼Сказать пока!'
|
||||
)
|
||||
@private_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
F.text == 'Выйти из чата'
|
||||
)
|
||||
async def end_message(message: types.Message, state: FSMContext):
|
||||
try:
|
||||
logger.info(
|
||||
f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
|
||||
random_stick_bye = random.choice(name_stick_bye)
|
||||
random_stick_bye = FSInputFile(path=random_stick_bye)
|
||||
await message.answer_sticker(random_stick_bye)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
try:
|
||||
markup = types.ReplyKeyboardRemove()
|
||||
bye_message = messages.get_message(get_first_name(message), 'BYE_MESSAGE')
|
||||
await message.answer(bye_message, reply_markup=markup)
|
||||
await state.set_state("START")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
|
||||
|
||||
@private_router.message(
|
||||
StateFilter("SUGGEST"),
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
)
|
||||
async def suggest_router(message: types.Message, state: FSMContext, album: list = None):
|
||||
logger.info(
|
||||
f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
try:
|
||||
if message.content_type == 'text':
|
||||
lower_text = message.text.lower()
|
||||
post_text, is_anonymous = get_text_message(lower_text, message.from_user.full_name,
|
||||
message.from_user.username)
|
||||
markup = get_reply_keyboard_for_post()
|
||||
if is_anonymous:
|
||||
await send_text_message(GROUP_FOR_POST, message, post_text, markup)
|
||||
else:
|
||||
await send_text_message(GROUP_FOR_POST, message, post_text, markup)
|
||||
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
|
||||
await message.answer(success_send_message, reply_markup=markup_for_user)
|
||||
await state.set_state("START")
|
||||
elif message.content_type == 'photo' and message.media_group_id is None:
|
||||
lower_caption = message.caption.lower()
|
||||
markup = get_reply_keyboard_for_post()
|
||||
post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name,
|
||||
message.from_user.username)
|
||||
#TODO: тут какая-то шляпа
|
||||
if is_anonymous:
|
||||
await send_photo_message(GROUP_FOR_POST, message,
|
||||
message.photo[-1].file_id, post_caption, markup)
|
||||
else:
|
||||
await send_photo_message(GROUP_FOR_POST, message,
|
||||
message.photo[-1].file_id, post_caption, markup)
|
||||
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
|
||||
await message.answer(success_send_message, reply_markup=markup_for_user)
|
||||
await state.set_state("START")
|
||||
elif message.media_group_id is not None:
|
||||
post_caption = " "
|
||||
if album[0].caption:
|
||||
lower_caption = album[0].caption.lower()
|
||||
post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name,
|
||||
message.from_user.username)
|
||||
media_group = process_photo_album(album, post_caption)
|
||||
media_group_message_id = await send_media_group_message(GROUP_FOR_POST, message,
|
||||
media_group)
|
||||
sleep(0.2)
|
||||
markup = get_reply_keyboard_for_post()
|
||||
help_message_id = await send_text_message(GROUP_FOR_POST, message, "^", markup)
|
||||
await state.update_data(media_group_message_id=media_group_message_id, help_message_id=help_message_id)
|
||||
|
||||
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
|
||||
await message.answer(success_send_message, reply_markup=markup_for_user)
|
||||
await state.set_state("START")
|
||||
else:
|
||||
await message.bot.send_message(message.chat.id,
|
||||
'Я пока не умею работать с таким сообщением. Пришли текст и фото/фоты(ы)')
|
||||
except Exception as e:
|
||||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
|
||||
|
||||
@private_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
F.text == '🤪Хочу стикеры'
|
||||
)
|
||||
async def stickers(message: types.Message, state: FSMContext):
|
||||
logger.info(
|
||||
f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
markup = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
try:
|
||||
BotDB.update_info_about_stickers(user_id=message.from_user.id)
|
||||
await message.forward(chat_id=GROUP_FOR_LOGS)
|
||||
await message.answer(text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
|
||||
reply_markup=markup)
|
||||
await state.set_state("START")
|
||||
except Exception as e:
|
||||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(
|
||||
f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
|
||||
|
||||
@private_router.message(
|
||||
StateFilter("START"),
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
F.text == '📩Связаться с админами'
|
||||
)
|
||||
async def connect_with_admin(message: types.Message, state: FSMContext):
|
||||
logger.info(
|
||||
f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
admin_message = messages.get_message(get_first_name(message), 'CONNECT_WITH_ADMIN')
|
||||
await message.answer(admin_message, parse_mode="html")
|
||||
await message.forward(chat_id=GROUP_FOR_LOGS)
|
||||
await state.set_state("PRE_CHAT")
|
||||
|
||||
|
||||
@private_router.message(
|
||||
StateFilter("PRE_CHAT"),
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
)
|
||||
@private_router.message(
|
||||
StateFilter("CHAT"),
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
)
|
||||
async def resend_message_in_group_for_message(message: types.Message, state: FSMContext):
|
||||
logger.info(
|
||||
f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id})")
|
||||
await message.forward(chat_id=GROUP_FOR_MESSAGE)
|
||||
current_date = datetime.now()
|
||||
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
|
||||
BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date)
|
||||
question = messages.get_message(get_first_name(message), 'QUESTION')
|
||||
user_state = await state.get_state()
|
||||
if user_state == "PRE_CHAT":
|
||||
markup = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
await message.answer(question, reply_markup=markup)
|
||||
await state.set_state("START")
|
||||
elif user_state == "CHAT":
|
||||
markup = get_reply_keyboard_leave_chat()
|
||||
await message.answer(question, reply_markup=markup)
|
||||
|
||||
# @private_router.message(
|
||||
# ChatTypeFilter(chat_type=["private"])
|
||||
# )
|
||||
# async def default(message: types.Message, state: FSMContext):
|
||||
# markup = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
# await message.answer('Кажется ты заблудился. Держи клавиатуру, твое состояние сброшено на начало', reply_markup=markup)
|
||||
# await state.set_state("START")
|
||||
import random
|
||||
import traceback
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from time import sleep
|
||||
|
||||
from aiogram import types, Router, F
|
||||
from aiogram.filters import Command, StateFilter
|
||||
from aiogram.fsm.context import FSMContext
|
||||
from aiogram.types import FSInputFile
|
||||
|
||||
from helper_bot.filters.main import ChatTypeFilter
|
||||
from helper_bot.keyboards import get_reply_keyboard, get_reply_keyboard_for_post
|
||||
from helper_bot.keyboards.main import get_reply_keyboard_leave_chat
|
||||
from helper_bot.middlewares.album_middleware import AlbumMiddleware
|
||||
from helper_bot.middlewares.blacklist_middleware import BlacklistMiddleware
|
||||
from helper_bot.utils import messages
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||||
from helper_bot.utils.helper_func import get_first_name, get_text_message, send_text_message, send_photo_message, \
|
||||
process_photo_album, send_media_group_message, check_username_and_full_name
|
||||
from logs.custom_logger import logger
|
||||
|
||||
private_router = Router()
|
||||
|
||||
private_router.message.middleware(AlbumMiddleware())
|
||||
private_router.message.middleware(BlacklistMiddleware())
|
||||
|
||||
bdf = BaseDependencyFactory()
|
||||
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
|
||||
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
|
||||
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
|
||||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||||
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
|
||||
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
|
||||
LOGS = bdf.settings['Settings']['logs']
|
||||
TEST = bdf.settings['Settings']['test']
|
||||
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
@private_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
Command("start")
|
||||
)
|
||||
@private_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
F.text == 'Вернуться в бота'
|
||||
)
|
||||
async def handle_start_message(message: types.Message, state: FSMContext):
|
||||
try:
|
||||
await message.forward(chat_id=GROUP_FOR_LOGS)
|
||||
full_name = message.from_user.full_name
|
||||
username = message.from_user.username
|
||||
first_name = message.from_user.first_name
|
||||
is_bot = message.from_user.is_bot
|
||||
language_code = message.from_user.language_code
|
||||
user_id = message.from_user.id
|
||||
current_date = datetime.now()
|
||||
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
|
||||
if not BotDB.user_exists(user_id):
|
||||
BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date,
|
||||
date)
|
||||
else:
|
||||
is_need_update = check_username_and_full_name(user_id, username, full_name)
|
||||
if is_need_update:
|
||||
BotDB.update_username_and_full_name(user_id, username, full_name)
|
||||
await message.answer(f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name} и ник @{username}")
|
||||
await message.bot.send_message(chat_id=GROUP_FOR_LOGS, text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}')
|
||||
sleep(1)
|
||||
BotDB.update_date_for_user(date, user_id)
|
||||
await state.set_state("START")
|
||||
logger.info(
|
||||
f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} "
|
||||
f"Имя автора сообщения: {message.from_user.full_name})")
|
||||
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
|
||||
random_stick_hello = random.choice(name_stick_hello)
|
||||
random_stick_hello = FSInputFile(path=random_stick_hello)
|
||||
logger.info(f"Стикер успешно получен из БД")
|
||||
await message.answer_sticker(random_stick_hello)
|
||||
sleep(0.3)
|
||||
except Exception as e:
|
||||
logger.error(f"Произошла ошибка handle_start_message при получении стикеров. Ошибка:{str(e)}")
|
||||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка при получении стикеров: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
try:
|
||||
|
||||
markup = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
hello_message = messages.get_message(get_first_name(message), 'HELLO_MESSAGE')
|
||||
await message.answer(hello_message, reply_markup=markup)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}")
|
||||
await message.bot.send_message(IMPORTANT_LOGS,
|
||||
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
|
||||
|
||||
@private_router.message(
|
||||
StateFilter("START"),
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
F.text == '📢Предложить свой пост'
|
||||
)
|
||||
async def suggest_post(message: types.Message, state: FSMContext):
|
||||
try:
|
||||
await message.forward(chat_id=GROUP_FOR_LOGS)
|
||||
await state.set_state("SUGGEST")
|
||||
current_state = await state.get_state()
|
||||
logger.info(
|
||||
f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}")
|
||||
markup = types.ReplyKeyboardRemove()
|
||||
suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS')
|
||||
await message.answer(suggest_news)
|
||||
sleep(0.3)
|
||||
suggest_news_2 = messages.get_message(get_first_name(message), 'SUGGEST_NEWS_2')
|
||||
await message.answer(suggest_news_2, reply_markup=markup)
|
||||
except Exception as e:
|
||||
await message.bot.send_message(IMPORTANT_LOGS,
|
||||
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
|
||||
|
||||
@private_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
F.text == '👋🏼Сказать пока!'
|
||||
)
|
||||
@private_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
F.text == 'Выйти из чата'
|
||||
)
|
||||
async def end_message(message: types.Message, state: FSMContext):
|
||||
try:
|
||||
logger.info(
|
||||
f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
|
||||
random_stick_bye = random.choice(name_stick_bye)
|
||||
random_stick_bye = FSInputFile(path=random_stick_bye)
|
||||
await message.answer_sticker(random_stick_bye)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
try:
|
||||
markup = types.ReplyKeyboardRemove()
|
||||
bye_message = messages.get_message(get_first_name(message), 'BYE_MESSAGE')
|
||||
await message.answer(bye_message, reply_markup=markup)
|
||||
await state.set_state("START")
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
|
||||
|
||||
@private_router.message(
|
||||
StateFilter("SUGGEST"),
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
)
|
||||
async def suggest_router(message: types.Message, state: FSMContext, album: list = None):
|
||||
logger.info(
|
||||
f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
try:
|
||||
if message.content_type == 'text':
|
||||
lower_text = message.text.lower()
|
||||
post_text, is_anonymous = get_text_message(lower_text, message.from_user.full_name,
|
||||
message.from_user.username)
|
||||
markup = get_reply_keyboard_for_post()
|
||||
if is_anonymous:
|
||||
await send_text_message(GROUP_FOR_POST, message, post_text, markup)
|
||||
else:
|
||||
await send_text_message(GROUP_FOR_POST, message, post_text, markup)
|
||||
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
|
||||
await message.answer(success_send_message, reply_markup=markup_for_user)
|
||||
await state.set_state("START")
|
||||
elif message.content_type == 'photo' and message.media_group_id is None:
|
||||
lower_caption = message.caption.lower()
|
||||
markup = get_reply_keyboard_for_post()
|
||||
post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name,
|
||||
message.from_user.username)
|
||||
#TODO: тут какая-то шляпа
|
||||
if is_anonymous:
|
||||
await send_photo_message(GROUP_FOR_POST, message,
|
||||
message.photo[-1].file_id, post_caption, markup)
|
||||
else:
|
||||
await send_photo_message(GROUP_FOR_POST, message,
|
||||
message.photo[-1].file_id, post_caption, markup)
|
||||
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
|
||||
await message.answer(success_send_message, reply_markup=markup_for_user)
|
||||
await state.set_state("START")
|
||||
elif message.media_group_id is not None:
|
||||
post_caption = " "
|
||||
if album[0].caption:
|
||||
lower_caption = album[0].caption.lower()
|
||||
post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name,
|
||||
message.from_user.username)
|
||||
media_group = process_photo_album(album, post_caption)
|
||||
media_group_message_id = await send_media_group_message(GROUP_FOR_POST, message,
|
||||
media_group)
|
||||
print(f'Отправил в чат предложки сообщения, media_group_message_id:{media_group_message_id}\n\n')
|
||||
sleep(0.2)
|
||||
markup = get_reply_keyboard_for_post()
|
||||
help_message_id = await send_text_message(GROUP_FOR_POST, message, "^", markup)
|
||||
await state.update_data(media_group_message_id=media_group_message_id, help_message_id=help_message_id)
|
||||
d = await state.get_data()
|
||||
print(f'ЗАПИСАЛ В state.update_data {d}')
|
||||
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
|
||||
await message.answer(success_send_message, reply_markup=markup_for_user)
|
||||
await state.set_state("START")
|
||||
else:
|
||||
await message.bot.send_message(message.chat.id,
|
||||
'Я пока не умею работать с таким сообщением. Пришли текст и фото/фоты(ы)')
|
||||
except Exception as e:
|
||||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
|
||||
|
||||
@private_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
F.text == '🤪Хочу стикеры'
|
||||
)
|
||||
async def stickers(message: types.Message, state: FSMContext):
|
||||
logger.info(
|
||||
f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
markup = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
try:
|
||||
BotDB.update_info_about_stickers(user_id=message.from_user.id)
|
||||
await message.forward(chat_id=GROUP_FOR_LOGS)
|
||||
await message.answer(text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
|
||||
reply_markup=markup)
|
||||
await state.set_state("START")
|
||||
except Exception as e:
|
||||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(
|
||||
f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
|
||||
|
||||
@private_router.message(
|
||||
StateFilter("START"),
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
F.text == '📩Связаться с админами'
|
||||
)
|
||||
async def connect_with_admin(message: types.Message, state: FSMContext):
|
||||
logger.info(
|
||||
f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||||
admin_message = messages.get_message(get_first_name(message), 'CONNECT_WITH_ADMIN')
|
||||
await message.answer(admin_message, parse_mode="html")
|
||||
await message.forward(chat_id=GROUP_FOR_LOGS)
|
||||
await state.set_state("PRE_CHAT")
|
||||
|
||||
|
||||
@private_router.message(
|
||||
StateFilter("PRE_CHAT"),
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
)
|
||||
@private_router.message(
|
||||
StateFilter("CHAT"),
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
)
|
||||
async def resend_message_in_group_for_message(message: types.Message, state: FSMContext):
|
||||
logger.info(
|
||||
f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id})")
|
||||
await message.forward(chat_id=GROUP_FOR_MESSAGE)
|
||||
current_date = datetime.now()
|
||||
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
|
||||
BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date)
|
||||
question = messages.get_message(get_first_name(message), 'QUESTION')
|
||||
user_state = await state.get_state()
|
||||
if user_state == "PRE_CHAT":
|
||||
markup = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
await message.answer(question, reply_markup=markup)
|
||||
await state.set_state("START")
|
||||
elif user_state == "CHAT":
|
||||
markup = get_reply_keyboard_leave_chat()
|
||||
await message.answer(question, reply_markup=markup)
|
||||
|
||||
# @private_router.message(
|
||||
# ChatTypeFilter(chat_type=["private"])
|
||||
# )
|
||||
# async def default(message: types.Message, state: FSMContext):
|
||||
# markup = get_reply_keyboard(BotDB, message.from_user.id)
|
||||
# await message.answer('Кажется ты заблудился. Держи клавиатуру, твое состояние сброшено на начало', reply_markup=markup)
|
||||
# await state.set_state("START")
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,21 +1,21 @@
|
||||
from typing import Dict, Any
|
||||
|
||||
from aiogram import BaseMiddleware, types
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||||
from logs.custom_logger import logger
|
||||
|
||||
bdf = BaseDependencyFactory()
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
class BlacklistMiddleware(BaseMiddleware):
|
||||
async def __call__(self, handler, event: types.Message, data: Dict[str, Any]) -> Any:
|
||||
logger.info(f'Вызов BlacklistMiddleware для пользователя {event.from_user.username}')
|
||||
if BotDB.check_user_in_blacklist(user_id=event.from_user.id):
|
||||
logger.info(f'BlacklistMiddleware результат для пользователя: {event.from_user.username} заблокирован!')
|
||||
user_info = BotDB.get_blacklist_users_by_id(event.from_user.id)
|
||||
await event.answer(
|
||||
f"<b>Ты заблокирован.</b>\n<b>Причина блокировки:</b> {user_info[2]}\n<b>Дата разбана:</b> {user_info[3]}")
|
||||
return False
|
||||
logger.info(f'BlacklistMiddleware результат для пользователя: {event.from_user.username} доступ разрешен')
|
||||
return await handler(event, data)
|
||||
from typing import Dict, Any
|
||||
|
||||
from aiogram import BaseMiddleware, types
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||||
from logs.custom_logger import logger
|
||||
|
||||
bdf = BaseDependencyFactory()
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
class BlacklistMiddleware(BaseMiddleware):
|
||||
async def __call__(self, handler, event: types.Message, data: Dict[str, Any]) -> Any:
|
||||
logger.info(f'Вызов BlacklistMiddleware для пользователя {event.from_user.username}')
|
||||
if BotDB.check_user_in_blacklist(user_id=event.from_user.id):
|
||||
logger.info(f'BlacklistMiddleware результат для пользователя: {event.from_user.username} заблокирован!')
|
||||
user_info = BotDB.get_blacklist_users_by_id(event.from_user.id)
|
||||
await event.answer(
|
||||
f"<b>Ты заблокирован.</b>\n<b>Причина блокировки:</b> {user_info[2]}\n<b>Дата разбана:</b> {user_info[3]}")
|
||||
return False
|
||||
logger.info(f'BlacklistMiddleware результат для пользователя: {event.from_user.username} доступ разрешен')
|
||||
return await handler(event, data)
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -52,6 +52,7 @@ def process_photo_album(album, post_caption: str = ''):
|
||||
"""
|
||||
photo_media = []
|
||||
for i, message in enumerate(album):
|
||||
print(f'message_id в функции process_photo_album: {message.message_id}\n\n')
|
||||
if i == 0:
|
||||
photo_media.append(InputMediaPhoto(media=message.photo[-1].file_id, caption=post_caption))
|
||||
else:
|
||||
@@ -64,7 +65,9 @@ async def send_media_group_message(chat_id: int, message: types.Message, media_g
|
||||
chat_id=chat_id,
|
||||
media=media_group,
|
||||
)
|
||||
print(f'ПОЛНЫЙ ОБЪЕКТ ОТПРАВЛЕННЫХ СООБЩЕНИЙ : {sent_message}\n\n')
|
||||
message_id = sent_message[-1].message_id
|
||||
print(f'ИДЕНТИФИКАТОР ПОСЛЕДНГО СООБЩЕНИЯ: {message_id}')
|
||||
return message_id
|
||||
|
||||
|
||||
|
||||
@@ -1,25 +1,25 @@
|
||||
import datetime
|
||||
import os
|
||||
from loguru import logger
|
||||
|
||||
|
||||
logger = logger.bind(name='main_log')
|
||||
|
||||
# Получение сегодняшней даты для имени файла
|
||||
today = datetime.date.today().strftime('%Y-%m-%d')
|
||||
|
||||
# Создание папки для логов
|
||||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
if not os.path.exists(current_dir):
|
||||
# Если не существует, создаем ее
|
||||
os.makedirs(current_dir)
|
||||
filename = f'{current_dir}/helper_bot_{today}.log'
|
||||
|
||||
# Настройка формата логов
|
||||
logger.add(
|
||||
filename,
|
||||
rotation="00:00",
|
||||
retention="5 days",
|
||||
compression="zip",
|
||||
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {name} | {line} | {message}",
|
||||
)
|
||||
import datetime
|
||||
import os
|
||||
from loguru import logger
|
||||
|
||||
|
||||
logger = logger.bind(name='main_log')
|
||||
|
||||
# Получение сегодняшней даты для имени файла
|
||||
today = datetime.date.today().strftime('%Y-%m-%d')
|
||||
|
||||
# Создание папки для логов
|
||||
current_dir = os.path.dirname(os.path.abspath(__file__))
|
||||
if not os.path.exists(current_dir):
|
||||
# Если не существует, создаем ее
|
||||
os.makedirs(current_dir)
|
||||
filename = f'{current_dir}/helper_bot_{today}.log'
|
||||
|
||||
# Настройка формата логов
|
||||
logger.add(
|
||||
filename,
|
||||
rotation="00:00",
|
||||
retention="5 days",
|
||||
compression="zip",
|
||||
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {name} | {line} | {message}",
|
||||
)
|
||||
|
||||
@@ -1,22 +1,22 @@
|
||||
APScheduler==3.10.4
|
||||
certifi~=2024.6.2
|
||||
charset-normalizer==3.3.2
|
||||
coverage==7.5.4
|
||||
exceptiongroup==1.2.1
|
||||
idna==3.7
|
||||
iniconfig==2.0.0
|
||||
loguru==0.7.2
|
||||
packaging==24.1
|
||||
pluggy==1.5.0
|
||||
pytest==8.2.2
|
||||
pytz==2024.1
|
||||
requests==2.32.3
|
||||
six==1.16.0
|
||||
tomli==2.0.1
|
||||
tzlocal==5.2
|
||||
urllib3~=2.2.1
|
||||
pip~=23.2.1
|
||||
attrs~=23.2.0
|
||||
typing_extensions~=4.12.2
|
||||
aiohttp~=3.9.5
|
||||
APScheduler==3.10.4
|
||||
certifi~=2024.6.2
|
||||
charset-normalizer==3.3.2
|
||||
coverage==7.5.4
|
||||
exceptiongroup==1.2.1
|
||||
idna==3.7
|
||||
iniconfig==2.0.0
|
||||
loguru==0.7.2
|
||||
packaging==24.1
|
||||
pluggy==1.5.0
|
||||
pytest==8.2.2
|
||||
pytz==2024.1
|
||||
requests==2.32.3
|
||||
six==1.16.0
|
||||
tomli==2.0.1
|
||||
tzlocal==5.2
|
||||
urllib3~=2.2.1
|
||||
pip~=23.2.1
|
||||
attrs~=23.2.0
|
||||
typing_extensions~=4.12.2
|
||||
aiohttp~=3.9.5
|
||||
aiogram~=3.10.0
|
||||
Reference in New Issue
Block a user