diff --git a/custom_logger.py b/custom_logger.py
new file mode 100644
index 0000000..28b9655
--- /dev/null
+++ b/custom_logger.py
@@ -0,0 +1,43 @@
+import datetime
+import os
+from loguru import logger
+
+
+class Logger:
+ def __init__(self, name):
+ self.logger = logger.bind(name=name)
+
+ # Получение сегодняшней даты для имени файла
+ today = datetime.date.today().strftime('%Y-%m-%d')
+
+ # Создание папки для логов
+ current_dir = os.path.dirname(os.path.abspath(__file__))
+ logs_dir = os.path.join(current_dir, 'logs')
+ if not os.path.exists(logs_dir):
+ # Если не существует, создаем ее
+ os.makedirs(logs_dir)
+ filename = f'{logs_dir}/helper_bot_{today}.log'
+
+ # Настройка формата логов
+ self.logger.add(
+ filename,
+ rotation="00:00",
+ retention="5 days",
+ compression="zip",
+ format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {name} | {line} | {message}",
+ )
+
+ def info(self, message):
+ self.logger.info(message)
+
+ def debug(self, message):
+ self.logger.debug(message)
+
+ def warning(self, message):
+ self.logger.warning(message)
+
+ def error(self, message):
+ self.logger.error(message)
+
+ def critical(self, message):
+ self.logger.critical(message)
diff --git a/db.py b/db.py
index 62bfc72..76cef3f 100644
--- a/db.py
+++ b/db.py
@@ -1,9 +1,13 @@
import sqlite3
import configparser
import os
-import sys
+from datetime import datetime
+from loguru import logger
+from custom_logger import Logger
-config_path = os.path.join(sys.path[0], 'settings.ini')
+db_logger = Logger(name='db')
+script_dir = os.path.dirname(os.path.abspath(__file__))
+config_path = os.path.join(script_dir, 'settings.ini')
config = configparser.ConfigParser()
config.read(config_path)
LOGS = config.getboolean('Settings', 'logs')
@@ -12,10 +16,51 @@ IMPORTANT_LOGS = config.get('Telegram', 'important_logs')
class BotDB:
- def __init__(self, db_file):
+ def __init__(self):
+ db_file_path = os.path.dirname(os.path.abspath(__file__))
+ db_file = os.path.join(db_file_path, 'tg-bot-database')
self.conn = sqlite3.connect(db_file, check_same_thread=False)
self.cursor = self.conn.cursor()
+ logger.info(f'Подключен к базе данных: {db_file_path}')
+ def create_table(self, sql_script):
+ """Создает таблицу в базе."""
+ try:
+ cursor = self.conn.cursor()
+ cursor.execute(sql_script)
+ except Exception as e:
+ print(f"Ошибка при создании таблицы: {e}")
+
+ def get_current_version(self):
+ """Получает текущую версию миграций из таблицы migrations."""
+ try:
+ cursor = self.conn.cursor()
+ cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1")
+ version = cursor.fetchone()[0]
+ return version
+ except Exception as e:
+ print(f"Ошибка при получении версии: {e}")
+ return 0
+
+ def update_version(self, new_version, script_name):
+ """Обновляет версию миграций в таблице migrations."""
+ logger.info(f'Попытка обновления версии: {new_version}, скрипт: {script_name}')
+ try:
+ current_date = datetime.now()
+ today = current_date.strftime("%d-%m-%Y %H:%M:%S")
+ cursor = self.conn.cursor()
+ cursor.execute(
+ "INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)",
+ (new_version, script_name, today),
+ )
+ self.conn.commit()
+ logger.info(f"Версия обновлена: {new_version}, скрипт: {script_name}")
+ except sqlite3.IntegrityError as e:
+ logger.error(f"Ошибка при обновлении версии: {e}")
+ except Exception as e:
+ logger.error(f"Ошибка при обновлении версии: {e}")
+
+ # TODO: Deprecated, удалить
def get_message_from_db(self, type: str, username):
"""Функция для запроса к базе данных и получения сообщений для бота. В аргументы передаются:
type - тип получаемой обратной связи, строковое значение, сохраненное в БД
@@ -35,6 +80,7 @@ class BotDB:
except sqlite3.Error as error:
print(error)
+ # TODO: Deprecated. Остался только в voice боте, удалить и оттуда
def get_error_message_from_db(self, id: int):
"""Функция для запроса к базе данных и получения сообщений ошибки. В аргументы передаются:
id - идентификатор ошибки
@@ -48,18 +94,18 @@ class BotDB:
except sqlite3.Error as error:
print(error)
- def add_new_user_in_db(self, user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed):
+ def add_new_user_in_db(self, user_id, first_name, full_name, username, is_bot, language_code, date_added,
+ date_changed):
"""Добавляем юзера в базу"""
try:
self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
"'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(user_id, first_name, full_name,
- username,is_bot,language_code,date_added,date_changed))
+ username, is_bot, language_code, date_added, date_changed))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
-
def user_exists(self, user_id):
"""Проверяем, есть ли юзер в базе"""
try:
@@ -68,7 +114,6 @@ class BotDB:
except sqlite3.Error as error:
print(error)
-
def get_user_id(self, user_id):
"""Достаем id юзера в базе по его user_id"""
try:
@@ -77,7 +122,6 @@ class BotDB:
except sqlite3.Error as error:
print(error)
-
def get_username(self, user_id):
"""Достаем id юзера в базе по его user_id"""
try:
@@ -89,7 +133,7 @@ class BotDB:
def get_all_user_id(self):
"""Достаем все айдишники юзеров из БД и преобразуем их в список"""
try:
- result = self.cursor.execute("SELECT `user_id` FROM `our_users`",)
+ result = self.cursor.execute("SELECT `user_id` FROM `our_users`", )
fetch_all = result.fetchall()
list_of_users = []
for i in fetch_all:
@@ -105,7 +149,6 @@ class BotDB:
except sqlite3.Error as error:
print(error)
-
def change_name(self, user_id):
#TODO: реализовать функцию изменения имени пользователя по которому к нему обращается бот. ОБновляем поля first_name, date_changed
#result = self.cursor.execute("UPDATE 'our_users' SET (?) WHERE user_id = (?)", (new_user_name), )
@@ -114,7 +157,9 @@ class BotDB:
def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
"""Добавляет информацию о войсе юзера в БД"""
try:
- result = self.cursor.execute("INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)", (file_name, author_id, date_added, listen_count, file_id))
+ result = self.cursor.execute(
+ "INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)",
+ (file_name, author_id, date_added, listen_count, file_id))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
@@ -128,11 +173,11 @@ class BotDB:
except sqlite3.Error as error:
print(error)
-
def get_last_user_audio_record(self, user_id):
"""Получает данные о количестве записей пользователя"""
try:
- result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?", (user_id,))
+ result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?",
+ (user_id,))
return bool(len(result.fetchall()))
except sqlite3.Error as error:
print(error)
@@ -140,8 +185,9 @@ class BotDB:
def get_id_for_audio_record(self, user_id):
"""Получает ID аудио сообщения пользователя"""
try:
- result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
- (user_id,))
+ result = self.cursor.execute(
+ "SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
+ (user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
@@ -149,7 +195,9 @@ class BotDB:
def get_path_for_audio_record(self, user_id):
"""Получает данные о названии файла"""
try:
- result = self.cursor.execute("SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", (user_id,))
+ result = self.cursor.execute(
+ "SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
+ (user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
@@ -162,9 +210,10 @@ class BotDB:
FROM audio_message_reference a
LEFT JOIN listen_audio_users l ON l.file_name = a.file_name
WHERE l.user_id = ?
- AND l.file_name IS NOT NULL""" , (user_id,))
+ AND l.file_name IS NOT NULL""", (user_id,))
check_sign = query_listen_audio.fetchall()
- query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?', (user_id,))
+ query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?',
+ (user_id,))
sign_all_audio = query_all_audio.fetchall()
new_sign1 = list(set(sign_all_audio) - set(check_sign))
new_sign = []
@@ -177,7 +226,9 @@ class BotDB:
def mark_listened_audio(self, file_name, user_id):
"""Отмечает аудио прослушанным для конкретного пользователя."""
try:
- result = self.cursor.execute("INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)", (file_name, user_id, 1))
+ result = self.cursor.execute(
+ "INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)",
+ (file_name, user_id, 1))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
@@ -186,7 +237,8 @@ class BotDB:
"""Получает данные о получении стикеров пользователем"""
try:
result = self.cursor.execute("SELECT `has_stickers` FROM `our_users` WHERE `user_id` = ?", (user_id,))
- return result.fetchone()[0]
+ return result.fetchone()[0] == 1
+ #return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
@@ -198,7 +250,157 @@ class BotDB:
except sqlite3.Error as error:
print(error)
+ def get_users_blacklist(self):
+ """Возвращает список пользователей в черном списке"""
+ try:
+ result = self.cursor.execute("SELECT user_id, user_name FROM `blacklist`")
+ fetch_all = result.fetchall()
+ list_of_users = {}
+ for i in fetch_all:
+ list_of_users[i[0]] = i[1]
+ return list_of_users
+ except sqlite3.Error as error:
+ print(error)
+
+ def get_users_for_unblock_today(self, date_to_unban):
+ """Возвращает пользователей у которых истекает срок блокировки сегодня"""
+ try:
+ result = self.cursor.execute("SELECT user_id, user_name FROM `blacklist` WHERE date_to_unban = ?", (date_to_unban,))
+ fetch_all = result.fetchall()
+ list_of_users = {}
+ for i in fetch_all:
+ list_of_users[i[0]] = i[1]
+ return list_of_users
+ except sqlite3.Error as error:
+ print(error)
+
+ def get_blacklist_users_by_id(self, user_id):
+ """Возвращает список пользователей в черном списке по user_id"""
+ try:
+ result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban FROM `blacklist` WHERE user_id = ?", (user_id, ))
+ return self.cursor.fetchone()
+ except sqlite3.Error as error:
+ print(error)
+
+
+ def check_user_in_blacklist(self, user_id):
+ """Проверяет, существует ли запись с данным user_id в blacklist."""
+ self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,))
+ result = self.cursor.fetchone()
+ return bool(result)
+
+ def set_user_blacklist(self, user_id, user_name=None, message_for_user=None, date_to_unban=None):
+ """Добавляет пользователя в черный список"""
+ try:
+ result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name',"
+ " 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)",
+ (user_id, user_name, message_for_user, date_to_unban,))
+ return self.conn.commit()
+ except sqlite3.Error as error:
+ return error
+
+ def delete_user_blacklist(self, user_id):
+ """Удаляет пользователя из черного списка"""
+ try:
+ #TODO: Функция всегда возвращает true, даже если такого id нет в таблице
+ self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
+ self.conn.commit()
+ logger.info(f"Пользователь с идентификатором {user_id} успешно удален.")
+ return True
+ except sqlite3.Error as error:
+ logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} из таблицы blacklist. Ошибка: {str(error)}")
+ return False
+
+ def add_new_message_in_db(self, message_text, user_id, message_id, date, has_answer):
+ """Добавляем сообщение юзера в базу"""
+ try:
+
+ self.cursor.execute(
+ "INSERT INTO `user_messages` (message_text, user_id, message_id, date, has_answer) "
+ "VALUES (?, ?, ?, ?, ?)",
+ (message_text, message_id, user_id, date, has_answer))
+ return self.conn.commit()
+ except sqlite3.Error as error:
+ print(error)
+
+ def update_date_for_user(self, date, user_id: int):
+ try:
+ result = self.cursor.execute("UPDATE `our_users` SET `date_changed` = ? WHERE `user_id` = ?",
+ (date, user_id,))
+ return self.conn.commit()
+ except sqlite3.Error as error:
+ print(error)
+
+ def is_admin(self, user_id):
+ """
+ Проверяет, является ли пользователь администратором.
+ Args:
+ user_id: ID пользователя Telegram.
+ Returns:
+ True, если пользователь администратор, иначе False.
+ """
+ self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
+ result = self.cursor.fetchone()
+ return bool(result)
+
+ def add_admin(self, user_id, role):
+ """
+ Добавляет пользователя в список администраторов.
+ Args:
+ user_id: ID пользователя Telegram.
+ role: Роль пользователя.
+ Доступные варианты:
+ 1. creator - создатель
+ 2. admin - обычная роль
+ """
+ self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role))
+ return self.conn.commit()
+
+ def remove_admin(self, user_id):
+ """
+ Удаляет пользователя из списка администраторов.
+
+ Args:
+ user_id: ID пользователя Telegram.
+ """
+ self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,))
+ return self.conn.commit()
+
+ def get_user_by_message_id(self, message_id):
+ """Возвращает идентификатор пользователя по идентификатору сообщения"""
+ try:
+ result = self.cursor.execute("SELECT user_id FROM `user_messages` WHERE message_id = ?", (message_id,))
+ return result.fetchone()[0]
+ except sqlite3.Error as error:
+ print(error)
+
+ def get_last_users_from_db(self):
+ """Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
+ try:
+ result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC")
+ users = result.fetchall()
+ return users
+ except sqlite3.Error as error:
+ print(error)
+
+ def get_banned_users_from_db(self):
+ """Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
+ try:
+ result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist", )
+ users = result.fetchall()
+ return users
+ except sqlite3.Error as error:
+ print(error)
+
+ def get_banned_users_from_db_with_limits(self, offset: int, limit: int):
+ """Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
+ try:
+ result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist LIMIT ?, ?", (offset, limit,) )
+ users = result.fetchall()
+ return users
+ except sqlite3.Error as error:
+ print(error)
+
def close(self):
"""Закрываем соединение с БД"""
self.conn.close()
-
diff --git a/main.py b/main.py
index 08ee2da..9cbe318 100644
--- a/main.py
+++ b/main.py
@@ -3,19 +3,25 @@ import os
import sys
from pathlib import Path
from time import sleep
+from enum import Enum
+from typing import Any
+from apscheduler.schedulers.background import BackgroundScheduler
+import db
from db import BotDB
import telebot
import random
-from datetime import datetime
+from datetime import datetime, timedelta
import time
from telebot import types
from telebot.apihelper import ApiTelegramException
+import messages
+import traceback
-#Настройки
+# Настройки
config_path = os.path.join(sys.path[0], 'settings.ini')
config = configparser.ConfigParser()
config.read(config_path)
-#TELEGRAM
+# TELEGRAM
BOT_TOKEN = config.get('Telegram', 'BOT_TOKEN')
GROUP_FOR_POST = config.get('Telegram', 'group_for_posts')
GROUP_FOR_MESSAGE = config.get('Telegram', 'group_for_message')
@@ -23,34 +29,309 @@ MAIN_PUBLIC = config.get('Telegram', 'main_public')
GROUP_FOR_LOGS = config.get('Telegram', 'group_for_logs')
IMPORTANT_LOGS = config.get('Telegram', 'important_logs')
PREVIEW_LINK = config.getboolean('Telegram', 'PREVIEW_LINK')
-#SETTINGS
+# SETTINGS
LOGS = config.getboolean('Settings', 'logs')
TEST = config.getboolean('Settings', 'test')
+# Инициализируем бота и базку
+BotDB = BotDB()
-#Инициализируем бота и базку
-bot = telebot.TeleBot(BOT_TOKEN, parse_mode=None)
-BotDB = BotDB('tg-bot-database')
-def telegram_bot():
- @bot.message_handler(commands=['start'])
- def send_welcome(message):
- #TODO: Здесь переписать через randint
- #TODO: Тексты приветствий вынести в отдельный файл
+class State(Enum):
+ START = "START"
+ SUGGEST = "SUGGEST"
+ ADMIN = "ADMIN"
+ CHAT = "CHAT"
+ PRE_CHAT = "PRE_CHAT"
+
+
+class TelegramHelperBot:
+ def __init__(self, token):
+ self.bot = telebot.TeleBot(token)
+ self.state = State.START
+
+ # Router for user
+ @self.bot.message_handler(func=lambda message: True, chat_types=['private'])
+ def handle_message(message):
+ if BotDB.check_user_in_blacklist(message.from_user.id):
+ attribute = BotDB.get_blacklist_users_by_id(message.from_user.id)
+ self.bot.send_message(message.chat.id,
+ f'Ты заблокирован\nПричина блокировки: {attribute[2]}\nДата разблокировки: {attribute[3]}', parse_mode='HTML')
+ return
+ if self.state == State.START:
+ if message.text == '/start':
+ self.start_message(message)
+ elif message.text == '📢Предложить свой пост':
+ self.suggest_post(message)
+ self.state = State.SUGGEST
+ elif message.text == '🤪Хочу стикеры':
+ self.stickers(message)
+ self.state = State.START
+ elif message.text == '📩Связаться с админами':
+ self.connect_with_admin(message)
+ self.state = State.PRE_CHAT
+ elif message.text == '👋🏼Сказать пока!':
+ self.end_message(message)
+ self.state = State.START
+ elif message.text == 'Выйти из чата':
+ self.end_message(message)
+ elif message.text == '/admin':
+ access = self.check_access(message.from_user.id)
+ if access:
+ self.admin_panel(message)
+ self.state = State.ADMIN
+ else:
+ self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!')
+ elif message.text == '/state':
+ self.bot.send_message(message.chat.id,
+ f'Твой state == {self.state.value}')
+ else:
+ self.bot.send_message(message.chat.id,
+ "Не понимаю где ты находишься. Нажми /state, и я расскажу что ты можешь "
+ "сделать")
+
+ if self.state == State.SUGGEST:
+ self.bot.register_next_step_handler(message, self.send_to_suggest)
+ self.state = State.START
+ if message.text == '/start':
+ self.state = State.START
+ self.start_message(message)
+ if self.state == State.PRE_CHAT:
+ self.bot.register_next_step_handler(message, self.resend_message_in_group_for_message)
+ self.state = State.START
+ if message.text == '/start':
+ self.state = State.START
+ self.start_message(message)
+
+ if self.state == State.CHAT:
+ if message.text == 'Выйти из чата':
+ self.state = State.START
+ self.end_message(message)
+ elif message.text == '/start':
+ self.state = State.START
+ self.start_message(message)
+ else:
+ self.resend_message_in_group_for_message(message)
+
+ if self.state == State.ADMIN:
+ if message == '/admin' or message == '/restart' or message == 'Вернуться в админку':
+ access = self.check_access(message.from_user.id)
+ if access:
+ self.admin_panel(message)
+ else:
+ self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!')
+ if message.text == '/start':
+ self.state = State.START
+ self.start_message(message)
+
+ @self.bot.message_handler(func=lambda message: True, chat_types=['group'])
+ def handle_message(message):
+ """Функция ответа админа пользователю через закрытый чат"""
+ self.state = State.CHAT
+ markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
+ item1 = types.KeyboardButton("Выйти из чата")
+ markup.add(item1)
+ message_id = message.reply_to_message.id
+ message_from_admin = message.text
+ chat_id = BotDB.get_user_by_message_id(message_id)
+ self.bot.send_message(chat_id, message_from_admin, reply_markup=markup)
+
+ # Админка
+ @self.bot.callback_query_handler(func=lambda call: call.data in ['publish', 'decline'])
+ def post_for_group(call):
+ if call.data == 'publish' and call.message.content_type == 'text':
+ try:
+ self.bot.send_message(chat_id=MAIN_PUBLIC, text=call.message.text)
+ self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
+ except Exception as e:
+ if LOGS:
+ self.bot.send_message(chat_id=IMPORTANT_LOGS,
+ text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
+ elif call.data == 'publish' and call.message.content_type == 'photo':
+ try:
+ self.bot.send_photo(
+ chat_id=MAIN_PUBLIC,
+ caption=call.message.caption,
+ photo=call.message.photo[-1].file_id,
+ )
+ self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
+ except Exception as e:
+ if LOGS:
+ self.bot.send_message(chat_id=IMPORTANT_LOGS,
+ text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
+ elif call.data == 'decline':
+ try:
+ self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
+ except Exception as e:
+ if LOGS:
+ self.bot.send_message(IMPORTANT_LOGS,
+ f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
+
+ @self.bot.callback_query_handler(func=lambda call: True)
+ def pagination(call):
+ if call.data[:3] == 'ban':
+ user_id = call.data[4:]
+ self.ban_user(call.message, user_id)
+ if call.data == 'return':
+ self.bot.delete_message(call.message.chat.id, call.message.message_id)
+ self.admin_panel(call.message)
+
+ if call.data[:5] == 'unban':
+ self.delete_user_blacklist(call.data[6:])
+ msg = f'Успешно удалено.'
+ self.bot.send_message(chat_id=call.message.chat.id, text=msg)
+ elif call.data[:4] == 'page':
+ if call.message.text == 'Список пользователей которые последними обращались к боту':
+ list_users = BotDB.get_last_users_from_db()
+ keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(list_users), list_users,
+ 'ban')
+ self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id,
+ reply_markup=keyboard)
+ if "Список заблокированных пользователей".lower() in call.message.text.lower():
+ #Готовим сообщения
+ message_user = self.get_banned_users_list(int(call.data[5:]) * 7 - 7)
+ self.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
+ text=message_user)
+
+ #Готовим клавиатуру
+ buttons = self.get_banned_users_buttons()
+ keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unban')
+ self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id,
+ reply_markup=keyboard)
+
+ def start(self):
+ while True:
+ try:
+ self.bot.polling(none_stop=True)
+ except (ConnectionError, Exception):
+ print(f"Произошла ошибка: {str(Exception)}\n\nTraceback:\n{traceback.format_exc()}")
+
+ def unban_notifier(self):
+ # Получение сегодняшней даты в формате DD-MM-YYYY
+ current_date = datetime.now()
+ today = current_date.strftime("%d-%m-%Y")
+ # Получение списка разблокированных пользователей
+ unblocked_users = BotDB.get_users_for_unblock_today(today)
+ message = "Разблокированные пользователи:\n"
+ for user_id, user_name in unblocked_users.items():
+ message += f"ID: {user_id}, Имя: {user_name}\n"
+
+ # Отправка сообщения в канал
+ self.bot.send_message(GROUP_FOR_MESSAGE, message)
+
+ # Черный список
+ def admin_panel(self, message):
+ try:
+ markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
+ item1 = types.KeyboardButton("Бан (Список)")
+ #item2 = types.KeyboardButton("Добавить админа") #TODO: Когда-нибудь потом доделаю
+ #item3 = types.KeyboardButton("Удалить админа")
+ item4 = types.KeyboardButton("Разбан (список)")
+ markup.add(item1, item4)
+ self.bot.send_message(message.chat.id, "Добро пожаловать в админку. Выбери что хочешь:",
+ reply_markup=markup)
+ self.bot.register_next_step_handler(message, self.handle_admin_message)
+ except Exception as e:
+ self.bot.register_next_step_handler(message, self.admin_panel)
+
+ def handle_admin_message(self, message):
+ try:
+ if message.text == "Бан (Список)":
+ self.get_last_users(message)
+ elif message.text == "Разбан (список)":
+ self.get_banned_users(message)
+ except Exception as e:
+ self.bot.reply_to(message, f"Ошибка\n\n {e}")
+ self.admin_panel(message)
+
+ def ban_user(self, message, user_id: int):
+ user_name = BotDB.get_username(user_id=user_id)
+ ban_object = {'user_id': user_id, 'user_name': user_name, 'message_for_user': None, 'date_to_unban': None}
+ markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
+ item1 = types.KeyboardButton("Спам")
+ item2 = types.KeyboardButton("Заебал стикерами")
+ markup.add(item1, item2)
+ self.bot.send_message(message.chat.id,
+ f"Выбран пользователь: {user_id}. Выбери причину бана из списка или напиши ее в чат",
+ reply_markup=markup)
+ self.bot.register_next_step_handler(message, self.ban_user_step_2, ban_object)
+
+ def ban_user_step_2(self, message, ban_object: dict):
+ ban_object['message_for_user'] = message.text
+ markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
+ item1 = types.KeyboardButton("1")
+ item2 = types.KeyboardButton("7")
+ item3 = types.KeyboardButton("30")
+ item4 = types.KeyboardButton("Навсегда")
+ markup.add(item1, item2, item3, item4)
+ self.bot.send_message(message.chat.id, f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши "
+ f"его в чат",
+ reply_markup=markup)
+ self.bot.register_next_step_handler(message, self.ban_user_step_3, ban_object)
+
+ def ban_user_step_3(self, message, ban_object: dict):
+ date_to_unban = None
+ if message.text != 'Навсегда':
+ date_to_unban = self.add_days_to_date(message.text)
+ else:
+ pass
+ ban_object['date_to_unban'] = date_to_unban
+ markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
+ item1 = types.KeyboardButton("Подтвердить")
+ item2 = types.KeyboardButton("Отменить")
+ markup.add(item1, item2)
+ self.bot.send_message(message.chat.id,
+ f"Необходимо подтверждение:\nПользователь:{ban_object['user_id']}\nПричина бана:{ban_object['message_for_user']}.\nСрок бана:{ban_object['date_to_unban']}",
+ parse_mode='html',
+ reply_markup=markup)
+ self.bot.register_next_step_handler(message, self.ban_user_final_step, ban_object)
+
+ def ban_user_final_step(self, message, ban_object: dict):
+ if message.text == 'Подтвердить':
+ exists = BotDB.check_user_in_blacklist(ban_object['user_id'])
+ if exists:
+ self.bot.reply_to(message, f"Пользователь уже был заблокирован ранее.")
+ self.admin_panel(message)
+ else:
+ BotDB.set_user_blacklist(ban_object['user_id'],
+ ban_object['user_name'],
+ ban_object['message_for_user'],
+ ban_object['date_to_unban'])
+ self.bot.reply_to(message, f"Пользователь {ban_object['user_name']} успешно заблокирован.")
+ self.admin_panel(message)
+
+ def get_last_users(self, message):
+ list_users = BotDB.get_last_users_from_db()
+ keyboard = self.create_keyboard_with_pagination(1, len(list_users), list_users, 'ban')
+ self.bot.send_message(chat_id=message.chat.id, text="Список пользователей которые последними обращались к боту",
+ reply_markup=keyboard)
+
+ def get_banned_users(self, message):
+ message_text = self.get_banned_users_list(0)
+ buttons_list = self.get_banned_users_buttons()
+ if buttons_list:
+ k = self.create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unban')
+ self.bot.send_message(message.chat.id, message_text, reply_markup=k)
+ else:
+ self.bot.send_message(message.chat.id, "В списке забанненых пользователей никого нет")
+ self.admin_panel(message)
+
+ def start_message(self, message):
try:
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
- number_stick_hello = random.randint(1, len(name_stick_hello))
- random_stick_hello = open(name_stick_hello[number_stick_hello], 'rb')
- #logging
+ random_stick_hello = open(random.choice(name_stick_hello), 'rb')
+ # logging
if LOGS:
- bot.forward_message(chat_id=GROUP_FOR_LOGS,
- from_chat_id=message.chat.id,
- message_id=message.message_id)
- bot.send_sticker(message.chat.id, random_stick_hello)
+ self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
+ from_chat_id=message.chat.id,
+ message_id=message.message_id)
+ self.bot.send_sticker(message.chat.id, random_stick_hello)
sleep(0.3)
- except:
+ except Exception as e:
+ print(f'{str(e)}')
if LOGS:
- bot.send_message(IMPORTANT_LOGS, BotDB.get_error_message_from_db(7))
+ self.bot.send_message(IMPORTANT_LOGS,
+ f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
try:
user_id = message.from_user.id
@@ -59,191 +340,132 @@ def telegram_bot():
is_bot = message.from_user.is_bot
username = message.from_user.username
language_code = message.from_user.language_code
- time_UTC = int(time.time())
- date_added = datetime.fromtimestamp(time_UTC)
- date_changed = datetime.fromtimestamp(time_UTC)
-
- markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
- item1 = types.KeyboardButton("📢Предложить свой пост")
- item2 = types.KeyboardButton("📩Связаться с админами")
- # TODO: Выпилил, удалить
- #item3 = types.KeyboardButton("❌Удалить пост")
- if BotDB.user_exists(user_id):
- is_need_sticker = BotDB.get_info_about_stickers(user_id=message.from_user.id)
- if is_need_sticker == 0:
- item5 = types.KeyboardButton("🤪Хочу стикеры")
- BotDB.update_info_about_stickers(user_id=message.from_user.id)
- markup.add(item1, item2, item5)
- else:
- markup.add(item1, item2)
- else:
- BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed)
- is_need_sticker = BotDB.get_info_about_stickers(user_id=message.from_user.id)
- if is_need_sticker == 0:
- item5 = types.KeyboardButton("🤪Хочу стикеры")
- BotDB.update_info_about_stickers(user_id=message.from_user.id)
- markup.add(item1, item2, item5)
- else:
- markup.add(item1, item2)
- hello_message = BotDB.get_message_from_db('start_message', first_name)
- bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK)
- except:
+ current_date = datetime.now()
+ date = current_date.strftime("%Y-%m-%d %H:%M:%S")
+ if not BotDB.user_exists(user_id):
+ BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date,
+ date)
+ BotDB.update_date_for_user(date, user_id)
+ markup = self.get_reply_keyboard(message)
+ hello_message = messages.get_message(self.__get_first_name(message), 'HELLO_MESSAGE')
+ self.bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup,
+ disable_web_page_preview=not PREVIEW_LINK)
+ except Exception as e:
+ print(f'{str(e)}')
if LOGS:
- bot.send_message(IMPORTANT_LOGS, BotDB.get_error_message_from_db(8))
- bot.register_next_step_handler(message, go_send_messages)
+ self.bot.send_message(IMPORTANT_LOGS,
+ f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
- @bot.message_handler(commands=['end_command'])
- def after_post(message):
- markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
- item1 = types.KeyboardButton("📢Предложить свой пост")
- item2 = types.KeyboardButton("📩Связаться с админами")
- # TODO: Скрыл, удалить обработчик
- #item3 = types.KeyboardButton("❌Удалить пост")
- item5 = types.KeyboardButton("👋🏼Сказать пока!")
- markup.add(item1, item2, item5)
- bot.send_message(message.chat.id,
- "Выбери нужную кнопку внизу экрана".format(
- message.from_user, bot.get_me()),
- parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK)
- bot.register_next_step_handler(message, go_send_messages)
-
- def go_send_messages(message):
- global msg
- if message.text == '📢Предложить свой пост':
- try:
- markup = types.ReplyKeyboardRemove()
- first_name = message.from_user.first_name
- suggest_news = BotDB.get_message_from_db('suggest_news', first_name)
- bot.send_message(message.chat.id, suggest_news, parse_mode='html')
- sleep(0.3)
- first_name = message.from_user.first_name
- suggest_news_2 = BotDB.get_message_from_db('suggest_news_2', first_name)
- msg = bot.send_message(message.chat.id, suggest_news_2,parse_mode='html', reply_markup=markup)
- except:
- if LOGS:
- bot.send_message(IMPORTANT_LOGS, BotDB.get_error_message_from_db(10))
- #logging
- if LOGS:
- bot.forward_message(chat_id=GROUP_FOR_LOGS,
- from_chat_id=message.chat.id,
- message_id=message.message_id)
- bot.register_next_step_handler(msg, resend_message_in_group_for_post)
-
- elif message.text == "📩Связаться с админами":
- first_name = message.from_user.first_name
- connect_with_admin = BotDB.get_message_from_db('connect_with_admin', first_name)
- msg = bot.send_message(message.chat.id, connect_with_admin, parse_mode="html")
- #logging
- if LOGS:
- bot.forward_message(chat_id=GROUP_FOR_LOGS,
- from_chat_id=message.chat.id,
- message_id=message.message_id)
- bot.register_next_step_handler(msg, resend_message_in_group_for_message)
-
-
- elif message.text == "❌Удалить пост":
- #TODO: требует автоматизации. На входе говорим пришли мне пост, на выходе получаем идентификатор поста, удаляем из ТГ. Насчет удаления из ВК надо подумать
- first_name = message.from_user.first_name
- del_message = BotDB.get_message_from_db('del_message', first_name)
- msg = bot.send_message(message.chat.id, del_message, parse_mode="html")
- #logging
- if LOGS:
- bot.forward_message(chat_id=GROUP_FOR_LOGS,
- from_chat_id=message.chat.id,
- message_id=message.message_id)
- bot.register_next_step_handler(msg, resend_message_in_group_for_message)
-
- elif message.text == "👋🏼Сказать пока!":
- try:
- name_stick_bye = list(Path('Stick').rglob('Universal_*'))
- number_stick_bye = random.randint(1, len(name_stick_bye))
- random_stick_bye = open(name_stick_bye[number_stick_bye], 'rb')
- bot.send_sticker(message.chat.id, random_stick_bye)
- except ApiTelegramException:
- if LOGS:
- bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(11))
+ def resend_message_in_group_for_message(self, message):
+ self.bot.forward_message(chat_id=GROUP_FOR_MESSAGE,
+ from_chat_id=message.chat.id,
+ message_id=message.message_id
+ )
+ current_date = datetime.now()
+ date = current_date.strftime("%Y-%m-%d %H:%M:%S")
+ BotDB.add_new_message_in_db(message.text, message.message_id + 1, message.from_user.id, date, 0)
+ question = messages.get_message(self.__get_first_name(message), 'QUESTION')
+ markup = self.get_reply_keyboard(message)
+ self.bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK,
+ reply_markup=markup)
+ def suggest_post(self, message):
+ try:
markup = types.ReplyKeyboardRemove()
- try:
- first_name = message.from_user.first_name
- bye_message = BotDB.get_message_from_db('bye_message', first_name)
- bot.send_message(message.chat.id, bye_message,
- parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK)
- except:
- if LOGS:
- bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(6))
+ suggest_news = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS')
+ self.bot.send_message(message.chat.id, suggest_news, parse_mode='html')
+ sleep(0.3)
+ suggest_news_2 = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS_2')
+ self.bot.send_message(message.chat.id, suggest_news_2, parse_mode='html', reply_markup=markup)
+ except Exception as e:
+ self.bot.send_message(IMPORTANT_LOGS,
+ f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
+ # logging
+ if LOGS:
+ self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
+ from_chat_id=message.chat.id,
+ message_id=message.message_id)
- if LOGS:
- #logging
- bot.forward_message(chat_id=GROUP_FOR_LOGS,
- from_chat_id=message.chat.id,
- message_id=message.message_id)
- elif message.text == "🤪Хочу стикеры":
- markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
- item1 = types.KeyboardButton("📢Предложить свой пост")
- item2 = types.KeyboardButton("📩Связаться с админами")
- # TODO: Скрыл кнопку, убрать обработчик позднее
- #item3 = types.KeyboardButton("❌Удалить пост")
- item5 = types.KeyboardButton("👋🏼Сказать пока!")
- markup.add(item1, item2, item5)
- try:
- if LOGS:
- bot.forward_message(chat_id=GROUP_FOR_LOGS,
- from_chat_id=message.chat.id,
- message_id=message.message_id)
- bot.send_message(message.chat.id, text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk', reply_markup=markup)
- bot.register_next_step_handler(message, callback=go_send_messages)
- except ApiTelegramException:
- if LOGS:
- bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(12))
- else:
- try:
- first_name = message.from_user.first_name
- user_error = BotDB.get_message_from_db('user_error', first_name)
- bot.send_message(message.chat.id, user_error, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK)
- except:
- if LOGS:
- bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(9))
- #logging
- if LOGS:
- bot.forward_message(chat_id=GROUP_FOR_LOGS,
- from_chat_id=message.chat.id,
- message_id=message.message_id)
- bot.register_next_step_handler(message, callback=go_send_messages)
+ def stickers(self, message):
+ BotDB.update_info_about_stickers(user_id=message.from_user.id)
+ markup = self.get_reply_keyboard(message)
+ try:
+ self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
+ from_chat_id=message.chat.id,
+ message_id=message.message_id)
+ self.bot.send_message(message.chat.id,
+ text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
+ reply_markup=markup)
+ except ApiTelegramException as e:
+ self.bot.send_message(chat_id=IMPORTANT_LOGS,
+ text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
- def resend_message_in_group_for_post(message):
+ def connect_with_admin(self, message):
+ connect_with_admin = messages.get_message(self.__get_first_name(message), 'CONNECT_WITH_ADMIN')
+ self.bot.send_message(message.chat.id, connect_with_admin, parse_mode="html")
+ # logging
+ if LOGS:
+ self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
+ from_chat_id=message.chat.id,
+ message_id=message.message_id)
+
+ def end_message(self, message):
+ try:
+ name_stick_bye = list(Path('Stick').rglob('Universal_*'))
+ random_stick_bye = open(random.choice(name_stick_bye), 'rb')
+ self.bot.send_sticker(message.chat.id, random_stick_bye)
+ except ApiTelegramException as e:
+ if LOGS:
+ self.bot.send_message(chat_id=IMPORTANT_LOGS,
+ text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
+ markup = types.ReplyKeyboardRemove()
+ try:
+ bye_message = messages.get_message(self.__get_first_name(message), 'BYE_MESSAGE')
+ self.bot.send_message(message.chat.id, bye_message,
+ parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK)
+ except Exception as e:
+ if LOGS:
+ self.bot.send_message(chat_id=IMPORTANT_LOGS,
+ text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
+ if LOGS:
+ # logging
+ self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
+ from_chat_id=message.chat.id,
+ message_id=message.message_id)
+
+ def send_to_suggest(self, message):
markup = types.InlineKeyboardMarkup(row_width=1)
- item1 = types.InlineKeyboardButton("Опубликовать", callback_data='post_post_post')
+ item1 = types.InlineKeyboardButton("Опубликовать", callback_data='publish')
item2 = types.InlineKeyboardButton("Отклонить", callback_data='decline')
markup.add(item1, item2)
try:
if message.content_type == 'text':
post_text = message.text.lower()
if post_text.find('неанон') != -1 or post_text.find('не анон') != -1:
- bot.send_message(
- #TODO: GROUP_FOR_POST
+ self.bot.send_message(
+ # TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
reply_markup=markup
)
elif post_text.find('анон') != -1:
- bot.send_message(
+ self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
text=f'Пост из ТГ:\n{message.text}\n\nПост опубликован анонимно',
reply_markup=markup
)
else:
- bot.send_message(
+ self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
reply_markup=markup
)
- elif message.content_type == 'photo' and message.media_group_id == None:
+ elif message.content_type == 'photo' and message.media_group_id is None:
post_text_for_photo = message.caption.lower()
if post_text_for_photo.find('неанон') != -1 or post_text_for_photo.find('не анон') != -1:
- bot.send_photo(
+ self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
@@ -251,7 +473,7 @@ def telegram_bot():
reply_markup=markup
)
elif post_text_for_photo.find('анон') != -1 or post_text_for_photo.find('анон') != -1:
- bot.send_photo(
+ self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nПост опубликован анонимно',
@@ -259,81 +481,155 @@ def telegram_bot():
reply_markup=markup
)
else:
- bot.send_photo(
+ self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
photo=message.photo[-1].file_id,
reply_markup=markup
)
- #TODO: Не понятна реализация с альбомами от слова совсем
- #elif message.content_type == 'photo' and message.media_group_id != None:
+ # TODO: Не понятна реализация с альбомами от слова совсем
+ # elif message.content_type == 'photo' and message.media_group_id != None:
# bot.forward_message(chat_id=IMPORTANT_LOGS, from_chat_id=message.chat.id, message_id=message.message_id )
else:
pass
- except:
+ except Exception as e:
if LOGS:
- username = message.from_user.first_name
- error_message = str(BotDB.get_error_message_from_db(5)).replace('username', username)
- bot.send_message(chat_id=IMPORTANT_LOGS, text=error_message)
+ self.bot.send_message(chat_id=IMPORTANT_LOGS,
+ text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
+ markup_for_user = self.get_reply_keyboard(message)
+ success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE')
+ self.bot.send_message(message.chat.id, success_send_message, parse_mode='html',
+ disable_web_page_preview=not PREVIEW_LINK, reply_markup=markup_for_user)
- username = message.from_user.first_name
- success_send_message = BotDB.get_message_from_db('success_send_message', username)
+ @staticmethod
+ def get_reply_keyboard(message):
+ markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
+ item1 = types.KeyboardButton("📢Предложить свой пост")
+ item2 = types.KeyboardButton("📩Связаться с админами")
+ item3 = types.KeyboardButton("👋🏼Сказать пока!")
+ #TODO: Есть ощущение что не совсем так работает как надо
+ item4 = types.KeyboardButton("🤪Хочу стикеры") if not BotDB.get_info_about_stickers(
+ user_id=message.from_user.id) else None
- bot.send_message(message.chat.id, success_send_message, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK)
- after_post(message=message)
+ if item4:
+ markup.add(item1, item2, item3, item4)
+ else:
+ markup.add(item1, item2, item3)
- def resend_message_in_group_for_message(message):
- bot.forward_message(chat_id=GROUP_FOR_MESSAGE,
- from_chat_id=message.chat.id,
- message_id=message.message_id
- )
- username = message.from_user.first_name
- question = BotDB.get_message_from_db('question', username)
- bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK)
- try:
- pass
- except:
- if LOGS:
- bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(4))
- after_post(message=message)
+ return markup
+
+ @staticmethod
+ def __get_first_name(message):
+ return message.from_user.first_name
+
+ @staticmethod
+ def check_access(user_id: int):
+ """Проверка прав на совершение действий"""
+ return BotDB.is_admin(user_id)
+
+ @staticmethod
+ def add_days_to_date(days):
+ """Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY."""
+ current_date = datetime.now()
+ future_date = current_date + timedelta(days=int(days))
+ formatted_date = future_date.strftime("%d-%m-%Y")
+ return formatted_date
+
+ @staticmethod
+ def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[Any, Any]], callback: str):
+ """
+ Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback
+
+ Args:
+ page: Номер текущей страницы.
+ total_items: Общее количество элементов.
+ array_items: Лист кортежей. Содержит в себе user_name: user_id
+ callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id})
+
+ Returns:
+ InlineKeyboardMarkup: Клавиатура с кнопками пагинации.
+ """
+
+ # Определяем общее количество страниц
+ total_pages = (total_items + 7 - 1) // 7
+
+ page = page
+ # Создаем список кнопок
+ buttons = []
+ # Вычисляем стартовый номер для текущей страницы
+ start_index = (page - 1) * 7 #тут было +1, убрал, потому что на текстовом массиве выходит за пределы
+ # Кнопки с номерами страниц
+ for i in range(start_index, min(start_index + 7,
+ len(array_items))): #тут было len(array_items) +1, убрал, потому что на текстовом массиве выходит за пределы
+ buttons.append(
+ types.InlineKeyboardButton(f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}"))
+
+ # Добавляем кнопки "Предыдущая" и "Следующая"
+ if int(page) > 1:
+ buttons.insert(6, types.InlineKeyboardButton("⬅️ Предыдущая", callback_data=f"page_{page - 1}"))
+ if page < total_pages:
+ buttons.append(types.InlineKeyboardButton("➡️ Следующая", callback_data=f"page_{page + 1}"))
+ #Добавляем кнопку назад
+ buttons.append(types.InlineKeyboardButton("🏠 Назад", callback_data="return"))
+ # Формируем клавиатуру с 3 кнопками в ряд
+ keyboard = []
+ for i in range(0, len(buttons), 3):
+ keyboard.append(buttons[i:i + 3])
+ return types.InlineKeyboardMarkup(keyboard)
+
+ @staticmethod
+ def get_banned_users_list(offset: int):
+ """
+ Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
+
+ Args:
+ offset: отступ для запроса в базу данных
+
+ Returns:
+ message - текст сообщения
+ user_ids - лист кортежей [(user_name: user_id)]
+ """
+ users = BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset)
+ message = "Список заблокированных пользователей:\n"
+
+ for user in users:
+ message += f"Пользователь: {user[0]}\n"
+ message += f"Причина бана: {user[2]}\n"
+ message += f"Дата разбана: {user[3]}\n\n"
+ return message
+
+ @staticmethod
+ def get_banned_users_buttons():
+ """
+ Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
+
+ Args:
+ offset: отступ для запроса в базу данных
+
+ Returns:
+ message - текст сообщения
+ user_ids - лист кортежей [(user_name: user_id)]
+ """
+ users = BotDB.get_banned_users_from_db()
+ user_ids = []
+
+ for user in users:
+ user_ids.append((user[0], user[1]))
+ return user_ids
+
+ @staticmethod
+ def delete_user_blacklist(user_id):
+ return BotDB.delete_user_blacklist(user_id=user_id)
- #Админка
- @bot.callback_query_handler(func=lambda call: True)
- def post_for_group(call):
- if call.data == 'post_post_post' and call.message.content_type == 'text':
- try:
- bot.send_message(chat_id=MAIN_PUBLIC, text=call.message.text)
- bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
- except:
- if LOGS:
- bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(3))
- elif call.data == 'post_post_post' and call.message.content_type == 'photo':
- try:
- bot.send_photo(
- chat_id=MAIN_PUBLIC,
- caption=call.message.caption,
- photo=call.message.photo[-1].file_id,
- )
- bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
- except:
- if LOGS:
- bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(2))
- elif call.data == 'decline':
- try:
- bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
- except:
- if LOGS:
- bot.send_message(IMPORTANT_LOGS, BotDB.get_error_message_from_db(1))
+bot = TelegramHelperBot(BOT_TOKEN)
+if __name__ == "__main__":
+ # Запускаем бота
+ bot.start()
+
+ scheduler = BackgroundScheduler()
+ scheduler.add_job(bot.unban_notifier(), 'cron', hour=0, minute=0)
+ scheduler.start()
-if __name__ == '__main__':
- telegram_bot()
- while True:
- try:
- bot.polling(none_stop=True)
- bot.enable_save_next_step_handlers(delay=2)
- bot.load_next_step_handlers()
- except (ConnectionError, Exception):
- print("Произошла ошибка, перезапуск бота")
\ No newline at end of file
diff --git a/messages.py b/messages.py
new file mode 100644
index 0000000..883ae1d
--- /dev/null
+++ b/messages.py
@@ -0,0 +1,41 @@
+
+
+def get_message(username: str, type_message: str):
+ constants = {
+ 'HELLO_MESSAGE': "Привет, username!👋🏼&Меня зовут Виби, я бот канала 'Влюбленный Бийск'❤🤖"
+ "&Я был создан для того, чтобы помочь тебе выложить пост в наш канал и если это необходимо, связаться с админами ✍✉"
+ "&Так же я могу выдать тебе набор стикеров, где я буду главным героем🦸♂"
+ "&Наш бот голосового общения переехал сюда: https://t.me/podslushano_biysk_bot 🎤&Там можно послушать о чем говорит наш город🎧"
+ "&Предлагай свой пост мне и я обязательно его опубликую😉"
+ "&Для продолжения взаимодействия воспользуйся меню внизу твоего дисплея⬇"
+ "&&Если что-то пошло не так: введи в чат команду /start, это перезапустит сценарий сначала."
+ "&Не жми кнопку несколько раз если я не ответил с первого раза. Возможно ведутся тех.работы и я отвечу позже"
+ "&&Основная группа в ВК: https://vk.com/love_bsk"
+ "&Основной канал в ТГ: https://t.me/love_bsk",
+ 'SUGGEST_NEWS': "username, окей, жду от тебя текст поста🙌🏼"
+ "&В данный момент я работаю в тестовом режиме, поэтому к посту можно прикрепить не более одного фото и никаких аудио или видео👻"
+ "&&Обещаю, я научусь их обрабатывать, но позже🤝🤖",
+ 'SUGGEST_NEWS_2': "Обрати внимание, что я умный и смогу из твоего текста понять команды указанные ниже😉"
+ "&Если хочешь чтобы пост был опубликован анонимно, напиши в любом месте своего поста слово 'анон'."
+ "&Если хочешь опубликовать пост не анонимно, то напиши 'не анон', 'неанон' или не пиши ничего."
+ "&&❗️❗️❗️Я обучен только на команды, указанные мной выше❗️❗️❗️👆"
+ "&‼Проверь, чтобы указание авторства было выполнено так как я попросил, иначе пост будет выложен не корректно"
+ "&Пост будет опубликован только в группе ТГ📩",
+ "CONNECT_WITH_ADMIN": "username, напиши свое обращение или предложение✍️"
+ "&Мы рассмотрим и ответим тебе в ближайшее время☺️❤️",
+ "DEL_MESSAGE": "username, напиши свое обращение или предложение✍"
+ "&Мы рассмотрим и ответим тебе в ближайшее время☺❤",
+ "BYE_MESSAGE": "Если позднее захочешь предложить еще один пост или обратиться к админам с вопросом, то просто пришли в чат команду 👉 /start"
+ "&&И тебе пока!👋🏼❤️",
+ "USER_ERROR": "Увы, я не понимаю тебя😐💔 Выбери один из пунктов в нижнем меню, а затем пиши.",
+ "QUESTION": "Сообщение успешно отправлено❤️ Ответим, как только сможем😉",
+ "SUCCESS_SEND_MESSAGE": "Пост успешно отправлен❤️ Ожидай одобрения😊",
+ "MESSAGE_FOR_STANDUP": "Отлично, ты вошел в режим стендапа 📣"
+ "&Это свободное пространство, в котором может высказаться каждый житель нашего города, и он будет услышан🙌🏼"
+ "&Для того чтобы высказаться, нажми кнопку: 'Высказаться' и запиши голосовое сообщение, оно выпадет анонимно кому-то другому🗣"
+ "&Для того чтобы послушать о чем говорит наш город, нажми кнопку: 'Послушать'👂"
+ "&Ты можешь анонимно пообщаться, поделиться чем-то важным, обратиться напрямую к жителям🤝 Также можешь выступить перед аудиторией (спеть песню, рассказать стихотворение, шутку)🎤"
+ "&❗️Но пожалуйста не оскорбляй никого, и будь вежлив."
+ }
+ message = constants[type_message]
+ return message.replace('username', username).replace('&', '\n')
diff --git a/migrations/000_migrations_init.py b/migrations/000_migrations_init.py
new file mode 100644
index 0000000..4ba49b0
--- /dev/null
+++ b/migrations/000_migrations_init.py
@@ -0,0 +1,27 @@
+import os
+from db import BotDB
+
+
+BotDB = BotDB()
+
+
+def get_filename():
+ """Возвращает имя файла без расширения."""
+ filename = os.path.basename(__file__)
+ filename = os.path.splitext(filename)[0]
+ return filename
+
+
+def main():
+ migrations_init = """
+ CREATE TABLE IF NOT EXISTS migrations (
+ version INTEGER PRIMARY KEY NOT NULL,
+ script_name TEXT NOT NULL,
+ created_at TEXT
+ );
+ """
+ BotDB.create_table(migrations_init)
+ BotDB.update_version(0, get_filename())
+
+if __name__ == "__main__":
+ main()
diff --git a/migrations/001_create_new_tables.py b/migrations/001_create_new_tables.py
new file mode 100644
index 0000000..cd45af5
--- /dev/null
+++ b/migrations/001_create_new_tables.py
@@ -0,0 +1,56 @@
+import os
+from db import BotDB
+
+BotDB = BotDB()
+
+
+def get_filename():
+ """Возвращает имя файла без расширения."""
+ filename = os.path.basename(__file__)
+ filename = os.path.splitext(filename)[0]
+ return filename
+
+
+def main():
+ # Проверка версии миграций
+ current_version = BotDB.get_current_version() # Добавьте функцию для получения версии
+
+ # Выполнение миграций и проверка последней версии
+ if current_version < 1:
+ # Скрипты миграции
+ create_table_sql_1 = """
+ CREATE TABLE IF NOT EXISTS "admins" (
+ user_id INTEGER NOT NULL,
+ "role" TEXT
+ );
+ """
+ create_table_sql_2 = """CREATE TABLE IF NOT EXISTS "blacklist"
+ (
+ "user_id" INTEGER NOT NULL UNIQUE,
+ "user_name" INTEGER,
+ "message_for_user" INTEGER,
+ "date_to_unban" INTEGER
+ );
+ """
+ create_table_sql_3 = """
+ CREATE TABLE IF NOT EXISTS user_messages (
+ id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+ message_text TEXT,
+ user_id INTEGER,
+ message_id INTEGER NOT NULL,
+ date TEXT
+ );
+ """
+ # Применение миграции
+ BotDB.create_table(create_table_sql_1)
+ BotDB.create_table(create_table_sql_2)
+ BotDB.create_table(create_table_sql_3)
+ BotDB.add_admin(842766148, 'creator')
+ BotDB.add_admin(920057022, 'admin')
+ filename = get_filename()
+
+ BotDB.update_version(1, filename)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/requirements.txt b/requirements.txt
index 0ec9818..35a1522 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,3 @@
-pyTelegramBotAPI
\ No newline at end of file
+pyTelegramBotAPI
+APScheduler~=3.10.4
+loguru~=0.7.2
\ No newline at end of file
diff --git a/settings_example.ini b/settings_example.ini
index b088ac8..6a953a6 100644
--- a/settings_example.ini
+++ b/settings_example.ini
@@ -1,9 +1,13 @@
[Telegram]
-bot_token = 5719450198:AAEI6hQgwiIcxMLGvVrPdopWqVlwHqR0QoU
+bot_token = 000000000:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
preview_link = false
-main_public = @love_biysk
-group_for_posts = -793789724
-group_for_message = -736077298
-group_for_logs = -721685792
-important_logs = -625900899
-test_channel = -1001725605158
\ No newline at end of file
+main_public = @test
+group_for_posts = -00000000
+group_for_message = -00000000
+group_for_logs = -00000000
+important_logs = -00000000
+test_channel = -000000000000
+
+[Settings]
+logs = true
+test = false
\ No newline at end of file