Merge pull request #3 from KerradKerridi/dev-3

Dev-3 Добавил новые функции MediaGroup, разные типы файлов, ответы пользователю
This commit was merged in pull request #3.
This commit is contained in:
ANDREY KATYKHIN
2024-07-21 23:24:44 +05:00
committed by GitHub
36 changed files with 1624 additions and 887 deletions

3
.gitignore vendored
View File

@@ -1 +1,4 @@
/database/tg-bot-database /database/tg-bot-database
/settings.ini
/myenv/
/venv/

View File

@@ -1,14 +1,12 @@
import sqlite3
import os import os
import sqlite3
from datetime import datetime from datetime import datetime
from logs.custom_logger import logger
# Получение абсолютного пути к текущей директории from logs.custom_logger import logger
current_dir = os.getcwd()
class BotDB: class BotDB:
def __init__(self, name): def __init__(self, current_dir, name):
self.db_file = os.path.join(current_dir, name) self.db_file = os.path.join(current_dir, name)
self.conn = None self.conn = None
self.cursor = None self.cursor = None
@@ -59,7 +57,7 @@ class BotDB:
self.logger.info(f'Получена текущая версия миграции: {version}') self.logger.info(f'Получена текущая версия миграции: {version}')
return version return version
except Exception as e: except Exception as e:
self.logger.error(f'Ошибка при получении текущей версии миграции: {e}') self.logger.error(f'Ошибка при получении текущей версии миграции: {str(e)}')
raise raise
finally: finally:
self.close() self.close()
@@ -241,6 +239,68 @@ class BotDB:
finally: finally:
self.close() self.close()
def get_user_id_by_username(self, username: str):
"""
Возвращает user_id пользователя из базы данных по его user_name в Telegram.
Args:
username (str): Username пользователя.
Returns:
user_id (int): Идентификатор пользователя в Telegram.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
try:
self.connect()
self.cursor.execute("SELECT user_id FROM our_users WHERE username = ?", (username,))
result = self.cursor.fetchone()
if result:
user_id = result[0]
self.logger.info(f"User_id пользователя найден: username={username}, user_id={user_id}")
return user_id
else:
self.logger.info(f"Пользователь с username={username} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении username из базы данных: {error}")
raise
finally:
self.close()
def get_full_name_by_id(self, user_id: str):
"""
Возвращает full_name пользователя из базы данных по его username в Telegram.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
full_name (str): Username пользователя.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
try:
self.connect()
self.cursor.execute("SELECT full_name FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result:
full_name = result[0]
self.logger.info(f"Username пользователя найден: user_id={user_id}, full_name={full_name}")
return full_name
else:
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении username из базы данных: {error}")
raise
finally:
self.close()
def get_all_user_id(self): def get_all_user_id(self):
""" """
Возвращает список всех user_id из базы данных. Возвращает список всех user_id из базы данных.
@@ -298,11 +358,6 @@ class BotDB:
finally: finally:
self.close() self.close()
def change_name(self, user_id):
#TODO: реализовать функцию изменения имени пользователя по которому к нему обращается бот. Обновляем поля first_name, date_changed
#result = self.cursor.execute("UPDATE 'our_users' SET (?) WHERE user_id = (?)", (new_user_name), )
pass
def get_info_about_stickers(self, user_id: int): def get_info_about_stickers(self, user_id: int):
""" """
Проверяет, получил ли пользователь стикеры. Проверяет, получил ли пользователь стикеры.
@@ -605,9 +660,11 @@ class BotDB:
f"Запуск функции update_username_and_full_name: user_id={user_id}, username={username}, full_name={full_name}") f"Запуск функции update_username_and_full_name: user_id={user_id}, username={username}, full_name={full_name}")
try: try:
self.connect() self.connect()
self.cursor.execute("UPDATE our_users SET username = ?, full_name = ? WHERE user_id = ?", (username, full_name, user_id,)) self.cursor.execute("UPDATE our_users SET username = ?, full_name = ? WHERE user_id = ?",
(username, full_name, user_id,))
self.conn.commit() self.conn.commit()
self.logger.info(f"Функция update_username_and_full_name. Данные пользователя: user_id={user_id} успешно обновлены") self.logger.info(
f"Функция update_username_and_full_name. Данные пользователя: user_id={user_id} успешно обновлены")
return True return True
except sqlite3.Error as error: except sqlite3.Error as error:
self.logger.error(f"Ошибка в функции update_username_and_full_name: {error}") self.logger.error(f"Ошибка в функции update_username_and_full_name: {error}")
@@ -823,18 +880,157 @@ class BotDB:
finally: finally:
self.close() self.close()
def get_post_content_from_telegram_by_last_id(self, last_post_id: int):
self.logger.info(
f"Запуск функции get_post_content_from_telegram_by_last_id, идентификатор поста {last_post_id}")
try:
self.connect()
result = self.cursor.execute("""
SELECT cpft.content_name, cpft.content_type
FROM post_from_telegram_suggest pft
JOIN message_link_to_content mltc
ON pft.message_id = mltc.post_id
JOIN content_post_from_telegram cpft
ON cpft.message_id = mltc.message_id
WHERE pft.helper_text_message_id = ?
""", (last_post_id,))
post_content = result.fetchall()
self.logger.info(
f"Функция get_post_content_from_telegram_by_last_id получила список контента: {post_content}")
return post_content
finally:
self.close()
def get_post_ids_from_telegram_by_last_id(self, last_post_id: int):
self.logger.info(
f"Запуск функции get_post_ids_from_telegram_by_last_id, идентификатор поста {last_post_id}")
try:
self.connect()
result = self.cursor.execute("""
SELECT mltc.message_id
FROM post_from_telegram_suggest pft
JOIN message_link_to_content mltc
ON pft.message_id = mltc.post_id
WHERE pft.helper_text_message_id = ?
""", (last_post_id,))
post_ids = result.fetchall()
self.logger.info(f"Функция get_post_ids_from_telegram_by_last_id "
f"получила идентификаторы сообщений: {post_ids}")
return post_ids
except Exception as e:
self.logger.error(f"Ошибка в функции get_post_ids_from_telegram_by_last_id {str(e)}")
return False
finally:
self.close()
def get_post_text_from_telegram_by_last_id(self, last_post_id: int):
self.logger.info(f"Запуск функции get_post_text_from_telegram_by_last_id, идентификатор поста {last_post_id}")
try:
self.connect()
result = self.cursor.execute("SELECT text "
"FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
(last_post_id,))
text = result.fetchone()[0]
self.logger.info(f"Функция get_post_text_from_telegram_by_last_id получила text")
return text
except Exception as e:
self.logger.error(f"Ошибка в функции get_post_text_from_telegram_by_last_id {str(e)}")
def get_author_id_by_message_id(self, message_id: int):
self.logger.info(f"Запуск функции get_author_id_by_message_id, идентификатор поста {message_id}")
try:
self.connect()
result = self.cursor.execute("SELECT author_id "
"FROM post_from_telegram_suggest WHERE message_id = ?",
(message_id,))
author_id = result.fetchone()[0]
self.logger.info(f"Функция get_author_id_by_message_id получила author_id {author_id}")
return author_id
except Exception as e:
self.logger.error(f"Ошибка в функции get_author_id_by_message_id {str(e)}")
def get_author_id_by_helper_message_id(self, helper_text_message_id: int):
self.logger.info(f"Запуск функции get_author_id_by_helper_message_id, идентификатор поста "
f"{helper_text_message_id}")
try:
self.connect()
result = self.cursor.execute("SELECT author_id "
"FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
(helper_text_message_id,))
author_id = result.fetchone()[0]
self.logger.info(f"Функция get_author_id_by_helper_message_id получила author_id {author_id}")
return author_id
except Exception as e:
self.logger.error(f"Ошибка в функции get_author_id_by_helper_message_id {str(e)}")
def add_post_content_in_db(self, post_id: int, message_id: int, content_name: str, type_content: str):
self.logger.info(
f"Запуск функции add_post_content_in_db: post_id={post_id}, message_id={message_id}, "
f"content_name={content_name}, content_type={type_content}")
try:
self.connect()
self.cursor.execute(
"INSERT INTO message_link_to_content (post_id, message_id)"
"VALUES (?, ?)", (post_id, message_id))
self.conn.commit()
self.cursor.execute(
"INSERT INTO content_post_from_telegram (message_id, content_name, content_type)"
"VALUES (?, ?, ?)", (message_id, content_name, type_content))
self.conn.commit()
self.logger.info(f"Функция add_post_content_in_db отработала успешно")
return True
except Exception as e:
self.logger.error(f"Ошибка в функции add_post_content_in_db при добавлении поста в базу данных: {e}")
return False
def add_post_in_db(self, message_id: int, text: str, author_id: int):
self.logger.info(
f"Запуск функции add_post_in_db: message_id={message_id}, "
f"author_id={author_id}")
try:
today = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
self.connect()
self.cursor.execute(
"INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at)"
"VALUES (?, ?, ?, ?)", (message_id, text, author_id, today))
self.conn.commit()
self.logger.info(f"Функция add_post_in_db отработала успешно")
return True
except Exception as e:
self.logger.error(f"Ошибка в функции add_post_in_db при добавлении поста в базу данных: {e}")
return False
def update_helper_message_in_db(self, message_id: int, helper_message_id: int):
self.logger.info(
f"Запуск функции update_helper_message_in_db: message_id={message_id}, "
f"helper_message_id={helper_message_id}")
try:
self.connect()
self.cursor.execute(
"UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?",
(helper_message_id, message_id,))
self.conn.commit()
self.logger.info(f"Функция update_helper_message_in_db отработала успешно")
return True
except Exception as e:
self.logger.error(f"Ошибка в функции update_helper_message_in_db при добавлении поста в базу данных: {e}")
return False
def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id): def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
"""Добавляет информацию о войсе юзера в БД""" """Добавляет информацию о войсе юзера в БД"""
self.logger.info( self.logger.info(
f"Запуск функции add_audio_record (file_name = {file_name}, author_id = {author_id}, date_added = {date_added}") f"Запуск функции add_audio_record (file_name = {file_name}, author_id = {author_id},"
f" date_added = {date_added}")
try: try:
self.connect() self.connect()
result = self.cursor.execute( result = self.cursor.execute(
"INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)", "INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) "
"VALUES (?, ?, ?, ?, ?)",
(file_name, author_id, date_added, listen_count, file_id)) (file_name, author_id, date_added, listen_count, file_id))
self.conn.commit() self.conn.commit()
self.logger.info( self.logger.info(
f"Аудио успешно добавлено в БД (file_name = {file_name}, author_id = {author_id}, date_added = {date_added}") f"Аудио успешно добавлено в БД (file_name = {file_name}, author_id = {author_id}, "
f"date_added = {date_added}")
return None return None
except sqlite3.Error as error: except sqlite3.Error as error:
print(error) print(error)
@@ -871,7 +1067,8 @@ class BotDB:
try: try:
self.connect() self.connect()
result = self.cursor.execute( result = self.cursor.execute(
"SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", "SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? "
"ORDER BY date_added DESC LIMIT 1",
(user_id,)) (user_id,))
return result.fetchone()[0] return result.fetchone()[0]
except sqlite3.Error as error: except sqlite3.Error as error:
@@ -884,7 +1081,11 @@ class BotDB:
try: try:
self.connect() self.connect()
result = self.cursor.execute( result = self.cursor.execute(
"SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", "SELECT `file_name` "
"FROM `audio_message_reference` "
"WHERE `author_id` = ? "
"ORDER BY date_added "
"DESC LIMIT 1",
(user_id,)) (user_id,))
return result.fetchone()[0] return result.fetchone()[0]
except sqlite3.Error as error: except sqlite3.Error as error:

