HOT_FIX_2 for handle_start_message

This commit is contained in:
Andrey
2024-07-15 23:46:05 +03:00
parent 01f6cbd37d
commit 09a071c014
11 changed files with 667 additions and 703 deletions

View File

@@ -1,10 +1,7 @@
import sqlite3 import sqlite3
import os import os
from datetime import datetime from datetime import datetime
from logs.custom_logger import Logger from logs.custom_logger import logger
# Инициализируем логгер
db_logger = Logger(name='db')
# Получение абсолютного пути к текущей директории # Получение абсолютного пути к текущей директории
current_dir = os.getcwd() current_dir = os.getcwd()
@@ -15,7 +12,7 @@ class BotDB:
self.db_file = os.path.join(current_dir, name) self.db_file = os.path.join(current_dir, name)
self.conn = None self.conn = None
self.cursor = None self.cursor = None
self.logger = db_logger.get_logger() self.logger = logger
self.logger.info(f'Подключен к базе данных: {self.db_file}') self.logger.info(f'Подключен к базе данных: {self.db_file}')
def connect(self): def connect(self):

View File

@@ -1 +1 @@
from .main import admin_router from .admin_handlers import admin_router

View File

@@ -1,143 +1,140 @@
import traceback import traceback
from aiogram import Router, types, F from aiogram import Router, types, F
from aiogram.filters import Command, StateFilter from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext from aiogram.fsm.context import FSMContext
from helper_bot.filters.main import ChatTypeFilter from helper_bot.filters.main import ChatTypeFilter
from helper_bot.keyboards.main import get_reply_keyboard_admin, create_keyboard_with_pagination, \ from helper_bot.keyboards.main import get_reply_keyboard_admin, create_keyboard_with_pagination, \
create_keyboard_for_ban_days, create_keyboard_for_approve_ban create_keyboard_for_ban_days, create_keyboard_for_approve_ban
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from helper_bot.utils.helper_func import check_access, add_days_to_date, get_banned_users_buttons, get_banned_users_list from helper_bot.utils.helper_func import check_access, add_days_to_date, get_banned_users_buttons, get_banned_users_list
from logs.custom_logger import Logger from logs.custom_logger import logger
from database.db import BotDB from database.db import BotDB
admin_router = Router() admin_router = Router()
#Инициализируем логгер
admin_logger = Logger(name='admin_handler') bdf = BaseDependencyFactory()
logger = admin_logger.get_logger() GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
bdf = BaseDependencyFactory() MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] LOGS = bdf.settings['Settings']['logs']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs'] TEST = bdf.settings['Settings']['test']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
LOGS = bdf.settings['Settings']['logs'] BotDB = BotDB('database/tg-bot-database')
TEST = bdf.settings['Settings']['test']
BotDB = BotDB('database/tg-bot-database') @admin_router.message(
ChatTypeFilter(chat_type=["private"]),
Command('admin')
@admin_router.message( )
ChatTypeFilter(chat_type=["private"]), async def admin_panel(message: types.Message, state: FSMContext):
Command('admin') try:
) if check_access(message.from_user.id):
async def admin_panel(message: types.Message, state: FSMContext): await state.set_state("ADMIN")
try: logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}")
if check_access(message.from_user.id): markup = get_reply_keyboard_admin()
await state.set_state("ADMIN") await message.answer("Добро пожаловать в админку. Выбери что хочешь:",
logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}") reply_markup=markup)
markup = get_reply_keyboard_admin() else:
await message.answer("Добро пожаловать в админку. Выбери что хочешь:", await message.answer('Доступ запрещен, досвидания!')
reply_markup=markup) except Exception as e:
else: logger.error(f"Ошибка при запуске админ панели: {e}")
await message.answer('Доступ запрещен, досвидания!') await message.bot.send_message(IMPORTANT_LOGS,
except Exception as e: f'Ошибка в функции admin_panel {e}. Traceback: {traceback.format_exc()}')
logger.error(f"Ошибка при запуске админ панели: {e}")
await message.bot.send_message(IMPORTANT_LOGS,
f'Ошибка в функции admin_panel {e}. Traceback: {traceback.format_exc()}') @admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("ADMIN"),
@admin_router.message( F.text == 'Бан (Список)'
ChatTypeFilter(chat_type=["private"]), )
StateFilter("ADMIN"), async def get_last_users(message: types.Message):
F.text == 'Бан (Список)' logger.info(
) f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
async def get_last_users(message: types.Message): list_users = BotDB.get_last_users_from_db()
logger.info( keyboard = create_keyboard_with_pagination(1, len(list_users), list_users, 'ban')
f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})") await message.answer(text="Список пользователей которые последними обращались к боту",
list_users = BotDB.get_last_users_from_db() reply_markup=keyboard)
keyboard = create_keyboard_with_pagination(1, len(list_users), list_users, 'ban')
await message.answer(text="Список пользователей которые последними обращались к боту",
reply_markup=keyboard) @admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("ADMIN"),
@admin_router.message( F.text == 'Разбан (список)'
ChatTypeFilter(chat_type=["private"]), )
StateFilter("ADMIN"), async def get_banned_users(message):
F.text == 'Разбан (список)' logger.info(
) f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
async def get_banned_users(message): message_text = get_banned_users_list(0)
logger.info( buttons_list = get_banned_users_buttons()
f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})") if buttons_list:
message_text = get_banned_users_list(0) k = create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unlock')
buttons_list = get_banned_users_buttons() await message.answer(text=message_text, reply_markup=k)
if buttons_list: else:
k = create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unlock') await message.answer(text="В списке забанненых пользователей никого нет")
await message.answer(text=message_text, reply_markup=k)
else:
await message.answer(text="В списке забанненых пользователей никого нет") @admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("BAN_2")
@admin_router.message( )
ChatTypeFilter(chat_type=["private"]), async def ban_user_step_2(message: types.Message, state: FSMContext):
StateFilter("BAN_2") user_data = await state.get_data()
) logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {user_data})")
async def ban_user_step_2(message: types.Message, state: FSMContext): await state.update_data(message_for_user=message.text)
user_data = await state.get_data() markup = create_keyboard_for_ban_days()
logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {user_data})") await message.answer(f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши "
await state.update_data(message_for_user=message.text) f"его в чат", reply_markup=markup)
markup = create_keyboard_for_ban_days() await state.set_state("BAN_3")
await message.answer(f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши "
f"его в чат", reply_markup=markup)
await state.set_state("BAN_3") @admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("BAN_3")
@admin_router.message( )
ChatTypeFilter(chat_type=["private"]), async def ban_user_step_3(message: types.Message, state: FSMContext):
StateFilter("BAN_3") logger.info(f"ban_user_step_3. Расчет даты разбана. Входные данные {message.text}")
) if message.text != 'Навсегда':
async def ban_user_step_3(message: types.Message, state: FSMContext): count_days = int(message.text)
logger.info(f"ban_user_step_3. Расчет даты разбана. Входные данные {message.text}") date_to_unban = add_days_to_date(count_days)
if message.text != 'Навсегда': else:
count_days = int(message.text) date_to_unban = None
date_to_unban = add_days_to_date(count_days) logger.info(f"ban_user_step_3. Расчет даты разбана. date_to_unban: {date_to_unban}")
else: await state.update_data(date_to_unban=date_to_unban)
date_to_unban = None user_data = await state.get_data()
logger.info(f"ban_user_step_3. Расчет даты разбана. date_to_unban: {date_to_unban}") markup = create_keyboard_for_approve_ban()
await state.update_data(date_to_unban=date_to_unban) await message.answer(
user_data = await state.get_data() f"Необходимо подтверждение:\nПользователь:{user_data['user_id']}\nПричина бана:{user_data['message_for_user']}\nСрок бана:{user_data['date_to_unban']}",
markup = create_keyboard_for_approve_ban() reply_markup=markup)
await message.answer( await state.set_state("BAN_FINAL")
f"Необходимо подтверждение:\nПользователь:{user_data['user_id']}\nПричина бана:{user_data['message_for_user']}\nСрок бана:{user_data['date_to_unban']}",
reply_markup=markup)
await state.set_state("BAN_FINAL") @admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("BAN_FINAL")
@admin_router.message( )
ChatTypeFilter(chat_type=["private"]), async def approve_ban(message: types.Message, state: FSMContext):
StateFilter("BAN_FINAL") user_data = await state.get_data()
) logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {user_data})")
async def approve_ban(message: types.Message, state: FSMContext): if message.text == 'Подтвердить':
user_data = await state.get_data() exists = BotDB.check_user_in_blacklist(user_data['user_id'])
logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {user_data})") if exists:
if message.text == 'Подтвердить': await message.reply(f"Пользователь уже был заблокирован ранее.")
exists = BotDB.check_user_in_blacklist(user_data['user_id']) logger.info(f"Пользователь: {user_data['user_id']} был заблокирован ранее)")
if exists: await state.set_state('ADMIN')
await message.reply(f"Пользователь уже был заблокирован ранее.") else:
logger.info(f"Пользователь: {user_data['user_id']} был заблокирован ранее)") BotDB.set_user_blacklist(user_data['user_id'],
await state.set_state('ADMIN') user_data['user_name'],
else: user_data['message_for_user'],
BotDB.set_user_blacklist(user_data['user_id'], user_data['date_to_unban'])
user_data['user_name'], await message.reply(f"Пользователь {user_data['user_name']} успешно заблокирован.")
user_data['message_for_user'], logger.info(f"Пользователь: {user_data['user_id']} успешно заблокирован)")
user_data['date_to_unban']) await state.set_state('ADMIN')
await message.reply(f"Пользователь {user_data['user_name']} успешно заблокирован.") markup = get_reply_keyboard_admin()
logger.info(f"Пользователь: {user_data['user_id']} успешно заблокирован)") await message.answer('Вернулись в меню', reply_markup=markup)
await state.set_state('ADMIN')
markup = get_reply_keyboard_admin()
await message.answer('Вернулись в меню', reply_markup=markup)

