1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00

Actually add the command

This commit is contained in:
Steffo 2019-12-17 21:02:02 +01:00
parent fe30bcfbda
commit d728bc6dd2
7 changed files with 4 additions and 492 deletions

View file

@ -2,7 +2,7 @@
[tool.poetry]
name = "royalpack"
version = "5.1.8"
version = "5.1.9"
description = "A Royalnet command pack for the Royal Games community"
authors = ["Stefano Pigozzi <ste.pigozzi@gmail.com>"]
license = "AGPL-3.0+"

View file

@ -8,21 +8,16 @@ from .reminder import ReminderCommand
from .ship import ShipCommand
from .smecds import SmecdsCommand
from .videochannel import VideochannelCommand
# from .trivia import TriviaCommand
# from .matchmaking import MatchmakingCommand
from .pause import PauseCommand
from .play import PlayCommand
# from .playmode import PlaymodeCommand
from .queue import QueueCommand
from .skip import SkipCommand
from .summon import SummonCommand
from .youtube import YoutubeCommand
from .soundcloud import SoundcloudCommand
# from .zawarudo import ZawarudoCommand
from .emojify import EmojifyCommand
from .leagueoflegends import LeagueoflegendsCommand
from .diarioquote import DiarioquoteCommand
# from .mp3 import Mp3Command
from .peertubeupdates import PeertubeUpdatesCommand
from .googlevideo import GooglevideoCommand
from .yahoovideo import YahoovideoCommand
@ -31,6 +26,7 @@ from .spell import SpellCommand
from .ahnonlosoio import AhnonlosoioCommand
from .eat import EatCommand
from .pmots import PmotsCommand
from .peertube import PeertubeCommand
# Enter the commands of your Pack here!
available_commands = [
@ -43,21 +39,16 @@ available_commands = [
ShipCommand,
SmecdsCommand,
VideochannelCommand,
# TriviaCommand,
# MatchmakingCommand,
PauseCommand,
PlayCommand,
# PlaymodeCommand,
QueueCommand,
SkipCommand,
SummonCommand,
YoutubeCommand,
SoundcloudCommand,
# ZawarudoCommand,
EmojifyCommand,
LeagueoflegendsCommand,
DiarioquoteCommand,
# Mp3Command,
PeertubeUpdatesCommand,
GooglevideoCommand,
YahoovideoCommand,
@ -66,6 +57,7 @@ available_commands = [
AhnonlosoioCommand,
EatCommand,
PmotsCommand,
PeertubeCommand,
]
# Don't change this, it should automatically generate __all__

View file

@ -1,244 +0,0 @@
import datetime
import re
import dateparser
import typing
from telegram import Bot as PTBBot
from telegram import Message as PTBMessage
from telegram.error import BadRequest, Unauthorized
from telegram import InlineKeyboardMarkup as InKeMa
from telegram import InlineKeyboardButton as InKeBu
from royalnet.commands import *
from royalnet.bots import TelegramBot
from royalnet.utils import telegram_escape, asyncify, sleep_until
from ..tables import MMEvent, MMResponse, User
from ..utils import MMChoice, MMInterfaceDataTelegram
class MatchmakingCommand(Command):
name: str = "matchmaking"
description: str = "Cerca persone per una partita a qualcosa!"
syntax: str = "[ {ora} ] {nome}\n[descrizione]"
aliases = ["mm", "lfg"]
tables = {MMEvent, MMResponse}
def __init__(self, interface: CommandInterface):
super().__init__(interface)
# Find all relevant MMEvents and run them
session = self.alchemy.Session()
mmevents = (
session
.query(self.alchemy.MMEvent)
.filter(self.alchemy.MMEvent.interface == self.interface.name,
self.alchemy.MMEvent.datetime > datetime.datetime.now())
.all()
)
for mmevent in mmevents:
self.interface.loop.create_task(self._run_mmevent(mmevent.mmid))
async def run(self, args: CommandArgs, data: CommandData) -> None:
# Create a new MMEvent and run it
if self.interface.name != "telegram":
raise UnsupportedError(f"{self.interface.prefix}matchmaking funziona solo su Telegram. Per ora.")
author = await data.get_author(error_if_none=True)
try:
timestring, title, description = args.match(r"\[\s*([^]]+)\s*]\s*([^\n]+)\s*\n?\s*(.+)?\s*", re.DOTALL)
except InvalidInputError:
timestring, title, description = args.match(r"\s*(.+?)\s*\n\s*([^\n]+)\s*\n?\s*(.+)?\s*", re.DOTALL)
try:
dt: typing.Optional[datetime.datetime] = dateparser.parse(timestring, settings={
"PREFER_DATES_FROM": "future"
})
except OverflowError:
dt = None
if dt is None:
raise CommandError("La data che hai specificato non è valida.")
if dt <= datetime.datetime.now():
raise CommandError("La data che hai specificato è nel passato.")
if dt >= datetime.datetime.now() + datetime.timedelta(days=90):
raise CommandError("La data che hai specificato è a più di 90 giorni di distanza da oggi.")
mmevent: MMEvent = self.interface.alchemy.MMEvent(creator=author,
datetime=dt,
title=title,
description=description,
interface=self.interface.name)
data.session.add(mmevent)
await data.session_commit()
self.interface.loop.create_task(self._run_mmevent(mmevent.mmid))
await data.reply(f"✅ Evento [b]{mmevent.title}[/b] creato!")
_mm_chat_id = -1001287169422
_mm_error_chat_id = -1001153723135
def _gen_mm_message(self, mmevent: MMEvent) -> str:
text = f"🌐 [{mmevent.datetime.strftime('%Y-%m-%d %H:%M')}] [b]{mmevent.title}[/b]\n"
if mmevent.description:
text += f"{mmevent.description}\n"
text += "\n"
for response in mmevent.responses:
response: MMResponse
text += f"{response.choice.value} {response.user}\n"
return text
def _gen_telegram_keyboard(self, mmevent: MMEvent):
return InKeMa([
[InKeBu(f"{MMChoice.YES.value} Ci sarò!", callback_data=f"mm{mmevent.mmid}_YES")],
[InKeBu(f"{MMChoice.MAYBE.value} (Forse.)", callback_data=f"mm{mmevent.mmid}_MAYBE")],
[InKeBu(f"{MMChoice.LATE_SHORT.value} Arrivo dopo 5-10 min.",
callback_data=f"mm{mmevent.mmid}_LATE_SHORT")],
[InKeBu(f"{MMChoice.LATE_MEDIUM.value} Arrivo dopo 15-35 min.",
callback_data=f"mm{mmevent.mmid}_LATE_MEDIUM")],
[InKeBu(f"{MMChoice.LATE_LONG.value} Arrivo dopo 40+ min.", callback_data=f"mm{mmevent.mmid}_LATE_LONG")],
[InKeBu(f"{MMChoice.NO_TIME.value} Non posso a quell'ora...", callback_data=f"mm{mmevent.mmid}_NO_TIME")],
[InKeBu(f"{MMChoice.NO_INTEREST.value} Non mi interessa.", callback_data=f"mm{mmevent.mmid}_NO_INTEREST")],
[InKeBu(f"{MMChoice.NO_TECH.value} Ho un problema!", callback_data=f"mm{mmevent.mmid}_NO_TECH")],
])
async def _update_telegram_mm_message(self, client: PTBBot, mmevent: MMEvent):
try:
await self.interface.bot.safe_api_call(client.edit_message_text,
chat_id=self._mm_chat_id,
text=telegram_escape(self._gen_mm_message(mmevent)),
message_id=mmevent.interface_data.message_id,
parse_mode="HTML",
disable_web_page_preview=True,
reply_markup=self._gen_telegram_keyboard(mmevent))
except BadRequest:
pass
def _gen_mm_telegram_callback(self, client: PTBBot, mmid: int, choice: MMChoice):
async def callback(data: CommandData):
author = await data.get_author(error_if_none=True)
# Find the MMEvent with the current session
mmevent: MMEvent = await asyncify(data.session.query(self.alchemy.MMEvent).get, mmid)
mmresponse: MMResponse = await asyncify(
data.session.query(self.alchemy.MMResponse).filter_by(user=author, mmevent=mmevent).one_or_none)
if mmresponse is None:
mmresponse = self.alchemy.MMResponse(user=author, mmevent=mmevent, choice=choice)
data.session.add(mmresponse)
else:
mmresponse.choice = choice
await data.session_commit()
await self._update_telegram_mm_message(client, mmevent)
return f"✅ Messaggio ricevuto!"
return callback
def _gen_event_start_message(self, mmevent: MMEvent):
text = f"🚩 L'evento [b]{mmevent.title}[/b] è iniziato!\n\n"
for response in mmevent.responses:
response: MMResponse
text += f"{response.choice.value} {response.user}\n"
return text
def _gen_unauth_message(self, user: User):
return f"⚠️ Non sono autorizzato a mandare messaggi a [b]{user.username}[/b]!\n" \
f"{user.telegram.mention()}, apri una chat privata con me e mandami un messaggio!"
async def _run_mmevent(self, mmid: int):
"""Run a MMEvent."""
# Open a new Alchemy Session
session = self.alchemy.Session()
# Find the MMEvent with the current session
mmevent: MMEvent = await asyncify(session.query(self.alchemy.MMEvent).get, mmid)
if mmevent is None:
raise ValueError("Invalid mmid.")
# Ensure the MMEvent hasn't already started
if mmevent.datetime <= datetime.datetime.now():
raise ValueError("MMEvent has already started.")
# Ensure the MMEvent interface matches the current one
if mmevent.interface != self.interface.name:
raise ValueError("Invalid interface.")
# If the matchmaking message hasn't been sent yet, do so now
if mmevent.interface_data is None:
if self.interface.name == "telegram":
bot: TelegramBot = self.interface.bot
client: PTBBot = bot.client
# Build the Telegram keyboard
# Send the keyboard
message: PTBMessage = await self.interface.bot.safe_api_call(client.send_message,
chat_id=self._mm_chat_id,
text=telegram_escape(
self._gen_mm_message(mmevent)),
parse_mode="HTML",
disable_webpage_preview=True,
reply_markup=self._gen_telegram_keyboard(
mmevent))
# Store message data in the interface data object
mmevent.interface_data = MMInterfaceDataTelegram(chat_id=self._mm_chat_id,
message_id=message.message_id)
await asyncify(session.commit)
else:
raise UnsupportedError()
# Register handlers for the keyboard events
if self.interface.name == "telegram":
bot: TelegramBot = self.interface.bot
client: PTBBot = bot.client
self.interface.register_keyboard_key(f"mm{mmevent.mmid}_YES",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.YES))
self.interface.register_keyboard_key(f"mm{mmevent.mmid}_MAYBE",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.MAYBE))
self.interface.register_keyboard_key(f"mm{mmevent.mmid}_LATE_SHORT",
callback=self._gen_mm_telegram_callback(client, mmid,
MMChoice.LATE_SHORT))
self.interface.register_keyboard_key(f"mm{mmevent.mmid}_LATE_MEDIUM",
callback=self._gen_mm_telegram_callback(client, mmid,
MMChoice.LATE_MEDIUM))
self.interface.register_keyboard_key(f"mm{mmevent.mmid}_LATE_LONG",
callback=self._gen_mm_telegram_callback(client, mmid,
MMChoice.LATE_LONG))
self.interface.register_keyboard_key(f"mm{mmevent.mmid}_NO_TIME",
callback=self._gen_mm_telegram_callback(client, mmid,
MMChoice.NO_TIME))
self.interface.register_keyboard_key(f"mm{mmevent.mmid}_NO_INTEREST",
callback=self._gen_mm_telegram_callback(client, mmid,
MMChoice.NO_INTEREST))
self.interface.register_keyboard_key(f"mm{mmevent.mmid}_NO_TECH",
callback=self._gen_mm_telegram_callback(client, mmid,
MMChoice.NO_TECH))
else:
raise UnsupportedError()
# Sleep until the time of the event
await sleep_until(mmevent.datetime)
# Notify the positive answers of the event start
if self.interface.name == "telegram":
bot: TelegramBot = self.interface.bot
client: PTBBot = bot.client
self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_YES")
self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_MAYBE")
self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_LATE_SHORT")
self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_LATE_MEDIUM")
self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_LATE_LONG")
self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_NO_TIME")
self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_NO_INTEREST")
self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_NO_TECH")
for response in mmevent.responses:
if response.choice == MMChoice.NO_INTEREST or response.choice == MMChoice.NO_TIME:
return
try:
await self.interface.bot.safe_api_call(client.send_message,
chat_id=response.user.telegram[0].tg_id,
text=telegram_escape(self._gen_event_start_message(mmevent)),
parse_mode="HTML",
disable_webpage_preview=True)
except Unauthorized:
await self.interface.bot.safe_api_call(client.send_message,
chat_id=self._mm_error_chat_id,
text=telegram_escape(
self._gen_unauth_message(response.user)),
parse_mode="HTML",
disable_webpage_preview=True)
else:
raise UnsupportedError()
# Delete the event message
if self.interface.name == "telegram":
await self.interface.bot.safe_api_call(client.delete_message,
chat_id=mmevent.interface_data.chat_id,
message_id=mmevent.interface_data.message_id)
# The end!
await asyncify(session.close)