View File

@@ -1,139 +1,179 @@
import traceback import traceback
from aiogram import Router, types, F from aiogram import Router, types, F
from aiogram.filters import Command, StateFilter from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext from aiogram.fsm.context import FSMContext
from helper_bot.filters.main import ChatTypeFilter from helper_bot.filters.main import ChatTypeFilter
from helper_bot.keyboards.main import get_reply_keyboard_admin, create_keyboard_with_pagination, \ from helper_bot.keyboards.keyboards import get_reply_keyboard_admin, create_keyboard_with_pagination, \
create_keyboard_for_ban_days, create_keyboard_for_approve_ban create_keyboard_for_ban_days, create_keyboard_for_approve_ban, create_keyboard_for_ban_reason
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from helper_bot.utils.helper_func import check_access, add_days_to_date, get_banned_users_buttons, get_banned_users_list from helper_bot.utils.helper_func import check_access, add_days_to_date, get_banned_users_buttons, get_banned_users_list
from logs.custom_logger import logger from logs.custom_logger import logger
admin_router = Router()
admin_router = Router()
bdf = BaseDependencyFactory()
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
bdf = BaseDependencyFactory() GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs'] LOGS = bdf.settings['Settings']['logs']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link'] TEST = bdf.settings['Settings']['test']
LOGS = bdf.settings['Settings']['logs']
TEST = bdf.settings['Settings']['test'] BotDB = bdf.get_db()
BotDB = bdf.get_db()
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
@admin_router.message( Command('admin')
ChatTypeFilter(chat_type=["private"]), )
Command('admin') async def admin_panel(message: types.Message, state: FSMContext):
) try:
async def admin_panel(message: types.Message, state: FSMContext): if check_access(message.from_user.id):
try: await state.set_state("ADMIN")
if check_access(message.from_user.id): logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}")
await state.set_state("ADMIN") markup = get_reply_keyboard_admin()
logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}") await message.answer("Добро пожаловать в админку. Выбери что хочешь:",
markup = get_reply_keyboard_admin() reply_markup=markup)
await message.answer("Добро пожаловать в админку. Выбери что хочешь:", else:
reply_markup=markup) await message.answer('Доступ запрещен, досвидания!')
else: except Exception as e:
await message.answer('Доступ запрещен, досвидания!') logger.error(f"Ошибка при запуске админ панели: {e}")
except Exception as e: await message.bot.send_message(IMPORTANT_LOGS,
logger.error(f"Ошибка при запуске админ панели: {e}") f'Ошибка в функции admin_panel {e}. Traceback: {traceback.format_exc()}')
await message.bot.send_message(IMPORTANT_LOGS,
f'Ошибка в функции admin_panel {e}. Traceback: {traceback.format_exc()}')
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
@admin_router.message( StateFilter("ADMIN"),
ChatTypeFilter(chat_type=["private"]), F.text == 'Бан (Список)'
StateFilter("ADMIN"), )
F.text == 'Бан (Список)' async def get_last_users(message: types.Message):
) logger.info(
async def get_last_users(message: types.Message): f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
logger.info( list_users = BotDB.get_last_users_from_db()
f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})") keyboard = create_keyboard_with_pagination(1, len(list_users), list_users, 'ban')
list_users = BotDB.get_last_users_from_db() await message.answer(text="Список пользователей которые последними обращались к боту",
keyboard = create_keyboard_with_pagination(1, len(list_users), list_users, 'ban') reply_markup=keyboard)
await message.answer(text="Список пользователей которые последними обращались к боту",
reply_markup=keyboard)
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
@admin_router.message( StateFilter("ADMIN"),
ChatTypeFilter(chat_type=["private"]), F.text == 'Бан по нику'
StateFilter("ADMIN"), )
F.text == 'Разбан (список)' async def ban_by_nickname(message: types.Message, state: FSMContext):
) await message.answer('Пришли мне username блокируемого пользователя')
async def get_banned_users(message): await state.set_state('PRE_BAN')
logger.info(
f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
message_text = get_banned_users_list(0) @admin_router.message(
buttons_list = get_banned_users_buttons() ChatTypeFilter(chat_type=["private"]),
if buttons_list: F.text == 'Отменить'
k = create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unlock') )
await message.answer(text=message_text, reply_markup=k) async def decline_ban(message: types.Message, state: FSMContext):
else: await state.set_data({})
await message.answer(text="В списке забанненых пользователей никого нет") await state.set_state("ADMIN")
logger.info(f"Отмена процедуры блокировки")
markup = get_reply_keyboard_admin()
@admin_router.message( await message.answer('Вернулись в меню', reply_markup=markup)
ChatTypeFilter(chat_type=["private"]),
StateFilter("BAN_2")
) @admin_router.message(
async def ban_user_step_2(message: types.Message, state: FSMContext): ChatTypeFilter(chat_type=["private"]),
user_data = await state.get_data() StateFilter("PRE_BAN")
logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {user_data})") )
await state.update_data(message_for_user=message.text) async def ban_by_nickname_step_2(message: types.Message, state: FSMContext):
markup = create_keyboard_for_ban_days() logger.info(
await message.answer(f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши " f"Функция ban_by_nickname_2. Получен никнейм пользователя: {message.text}")
f"его в чат", reply_markup=markup) user_name = message.text
await state.set_state("BAN_3") user_id = BotDB.get_user_id_by_username(user_name)
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
date_to_unban=None)
@admin_router.message( full_name = BotDB.get_full_name_by_id(user_id)
ChatTypeFilter(chat_type=["private"]), markup = create_keyboard_for_ban_reason()
StateFilter("BAN_3") await message.answer(
) text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name}\n"
async def ban_user_step_3(message: types.Message, state: FSMContext): f"Имя:{full_name}\nВыбери причину бана из списка или напиши ее в чат",
logger.info(f"ban_user_step_3. Расчет даты разбана. Входные данные {message.text}") reply_markup=markup)
if message.text != 'Навсегда': await state.set_state('BAN_2')
count_days = int(message.text)
date_to_unban = add_days_to_date(count_days)
else: @admin_router.message(
date_to_unban = None ChatTypeFilter(chat_type=["private"]),
logger.info(f"ban_user_step_3. Расчет даты разбана. date_to_unban: {date_to_unban}") StateFilter("ADMIN"),
await state.update_data(date_to_unban=date_to_unban) F.text == 'Разбан (список)'
user_data = await state.get_data() )
markup = create_keyboard_for_approve_ban() async def get_banned_users(message):
await message.answer( logger.info(
f"Необходимо подтверждение:\nПользователь:{user_data['user_id']}\nПричина бана:{user_data['message_for_user']}\nСрок бана:{user_data['date_to_unban']}", f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
reply_markup=markup) message_text = get_banned_users_list(0)
await state.set_state("BAN_FINAL") buttons_list = get_banned_users_buttons()
if buttons_list:
k = create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unlock')
@admin_router.message( await message.answer(text=message_text, reply_markup=k)
ChatTypeFilter(chat_type=["private"]), else:
StateFilter("BAN_FINAL") await message.answer(text="В списке забанненых пользователей никого нет")
)
async def approve_ban(message: types.Message, state: FSMContext):
user_data = await state.get_data() @admin_router.message(
logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {user_data})") ChatTypeFilter(chat_type=["private"]),
if message.text == 'Подтвердить': StateFilter("BAN_2")
exists = BotDB.check_user_in_blacklist(user_data['user_id']) )
if exists: async def ban_user_step_2(message: types.Message, state: FSMContext):
await message.reply(f"Пользователь уже был заблокирован ранее.") user_data = await state.get_data()
logger.info(f"Пользователь: {user_data['user_id']} был заблокирован ранее)") logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {user_data})")
await state.set_state('ADMIN') await state.update_data(message_for_user=message.text)
else: markup = create_keyboard_for_ban_days()
BotDB.set_user_blacklist(user_data['user_id'], await message.answer(f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши "
user_data['user_name'], f"его в чат", reply_markup=markup)
user_data['message_for_user'], await state.set_state("BAN_3")
user_data['date_to_unban'])
await message.reply(f"Пользователь {user_data['user_name']} успешно заблокирован.")
logger.info(f"Пользователь: {user_data['user_id']} успешно заблокирован)") @admin_router.message(
await state.set_state('ADMIN') ChatTypeFilter(chat_type=["private"]),
markup = get_reply_keyboard_admin() StateFilter("BAN_3")
await message.answer('Вернулись в меню', reply_markup=markup) )
async def ban_user_step_3(message: types.Message, state: FSMContext):
logger.info(f"ban_user_step_3. Расчет даты разбана. Входные данные {message.text}")
if message.text != 'Навсегда':
count_days = int(message.text)
date_to_unban = add_days_to_date(count_days)
else:
date_to_unban = None
logger.info(f"ban_user_step_3. Расчет даты разбана. date_to_unban: {date_to_unban}")
await state.update_data(date_to_unban=date_to_unban)
user_data = await state.get_data()
markup = create_keyboard_for_approve_ban()
await message.answer(
f"Необходимо подтверждение:\nПользователь:{user_data['user_id']}\nПричина бана:{user_data['message_for_user']}\nСрок бана:{user_data['date_to_unban']}",
reply_markup=markup)
await state.set_state("BAN_FINAL")
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("BAN_FINAL"),
F.text == 'Подтвердить'
)
async def approve_ban(message: types.Message, state: FSMContext):
user_data = await state.get_data()
logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {user_data})")
exists = BotDB.check_user_in_blacklist(user_data['user_id'])
if exists:
await message.reply(f"Пользователь уже был заблокирован ранее.")
logger.info(f"Пользователь: {user_data['user_id']} был заблокирован ранее)")
await state.set_state('ADMIN')
else:
BotDB.set_user_blacklist(user_data['user_id'],
user_data['user_name'],
user_data['message_for_user'],
user_data['date_to_unban'])
await message.reply(f"Пользователь {user_data['user_name']} успешно заблокирован.")
logger.info(f"Пользователь: {user_data['user_id']} успешно заблокирован)")
await state.set_state('ADMIN')
markup = get_reply_keyboard_admin()
await message.answer('Вернулись в меню', reply_markup=markup)

View File