View File

@@ -1 +1 @@
from .main import callback_router from .callback_handlers import callback_router

View File

@@ -1,165 +1,161 @@
import traceback import traceback
from aiogram import Router, F, types from aiogram import Router, F, types
from aiogram.fsm.context import FSMContext from aiogram.fsm.context import FSMContext
from aiogram.types import CallbackQuery from aiogram.types import CallbackQuery
from database.db import BotDB from database.db import BotDB
from helper_bot.keyboards.main import create_keyboard_with_pagination, get_reply_keyboard_admin, \ from helper_bot.keyboards.main import create_keyboard_with_pagination, get_reply_keyboard_admin, \
create_keyboard_for_ban_reason create_keyboard_for_ban_reason
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from helper_bot.utils.helper_func import send_text_message, send_photo_message, get_banned_users_list, \ from helper_bot.utils.helper_func import send_text_message, send_photo_message, get_banned_users_list, \
get_banned_users_buttons, delete_user_blacklist, get_help_message_id, send_media_group_message get_banned_users_buttons, delete_user_blacklist, get_help_message_id, send_media_group_message
from logs.custom_logger import Logger from logs.custom_logger import logger
callback_router = Router() callback_router = Router()
#Инициализируем логгер bdf = BaseDependencyFactory()
callback_logger = Logger(name='callback_logger') GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
logger = callback_logger.get_logger() GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
bdf = BaseDependencyFactory() GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] LOGS = bdf.settings['Settings']['logs']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] TEST = bdf.settings['Settings']['test']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link'] BotDB = BotDB('database/tg-bot-database')
LOGS = bdf.settings['Settings']['logs']
TEST = bdf.settings['Settings']['test']
@callback_router.callback_query(
BotDB = BotDB('database/tg-bot-database') F.data == "publish"
)
async def post_for_group(call: CallbackQuery, state: FSMContext):
@callback_router.callback_query( logger.info(
F.data == "publish" f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
) if call.data == 'publish' and call.message.content_type == 'text' and call.message.text != "^":
async def post_for_group(call: CallbackQuery, state: FSMContext): try:
logger.info( await send_text_message(MAIN_PUBLIC, call.message, call.message.text)
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})') await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
if call.data == 'publish' and call.message.content_type == 'text' and call.message.text != "^": logger.info(f'Текст сообщения опубликован в канале {MAIN_PUBLIC}.')
try: await call.answer(text='Выложено!', show_alert=True, cache_time=3)
await send_text_message(MAIN_PUBLIC, call.message, call.message.text) except Exception as e:
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) await call.bot.send_message(chat_id=IMPORTANT_LOGS,
logger.info(f'Текст сообщения опубликован в канале {MAIN_PUBLIC}.') text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
await call.answer(text='Выложено!', show_alert=True, cache_time=3) logger.error(f'Ошибка при публикации текста в канал {MAIN_PUBLIC}: {str(e)}')
except Exception as e: await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
await call.bot.send_message(chat_id=IMPORTANT_LOGS, elif call.data == 'publish' and call.message.content_type == 'photo':
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") try:
logger.error(f'Ошибка при публикации текста в канал {MAIN_PUBLIC}: {str(e)}') print(f'CALLMESSAGE - {call.message.text}')
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3) await send_photo_message(MAIN_PUBLIC, call.message, call.message.photo[-1].file_id, call.message.caption)
elif call.data == 'publish' and call.message.content_type == 'photo': await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
try: logger.info(f'Пост с фото опубликован в канале {MAIN_PUBLIC}.')
print(f'CALLMESSAGE - {call.message.text}') await call.answer(text='Выложено!', show_alert=True, cache_time=3)
await send_photo_message(MAIN_PUBLIC, call.message, call.message.photo[-1].file_id, call.message.caption) except Exception as e:
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) await call.bot.send_message(chat_id=IMPORTANT_LOGS,
logger.info(f'Пост с фото опубликован в канале {MAIN_PUBLIC}.') text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
await call.answer(text='Выложено!', show_alert=True, cache_time=3) logger.error(f'Ошибка при публикации фотографии в канал {MAIN_PUBLIC}: {str(e)}')
except Exception as e: await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
await call.bot.send_message(chat_id=IMPORTANT_LOGS, elif call.data == 'publish' and call.message.text == "^":
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") user_data = await state.get_data()
logger.error(f'Ошибка при публикации фотографии в канал {MAIN_PUBLIC}: {str(e)}') media_group_message_id = get_help_message_id(call.message.message_id, user_data)
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3) await call.bot.copy_message(chat_id=MAIN_PUBLIC, from_chat_id=GROUP_FOR_POST,message_id=media_group_message_id, reply_markup=None)
elif call.data == 'publish' and call.message.text == "^": #await call.bot.copy_messages(chat_id=MAIN_PUBLIC, from_chat_id=GROUP_FOR_POST, message_ids=[media_group_message_id, media_group_message_id-1])
user_data = await state.get_data() await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id)
media_group_message_id = get_help_message_id(call.message.message_id, user_data) print(user_data['media_group_message_id'])
await call.bot.copy_message(chat_id=MAIN_PUBLIC, from_chat_id=GROUP_FOR_POST,message_id=media_group_message_id, reply_markup=None) print(user_data['help_message_id'])
#await call.bot.copy_messages(chat_id=MAIN_PUBLIC, from_chat_id=GROUP_FOR_POST, message_ids=[media_group_message_id, media_group_message_id-1]) await call.answer(text='Выложено!', show_alert=True, cache_time=3)
await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id)
print(user_data['media_group_message_id']) @callback_router.callback_query(
print(user_data['help_message_id']) F.data == "decline"
await call.answer(text='Выложено!', show_alert=True, cache_time=3) )
async def decline_post_for_group(call: CallbackQuery, state: FSMContext):
@callback_router.callback_query( logger.info(
F.data == "decline" f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
) try:
async def decline_post_for_group(call: CallbackQuery, state: FSMContext): if call.message.content_type == 'text' and call.message.text != "^":
logger.info( await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})') logger.info(
try: f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).')
if call.message.content_type == 'text' and call.message.text != "^": await call.answer(text='Отклонено!', show_alert=True, cache_time=3)
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) if call.message.text == '^':
logger.info( user_data = await state.get_data()
f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).') media_group_message_id = get_help_message_id(call.message.message_id, user_data)
await call.answer(text='Отклонено!', show_alert=True, cache_time=3) await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id)
if call.message.text == '^': except Exception as e:
user_data = await state.get_data() await call.bot.send_message(IMPORTANT_LOGS,
media_group_message_id = get_help_message_id(call.message.message_id, user_data) f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id) logger.error(f'Ошибка при удалении сообщения в группе {GROUP_FOR_POST}: {str(e)}')
except Exception as e: await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
await call.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
logger.error(f'Ошибка при удалении сообщения в группе {GROUP_FOR_POST}: {str(e)}') @callback_router.callback_query(
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3) F.data.contains('ban')
)
async def process_ban_user(call: CallbackQuery, state: FSMContext):
@callback_router.callback_query( user_id = call.data[4:]
F.data.contains('ban') logger.info(
) f"Вызов функции process_ban_user. Данные callback: {call.data} пользователь: {user_id}")
async def process_ban_user(call: CallbackQuery, state: FSMContext): user_name = BotDB.get_username(user_id=user_id)
user_id = call.data[4:] if user_name:
logger.info( await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
f"Вызов функции process_ban_user. Данные callback: {call.data} пользователь: {user_id}") date_to_unban=None)
user_name = BotDB.get_username(user_id=user_id) markup = create_keyboard_for_ban_reason()
if user_name: await call.message.answer(
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None, text=f"Выбран пользователь:\nid: {user_id}\nusername:{user_name}. Выбери причину бана из списка или напиши ее в чат",
date_to_unban=None) reply_markup=markup)
markup = create_keyboard_for_ban_reason() await state.set_state('BAN_2')
await call.message.answer( else:
text=f"Выбран пользователь:\nid: {user_id}\nusername:{user_name}. Выбери причину бана из списка или напиши ее в чат", markup = get_reply_keyboard_admin()
reply_markup=markup) await call.message.answer(text='Пользователь с таким ID не найден в базе', markup=markup)
await state.set_state('BAN_2') await state.set_state('ADMIN')
else:
markup = get_reply_keyboard_admin()
await call.message.answer(text='Пользователь с таким ID не найден в базе', markup=markup) @callback_router.callback_query(
await state.set_state('ADMIN') F.data.contains('unlock')
)
async def process_unlock_user(call: CallbackQuery):
@callback_router.callback_query( user_id = call.data[7:]
F.data.contains('unlock') user_name = BotDB.get_username(user_id=user_id)
) delete_user_blacklist(user_id)
async def process_unlock_user(call: CallbackQuery): logger.info(f"Разблокирован пользователь с ID: {user_id}\nusername:{user_name}")
user_id = call.data[7:] username = BotDB.get_username(user_id)
user_name = BotDB.get_username(user_id=user_id) await call.answer(f'Пользователь разблокирован {username}', show_alert=True)
delete_user_blacklist(user_id)
logger.info(f"Разблокирован пользователь с ID: {user_id}\nusername:{user_name}")
username = BotDB.get_username(user_id) @callback_router.callback_query(
await call.answer(f'Пользователь разблокирован {username}', show_alert=True) F.data == 'return'
)
async def return_to_main_menu(call: CallbackQuery):
@callback_router.callback_query( await call.message.delete()
F.data == 'return' logger.info(f"Запуск админ панели для пользователя: {call.message.from_user.id}")
) markup = get_reply_keyboard_admin()
async def return_to_main_menu(call: CallbackQuery): await call.message.answer("Добро пожаловать в админку. Выбери что хочешь:",
await call.message.delete() reply_markup=markup)
logger.info(f"Запуск админ панели для пользователя: {call.message.from_user.id}")
markup = get_reply_keyboard_admin()
await call.message.answer("Добро пожаловать в админку. Выбери что хочешь:", @callback_router.callback_query(
reply_markup=markup) F.data.contains('page')
)
async def change_page(call: CallbackQuery):
@callback_router.callback_query( page_number = int(call.data[5:])
F.data.contains('page') logger.info(f"Переход на страницу {page_number}")
) if call.message.text == 'Список пользователей которые последними обращались к боту':
async def change_page(call: CallbackQuery): list_users = BotDB.get_last_users_from_db()
page_number = int(call.data[5:]) #TODO: Здесь где-то надо добавить обработку ошибки IndexError: list index out of range
logger.info(f"Переход на страницу {page_number}") keyboard = create_keyboard_with_pagination(int(page_number), len(list_users), list_users,
if call.message.text == 'Список пользователей которые последними обращались к боту': 'ban')
list_users = BotDB.get_last_users_from_db()
#TODO: Здесь где-то надо добавить обработку ошибки IndexError: list index out of range await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
keyboard = create_keyboard_with_pagination(int(page_number), len(list_users), list_users, reply_markup=keyboard)
'ban') else:
#Готовим сообщения
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id, message_user = get_banned_users_list(int(page_number) * 7 - 7)
reply_markup=keyboard) await call.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
else: text=message_user)
#Готовим сообщения
message_user = get_banned_users_list(int(page_number) * 7 - 7) #Готовим клавиатуру
await call.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id, buttons = get_banned_users_buttons()
text=message_user) keyboard = create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unlock')
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
#Готовим клавиатуру reply_markup=keyboard)
buttons = get_banned_users_buttons()
keyboard = create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unlock')
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
reply_markup=keyboard)

