1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00
royalnet/telegrambot.py

403 lines
18 KiB
Python
Raw Normal View History

2018-01-25 14:24:17 +00:00
import datetime
2017-11-11 17:55:13 +00:00
import random
import math
2017-10-30 09:46:37 +00:00
import db
import errors
2018-01-25 14:24:17 +00:00
import stagismo
2018-03-14 10:53:26 +00:00
from telegram import Bot, Update, InlineKeyboardMarkup, InlineKeyboardButton
from telegram.ext import Updater, CommandHandler, CallbackQueryHandler
2018-04-09 21:02:24 +00:00
import telegram.error
2017-10-30 12:45:38 +00:00
from discord import Status as DiscordStatus
2018-04-09 17:56:43 +00:00
import subprocess
import os
2018-04-09 21:02:24 +00:00
import time
import cast
2017-10-30 09:46:37 +00:00
# Init the config reader
import configparser

config = configparser.ConfigParser()
config.read("config.ini")

# Connection to the Discord bot process; process() assigns it when one is supplied
discord_connection = None


def _read_git_version(repo_dir):
    """Return ``(version, commit_msg)`` read from the git repository in *repo_dir*.

    ``version`` is the output of ``git describe --tags`` and ``commit_msg`` the
    body of the latest commit. If anything goes wrong (git missing, directory
    missing, not a repository, no tags...) both values are empty strings, so
    callers can always rely on the two names being defined.
    """
    try:
        # cwd= runs git inside the repository without changing the working
        # directory of the whole process
        version = str(subprocess.check_output(["git", "describe", "--tags"], cwd=repo_dir),
                      encoding="utf8").strip()
        commit_msg = str(subprocess.check_output(["git", "log", "-1", "--pretty=%B"], cwd=repo_dir),
                         encoding="utf8").strip()
    except Exception:
        # Deliberately best-effort: version info is cosmetic and must never
        # prevent the bot from starting
        return "", ""
    return version, commit_msg


# Work out the version shown in the startup message
if __debug__:
    version = "Dev"
    commit_msg = "_in sviluppo_"
else:
    # Find the latest git tag; abspath() guards against __file__ having no directory part
    version, commit_msg = _read_git_version(os.path.dirname(os.path.abspath(__file__)))
2017-11-11 17:55:13 +00:00
2017-10-30 12:45:38 +00:00
2017-10-30 09:46:37 +00:00
def cmd_register(bot: Bot, update: Update):
    """Link the sender's Telegram account to the RYG account named in the command.

    Expected syntax: ``/register <royal_username>``. A warning is sent back when
    the username is missing or when ``db.Telegram.create`` raises
    ``errors.AlreadyExistingError``; otherwise the new row is committed and a
    confirmation is sent.
    """
    # Validate the arguments first, so no database session is opened for a malformed command
    try:
        username = update.message.text.split(" ", 1)[1]
    except IndexError:
        bot.send_message(update.message.chat.id, "⚠️ Non hai specificato un username!")
        return
    session = db.Session()
    try:
        try:
            t = db.Telegram.create(session,
                                   royal_username=username,
                                   telegram_user=update.message.from_user)
        except errors.AlreadyExistingError:
            bot.send_message(update.message.chat.id, "⚠ Il tuo account Telegram è già collegato a un account RYG o"
                                                     " l'account RYG che hai specificato è già collegato a un account"
                                                     " Telegram.")
            return
        session.add(t)
        session.commit()
        bot.send_message(update.message.chat.id, "✅ Sincronizzazione completata!")
    finally:
        # Always release the session, even when an unexpected error propagates
        session.close()