@@ -1,158 +1,265 @@
import traceback import traceback
from aiogram import Router, F from aiogram import Router, F
from aiogram.fsm.context import FSMContext from aiogram.fsm.context import FSMContext
from aiogram.types import CallbackQuery from aiogram.types import CallbackQuery
from helper_bot.keyboards.main import create_keyboard_with_pagination, get_reply_keyboard_admin, \ from helper_bot.keyboards.keyboards import create_keyboard_with_pagination, get_reply_keyboard_admin, \
create_keyboard_for_ban_reason create_keyboard_for_ban_reason
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from helper_bot.utils.helper_func import send_text_message, send_photo_message, get_banned_users_list, \ from helper_bot.utils.helper_func import send_text_message, send_photo_message, get_banned_users_list, \
get_banned_users_buttons, delete_user_blacklist, get_help_message_id, send_media_group_message get_banned_users_buttons, delete_user_blacklist, send_media_group_to_channel, \
from logs.custom_logger import logger send_video_message, send_video_note_message, send_audio_message, send_voice_message
from logs.custom_logger import logger
callback_router = Router()
callback_router = Router()
bdf = BaseDependencyFactory()
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] bdf = BaseDependencyFactory()
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs'] GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link'] IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
LOGS = bdf.settings['Settings']['logs'] PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
TEST = bdf.settings['Settings']['test'] LOGS = bdf.settings['Settings']['logs']
TEST = bdf.settings['Settings']['test']
BotDB = bdf.get_db()
BotDB = bdf.get_db()
@callback_router.callback_query(
F.data == "publish" @callback_router.callback_query(
) F.data == "publish"
async def post_for_group(call: CallbackQuery, state: FSMContext): )
logger.info( async def post_for_group(call: CallbackQuery, state: FSMContext):
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})') logger.info(
if call.data == 'publish' and call.message.content_type == 'text' and call.message.text != "^": f'Получен callback-запрос с действием: {call.data} от пользователя {call.from_user.full_name} (ID сообщения: {call.message.message_id})')
try: if call.message.content_type == 'text' and call.message.text != "^":
await send_text_message(MAIN_PUBLIC, call.message, call.message.text) try:
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) # Пересылаем сообщение в канал
logger.info(f'Текст сообщения опубликован в канале {MAIN_PUBLIC}.') await send_text_message(MAIN_PUBLIC, call.message, call.message.text)
await call.answer(text='Выложено!', show_alert=True, cache_time=3)
except Exception as e: # Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
await call.bot.send_message(chat_id=IMPORTANT_LOGS, author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
logger.error(f'Ошибка при публикации текста в канал {MAIN_PUBLIC}: {str(e)}')
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3) # Очищаем предложку и удаляем оттуда пост
elif call.data == 'publish' and call.message.content_type == 'photo': await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
try: logger.info(f'Текст сообщения опубликован в канале {MAIN_PUBLIC}.')
await send_photo_message(MAIN_PUBLIC, call.message, call.message.photo[-1].file_id, call.message.caption) await call.answer(text='Выложено!', show_alert=True, cache_time=3)
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) except Exception as e:
logger.info(f'Пост с фото опубликован в канале {MAIN_PUBLIC}.') await call.bot.send_message(chat_id=IMPORTANT_LOGS,
await call.answer(text='Выложено!', show_alert=True, cache_time=3) text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
except Exception as e: logger.error(f'Ошибка при публикации текста в канал {MAIN_PUBLIC}: {str(e)}')
await call.bot.send_message(chat_id=IMPORTANT_LOGS, await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") elif call.message.content_type == 'photo':
logger.error(f'Ошибка при публикации фотографии в канал {MAIN_PUBLIC}: {str(e)}') try:
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3) await send_photo_message(MAIN_PUBLIC, call.message, call.message.photo[-1].file_id, call.message.caption)
elif call.data == 'publish' and call.message.text == "^":
user_data = await state.get_data() # Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
media_group_message_id = get_help_message_id(call.message.message_id, user_data) author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
await call.bot.copy_message(chat_id=MAIN_PUBLIC, from_chat_id=GROUP_FOR_POST,message_id=media_group_message_id, reply_markup=None) await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id)
print(user_data['media_group_message_id']) # Удаляем пост из предложки
print(user_data['help_message_id']) await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
await call.answer(text='Выложено!', show_alert=True, cache_time=3) logger.info(f'Пост с фото опубликован в канале {MAIN_PUBLIC}.')
await call.answer(text='Выложено!', show_alert=True, cache_time=3)
@callback_router.callback_query( except Exception as e:
F.data == "decline" await call.bot.send_message(chat_id=IMPORTANT_LOGS,
) text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
async def decline_post_for_group(call: CallbackQuery, state: FSMContext): logger.error(f'Ошибка при публикации фотографии в канал {MAIN_PUBLIC}: {str(e)}')
logger.info( await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})') elif call.message.content_type == 'video':
try: try:
if call.message.content_type == 'text' and call.message.text != "^" or call.message.content_type == 'photo': await send_video_message(MAIN_PUBLIC, call.message, call.message.video.file_id, call.message.caption)
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
logger.info( # Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).') author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
await call.answer(text='Отклонено!', show_alert=True, cache_time=3) await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
if call.message.text == '^':
user_data = await state.get_data() await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
media_group_message_id = get_help_message_id(call.message.message_id, user_data) logger.info(f'Пост с видео опубликован в канале {MAIN_PUBLIC}.')
await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id) await call.answer(text='Выложено!', show_alert=True, cache_time=3)
except Exception as e: except Exception as e:
await call.bot.send_message(IMPORTANT_LOGS, await call.bot.send_message(chat_id=IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
logger.error(f'Ошибка при удалении сообщения в группе {GROUP_FOR_POST}: {str(e)}') logger.error(f'Ошибка при публикации видео в канал {MAIN_PUBLIC}: {str(e)}')
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3) await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
elif call.message.content_type == 'video_note':
try:
@callback_router.callback_query( await send_video_note_message(MAIN_PUBLIC, call.message, call.message.video_note.file_id)
F.data.contains('ban')
) # Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
async def process_ban_user(call: CallbackQuery, state: FSMContext): author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
user_id = call.data[4:] await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
logger.info(
f"Вызов функции process_ban_user. Данные callback: {call.data} пользователь: {user_id}") await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
user_name = BotDB.get_username(user_id=user_id) logger.info(f'Пост с кружком опубликован в канале {MAIN_PUBLIC}.')
if user_name: await call.answer(text='Выложено!', show_alert=True, cache_time=3)
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None, except Exception as e:
date_to_unban=None) await call.bot.send_message(chat_id=IMPORTANT_LOGS,
markup = create_keyboard_for_ban_reason() text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
await call.message.answer( logger.error(f'Ошибка при публикации кружка в канал {MAIN_PUBLIC}: {str(e)}')
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name}\nИмя:{call.message.from_user.full_name}\nВыбери причину бана из списка или напиши ее в чат", await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
reply_markup=markup) elif call.message.content_type == 'audio':
await state.set_state('BAN_2') try:
else: await send_audio_message(MAIN_PUBLIC, call.message, call.message.audio.file_id, call.message.caption)
markup = get_reply_keyboard_admin()
await call.message.answer(text='Пользователь с таким ID не найден в базе', markup=markup) # Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
await state.set_state('ADMIN') author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
@callback_router.callback_query( await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
F.data.contains('unlock') logger.info(f'Пост с аудио опубликован в канале {MAIN_PUBLIC}.')
) await call.answer(text='Выложено!', show_alert=True, cache_time=3)
async def process_unlock_user(call: CallbackQuery): except Exception as e:
user_id = call.data[7:] await call.bot.send_message(chat_id=IMPORTANT_LOGS,
user_name = BotDB.get_username(user_id=user_id) text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
delete_user_blacklist(user_id) logger.error(f'Ошибка при публикации аудио в канал {MAIN_PUBLIC}: {str(e)}')
logger.info(f"Разблокирован пользователь с ID: {user_id} username:{user_name}") await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
username = BotDB.get_username(user_id) elif call.message.content_type == 'voice':
await call.answer(f'Пользователь разблокирован {username}', show_alert=True) try:
await send_voice_message(MAIN_PUBLIC, call.message, call.message.voice.file_id)
@callback_router.callback_query( # Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
F.data == 'return' author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
) await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
async def return_to_main_menu(call: CallbackQuery):
await call.message.delete() await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
logger.info(f"Запуск админ панели для пользователя: {call.message.from_user.id}") logger.info(f'Пост с войсом опубликован в канале {MAIN_PUBLIC}.')
markup = get_reply_keyboard_admin() await call.answer(text='Выложено!', show_alert=True, cache_time=3)
await call.message.answer("Добро пожаловать в админку. Выбери что хочешь:", except Exception as e:
reply_markup=markup) await call.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
logger.error(f'Ошибка при публикации войса в канал {MAIN_PUBLIC}: {str(e)}')
@callback_router.callback_query( await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
F.data.contains('page') elif call.message.text == "^":
) # Получаем контент медиагруппы и текст для публикации
async def change_page(call: CallbackQuery): post_content = BotDB.get_post_content_from_telegram_by_last_id(call.message.message_id)
page_number = int(call.data[5:]) post_text = BotDB.get_post_text_from_telegram_by_last_id(call.message.message_id)
logger.info(f"Переход на страницу {page_number}")
if call.message.text == 'Список пользователей которые последними обращались к боту': # Готовим список для удаления
list_users = BotDB.get_last_users_from_db() post_ids = BotDB.get_post_ids_from_telegram_by_last_id(call.message.message_id)
#TODO: Здесь где-то надо добавить обработку ошибки IndexError: list index out of range message_ids = [row[0] for row in post_ids]
keyboard = create_keyboard_with_pagination(int(page_number), len(list_users), list_users, message_ids.append(call.message.message_id)
'ban')
# Выкладываем пост в канал
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id, await send_media_group_to_channel(bot=call.bot, chat_id=MAIN_PUBLIC, post_content=post_content,
reply_markup=keyboard) post_text=post_text)
else:
#Готовим сообщения # Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
message_user = get_banned_users_list(int(page_number) * 7 - 7) author_id = BotDB.get_author_id_by_helper_message_id(call.message.message_id)
await call.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id, await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
text=message_user)
# TODO: Удалить фотки с локалки после выкладки?
#Готовим клавиатуру await call.bot.delete_messages(chat_id=GROUP_FOR_POST, message_ids=message_ids)
buttons = get_banned_users_buttons() await call.answer(text='Выложено!', show_alert=True, cache_time=3)
keyboard = create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unlock')
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
reply_markup=keyboard) @callback_router.callback_query(
F.data == "decline"
)
async def decline_post_for_group(call: CallbackQuery, state: FSMContext):
logger.info(
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
try:
if call.message.content_type == 'text' and call.message.text != "^" or call.message.content_type == 'photo' \
or call.message.content_type == 'audio' or call.message.content_type == 'voice' \
or call.message.content_type == 'video' or call.message.content_type == 'video_note':
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
await send_text_message(author_id, call.message, 'Твой пост был отклонен😔')
logger.info(
f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).')
await call.answer(text='Отклонено!', show_alert=True, cache_time=3)
if call.message.text == '^':
post_ids = BotDB.get_post_ids_from_telegram_by_last_id(call.message.message_id)
message_ids = [row[0] for row in post_ids]
message_ids.append(call.message.message_id)
await call.bot.delete_messages(chat_id=GROUP_FOR_POST, message_ids=message_ids)
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
author_id = BotDB.get_author_id_by_helper_message_id(call.message.message_id)
await send_text_message(author_id, call.message, 'Твой пост был отклонен😔')
await call.answer(text='Удалено!', show_alert=True, cache_time=3)
except Exception as e:
await call.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
logger.error(f'Ошибка при удалении сообщения в группе {GROUP_FOR_POST}: {str(e)}')
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
@callback_router.callback_query(
F.data.contains('ban')
)
async def process_ban_user(call: CallbackQuery, state: FSMContext):
user_id = call.data[4:]
logger.info(
f"Вызов функции process_ban_user. Данные callback: {call.data} пользователь: {user_id}")
user_name = BotDB.get_username(user_id=user_id)
if user_name:
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
date_to_unban=None)
markup = create_keyboard_for_ban_reason()
await call.message.answer(
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name}\nИмя:{call.message.from_user.full_name}\nВыбери причину бана из списка или напиши ее в чат",
reply_markup=markup)
await state.set_state('BAN_2')
else:
markup = get_reply_keyboard_admin()
await call.message.answer(text='Пользователь с таким ID не найден в базе', markup=markup)
await state.set_state('ADMIN')
@callback_router.callback_query(
F.data.contains('unlock')
)
async def process_unlock_user(call: CallbackQuery):
user_id = call.data[7:]
user_name = BotDB.get_username(user_id=user_id)
delete_user_blacklist(user_id)
logger.info(f"Разблокирован пользователь с ID: {user_id} username:{user_name}")
username = BotDB.get_username(user_id)
await call.answer(f'Пользователь разблокирован {username}', show_alert=True)
@callback_router.callback_query(
F.data == 'return'
)
async def return_to_main_menu(call: CallbackQuery):
await call.message.delete()
logger.info(f"Запуск админ панели для пользователя: {call.message.from_user.id}")
markup = get_reply_keyboard_admin()
await call.message.answer("Добро пожаловать в админку. Выбери что хочешь:",
reply_markup=markup)
@callback_router.callback_query(
F.data.contains('page')
)
async def change_page(call: CallbackQuery):
page_number = int(call.data[5:])
logger.info(f"Переход на страницу {page_number}")
if call.message.text == 'Список пользователей которые последними обращались к боту':
list_users = BotDB.get_last_users_from_db()
# TODO: Здесь где-то надо добавить обработку ошибки IndexError: list index out of range
keyboard = create_keyboard_with_pagination(int(page_number), len(list_users), list_users,
'ban')
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
reply_markup=keyboard)
else:
# Готовим сообщения
message_user = get_banned_users_list(int(page_number) * 7 - 7)
await call.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
text=message_user)
# Готовим клавиатуру
buttons = get_banned_users_buttons()
keyboard = create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unlock')
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
reply_markup=keyboard)