View File

@@ -1 +1 @@
from .main import group_router from .group_handlers import group_router

View File

@@ -1,54 +1,51 @@
from aiogram import Router, types from aiogram import Router, types
from aiogram.fsm.context import FSMContext from aiogram.fsm.context import FSMContext
from database.db import BotDB from database.db import BotDB
from helper_bot.filters.main import ChatTypeFilter from helper_bot.filters.main import ChatTypeFilter
from helper_bot.keyboards.main import get_reply_keyboard_leave_chat from helper_bot.keyboards.main import get_reply_keyboard_leave_chat
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from helper_bot.utils.helper_func import send_text_message from helper_bot.utils.helper_func import send_text_message
from logs.custom_logger import Logger
group_router = Router() group_router = Router()
#Инициализируем логгер
group_logger = Logger(name='group_logger') bdf = BaseDependencyFactory()
logger = group_logger.get_logger() GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
bdf = BaseDependencyFactory() MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] LOGS = bdf.settings['Settings']['logs']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs'] TEST = bdf.settings['Settings']['test']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
LOGS = bdf.settings['Settings']['logs'] BotDB = BotDB('database/tg-bot-database')
TEST = bdf.settings['Settings']['test']
BotDB = BotDB('database/tg-bot-database') @group_router.message(
ChatTypeFilter(chat_type=["group", "supergroup"]),
)
@group_router.message( async def handle_message(message: types.Message, state: FSMContext):
ChatTypeFilter(chat_type=["group", "supergroup"]), """Функция ответа админа пользователю через закрытый чат"""
) logger.info(
async def handle_message(message: types.Message, state: FSMContext): f'Получено сообщение в группе {message.chat.title} (ID: {message.chat.id}) от пользователя {message.from_user.full_name} (ID: {message.from_user.id}): "{message.text}"')
"""Функция ответа админа пользователю через закрытый чат""" markup = get_reply_keyboard_leave_chat()
logger.info( message_id = 0
f'Получено сообщение в группе {message.chat.title} (ID: {message.chat.id}) от пользователя {message.from_user.full_name} (ID: {message.from_user.id}): "{message.text}"') try:
markup = get_reply_keyboard_leave_chat() message_id = message.reply_to_message.message_id
message_id = 0 except AttributeError as e:
try: await message.answer('Блять, выдели сообщение!')
message_id = message.reply_to_message.message_id logger.warning(
except AttributeError as e: f'В группе {message.chat.title} (ID: {message.chat.id}) админ не выделил сообщение для ответа. Ошибка {str(e)}')
await message.answer('Блять, выдели сообщение!') message_from_admin = message.text
logger.warning( try:
f'В группе {message.chat.title} (ID: {message.chat.id}) админ не выделил сообщение для ответа. Ошибка {str(e)}') chat_id = BotDB.get_user_by_message_id(message_id)
message_from_admin = message.text await send_text_message(chat_id, message, message_from_admin, markup)
try: await state.set_state("CHAT")
chat_id = BotDB.get_user_by_message_id(message_id) logger.info(f'Ответ админа "{message.text}" отправлен пользователю с ID: {chat_id} на сообщение {message_id}')
await send_text_message(chat_id, message, message_from_admin, markup) except TypeError as e:
await state.set_state("CHAT") await message.answer('Не могу найти кому ответить в базе, проебали сообщение.')
logger.info(f'Ответ админа "{message.text}" отправлен пользователю с ID: {chat_id} на сообщение {message_id}') logger.error(
except TypeError as e: f'Ошибка при поиске пользователя в базе для ответа на сообщение: {message.text} в группе {message.chat.title} (ID сообщения: {message.message_id}) Ошибка: {str(e)}')
await message.answer('Не могу найти кому ответить в базе, проебали сообщение.')
logger.error(
f'Ошибка при поиске пользователя в базе для ответа на сообщение: {message.text} в группе {message.chat.title} (ID сообщения: {message.message_id}) Ошибка: {str(e)}')