2017-10-30 12:45:38 +00:00
def cmd_discord(bot: Bot, update: Update):
    """Reply with the Discord members, grouped by the voice channel they are in.

    The member list is requested by sending ``/cv`` over ``discord_connection``
    and reading the answer back from it. Every listed member gets a status
    icon, a voice icon (only inside voice channels), the nickname (or the name
    when there is no nickname) and, if present, the current game/stream.
    Offline members that are not in a voice channel are left out.
    """
    if discord_connection is None:
        bot.send_message(update.message.chat.id, "⚠ Il bot non è sincronizzato con Discord al momento.")
        return
    discord_connection.send("/cv")
    server_members = discord_connection.recv()

    # Key 0 is the bucket for members that are not in any voice channel
    voice_channels = {0: None}
    roster = {0: []}
    for member in server_members:
        voice_channel = member.voice.voice_channel
        if voice_channel is None:
            roster[0].append(member)
            continue
        if roster.get(voice_channel.id) is None:
            # First member seen in this channel: remember the channel itself too
            roster[voice_channel.id] = []
            voice_channels[voice_channel.id] = voice_channel
        roster[voice_channel.id].append(member)

    # Checked in order with ==, exactly like an if/elif chain would
    status_icons = (
        (DiscordStatus.online, "🔵 "),
        (DiscordStatus.idle, "⚫️ "),
        (DiscordStatus.dnd, "🔴 "),
        (DiscordStatus.offline, "⚪️ "),
    )
    # Game types rendered as "<icon> <name>"; type 1 is rendered as a link instead
    activity_icons = {0: "🎮", 2: "🎧", 3: "📺"}

    pieces = []
    for channel_id, voice_channel in voice_channels.items():
        roster[channel_id].sort(key=lambda m: m.name if m.nick is None else m.nick)
        if channel_id == 0:
            pieces.append("Non in chat vocale:\n")
        else:
            pieces.append(f"In #{voice_channel.name}:\n")
        for member in roster[channel_id]:
            if member.status == DiscordStatus.offline and member.voice.voice_channel is None:
                continue
            # Online status emoji (bots get their own icon regardless of status)
            if member.bot:
                entry = "🤖 "
            else:
                entry = next((icon for status, icon in status_icons if member.status == status), "")
            # Voice status, only meaningful inside a voice channel
            if channel_id != 0:
                if member.voice.self_deaf:
                    entry += "🔇 "
                elif member.voice.self_mute:
                    entry += "🔈 "
                else:
                    entry += "🔊 "
            # Nickname, falling back to the account name
            entry += member.name if member.nick is None else member.nick
            # Game or stream
            game = member.game
            if game is not None:
                if game.type == 1:
                    entry += f" | 📡 [{game.name}]({game.url})"
                elif game.type in activity_icons:
                    entry += f" | {activity_icons[game.type]} {game.name}"
            pieces.append(entry + "\n")
        # Blank line between channels
        pieces.append("\n")
    bot.send_message(update.message.chat.id, "".join(pieces), disable_web_page_preview=True)
2017-10-30 12:45:38 +00:00
2017-11-11 17:55:13 +00:00
def cmd_cast(bot: Bot, update: Update):
    """Cast the spell named in the command on a random registered Telegram user.

    Expected syntax: ``/cast <spell name>``. The target is picked at random
    among all ``db.Telegram`` rows and the text produced by ``cast.cast`` is
    sent as an HTML message.

    Raises:
        ValueError: from ``random.sample`` when no Telegram account is registered.
    """
    try:
        spell: str = update.message.text.split(" ", 1)[1]
    except IndexError:
        bot.send_message(update.message.chat.id, "⚠️ Non hai specificato nessun incantesimo!\n"
                                                 "Sintassi corretta: `/cast <nome_incantesimo>`")
        return
    # Open a new db session
    session = db.Session()
    try:
        # Find a target for the spell
        target = random.sample(session.query(db.Telegram).all(), 1)[0]
        # Read the attributes while the session is still open
        target_name = target.username if target.username is not None else target.first_name
    finally:
        # Close the session even if the query or the sampling failed
        session.close()
    bot.send_message(update.message.chat.id,
                     cast.cast(spell_name=spell, target_name=target_name, platform="telegram"),
                     parse_mode="HTML")