View File

@@ -1,50 +1,49 @@
from aiogram import Router, types from aiogram import Router, types
from aiogram.fsm.context import FSMContext from aiogram.fsm.context import FSMContext
from helper_bot.filters.main import ChatTypeFilter from helper_bot.filters.main import ChatTypeFilter
from helper_bot.keyboards.main import get_reply_keyboard_leave_chat from helper_bot.keyboards.keyboards import get_reply_keyboard_leave_chat
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from helper_bot.utils.helper_func import send_text_message from helper_bot.utils.helper_func import send_text_message
from logs.custom_logger import logger from logs.custom_logger import logger
group_router = Router() group_router = Router()
bdf = BaseDependencyFactory()
bdf = BaseDependencyFactory() GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs'] PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link'] LOGS = bdf.settings['Settings']['logs']
LOGS = bdf.settings['Settings']['logs'] TEST = bdf.settings['Settings']['test']
TEST = bdf.settings['Settings']['test']
BotDB = bdf.get_db()
BotDB = bdf.get_db()
@group_router.message(
@group_router.message( ChatTypeFilter(chat_type=["group", "supergroup"]),
ChatTypeFilter(chat_type=["group", "supergroup"]), )
) async def handle_message(message: types.Message, state: FSMContext):
async def handle_message(message: types.Message, state: FSMContext): """Функция ответа админа пользователю через закрытый чат"""
"""Функция ответа админа пользователю через закрытый чат""" logger.info(
logger.info( f'Получено сообщение в группе {message.chat.title} (ID: {message.chat.id}) от пользователя {message.from_user.full_name} (ID: {message.from_user.id}): "{message.text}"')
f'Получено сообщение в группе {message.chat.title} (ID: {message.chat.id}) от пользователя {message.from_user.full_name} (ID: {message.from_user.id}): "{message.text}"') markup = get_reply_keyboard_leave_chat()
markup = get_reply_keyboard_leave_chat() message_id = 0
message_id = 0 try:
try: message_id = message.reply_to_message.message_id
message_id = message.reply_to_message.message_id except AttributeError as e:
except AttributeError as e: await message.answer('Блять, выдели сообщение!')
await message.answer('Блять, выдели сообщение!') logger.warning(
logger.warning( f'В группе {message.chat.title} (ID: {message.chat.id}) админ не выделил сообщение для ответа. Ошибка {str(e)}')
f'В группе {message.chat.title} (ID: {message.chat.id}) админ не выделил сообщение для ответа. Ошибка {str(e)}') message_from_admin = message.text
message_from_admin = message.text try:
try: chat_id = BotDB.get_user_by_message_id(message_id)
chat_id = BotDB.get_user_by_message_id(message_id) await send_text_message(chat_id, message, message_from_admin, markup)
await send_text_message(chat_id, message, message_from_admin, markup) await state.set_state("CHAT")
await state.set_state("CHAT") logger.info(f'Ответ админа "{message.text}" отправлен пользователю с ID: {chat_id} на сообщение {message_id}')
logger.info(f'Ответ админа "{message.text}" отправлен пользователю с ID: {chat_id} на сообщение {message_id}') except TypeError as e:
except TypeError as e: await message.answer('Не могу найти кому ответить в базе, проебали сообщение.')
await message.answer('Не могу найти кому ответить в базе, проебали сообщение.') logger.error(
logger.error( f'Ошибка при поиске пользователя в базе для ответа на сообщение: {message.text} в группе {message.chat.title} (ID сообщения: {message.message_id}) Ошибка: {str(e)}')
f'Ошибка при поиске пользователя в базе для ответа на сообщение: {message.text} в группе {message.chat.title} (ID сообщения: {message.message_id}) Ошибка: {str(e)}')

View File

