1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00
royalnet/royalbot.py

304 lines
10 KiB
Python
Raw Normal View History

2017-02-27 22:16:42 +00:00
import asyncio
2017-03-22 17:04:21 +00:00
import random
2017-03-29 09:23:20 +00:00
import extradiscord
2017-03-22 17:04:21 +00:00
import database
2017-04-26 09:59:44 +00:00
import lol
2017-03-22 17:04:21 +00:00
import royalbotconfig
import telegram
2017-03-05 17:14:45 +00:00
2017-03-22 16:23:17 +00:00
# Event loop shared by both bots; the __main__ section schedules their run() tasks on it
loop = asyncio.get_event_loop()
2017-03-05 18:12:02 +00:00
# Telegram bot object, built from the token stored in royalbotconfig
b = telegram.Bot(royalbotconfig.telegram_token)
2017-03-29 09:23:20 +00:00
# Discord client wrapper, built from the token stored in royalbotconfig
d = extradiscord.ExtraClient(royalbotconfig.discord_token)
2017-02-27 22:16:42 +00:00
2017-03-22 16:23:17 +00:00
async def answer(bot, thing, text):
    """Reply to a message through the service it came from.

    Args:
        bot: Bot object handed to the service-specific reply call.
        thing: The incoming telegram.Update or discord Message being answered.
        text: Text of the reply.

    Raises:
        TypeError: If thing is neither a telegram.Update nor a discord Message.
    """
    from_telegram = isinstance(thing, telegram.Update)
    # Reject unknown services up front so the send logic below has only two cases
    if not from_telegram and not isinstance(thing, extradiscord.discord.Message):
        raise TypeError("thing must be either a telegram.Update or a discord.Message")
    if from_telegram:
        # Telegram replies are sent with the Markdown parse mode
        await thing.message.reply(bot, text, parse_mode="Markdown")
    else:
        # Discord replies go to the channel the message was posted in
        await bot.send_message(thing.channel, text)
async def status_typing(bot, thing):
    """Show the "bot is typing..." indicator on the service the message came from.

    Args:
        bot: Bot object used to issue the service-specific call.
        thing: The incoming telegram.Update or discord Message.

    Raises:
        TypeError: If thing is neither a telegram.Update nor a discord Message.
    """
    if isinstance(thing, telegram.Update):
        # Telegram: chat action on the chat the update belongs to
        typing_call = thing.message.chat.set_chat_action(bot, "typing")
    elif isinstance(thing, extradiscord.discord.Message):
        # Discord: typing indicator on the message's channel
        typing_call = bot.send_typing(thing.channel)
    else:
        raise TypeError("thing must be either a telegram.Update or a discord.Message")
    await typing_call
2017-04-26 07:01:37 +00:00
async def display_help(bot, thing, func):
    """Send the help text of a command to whoever asked for it.

    The help text is the command function's docstring, with the {symbol}
    placeholder replaced by the command prefix of the originating service.

    Args:
        bot: Bot object used to send the reply.
        thing: The incoming telegram.Update or discord Message.
        func: Command function whose docstring is the help text.

    Raises:
        TypeError: If thing is neither a telegram.Update nor a discord Message.
    """
    from_telegram = isinstance(thing, telegram.Update)
    if not from_telegram and not isinstance(thing, extradiscord.discord.Message):
        # Unknown service
        raise TypeError("thing must be either a telegram.Update or a discord.Message")
    # Telegram commands start with "/", Discord commands with "!"
    prefix = "/" if from_telegram else "!"
    help_text = func.__doc__.format(symbol=prefix)
    await answer(bot, thing, help_text)
def find_date(thing):
    """Return the date attached to an incoming message.

    Args:
        thing: The incoming telegram.Update or discord Message.

    Returns:
        The message date for a Telegram update, or the timestamp of a
        Discord message.

    Raises:
        TypeError: If thing is neither a telegram.Update nor a discord Message.
    """
    if isinstance(thing, telegram.Update):
        return thing.message.date
    if isinstance(thing, extradiscord.discord.Message):
        return thing.timestamp
    raise TypeError("thing must be either a telegram.Update or a discord.Message")