2018-01-25 14:24:17 +00:00
def cmd_color(bot: Bot, update: Update):
    """Answer the command with a fixed error-looking message."""
    reply = "I am sorry, unknown error occured during working with your request, Admin were notified"
    bot.send_message(update.message.chat.id, reply)
def cmd_smecds(bot: Bot, update: Update):
    """Blame a random entry of ``stagismo.listona``."""
    # sample() returns a one-element list; unpack it directly
    (culprit,) = random.sample(stagismo.listona, 1)
    bot.send_message(update.message.chat.id, f"Secondo me, è colpa {culprit}.")
def cmd_ciaoruozi(bot: Bot, update: Update):
    """Greet Ruozi, or greet "me" when the sender's username is ``MeStakes``."""
    # Telegram accounts are not required to have a username, so it may be None
    username = update.message.from_user.username
    if username is not None and username.lstrip("@") == "MeStakes":
        bot.send_message(update.message.chat.id, "Ciao me!")
    else:
        bot.send_message(update.message.chat.id, "Ciao Ruozi!")
def cmd_ahnonlosoio(bot: Bot, update: Update):
    """Reply "Ah, non lo so io!", or the "neppure" variant when answering the same joke."""
    # Texts that count as a previous occurrence of the joke
    triggers = ["/ahnonlosoio", "/ahnonlosoio@royalgamesbot", "Ah, non lo so io!"]
    replied_to = update.message.reply_to_message
    if replied_to is None or replied_to.text not in triggers:
        answer = "Ah, non lo so io!"
    else:
        answer = "Ah, non lo so neppure io!"
    bot.send_message(update.message.chat.id, answer)
def cmd_balurage(bot: Bot, update: Update):
    """Store a ``db.BaluRage`` row for the sender, with an optional reason.

    The sender must already have a ``db.Telegram`` row; otherwise a warning
    asking to register is sent and nothing is stored. Whatever follows the
    command is saved as the reason (``None`` when nothing follows).
    """
    session = db.Session()
    try:
        sender = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
        if sender is None:
            bot.send_message(update.message.chat.id, "⚠ Il tuo account Telegram non è registrato al RYGdb! Registrati con `/register@royalgamesbot <nomeutenteryg>`.")
            return
        # Everything after the first space, if any, is the reason
        parts = update.message.text.split(" ", 1)
        reason = parts[1] if len(parts) > 1 else None
        session.add(db.BaluRage(royal_id=sender.royal_id, reason=reason))
        session.commit()
        bot.send_message(update.message.chat.id, "😡 Stai sfogando la tua ira sul bot!")
    finally:
        # Errors propagate to the caller; the session is released either way
        session.close()
def cmd_diario(bot: Bot, update: Update):
    """Save a quote in the diary (``db.Diario``).

    Two forms are accepted:

    * ``/diario <text>``: the text is stored with the sender as both author
      and saver;
    * ``/diario`` sent as a reply: the text of the replied-to message is
      stored, its sender is looked up as the author (``None`` when not
      registered) and the command's sender is the saver.

    The sender must already have a ``db.Telegram`` row. Warnings are sent
    when the sender is not registered, when there is nothing to save, or when
    the replied-to message has no text.
    """
    session = db.Session()
    try:
        user = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
        if user is None:
            bot.send_message(update.message.chat.id, "⚠ Il tuo account Telegram non è registrato al RYGdb! Registrati con `/register@royalgamesbot <nomeutenteryg>`.")
            return
        # The sender is always the saver: reuse the row fetched above instead of querying it again
        saver = user
        try:
            text = update.message.text.split(" ", 1)[1]
        except IndexError:
            # No inline text: fall back to the message being replied to
            replied_to = update.message.reply_to_message
            if replied_to is None:
                bot.send_message(update.message.chat.id, "⚠ Non hai specificato cosa aggiungere al diario! Puoi rispondere `/diario@royalgamesbot` al messaggio che vuoi salvare nel diario oppure scrivere `/diario@royalgamesbot <messaggio>` per aggiungere quel messaggio nel diario.\n"
                                                         "Se l'hai fatto, e continua a comparire questo errore, allora Telegram è stupido e non mi vuole far vedere il messaggio a cui hai risposto.")
                return
            text = replied_to.text
            if text is None:
                # Checked before the author lookup, so no query is wasted on a message that cannot be saved
                bot.send_message(update.message.chat.id, "⚠ Il messaggio a cui hai risposto non contiene testo.")
                return
            author = session.query(db.Telegram).filter_by(telegram_id=replied_to.from_user.id).one_or_none()
        else:
            # Inline text: the sender is quoting themselves
            author = user
        diario = db.Diario(timestamp=datetime.datetime.now(),
                           saver=saver,
                           author=author,
                           text=text)
        session.add(diario)
        session.commit()
        bot.send_message(update.message.chat.id, "✅ Aggiunto al diario!")
    finally:
        session.close()
