1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-24 03:54:20 +00:00
royalnet/royalpack/commands/trivia.py
2019-11-11 09:56:08 +01:00

139 lines
5.6 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import typing
import asyncio
import aiohttp
import random
import uuid
import html
from royalnet.commands import *
from royalnet.utils import asyncify
from ..tables import TriviaScore
class TriviaCommand(Command):
name: str = "trivia"
aliases = ["t"]
description: str = "Manda una domanda dell'OpenTDB in chat."
tables = {TriviaScore}
syntax = "[credits|scores]"
_letter_emojis = ["🇦", "🇧", "🇨", "🇩"]
_medal_emojis = ["🥇", "🥈", "🥉", "🔹"]
_correct_emoji = ""
_wrong_emoji = ""
_answer_time = 17
_question_lock: bool = False
def __init__(self, interface: CommandInterface):
super().__init__(interface)
self._answerers: typing.Dict[uuid.UUID, typing.Dict[str, bool]] = {}
async def run(self, args: CommandArgs, data: CommandData) -> None:
arg = args.optional(0)
if arg == "credits":
await data.reply(f" [c]{self.interface.prefix}{self.name}[/c] di [i]Steffo[/i]\n"
f"\n"
f"Tutte le domande vengono dall'[b]Open Trivia Database[/b] di [i]Pixeltail Games[/i],"
f" creatori di Tower Unite, e sono rilasciate sotto la licenza [b]CC BY-SA 4.0[/b].")
return
elif arg == "scores":
trivia_scores = await asyncify(data.session.query(self.alchemy.TriviaScore).all)
strings = ["🏆 [b]Trivia Leaderboards[/b]\n"]
for index, ts in enumerate(sorted(trivia_scores, key=lambda ts: -ts.correct_rate)):
if index > 3:
index = 3
strings.append(f"{self._medal_emojis[index]} {ts.royal.username}"
f" ({ts.correct_answers}/{ts.total_answers})")
await data.reply("\n".join(strings))
return
if self._question_lock:
raise CommandError("C'è già un'altra domanda attiva!")
self._question_lock = True
# Fetch the question
async with aiohttp.ClientSession() as session:
async with session.get("https://opentdb.com/api.php?amount=1") as response:
j = await response.json()
# Parse the question
if j["response_code"] != 0:
raise CommandError(f"OpenTDB returned an error response_code ({j['response_code']}).")
question = j["results"][0]
text = f'❓ [b]{question["category"]} - {question["difficulty"].capitalize()}[/b]\n' \
f'{html.unescape(question["question"])}'
# Prepare answers
correct_answer: str = question["correct_answer"]
wrong_answers: typing.List[str] = question["incorrect_answers"]
answers: typing.List[str] = [correct_answer, *wrong_answers]
if question["type"] == "multiple":
random.shuffle(answers)
elif question["type"] == "boolean":
answers.sort(key=lambda a: a)
answers.reverse()
else:
raise NotImplementedError("Unknown question type")
# Find the correct index
for index, answer in enumerate(answers):
if answer == correct_answer:
correct_index = index
break
else:
raise ValueError("correct_index not found")
# Add emojis
for index, answer in enumerate(answers):
answers[index] = f"{self._letter_emojis[index]} {html.unescape(answers[index])}"
# Create the question id
question_id = uuid.uuid4()
self._answerers[question_id] = {}
# Create the correct and wrong functions
async def correct(data: CommandData):
answerer_ = await data.get_author(error_if_none=True)
try:
self._answerers[question_id][answerer_.uid] = True
except KeyError:
raise KeyboardExpiredError("Tempo scaduto!")
return "🆗 Hai risposto alla domanda. Ora aspetta un attimo per i risultati!"
async def wrong(data: CommandData):
answerer_ = await data.get_author(error_if_none=True)
try:
self._answerers[question_id][answerer_.uid] = False
except KeyError:
raise KeyboardExpiredError("Tempo scaduto!")
return "🆗 Hai risposto alla domanda. Ora aspetta un attimo per i risultati!"
# Add question
keyboard = {}
for index, answer in enumerate(answers):
if index == correct_index:
keyboard[answer] = correct
else:
keyboard[answer] = wrong
await data.keyboard(text, keyboard)
await asyncio.sleep(self._answer_time)
results = f"❗️ Tempo scaduto!\n" \
f"La risposta corretta era [b]{answers[correct_index]}[/b]!\n\n"
for answerer_id in self._answerers[question_id]:
answerer = data.session.query(self.alchemy.User).get(answerer_id)
if answerer.trivia_score is None:
ts = self.interface.alchemy.TriviaScore(royal=answerer)
data.session.add(ts)
await asyncify(data.session.commit)
if self._answerers[question_id][answerer_id]:
results += self._correct_emoji
answerer.trivia_score.correct_answers += 1
else:
results += self._wrong_emoji
answerer.trivia_score.wrong_answers += 1
results += f" {answerer} ({answerer.trivia_score.correct_answers}/{answerer.trivia_score.total_answers})\n"
await data.reply(results)
del self._answerers[question_id]
await asyncify(data.session.commit)
self._question_lock = False