@@ -1,282 +1,403 @@
import random import random
import traceback import traceback
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from time import sleep from time import sleep
from aiogram import types, Router, F from aiogram import types, Router, F
from aiogram.filters import Command, StateFilter from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext from aiogram.fsm.context import FSMContext
from aiogram.types import FSInputFile from aiogram.types import FSInputFile
from helper_bot.filters.main import ChatTypeFilter from helper_bot.filters.main import ChatTypeFilter
from helper_bot.keyboards import get_reply_keyboard, get_reply_keyboard_for_post from helper_bot.keyboards import get_reply_keyboard, get_reply_keyboard_for_post
from helper_bot.keyboards.main import get_reply_keyboard_leave_chat from helper_bot.keyboards.keyboards import get_reply_keyboard_leave_chat
from helper_bot.middlewares.album_middleware import AlbumMiddleware from helper_bot.middlewares.album_middleware import AlbumMiddleware
from helper_bot.middlewares.blacklist_middleware import BlacklistMiddleware from helper_bot.middlewares.blacklist_middleware import BlacklistMiddleware
from helper_bot.utils import messages from helper_bot.utils import messages
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from helper_bot.utils.helper_func import get_first_name, get_text_message, send_text_message, send_photo_message, \ from helper_bot.utils.helper_func import get_first_name, get_text_message, send_text_message, send_photo_message, \
process_photo_album, send_media_group_message, check_username_and_full_name send_media_group_message_to_private_chat, prepare_media_group_from_middlewares, check_username_and_full_name, \
from logs.custom_logger import logger send_video_message, send_video_note_message, send_audio_message, send_voice_message, add_in_db_media
from logs.custom_logger import logger
private_router = Router()
private_router = Router()
private_router.message.middleware(AlbumMiddleware())
private_router.message.middleware(BlacklistMiddleware()) private_router.message.middleware(AlbumMiddleware())
private_router.message.middleware(BlacklistMiddleware())
bdf = BaseDependencyFactory()
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] bdf = BaseDependencyFactory()
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs'] GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link'] IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
LOGS = bdf.settings['Settings']['logs'] PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
TEST = bdf.settings['Settings']['test'] LOGS = bdf.settings['Settings']['logs']
TEST = bdf.settings['Settings']['test']
BotDB = bdf.get_db()
BotDB = bdf.get_db()
@private_router.message(
ChatTypeFilter(chat_type=["private"]), @private_router.message(
Command("start") ChatTypeFilter(chat_type=["private"]),
) Command("start")
@private_router.message( )
ChatTypeFilter(chat_type=["private"]), @private_router.message(
F.text == 'Вернуться в бота' ChatTypeFilter(chat_type=["private"]),
) F.text == 'Вернуться в бота'
async def handle_start_message(message: types.Message, state: FSMContext): )
try: async def handle_start_message(message: types.Message, state: FSMContext):
await message.forward(chat_id=GROUP_FOR_LOGS) try:
full_name = message.from_user.full_name await message.forward(chat_id=GROUP_FOR_LOGS)
username = message.from_user.username full_name = message.from_user.full_name
first_name = message.from_user.first_name username = message.from_user.username
is_bot = message.from_user.is_bot first_name = message.from_user.first_name
language_code = message.from_user.language_code is_bot = message.from_user.is_bot
user_id = message.from_user.id language_code = message.from_user.language_code
current_date = datetime.now() user_id = message.from_user.id
date = current_date.strftime("%Y-%m-%d %H:%M:%S") current_date = datetime.now()
if not BotDB.user_exists(user_id): date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date, if not BotDB.user_exists(user_id):
date) BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date,
else: date)
is_need_update = check_username_and_full_name(user_id, username, full_name) else:
if is_need_update: is_need_update = check_username_and_full_name(user_id, username, full_name)
BotDB.update_username_and_full_name(user_id, username, full_name) if is_need_update:
await message.answer(f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name} и ник @{username}") BotDB.update_username_and_full_name(user_id, username, full_name)
await message.bot.send_message(chat_id=GROUP_FOR_LOGS, text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}') await message.answer(
sleep(1) f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name} и ник @{username}")
BotDB.update_date_for_user(date, user_id) await message.bot.send_message(chat_id=GROUP_FOR_LOGS,
await state.set_state("START") text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}')
logger.info( sleep(1)
f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} " BotDB.update_date_for_user(date, user_id)
f"Имя автора сообщения: {message.from_user.full_name})") await state.set_state("START")
name_stick_hello = list(Path('Stick').rglob('Hello_*')) logger.info(
random_stick_hello = random.choice(name_stick_hello) f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} "
random_stick_hello = FSInputFile(path=random_stick_hello) f"Имя автора сообщения: {message.from_user.full_name})")
logger.info(f"Стикер успешно получен из БД") name_stick_hello = list(Path('Stick').rglob('Hello_*'))
await message.answer_sticker(random_stick_hello) random_stick_hello = random.choice(name_stick_hello)
sleep(0.3) random_stick_hello = FSInputFile(path=random_stick_hello)
except Exception as e: logger.info(f"Стикер успешно получен из БД")
logger.error(f"Произошла ошибка handle_start_message при получении стикеров. Ошибка:{str(e)}") await message.answer_sticker(random_stick_hello)
await message.bot.send_message(chat_id=IMPORTANT_LOGS, sleep(0.3)
text=f"Произошла ошибка при получении стикеров: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") except Exception as e:
try: logger.error(f"Произошла ошибка handle_start_message при получении стикеров. Ошибка:{str(e)}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
markup = get_reply_keyboard(BotDB, message.from_user.id) text=f"Произошла ошибка при получении стикеров: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
hello_message = messages.get_message(get_first_name(message), 'HELLO_MESSAGE') try:
await message.answer(hello_message, reply_markup=markup)
except Exception as e: markup = get_reply_keyboard(BotDB, message.from_user.id)
logger.error( hello_message = messages.get_message(get_first_name(message), 'HELLO_MESSAGE')
f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}") await message.answer(hello_message, reply_markup=markup)
await message.bot.send_message(IMPORTANT_LOGS, except Exception as e:
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") logger.error(
f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}")
await message.bot.send_message(IMPORTANT_LOGS,
@private_router.message( f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
StateFilter("START"),
ChatTypeFilter(chat_type=["private"]),
F.text == '📢Предложить свой пост' @private_router.message(
) StateFilter("START"),
async def suggest_post(message: types.Message, state: FSMContext): ChatTypeFilter(chat_type=["private"]),
try: F.text == '📢Предложить свой пост'
await message.forward(chat_id=GROUP_FOR_LOGS) )
await state.set_state("SUGGEST") async def suggest_post(message: types.Message, state: FSMContext):
current_state = await state.get_state() try:
logger.info( user_id = message.from_user.id
f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}") current_date = datetime.now()
markup = types.ReplyKeyboardRemove() date = current_date.strftime("%Y-%m-%d %H:%M:%S")
suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS') BotDB.update_date_for_user(date, user_id)
await message.answer(suggest_news) await message.forward(chat_id=GROUP_FOR_LOGS)
sleep(0.3) await state.set_state("SUGGEST")
suggest_news_2 = messages.get_message(get_first_name(message), 'SUGGEST_NEWS_2') current_state = await state.get_state()
await message.answer(suggest_news_2, reply_markup=markup) logger.info(
except Exception as e: f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}")
await message.bot.send_message(IMPORTANT_LOGS, markup = types.ReplyKeyboardRemove()
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS')
await message.answer(suggest_news)
sleep(0.3)
@private_router.message( suggest_news_2 = messages.get_message(get_first_name(message), 'SUGGEST_NEWS_2')
ChatTypeFilter(chat_type=["private"]), await message.answer(suggest_news_2, reply_markup=markup)
F.text == '👋🏼Сказать пока!' except Exception as e:
) await message.bot.send_message(IMPORTANT_LOGS,
@private_router.message( f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
ChatTypeFilter(chat_type=["private"]),
F.text == 'Выйти из чата'
) @private_router.message(
async def end_message(message: types.Message, state: FSMContext): ChatTypeFilter(chat_type=["private"]),
try: F.text == '👋🏼Сказать пока!'
logger.info( )
f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") @private_router.message(
name_stick_bye = list(Path('Stick').rglob('Universal_*')) ChatTypeFilter(chat_type=["private"]),
random_stick_bye = random.choice(name_stick_bye) F.text == 'Выйти из чата'
random_stick_bye = FSInputFile(path=random_stick_bye) )
await message.answer_sticker(random_stick_bye) async def end_message(message: types.Message, state: FSMContext):
except Exception as e: try:
logger.error( user_id = message.from_user.id
f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") current_date = datetime.now()
await message.bot.send_message(chat_id=IMPORTANT_LOGS, date = current_date.strftime("%Y-%m-%d %H:%M:%S")
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") BotDB.update_date_for_user(date, user_id)
try: await message.forward(chat_id=GROUP_FOR_LOGS)
markup = types.ReplyKeyboardRemove() logger.info(
bye_message = messages.get_message(get_first_name(message), 'BYE_MESSAGE') f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
await message.answer(bye_message, reply_markup=markup) name_stick_bye = list(Path('Stick').rglob('Universal_*'))
await state.set_state("START") random_stick_bye = random.choice(name_stick_bye)
except Exception as e: random_stick_bye = FSInputFile(path=random_stick_bye)
logger.error( await message.answer_sticker(random_stick_bye)
f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") except Exception as e:
await message.bot.send_message(chat_id=IMPORTANT_LOGS, logger.error(
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@private_router.message( try:
StateFilter("SUGGEST"), markup = types.ReplyKeyboardRemove()
ChatTypeFilter(chat_type=["private"]), bye_message = messages.get_message(get_first_name(message), 'BYE_MESSAGE')
) await message.answer(bye_message, reply_markup=markup)
async def suggest_router(message: types.Message, state: FSMContext, album: list = None): await state.set_state("START")
logger.info( except Exception as e:
f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") logger.error(
try: f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
if message.content_type == 'text': await message.bot.send_message(chat_id=IMPORTANT_LOGS,
lower_text = message.text.lower() text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
post_text, is_anonymous = get_text_message(lower_text, message.from_user.full_name,
message.from_user.username)
markup = get_reply_keyboard_for_post() @private_router.message(
if is_anonymous: StateFilter("SUGGEST"),
await send_text_message(GROUP_FOR_POST, message, post_text, markup) ChatTypeFilter(chat_type=["private"]),
else: )
await send_text_message(GROUP_FOR_POST, message, post_text, markup) async def suggest_router(message: types.Message, state: FSMContext, album: list = None):
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) logger.info(
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE') f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
await message.answer(success_send_message, reply_markup=markup_for_user) try:
await state.set_state("START") post_caption = ''
elif message.content_type == 'photo' and message.media_group_id is None: if message.content_type == 'text':
lower_caption = message.caption.lower() lower_text = message.text.lower()
markup = get_reply_keyboard_for_post() # Получаем текст сообщения и преобразовываем его по правилам
post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name, post_text = get_text_message(lower_text, message.from_user.full_name,
message.from_user.username) message.from_user.username)
#TODO: тут какая-то шляпа
if is_anonymous: # Получаем клавиатуру для поста
await send_photo_message(GROUP_FOR_POST, message, markup = get_reply_keyboard_for_post()
message.photo[-1].file_id, post_caption, markup)
else: # Отправляем сообщение в приватный канал
await send_photo_message(GROUP_FOR_POST, message, sent_message_id = await send_text_message(GROUP_FOR_POST, message, post_text, markup)
message.photo[-1].file_id, post_caption, markup)
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) # Записываем в базу пост
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE') BotDB.add_post_in_db(sent_message_id, message.text, message.from_user.id)
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START") # Отправляем юзеру ответ, что сообщение отравлено и возвращаем его в меню
elif message.media_group_id is not None: markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
post_caption = " " success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
if album[0].caption: await message.answer(success_send_message, reply_markup=markup_for_user)
lower_caption = album[0].caption.lower() await state.set_state("START")
post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name,
message.from_user.username) elif message.content_type == 'photo' and message.media_group_id is None:
media_group = process_photo_album(album, post_caption) if message.caption:
media_group_message_id = await send_media_group_message(GROUP_FOR_POST, message, lower_caption = message.caption.lower()
media_group) # Получаем текст сообщения и преобразовываем его по правилам
sleep(0.2) post_caption = get_text_message(lower_caption, message.from_user.full_name,
markup = get_reply_keyboard_for_post() message.from_user.username)
help_message_id = await send_text_message(GROUP_FOR_POST, message, "^", markup) markup = get_reply_keyboard_for_post()
await state.update_data(media_group_message_id=media_group_message_id, help_message_id=help_message_id)
# Отправляем фото и текст в приватный канал
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) sent_message = await send_photo_message(GROUP_FOR_POST, message,
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE') message.photo[-1].file_id, post_caption, markup)
await message.answer(success_send_message, reply_markup=markup_for_user) # Записываем в базу пост и контент
await state.set_state("START") BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
else: await add_in_db_media(sent_message)
await message.bot.send_message(message.chat.id,
'Я пока не умею работать с таким сообщением. Пришли текст и фото/фоты(ы)') # Отправляем юзеру ответ и возвращаем его в меню
except Exception as e: markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
await message.bot.send_message(chat_id=IMPORTANT_LOGS, success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
@private_router.message( elif message.content_type == 'video' and message.media_group_id is None:
ChatTypeFilter(chat_type=["private"]), if message.caption:
F.text == '🤪Хочу стикеры' lower_caption = message.caption.lower()
) post_caption = get_text_message(lower_caption, message.from_user.full_name,
async def stickers(message: types.Message, state: FSMContext): message.from_user.username)
logger.info( markup = get_reply_keyboard_for_post()
f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") # Получаем текст сообщения и преобразовываем его по правилам
markup = get_reply_keyboard(BotDB, message.from_user.id)
try: # Отправляем видео и текст в приватный канал
BotDB.update_info_about_stickers(user_id=message.from_user.id) sent_message = await send_video_message(GROUP_FOR_POST, message,
await message.forward(chat_id=GROUP_FOR_LOGS) message.video.file_id, post_caption, markup)
await message.answer(text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
reply_markup=markup) # Записываем в базу пост и контент
await state.set_state("START") BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
except Exception as e: await add_in_db_media(sent_message)
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") # Записываем в базу пост и контент
logger.error( BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") await add_in_db_media(sent_message)
# Отправляем юзеру ответ и возвращаем его в меню
@private_router.message( markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
StateFilter("START"), success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
ChatTypeFilter(chat_type=["private"]), await message.answer(success_send_message, reply_markup=markup_for_user)
F.text == '📩Связаться с админами' await state.set_state("START")
)
async def connect_with_admin(message: types.Message, state: FSMContext): elif message.content_type == 'video_note' and message.media_group_id is None:
logger.info( markup = get_reply_keyboard_for_post()
f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
admin_message = messages.get_message(get_first_name(message), 'CONNECT_WITH_ADMIN') # Отправляем видеокружок в приватный канал
await message.answer(admin_message, parse_mode="html") sent_message = await send_video_note_message(GROUP_FOR_POST, message,
await message.forward(chat_id=GROUP_FOR_LOGS) message.video_note.file_id, markup)
await state.set_state("PRE_CHAT")
# Записываем в базу пост и контент
BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
@private_router.message( await add_in_db_media(sent_message)
StateFilter("PRE_CHAT"),
ChatTypeFilter(chat_type=["private"]), # Отправляем юзеру ответ и возвращаем его в меню
) markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
@private_router.message( success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
StateFilter("CHAT"), await message.answer(success_send_message, reply_markup=markup_for_user)
ChatTypeFilter(chat_type=["private"]), await state.set_state("START")
)
async def resend_message_in_group_for_message(message: types.Message, state: FSMContext): elif message.content_type == 'audio' and message.media_group_id is None:
logger.info( if message.caption:
f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id})") lower_caption = message.caption.lower()
await message.forward(chat_id=GROUP_FOR_MESSAGE) # Получаем текст сообщения и преобразовываем его по правилам
current_date = datetime.now() post_caption = get_text_message(lower_caption, message.from_user.full_name,
date = current_date.strftime("%Y-%m-%d %H:%M:%S") message.from_user.username)
BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date) markup = get_reply_keyboard_for_post()
question = messages.get_message(get_first_name(message), 'QUESTION')
user_state = await state.get_state() # Отправляем аудио и текст в приватный канал
if user_state == "PRE_CHAT": sent_message = await send_audio_message(GROUP_FOR_POST, message,
markup = get_reply_keyboard(BotDB, message.from_user.id) message.audio.file_id, post_caption, markup)
await message.answer(question, reply_markup=markup)
await state.set_state("START") # Записываем в базу пост и контент
elif user_state == "CHAT": BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
markup = get_reply_keyboard_leave_chat() await add_in_db_media(sent_message)
await message.answer(question, reply_markup=markup)
# Отправляем юзеру ответ и возвращаем его в меню
# @private_router.message( markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
# ChatTypeFilter(chat_type=["private"]) success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
# ) await message.answer(success_send_message, reply_markup=markup_for_user)
# async def default(message: types.Message, state: FSMContext): await state.set_state("START")
# markup = get_reply_keyboard(BotDB, message.from_user.id)
# await message.answer('Кажется ты заблудился. Держи клавиатуру, твое состояние сброшено на начало', reply_markup=markup) elif message.content_type == 'voice' and message.media_group_id is None:
# await state.set_state("START") markup = get_reply_keyboard_for_post()
# Отправляем войс и текст в приватный канал
sent_message = await send_voice_message(GROUP_FOR_POST, message,
message.voice.file_id, markup)
# Записываем в базу пост и контент
BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
await add_in_db_media(sent_message)
# Отправляем юзеру ответ и возвращаем его в меню
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
elif message.media_group_id is not None:
post_caption = " "
# Получаем сообщение и проверяем есть ли подпись. Если подпись есть, то преобразуем ее через функцию
if album[0].caption:
lower_caption = album[0].caption.lower()
post_caption = get_text_message(lower_caption, message.from_user.full_name,
message.from_user.username)
# Иначе обрабатываем фото и получаем медиагруппу
media_group = await prepare_media_group_from_middlewares(album, post_caption)
# Отправляем медиагруппу в секретный чат
media_group_message_id = await send_media_group_message_to_private_chat(GROUP_FOR_POST, message,
media_group)
sleep(0.2)
# Получаем клавиатуру и отправляем еще одно текстовое сообщение с кнопками
markup = get_reply_keyboard_for_post()
help_message_id = await send_text_message(GROUP_FOR_POST, message, "^", markup)
# Записываем в state идентификаторы текстового сообщения И последнего сообщения медиагруппы
BotDB.update_helper_message_in_db(message_id=media_group_message_id, helper_message_id=help_message_id)
# Получаем клавиатуру для пользователя, благодарим за пост, и возвращаем в дефолтное сообщение
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
else:
await message.bot.send_message(message.chat.id,
'Я пока не умею работать с таким сообщением. '
'Пришли текст и фото/фоты(ы). А лучше перешли это сообщение админу @kerrad1\n'
'Мы добавим его к обработке если необходимо')
except Exception as e:
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
F.text == '🤪Хочу стикеры'
)
async def stickers(message: types.Message, state: FSMContext):
logger.info(
f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
markup = get_reply_keyboard(BotDB, message.from_user.id)
try:
BotDB.update_info_about_stickers(user_id=message.from_user.id)
await message.forward(chat_id=GROUP_FOR_LOGS)
await message.answer(text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
reply_markup=markup)
await state.set_state("START")
except Exception as e:
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
logger.error(
f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
@private_router.message(
StateFilter("START"),
ChatTypeFilter(chat_type=["private"]),
F.text == '📩Связаться с админами'
)
async def connect_with_admin(message: types.Message, state: FSMContext):
logger.info(
f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
user_id = message.from_user.id
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.update_date_for_user(date, user_id)
admin_message = messages.get_message(get_first_name(message), 'CONNECT_WITH_ADMIN')
await message.answer(admin_message, parse_mode="html")
await message.forward(chat_id=GROUP_FOR_LOGS)
await state.set_state("PRE_CHAT")
@private_router.message(
StateFilter("PRE_CHAT"),
ChatTypeFilter(chat_type=["private"]),
)
@private_router.message(
StateFilter("CHAT"),
ChatTypeFilter(chat_type=["private"]),
)
async def resend_message_in_group_for_message(message: types.Message, state: FSMContext):
user_id = message.from_user.id
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.update_date_for_user(date, user_id)
logger.info(
f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id})")
await message.forward(chat_id=GROUP_FOR_MESSAGE)
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date)
question = messages.get_message(get_first_name(message), 'QUESTION')
user_state = await state.get_state()
if user_state == "PRE_CHAT":
markup = get_reply_keyboard(BotDB, message.from_user.id)
await message.answer(question, reply_markup=markup)
await state.set_state("START")
elif user_state == "CHAT":
markup = get_reply_keyboard_leave_chat()
await message.answer(question, reply_markup=markup)

View File

@@ -1 +1 @@
from .main import get_reply_keyboard_for_post, get_reply_keyboard from .keyboards import get_reply_keyboard_for_post, get_reply_keyboard

View File

@@ -1,110 +1,115 @@
from aiogram import types from aiogram import types
from aiogram.utils.keyboard import ReplyKeyboardBuilder, InlineKeyboardBuilder from aiogram.utils.keyboard import ReplyKeyboardBuilder, InlineKeyboardBuilder
def get_reply_keyboard_for_post(): def get_reply_keyboard_for_post():
builder = InlineKeyboardBuilder() builder = InlineKeyboardBuilder()
builder.row(types.InlineKeyboardButton( builder.row(types.InlineKeyboardButton(
text="Опубликовать", callback_data="publish"), text="Опубликовать", callback_data="publish")
types.InlineKeyboardButton( )
text="Отклонить", callback_data="decline") builder.row(types.InlineKeyboardButton(
) text="Отклонить", callback_data="decline")
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) )
return markup markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup
def get_reply_keyboard(BotDB, user_id):
builder = ReplyKeyboardBuilder() def get_reply_keyboard(BotDB, user_id):
builder.add(types.KeyboardButton(text="📢Предложить свой пост")) builder = ReplyKeyboardBuilder()
builder.add(types.KeyboardButton(text="📩Связаться с админами")) builder.add(types.KeyboardButton(text="📢Предложить свой пост"))
builder.add(types.KeyboardButton(text="👋🏼Сказать пока!")) builder.add(types.KeyboardButton(text="📩Связаться с админами"))
if not BotDB.get_info_about_stickers(user_id=user_id): builder.add(types.KeyboardButton(text="👋🏼Сказать пока!"))
builder.add(types.KeyboardButton(text="🤪Хочу стикеры")) if not BotDB.get_info_about_stickers(user_id=user_id):
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) builder.add(types.KeyboardButton(text="🤪Хочу стикеры"))
return markup markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup
def get_reply_keyboard_leave_chat():
builder = ReplyKeyboardBuilder() def get_reply_keyboard_leave_chat():
builder.add(types.KeyboardButton(text="Выйти из чата")) builder = ReplyKeyboardBuilder()
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) builder.add(types.KeyboardButton(text="Выйти из чата"))
return markup markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup
def get_reply_keyboard_admin():
builder = ReplyKeyboardBuilder() def get_reply_keyboard_admin():
builder.add(types.KeyboardButton(text="Бан (Список)")) builder = ReplyKeyboardBuilder()
builder.add(types.KeyboardButton(text="Разбан (список)")) builder.add(types.KeyboardButton(text="Бан (Список)"))
builder.add(types.KeyboardButton(text="Вернуться в бота")) builder.add(types.KeyboardButton(text="Бан по нику"))
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) builder.add(types.KeyboardButton(text="Разбан (список)"))
return markup builder.add(types.KeyboardButton(text="Вернуться в бота"))
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup
def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[any, any]], callback: str):
"""
Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[any, any]], callback: str):
"""
Args: Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback
page: Номер текущей страницы.
total_items: Общее количество элементов. Args:
array_items: Лист кортежей. Содержит в себе user_name: user_id page: Номер текущей страницы.
callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id}) total_items: Общее количество элементов.
array_items: Лист кортежей. Содержит в себе user_name: user_id
Returns: callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id})
InlineKeyboardMarkup: Клавиатура с кнопками пагинации.
""" Returns:
InlineKeyboardMarkup: Клавиатура с кнопками пагинации.
# Определяем общее количество страниц """
total_pages = (total_items + 9 - 1) // 9
# Определяем общее количество страниц
# Создаем билдер для клавиатуры total_pages = (total_items + 9 - 1) // 9
keyboard = InlineKeyboardBuilder()
# TODO: Тут поправить на 9 объектов, а не 7. Клавиатуру переделал # Создаем билдер для клавиатуры
# Вычисляем стартовый номер для текущей страницы keyboard = InlineKeyboardBuilder()
start_index = (page - 1) * 9 # Вычисляем стартовый номер для текущей страницы
start_index = (page - 1) * 9
# Кнопки с номерами страниц
for i in range(start_index, min(start_index + 9, len(array_items))): # Кнопки с номерами страниц
keyboard.add(types.InlineKeyboardButton( for i in range(start_index, min(start_index + 9, len(array_items))):
text=f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}" keyboard.add(types.InlineKeyboardButton(
)) text=f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}"
keyboard.adjust(3) ))
keyboard.adjust(3)
next_button = types.InlineKeyboardButton(
text="➡️ Следующая", callback_data=f"page_{page + 1}" next_button = types.InlineKeyboardButton(
) text="➡️ Следующая", callback_data=f"page_{page + 1}"
prev_button = types.InlineKeyboardButton( )
text="⬅️ Предыдущая", callback_data=f"page_{page - 1}" prev_button = types.InlineKeyboardButton(
) text="⬅️ Предыдущая", callback_data=f"page_{page - 1}"
keyboard.row(prev_button, next_button) )
home_button = types.InlineKeyboardButton( keyboard.row(prev_button, next_button)
text="🏠 Назад", callback_data="return") home_button = types.InlineKeyboardButton(
keyboard.row(home_button) text="🏠 Назад", callback_data="return")
k = keyboard.as_markup() keyboard.row(home_button)
return k k = keyboard.as_markup()
return k
def create_keyboard_for_ban_reason():
builder = ReplyKeyboardBuilder() def create_keyboard_for_ban_reason():
builder.add(types.KeyboardButton(text="Спам")) builder = ReplyKeyboardBuilder()
builder.add(types.KeyboardButton(text="Заебал стикерами")) builder.add(types.KeyboardButton(text="Спам"))
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) builder.add(types.KeyboardButton(text="Заебал стикерами"))
return markup builder.row(types.KeyboardButton(text="Реклама здесь: @kerrad1 "))
builder.row(types.KeyboardButton(text="Тема с лагерями: https://vk.com/topic-75343895_50049913"))
builder.row(types.KeyboardButton(text="Отменить"))
def create_keyboard_for_ban_days(): markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
builder = ReplyKeyboardBuilder() return markup
builder.add(types.KeyboardButton(text="1"))
builder.add(types.KeyboardButton(text="7"))
builder.add(types.KeyboardButton(text="30")) def create_keyboard_for_ban_days():
builder.add(types.KeyboardButton(text="Навсегда")) builder = ReplyKeyboardBuilder()
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) builder.add(types.KeyboardButton(text="1"))
return markup builder.add(types.KeyboardButton(text="7"))
builder.add(types.KeyboardButton(text="30"))
builder.row(types.KeyboardButton(text="Навсегда"))
def create_keyboard_for_approve_ban(): builder.row(types.KeyboardButton(text="Отменить"))
builder = ReplyKeyboardBuilder() markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
builder.add(types.KeyboardButton(text="Подтвердить")) return markup
builder.add(types.KeyboardButton(text="Отменить"))
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup def create_keyboard_for_approve_ban():
builder = ReplyKeyboardBuilder()
builder.add(types.KeyboardButton(text="Подтвердить"))
builder.add(types.KeyboardButton(text="Отменить"))
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup

