Merge pull request #1 from KerradKerridi/dev

Добавил новые функции
This commit was merged in pull request #1.
This commit is contained in:
ANDREY KATYKHIN
2024-07-06 13:07:34 +03:00
committed by GitHub
8 changed files with 937 additions and 266 deletions

43
custom_logger.py Normal file
View File

@@ -0,0 +1,43 @@
import datetime
import os
from loguru import logger
class Logger:
def __init__(self, name):
self.logger = logger.bind(name=name)
# Получение сегодняшней даты для имени файла
today = datetime.date.today().strftime('%Y-%m-%d')
# Создание папки для логов
current_dir = os.path.dirname(os.path.abspath(__file__))
logs_dir = os.path.join(current_dir, 'logs')
if not os.path.exists(logs_dir):
# Если не существует, создаем ее
os.makedirs(logs_dir)
filename = f'{logs_dir}/helper_bot_{today}.log'
# Настройка формата логов
self.logger.add(
filename,
rotation="00:00",
retention="5 days",
compression="zip",
format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {name} | {line} | {message}",
)
def info(self, message):
self.logger.info(message)
def debug(self, message):
self.logger.debug(message)
def warning(self, message):
self.logger.warning(message)
def error(self, message):
self.logger.error(message)
def critical(self, message):
self.logger.critical(message)

244
db.py
View File