2017-11-11 17:55:13 +00:00
2018-03-14 10:53:26 +00:00
def cmd_vote(bot: Bot, update: Update):
    """Start a poll with Yes / No / Abstain inline buttons.

    Expected syntax: ``/vote <public|secret> <question>``. ``secret`` creates
    the ``db.VoteQuestion`` with ``anonymous=True``, ``public`` with
    ``anonymous=False``. The sender must already have a ``db.Telegram`` row.
    The id of the poll message is stored on the question, which is how
    button presses are later matched to it.
    """
    session = db.Session()
    try:
        user = session.query(db.Telegram).filter_by(telegram_id=update.message.from_user.id).one_or_none()
        if user is None:
            bot.send_message(update.message.chat.id,
                             "⚠ Il tuo account Telegram non è registrato al RYGdb! Registrati con `/register@royalgamesbot <nomeutenteryg>`.")
            return
        try:
            _, mode, question = update.message.text.split(" ", 2)
        except ValueError:
            # Unpacking fewer than three parts raises ValueError (not IndexError)
            bot.send_message(update.message.chat.id,
                             "⚠ Non hai specificato tutti i parametri necessari!\n"
                             "Sintassi: `/vote@royalgamesbot <public|secret> <domanda>`")
            return
        if mode == "public":
            vote = db.VoteQuestion(question=question, anonymous=False)
        elif mode == "secret":
            vote = db.VoteQuestion(question=question, anonymous=True)
        else:
            bot.send_message(update.message.chat.id,
                             "⚠ Non hai specificato una modalità valida!\n"
                             "Sintassi: `/vote@royalgamesbot <public|secret> <domanda>`")
            return
        session.add(vote)
        # Flushed (not committed) before generate_text() is called
        # NOTE(review): presumably so database-generated fields are available to it - confirm
        session.flush()
        inline_keyboard = InlineKeyboardMarkup([[InlineKeyboardButton("🔵 Sì", callback_data="vote_yes")],
                                                [InlineKeyboardButton("🔴 No", callback_data="vote_no")],
                                                [InlineKeyboardButton("⚫️ Astieniti", callback_data="vote_abstain")]])
        message = bot.send_message(update.message.chat.id, vote.generate_text(session=session),
                                   reply_markup=inline_keyboard, parse_mode="HTML")
        # The message id is only known after sending, so it is stored right before the commit
        vote.message_id = message.message_id
        session.commit()
    finally:
        session.close()