View file

@ -1,40 +0,0 @@
import typing
import urllib.parse
import asyncio
from royalnet.commands import *
from royalnet.utils import asyncify
from royalnet.audio import YtdlMp3
class Mp3Command(Command):
name: str = "mp3"
aliases = ["dlmusic"]
description: str = "Scarica un video con youtube-dl e invialo in chat."
syntax = "{ytdlstring}"
ytdl_args = {
"format": "bestaudio",
"outtmpl": f"./downloads/%(title)s.%(ext)s"
}
seconds_before_deletion = 15 * 60
async def run(self, args: CommandArgs, data: CommandData) -> None:
url = args.joined()
if url.startswith("http://") or url.startswith("https://"):
vfiles: typing.List[YtdlMp3] = await asyncify(YtdlMp3.create_and_ready_from_url,
url,
**self.ytdl_args)
else:
vfiles = await asyncify(YtdlMp3.create_and_ready_from_url, f"ytsearch:{url}", **self.ytdl_args)
for vfile in vfiles:
await data.reply(f"⬇️ Il file richiesto può essere scaricato a:\n"
f"https://scaleway.steffo.eu/{urllib.parse.quote(vfile.mp3_filename.replace('./downloads/', './musicbot_cache/'))}\n"
f"Verrà eliminato tra {self.seconds_before_deletion} secondi.")
await asyncio.sleep(self.seconds_before_deletion)
for vfile in vfiles:
vfile.delete()
await data.reply(f"⏹ Il file {vfile.info.title} è scaduto ed è stato eliminato.")