View File

@@ -1 +1 @@
from .main import private_router from .private_handlers import private_router

View File

@@ -1,286 +1,283 @@
import random import random
import traceback import traceback
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from time import sleep from time import sleep
from aiogram import types, Router, F from aiogram import types, Router, F
from aiogram.filters import Command, StateFilter from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext from aiogram.fsm.context import FSMContext
from aiogram.types import FSInputFile from aiogram.types import FSInputFile
from helper_bot.filters.main import ChatTypeFilter from helper_bot.filters.main import ChatTypeFilter
from helper_bot.keyboards import get_reply_keyboard, get_reply_keyboard_for_post from helper_bot.keyboards import get_reply_keyboard, get_reply_keyboard_for_post
from helper_bot.keyboards.main import get_reply_keyboard_leave_chat from helper_bot.keyboards.main import get_reply_keyboard_leave_chat
from helper_bot.middlewares.text_middleware import AlbumMiddleware from helper_bot.middlewares.text_middleware import AlbumMiddleware
from helper_bot.utils import messages from helper_bot.utils import messages
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from helper_bot.utils.helper_func import get_first_name, get_text_message, send_text_message, send_photo_message, \ from helper_bot.utils.helper_func import get_first_name, get_text_message, send_text_message, send_photo_message, \
process_photo_album, send_media_group_message, check_username_and_full_name process_photo_album, send_media_group_message, check_username_and_full_name
from logs.custom_logger import Logger from logs.custom_logger import logger
from database.db import BotDB from database.db import BotDB
private_router = Router() private_router = Router()
private_router.message.middleware(AlbumMiddleware()) private_router.message.middleware(AlbumMiddleware())
#Инициализируем логгер
private_logger = Logger(name='private_handler') bdf = BaseDependencyFactory()
logger = private_logger.get_logger() GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
bdf = BaseDependencyFactory() MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] LOGS = bdf.settings['Settings']['logs']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs'] TEST = bdf.settings['Settings']['test']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
LOGS = bdf.settings['Settings']['logs'] BotDB = BotDB('database/tg-bot-database')
TEST = bdf.settings['Settings']['test']
BotDB = BotDB('database/tg-bot-database') @private_router.message(
ChatTypeFilter(chat_type=["private"]),
Command("start")
@private_router.message( )
ChatTypeFilter(chat_type=["private"]), @private_router.message(
Command("start") ChatTypeFilter(chat_type=["private"]),
) F.text == 'Вернуться в бота'
@private_router.message( )
ChatTypeFilter(chat_type=["private"]), async def handle_start_message(message: types.Message, state: FSMContext):
F.text == 'Вернуться в бота' try:
) await message.forward(chat_id=GROUP_FOR_LOGS)
async def handle_start_message(message: types.Message, state: FSMContext): full_name = message.from_user.full_name
try: username = message.from_user.username
await message.forward(chat_id=GROUP_FOR_LOGS) first_name = message.from_user.first_name
full_name = message.from_user.full_name is_bot = message.from_user.is_bot
username = message.from_user.username language_code = message.from_user.language_code
first_name = message.from_user.first_name user_id = message.from_user.id
is_bot = message.from_user.is_bot current_date = datetime.now()
language_code = message.from_user.language_code date = current_date.strftime("%Y-%m-%d %H:%M:%S")
user_id = message.from_user.id if not BotDB.user_exists(user_id):
current_date = datetime.now() BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date,
date = current_date.strftime("%Y-%m-%d %H:%M:%S") date)
if not BotDB.user_exists(user_id): else:
BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date, is_need_update = check_username_and_full_name(user_id, username, full_name)
date) if is_need_update:
else: BotDB.update_username_and_full_name(user_id, username, full_name)
is_need_update = check_username_and_full_name(user_id, username, full_name) await message.answer(f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name} и ник @{username}")
if is_need_update: await message.bot.send_message(chat_id=GROUP_FOR_LOGS, text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}')
BotDB.update_username_and_full_name(user_id, username, full_name) sleep(1)
await message.answer(f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name} и ник @{username}") BotDB.update_date_for_user(date, user_id)
await message.bot.send_message(chat_id=GROUP_FOR_LOGS, text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}') await state.set_state("START")
sleep(1) logger.info(
BotDB.update_date_for_user(date, user_id) f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} "
await state.set_state("START") f"Имя автора сообщения: {message.from_user.full_name})")
logger.info( name_stick_hello = list(Path('Stick').rglob('Hello_*'))
f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} " random_stick_hello = random.choice(name_stick_hello)
f"Имя автора сообщения: {message.from_user.full_name})") random_stick_hello = FSInputFile(path=random_stick_hello)
name_stick_hello = list(Path('Stick').rglob('Hello_*')) logger.info(f"Стикер успешно получен из БД")
random_stick_hello = random.choice(name_stick_hello) await message.answer_sticker(random_stick_hello)
random_stick_hello = FSInputFile(path=random_stick_hello) sleep(0.3)
logger.info(f"Стикер успешно получен из БД") except Exception as e:
await message.answer_sticker(random_stick_hello) logger.error(f"Произошла ошибка handle_start_message при получении стикеров. Ошибка:{str(e)}")
sleep(0.3) await message.bot.send_message(chat_id=IMPORTANT_LOGS,
except Exception as e: text=f"Произошла ошибка при получении стикеров: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
logger.error(f"Произошла ошибка handle_start_message при получении стикеров. Ошибка:{str(e)}") try:
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка при получении стикеров: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") markup = get_reply_keyboard(BotDB, message.from_user.id)
try: hello_message = messages.get_message(get_first_name(message), 'HELLO_MESSAGE')
await message.answer(hello_message, reply_markup=markup)
markup = get_reply_keyboard(BotDB, message.from_user.id) except Exception as e:
hello_message = messages.get_message(get_first_name(message), 'HELLO_MESSAGE') logger.error(
await message.answer(hello_message, reply_markup=markup) f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}")
except Exception as e: await message.bot.send_message(IMPORTANT_LOGS,
logger.error( f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}")
await message.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") @private_router.message(
StateFilter("START"),
ChatTypeFilter(chat_type=["private"]),
@private_router.message( F.text == '📢Предложить свой пост'
StateFilter("START"), )
ChatTypeFilter(chat_type=["private"]), async def suggest_post(message: types.Message, state: FSMContext):
F.text == '📢Предложить свой пост' try:
) await message.forward(chat_id=GROUP_FOR_LOGS)
async def suggest_post(message: types.Message, state: FSMContext): await state.set_state("SUGGEST")
try: current_state = await state.get_state()
await message.forward(chat_id=GROUP_FOR_LOGS) logger.info(
await state.set_state("SUGGEST") f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}")
current_state = await state.get_state() markup = types.ReplyKeyboardRemove()
logger.info( suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS')
f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}") await message.answer(suggest_news)
markup = types.ReplyKeyboardRemove() sleep(0.3)
suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS') suggest_news_2 = messages.get_message(get_first_name(message), 'SUGGEST_NEWS_2')
await message.answer(suggest_news) await message.answer(suggest_news_2, reply_markup=markup)
sleep(0.3) except Exception as e:
suggest_news_2 = messages.get_message(get_first_name(message), 'SUGGEST_NEWS_2') await message.bot.send_message(IMPORTANT_LOGS,
await message.answer(suggest_news_2, reply_markup=markup) f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
except Exception as e:
await message.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") @private_router.message(
ChatTypeFilter(chat_type=["private"]),
F.text == '👋🏼Сказать пока!'
@private_router.message( )
ChatTypeFilter(chat_type=["private"]), @private_router.message(
F.text == '👋🏼Сказать пока!' ChatTypeFilter(chat_type=["private"]),
) F.text == 'Выйти из чата'
@private_router.message( )
ChatTypeFilter(chat_type=["private"]), async def end_message(message: types.Message, state: FSMContext):
F.text == 'Выйти из чата' try:
) logger.info(
async def end_message(message: types.Message, state: FSMContext): f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
try: name_stick_bye = list(Path('Stick').rglob('Universal_*'))
logger.info( random_stick_bye = random.choice(name_stick_bye)
f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") random_stick_bye = FSInputFile(path=random_stick_bye)
name_stick_bye = list(Path('Stick').rglob('Universal_*')) await message.answer_sticker(random_stick_bye)
random_stick_bye = random.choice(name_stick_bye) except Exception as e:
random_stick_bye = FSInputFile(path=random_stick_bye) logger.error(
await message.answer_sticker(random_stick_bye) f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
except Exception as e: await message.bot.send_message(chat_id=IMPORTANT_LOGS,
logger.error( text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") try:
await message.bot.send_message(chat_id=IMPORTANT_LOGS, markup = types.ReplyKeyboardRemove()
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") bye_message = messages.get_message(get_first_name(message), 'BYE_MESSAGE')
try: await message.answer(bye_message, reply_markup=markup)
markup = types.ReplyKeyboardRemove() await state.set_state("START")
bye_message = messages.get_message(get_first_name(message), 'BYE_MESSAGE') except Exception as e:
await message.answer(bye_message, reply_markup=markup) logger.error(
await state.set_state("START") f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
except Exception as e: await message.bot.send_message(chat_id=IMPORTANT_LOGS,
logger.error( text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") @private_router.message(
StateFilter("SUGGEST"),
ChatTypeFilter(chat_type=["private"]),
@private_router.message( )
StateFilter("SUGGEST"), async def suggest_router(message: types.Message, state: FSMContext, album: list = None):
ChatTypeFilter(chat_type=["private"]), logger.info(
) f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
async def suggest_router(message: types.Message, state: FSMContext, album: list = None): try:
logger.info( if message.content_type == 'text':
f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") lower_text = message.text.lower()
try: post_text, is_anonymous = get_text_message(lower_text, message.from_user.full_name,
if message.content_type == 'text': message.from_user.username)
lower_text = message.text.lower() markup = get_reply_keyboard_for_post()
post_text, is_anonymous = get_text_message(lower_text, message.from_user.full_name, if is_anonymous:
message.from_user.username) await send_text_message(GROUP_FOR_POST, message, post_text, markup)
markup = get_reply_keyboard_for_post() else:
if is_anonymous: await send_text_message(GROUP_FOR_POST, message, post_text, markup)
await send_text_message(GROUP_FOR_POST, message, post_text, markup) markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
else: success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
await send_text_message(GROUP_FOR_POST, message, post_text, markup) await message.answer(success_send_message, reply_markup=markup_for_user)
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) await state.set_state("START")
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE') elif message.content_type == 'photo' and message.media_group_id is None:
await message.answer(success_send_message, reply_markup=markup_for_user) lower_caption = message.caption.lower()
await state.set_state("START") markup = get_reply_keyboard_for_post()
elif message.content_type == 'photo' and message.media_group_id is None: post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name,
lower_caption = message.caption.lower() message.from_user.username)
markup = get_reply_keyboard_for_post() #TODO: тут какая-то шляпа
post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name, if is_anonymous:
message.from_user.username) await send_photo_message(GROUP_FOR_POST, message,
#TODO: тут какая-то шляпа message.photo[-1].file_id, post_caption, markup)
if is_anonymous: else:
await send_photo_message(GROUP_FOR_POST, message, await send_photo_message(GROUP_FOR_POST, message,
message.photo[-1].file_id, post_caption, markup) message.photo[-1].file_id, post_caption, markup)
else: markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
await send_photo_message(GROUP_FOR_POST, message, success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
message.photo[-1].file_id, post_caption, markup) await message.answer(success_send_message, reply_markup=markup_for_user)
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) await state.set_state("START")
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE') elif message.media_group_id is not None:
await message.answer(success_send_message, reply_markup=markup_for_user) post_caption = " "
await state.set_state("START") if album[0].caption:
elif message.media_group_id is not None: lower_caption = album[0].caption.lower()
post_caption = " " post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name,
if album[0].caption: message.from_user.username)
lower_caption = album[0].caption.lower() media_group = process_photo_album(album, post_caption)
post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name, media_group_message_id = await send_media_group_message(GROUP_FOR_POST, message,
message.from_user.username) media_group)
media_group = process_photo_album(album, post_caption) sleep(0.2)
media_group_message_id = await send_media_group_message(GROUP_FOR_POST, message, markup = get_reply_keyboard_for_post()
media_group) help_message_id = await send_text_message(GROUP_FOR_POST, message, "^", markup)
sleep(0.2) await state.update_data(media_group_message_id=media_group_message_id, help_message_id=help_message_id)
markup = get_reply_keyboard_for_post()
help_message_id = await send_text_message(GROUP_FOR_POST, message, "^", markup) markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
await state.update_data(media_group_message_id=media_group_message_id, help_message_id=help_message_id) success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) await state.set_state("START")
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE') else:
await message.answer(success_send_message, reply_markup=markup_for_user) await message.bot.send_message(message.chat.id,
await state.set_state("START") 'Я пока не умею работать с таким сообщением. Пришли текст и фото/фоты(ы)')
else: except Exception as e:
await message.bot.send_message(message.chat.id, await message.bot.send_message(chat_id=IMPORTANT_LOGS,
'Я пока не умею работать с таким сообщением. Пришли текст и фото/фоты(ы)') text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
except Exception as e:
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") @private_router.message(
ChatTypeFilter(chat_type=["private"]),
F.text == '🤪Хочу стикеры'
@private_router.message( )
ChatTypeFilter(chat_type=["private"]), async def stickers(message: types.Message, state: FSMContext):
F.text == '🤪Хочу стикеры' logger.info(
) f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
async def stickers(message: types.Message, state: FSMContext): markup = get_reply_keyboard(BotDB, message.from_user.id)
logger.info( try:
f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") BotDB.update_info_about_stickers(user_id=message.from_user.id)
markup = get_reply_keyboard(BotDB, message.from_user.id) await message.forward(chat_id=GROUP_FOR_LOGS)
try: await message.answer(text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
BotDB.update_info_about_stickers(user_id=message.from_user.id) reply_markup=markup)
await message.forward(chat_id=GROUP_FOR_LOGS) await state.set_state("START")
await message.answer(text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk', except Exception as e:
reply_markup=markup) await message.bot.send_message(chat_id=IMPORTANT_LOGS,
await state.set_state("START") text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
except Exception as e: logger.error(
await message.bot.send_message(chat_id=IMPORTANT_LOGS, f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
logger.error(
f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") @private_router.message(
StateFilter("START"),
ChatTypeFilter(chat_type=["private"]),
@private_router.message( F.text == '📩Связаться с админами'
StateFilter("START"), )
ChatTypeFilter(chat_type=["private"]), async def connect_with_admin(message: types.Message, state: FSMContext):
F.text == '📩Связаться с админами' logger.info(
) f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
async def connect_with_admin(message: types.Message, state: FSMContext): admin_message = messages.get_message(get_first_name(message), 'CONNECT_WITH_ADMIN')
logger.info( await message.answer(admin_message, parse_mode="html")
f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") await message.forward(chat_id=GROUP_FOR_LOGS)
admin_message = messages.get_message(get_first_name(message), 'CONNECT_WITH_ADMIN') await state.set_state("PRE_CHAT")
await message.answer(admin_message, parse_mode="html")
await message.forward(chat_id=GROUP_FOR_LOGS)
await state.set_state("PRE_CHAT") @private_router.message(
StateFilter("PRE_CHAT"),
ChatTypeFilter(chat_type=["private"]),
@private_router.message( )
StateFilter("PRE_CHAT"), @private_router.message(
ChatTypeFilter(chat_type=["private"]), StateFilter("CHAT"),
) ChatTypeFilter(chat_type=["private"]),
@private_router.message( )
StateFilter("CHAT"), async def resend_message_in_group_for_message(message: types.Message, state: FSMContext):
ChatTypeFilter(chat_type=["private"]), logger.info(
) f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id})")
async def resend_message_in_group_for_message(message: types.Message, state: FSMContext): await message.forward(chat_id=GROUP_FOR_MESSAGE)
logger.info( current_date = datetime.now()
f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id})") date = current_date.strftime("%Y-%m-%d %H:%M:%S")
await message.forward(chat_id=GROUP_FOR_MESSAGE) BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date)
current_date = datetime.now() question = messages.get_message(get_first_name(message), 'QUESTION')
date = current_date.strftime("%Y-%m-%d %H:%M:%S") user_state = await state.get_state()
BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date) if user_state == "PRE_CHAT":
question = messages.get_message(get_first_name(message), 'QUESTION') markup = get_reply_keyboard(BotDB, message.from_user.id)
user_state = await state.get_state() await message.answer(question, reply_markup=markup)
if user_state == "PRE_CHAT": await state.set_state("START")
markup = get_reply_keyboard(BotDB, message.from_user.id) elif user_state == "CHAT":
await message.answer(question, reply_markup=markup) markup = get_reply_keyboard_leave_chat()
await state.set_state("START") await message.answer(question, reply_markup=markup)
elif user_state == "CHAT":
markup = get_reply_keyboard_leave_chat() # @private_router.message(
await message.answer(question, reply_markup=markup) # ChatTypeFilter(chat_type=["private"])
# )
# @private_router.message( # async def default(message: types.Message, state: FSMContext):
# ChatTypeFilter(chat_type=["private"]) # markup = get_reply_keyboard(BotDB, message.from_user.id)
# ) # await message.answer('Кажется ты заблудился. Держи клавиатуру, твое состояние сброшено на начало', reply_markup=markup)
# async def default(message: types.Message, state: FSMContext): # await state.set_state("START")
# markup = get_reply_keyboard(BotDB, message.from_user.id)
# await message.answer('Кажется ты заблудился. Держи клавиатуру, твое состояние сброшено на начало', reply_markup=markup)
# await state.set_state("START")

View File

@@ -177,7 +177,7 @@ def delete_user_blacklist(user_id: int):
def check_username_and_full_name(user_id: int, username: str, full_name: str): def check_username_and_full_name(user_id: int, username: str, full_name: str):
username_db, full_name_db = BotDB.get_username_and_full_name(user_id=user_id) username_db, full_name_db = BotDB.get_username_and_full_name(user_id=user_id)
return not username == username_db and full_name == full_name_db return username != username_db or full_name != full_name_db
def unban_notifier(self): def unban_notifier(self):

View File

@@ -1,45 +1,25 @@
import datetime import datetime
import os import os
import loguru from loguru import logger
class Logger: logger = logger.bind(name='main_log')
def __init__(self, name):
self.logger = loguru.logger.bind(name=name) # Получение сегодняшней даты для имени файла
today = datetime.date.today().strftime('%Y-%m-%d')
# Получение сегодняшней даты для имени файла
today = datetime.date.today().strftime('%Y-%m-%d') # Создание папки для логов
current_dir = os.path.dirname(os.path.abspath(__file__))
# Создание папки для логов if not os.path.exists(current_dir):
current_dir = os.path.dirname(os.path.abspath(__file__)) # Если не существует, создаем ее
if not os.path.exists(current_dir): os.makedirs(current_dir)
# Если не существует, создаем ее filename = f'{current_dir}/helper_bot_{today}.log'
os.makedirs(current_dir)
filename = f'{current_dir}/helper_bot_{today}.log' # Настройка формата логов
logger.add(
# Настройка формата логов filename,
self.logger.add( rotation="00:00",
filename, retention="5 days",
rotation="00:00", compression="zip",
retention="5 days", format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {name} | {line} | {message}",
compression="zip", )
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {name} | {line} | {message}",
)
def get_logger(self):
return self.logger
def info(self, message):
self.logger.info(message)
def debug(self, message):
self.logger.debug(message)
def warning(self, message):
self.logger.warning(message)
def error(self, message):
self.logger.error(message)
def critical(self, message):
self.logger.critical(message)