def on_callback_query(bot: Bot, update: Update):
    """Record the vote carried by an inline-button press and refresh the poll message.

    Only the ``vote_yes`` / ``vote_no`` / ``vote_abstain`` payloads are
    understood; anything else raises ``NotImplementedError``. The voter must
    have a ``db.Telegram`` row, otherwise an alert is shown instead. An
    existing answer by the same user for the same question is overwritten.
    """
    query = update.callback_query
    # Payload -> (stored choice, emoji shown in the confirmation)
    outcomes = {
        "vote_yes": (db.VoteChoices.YES, "🔵"),
        "vote_no": (db.VoteChoices.NO, "🔴"),
        "vote_abstain": (db.VoteChoices.ABSTAIN, "⚫️"),
    }
    outcome = outcomes.get(query.data)
    if outcome is None:
        raise NotImplementedError()
    choice, emoji = outcome
    # Every payload that reaches this point starts with "vote_"
    session = db.Session()
    try:
        voter = session.query(db.Telegram).filter_by(telegram_id=query.from_user.id).one_or_none()
        if voter is None:
            bot.answer_callback_query(query.id, show_alert=True,
                                      text="⚠ Il tuo account Telegram non è registrato al RYGdb!"
                                           " Registrati con `/register@royalgamesbot <nomeutenteryg>`.")
            return
        # The poll is identified by the id of the message holding the buttons
        question = session.query(db.VoteQuestion).filter_by(message_id=query.message.message_id).one()
        answer = session.query(db.VoteAnswer).filter_by(question=question, user=voter).one_or_none()
        if answer is not None:
            # The user already voted: change the existing answer
            answer.choice = choice
        else:
            answer = db.VoteAnswer(question=question, choice=choice, user=voter)
            session.add(answer)
        session.commit()
        bot.answer_callback_query(query.id, text=f"Hai votato {emoji}.", cache_time=1)
        buttons = [[InlineKeyboardButton("🔵 Sì", callback_data="vote_yes")],
                   [InlineKeyboardButton("🔴 No", callback_data="vote_no")],
                   [InlineKeyboardButton("⚫️ Astieniti", callback_data="vote_abstain")]]
        inline_keyboard = InlineKeyboardMarkup(buttons)
        bot.edit_message_text(message_id=query.message.message_id, chat_id=query.message.chat.id,
                              text=question.generate_text(session), reply_markup=inline_keyboard,
                              parse_mode="HTML")
    finally:
        session.close()
2018-04-01 10:57:24 +00:00
def cmd_ban(bot: Bot, update: Update):
    """April Fools' command: kick the named user, unban them and send them an invite link.

    Expected syntax: ``/ban <telegram username>``. The command only works on
    2019-04-01, at most once every 15 minutes (tracked through
    ``db.AprilFoolsBan`` rows), only on users that have a ``db.Telegram`` row,
    and never on the hard-coded chat creator.

    Unexpected errors are not propagated, but their traceback is printed.
    """
    import traceback  # local import: only needed to report unexpected errors

    if datetime.date.today() != datetime.date(2019, 4, 1):
        bot.send_message(update.message.chat.id, "⚠ Non è il giorno adatto per bannare persone!")
        return
    session = db.Session()
    try:
        cooldown_start = datetime.datetime.now() - datetime.timedelta(minutes=15)
        last_bans = session.query(db.AprilFoolsBan).filter(db.AprilFoolsBan.datetime > cooldown_start).all()
        if last_bans:
            bot.send_message(update.message.chat.id, "⚠ /ban è in cooldown.\n"
                                                     "Può essere usato solo 1 volta ogni 15 minuti!")
            return
        try:
            arg = update.message.text.split(" ", 1)[1]
        except IndexError:
            bot.send_message(update.message.chat.id, "⚠ Devi specificare un bersaglio!")
            return
        target_user = session.query(db.Telegram).filter_by(username=arg).one_or_none()
        if target_user is None:
            bot.send_message(update.message.chat.id, "⚠ Il bersaglio specificato non esiste nel RYGdb.\n"
                                                     "Le possibilità sono due: non è un membro RYG, "
                                                     "oppure non si è ancora registrato e va bannato manualmente.")
            return
        # Hard-coded Telegram id that must never be kicked (the chat creator, per the message below)
        if int(target_user.telegram_id) == 25167391:
            bot.send_message(update.message.chat.id, "⚠ Il creatore della chat non può essere espulso.")
            return
        # The ban is recorded before kicking, so the cooldown starts even if the kick fails
        bannerino = db.AprilFoolsBan(from_user_id=update.message.from_user.id,
                                     to_user_id=target_user.telegram_id,
                                     datetime=datetime.datetime.now())
        session.add(bannerino)
        session.commit()
        # Kick, then immediately unban so the target is able to join again
        bot.kick_chat_member(update.message.chat.id, target_user.telegram_id)
        bot.unban_chat_member(update.message.chat.id, target_user.telegram_id)
        try:
            bot.send_message(target_user.telegram_id, "https://t.me/joinchat/AYAGH0TEav8WcbPVfNe75A")
        except telegram.error.TelegramError:
            # Deliberately best-effort: failing to deliver the invite link must not abort the command
            pass
        bot.send_message(update.message.chat.id, "🔨")
    except Exception:
        # Errors are still kept away from the caller, but no longer silently discarded
        traceback.print_exc()
    finally:
        session.close()
