mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-27 13:34:28 +00:00
Add trivia command (closes #78)
This commit is contained in:
parent
9b5e06a46c
commit
ea939289b6
6 changed files with 125 additions and 12 deletions
|
@ -27,6 +27,7 @@ class GenericBot:
|
|||
self.commands = {}
|
||||
self.network_handlers: typing.Dict[str, typing.Type[NetworkHandler]] = {}
|
||||
for SelectedCommand in commands:
|
||||
log.debug(f"Binding {SelectedCommand.name}...")
|
||||
interface = self._Interface()
|
||||
self.commands[f"{interface.prefix}{SelectedCommand.name}"] = SelectedCommand(interface)
|
||||
log.debug(f"Successfully bound commands")
|
||||
|
|
|
@ -152,17 +152,30 @@ class TelegramBot(GenericBot):
|
|||
async def _handle_callback_query(self, update: telegram.Update):
|
||||
query: telegram.CallbackQuery = update.callback_query
|
||||
source: telegram.Message = query.message
|
||||
callback: typing.Optional[typing.Callable] = None
|
||||
command: typing.Optional[Command] = None
|
||||
for command in self.commands.values():
|
||||
try:
|
||||
if query.data in command.interface.keys_callbacks:
|
||||
callback = command.interface.keys_callbacks[query.data]
|
||||
except KeyError:
|
||||
continue
|
||||
await callback(data=self._Data(interface=command.interface, update=update))
|
||||
await asyncify(query.answer)
|
||||
break
|
||||
else:
|
||||
if callback is None:
|
||||
await asyncify(source.edit_reply_markup, reply_markup=None)
|
||||
await asyncify(query.answer, text="⛔️ This keyboard has expired.")
|
||||
return
|
||||
try:
|
||||
response = await callback(data=self._Data(interface=command.interface, update=update))
|
||||
except KeyboardExpiredError:
|
||||
# FIXME: May cause a memory leak, as keys are not deleted after use
|
||||
await asyncify(source.edit_reply_markup, reply_markup=None)
|
||||
await asyncify(query.answer, text="⛔️ This keyboard has expired.")
|
||||
return
|
||||
except Exception as e:
|
||||
error_text = f"⛔️ {e.__class__.__name__}\n"
|
||||
error_text += '\n'.join(e.args)
|
||||
await asyncify(query.answer, text=error_text)
|
||||
return
|
||||
else:
|
||||
await asyncify(query.answer, text=response)
|
||||
|
||||
async def run(self):
|
||||
while True:
|
||||
|
|
|
@ -22,6 +22,7 @@ from .summon import SummonCommand
|
|||
from .videochannel import VideochannelCommand
|
||||
from .dnditem import DnditemCommand
|
||||
from .dndspell import DndspellCommand
|
||||
from .trivia import TriviaCommand
|
||||
|
||||
__all__ = [
|
||||
"CiaoruoziCommand",
|
||||
|
@ -42,5 +43,6 @@ __all__ = [
|
|||
"SummonCommand",
|
||||
"VideochannelCommand",
|
||||
"DnditemCommand",
|
||||
"DndspellCommand"
|
||||
"DndspellCommand",
|
||||
"TriviaCommand"
|
||||
]
|
||||
|
|
94
royalnet/commands/royalgames/trivia.py
Normal file
94
royalnet/commands/royalgames/trivia.py
Normal file
|
@ -0,0 +1,94 @@
|
|||
import typing
|
||||
import asyncio
|
||||
import aiohttp
|
||||
import random
|
||||
import uuid
|
||||
from ..command import Command
|
||||
from ..commandargs import CommandArgs
|
||||
from ..commanddata import CommandData
|
||||
from ..commandinterface import CommandInterface
|
||||
from ...error import *
|
||||
|
||||
|
||||
class TriviaCommand(Command):
|
||||
name: str = "trivia"
|
||||
|
||||
description: str = "Manda una domanda dell'OpenTDB in chat."
|
||||
|
||||
_letter_emojis = ["🇦", "🇧", "🇨", "🇩"]
|
||||
|
||||
_correct_emoji = "✅"
|
||||
|
||||
_wrong_emoji = "❌"
|
||||
|
||||
_answer_time = 15
|
||||
|
||||
def __init__(self, interface: CommandInterface):
|
||||
super().__init__(interface)
|
||||
self.answerers: typing.Dict[uuid.UUID, typing.Dict[..., bool]] = {}
|
||||
|
||||
async def run(self, args: CommandArgs, data: CommandData) -> None:
|
||||
# Fetch the question
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get("https://opentdb.com/api.php?amount=1") as response:
|
||||
j = await response.json()
|
||||
# Parse the question
|
||||
if j["response_code"] != 0:
|
||||
raise ExternalError(f"OpenTDB returned {j['response_code']} response_code")
|
||||
question = j["results"][0]
|
||||
text = f'❓ [b]{question["category"]} - {question["difficulty"].capitalize()}[/b]\n' \
|
||||
f'{question["question"]}'
|
||||
# Prepare answers
|
||||
correct_answer: str = question["correct_answer"]
|
||||
wrong_answers: typing.List[str] = question["incorrect_answers"]
|
||||
answers = [correct_answer, *wrong_answers]
|
||||
random.shuffle(answers)
|
||||
# Find the correct index
|
||||
for index, answer in enumerate(answers):
|
||||
if answer == correct_answer:
|
||||
correct_index = index
|
||||
break
|
||||
else:
|
||||
raise ValueError("correct_index not found")
|
||||
# Add emojis
|
||||
for index, answer in enumerate(answers):
|
||||
answers[index] = f"{self._letter_emojis[index]} {answers[index]}"
|
||||
# Create the question id
|
||||
question_id = uuid.uuid4()
|
||||
self.answerers[question_id] = {}
|
||||
|
||||
# Create the correct and wrong functions
|
||||
async def correct(data: CommandData):
|
||||
answerer_ = await data.get_author(error_if_none=True)
|
||||
try:
|
||||
self.answerers[question_id][answerer_] = True
|
||||
except KeyError:
|
||||
raise KeyboardExpiredError("Question time ran out.")
|
||||
return "🆗 Hai risposto alla domanda. Ora aspetta un attimo per i risultati!"
|
||||
|
||||
async def wrong(data: CommandData):
|
||||
answerer_ = await data.get_author(error_if_none=True)
|
||||
try:
|
||||
self.answerers[question_id][answerer_] = False
|
||||
except KeyError:
|
||||
raise KeyboardExpiredError("Question time ran out.")
|
||||
return "🆗 Hai risposto alla domanda. Ora aspetta un attimo per i risultati!"
|
||||
|
||||
# Add question
|
||||
keyboard = {}
|
||||
for index, answer in enumerate(answers):
|
||||
if index == correct_index:
|
||||
keyboard[answer] = correct
|
||||
else:
|
||||
keyboard[answer] = wrong
|
||||
await data.keyboard(text, keyboard)
|
||||
await asyncio.sleep(self._answer_time)
|
||||
results = f"❗️ Tempo scaduto!\n" \
|
||||
f"La risposta corretta era [b]{answers[correct_index]}[/b]!\n\n"
|
||||
for answerer in self.answerers[question_id]:
|
||||
if self.answerers[question_id][answerer]:
|
||||
results += self._correct_emoji
|
||||
else:
|
||||
results += self._wrong_emoji
|
||||
results += f" {answerer}"
|
||||
await data.reply(results)
|
|
@ -49,3 +49,7 @@ class FileTooBigError(Exception):
|
|||
|
||||
class CurrentlyDisabledError(Exception):
|
||||
"""This feature is temporarely disabled and is not available right now."""
|
||||
|
||||
|
||||
class KeyboardExpiredError(Exception):
|
||||
"""A special type of exception that can be raised in keyboard handlers to mark a specific keyboard as expired."""
|
||||
|
|
|
@ -19,14 +19,12 @@ stream_handler.formatter = logging.Formatter("{asctime}\t{name}\t{levelname}\t{m
|
|||
log.addHandler(stream_handler)
|
||||
|
||||
|
||||
|
||||
sentry_dsn = os.environ.get("SENTRY_DSN")
|
||||
|
||||
# noinspection PyUnreachableCode
|
||||
if __debug__:
|
||||
commands = [
|
||||
DebugErrorCommand,
|
||||
DebugKeyboardCommand
|
||||
|
||||
]
|
||||
log.setLevel(logging.DEBUG)
|
||||
else:
|
||||
|
@ -49,7 +47,8 @@ else:
|
|||
SummonCommand,
|
||||
VideochannelCommand,
|
||||
DnditemCommand,
|
||||
DndspellCommand
|
||||
DndspellCommand,
|
||||
TriviaCommand
|
||||
]
|
||||
log.setLevel(logging.INFO)
|
||||
|
||||
|
|
Loading…
Reference in a new issue