async def diario(bot, thing, arguments):
    """Aggiungi una frase al diario Royal Games.
Devi essere un Royal per poter eseguire questo comando.
Sintassi: `{symbol}diario <frase>`"""
    # The docstring above is the user-facing help text (display_help formats
    # and sends it), so it is kept verbatim and flush-left on purpose.
    #
    # Command: add a new entry to the diario.
    #   bot       -- bot object used to reply
    #   thing     -- incoming telegram.Update or discord Message
    #   arguments -- words following the command; joined to form the entry

    # Let the user know the bot is working on it
    await status_typing(bot, thing)
    # Without any text there is nothing to store: show the usage instead
    if not arguments:
        await display_help(bot, thing, diario)
        return
    # Rebuild the sentence from the individual words
    entry_text = " ".join(arguments).strip()
    # Store the entry together with the date of the triggering message
    database.new_diario_entry(find_date(thing), entry_text)
    # Confirm on whichever service the command came from
    await answer(bot, thing, "✅ Aggiunto al diario!")
2017-03-05 17:14:45 +00:00
async def leggi(bot, thing, arguments):
    """Leggi una frase con un id specifico dal diario Royal Games.
Sintassi: {symbol}leggi <numero>"""
    # The docstring above is the user-facing help text (display_help formats
    # and sends it), so it is kept verbatim and flush-left on purpose.
    #
    # Command: read the diario entry with the given id.
    #   bot       -- bot object used to reply
    #   thing     -- incoming telegram.Update or discord Message
    #   arguments -- words following the command; the first one is the entry id

    # Let the user know the bot is working on it
    await status_typing(bot, thing)
    # A missing id used to raise an uncaught IndexError; show the usage instead
    if not arguments:
        await display_help(bot, thing, leggi)
        return
    # Validate the id before touching the database
    try:
        entry_id = int(arguments[0])
    except ValueError:
        await answer(bot, thing, "⚠ Il numero specificato non è valido.")
        return
    # Open the session only once it is needed, and always release it
    session = database.Session()
    try:
        # Query the diario table for the entry with the specified id
        entry = session.query(database.Diario).filter_by(id=entry_id).first()
        if entry is None:
            reply = "⚠ Non esiste una frase del diario con quel numero."
        else:
            # Build the reply while the session is still open
            reply = (f"*Dal diario Royal Games, il {entry.date}*:\n"
                     f"{entry.text}")
    finally:
        session.close()
    await answer(bot, thing, reply)
2017-03-24 11:30:34 +00:00
2017-04-09 16:11:06 +00:00
async def helpme(bot, thing, arguments):
    """Visualizza il messaggio di aiuto di un comando.
Sintassi: `{symbol}helpme [comando]`"""
    # The docstring above is the user-facing help text (display_help formats
    # and sends it), so it is kept verbatim and flush-left on purpose.
    #
    # Command: show the help message of another command.
    #   bot       -- bot object used to reply
    #   thing     -- incoming telegram.Update or discord Message
    #   arguments -- words following the command; exactly one command name

    # Let the user know the bot is working on it
    await status_typing(bot, thing)
    # Anything other than exactly one command name gets this command's own help
    if len(arguments) != 1:
        await display_help(bot, thing, helpme)
        return
    requested = arguments[0]
    # Each service keeps its own command table; consult the one(s) matching
    # the type of the incoming message
    services = (
        (telegram.Update, b),
        (extradiscord.discord.Message, d),
    )
    for message_type, service in services:
        if not isinstance(thing, message_type):
            continue
        if requested in service.commands:
            await display_help(bot, thing, service.commands[requested])
        else:
            await answer(bot, thing, "⚠ Il comando specificato non esiste.")