def cmd_eat(bot: Bot, update: Update):
    """Eat the food named in the command.

    Expected syntax: ``/eat <food>``. The food name is capitalized before
    being echoed back. Eating "Uranio" while a Discord connection is
    available sends a dedicated reply and forwards ``/uranium`` over
    ``discord_connection``.
    """
    try:
        food: str = update.message.text.split(" ", 1)[1].capitalize()
    except IndexError:
        # The command is registered as "eat", so that is the name the usage hint must show
        bot.send_message(update.message.chat.id, "⚠️ Non hai specificato cosa mangiare!\n"
                                                 "Sintassi corretta: `/eat <cibo>`")
        return
    if food == "Uranio" and discord_connection is not None:
        bot.send_message(update.message.chat.id, "☢️ Ti senti improvvisamente radioattivo.")
        discord_connection.send("/uranium")
        return
    bot.send_message(update.message.chat.id, f"🍗 Hai mangiato {food}!")
2017-11-11 17:55:13 +00:00
def process(arg_discord_connection):
    """Set up the Telegram bot, announce the startup and poll for updates forever.

    Args:
        arg_discord_connection: object stored in the module-level
            ``discord_connection`` and used by the commands through its
            ``send()`` / ``recv()`` methods. ``None`` leaves the current value
            untouched.
    """
    global discord_connection
    print("Telegrambot starting...")
    if arg_discord_connection is not None:
        discord_connection = arg_discord_connection
    u = Updater(config["Telegram"]["bot_token"])
    # Command name -> handler, registered in this order
    command_table = (
        ("register", cmd_register),
        ("discord", cmd_discord),
        ("cv", cmd_discord),
        ("cast", cmd_cast),
        ("color", cmd_color),
        ("smecds", cmd_smecds),
        ("ciaoruozi", cmd_ciaoruozi),
        ("ahnonlosoio", cmd_ahnonlosoio),
        ("balurage", cmd_balurage),
        ("diario", cmd_diario),
        ("vote", cmd_vote),
        ("ban", cmd_ban),
        ("eat", cmd_eat),
    )
    for command, callback in command_table:
        u.dispatcher.add_handler(CommandHandler(command, callback))
    u.dispatcher.add_handler(CallbackQueryHandler(on_callback_query))
    startup_text = (f" Royal Bot avviato e pronto a ricevere comandi!\n"
                    f"Ultimo aggiornamento: `{version}: {commit_msg}`")
    u.bot.send_message(config["Telegram"]["main_group"], startup_text, parse_mode="Markdown")
    # NOTE(review): when idle() returns without raising, the loop starts polling
    # again right away - confirm that never leaving this loop is intended
    while True:
        try:
            u.start_polling()
            u.idle()
        except telegram.error.TimedOut:
            print("Telegrambot timed out.")
            time.sleep(60)
            print("Telegrambot restarting...")
2018-01-20 14:28:04 +00:00
# Run as a script: start the bot on its own, without a Discord connection
if __name__ == "__main__":
    process(arg_discord_connection=None)