@@ -1,9 +1,13 @@
import sqlite3
import configparser
import os
import sys
from datetime import datetime
from loguru import logger
from custom_logger import Logger
config_path = os.path.join(sys.path[0], 'settings.ini')
db_logger = Logger(name='db')
script_dir = os.path.dirname(os.path.abspath(__file__))
config_path = os.path.join(script_dir, 'settings.ini')
config = configparser.ConfigParser()
config.read(config_path)
LOGS = config.getboolean('Settings', 'logs')
@@ -12,10 +16,51 @@ IMPORTANT_LOGS = config.get('Telegram', 'important_logs')
class BotDB:
def __init__(self, db_file):
def __init__(self):
db_file_path = os.path.dirname(os.path.abspath(__file__))
db_file = os.path.join(db_file_path, 'tg-bot-database')
self.conn = sqlite3.connect(db_file, check_same_thread=False)
self.cursor = self.conn.cursor()
logger.info(f'Подключен к базе данных: {db_file_path}')
def create_table(self, sql_script):
"""Создает таблицу в базе."""
try:
cursor = self.conn.cursor()
cursor.execute(sql_script)
except Exception as e:
print(f"Ошибка при создании таблицы: {e}")
def get_current_version(self):
"""Получает текущую версию миграций из таблицы migrations."""
try:
cursor = self.conn.cursor()
cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1")
version = cursor.fetchone()[0]
return version
except Exception as e:
print(f"Ошибка при получении версии: {e}")
return 0
def update_version(self, new_version, script_name):
"""Обновляет версию миграций в таблице migrations."""
logger.info(f'Попытка обновления версии: {new_version}, скрипт: {script_name}')
try:
current_date = datetime.now()
today = current_date.strftime("%d-%m-%Y %H:%M:%S")
cursor = self.conn.cursor()
cursor.execute(
"INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)",
(new_version, script_name, today),
)
self.conn.commit()
logger.info(f"Версия обновлена: {new_version}, скрипт: {script_name}")
except sqlite3.IntegrityError as e:
logger.error(f"Ошибка при обновлении версии: {e}")
except Exception as e:
logger.error(f"Ошибка при обновлении версии: {e}")
# TODO: Deprecated, удалить
def get_message_from_db(self, type: str, username):
"""Функция для запроса к базе данных и получения сообщений для бота. В аргументы передаются:
type - тип получаемой обратной связи, строковое значение, сохраненное в БД
@@ -35,6 +80,7 @@ class BotDB:
except sqlite3.Error as error:
print(error)
# TODO: Deprecated. Остался только в voice боте, удалить и оттуда
def get_error_message_from_db(self, id: int):
"""Функция для запроса к базе данных и получения сообщений ошибки. В аргументы передаются:
id - идентификатор ошибки
@@ -48,18 +94,18 @@ class BotDB:
except sqlite3.Error as error:
print(error)
def add_new_user_in_db(self, user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed):
def add_new_user_in_db(self, user_id, first_name, full_name, username, is_bot, language_code, date_added,
date_changed):
"""Добавляем юзера в базу"""
try:
self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
"'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(user_id, first_name, full_name,
username,is_bot,language_code,date_added,date_changed))
username, is_bot, language_code, date_added, date_changed))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def user_exists(self, user_id):
"""Проверяем, есть ли юзер в базе"""
try:
@@ -68,7 +114,6 @@ class BotDB:
except sqlite3.Error as error:
print(error)
def get_user_id(self, user_id):
"""Достаем id юзера в базе по его user_id"""
try:
@@ -77,7 +122,6 @@ class BotDB:
except sqlite3.Error as error:
print(error)
def get_username(self, user_id):
"""Достаем id юзера в базе по его user_id"""
try:
@@ -89,7 +133,7 @@ class BotDB:
def get_all_user_id(self):
"""Достаем все айдишники юзеров из БД и преобразуем их в список"""
try:
result = self.cursor.execute("SELECT `user_id` FROM `our_users`",)
result = self.cursor.execute("SELECT `user_id` FROM `our_users`", )
fetch_all = result.fetchall()
list_of_users = []
for i in fetch_all:
@@ -105,7 +149,6 @@ class BotDB:
except sqlite3.Error as error:
print(error)
def change_name(self, user_id):
#TODO: реализовать функцию изменения имени пользователя по которому к нему обращается бот. ОБновляем поля first_name, date_changed
#result = self.cursor.execute("UPDATE 'our_users' SET (?) WHERE user_id = (?)", (new_user_name), )
@@ -114,7 +157,9 @@ class BotDB:
def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
"""Добавляет информацию о войсе юзера в БД"""
try:
result = self.cursor.execute("INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)", (file_name, author_id, date_added, listen_count, file_id))
result = self.cursor.execute(
"INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)",
(file_name, author_id, date_added, listen_count, file_id))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
@@ -128,11 +173,11 @@ class BotDB:
except sqlite3.Error as error:
print(error)
def get_last_user_audio_record(self, user_id):
"""Получает данные о количестве записей пользователя"""
try:
result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?", (user_id,))
result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?",
(user_id,))
return bool(len(result.fetchall()))
except sqlite3.Error as error:
print(error)
@@ -140,8 +185,9 @@ class BotDB:
def get_id_for_audio_record(self, user_id):
"""Получает ID аудио сообщения пользователя"""
try:
result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
(user_id,))
result = self.cursor.execute(
"SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
(user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
@@ -149,7 +195,9 @@ class BotDB:
def get_path_for_audio_record(self, user_id):
"""Получает данные о названии файла"""
try:
result = self.cursor.execute("SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", (user_id,))
result = self.cursor.execute(
"SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
(user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
@@ -162,9 +210,10 @@ class BotDB:
FROM audio_message_reference a
LEFT JOIN listen_audio_users l ON l.file_name = a.file_name
WHERE l.user_id = ?
AND l.file_name IS NOT NULL""" , (user_id,))
AND l.file_name IS NOT NULL""", (user_id,))
check_sign = query_listen_audio.fetchall()
query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?', (user_id,))
query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?',
(user_id,))
sign_all_audio = query_all_audio.fetchall()
new_sign1 = list(set(sign_all_audio) - set(check_sign))
new_sign = []
@@ -177,7 +226,9 @@ class BotDB:
def mark_listened_audio(self, file_name, user_id):
"""Отмечает аудио прослушанным для конкретного пользователя."""
try:
result = self.cursor.execute("INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)", (file_name, user_id, 1))
result = self.cursor.execute(
"INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)",
(file_name, user_id, 1))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
@@ -186,7 +237,8 @@ class BotDB:
"""Получает данные о получении стикеров пользователем"""
try:
result = self.cursor.execute("SELECT `has_stickers` FROM `our_users` WHERE `user_id` = ?", (user_id,))
return result.fetchone()[0]
return result.fetchone()[0] == 1
#return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
@@ -198,7 +250,157 @@ class BotDB:
except sqlite3.Error as error:
print(error)
def get_users_blacklist(self):
"""Возвращает список пользователей в черном списке"""
try:
result = self.cursor.execute("SELECT user_id, user_name FROM `blacklist`")
fetch_all = result.fetchall()
list_of_users = {}
for i in fetch_all:
list_of_users[i[0]] = i[1]
return list_of_users
except sqlite3.Error as error:
print(error)
def get_users_for_unblock_today(self, date_to_unban):
"""Возвращает пользователей у которых истекает срок блокировки сегодня"""
try:
result = self.cursor.execute("SELECT user_id, user_name FROM `blacklist` WHERE date_to_unban = ?", (date_to_unban,))
fetch_all = result.fetchall()
list_of_users = {}
for i in fetch_all:
list_of_users[i[0]] = i[1]
return list_of_users
except sqlite3.Error as error:
print(error)
def get_blacklist_users_by_id(self, user_id):
"""Возвращает список пользователей в черном списке по user_id"""
try:
result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban FROM `blacklist` WHERE user_id = ?", (user_id, ))
return self.cursor.fetchone()
except sqlite3.Error as error:
print(error)
def check_user_in_blacklist(self, user_id):
"""Проверяет, существует ли запись с данным user_id в blacklist."""
self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
return bool(result)
def set_user_blacklist(self, user_id, user_name=None, message_for_user=None, date_to_unban=None):
"""Добавляет пользователя в черный список"""
try:
result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name',"
" 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)",
(user_id, user_name, message_for_user, date_to_unban,))
return self.conn.commit()
except sqlite3.Error as error:
return error
def delete_user_blacklist(self, user_id):
"""Удаляет пользователя из черного списка"""
try:
#TODO: Функция всегда возвращает true, даже если такого id нет в таблице
self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
self.conn.commit()
logger.info(f"Пользователь с идентификатором {user_id} успешно удален.")
return True
except sqlite3.Error as error:
logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} из таблицы blacklist. Ошибка: {str(error)}")
return False
def add_new_message_in_db(self, message_text, user_id, message_id, date, has_answer):
"""Добавляем сообщение юзера в базу"""
try:
self.cursor.execute(
"INSERT INTO `user_messages` (message_text, user_id, message_id, date, has_answer) "
"VALUES (?, ?, ?, ?, ?)",
(message_text, message_id, user_id, date, has_answer))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def update_date_for_user(self, date, user_id: int):
try:
result = self.cursor.execute("UPDATE `our_users` SET `date_changed` = ? WHERE `user_id` = ?",
(date, user_id,))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def is_admin(self, user_id):
"""
Проверяет, является ли пользователь администратором.
Args:
user_id: ID пользователя Telegram.
Returns:
True, если пользователь администратор, иначе False.
"""
self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
return bool(result)
def add_admin(self, user_id, role):
"""
Добавляет пользователя в список администраторов.
Args:
user_id: ID пользователя Telegram.
role: Роль пользователя.
Доступные варианты:
1. creator - создатель
2. admin - обычная роль
"""
self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role))
return self.conn.commit()
def remove_admin(self, user_id):
"""
Удаляет пользователя из списка администраторов.
Args:
user_id: ID пользователя Telegram.
"""
self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,))
return self.conn.commit()
def get_user_by_message_id(self, message_id):
"""Возвращает идентификатор пользователя по идентификатору сообщения"""
try:
result = self.cursor.execute("SELECT user_id FROM `user_messages` WHERE message_id = ?", (message_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def get_last_users_from_db(self):
"""Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
try:
result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC")
users = result.fetchall()
return users
except sqlite3.Error as error:
print(error)
def get_banned_users_from_db(self):
"""Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
try:
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist", )
users = result.fetchall()
return users
except sqlite3.Error as error:
print(error)
def get_banned_users_from_db_with_limits(self, offset: int, limit: int):
"""Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
try:
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist LIMIT ?, ?", (offset, limit,) )
users = result.fetchall()
return users
except sqlite3.Error as error:
print(error)
def close(self):
"""Закрываем соединение с БД"""
self.conn.close()

770
main.py
View File

@@ -3,19 +3,25 @@ import os
import sys
from pathlib import Path
from time import sleep
from enum import Enum
from typing import Any
from apscheduler.schedulers.background import BackgroundScheduler
import db
from db import BotDB
import telebot
import random
from datetime import datetime
from datetime import datetime, timedelta
import time
from telebot import types
from telebot.apihelper import ApiTelegramException
import messages
import traceback
#Настройки
# Настройки
config_path = os.path.join(sys.path[0], 'settings.ini')
config = configparser.ConfigParser()
config.read(config_path)
#TELEGRAM
# TELEGRAM
BOT_TOKEN = config.get('Telegram', 'BOT_TOKEN')
GROUP_FOR_POST = config.get('Telegram', 'group_for_posts')
GROUP_FOR_MESSAGE = config.get('Telegram', 'group_for_message')
@@ -23,34 +29,309 @@ MAIN_PUBLIC = config.get('Telegram', 'main_public')
GROUP_FOR_LOGS = config.get('Telegram', 'group_for_logs')
IMPORTANT_LOGS = config.get('Telegram', 'important_logs')
PREVIEW_LINK = config.getboolean('Telegram', 'PREVIEW_LINK')
#SETTINGS
# SETTINGS
LOGS = config.getboolean('Settings', 'logs')
TEST = config.getboolean('Settings', 'test')
# Инициализируем бота и базку
BotDB = BotDB()
#Инициализируем бота и базку
bot = telebot.TeleBot(BOT_TOKEN, parse_mode=None)
BotDB = BotDB('tg-bot-database')
def telegram_bot():
@bot.message_handler(commands=['start'])
def send_welcome(message):
#TODO: Здесь переписать через randint
#TODO: Тексты приветствий вынести в отдельный файл
class State(Enum):
START = "START"
SUGGEST = "SUGGEST"
ADMIN = "ADMIN"
CHAT = "CHAT"
PRE_CHAT = "PRE_CHAT"
class TelegramHelperBot:
def __init__(self, token):
self.bot = telebot.TeleBot(token)
self.state = State.START
# Router for user
@self.bot.message_handler(func=lambda message: True, chat_types=['private'])
def handle_message(message):
if BotDB.check_user_in_blacklist(message.from_user.id):
attribute = BotDB.get_blacklist_users_by_id(message.from_user.id)
self.bot.send_message(message.chat.id,
f'<b>Ты заблокирован\nПричина блокировки:</b> {attribute[2]}\n<b>Дата разблокировки:</b> {attribute[3]}', parse_mode='HTML')
return
if self.state == State.START:
if message.text == '/start':
self.start_message(message)
elif message.text == '📢Предложить свой пост':
self.suggest_post(message)
self.state = State.SUGGEST
elif message.text == '🤪Хочу стикеры':
self.stickers(message)
self.state = State.START
elif message.text == '📩Связаться с админами':
self.connect_with_admin(message)
self.state = State.PRE_CHAT
elif message.text == '👋🏼Сказать пока!':
self.end_message(message)
self.state = State.START
elif message.text == 'Выйти из чата':
self.end_message(message)
elif message.text == '/admin':
access = self.check_access(message.from_user.id)
if access:
self.admin_panel(message)
self.state = State.ADMIN
else:
self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!')
elif message.text == '/state':
self.bot.send_message(message.chat.id,
f'Твой state == {self.state.value}')
else:
self.bot.send_message(message.chat.id,
"Не понимаю где ты находишься. Нажми /state, и я расскажу что ты можешь "
"сделать")
if self.state == State.SUGGEST:
self.bot.register_next_step_handler(message, self.send_to_suggest)
self.state = State.START
if message.text == '/start':
self.state = State.START
self.start_message(message)
if self.state == State.PRE_CHAT:
self.bot.register_next_step_handler(message, self.resend_message_in_group_for_message)
self.state = State.START
if message.text == '/start':
self.state = State.START
self.start_message(message)
if self.state == State.CHAT:
if message.text == 'Выйти из чата':
self.state = State.START
self.end_message(message)
elif message.text == '/start':
self.state = State.START
self.start_message(message)
else:
self.resend_message_in_group_for_message(message)
if self.state == State.ADMIN:
if message == '/admin' or message == '/restart' or message == 'Вернуться в админку':
access = self.check_access(message.from_user.id)
if access:
self.admin_panel(message)
else:
self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!')
if message.text == '/start':
self.state = State.START
self.start_message(message)
@self.bot.message_handler(func=lambda message: True, chat_types=['group'])
def handle_message(message):
"""Функция ответа админа пользователю через закрытый чат"""
self.state = State.CHAT
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Выйти из чата")
markup.add(item1)
message_id = message.reply_to_message.id
message_from_admin = message.text
chat_id = BotDB.get_user_by_message_id(message_id)
self.bot.send_message(chat_id, message_from_admin, reply_markup=markup)
# Админка
@self.bot.callback_query_handler(func=lambda call: call.data in ['publish', 'decline'])
def post_for_group(call):
if call.data == 'publish' and call.message.content_type == 'text':
try:
self.bot.send_message(chat_id=MAIN_PUBLIC, text=call.message.text)
self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
except Exception as e:
if LOGS:
self.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
elif call.data == 'publish' and call.message.content_type == 'photo':
try:
self.bot.send_photo(
chat_id=MAIN_PUBLIC,
caption=call.message.caption,
photo=call.message.photo[-1].file_id,
)
self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
except Exception as e:
if LOGS:
self.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
elif call.data == 'decline':
try:
self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
except Exception as e:
if LOGS:
self.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@self.bot.callback_query_handler(func=lambda call: True)
def pagination(call):
if call.data[:3] == 'ban':
user_id = call.data[4:]
self.ban_user(call.message, user_id)
if call.data == 'return':
self.bot.delete_message(call.message.chat.id, call.message.message_id)
self.admin_panel(call.message)
if call.data[:5] == 'unban':
self.delete_user_blacklist(call.data[6:])
msg = f'Успешно удалено.'
self.bot.send_message(chat_id=call.message.chat.id, text=msg)
elif call.data[:4] == 'page':
if call.message.text == 'Список пользователей которые последними обращались к боту':
list_users = BotDB.get_last_users_from_db()
keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(list_users), list_users,
'ban')
self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id,
reply_markup=keyboard)
if "Список заблокированных пользователей".lower() in call.message.text.lower():
#Готовим сообщения
message_user = self.get_banned_users_list(int(call.data[5:]) * 7 - 7)
self.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
text=message_user)
#Готовим клавиатуру
buttons = self.get_banned_users_buttons()
keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unban')
self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id,
reply_markup=keyboard)
def start(self):
while True:
try:
self.bot.polling(none_stop=True)
except (ConnectionError, Exception):
print(f"Произошла ошибка: {str(Exception)}\n\nTraceback:\n{traceback.format_exc()}")
def unban_notifier(self):
# Получение сегодняшней даты в формате DD-MM-YYYY
current_date = datetime.now()
today = current_date.strftime("%d-%m-%Y")
# Получение списка разблокированных пользователей
unblocked_users = BotDB.get_users_for_unblock_today(today)
message = "Разблокированные пользователи:\n"
for user_id, user_name in unblocked_users.items():
message += f"ID: {user_id}, Имя: {user_name}\n"
# Отправка сообщения в канал
self.bot.send_message(GROUP_FOR_MESSAGE, message)
# Черный список
def admin_panel(self, message):
try:
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Бан (Список)")
#item2 = types.KeyboardButton("Добавить админа") #TODO: Когда-нибудь потом доделаю
#item3 = types.KeyboardButton("Удалить админа")
item4 = types.KeyboardButton("Разбан (список)")
markup.add(item1, item4)
self.bot.send_message(message.chat.id, "Добро пожаловать в админку. Выбери что хочешь:",
reply_markup=markup)
self.bot.register_next_step_handler(message, self.handle_admin_message)
except Exception as e:
self.bot.register_next_step_handler(message, self.admin_panel)
def handle_admin_message(self, message):
try:
if message.text == "Бан (Список)":
self.get_last_users(message)
elif message.text == "Разбан (список)":
self.get_banned_users(message)
except Exception as e:
self.bot.reply_to(message, f"Ошибка\n\n {e}")
self.admin_panel(message)
def ban_user(self, message, user_id: int):
user_name = BotDB.get_username(user_id=user_id)
ban_object = {'user_id': user_id, 'user_name': user_name, 'message_for_user': None, 'date_to_unban': None}
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Спам")
item2 = types.KeyboardButton("Заебал стикерами")
markup.add(item1, item2)
self.bot.send_message(message.chat.id,
f"Выбран пользователь: {user_id}. Выбери причину бана из списка или напиши ее в чат",
reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user_step_2, ban_object)
def ban_user_step_2(self, message, ban_object: dict):
ban_object['message_for_user'] = message.text
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("1")
item2 = types.KeyboardButton("7")
item3 = types.KeyboardButton("30")
item4 = types.KeyboardButton("Навсегда")
markup.add(item1, item2, item3, item4)
self.bot.send_message(message.chat.id, f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши "
f"его в чат",
reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user_step_3, ban_object)
def ban_user_step_3(self, message, ban_object: dict):
date_to_unban = None
if message.text != 'Навсегда':
date_to_unban = self.add_days_to_date(message.text)
else:
pass
ban_object['date_to_unban'] = date_to_unban
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Подтвердить")
item2 = types.KeyboardButton("Отменить")
markup.add(item1, item2)
self.bot.send_message(message.chat.id,
f"Необходимо подтверждение:\nПользователь:{ban_object['user_id']}\nПричина бана:{ban_object['message_for_user']}.\nСрок бана:{ban_object['date_to_unban']}",
parse_mode='html',
reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user_final_step, ban_object)
def ban_user_final_step(self, message, ban_object: dict):
if message.text == 'Подтвердить':
exists = BotDB.check_user_in_blacklist(ban_object['user_id'])
if exists:
self.bot.reply_to(message, f"Пользователь уже был заблокирован ранее.")
self.admin_panel(message)
else:
BotDB.set_user_blacklist(ban_object['user_id'],
ban_object['user_name'],
ban_object['message_for_user'],
ban_object['date_to_unban'])
self.bot.reply_to(message, f"Пользователь {ban_object['user_name']} успешно заблокирован.")
self.admin_panel(message)
def get_last_users(self, message):
list_users = BotDB.get_last_users_from_db()
keyboard = self.create_keyboard_with_pagination(1, len(list_users), list_users, 'ban')
self.bot.send_message(chat_id=message.chat.id, text="Список пользователей которые последними обращались к боту",
reply_markup=keyboard)
def get_banned_users(self, message):
message_text = self.get_banned_users_list(0)
buttons_list = self.get_banned_users_buttons()
if buttons_list:
k = self.create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unban')
self.bot.send_message(message.chat.id, message_text, reply_markup=k)
else:
self.bot.send_message(message.chat.id, "В списке забанненых пользователей никого нет")
self.admin_panel(message)
def start_message(self, message):
try:
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
number_stick_hello = random.randint(1, len(name_stick_hello))
random_stick_hello = open(name_stick_hello[number_stick_hello], 'rb')
#logging
random_stick_hello = open(random.choice(name_stick_hello), 'rb')
# logging
if LOGS:
bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
bot.send_sticker(message.chat.id, random_stick_hello)
self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
self.bot.send_sticker(message.chat.id, random_stick_hello)
sleep(0.3)
except:
except Exception as e:
print(f'{str(e)}')
if LOGS:
bot.send_message(IMPORTANT_LOGS, BotDB.get_error_message_from_db(7))
self.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
try:
user_id = message.from_user.id
@@ -59,191 +340,132 @@ def telegram_bot():
is_bot = message.from_user.is_bot
username = message.from_user.username
language_code = message.from_user.language_code
time_UTC = int(time.time())
date_added = datetime.fromtimestamp(time_UTC)
date_changed = datetime.fromtimestamp(time_UTC)
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("📢Предложить свой пост")
item2 = types.KeyboardButton("📩Связаться с админами")
# TODO: Выпилил, удалить
#item3 = types.KeyboardButton("❌Удалить пост")
if BotDB.user_exists(user_id):
is_need_sticker = BotDB.get_info_about_stickers(user_id=message.from_user.id)
if is_need_sticker == 0:
item5 = types.KeyboardButton("🤪Хочу стикеры")
BotDB.update_info_about_stickers(user_id=message.from_user.id)
markup.add(item1, item2, item5)
else:
markup.add(item1, item2)
else:
BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed)
is_need_sticker = BotDB.get_info_about_stickers(user_id=message.from_user.id)
if is_need_sticker == 0:
item5 = types.KeyboardButton("🤪Хочу стикеры")
BotDB.update_info_about_stickers(user_id=message.from_user.id)
markup.add(item1, item2, item5)
else:
markup.add(item1, item2)
hello_message = BotDB.get_message_from_db('start_message', first_name)
bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK)
except:
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
if not BotDB.user_exists(user_id):
BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date,
date)
BotDB.update_date_for_user(date, user_id)
markup = self.get_reply_keyboard(message)
hello_message = messages.get_message(self.__get_first_name(message), 'HELLO_MESSAGE')
self.bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup,
disable_web_page_preview=not PREVIEW_LINK)
except Exception as e:
print(f'{str(e)}')
if LOGS:
bot.send_message(IMPORTANT_LOGS, BotDB.get_error_message_from_db(8))
bot.register_next_step_handler(message, go_send_messages)
self.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@bot.message_handler(commands=['end_command'])
def after_post(message):
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("📢Предложить свой пост")
item2 = types.KeyboardButton("📩Связаться с админами")
# TODO: Скрыл, удалить обработчик
#item3 = types.KeyboardButton("❌Удалить пост")
item5 = types.KeyboardButton("👋🏼Сказать пока!")
markup.add(item1, item2, item5)
bot.send_message(message.chat.id,
"Выбери нужную кнопку внизу экрана".format(
message.from_user, bot.get_me()),
parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK)
bot.register_next_step_handler(message, go_send_messages)
def go_send_messages(message):
global msg
if message.text == '📢Предложить свой пост':
try:
markup = types.ReplyKeyboardRemove()
first_name = message.from_user.first_name
suggest_news = BotDB.get_message_from_db('suggest_news', first_name)
bot.send_message(message.chat.id, suggest_news, parse_mode='html')
sleep(0.3)
first_name = message.from_user.first_name
suggest_news_2 = BotDB.get_message_from_db('suggest_news_2', first_name)
msg = bot.send_message(message.chat.id, suggest_news_2,parse_mode='html', reply_markup=markup)
except:
if LOGS:
bot.send_message(IMPORTANT_LOGS, BotDB.get_error_message_from_db(10))
#logging
if LOGS:
bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
bot.register_next_step_handler(msg, resend_message_in_group_for_post)
elif message.text == "📩Связаться с админами":
first_name = message.from_user.first_name
connect_with_admin = BotDB.get_message_from_db('connect_with_admin', first_name)
msg = bot.send_message(message.chat.id, connect_with_admin, parse_mode="html")
#logging
if LOGS:
bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
bot.register_next_step_handler(msg, resend_message_in_group_for_message)
elif message.text == "❌Удалить пост":
#TODO: требует автоматизации. На входе говорим пришли мне пост, на выходе получаем идентификатор поста, удаляем из ТГ. Насчет удаления из ВК надо подумать
first_name = message.from_user.first_name
del_message = BotDB.get_message_from_db('del_message', first_name)
msg = bot.send_message(message.chat.id, del_message, parse_mode="html")
#logging
if LOGS:
bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
bot.register_next_step_handler(msg, resend_message_in_group_for_message)
elif message.text == "👋🏼Сказать пока!":
try:
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
number_stick_bye = random.randint(1, len(name_stick_bye))
random_stick_bye = open(name_stick_bye[number_stick_bye], 'rb')
bot.send_sticker(message.chat.id, random_stick_bye)
except ApiTelegramException:
if LOGS:
bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(11))
def resend_message_in_group_for_message(self, message):
self.bot.forward_message(chat_id=GROUP_FOR_MESSAGE,
from_chat_id=message.chat.id,
message_id=message.message_id
)
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.add_new_message_in_db(message.text, message.message_id + 1, message.from_user.id, date, 0)
question = messages.get_message(self.__get_first_name(message), 'QUESTION')
markup = self.get_reply_keyboard(message)
self.bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK,
reply_markup=markup)
def suggest_post(self, message):
try:
markup = types.ReplyKeyboardRemove()
try:
first_name = message.from_user.first_name
bye_message = BotDB.get_message_from_db('bye_message', first_name)
bot.send_message(message.chat.id, bye_message,
parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK)
except:
if LOGS:
bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(6))
suggest_news = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS')
self.bot.send_message(message.chat.id, suggest_news, parse_mode='html')
sleep(0.3)
suggest_news_2 = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS_2')
self.bot.send_message(message.chat.id, suggest_news_2, parse_mode='html', reply_markup=markup)
except Exception as e:
self.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
# logging
if LOGS:
self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
if LOGS:
#logging
bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
elif message.text == "🤪Хочу стикеры":
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("📢Предложить свой пост")
item2 = types.KeyboardButton("📩Связаться с админами")
# TODO: Скрыл кнопку, убрать обработчик позднее
#item3 = types.KeyboardButton("❌Удалить пост")
item5 = types.KeyboardButton("👋🏼Сказать пока!")
markup.add(item1, item2, item5)
try:
if LOGS:
bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
bot.send_message(message.chat.id, text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk', reply_markup=markup)
bot.register_next_step_handler(message, callback=go_send_messages)
except ApiTelegramException:
if LOGS:
bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(12))
else:
try:
first_name = message.from_user.first_name
user_error = BotDB.get_message_from_db('user_error', first_name)
bot.send_message(message.chat.id, user_error, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK)
except:
if LOGS:
bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(9))
#logging
if LOGS:
bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
bot.register_next_step_handler(message, callback=go_send_messages)
def stickers(self, message):
BotDB.update_info_about_stickers(user_id=message.from_user.id)
markup = self.get_reply_keyboard(message)
try:
self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
self.bot.send_message(message.chat.id,
text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
reply_markup=markup)
except ApiTelegramException as e:
self.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
def resend_message_in_group_for_post(message):
def connect_with_admin(self, message):
connect_with_admin = messages.get_message(self.__get_first_name(message), 'CONNECT_WITH_ADMIN')
self.bot.send_message(message.chat.id, connect_with_admin, parse_mode="html")
# logging
if LOGS:
self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
def end_message(self, message):
try:
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
random_stick_bye = open(random.choice(name_stick_bye), 'rb')
self.bot.send_sticker(message.chat.id, random_stick_bye)
except ApiTelegramException as e:
if LOGS:
self.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
markup = types.ReplyKeyboardRemove()
try:
bye_message = messages.get_message(self.__get_first_name(message), 'BYE_MESSAGE')
self.bot.send_message(message.chat.id, bye_message,
parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK)
except Exception as e:
if LOGS:
self.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
if LOGS:
# logging
self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
def send_to_suggest(self, message):
markup = types.InlineKeyboardMarkup(row_width=1)
item1 = types.InlineKeyboardButton("Опубликовать", callback_data='post_post_post')
item1 = types.InlineKeyboardButton("Опубликовать", callback_data='publish')
item2 = types.InlineKeyboardButton("Отклонить", callback_data='decline')
markup.add(item1, item2)
try:
if message.content_type == 'text':
post_text = message.text.lower()
if post_text.find('неанон') != -1 or post_text.find('не анон') != -1:
bot.send_message(
#TODO: GROUP_FOR_POST
self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
reply_markup=markup
)
elif post_text.find('анон') != -1:
bot.send_message(
self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
text=f'Пост из ТГ:\n{message.text}\n\nПост опубликован анонимно',
reply_markup=markup
)
else:
bot.send_message(
self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
reply_markup=markup
)
elif message.content_type == 'photo' and message.media_group_id == None:
elif message.content_type == 'photo' and message.media_group_id is None:
post_text_for_photo = message.caption.lower()
if post_text_for_photo.find('неанон') != -1 or post_text_for_photo.find('не анон') != -1:
bot.send_photo(
self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
@@ -251,7 +473,7 @@ def telegram_bot():
reply_markup=markup
)
elif post_text_for_photo.find('анон') != -1 or post_text_for_photo.find('анон') != -1:
bot.send_photo(
self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nПост опубликован анонимно',
@@ -259,81 +481,155 @@ def telegram_bot():
reply_markup=markup
)
else:
bot.send_photo(
self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
photo=message.photo[-1].file_id,
reply_markup=markup
)
#TODO: Не понятна реализация с альбомами от слова совсем
#elif message.content_type == 'photo' and message.media_group_id != None:
# TODO: Не понятна реализация с альбомами от слова совсем
# elif message.content_type == 'photo' and message.media_group_id != None:
# bot.forward_message(chat_id=IMPORTANT_LOGS, from_chat_id=message.chat.id, message_id=message.message_id )
else:
pass
except:
except Exception as e:
if LOGS:
username = message.from_user.first_name
error_message = str(BotDB.get_error_message_from_db(5)).replace('username', username)
bot.send_message(chat_id=IMPORTANT_LOGS, text=error_message)
self.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
markup_for_user = self.get_reply_keyboard(message)
success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE')
self.bot.send_message(message.chat.id, success_send_message, parse_mode='html',
disable_web_page_preview=not PREVIEW_LINK, reply_markup=markup_for_user)
username = message.from_user.first_name
success_send_message = BotDB.get_message_from_db('success_send_message', username)
@staticmethod
def get_reply_keyboard(message):
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("📢Предложить свой пост")
item2 = types.KeyboardButton("📩Связаться с админами")
item3 = types.KeyboardButton("👋🏼Сказать пока!")
#TODO: Есть ощущение что не совсем так работает как надо
item4 = types.KeyboardButton("🤪Хочу стикеры") if not BotDB.get_info_about_stickers(
user_id=message.from_user.id) else None
bot.send_message(message.chat.id, success_send_message, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK)
after_post(message=message)
if item4:
markup.add(item1, item2, item3, item4)
else:
markup.add(item1, item2, item3)
def resend_message_in_group_for_message(message):
bot.forward_message(chat_id=GROUP_FOR_MESSAGE,
from_chat_id=message.chat.id,
message_id=message.message_id
)
username = message.from_user.first_name
question = BotDB.get_message_from_db('question', username)
bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK)
try:
pass
except:
if LOGS:
bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(4))
after_post(message=message)
return markup
@staticmethod
def __get_first_name(message):
return message.from_user.first_name
@staticmethod
def check_access(user_id: int):
"""Проверка прав на совершение действий"""
return BotDB.is_admin(user_id)
@staticmethod
def add_days_to_date(days):
"""Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY."""
current_date = datetime.now()
future_date = current_date + timedelta(days=int(days))
formatted_date = future_date.strftime("%d-%m-%Y")
return formatted_date
@staticmethod
def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[Any, Any]], callback: str):
"""
Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback
Args:
page: Номер текущей страницы.
total_items: Общее количество элементов.
array_items: Лист кортежей. Содержит в себе user_name: user_id
callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id})
Returns:
InlineKeyboardMarkup: Клавиатура с кнопками пагинации.
"""
# Определяем общее количество страниц
total_pages = (total_items + 7 - 1) // 7
page = page
# Создаем список кнопок
buttons = []
# Вычисляем стартовый номер для текущей страницы
start_index = (page - 1) * 7 #тут было +1, убрал, потому что на текстовом массиве выходит за пределы
# Кнопки с номерами страниц
for i in range(start_index, min(start_index + 7,
len(array_items))): #тут было len(array_items) +1, убрал, потому что на текстовом массиве выходит за пределы
buttons.append(
types.InlineKeyboardButton(f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}"))
# Добавляем кнопки "Предыдущая" и "Следующая"
if int(page) > 1:
buttons.insert(6, types.InlineKeyboardButton("⬅️ Предыдущая", callback_data=f"page_{page - 1}"))
if page < total_pages:
buttons.append(types.InlineKeyboardButton("➡️ Следующая", callback_data=f"page_{page + 1}"))
#Добавляем кнопку назад
buttons.append(types.InlineKeyboardButton("🏠 Назад", callback_data="return"))
# Формируем клавиатуру с 3 кнопками в ряд
keyboard = []
for i in range(0, len(buttons), 3):
keyboard.append(buttons[i:i + 3])
return types.InlineKeyboardMarkup(keyboard)
@staticmethod
def get_banned_users_list(offset: int):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset)
message = "Список заблокированных пользователей:\n"
for user in users:
message += f"Пользователь: {user[0]}\n"
message += f"Причина бана: {user[2]}\n"
message += f"Дата разбана: {user[3]}\n\n"
return message
@staticmethod
def get_banned_users_buttons():
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = BotDB.get_banned_users_from_db()
user_ids = []
for user in users:
user_ids.append((user[0], user[1]))
return user_ids
@staticmethod
def delete_user_blacklist(user_id):
return BotDB.delete_user_blacklist(user_id=user_id)
#Админка
@bot.callback_query_handler(func=lambda call: True)
def post_for_group(call):
if call.data == 'post_post_post' and call.message.content_type == 'text':
try:
bot.send_message(chat_id=MAIN_PUBLIC, text=call.message.text)
bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
except:
if LOGS:
bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(3))
elif call.data == 'post_post_post' and call.message.content_type == 'photo':
try:
bot.send_photo(
chat_id=MAIN_PUBLIC,
caption=call.message.caption,
photo=call.message.photo[-1].file_id,
)
bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
except:
if LOGS:
bot.send_message(chat_id=IMPORTANT_LOGS, text=BotDB.get_error_message_from_db(2))
elif call.data == 'decline':
try:
bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
except:
if LOGS:
bot.send_message(IMPORTANT_LOGS, BotDB.get_error_message_from_db(1))
bot = TelegramHelperBot(BOT_TOKEN)
if __name__ == "__main__":
# Запускаем бота
bot.start()
scheduler = BackgroundScheduler()
scheduler.add_job(bot.unban_notifier(), 'cron', hour=0, minute=0)
scheduler.start()
if __name__ == '__main__':
telegram_bot()
while True:
try:
bot.polling(none_stop=True)
bot.enable_save_next_step_handlers(delay=2)
bot.load_next_step_handlers()
except (ConnectionError, Exception):
print("Произошла ошибка, перезапуск бота")

41
messages.py Normal file
View File

@@ -0,0 +1,41 @@
def get_message(username: str, type_message: str):
constants = {
'HELLO_MESSAGE': "Привет, username!👋🏼&Меня зовут Виби, я бот канала 'Влюбленный Бийск'❤🤖"
"&Я был создан для того, чтобы помочь тебе выложить пост в наш канал и если это необходимо, связаться с админами ✍✉"
"&Так же я могу выдать тебе набор стикеров, где я буду главным героем🦸‍♂"
"&Наш бот голосового общения переехал сюда: https://t.me/podslushano_biysk_bot 🎤&Там можно послушать о чем говорит наш город🎧"
"&Предлагай свой пост мне и я обязательно его опубликую😉"
"&Для продолжения взаимодействия воспользуйся меню внизу твоего дисплея⬇"
"&&Если что-то пошло не так: введи в чат команду /start, это перезапустит сценарий сначала."
"&Не жми кнопку несколько раз если я не ответил с первого раза. Возможно ведутся тех.работы и я отвечу позже"
"&&Основная группа в ВК: https://vk.com/love_bsk"
"&Основной канал в ТГ: https://t.me/love_bsk",
'SUGGEST_NEWS': "username, окей, жду от тебя текст поста🙌🏼"
"&В данный момент я работаю в тестовом режиме, поэтому к посту можно прикрепить не более одного фото и никаких аудио или видео👻"
"&&Обещаю, я научусь их обрабатывать, но позже🤝🤖",
'SUGGEST_NEWS_2': "Обрати внимание, что я умный и смогу из твоего текста понять команды указанные ниже😉"
"&Если хочешь чтобы пост был опубликован анонимно, напиши в любом месте своего поста слово 'анон'."
"&Если хочешь опубликовать пост не анонимно, то напиши 'не анон', 'неанон' или не пиши ничего."
"&&❗️❗️❗️Я обучен только на команды, указанные мной выше❗️❗️❗️👆"
"&‼Проверь, чтобы указание авторства было выполнено так как я попросил, иначе пост будет выложен не корректно"
"&Пост будет опубликован только в группе ТГ📩",
"CONNECT_WITH_ADMIN": "username, напиши свое обращение или предложение✍️"
"&Мы рассмотрим и ответим тебе в ближайшее время☺️❤️",
"DEL_MESSAGE": "username, напиши свое обращение или предложение✍"
"&Мы рассмотрим и ответим тебе в ближайшее время☺❤",
"BYE_MESSAGE": "Если позднее захочешь предложить еще один пост или обратиться к админам с вопросом, то просто пришли в чат команду 👉 /start"
"&&И тебе пока!👋🏼❤️",
"USER_ERROR": "Увы, я не понимаю тебя😐💔 Выбери один из пунктов в нижнем меню, а затем пиши.",
"QUESTION": "Сообщение успешно отправлено❤️ Ответим, как только сможем😉",
"SUCCESS_SEND_MESSAGE": "Пост успешно отправлен❤️ Ожидай одобрения😊",
"MESSAGE_FOR_STANDUP": "Отлично, ты вошел в режим стендапа 📣"
"&Это свободное пространство, в котором может высказаться каждый житель нашего города, и он будет услышан🙌🏼"
"&Для того чтобы высказаться, нажми кнопку: 'Высказаться' и запиши голосовое сообщение, оно выпадет анонимно кому-то другому🗣"
"&Для того чтобы послушать о чем говорит наш город, нажми кнопку: 'Послушать'👂"
"&Ты можешь анонимно пообщаться, поделиться чем-то важным, обратиться напрямую к жителям🤝 Также можешь выступить перед аудиторией (спеть песню, рассказать стихотворение, шутку)🎤"
"&❗️Но пожалуйста не оскорбляй никого, и будь вежлив."
}
message = constants[type_message]
return message.replace('username', username).replace('&', '\n')

View File

@@ -0,0 +1,27 @@
import os
from db import BotDB
BotDB = BotDB()
def get_filename():
"""Возвращает имя файла без расширения."""
filename = os.path.basename(__file__)
filename = os.path.splitext(filename)[0]
return filename
def main():
migrations_init = """
CREATE TABLE IF NOT EXISTS migrations (
version INTEGER PRIMARY KEY NOT NULL,
script_name TEXT NOT NULL,
created_at TEXT
);
"""
BotDB.create_table(migrations_init)
BotDB.update_version(0, get_filename())
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,56 @@
import os
from db import BotDB
BotDB = BotDB()
def get_filename():
"""Возвращает имя файла без расширения."""
filename = os.path.basename(__file__)
filename = os.path.splitext(filename)[0]
return filename
def main():
# Проверка версии миграций
current_version = BotDB.get_current_version() # Добавьте функцию для получения версии
# Выполнение миграций и проверка последней версии
if current_version < 1:
# Скрипты миграции
create_table_sql_1 = """
CREATE TABLE IF NOT EXISTS "admins" (
user_id INTEGER NOT NULL,
"role" TEXT
);
"""
create_table_sql_2 = """CREATE TABLE IF NOT EXISTS "blacklist"
(
"user_id" INTEGER NOT NULL UNIQUE,
"user_name" INTEGER,
"message_for_user" INTEGER,
"date_to_unban" INTEGER
);
"""
create_table_sql_3 = """
CREATE TABLE IF NOT EXISTS user_messages (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
message_text TEXT,
user_id INTEGER,
message_id INTEGER NOT NULL,
date TEXT
);
"""
# Применение миграции
BotDB.create_table(create_table_sql_1)
BotDB.create_table(create_table_sql_2)
BotDB.create_table(create_table_sql_3)
BotDB.add_admin(842766148, 'creator')
BotDB.add_admin(920057022, 'admin')
filename = get_filename()
BotDB.update_version(1, filename)
if __name__ == "__main__":
main()

View File

@@ -1 +1,3 @@
pyTelegramBotAPI
pyTelegramBotAPI
APScheduler~=3.10.4
loguru~=0.7.2

View File

@@ -1,9 +1,13 @@
[Telegram]
bot_token = 5719450198:AAEI6hQgwiIcxMLGvVrPdopWqVlwHqR0QoU
bot_token = 000000000:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
preview_link = false
main_public = @love_biysk
group_for_posts = -793789724
group_for_message = -736077298
group_for_logs = -721685792
important_logs = -625900899
test_channel = -1001725605158
main_public = @test
group_for_posts = -00000000
group_for_message = -00000000
group_for_logs = -00000000
important_logs = -00000000
test_channel = -000000000000
[Settings]
logs = true
test = false