View file

@ -1,57 +0,0 @@
import typing
import discord
from royalnet.commands import *
from royalnet.audio.playmodes import Playlist, Pool, Layers
from royalnet.bots import DiscordBot
class PlaymodeCommand(Command):
name: str = "playmode"
aliases = ["pm", "mode"]
description: str = "Cambia modalità di riproduzione per la chat vocale."
syntax = "[ [guild] ] {mode}"
@staticmethod
async def _legacy_playmode_handler(bot: "DiscordBot", guild_name: typing.Optional[str], mode_name: str):
"""Handle a playmode Royalnet request. That is, change current PlayMode."""
# Find the matching guild
if guild_name:
guilds: typing.List[discord.Guild] = bot.client.find_guild_by_name(guild_name)
else:
guilds = bot.client.guilds
if len(guilds) == 0:
raise CommandError("No guilds with the specified name found.")
if len(guilds) > 1:
raise CommandError("Multiple guilds with the specified name found.")
guild = list(bot.client.guilds)[0]
# Delete the previous PlayMode, if it exists
if bot.music_data[guild] is not None:
bot.music_data[guild].playmode.delete()
# Create the new PlayMode
if mode_name == "playlist":
bot.music_data[guild].playmode = Playlist()
elif mode_name == "pool":
bot.music_data[guild].playmode = Pool()
elif mode_name == "layers":
bot.music_data[guild].playmode = Layers()
else:
raise CommandError("Unknown PlayMode specified.")
return {}
_event_name = "_legacy_playmode"
def __init__(self, interface: CommandInterface):
super().__init__(interface)
if interface.name == "discord":
interface.register_herald_action(self._event_name, self._legacy_playmode_handler)
async def run(self, args: CommandArgs, data: CommandData) -> None:
guild_name, mode_name = args.match(r"(?:\[(.+)])?\s*(\S+)\s*")
await self.interface.call_herald_action("discord", self._event_name, {
"guild_name": guild_name,
"mode_name": mode_name
})
await data.reply(f"🔃 Impostata la modalità di riproduzione a: [c]{mode_name}[/c].")