View File

@@ -1,21 +1,21 @@
from typing import Dict, Any from typing import Dict, Any
from aiogram import BaseMiddleware, types from aiogram import BaseMiddleware, types
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from logs.custom_logger import logger from logs.custom_logger import logger
bdf = BaseDependencyFactory() bdf = BaseDependencyFactory()
BotDB = bdf.get_db() BotDB = bdf.get_db()
class BlacklistMiddleware(BaseMiddleware): class BlacklistMiddleware(BaseMiddleware):
async def __call__(self, handler, event: types.Message, data: Dict[str, Any]) -> Any: async def __call__(self, handler, event: types.Message, data: Dict[str, Any]) -> Any:
logger.info(f'Вызов BlacklistMiddleware для пользователя {event.from_user.username}') logger.info(f'Вызов BlacklistMiddleware для пользователя {event.from_user.username}')
if BotDB.check_user_in_blacklist(user_id=event.from_user.id): if BotDB.check_user_in_blacklist(user_id=event.from_user.id):
logger.info(f'BlacklistMiddleware результат для пользователя: {event.from_user.username} заблокирован!') logger.info(f'BlacklistMiddleware результат для пользователя: {event.from_user.username} заблокирован!')
user_info = BotDB.get_blacklist_users_by_id(event.from_user.id) user_info = BotDB.get_blacklist_users_by_id(event.from_user.id)
await event.answer( await event.answer(
f"<b>Ты заблокирован.</b>\n<b>Причина блокировки:</b> {user_info[2]}\n<b>Дата разбана:</b> {user_info[3]}") f"<b>Ты заблокирован.</b>\n<b>Причина блокировки:</b> {user_info[2]}\n<b>Дата разбана:</b> {user_info[3]}")
return False return False
logger.info(f'BlacklistMiddleware результат для пользователя: {event.from_user.username} доступ разрешен') logger.info(f'BlacklistMiddleware результат для пользователя: {event.from_user.username} доступ разрешен')
return await handler(event, data) return await handler(event, data)