2017-03-10 09:34:27 +00:00
2017-04-09 16:11:06 +00:00
def _cv_status_icon(user):
    """Return the presence marker shown for a member in the voice chat listing."""
    presence = user.status.name
    if presence == "online":
        return "🔵"
    # Do not disturb, or currently streaming (game type 1)
    if presence == "dnd" or (user.game is not None and user.game.type == 1):
        return "🔴"
    # Idle, offline/invisible and unknown statuses all use an empty marker here.
    # NOTE(review): these literals are empty in the reviewed copy of the file;
    # confirm they were not emoji lost along the way.
    return ""


def _cv_volume_icon(user):
    """Return the audio marker (bot, deafened, muted or speaking) for a member."""
    if user.bot:
        # Music bot
        return "🎵"
    if user.voice.deaf or user.voice.self_deaf:
        return "🔇"
    if user.voice.mute or user.voice.self_mute:
        return "🔈"
    return "🔊"


def _cv_game_label(user):
    """Return the formatted activity of a member, or "" when there is none to show."""
    activity = user.game
    if activity is None:
        return ""
    if activity.type == 0:
        # Playing: game name
        return f"- *{activity.name}*"
    if activity.type == 1:
        # Streaming: stream name linked to its url
        return f"- [{activity.name}]({activity.url})"
    # Any other activity type is not displayed
    return ""


async def cv(bot, thing, arguments):
    """Visualizza lo stato attuale della chat vocale Discord.
Sintassi: `{symbol}cv`"""
    # The docstring above is the user-facing help text (display_help formats
    # and sends it), so it is kept verbatim and flush-left on purpose.
    #
    # Command: list who is in which Discord voice channel.
    #   bot       -- bot object used to reply
    #   thing     -- incoming telegram.Update or discord Message
    #   arguments -- words following the command; must be empty

    # Let the user know the bot is working on it
    await status_typing(bot, thing)
    # The command takes no arguments
    if arguments:
        await display_help(bot, thing, cv)
        return
    # Wait for the Discord bot to login
    while not d.client.is_logged_in:
        await asyncio.sleep(1)
    # Group the members currently in a voice channel by channel name.
    # Change this if the bot is logged in more than one server at once?
    channels = {}
    for user in d.client.get_all_members():
        if user.voice_channel is not None:
            channels.setdefault(user.voice_channel.name, []).append(user)
    # Build the message piece by piece and join once at the end
    parts = []
    for channel_name, members in channels.items():
        # Channel header
        parts.append(f"*{channel_name}:*\n")
        for user in members:
            # Nickname if available, otherwise the username
            name = user.nick if user.nick is not None else user.name
            # The game label is always computed per member, so a member without
            # a displayable activity no longer crashes the command or inherits
            # the previous member's label
            parts.append(f"{_cv_volume_icon(user)} {_cv_status_icon(user)} {name} {_cv_game_label(user)}\n")
        # Channel footer
        parts.append("\n")
    await answer(bot, thing, "".join(parts))
2017-03-22 18:13:22 +00:00
2017-04-09 16:11:06 +00:00
async def roll(bot, thing, arguments):
    """Lancia un dado a N facce.
Sintassi: `{symbol}roll <max>`"""
    # The docstring above is the user-facing help text (display_help formats
    # and sends it), so it is kept verbatim and flush-left on purpose.
    #
    # Command: roll a die with the requested number of faces.
    #   bot       -- bot object used to reply
    #   thing     -- incoming telegram.Update or discord Message
    #   arguments -- words following the command; exactly one, the face count

    # Let the user know the bot is working on it
    await status_typing(bot, thing)
    # Check the command syntax
    if len(arguments) != 1:
        await display_help(bot, thing, roll)
        return
    # A non-numeric or non-positive face count used to raise an uncaught
    # ValueError; tell the user instead
    try:
        faces = int(arguments[0])
    except ValueError:
        faces = None
    if faces is None or faces < 1:
        await answer(bot, thing, "⚠ Il numero specificato non è valido.")
        return
    # Roll the dice! The result is between 1 and faces, both included
    await answer(bot, thing, f"*Numero generato:* {random.randint(1, faces)}")