View file

@ -1,139 +0,0 @@
import typing
import asyncio
import aiohttp
import random
import uuid
import html
from royalnet.commands import *
from royalnet.utils import asyncify
from ..tables import TriviaScore
class TriviaCommand(Command):
name: str = "trivia"
aliases = ["t"]
description: str = "Manda una domanda dell'OpenTDB in chat."
tables = {TriviaScore}
syntax = "[credits|scores]"
_letter_emojis = ["🇦", "🇧", "🇨", "🇩"]
_medal_emojis = ["🥇", "🥈", "🥉", "🔹"]
_correct_emoji = ""
_wrong_emoji = ""
_answer_time = 17
_question_lock: bool = False
def __init__(self, interface: CommandInterface):
super().__init__(interface)
self._answerers: typing.Dict[uuid.UUID, typing.Dict[str, bool]] = {}
async def run(self, args: CommandArgs, data: CommandData) -> None:
arg = args.optional(0)
if arg == "credits":
await data.reply(f" [c]{self.interface.prefix}{self.name}[/c] di [i]Steffo[/i]\n"
f"\n"
f"Tutte le domande vengono dall'[b]Open Trivia Database[/b] di [i]Pixeltail Games[/i],"
f" creatori di Tower Unite, e sono rilasciate sotto la licenza [b]CC BY-SA 4.0[/b].")
return
elif arg == "scores":
trivia_scores = await asyncify(data.session.query(self.alchemy.TriviaScore).all)
strings = ["🏆 [b]Trivia Leaderboards[/b]\n"]
for index, ts in enumerate(sorted(trivia_scores, key=lambda ts: -ts.correct_rate)):
if index > 3:
index = 3
strings.append(f"{self._medal_emojis[index]} {ts.royal.username}"
f" ({ts.correct_answers}/{ts.total_answers})")
await data.reply("\n".join(strings))
return
if self._question_lock:
raise CommandError("C'è già un'altra domanda attiva!")
self._question_lock = True
# Fetch the question
async with aiohttp.ClientSession() as session:
async with session.get("https://opentdb.com/api.php?amount=1") as response:
j = await response.json()
# Parse the question
if j["response_code"] != 0:
raise CommandError(f"OpenTDB returned an error response_code ({j['response_code']}).")
question = j["results"][0]
text = f'❓ [b]{question["category"]} - {question["difficulty"].capitalize()}[/b]\n' \
f'{html.unescape(question["question"])}'
# Prepare answers
correct_answer: str = question["correct_answer"]
wrong_answers: typing.List[str] = question["incorrect_answers"]
answers: typing.List[str] = [correct_answer, *wrong_answers]
if question["type"] == "multiple":
random.shuffle(answers)
elif question["type"] == "boolean":
answers.sort(key=lambda a: a)
answers.reverse()
else:
raise NotImplementedError("Unknown question type")
# Find the correct index
for index, answer in enumerate(answers):
if answer == correct_answer:
correct_index = index
break
else:
raise ValueError("correct_index not found")
# Add emojis
for index, answer in enumerate(answers):
answers[index] = f"{self._letter_emojis[index]} {html.unescape(answers[index])}"
# Create the question id
question_id = uuid.uuid4()
self._answerers[question_id] = {}
# Create the correct and wrong functions
async def correct(data: CommandData):
answerer_ = await data.get_author(error_if_none=True)
try:
self._answerers[question_id][answerer_.uid] = True
except KeyError:
raise KeyboardExpiredError("Tempo scaduto!")
return "🆗 Hai risposto alla domanda. Ora aspetta un attimo per i risultati!"
async def wrong(data: CommandData):
answerer_ = await data.get_author(error_if_none=True)
try:
self._answerers[question_id][answerer_.uid] = False
except KeyError:
raise KeyboardExpiredError("Tempo scaduto!")
return "🆗 Hai risposto alla domanda. Ora aspetta un attimo per i risultati!"
# Add question
keyboard = {}
for index, answer in enumerate(answers):
if index == correct_index:
keyboard[answer] = correct
else:
keyboard[answer] = wrong
await data.keyboard(text, keyboard)
await asyncio.sleep(self._answer_time)
results = f"❗️ Tempo scaduto!\n" \
f"La risposta corretta era [b]{answers[correct_index]}[/b]!\n\n"
for answerer_id in self._answerers[question_id]:
answerer = data.session.query(self.alchemy.User).get(answerer_id)
if answerer.trivia_score is None:
ts = self.interface.alchemy.TriviaScore(royal=answerer)
data.session.add(ts)
await asyncify(data.session.commit)
if self._answerers[question_id][answerer_id]:
results += self._correct_emoji
answerer.trivia_score.correct_answers += 1
else:
results += self._wrong_emoji
answerer.trivia_score.wrong_answers += 1
results += f" {answerer} ({answerer.trivia_score.correct_answers}/{answerer.trivia_score.total_answers})\n"
await data.reply(results)
del self._answerers[question_id]
await asyncify(data.session.commit)
self._question_lock = False

View file

@ -1 +1 @@
semantic = "5.1.8"
semantic = "5.1.9"