288 lines
16 KiB
Python
288 lines
16 KiB
Python
import random
|
||
import traceback
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from time import sleep
|
||
|
||
from aiogram import types, Router, F
|
||
from aiogram.filters import Command, StateFilter
|
||
from aiogram.fsm.context import FSMContext
|
||
from aiogram.types import FSInputFile
|
||
|
||
from helper_bot.filters.main import ChatTypeFilter
|
||
from helper_bot.keyboards import get_reply_keyboard, get_reply_keyboard_for_post
|
||
from helper_bot.keyboards.main import get_reply_keyboard_leave_chat
|
||
from helper_bot.middlewares.text_middleware import AlbumMiddleware
|
||
from helper_bot.utils import messages
|
||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
|
||
from helper_bot.utils.helper_func import get_first_name, get_text_message, send_text_message, send_photo_message, \
|
||
process_photo_album, send_media_group_message, check_username_and_full_name
|
||
from logs.custom_logger import Logger
|
||
|
||
from database.db import BotDB
|
||
|
||
private_router = Router()
|
||
|
||
private_router.message.middleware(AlbumMiddleware())
|
||
|
||
#Инициализируем логгер
|
||
private_logger = Logger(name='private_handler')
|
||
logger = private_logger.get_logger()
|
||
|
||
bdf = BaseDependencyFactory()
|
||
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
|
||
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
|
||
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
|
||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
|
||
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
|
||
LOGS = bdf.settings['Settings']['logs']
|
||
TEST = bdf.settings['Settings']['test']
|
||
|
||
BotDB = BotDB('database/tg-bot-database')
|
||
|
||
|
||
@private_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
Command("start")
|
||
)
|
||
@private_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
F.text == 'Вернуться в бота'
|
||
)
|
||
async def handle_start_message(message: types.Message, state: FSMContext):
|
||
try:
|
||
user_id = message.from_user.id
|
||
full_name = message.from_user.full_name
|
||
username = message.from_user.username
|
||
await message.forward(chat_id=GROUP_FOR_LOGS)
|
||
is_need_update = check_username_and_full_name(user_id, username, full_name)
|
||
if is_need_update:
|
||
BotDB.update_username_and_full_name(user_id, username, full_name)
|
||
await message.answer(f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name} и ник @{username}")
|
||
await message.bot.send_message(chat_id=GROUP_FOR_LOGS, text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}')
|
||
sleep(2)
|
||
await state.set_state("START")
|
||
logger.info(
|
||
f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} "
|
||
f"Имя автора сообщения: {message.from_user.full_name})")
|
||
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
|
||
random_stick_hello = random.choice(name_stick_hello)
|
||
random_stick_hello = FSInputFile(path=random_stick_hello)
|
||
logger.info(f"Стикер успешно получен из БД")
|
||
await message.answer_sticker(random_stick_hello)
|
||
sleep(0.3)
|
||
except Exception as e:
|
||
logger.error(f"Произошла ошибка handle_start_message при получении стикеров. Ошибка:{str(e)}")
|
||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||
text=f"Произошла ошибка при получении стикеров: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||
try:
|
||
user_id = message.from_user.id
|
||
full_name = message.from_user.full_name
|
||
username = message.from_user.username
|
||
first_name = message.from_user.first_name
|
||
is_bot = message.from_user.is_bot
|
||
language_code = message.from_user.language_code
|
||
current_date = datetime.now()
|
||
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
|
||
if not BotDB.user_exists(user_id):
|
||
BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date,
|
||
date)
|
||
BotDB.update_date_for_user(date, user_id)
|
||
markup = get_reply_keyboard(BotDB, message.from_user.id)
|
||
hello_message = messages.get_message(get_first_name(message), 'HELLO_MESSAGE')
|
||
await message.answer(hello_message, reply_markup=markup)
|
||
except Exception as e:
|
||
logger.error(
|
||
f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}")
|
||
await message.bot.send_message(IMPORTANT_LOGS,
|
||
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||
|
||
|
||
@private_router.message(
|
||
StateFilter("START"),
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
F.text == '📢Предложить свой пост'
|
||
)
|
||
async def suggest_post(message: types.Message, state: FSMContext):
|
||
try:
|
||
await message.forward(chat_id=GROUP_FOR_LOGS)
|
||
await state.set_state("SUGGEST")
|
||
current_state = await state.get_state()
|
||
logger.info(
|
||
f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}")
|
||
markup = types.ReplyKeyboardRemove()
|
||
suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS')
|
||
await message.answer(suggest_news)
|
||
sleep(0.3)
|
||
suggest_news_2 = messages.get_message(get_first_name(message), 'SUGGEST_NEWS_2')
|
||
await message.answer(suggest_news_2, reply_markup=markup)
|
||
except Exception as e:
|
||
await message.bot.send_message(IMPORTANT_LOGS,
|
||
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||
|
||
|
||
@private_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
F.text == '👋🏼Сказать пока!'
|
||
)
|
||
@private_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
F.text == 'Выйти из чата'
|
||
)
|
||
async def end_message(message: types.Message, state: FSMContext):
|
||
try:
|
||
logger.info(
|
||
f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
|
||
random_stick_bye = random.choice(name_stick_bye)
|
||
random_stick_bye = FSInputFile(path=random_stick_bye)
|
||
await message.answer_sticker(random_stick_bye)
|
||
except Exception as e:
|
||
logger.error(
|
||
f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||
try:
|
||
markup = types.ReplyKeyboardRemove()
|
||
bye_message = messages.get_message(get_first_name(message), 'BYE_MESSAGE')
|
||
await message.answer(bye_message, reply_markup=markup)
|
||
await state.set_state("START")
|
||
except Exception as e:
|
||
logger.error(
|
||
f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||
|
||
|
||
@private_router.message(
|
||
StateFilter("SUGGEST"),
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
)
|
||
async def suggest_router(message: types.Message, state: FSMContext, album: list = None):
|
||
logger.info(
|
||
f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||
try:
|
||
if message.content_type == 'text':
|
||
lower_text = message.text.lower()
|
||
post_text, is_anonymous = get_text_message(lower_text, message.from_user.full_name,
|
||
message.from_user.username)
|
||
markup = get_reply_keyboard_for_post()
|
||
if is_anonymous:
|
||
await send_text_message(GROUP_FOR_POST, message, post_text, markup)
|
||
else:
|
||
await send_text_message(GROUP_FOR_POST, message, post_text, markup)
|
||
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
|
||
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
|
||
await message.answer(success_send_message, reply_markup=markup_for_user)
|
||
await state.set_state("START")
|
||
elif message.content_type == 'photo' and message.media_group_id is None:
|
||
lower_caption = message.caption.lower()
|
||
markup = get_reply_keyboard_for_post()
|
||
post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name,
|
||
message.from_user.username)
|
||
#TODO: тут какая-то шляпа
|
||
if is_anonymous:
|
||
await send_photo_message(GROUP_FOR_POST, message,
|
||
message.photo[-1].file_id, post_caption, markup)
|
||
else:
|
||
await send_photo_message(GROUP_FOR_POST, message,
|
||
message.photo[-1].file_id, post_caption, markup)
|
||
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
|
||
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
|
||
await message.answer(success_send_message, reply_markup=markup_for_user)
|
||
await state.set_state("START")
|
||
elif message.media_group_id is not None:
|
||
post_caption = " "
|
||
if album[0].caption:
|
||
lower_caption = album[0].caption.lower()
|
||
post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name,
|
||
message.from_user.username)
|
||
media_group = process_photo_album(album, post_caption)
|
||
media_group_message_id = await send_media_group_message(GROUP_FOR_POST, message,
|
||
media_group)
|
||
sleep(0.2)
|
||
markup = get_reply_keyboard_for_post()
|
||
help_message_id = await send_text_message(GROUP_FOR_POST, message, "^", markup)
|
||
await state.update_data(media_group_message_id=media_group_message_id, help_message_id=help_message_id)
|
||
|
||
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
|
||
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
|
||
await message.answer(success_send_message, reply_markup=markup_for_user)
|
||
await state.set_state("START")
|
||
else:
|
||
await message.bot.send_message(message.chat.id,
|
||
'Я пока не умею работать с таким сообщением. Пришли текст и фото/фоты(ы)')
|
||
except Exception as e:
|
||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||
|
||
|
||
@private_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
F.text == '🤪Хочу стикеры'
|
||
)
|
||
async def stickers(message: types.Message, state: FSMContext):
|
||
logger.info(
|
||
f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||
markup = get_reply_keyboard(BotDB, message.from_user.id)
|
||
try:
|
||
BotDB.update_info_about_stickers(user_id=message.from_user.id)
|
||
await message.forward(chat_id=GROUP_FOR_LOGS)
|
||
await message.answer(text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
|
||
reply_markup=markup)
|
||
await state.set_state("START")
|
||
except Exception as e:
|
||
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||
logger.error(
|
||
f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||
|
||
|
||
@private_router.message(
|
||
StateFilter("START"),
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
F.text == '📩Связаться с админами'
|
||
)
|
||
async def connect_with_admin(message: types.Message, state: FSMContext):
|
||
logger.info(
|
||
f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
|
||
admin_message = messages.get_message(get_first_name(message), 'CONNECT_WITH_ADMIN')
|
||
await message.answer(admin_message, parse_mode="html")
|
||
await message.forward(chat_id=GROUP_FOR_LOGS)
|
||
await state.set_state("PRE_CHAT")
|
||
|
||
|
||
@private_router.message(
|
||
StateFilter("PRE_CHAT"),
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
)
|
||
@private_router.message(
|
||
StateFilter("CHAT"),
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
)
|
||
async def resend_message_in_group_for_message(message: types.Message, state: FSMContext):
|
||
logger.info(
|
||
f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id})")
|
||
await message.forward(chat_id=GROUP_FOR_MESSAGE)
|
||
current_date = datetime.now()
|
||
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
|
||
BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date)
|
||
question = messages.get_message(get_first_name(message), 'QUESTION')
|
||
user_state = await state.get_state()
|
||
if user_state == "PRE_CHAT":
|
||
markup = get_reply_keyboard(BotDB, message.from_user.id)
|
||
await message.answer(question, reply_markup=markup)
|
||
await state.set_state("START")
|
||
elif user_state == "CHAT":
|
||
markup = get_reply_keyboard_leave_chat()
|
||
await message.answer(question, reply_markup=markup)
|
||
|
||
# @private_router.message(
|
||
# ChatTypeFilter(chat_type=["private"])
|
||
# )
|
||
# async def default(message: types.Message, state: FSMContext):
|
||
# markup = get_reply_keyboard(BotDB, message.from_user.id)
|
||
# await message.answer('Кажется ты заблудился. Держи клавиатуру, твое состояние сброшено на начало', reply_markup=markup)
|
||
# await state.set_state("START")
|