2017-03-24 11:24:39 +00:00
2017-03-24 11:27:54 +00:00
2017-04-26 08:46:36 +00:00
# DISCORD ONLY!
async def syncdiscord(bot, thing: extradiscord.discord.Message, arguments):
    """Crea un nuovo account usando il tuo ID di Discord.
Sintassi: `{symbol}syncdiscord`"""
    # The docstring above is the user-facing help text (display_help formats
    # and sends it), so it is kept verbatim and flush-left on purpose.
    #
    # Command (Discord only): register an Account keyed by the sender's Discord id.
    #   bot       -- bot object used to reply
    #   thing     -- incoming discord Message; its author id becomes the account id
    #   arguments -- words following the command; must be empty

    # Let the user know the bot is working on it
    await status_typing(bot, thing)
    # The command takes no arguments
    if arguments:
        await display_help(bot, thing, syncdiscord)
        return
    # Open a database session and make sure it is released on every path
    session = database.Session()
    try:
        # Refuse to register the same Discord id twice
        existing = session.query(database.Account).filter_by(id=thing.author.id).first()
        if existing is not None:
            await answer(bot, thing, "⚠ L'account è già stato registrato.")
            return
        # Create and persist the new account
        session.add(database.Account(id=thing.author.id))
        session.commit()
    finally:
        session.close()
    # Notify the sender
    await answer(bot, thing, "✅ Account registrato con successo!")
2017-04-26 09:59:44 +00:00
# DISCORD ONLY!
2017-04-26 08:46:36 +00:00
async def synclol(bot, thing, arguments):
    """Connetti il tuo account di LoL all'account Royal Games!
Sintassi: `{symbol}synclol <nome evocatore>`"""
    # The docstring above is the user-facing help text (display_help formats
    # and sends it), so it is kept verbatim and flush-left on purpose.
    #
    # Command (Discord only): link a LoL summoner to the sender's account.
    #   bot       -- bot object used to reply
    #   thing     -- incoming discord Message; its author id selects the account
    #   arguments -- words following the command; exactly one, the summoner name

    # Let the user know the bot is working on it
    await status_typing(bot, thing)
    # Check the command syntax
    if len(arguments) != 1:
        await display_help(bot, thing, synclol)
        return
    # Open a database session and make sure it is released on every path
    session = database.Session()
    try:
        # The sender must already have an account to attach the summoner to
        user = session.query(database.Account).filter_by(id=thing.author.id).first()
        if user is None:
            await answer(bot, thing, "⚠ Fai il login prima di sincronizzare l'account di LoL.")
            return
        # Create a new LoL account and link it to the user
        # TODO: IMPROVE THIS
        summoner_name = " ".join(arguments)
        data = await lol.get_summoner_data("euw", summoner_name=summoner_name)
        lolaccount = database.LoL(data["id"])
        lolaccount.parentid = user.id
        session.add(lolaccount)
        session.commit()
        # Refresh the stored data for the newly linked summoner
        database.update_lol(data["id"])
    finally:
        session.close()
2017-04-26 08:46:36 +00:00
if __name__ == "__main__":
    # Commands offered on Telegram, in registration order
    telegram_commands = (
        ("diario", diario),
        ("d", diario),
        ("help", helpme),
        ("helpme", helpme),
        ("cv", cv),
    )
    # Commands offered on Discord; the two sync commands read the Discord
    # author id and are therefore registered here only
    discord_commands = (
        ("diario", diario),
        ("help", helpme),
        ("helpme", helpme),
        ("syncdiscord", syncdiscord),
        ("synclol", synclol),
    )
    for command_name, handler in telegram_commands:
        b.commands[command_name] = handler
    for command_name, handler in discord_commands:
        d.commands[command_name] = handler
    # Schedule the Telegram bot on the shared event loop
    telegram_task = b.run()
    loop.create_task(telegram_task)
    print("Telegram bot start scheduled!")
    # Schedule the Discord bot on the shared event loop
    discord_task = d.run()
    loop.create_task(discord_task)
    print("Discord bot start scheduled!")
    # Hand control to the loop; both bots run until the process is stopped
    loop.run_forever()