View File

@@ -4,6 +4,8 @@ import sys
from database.db import BotDB from database.db import BotDB
current_dir = os.getcwd()
class BaseDependencyFactory: class BaseDependencyFactory:
def __init__(self): def __init__(self):
@@ -12,7 +14,7 @@ class BaseDependencyFactory:
self.config = configparser.ConfigParser() self.config = configparser.ConfigParser()
self.config.read(config_path) self.config.read(config_path)
self.settings = {} self.settings = {}
self.database = BotDB('database/tg-bot-database') self.database = BotDB(current_dir, 'database/tg-bot-database')
for section in self.config.sections(): for section in self.config.sections():
self.settings[section] = {} self.settings[section] = {}

View File

@@ -1,9 +1,11 @@
import os
from datetime import datetime, timedelta from datetime import datetime, timedelta
from aiogram import types from aiogram import types
from aiogram.types import InputMediaPhoto from aiogram.types import InputMediaPhoto, FSInputFile, InputMediaVideo, InputMediaAudio
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from logs.custom_logger import logger
bdf = BaseDependencyFactory() bdf = BaseDependencyFactory()
@@ -24,50 +26,187 @@ def get_text_message(post_text: str, first_name: str, username: str):
username: Юзернейм автора поста username: Юзернейм автора поста
Returns: Returns:
Кортеж из двух элементов: str: - Сформированный текст сообщения.
- Сформированный текст сообщения.
- Флаг, указывающий, является ли пост анонимным (True - анонимный, False - не анонимный).
""" """
if "неанон" in post_text or "не анон" in post_text: if "неанон" in post_text or "не анон" in post_text:
is_anonymous = False return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}'
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous
elif "анон" in post_text: elif "анон" in post_text:
is_anonymous = True return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно'
return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно', is_anonymous
else: else:
is_anonymous = False return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}'
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous
def process_photo_album(album, post_caption: str = ''): async def download_file(message: types.Message, file_id: str):
""" """
Создает список InputMediaPhoto для альбома. Скачивает файл по file_id из Telegram.
Args:
message: сообщение
file_id: File ID фотографии
filename: Имя файла, под которым будет сохранено фото
Returns:
Путь к сохраненному файлу, если файл был скачан успешно, иначе None
"""
try:
os.makedirs("files", exist_ok=True)
os.makedirs("files/photos", exist_ok=True)
os.makedirs("files/videos", exist_ok=True)
os.makedirs("files/music", exist_ok=True)
os.makedirs("files/voice", exist_ok=True)
os.makedirs("files/video_notes", exist_ok=True)
file = await message.bot.get_file(file_id)
file_path = os.path.join("files", file.file_path)
await message.bot.download_file(file_path=file.file_path, destination=file_path)
return file_path
except Exception as e:
logger.error(f"Ошибка скачивания фотографии: {e}")
return None
async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
"""
Создает MediaGroup.
Args: Args:
album: Album объект из Telegram API. album: Album объект из Telegram API.
post_caption: Текст подписи к первому фото. post_caption: Текст подписи к первому фото.
Returns: Returns:
Список InputMediaPhoto. Список InputMediaPhoto (MediaGroup).
""" """
photo_media = [] media_group = []
for i, message in enumerate(album): for i, message in enumerate(album):
if i == 0: if message.photo:
photo_media.append(InputMediaPhoto(media=message.photo[-1].file_id, caption=post_caption)) file_id = message.photo[-1].file_id
media_type = 'photo'
elif message.video:
file_id = message.video.file_id
media_type = 'video'
elif message.audio:
file_id = message.audio.file_id
media_type = 'audio'
else: else:
photo_media.append(InputMediaPhoto(media=message.photo[-1].file_id)) # Если нет фото, видео или аудио, пропускаем сообщение
return photo_media continue
# Формируем объект MediaGroup с учетом типа медиа
if i == len(album) - 1:
if media_type == 'photo':
media_group.append(InputMediaPhoto(media=file_id, caption=post_caption))
elif media_type == 'video':
media_group.append(InputMediaVideo(media=file_id, caption=post_caption))
elif media_type == 'audio':
media_group.append(InputMediaAudio(media=file_id, caption=post_caption))
else:
if media_type == 'photo':
media_group.append(InputMediaPhoto(media=file_id))
elif media_type == 'video':
media_group.append(InputMediaVideo(media=file_id))
elif media_type == 'audio':
media_group.append(InputMediaAudio(media=file_id))
return media_group # Возвращаем MediaGroup
async def send_media_group_message(chat_id: int, message: types.Message, media_group: list[InputMediaPhoto]): async def add_in_db_media_mediagroup(sent_message):
"""
Идентификатор медиа-группы
Args:
sent_message: sent_message объект из Telegram API
Returns:
Список InputFile (FSInputFile).
"""
media_group_message_id = sent_message[-1].message_id # Получаем идентификатор медиа-группы
for i, message in enumerate(sent_message):
if message.photo:
file_id = message.photo[-1].file_id
file_path = await download_file(message, file_id=file_id)
BotDB.add_post_content_in_db(media_group_message_id, message.message_id, file_path, 'photo')
elif message.video:
file_id = message.video.file_id
file_path = await download_file(message, file_id=file_id)
BotDB.add_post_content_in_db(media_group_message_id, message.message_id, file_path, 'video')
else:
# Если нет фото, видео или аудио, или другой контент, пропускаем сообщение
continue
async def add_in_db_media(sent_message):
"""
Args:
sent_message: sent_message объект из Telegram API
Returns:
Список InputFile (FSInputFile).
"""
if sent_message.photo:
file_id = sent_message.photo[-1].file_id
file_path = await download_file(sent_message, file_id=file_id)
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'photo')
elif sent_message.video:
file_id = sent_message.video.file_id
file_path = await download_file(sent_message, file_id=file_id)
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'video')
elif sent_message.voice:
file_id = sent_message.voice.file_id
file_path = await download_file(sent_message, file_id=file_id)
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'voice')
elif sent_message.audio:
file_id = sent_message.audio.file_id
file_path = await download_file(sent_message, file_id=file_id)
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'audio')
elif sent_message.video_note:
file_id = sent_message.video_note.file_id
file_path = await download_file(sent_message, file_id=file_id)
BotDB.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'video_note')
async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message,
media_group: list[InputMediaPhoto]):
sent_message = await message.bot.send_media_group( sent_message = await message.bot.send_media_group(
chat_id=chat_id, chat_id=chat_id,
media=media_group, media=media_group,
) )
BotDB.add_post_in_db(sent_message[-1].message_id, sent_message[-1].caption, message.from_user.id)
await add_in_db_media_mediagroup(sent_message)
message_id = sent_message[-1].message_id message_id = sent_message[-1].message_id
return message_id return message_id
async def send_media_group_to_channel(bot, chat_id: int, post_content: list[tuple[str]], post_text: str):
"""
Отправляет медиа-группу с подписью к последнему файлу.
Args:
bot: Экземпляр бота aiogram.
chat_id: ID чата для отправки.
post_content: Список кортежей с путями к файлам.
post_text: Текст подписи.
"""
media = []
for file_path in post_content:
try:
file = FSInputFile(path=file_path[0])
type = file_path[1]
if type == 'video':
media.append(types.InputMediaVideo(media=file))
if type == 'photo':
media.append(types.InputMediaPhoto(media=file))
except FileNotFoundError:
logger.error(f"Файл не найден: {file_path[0]}")
return
# Добавляем подпись к последнему файлу
if media:
media[-1].caption = post_text
await bot.send_media_group(chat_id=chat_id, media=media)
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None): async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
if markup is None: if markup is None:
sent_message = await message.bot.send_message( sent_message = await message.bot.send_message(
@@ -86,20 +225,90 @@ async def send_text_message(chat_id, message: types.Message, post_text: str, mar
return message_id return message_id
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str, markup: types.ReplyKeyboardMarkup = None): async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None: if markup is None:
await message.bot.send_photo( sent_message = await message.bot.send_photo(
chat_id=chat_id, chat_id=chat_id,
caption=post_text, caption=post_text,
photo=photo photo=photo
) )
else: else:
await message.bot.send_photo( sent_message = await message.bot.send_photo(
chat_id=chat_id, chat_id=chat_id,
caption=post_text, caption=post_text,
photo=photo, photo=photo,
reply_markup=markup reply_markup=markup
) )
return sent_message
async def send_video_message(chat_id, message: types.Message, video: str, post_text: str = "",
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_video(
chat_id=chat_id,
caption=post_text,
video=video
)
else:
sent_message = await message.bot.send_video(
chat_id=chat_id,
caption=post_text,
video=video,
reply_markup=markup
)
return sent_message
async def send_video_note_message(chat_id, message: types.Message, video_note: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_video_note(
chat_id=chat_id,
video_note=video_note
)
else:
sent_message = await message.bot.send_video_note(
chat_id=chat_id,
video_note=video_note,
reply_markup=markup
)
return sent_message
async def send_audio_message(chat_id, message: types.Message, audio: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_audio(
chat_id=chat_id,
caption=post_text,
audio=audio
)
else:
sent_message = await message.bot.send_audio(
chat_id=chat_id,
caption=post_text,
audio=audio,
reply_markup=markup
)
return sent_message
async def send_voice_message(chat_id, message: types.Message, voice: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_voice(
chat_id=chat_id,
voice=voice
)
else:
sent_message = await message.bot.send_voice(
chat_id=chat_id,
voice=voice,
reply_markup=markup
)
return sent_message
def check_access(user_id: int): def check_access(user_id: int):
@@ -155,24 +364,6 @@ def get_banned_users_buttons():
return user_ids return user_ids
def get_help_message_id(media_group_message_id: int, data: dict) -> int:
"""
Получает идентификатор сообщения помощи по идентификатору сообщения группы.
Args:
media_group_message_id: Идентификатор сообщения группы
data: Словарь с данными.
Returns:
Идентификатор сообщения помощи.
"""
if 'help_message_id' in data and 'media_group_message_id' in data:
return data['media_group_message_id']
else:
return 0
def delete_user_blacklist(user_id: int): def delete_user_blacklist(user_id: int):
return BotDB.delete_user_blacklist(user_id=user_id) return BotDB.delete_user_blacklist(user_id=user_id)
@@ -185,7 +376,6 @@ def check_username_and_full_name(user_id: int, username: str, full_name: str):
def unban_notifier(self): def unban_notifier(self):
# Получение сегодняшней даты в формате DD-MM-YYYY # Получение сегодняшней даты в формате DD-MM-YYYY
current_date = datetime.now() current_date = datetime.now()
print('Мы в функции unban_notifier')
today = current_date.strftime("%d-%m-%Y") today = current_date.strftime("%d-%m-%Y")
# Получение списка разблокированных пользователей # Получение списка разблокированных пользователей
unblocked_users = self.BotDB.get_users_for_unblock_today(today) unblocked_users = self.BotDB.get_users_for_unblock_today(today)
@@ -194,4 +384,4 @@ def unban_notifier(self):
message += f"ID: {user_id}, Имя: {user_name}\n" message += f"ID: {user_id}, Имя: {user_name}\n"
# Отправка сообщения в канал # Отправка сообщения в канал
self.bot.send_message(self.GROUP_FOR_MESSAGE, message) self.bot.send_message(self.GROUP_FOR_MESSAGE, message)

View File

@@ -7,6 +7,7 @@ class StateUser(StatesGroup):
ADMIN = State() ADMIN = State()
CHAT = State() CHAT = State()
PRE_CHAT = State() PRE_CHAT = State()
PRE_BAN = State()
BAN_2 = State() BAN_2 = State()
BAN_3 = State() BAN_3 = State()
BAN_4 = State() BAN_4 = State()

View File

@@ -1,25 +1,24 @@
import datetime import datetime
import os import os
from loguru import logger
from loguru import logger
logger = logger.bind(name='main_log') logger = logger.bind(name='main_log')
# Получение сегодняшней даты для имени файла # Получение сегодняшней даты для имени файла
today = datetime.date.today().strftime('%Y-%m-%d') today = datetime.date.today().strftime('%Y-%m-%d')
# Создание папки для логов # Создание папки для логов
current_dir = os.path.dirname(os.path.abspath(__file__)) current_dir = os.path.dirname(os.path.abspath(__file__))
if not os.path.exists(current_dir): if not os.path.exists(current_dir):
# Если не существует, создаем ее # Если не существует, создаем ее
os.makedirs(current_dir) os.makedirs(current_dir)
filename = f'{current_dir}/helper_bot_{today}.log' filename = f'{current_dir}/helper_bot_{today}.log'
# Настройка формата логов # Настройка формата логов
logger.add( logger.add(
filename, filename,
rotation="00:00", rotation="00:00",
retention="5 days", retention="5 days",
compression="zip", format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {name} | {line} | {message}",
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {name} | {line} | {message}", )
)

View File

@@ -1,17 +1,14 @@
import os import os
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
# Получаем текущий рабочий каталог from database.db import BotDB
current_dir = os.path.dirname(os.path.abspath(__file__))
# Переходим на уровень выше, чтобы выйти из папки migrations/ # Получаем текущую директорию
current_dir = os.path.dirname(__file__)
# Переходим на уровень выше
parent_dir = os.path.dirname(current_dir) parent_dir = os.path.dirname(current_dir)
# Строим путь до файла BotDB = BotDB(parent_dir, 'database/tg-bot-database')
tg_bot_database_path = os.path.join(parent_dir, "tg-bot-database")
bdf = BaseDependencyFactory()
BotDB = bdf.get_db()
def get_filename(): def get_filename():
@@ -32,5 +29,6 @@ def main():
BotDB.create_table(migrations_init) BotDB.create_table(migrations_init)
BotDB.update_version(0, get_filename()) BotDB.update_version(0, get_filename())
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@@ -1,8 +1,14 @@
import os import os
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
bdf = BaseDependencyFactory() from database.db import BotDB
BotDB = bdf.get_db()
# Получаем текущую директорию
current_dir = os.path.dirname(__file__)
# Переходим на уровень выше
parent_dir = os.path.dirname(current_dir)
BotDB = BotDB(parent_dir, 'database/tg-bot-database')
def get_filename(): def get_filename():

View File

@@ -0,0 +1,61 @@
import os
from database.db import BotDB
# Получаем текущую директорию
current_dir = os.path.dirname(__file__)
# Переходим на уровень выше
parent_dir = os.path.dirname(current_dir)
BotDB = BotDB(parent_dir, 'database/tg-bot-database')
def get_filename():
"""Возвращает имя файла без расширения."""
filename = os.path.basename(__file__)
filename = os.path.splitext(filename)[0]
return filename
def main():
# Проверка версии миграций
current_version = BotDB.get_current_version() # Добавьте функцию для получения версии
# Выполнение миграций и проверка последней версии
if current_version < 2:
# Скрипты миграции
create_table_sql_1 = """
CREATE TABLE IF NOT EXISTS "post_from_telegram_suggest"
(
message_id INTEGER not null,
text TEXT,
helper_text_message_id INTEGER,
author_id INTEGER,
created_at TEXT
);
"""
create_table_sql_2 = """
CREATE TABLE IF NOT EXISTS message_link_to_content (
post_id INTEGER NOT NULL,
message_id INTEGER NOT NULL
);
"""
create_table_sql_3 = """
CREATE TABLE IF NOT EXISTS content_post_from_telegram (
message_id INTEGER NOT NULL,
content_name TEXT NOT NULL,
content_type TEXT
);
"""
# Применение миграции
BotDB.create_table(create_table_sql_1)
BotDB.create_table(create_table_sql_2)
BotDB.create_table(create_table_sql_3)
filename = get_filename()
BotDB.update_version(2, filename)
if __name__ == "__main__":
main()

View File

@@ -1,22 +1,22 @@
APScheduler==3.10.4 APScheduler==3.10.4
certifi~=2024.6.2 certifi~=2024.6.2
charset-normalizer==3.3.2 charset-normalizer==3.3.2
coverage==7.5.4 coverage==7.5.4
exceptiongroup==1.2.1 exceptiongroup==1.2.1
idna==3.7 idna==3.7
iniconfig==2.0.0 iniconfig==2.0.0
loguru==0.7.2 loguru==0.7.2
packaging==24.1 packaging==24.1
pluggy==1.5.0 pluggy==1.5.0
pytest==8.2.2 pytest==8.2.2
pytz==2024.1 pytz==2024.1
requests==2.32.3 requests==2.32.3
six==1.16.0 six==1.16.0
tomli==2.0.1 tomli==2.0.1
tzlocal==5.2 tzlocal==5.2
urllib3~=2.2.1 urllib3~=2.2.1
pip~=23.2.1 pip~=23.2.1
attrs~=23.2.0 attrs~=23.2.0
typing_extensions~=4.12.2 typing_extensions~=4.12.2
aiohttp~=3.9.5 aiohttp~=3.9.5
aiogram~=3.10.0 aiogram~=3.10.0

View File

@@ -1,7 +1,7 @@
import asyncio import asyncio
from helper_bot.main import start_bot from helper_bot.main import start_bot
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
if __name__ == '__main__': if __name__ == '__main__':
asyncio.run(start_bot(BaseDependencyFactory())) asyncio.run(start_bot(BaseDependencyFactory()))

View File

@@ -1,13 +1,17 @@
from datetime import datetime
import os import os
import sqlite3 import sqlite3
from datetime import datetime
import pytest import pytest
from database.db import BotDB from database.db import BotDB
@pytest.fixture @pytest.fixture
def bot(): def bot():
"""Фикстура для создания объекта BotDB.""" """Фикстура для создания объекта BotDB."""
return BotDB("test.db") current_dir = os.getcwd()
return BotDB(current_dir, "test.db")
@pytest.fixture(autouse=True, ) @pytest.fixture(autouse=True, )
@@ -104,12 +108,12 @@ def setup_db():
); );
""") """)
#blacklist mock data # blacklist mock data
cursor.execute("INSERT INTO blacklist (user_id, user_name, message_for_user, date_to_unban) VALUES (?, ?, ?, ?)", cursor.execute("INSERT INTO blacklist (user_id, user_name, message_for_user, date_to_unban) VALUES (?, ?, ?, ?)",
(user_id, username, message_for_user, next_date)) (user_id, username, message_for_user, next_date))
cursor.execute("INSERT INTO blacklist (user_id, user_name, message_for_user, date_to_unban) VALUES (?, ?, ?, ?)", cursor.execute("INSERT INTO blacklist (user_id, user_name, message_for_user, date_to_unban) VALUES (?, ?, ?, ?)",
(user_id_2, username_2, message_for_user_2, date)) (user_id_2, username_2, message_for_user_2, date))
#our_users mock data # our_users mock data
cursor.execute( cursor.execute(
"INSERT INTO our_users (user_id, first_name, full_name, username, date_added, date_changed, has_stickers)" "INSERT INTO our_users (user_id, first_name, full_name, username, date_added, date_changed, has_stickers)"
" VALUES (?, ?, ?, ?, ?, ?, ?)", (user_id, first_name, full_name, username, date, date, has_stickers) " VALUES (?, ?, ?, ?, ?, ?, ?)", (user_id, first_name, full_name, username, date, date, has_stickers)
@@ -118,7 +122,7 @@ def setup_db():
"INSERT INTO our_users (user_id, first_name, full_name, username, date_added, date_changed, has_stickers)" "INSERT INTO our_users (user_id, first_name, full_name, username, date_added, date_changed, has_stickers)"
" VALUES (?, ?, ?, ?, ?, ?, ?)", (user_id_2, first_name_2, full_name_2, username_2, date, date, has_stickers_2) " VALUES (?, ?, ?, ?, ?, ?, ?)", (user_id_2, first_name_2, full_name_2, username_2, date, date, has_stickers_2)
) )
#messages mock data # messages mock data
cursor.execute( cursor.execute(
"INSERT INTO user_messages (message_text, user_id, message_id, date) " "INSERT INTO user_messages (message_text, user_id, message_id, date) "
"VALUES (?, ?, ?, ?)", "VALUES (?, ?, ?, ?)",
@@ -127,7 +131,7 @@ def setup_db():
"INSERT INTO user_messages (message_text, user_id, message_id, date) " "INSERT INTO user_messages (message_text, user_id, message_id, date) "
"VALUES (?, ?, ?, ?)", "VALUES (?, ?, ?, ?)",
(message_text_2, user_id_2, message_id_2, date)) (message_text_2, user_id_2, message_id_2, date))
#mock admins # mock admins
cursor.execute( cursor.execute(
"INSERT INTO admins (user_id, role) " "INSERT INTO admins (user_id, role) "
"VALUES (?, ?)", "VALUES (?, ?)",