Dev 6 #9

Merged
KerradKerridi merged 13 commits from dev-6 into master 2025-08-30 11:58:45 +00:00
9 changed files with 1830 additions and 451 deletions
Showing only changes of commit f75e7f82c9 - Show all commits

995
database/async_db.py Normal file
View File

@@ -0,0 +1,995 @@
import os
import aiosqlite
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from logs.custom_logger import logger
class AsyncBotDB:
"""Асинхронный класс для работы с базой данных."""
def __init__(self, db_path: str):
self.db_path = os.path.abspath(db_path)
self.logger = logger
self.logger.info(f'Инициация асинхронной базы данных: {self.db_path}')
async def _get_connection(self):
"""Получение асинхронного соединения с базой данных."""
try:
# Используем connect вместо connect с контекстным менеджером
conn = await aiosqlite.connect(self.db_path)
# Включаем поддержку внешних ключей
await conn.execute("PRAGMA foreign_keys = ON")
# Включаем WAL режим для лучшей производительности
await conn.execute("PRAGMA journal_mode = WAL")
await conn.execute("PRAGMA synchronous = NORMAL")
await conn.execute("PRAGMA cache_size = 10000")
await conn.execute("PRAGMA temp_store = MEMORY")
return conn
except Exception as e:
self.logger.error(f"Ошибка при получении асинхронного соединения: {e}")
raise
async def create_tables(self):
"""Создание таблиц в базе данных."""
conn = None
try:
conn = await self._get_connection()
# Таблица пользователей
await conn.execute('''
CREATE TABLE IF NOT EXISTS our_users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER UNIQUE NOT NULL,
first_name TEXT NOT NULL,
full_name TEXT NOT NULL,
username TEXT,
is_bot BOOLEAN DEFAULT FALSE,
language_code TEXT DEFAULT 'ru',
emoji TEXT DEFAULT '😊',
has_stickers BOOLEAN DEFAULT FALSE,
date_added TEXT NOT NULL,
date_changed TEXT NOT NULL
)
''')
# Таблица черного списка
await conn.execute('''
CREATE TABLE IF NOT EXISTS blacklist (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER UNIQUE NOT NULL,
user_name TEXT,
message_for_user TEXT,
date_to_unban TEXT,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# Таблица сообщений пользователей
await conn.execute('''
CREATE TABLE IF NOT EXISTS user_messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_text TEXT NOT NULL,
user_id INTEGER NOT NULL,
message_id INTEGER UNIQUE NOT NULL,
date TEXT NOT NULL,
FOREIGN KEY (user_id) REFERENCES our_users (user_id)
)
''')
# Таблица постов из Telegram
await conn.execute('''
CREATE TABLE IF NOT EXISTS post_from_telegram_suggest (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id INTEGER UNIQUE NOT NULL,
text TEXT NOT NULL,
author_id INTEGER NOT NULL,
helper_text_message_id INTEGER,
created_at TEXT NOT NULL,
FOREIGN KEY (author_id) REFERENCES our_users (user_id)
)
''')
# Таблица контента постов (создаем ПЕРЕД таблицей связей)
await conn.execute('''
CREATE TABLE IF NOT EXISTS content_post_from_telegram (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id INTEGER UNIQUE NOT NULL,
content_name TEXT NOT NULL,
content_type TEXT NOT NULL
)
''')
# Таблица связи сообщений с контентом
await conn.execute('''
CREATE TABLE IF NOT EXISTS message_link_to_content (
id INTEGER PRIMARY KEY AUTOINCREMENT,
post_id INTEGER NOT NULL,
message_id INTEGER NOT NULL,
FOREIGN KEY (post_id) REFERENCES post_from_telegram_suggest (message_id),
FOREIGN KEY (message_id) REFERENCES content_post_from_telegram (message_id)
)
''')
# Таблица администраторов
await conn.execute('''
CREATE TABLE IF NOT EXISTS admins (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER UNIQUE NOT NULL,
role TEXT NOT NULL DEFAULT 'admin',
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES our_users (user_id)
)
''')
# Таблица миграций
await conn.execute('''
CREATE TABLE IF NOT EXISTS migrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
version INTEGER UNIQUE NOT NULL,
script_name TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
# Таблица аудио сообщений
await conn.execute('''
CREATE TABLE IF NOT EXISTS audio_message_reference (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_name TEXT NOT NULL,
author_id INTEGER NOT NULL,
date_added TEXT NOT NULL,
listen_count INTEGER DEFAULT 0,
file_id TEXT NOT NULL,
FOREIGN KEY (author_id) REFERENCES our_users (user_id)
)
''')
# Таблица прослушивания аудио
await conn.execute('''
CREATE TABLE IF NOT EXISTS listen_audio_users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
file_name TEXT NOT NULL,
user_id INTEGER NOT NULL,
is_listen BOOLEAN DEFAULT FALSE,
FOREIGN KEY (user_id) REFERENCES our_users (user_id)
)
''')
# Таблица для voice bot
await conn.execute('''
CREATE TABLE IF NOT EXISTS audio_moderate (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_id INTEGER UNIQUE NOT NULL,
user_id INTEGER NOT NULL,
FOREIGN KEY (user_id) REFERENCES our_users (user_id)
)
''')
await conn.commit()
self.logger.info("Таблицы успешно созданы")
except Exception as e:
self.logger.error(f"Ошибка при создании таблиц: {e}")
raise
finally:
if conn:
await conn.close()
async def user_exists(self, user_id: int) -> bool:
"""Проверка существования пользователя."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute("SELECT 1 FROM our_users WHERE user_id = ?", (user_id,)) as cursor:
result = await cursor.fetchone()
return bool(result)
except Exception as e:
self.logger.error(f"Ошибка при проверке существования пользователя: {e}")
raise
finally:
if conn:
await conn.close()
async def add_new_user(self, user_id: int, first_name: str, full_name: str, username: str = None,
is_bot: bool = False, language_code: str = "ru", emoji: str = "😊"):
"""Добавление нового пользователя."""
conn = None
try:
date_added = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
date_changed = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
conn = await self._get_connection()
await conn.execute(
"INSERT INTO our_users (user_id, first_name, full_name, username, is_bot, "
"language_code, emoji, date_added, date_changed) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(user_id, first_name, full_name, username, is_bot, language_code, emoji, date_added, date_changed)
)
await conn.commit()
self.logger.info(f"Новый пользователь добавлен: {user_id}")
except Exception as e:
self.logger.error(f"Ошибка при добавлении пользователя: {e}")
raise
finally:
if conn:
await conn.close()
async def get_user_info(self, user_id: int) -> Optional[Dict[str, Any]]:
"""Получение информации о пользователе."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT username, full_name, has_stickers, emoji FROM our_users WHERE user_id = ?",
(user_id,)
) as cursor:
result = await cursor.fetchone()
if result:
return {
'username': result[0],
'full_name': result[1],
'has_stickers': bool(result[2]) if result[2] is not None else False,
'emoji': result[3]
}
return None
except Exception as e:
self.logger.error(f"Ошибка при получении информации о пользователе: {e}")
raise
finally:
if conn:
await conn.close()
async def update_user_date(self, user_id: int):
"""Обновление даты последнего изменения пользователя."""
conn = None
try:
date_changed = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
conn = await self._get_connection()
await conn.execute(
"UPDATE our_users SET date_changed = ? WHERE user_id = ?",
(date_changed, user_id)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при обновлении даты пользователя: {e}")
raise
finally:
if conn:
await conn.close()
async def update_user_info(self, user_id: int, username: str = None, full_name: str = None):
"""Обновление информации о пользователе."""
conn = None
try:
conn = await self._get_connection()
if username and full_name:
await conn.execute(
"UPDATE our_users SET username = ?, full_name = ? WHERE user_id = ?",
(username, full_name, user_id)
)
elif username:
await conn.execute(
"UPDATE our_users SET username = ? WHERE user_id = ?",
(username, user_id)
)
elif full_name:
await conn.execute(
"UPDATE our_users SET full_name = ? WHERE user_id = ?",
(full_name, user_id)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при обновлении информации о пользователе: {e}")
raise
finally:
if conn:
await conn.close()
async def update_user_emoji(self, user_id: int, emoji: str):
"""Обновление эмодзи пользователя."""
conn = None
try:
conn = await self._get_connection()
await conn.execute(
"UPDATE our_users SET emoji = ? WHERE user_id = ?",
(emoji, user_id)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при обновлении эмодзи: {e}")
raise
finally:
if conn:
await conn.close()
async def get_user_emoji(self, user_id: int) -> Optional[str]:
"""Получение эмодзи пользователя."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT emoji FROM our_users WHERE user_id = ?", (user_id,)
) as cursor:
result = await cursor.fetchone()
return result[0] if result else None
except Exception as e:
self.logger.error(f"Ошибка при получении эмодзи: {e}")
raise
finally:
if conn:
await conn.close()
async def check_emoji_exists(self, emoji: str) -> bool:
"""Проверка существования эмодзи."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT 1 FROM our_users WHERE emoji = ?", (emoji,)
) as cursor:
result = await cursor.fetchone()
return bool(result)
except Exception as e:
self.logger.error(f"Ошибка при проверке эмодзи: {e}")
raise
finally:
if conn:
await conn.close()
async def update_stickers_info(self, user_id: int):
"""Обновление информации о стикерах."""
conn = None
try:
conn = await self._get_connection()
await conn.execute(
"UPDATE our_users SET has_stickers = 1 WHERE user_id = ?",
(user_id,)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при обновлении информации о стикерах: {e}")
raise
finally:
if conn:
await conn.close()
async def get_stickers_info(self, user_id: int) -> bool:
"""Получение информации о стикерах."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT has_stickers FROM our_users WHERE user_id = ?", (user_id,)
) as cursor:
result = await cursor.fetchone()
return bool(result[0]) if result and result[0] is not None else False
except Exception as e:
self.logger.error(f"Ошибка при получении информации о стикерах: {e}")
return False
finally:
if conn:
await conn.close()
async def add_message(self, message_text: str, user_id: int, message_id: int):
"""Добавление сообщения пользователя."""
conn = None
try:
date = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
conn = await self._get_connection()
await conn.execute(
"INSERT INTO user_messages (message_text, user_id, message_id, date) VALUES (?, ?, ?, ?)",
(message_text, user_id, message_id, date)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при добавлении сообщения: {e}")
raise
finally:
if conn:
await conn.close()
async def add_post(self, message_id: int, text: str, author_id: int):
"""Добавление поста."""
conn = None
try:
created_at = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
conn = await self._get_connection()
await conn.execute(
"INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at) VALUES (?, ?, ?, ?)",
(message_id, text, author_id, created_at)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при добавлении поста: {e}")
raise
finally:
if conn:
await conn.close()
async def update_helper_message(self, message_id: int, helper_message_id: int):
"""Обновление helper сообщения."""
conn = None
try:
conn = await self._get_connection()
await conn.execute(
"UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?",
(helper_message_id, message_id)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при обновлении helper сообщения: {e}")
raise
finally:
if conn:
await conn.close()
async def add_post_content(self, post_id: int, message_id: int, content_name: str, content_type: str):
"""Добавление контента поста."""
conn = None
try:
conn = await self._get_connection()
# Сначала добавляем связь
await conn.execute(
"INSERT INTO message_link_to_content (post_id, message_id) VALUES (?, ?)",
(post_id, message_id)
)
# Затем добавляем контент
await conn.execute(
"INSERT INTO content_post_from_telegram (message_id, content_name, content_type) VALUES (?, ?, ?)",
(message_id, content_name, content_type)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при добавлении контента поста: {e}")
raise
finally:
if conn:
await conn.close()
async def get_post_content(self, last_post_id: int) -> List[Tuple[str, str]]:
"""Получение контента поста."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute("""
SELECT cpft.content_name, cpft.content_type
FROM post_from_telegram_suggest pft
JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id
JOIN content_post_from_telegram cpft ON cpft.message_id = mltc.message_id
WHERE pft.helper_text_message_id = ?
""", (last_post_id,)) as cursor:
return await cursor.fetchall()
except Exception as e:
self.logger.error(f"Ошибка при получении контента поста: {e}")
raise
finally:
if conn:
await conn.close()
async def get_post_text(self, last_post_id: int) -> Optional[str]:
"""Получение текста поста."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT text FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
(last_post_id,)
) as cursor:
result = await cursor.fetchone()
return result[0] if result else None
except Exception as e:
self.logger.error(f"Ошибка при получении текста поста: {e}")
raise
finally:
if conn:
await conn.close()
async def get_post_ids(self, last_post_id: int) -> List[int]:
"""Получение ID постов."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute("""
SELECT mltc.message_id
FROM post_from_telegram_suggest pft
JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id
WHERE pft.helper_text_message_id = ?
""", (last_post_id,)) as cursor:
result = await cursor.fetchall()
return [row[0] for row in result]
except Exception as e:
self.logger.error(f"Ошибка при получении ID постов: {e}")
raise
finally:
if conn:
await conn.close()
async def get_author_id_by_message(self, message_id: int) -> Optional[int]:
"""Получение ID автора по message_id."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT author_id FROM post_from_telegram_suggest WHERE message_id = ?",
(message_id,)
) as cursor:
result = await cursor.fetchone()
return result[0] if result else None
except Exception as e:
self.logger.error(f"Ошибка при получении ID автора: {e}")
raise
finally:
if conn:
await conn.close()
async def get_author_id_by_helper_message(self, helper_message_id: int) -> Optional[int]:
"""Получение ID автора по helper_message_id."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT author_id FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
(helper_message_id,)
) as cursor:
result = await cursor.fetchone()
return result[0] if result else None
except Exception as e:
self.logger.error(f"Ошибка при получении ID автора по helper сообщению: {e}")
raise
finally:
if conn:
await conn.close()
async def get_last_users(self, limit: int = 30) -> List[Tuple[str, int]]:
"""Получение последних пользователей."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC LIMIT ?",
(limit,)
) as cursor:
return await cursor.fetchall()
except Exception as e:
self.logger.error(f"Ошибка при получении последних пользователей: {e}")
raise
finally:
if conn:
await conn.close()
async def get_user_by_message_id(self, message_id: int) -> Optional[int]:
"""Получение пользователя по message_id."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT user_id FROM user_messages WHERE message_id = ?",
(message_id,)
) as cursor:
result = await cursor.fetchone()
return result[0] if result else None
except Exception as e:
self.logger.error(f"Ошибка при получении пользователя по message_id: {e}")
raise
finally:
if conn:
await conn.close()
# Методы для работы с черным списком
async def add_to_blacklist(self, user_id: int, user_name: str = None,
message_for_user: str = None, date_to_unban: str = None):
"""Добавление пользователя в черный список."""
conn = None
try:
conn = await self._get_connection()
await conn.execute(
"INSERT INTO blacklist (user_id, user_name, message_for_user, date_to_unban) VALUES (?, ?, ?, ?)",
(user_id, user_name, message_for_user, date_to_unban)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при добавлении в черный список: {e}")
raise
finally:
if conn:
await conn.close()
async def remove_from_blacklist(self, user_id: int) -> bool:
"""Удаление пользователя из черного списка."""
conn = None
try:
conn = await self._get_connection()
await conn.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
await conn.commit()
return True
except Exception as e:
self.logger.error(f"Ошибка при удалении из черного списка: {e}")
return False
finally:
if conn:
await conn.close()
async def check_blacklist(self, user_id: int) -> bool:
"""Проверка пользователя в черном списке."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,)
) as cursor:
result = await cursor.fetchone()
return bool(result)
except Exception as e:
self.logger.error(f"Ошибка при проверке черного списка: {e}")
raise
finally:
if conn:
await conn.close()
async def get_blacklist_users(self, offset: int = 0, limit: int = 10) -> List[Tuple[str, int, str, str]]:
"""Получение пользователей из черного списка."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist LIMIT ?, ?",
(offset, limit)
) as cursor:
return await cursor.fetchall()
except Exception as e:
self.logger.error(f"Ошибка при получении пользователей из черного списка: {e}")
raise
finally:
if conn:
await conn.close()
async def get_blacklist_count(self) -> int:
"""Получение количества пользователей в черном списке."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute("SELECT COUNT(*) FROM blacklist") as cursor:
result = await cursor.fetchone()
return result[0] if result else 0
except Exception as e:
self.logger.error(f"Ошибка при получении количества пользователей в черном списке: {e}")
raise
finally:
if conn:
await conn.close()
async def get_users_for_unban_today(self, date_to_unban: str) -> List[Tuple[int, str]]:
"""Получение пользователей для разблокировки сегодня."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT user_id, user_name FROM blacklist WHERE date_to_unban = ?",
(date_to_unban,)
) as cursor:
return await cursor.fetchall()
except Exception as e:
self.logger.error(f"Ошибка при получении пользователей для разблокировки: {e}")
raise
finally:
if conn:
await conn.close()
# Методы для работы с администраторами
async def add_admin(self, user_id: int, role: str = "admin"):
"""Добавление администратора."""
conn = None
try:
conn = await self._get_connection()
await conn.execute(
"INSERT INTO admins (user_id, role) VALUES (?, ?)",
(user_id, role)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при добавлении администратора: {e}")
raise
finally:
if conn:
await conn.close()
async def remove_admin(self, user_id: int):
"""Удаление администратора."""
conn = None
try:
conn = await self._get_connection()
await conn.execute("DELETE FROM admins WHERE user_id = ?", (user_id,))
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при удалении администратора: {e}")
raise
finally:
if conn:
await conn.close()
async def is_admin(self, user_id: int) -> bool:
"""Проверка, является ли пользователь администратором."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT 1 FROM admins WHERE user_id = ?", (user_id,)
) as cursor:
result = await cursor.fetchone()
return bool(result)
except Exception as e:
self.logger.error(f"Ошибка при проверке прав администратора: {e}")
return False
finally:
if conn:
await conn.close()
# Методы для работы с аудио
async def add_audio_record(self, file_name: str, author_id: int, file_id: str):
"""Добавление аудио записи."""
conn = None
try:
date_added = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
conn = await self._get_connection()
await conn.execute(
"INSERT INTO audio_message_reference (file_name, author_id, date_added, file_id) VALUES (?, ?, ?, ?)",
(file_name, author_id, date_added, file_id)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при добавлении аудио записи: {e}")
raise
finally:
if conn:
await conn.close()
async def get_last_audio_date(self) -> Optional[str]:
"""Получение даты последнего аудио."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT date_added FROM audio_message_reference ORDER BY date_added DESC LIMIT 1"
) as cursor:
result = await cursor.fetchone()
return result[0] if result else None
except Exception as e:
self.logger.error(f"Ошибка при получении даты последнего аудио: {e}")
raise
finally:
if conn:
await conn.close()
async def get_user_audio_records(self, user_id: int) -> bool:
"""Проверка наличия аудио записей у пользователя."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT 1 FROM audio_message_reference WHERE author_id = ? LIMIT 1",
(user_id,)
) as cursor:
result = await cursor.fetchone()
return bool(result)
except Exception as e:
self.logger.error(f"Ошибка при проверке аудио записей пользователя: {e}")
raise
finally:
if conn:
await conn.close()
async def get_audio_file_id(self, user_id: int) -> Optional[str]:
"""Получение file_id последнего аудио пользователя."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT file_id FROM audio_message_reference WHERE author_id = ? ORDER BY date_added DESC LIMIT 1",
(user_id,)
) as cursor:
result = await cursor.fetchone()
return result[0] if result else None
except Exception as e:
self.logger.error(f"Ошибка при получении file_id аудио: {e}")
raise
finally:
if conn:
await conn.close()
async def get_audio_file_name(self, user_id: int) -> Optional[str]:
"""Получение имени файла последнего аудио пользователя."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT file_name FROM audio_message_reference WHERE author_id = ? ORDER BY date_added DESC LIMIT 1",
(user_id,)
) as cursor:
result = await cursor.fetchone()
return result[0] if result else None
except Exception as e:
self.logger.error(f"Ошибка при получении имени файла аудио: {e}")
raise
finally:
if conn:
await conn.close()
async def check_audio_listened(self, user_id: int) -> List[str]:
"""Проверка прослушанных аудио пользователем."""
conn = None
try:
conn = await self._get_connection()
# Получаем все аудио файлы
async with conn.execute(
"SELECT file_name FROM audio_message_reference WHERE author_id != ?",
(user_id,)
) as cursor:
all_audio = await cursor.fetchall()
# Получаем прослушанные пользователем
async with conn.execute("""
SELECT l.file_name
FROM audio_message_reference a
LEFT JOIN listen_audio_users l ON l.file_name = a.file_name
WHERE l.user_id = ? AND l.file_name IS NOT NULL
""", (user_id,)) as cursor:
listened_audio = await cursor.fetchall()
# Находим непрослушанные
all_audio_names = {row[0] for row in all_audio}
listened_names = {row[0] for row in listened_audio}
return list(all_audio_names - listened_names)
except Exception as e:
self.logger.error(f"Ошибка при проверке прослушанных аудио: {e}")
raise
finally:
if conn:
await conn.close()
async def mark_audio_listened(self, file_name: str, user_id: int):
"""Отметка аудио как прослушанного."""
conn = None
try:
conn = await self._get_connection()
await conn.execute(
"INSERT INTO listen_audio_users (file_name, user_id, is_listen) VALUES (?, ?, ?)",
(file_name, user_id, 1)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при отметке аудио как прослушанного: {e}")
raise
finally:
if conn:
await conn.close()
async def clear_user_audio_listen(self, user_id: int):
"""Очистка данных о прослушивании аудио пользователем."""
conn = None
try:
conn = await self._get_connection()
await conn.execute(
"DELETE FROM listen_audio_users WHERE user_id = ?",
(user_id,)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при очистке данных о прослушивании: {e}")
raise
finally:
if conn:
await conn.close()
async def get_user_by_audio_file(self, file_name: str) -> Optional[int]:
"""Получение пользователя по имени аудио файла."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT author_id FROM audio_message_reference WHERE file_name = ?",
(file_name,)
) as cursor:
result = await cursor.fetchone()
return result[0] if result else None
except Exception as e:
self.logger.error(f"Ошибка при получении пользователя по имени файла: {e}")
raise
finally:
if conn:
await conn.close()
async def get_audio_date(self, file_name: str) -> Optional[str]:
"""Получение даты аудио файла."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT date_added FROM audio_message_reference WHERE file_name = ?",
(file_name,)
) as cursor:
result = await cursor.fetchone()
return result[0] if result else None
except Exception as e:
self.logger.error(f"Ошибка при получении даты аудио файла: {e}")
raise
finally:
if conn:
await conn.close()
# Методы для voice bot
async def set_voice_bot_message(self, message_id: int, user_id: int):
"""Установка связи message_id и user_id для voice bot."""
conn = None
try:
conn = await self._get_connection()
await conn.execute(
"INSERT INTO audio_moderate (message_id, user_id) VALUES (?, ?)",
(message_id, user_id)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при установке связи для voice bot: {e}")
raise
finally:
if conn:
await conn.close()
async def get_voice_bot_user(self, message_id: int) -> Optional[int]:
"""Получение пользователя voice bot по message_id."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT user_id FROM audio_moderate WHERE message_id = ?",
(message_id,)
) as cursor:
result = await cursor.fetchone()
return result[0] if result else None
except Exception as e:
self.logger.error(f"Ошибка при получении пользователя voice bot: {e}")
raise
finally:
if conn:
await conn.close()
# Методы для миграций
async def get_migration_version(self) -> int:
"""Получение текущей версии миграции."""
conn = None
try:
conn = await self._get_connection()
async with conn.execute(
"SELECT version FROM migrations ORDER BY version DESC LIMIT 1"
) as cursor:
result = await cursor.fetchone()
return result[0] if result else 0
except Exception as e:
self.logger.error(f"Ошибка при получении версии миграции: {e}")
raise
finally:
if conn:
await conn.close()
async def update_migration_version(self, version: int, script_name: str):
"""Обновление версии миграции."""
conn = None
try:
created_at = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
conn = await self._get_connection()
await conn.execute(
"INSERT INTO migrations (version, script_name, created_at) VALUES (?, ?, ?)",
(version, script_name, created_at)
)
await conn.commit()
except Exception as e:
self.logger.error(f"Ошибка при обновлении версии миграции: {e}")
raise
finally:
if conn:
await conn.close()
async def close(self):
"""Закрытие соединений."""
# Соединения закрываются в каждом методе
pass

View File

@@ -1 +1,20 @@
from .private_handlers import private_router
"""Private handlers package for Telegram bot"""
from .private_handlers import private_router, create_private_handlers, PrivateHandlers
from .services import BotSettings, UserService, PostService, StickerService
from .constants import FSM_STATES, BUTTON_TEXTS, ERROR_MESSAGES
from .decorators import error_handler
__all__ = [
'private_router',
'create_private_handlers',
'PrivateHandlers',
'BotSettings',
'UserService',
'PostService',
'StickerService',
'FSM_STATES',
'BUTTON_TEXTS',
'ERROR_MESSAGES',
'error_handler'
]

View File

@@ -0,0 +1,29 @@
"""Constants for private handlers"""
# FSM States
FSM_STATES = {
"START": "START",
"SUGGEST": "SUGGEST",
"PRE_CHAT": "PRE_CHAT",
"CHAT": "CHAT"
}
# Button texts
BUTTON_TEXTS = {
"SUGGEST_POST": "📢Предложить свой пост",
"SAY_GOODBYE": "👋🏼Сказать пока!",
"LEAVE_CHAT": "Выйти из чата",
"RETURN_TO_BOT": "Вернуться в бота",
"WANT_STICKERS": "🤪Хочу стикеры",
"CONNECT_ADMIN": "📩Связаться с админами"
}
# Error messages
ERROR_MESSAGES = {
"UNSUPPORTED_CONTENT": (
'Я пока не умею работать с таким сообщением. '
'Пришли текст и фото/фоты(ы). А лучше перешли это сообщение админу @kerrad1\n'
'Мы добавим его к обработке если необходимо'
),
"STICKERS_LINK": "Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk"
}

View File

@@ -0,0 +1,29 @@
"""Decorators and utility functions for private handlers"""
import traceback
from aiogram import types
from logs.custom_logger import logger
def error_handler(func):
"""Decorator for centralized error handling"""
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
logger.error(f"Error in {func.__name__}: {str(e)}")
# Try to send error to logs if possible
try:
message = next((arg for arg in args if isinstance(arg, types.Message)), None)
if message and hasattr(message, 'bot'):
from helper_bot.utils.base_dependency_factory import get_global_instance
bdf = get_global_instance()
important_logs = bdf.settings['Telegram']['important_logs']
await message.bot.send_message(
chat_id=important_logs,
text=f"Произошла ошибка в {func.__name__}: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
)
except:
pass # If we can't log the error, at least it was logged to logger
raise
return wrapper

View File

@@ -16,488 +16,210 @@ from helper_bot.keyboards.keyboards import get_reply_keyboard_leave_chat
from helper_bot.middlewares.album_middleware import AlbumMiddleware
from helper_bot.middlewares.blacklist_middleware import BlacklistMiddleware
from helper_bot.utils import messages
from helper_bot.utils.base_dependency_factory import get_global_instance
from helper_bot.utils.helper_func import get_first_name, get_text_message, send_text_message, send_photo_message, \
send_media_group_message_to_private_chat, prepare_media_group_from_middlewares, send_video_message, \
send_video_note_message, send_audio_message, send_voice_message, add_in_db_media, \
check_user_emoji, check_username_and_full_name, update_user_info
from logs.custom_logger import logger
private_router = Router()
private_router.message.middleware(AlbumMiddleware())
private_router.message.middleware(BlacklistMiddleware())
bdf = get_global_instance()
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
LOGS = bdf.settings['Settings']['logs']
TEST = bdf.settings['Settings']['test']
BotDB = bdf.get_db()
# Import new modular components
from .constants import FSM_STATES, BUTTON_TEXTS, ERROR_MESSAGES
from .services import BotSettings, UserService, PostService, StickerService
from .decorators import error_handler
# Expose sleep for tests (tests patch helper_bot.handlers.private.private_handlers.sleep)
sleep = asyncio.sleep
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
Command("emoji")
)
async def handle_emoji_message(message: types.Message, state: FSMContext):
await message.forward(chat_id=GROUP_FOR_LOGS)
class PrivateHandlers:
"""Main handler class for private messages"""
def __init__(self, db, settings: BotSettings):
self.db = db
self.settings = settings
self.user_service = UserService(db, settings)
self.post_service = PostService(db, settings)
self.sticker_service = StickerService(settings)
# Create router
self.router = Router()
self.router.message.middleware(AlbumMiddleware())
self.router.message.middleware(BlacklistMiddleware())
# Register handlers
self._register_handlers()
def _register_handlers(self):
"""Register all message handlers"""
# Command handlers
self.router.message.register(self.handle_emoji_message, ChatTypeFilter(chat_type=["private"]), Command("emoji"))
self.router.message.register(self.handle_restart_message, ChatTypeFilter(chat_type=["private"]), Command("restart"))
self.router.message.register(self.handle_start_message, ChatTypeFilter(chat_type=["private"]), Command("start"))
self.router.message.register(self.handle_start_message, ChatTypeFilter(chat_type=["private"]), F.text == BUTTON_TEXTS["RETURN_TO_BOT"])
# Button handlers
self.router.message.register(self.suggest_post, StateFilter(FSM_STATES["START"]), ChatTypeFilter(chat_type=["private"]), F.text == BUTTON_TEXTS["SUGGEST_POST"])
self.router.message.register(self.end_message, ChatTypeFilter(chat_type=["private"]), F.text == BUTTON_TEXTS["SAY_GOODBYE"])
self.router.message.register(self.end_message, ChatTypeFilter(chat_type=["private"]), F.text == BUTTON_TEXTS["LEAVE_CHAT"])
self.router.message.register(self.stickers, ChatTypeFilter(chat_type=["private"]), F.text == BUTTON_TEXTS["WANT_STICKERS"])
self.router.message.register(self.connect_with_admin, StateFilter(FSM_STATES["START"]), ChatTypeFilter(chat_type=["private"]), F.text == BUTTON_TEXTS["CONNECT_ADMIN"])
# State handlers
self.router.message.register(self.suggest_router, StateFilter(FSM_STATES["SUGGEST"]), ChatTypeFilter(chat_type=["private"]))
self.router.message.register(self.resend_message_in_group_for_message, StateFilter(FSM_STATES["PRE_CHAT"]), ChatTypeFilter(chat_type=["private"]))
self.router.message.register(self.resend_message_in_group_for_message, StateFilter(FSM_STATES["CHAT"]), ChatTypeFilter(chat_type=["private"]))
@error_handler
async def handle_emoji_message(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle emoji command"""
await self.user_service.log_user_message(message)
user_emoji = check_user_emoji(message)
await state.set_state("START")
await state.set_state(FSM_STATES["START"])
if user_emoji is not None:
await message.answer(f'Твоя эмодзя - {user_emoji}', parse_mode='HTML')
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
Command("restart")
)
async def handle_restart_message(message: types.Message, state: FSMContext):
try:
markup = get_reply_keyboard(BotDB, message.from_user.id)
await message.forward(chat_id=GROUP_FOR_LOGS)
await state.set_state("START")
@error_handler
async def handle_restart_message(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle restart command"""
markup = get_reply_keyboard(self.db, message.from_user.id)
await self.user_service.log_user_message(message)
await state.set_state(FSM_STATES["START"])
await update_user_info('love', message)
check_user_emoji(message)
await message.answer('Я перезапущен!', reply_markup=markup, parse_mode='HTML')
except Exception as e:
logger.error(f"Произошла ошибка handle_restart_message. Ошибка:{str(e)}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка handle_restart_message: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@error_handler
async def handle_start_message(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle start command and return to bot button"""
await self.user_service.log_user_message(message)
await self.user_service.ensure_user_exists(message)
await state.set_state(FSM_STATES["START"])
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
Command("start")
)
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
F.text == 'Вернуться в бота'
)
async def handle_start_message(message: types.Message, state: FSMContext):
try:
await message.forward(chat_id=GROUP_FOR_LOGS)
full_name = message.from_user.full_name
username = message.from_user.username
first_name = get_first_name(message)
is_bot = message.from_user.is_bot
language_code = message.from_user.language_code
user_id = message.from_user.id
# Send sticker
await self.sticker_service.send_random_hello_sticker(message)
# Проверяем наличие username для логирования
if not username:
# Экранируем full_name для безопасного использования
safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь"
await message.bot.send_message(chat_id=GROUP_FOR_LOGS,
text=f'Пользователь {user_id} ({safe_full_name}) обратился к боту без username')
logger.warning(f"Пользователь {user_id} ({safe_full_name}) обратился к боту без username")
# Устанавливаем значение по умолчанию для username
username = "private_username"
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
if not BotDB.user_exists(user_id):
# Для первоначального добавления эмодзи пока не назначаем (совместимость)
BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, "", date,
date)
else:
is_need_update = check_username_and_full_name(user_id, username, full_name, BotDB)
if is_need_update:
BotDB.update_username_and_full_name(user_id, username, full_name)
# Экранируем пользовательские данные для безопасного использования
safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь"
safe_username = html.escape(username) if username else "Без никнейма"
await message.answer(
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {safe_full_name} и ник @{safe_username}")
await message.bot.send_message(chat_id=GROUP_FOR_LOGS,
text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {safe_full_name}\nНовый ник:{safe_username}')
await asyncio.sleep(1)
BotDB.update_date_for_user(date, user_id)
await state.set_state("START")
logger.info(
f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} "
f"Имя автора сообщения: {message.from_user.full_name})")
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
random_stick_hello = random.choice(name_stick_hello)
random_stick_hello = FSInputFile(path=random_stick_hello)
logger.info(f"Стикер успешно получен из БД")
await message.answer_sticker(random_stick_hello)
await asyncio.sleep(0.3)
except Exception as e:
logger.error(f"Произошла ошибка handle_start_message при получении стикеров. Ошибка:{str(e)}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка при получении стикеров: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
try:
markup = get_reply_keyboard(BotDB, message.from_user.id)
# Send welcome message
markup = get_reply_keyboard(self.db, message.from_user.id)
hello_message = messages.get_message(get_first_name(message), 'HELLO_MESSAGE')
await message.answer(hello_message, reply_markup=markup, parse_mode='HTML')
except Exception as e:
logger.error(
f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}")
await message.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@error_handler
async def suggest_post(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle suggest post button"""
await self.user_service.update_user_activity(message.from_user.id)
await self.user_service.log_user_message(message)
await state.set_state(FSM_STATES["SUGGEST"])
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
Command("restart")
)
async def restart_function(message: types.Message, state: FSMContext):
await message.forward(chat_id=GROUP_FOR_LOGS)
full_name = message.from_user.full_name
username = message.from_user.username
user_id = message.from_user.id
# Проверяем наличие username для логирования
if not username:
# Экранируем full_name для безопасного использования
safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь"
await message.bot.send_message(chat_id=GROUP_FOR_LOGS,
text=f'Пользователь {user_id} ({safe_full_name}) обратился к боту без username')
logger.warning(f"Пользователь {user_id} ({safe_full_name}) обратился к боту без username")
# Устанавливаем значение по умолчанию для username
username = "private_username"
markup = get_reply_keyboard(BotDB, message.from_user.id)
await message.answer(text='Я перезапущен!',
reply_markup=markup)
await state.set_state('START')
@private_router.message(
StateFilter("START"),
ChatTypeFilter(chat_type=["private"]),
F.text == '📢Предложить свой пост'
)
async def suggest_post(message: types.Message, state: FSMContext):
try:
user_id = message.from_user.id
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.update_date_for_user(date, user_id)
await message.forward(chat_id=GROUP_FOR_LOGS)
await state.set_state("SUGGEST")
current_state = await state.get_state()
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.info(
f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {safe_full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}")
markup = types.ReplyKeyboardRemove()
suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS')
await message.answer(suggest_news)
await asyncio.sleep(0.3)
suggest_news_2 = messages.get_message(get_first_name(message), 'SUGGEST_NEWS_2')
await message.answer(suggest_news_2, reply_markup=markup)
except Exception as e:
await message.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@error_handler
async def end_message(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle goodbye button"""
await self.user_service.update_user_activity(message.from_user.id)
await self.user_service.log_user_message(message)
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
F.text == '👋🏼Сказать пока!'
)
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
F.text == 'Выйти из чата'
)
async def end_message(message: types.Message, state: FSMContext):
try:
user_id = message.from_user.id
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.update_date_for_user(date, user_id)
await message.forward(chat_id=GROUP_FOR_LOGS)
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.info(
f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
random_stick_bye = random.choice(name_stick_bye)
random_stick_bye = FSInputFile(path=random_stick_bye)
await message.answer_sticker(random_stick_bye)
except Exception as e:
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.error(
f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
try:
# Send sticker
await self.sticker_service.send_random_goodbye_sticker(message)
# Send goodbye message
markup = types.ReplyKeyboardRemove()
bye_message = messages.get_message(get_first_name(message), 'BYE_MESSAGE')
await message.answer(bye_message, reply_markup=markup)
await state.set_state("START")
except Exception as e:
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.error(
f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
await state.set_state(FSM_STATES["START"])
@error_handler
async def suggest_router(self, message: types.Message, state: FSMContext, album: list = None, **kwargs):
"""Handle post submission in suggest state"""
await self.post_service.process_post(message, album)
@private_router.message(
StateFilter("SUGGEST"),
ChatTypeFilter(chat_type=["private"]),
# Send success message and return to start state
markup_for_user = get_reply_keyboard(self.db, message.from_user.id)
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state(FSM_STATES["START"])
@error_handler
async def stickers(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle stickers request"""
markup = get_reply_keyboard(self.db, message.from_user.id)
self.db.update_info_about_stickers(user_id=message.from_user.id)
await self.user_service.log_user_message(message)
await message.answer(
text=ERROR_MESSAGES["STICKERS_LINK"],
reply_markup=markup
)
async def suggest_router(message: types.Message, state: FSMContext, album: list = None):
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.info(
f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
first_name = get_first_name(message)
try:
post_caption = ''
if message.media_group_id is not None:
# Экранируем username для безопасного использования
safe_username = html.escape(message.from_user.username) if message.from_user.username else "Без никнейма"
await send_text_message(GROUP_FOR_LOGS, message,
f'Закинул медиагруппу, пользователь: имя - {first_name}, ник - {safe_username}')
else:
await message.forward(chat_id=GROUP_FOR_LOGS)
if message.content_type == 'text':
lower_text = message.text.lower()
# Получаем текст сообщения и преобразовываем его по правилам
post_text = get_text_message(lower_text, first_name,
message.from_user.username)
# Получаем клавиатуру для поста
markup = get_reply_keyboard_for_post()
await state.set_state(FSM_STATES["START"])
# Отправляем сообщение в приватный канал
sent_message_id = await send_text_message(GROUP_FOR_POST, message, post_text, markup)
# Записываем в базу пост
BotDB.add_post_in_db(sent_message_id, message.text, message.from_user.id)
# Отправляем юзеру ответ, что сообщение отравлено и возвращаем его в меню
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
elif message.content_type == 'photo' and message.media_group_id is None:
if message.caption:
lower_caption = message.caption.lower()
# Получаем текст сообщения и преобразовываем его по правилам
post_caption = get_text_message(lower_caption, first_name,
message.from_user.username)
markup = get_reply_keyboard_for_post()
# Отправляем фото и текст в приватный канал
sent_message = await send_photo_message(GROUP_FOR_POST, message,
message.photo[-1].file_id, post_caption, markup)
# Записываем в базу пост и контент
BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
await add_in_db_media(sent_message, BotDB)
# Отправляем юзеру ответ и возвращаем его в меню
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
elif message.content_type == 'video' and message.media_group_id is None:
if message.caption:
lower_caption = message.caption.lower()
post_caption = get_text_message(lower_caption, first_name,
message.from_user.username)
markup = get_reply_keyboard_for_post()
# Получаем текст сообщения и преобразовываем его по правилам
# Отправляем видео и текст в приватный канал
sent_message = await send_video_message(GROUP_FOR_POST, message,
message.video.file_id, post_caption, markup)
# Записываем в базу пост и контент
BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
await add_in_db_media(sent_message, BotDB)
# Записываем в базу пост и контент
BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
await add_in_db_media(sent_message)
# Отправляем юзеру ответ и возвращаем его в меню
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
elif message.content_type == 'video_note' and message.media_group_id is None:
markup = get_reply_keyboard_for_post()
# Отправляем видеокружок в приватный канал
sent_message = await send_video_note_message(GROUP_FOR_POST, message,
message.video_note.file_id, markup)
# Записываем в базу пост и контент
BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
await add_in_db_media(sent_message, BotDB)
# Отправляем юзеру ответ и возвращаем его в меню
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
elif message.content_type == 'audio' and message.media_group_id is None:
if message.caption:
lower_caption = message.caption.lower()
# Получаем текст сообщения и преобразовываем его по правилам
post_caption = get_text_message(lower_caption, first_name,
message.from_user.username)
markup = get_reply_keyboard_for_post()
# Отправляем аудио и текст в приватный канал
sent_message = await send_audio_message(GROUP_FOR_POST, message,
message.audio.file_id, post_caption, markup)
# Записываем в базу пост и контент
BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
await add_in_db_media(sent_message, BotDB)
# Отправляем юзеру ответ и возвращаем его в меню
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
elif message.content_type == 'voice' and message.media_group_id is None:
markup = get_reply_keyboard_for_post()
# Отправляем войс и текст в приватный канал
sent_message = await send_voice_message(GROUP_FOR_POST, message,
message.voice.file_id, markup)
# Записываем в базу пост и контент
BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
await add_in_db_media(sent_message, BotDB)
# Отправляем юзеру ответ и возвращаем его в меню
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
elif message.media_group_id is not None:
post_caption = " "
# Получаем сообщение и проверяем есть ли подпись. Если подпись есть, то преобразуем ее через функцию
if album[0].caption:
lower_caption = album[0].caption.lower()
post_caption = get_text_message(lower_caption, first_name,
message.from_user.username)
# Иначе обрабатываем фото и получаем медиагруппу
media_group = await prepare_media_group_from_middlewares(album, post_caption)
# Отправляем медиагруппу в секретный чат
media_group_message_id = await send_media_group_message_to_private_chat(GROUP_FOR_POST, message,
media_group, BotDB)
await asyncio.sleep(0.2)
# Получаем клавиатуру и отправляем еще одно текстовое сообщение с кнопками
markup = get_reply_keyboard_for_post()
help_message_id = await send_text_message(GROUP_FOR_POST, message, "^", markup)
# Записываем в state идентификаторы текстового сообщения И последнего сообщения медиагруппы
BotDB.update_helper_message_in_db(message_id=media_group_message_id, helper_message_id=help_message_id)
# Получаем клавиатуру для пользователя, благодарим за пост, и возвращаем в дефолтное сообщение
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
else:
await message.bot.send_message(message.chat.id,
'Я пока не умею работать с таким сообщением. '
'Пришли текст и фото/фоты(ы). А лучше перешли это сообщение админу @kerrad1\n'
'Мы добавим его к обработке если необходимо')
except Exception as e:
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
F.text == '🤪Хочу стикеры'
)
async def stickers(message: types.Message, state: FSMContext):
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.info(
f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
markup = get_reply_keyboard(BotDB, message.from_user.id)
try:
BotDB.update_info_about_stickers(user_id=message.from_user.id)
await message.forward(chat_id=GROUP_FOR_LOGS)
await message.answer(text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
reply_markup=markup)
await state.set_state("START")
except Exception as e:
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.error(
f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
@private_router.message(
StateFilter("START"),
ChatTypeFilter(chat_type=["private"]),
F.text == '📩Связаться с админами'
)
async def connect_with_admin(message: types.Message, state: FSMContext):
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.info(
f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
user_id = message.from_user.id
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.update_date_for_user(date, user_id)
@error_handler
async def connect_with_admin(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle connect with admin button"""
await self.user_service.update_user_activity(message.from_user.id)
admin_message = messages.get_message(get_first_name(message), 'CONNECT_WITH_ADMIN')
await message.answer(admin_message, parse_mode="html")
await message.forward(chat_id=GROUP_FOR_LOGS)
await state.set_state("PRE_CHAT")
await self.user_service.log_user_message(message)
await state.set_state(FSM_STATES["PRE_CHAT"])
@error_handler
async def resend_message_in_group_for_message(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle messages in admin chat states"""
await self.user_service.update_user_activity(message.from_user.id)
await message.forward(chat_id=self.settings.group_for_message)
@private_router.message(
StateFilter("PRE_CHAT"),
ChatTypeFilter(chat_type=["private"]),
)
@private_router.message(
StateFilter("CHAT"),
ChatTypeFilter(chat_type=["private"]),
)
async def resend_message_in_group_for_message(message: types.Message, state: FSMContext):
user_id = message.from_user.id
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.update_date_for_user(date, user_id)
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.info(
f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {safe_full_name} Идентификатор сообщения: {message.message_id})")
await message.forward(chat_id=GROUP_FOR_MESSAGE)
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date)
self.db.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date)
question = messages.get_message(get_first_name(message), 'QUESTION')
user_state = await state.get_state()
if user_state == "PRE_CHAT":
markup = get_reply_keyboard(BotDB, message.from_user.id)
if user_state == FSM_STATES["PRE_CHAT"]:
markup = get_reply_keyboard(self.db, message.from_user.id)
await message.answer(question, reply_markup=markup)
await state.set_state("START")
elif user_state == "CHAT":
await state.set_state(FSM_STATES["START"])
elif user_state == FSM_STATES["CHAT"]:
markup = get_reply_keyboard_leave_chat()
await message.answer(question, reply_markup=markup)
# Factory function to create handlers with dependencies
def create_private_handlers(db, settings: BotSettings) -> PrivateHandlers:
"""Create private handlers instance with dependencies"""
return PrivateHandlers(db, settings)
# Legacy router for backward compatibility
private_router = Router()
# Initialize with global dependencies (for backward compatibility)
def init_legacy_router():
"""Initialize legacy router with global dependencies"""
global private_router
from helper_bot.utils.base_dependency_factory import get_global_instance
bdf = get_global_instance()
settings = BotSettings(
group_for_posts=bdf.settings['Telegram']['group_for_posts'],
group_for_message=bdf.settings['Telegram']['group_for_message'],
main_public=bdf.settings['Telegram']['main_public'],
group_for_logs=bdf.settings['Telegram']['group_for_logs'],
important_logs=bdf.settings['Telegram']['important_logs'],
preview_link=bdf.settings['Telegram']['preview_link'],
logs=bdf.settings['Settings']['logs'],
test=bdf.settings['Settings']['test']
)
db = bdf.get_db()
handlers = create_private_handlers(db, settings)
# Instead of trying to copy handlers, we'll use the new router directly
# This maintains backward compatibility while using the new architecture
private_router = handlers.router
# Initialize legacy router
init_legacy_router()

View File

@@ -0,0 +1,239 @@
"""Service classes for private handlers"""
import random
import asyncio
import html
from datetime import datetime
from pathlib import Path
from typing import Dict, Callable
from dataclasses import dataclass
from aiogram import types
from aiogram.types import FSInputFile
from helper_bot.utils.helper_func import (
get_first_name, get_text_message, send_text_message, send_photo_message,
send_media_group_message_to_private_chat, prepare_media_group_from_middlewares,
send_video_message, send_video_note_message, send_audio_message, send_voice_message,
add_in_db_media, check_username_and_full_name
)
from helper_bot.keyboards import get_reply_keyboard_for_post
@dataclass
class BotSettings:
"""Bot configuration settings"""
group_for_posts: str
group_for_message: str
main_public: str
group_for_logs: str
important_logs: str
preview_link: str
logs: str
test: str
class UserService:
"""Service for user-related operations"""
def __init__(self, db, settings: BotSettings):
self.db = db
self.settings = settings
async def update_user_activity(self, user_id: int) -> None:
"""Update user's last activity timestamp"""
current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.db.update_date_for_user(current_date, user_id)
async def ensure_user_exists(self, message: types.Message) -> None:
"""Ensure user exists in database, create if needed"""
user_id = message.from_user.id
full_name = message.from_user.full_name
username = message.from_user.username or "private_username"
first_name = get_first_name(message)
is_bot = message.from_user.is_bot
language_code = message.from_user.language_code
current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if not self.db.user_exists(user_id):
self.db.add_new_user_in_db(
user_id, first_name, full_name, username, is_bot, language_code,
"", current_date, current_date
)
else:
is_need_update = check_username_and_full_name(user_id, username, full_name, self.db)
if is_need_update:
self.db.update_username_and_full_name(user_id, username, full_name)
safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь"
safe_username = html.escape(username) if username else "Без никнейма"
await message.answer(
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {safe_full_name} и ник @{safe_username}")
await message.bot.send_message(
chat_id=self.settings.group_for_logs,
text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {safe_full_name}\nНовый ник:{safe_username}')
self.db.update_date_for_user(current_date, user_id)
async def log_user_message(self, message: types.Message) -> None:
"""Forward user message to logs group"""
await message.forward(chat_id=self.settings.group_for_logs)
def get_safe_user_info(self, message: types.Message) -> tuple[str, str]:
"""Get safely escaped user information for logging"""
full_name = message.from_user.full_name or "Неизвестный пользователь"
username = message.from_user.username or "Без никнейма"
return html.escape(full_name), html.escape(username)
class PostService:
"""Service for post-related operations"""
def __init__(self, db, settings: BotSettings):
self.db = db
self.settings = settings
async def handle_text_post(self, message: types.Message, first_name: str) -> None:
"""Handle text post submission"""
post_text = get_text_message(message.text.lower(), first_name, message.from_user.username)
markup = get_reply_keyboard_for_post()
sent_message_id = await send_text_message(self.settings.group_for_posts, message, post_text, markup)
self.db.add_post_in_db(sent_message_id, message.text, message.from_user.id)
async def handle_photo_post(self, message: types.Message, first_name: str) -> None:
"""Handle photo post submission"""
post_caption = ""
if message.caption:
post_caption = get_text_message(message.caption.lower(), first_name, message.from_user.username)
markup = get_reply_keyboard_for_post()
sent_message = await send_photo_message(
self.settings.group_for_posts, message, message.photo[-1].file_id, post_caption, markup
)
self.db.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
await add_in_db_media(sent_message, self.db)
async def handle_video_post(self, message: types.Message, first_name: str) -> None:
"""Handle video post submission"""
post_caption = ""
if message.caption:
post_caption = get_text_message(message.caption.lower(), first_name, message.from_user.username)
markup = get_reply_keyboard_for_post()
sent_message = await send_video_message(
self.settings.group_for_posts, message, message.video.file_id, post_caption, markup
)
self.db.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
await add_in_db_media(sent_message, self.db)
async def handle_video_note_post(self, message: types.Message) -> None:
"""Handle video note post submission"""
markup = get_reply_keyboard_for_post()
sent_message = await send_video_note_message(
self.settings.group_for_posts, message, message.video_note.file_id, markup
)
self.db.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
await add_in_db_media(sent_message, self.db)
async def handle_audio_post(self, message: types.Message, first_name: str) -> None:
"""Handle audio post submission"""
post_caption = ""
if message.caption:
post_caption = get_text_message(message.caption.lower(), first_name, message.from_user.username)
markup = get_reply_keyboard_for_post()
sent_message = await send_audio_message(
self.settings.group_for_posts, message, message.audio.file_id, post_caption, markup
)
self.db.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
await add_in_db_media(sent_message, self.db)
async def handle_voice_post(self, message: types.Message) -> None:
"""Handle voice post submission"""
markup = get_reply_keyboard_for_post()
sent_message = await send_voice_message(
self.settings.group_for_posts, message, message.voice.file_id, markup
)
self.db.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id)
await add_in_db_media(sent_message, self.db)
async def handle_media_group_post(self, message: types.Message, album: list, first_name: str) -> None:
"""Handle media group post submission"""
post_caption = " "
if album[0].caption:
post_caption = get_text_message(album[0].caption.lower(), first_name, message.from_user.username)
media_group = await prepare_media_group_from_middlewares(album, post_caption)
media_group_message_id = await send_media_group_message_to_private_chat(
self.settings.group_for_posts, message, media_group, self.db
)
await asyncio.sleep(0.2)
markup = get_reply_keyboard_for_post()
help_message_id = await send_text_message(self.settings.group_for_posts, message, "^", markup)
self.db.update_helper_message_in_db(
message_id=media_group_message_id, helper_message_id=help_message_id
)
async def process_post(self, message: types.Message, album: list = None) -> None:
"""Process post based on content type"""
first_name = get_first_name(message)
if message.media_group_id is not None:
safe_username = html.escape(message.from_user.username) if message.from_user.username else "Без никнейма"
await send_text_message(
self.settings.group_for_logs, message,
f'Закинул медиагруппу, пользователь: имя - {first_name}, ник - {safe_username}'
)
await self.handle_media_group_post(message, album, first_name)
return
content_handlers: Dict[str, Callable] = {
'text': lambda: self.handle_text_post(message, first_name),
'photo': lambda: self.handle_photo_post(message, first_name),
'video': lambda: self.handle_video_post(message, first_name),
'video_note': lambda: self.handle_video_note_post(message),
'audio': lambda: self.handle_audio_post(message, first_name),
'voice': lambda: self.handle_voice_post(message)
}
handler = content_handlers.get(message.content_type)
if handler:
await handler()
else:
from .constants import ERROR_MESSAGES
await message.bot.send_message(
message.chat.id, ERROR_MESSAGES["UNSUPPORTED_CONTENT"]
)
class StickerService:
"""Service for sticker-related operations"""
def __init__(self, settings: BotSettings):
self.settings = settings
async def send_random_hello_sticker(self, message: types.Message) -> None:
"""Send random hello sticker"""
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
random_stick_hello = random.choice(name_stick_hello)
random_stick_hello = FSInputFile(path=random_stick_hello)
await message.answer_sticker(random_stick_hello)
await asyncio.sleep(0.3)
async def send_random_goodbye_sticker(self, message: types.Message) -> None:
"""Send random goodbye sticker"""
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
random_stick_bye = random.choice(name_stick_bye)
random_stick_bye = FSInputFile(path=random_stick_bye)
await message.answer_sticker(random_stick_bye)

View File

@@ -1,6 +1,9 @@
# Core dependencies
aiogram~=3.10.0
# Database
aiosqlite~=0.20.0
# Logging
loguru==0.7.2

174
tests/test_async_db.py Normal file
View File

@@ -0,0 +1,174 @@
import pytest
import asyncio
import os
import tempfile
from database.async_db import AsyncBotDB
@pytest.fixture
async def temp_db():
"""Создает временную базу данных для тестирования."""
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp:
db_path = tmp.name
db = AsyncBotDB(db_path)
yield db
# Очистка
try:
os.unlink(db_path)
except:
pass
@pytest.fixture(scope="function")
def event_loop():
"""Создает новый event loop для каждого теста."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.mark.asyncio
async def test_create_tables(temp_db):
"""Тест создания таблиц."""
await temp_db.create_tables()
# Если не возникло исключение, значит таблицы созданы успешно
assert True
@pytest.mark.asyncio
async def test_add_and_get_user(temp_db):
"""Тест добавления и получения пользователя."""
await temp_db.create_tables()
# Добавляем пользователя
user_id = 12345
first_name = "Test"
full_name = "Test User"
username = "testuser"
await temp_db.add_new_user(user_id, first_name, full_name, username)
# Проверяем существование
exists = await temp_db.user_exists(user_id)
assert exists is True
# Получаем информацию
user_info = await temp_db.get_user_info(user_id)
assert user_info is not None
assert user_info['username'] == username
assert user_info['full_name'] == full_name
@pytest.mark.asyncio
async def test_blacklist_operations(temp_db):
"""Тест операций с черным списком."""
await temp_db.create_tables()
user_id = 12345
user_name = "Test User"
message = "Test ban"
date_to_unban = "01-01-2025"
# Добавляем в черный список
await temp_db.add_to_blacklist(user_id, user_name, message, date_to_unban)
# Проверяем наличие
is_banned = await temp_db.check_blacklist(user_id)
assert is_banned is True
# Получаем список
banned_users = await temp_db.get_blacklist_users()
assert len(banned_users) == 1
assert banned_users[0][1] == user_id # user_id
# Удаляем из черного списка
removed = await temp_db.remove_from_blacklist(user_id)
assert removed is True
# Проверяем удаление
is_banned = await temp_db.check_blacklist(user_id)
assert is_banned is False
@pytest.mark.asyncio
async def test_admin_operations(temp_db):
"""Тест операций с администраторами."""
await temp_db.create_tables()
user_id = 12345
role = "admin"
# Добавляем администратора
await temp_db.add_admin(user_id, role)
# Проверяем права
is_admin = await temp_db.is_admin(user_id)
assert is_admin is True
# Удаляем администратора
await temp_db.remove_admin(user_id)
# Проверяем удаление
is_admin = await temp_db.is_admin(user_id)
assert is_admin is False
@pytest.mark.asyncio
async def test_audio_operations(temp_db):
"""Тест операций с аудио."""
await temp_db.create_tables()
user_id = 12345
file_name = "test_audio.mp3"
file_id = "test_file_id"
# Добавляем аудио запись
await temp_db.add_audio_record(file_name, user_id, file_id)
# Получаем file_id
retrieved_file_id = await temp_db.get_audio_file_id(user_id)
assert retrieved_file_id == file_id
# Получаем имя файла
retrieved_file_name = await temp_db.get_audio_file_name(user_id)
assert retrieved_file_name == file_name
@pytest.mark.asyncio
async def test_post_operations(temp_db):
"""Тест операций с постами."""
await temp_db.create_tables()
message_id = 12345
text = "Test post text"
author_id = 67890
# Добавляем пост
await temp_db.add_post(message_id, text, author_id)
# Обновляем helper сообщение
helper_message_id = 54321
await temp_db.update_helper_message(message_id, helper_message_id)
# Получаем текст поста
retrieved_text = await temp_db.get_post_text(helper_message_id)
assert retrieved_text == text
# Получаем ID автора
retrieved_author_id = await temp_db.get_author_id_by_helper_message(helper_message_id)
assert retrieved_author_id == author_id
@pytest.mark.asyncio
async def test_error_handling(temp_db):
"""Тест обработки ошибок."""
# Пытаемся получить пользователя без создания таблиц
with pytest.raises(Exception):
await temp_db.user_exists(12345)
if __name__ == "__main__":
# Запуск тестов
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,169 @@
"""Tests for refactored private handlers"""
import pytest
from unittest.mock import Mock, AsyncMock, MagicMock
from aiogram import types
from aiogram.fsm.context import FSMContext
from helper_bot.handlers.private.private_handlers import (
create_private_handlers, PrivateHandlers
)
from helper_bot.handlers.private.services import BotSettings
from helper_bot.handlers.private.constants import FSM_STATES, BUTTON_TEXTS
class TestPrivateHandlers:
"""Test class for PrivateHandlers"""
@pytest.fixture
def mock_db(self):
"""Mock database"""
db = Mock()
db.user_exists.return_value = False
db.add_new_user_in_db = Mock()
db.update_date_for_user = Mock()
db.update_info_about_stickers = Mock()
db.add_post_in_db = Mock()
db.add_new_message_in_db = Mock()
db.update_helper_message_in_db = Mock()
return db
@pytest.fixture
def mock_settings(self):
"""Mock bot settings"""
return BotSettings(
group_for_posts="test_posts",
group_for_message="test_message",
main_public="test_public",
group_for_logs="test_logs",
important_logs="test_important",
preview_link="test_link",
logs="test_logs_setting",
test="test_test_setting"
)
@pytest.fixture
def mock_message(self):
"""Mock Telegram message"""
message = Mock(spec=types.Message)
message.from_user.id = 12345
message.from_user.full_name = "Test User"
message.from_user.username = "testuser"
message.from_user.is_bot = False
message.from_user.language_code = "ru"
message.text = "test message"
message.chat.id = 12345
message.bot = Mock()
message.bot.send_message = AsyncMock()
message.forward = AsyncMock()
message.answer = AsyncMock()
message.answer_sticker = AsyncMock()
return message
@pytest.fixture
def mock_state(self):
"""Mock FSM state"""
state = Mock(spec=FSMContext)
state.set_state = AsyncMock()
state.get_state = AsyncMock(return_value=FSM_STATES["START"])
return state
def test_create_private_handlers(self, mock_db, mock_settings):
"""Test creating private handlers instance"""
handlers = create_private_handlers(mock_db, mock_settings)
assert isinstance(handlers, PrivateHandlers)
assert handlers.db == mock_db
assert handlers.settings == mock_settings
def test_private_handlers_initialization(self, mock_db, mock_settings):
"""Test PrivateHandlers initialization"""
handlers = PrivateHandlers(mock_db, mock_settings)
assert handlers.db == mock_db
assert handlers.settings == mock_settings
assert handlers.user_service is not None
assert handlers.post_service is not None
assert handlers.sticker_service is not None
assert handlers.router is not None
def test_handle_emoji_message(self, mock_db, mock_settings, mock_message, mock_state):
"""Test emoji message handler"""
handlers = create_private_handlers(mock_db, mock_settings)
# Mock the check_user_emoji function
with pytest.MonkeyPatch().context() as m:
m.setattr('helper_bot.handlers.private.private_handlers.check_user_emoji', lambda x: "😊")
# Test the handler
handlers.handle_emoji_message(mock_message, mock_state)
# Verify state was set
mock_state.set_state.assert_called_once_with(FSM_STATES["START"])
# Verify message was logged
mock_message.forward.assert_called_once_with(chat_id=mock_settings.group_for_logs)
def test_handle_start_message(self, mock_db, mock_settings, mock_message, mock_state):
"""Test start message handler"""
handlers = create_private_handlers(mock_db, mock_settings)
# Mock the get_first_name and messages functions
with pytest.MonkeyPatch().context() as m:
m.setattr('helper_bot.handlers.private.private_handlers.get_first_name', lambda x: "Test")
m.setattr('helper_bot.handlers.private.private_handlers.messages.get_message', lambda x, y: "Hello Test!")
m.setattr('helper_bot.handlers.private.private_handlers.get_reply_keyboard', lambda x, y: Mock())
# Test the handler
handlers.handle_start_message(mock_message, mock_state)
# Verify state was set
mock_state.set_state.assert_called_once_with(FSM_STATES["START"])
# Verify user was ensured to exist
mock_db.add_new_user_in_db.assert_called_once()
mock_db.update_date_for_user.assert_called_once()
class TestBotSettings:
"""Test class for BotSettings dataclass"""
def test_bot_settings_creation(self):
"""Test creating BotSettings instance"""
settings = BotSettings(
group_for_posts="posts",
group_for_message="message",
main_public="public",
group_for_logs="logs",
important_logs="important",
preview_link="link",
logs="logs_setting",
test="test_setting"
)
assert settings.group_for_posts == "posts"
assert settings.group_for_message == "message"
assert settings.main_public == "public"
assert settings.group_for_logs == "logs"
assert settings.important_logs == "important"
assert settings.preview_link == "link"
assert settings.logs == "logs_setting"
assert settings.test == "test_setting"
class TestConstants:
"""Test class for constants"""
def test_fsm_states(self):
"""Test FSM states constants"""
assert FSM_STATES["START"] == "START"
assert FSM_STATES["SUGGEST"] == "SUGGEST"
assert FSM_STATES["PRE_CHAT"] == "PRE_CHAT"
assert FSM_STATES["CHAT"] == "CHAT"
def test_button_texts(self):
"""Test button text constants"""
assert BUTTON_TEXTS["SUGGEST_POST"] == "📢Предложить свой пост"
assert BUTTON_TEXTS["SAY_GOODBYE"] == "👋🏼Сказать пока!"
assert BUTTON_TEXTS["LEAVE_CHAT"] == "Выйти из чата"
assert BUTTON_TEXTS["RETURN_TO_BOT"] == "Вернуться в бота"
assert BUTTON_TEXTS["WANT_STICKERS"] == "🤪Хочу стикеры"
assert BUTTON_TEXTS["CONNECT_ADMIN"] == "📩Связаться с админами"