1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-30 15:04:18 +00:00
royalnet/royalpack/commands/matchmaking.py

375 lines
18 KiB
Python

from typing import *
import datetime
import re
import dateparser
import typing
import random
import enum
import asyncio as aio
from telegram import Bot as PTBBot
from telegram import Message as PTBMessage
from telegram import InlineKeyboardMarkup as InKM
from telegram import InlineKeyboardButton as InKB
from telegram.error import TelegramError
import royalnet.commands as rc
from royalnet.serf.telegram import TelegramSerf as TelegramBot
from royalnet.serf.telegram import escape as telegram_escape
from royalnet.utils import asyncify, sleep_until, sentry_async_wrap
from royalnet.backpack.tables import User
from ..tables import MMEvent, MMResponse, FiorygiTransaction
from ..types import MMChoice, MMInterfaceDataTelegram
class Interrupts(enum.Enum):
TIME_RAN_OUT = enum.auto()
MANUAL_START = enum.auto()
MANUAL_DELETE = enum.auto()
class MatchmakingCommand(rc.Command):
name: str = "matchmaking"
description: str = "Cerca persone per una partita a qualcosa!"
syntax: str = "[ {ora} ] {nome}\n[descrizione]"
aliases = ["mm", "lfg"]
tables = {MMEvent, MMResponse}
def __init__(self, interface: rc.CommandInterface):
super().__init__(interface)
# Find all relevant MMEvents and run them
session = self.alchemy.Session()
mmevents = (
session
.query(self.alchemy.get(MMEvent))
.filter(self.alchemy.get(MMEvent).interface == self.interface.name,
self.alchemy.get(MMEvent).datetime > datetime.datetime.now(),
self.alchemy.get(MMEvent).interrupted == False)
.all()
)
self.tasks_created = {}
self.queue: Dict[int, aio.queues.Queue] = {}
for mmevent in mmevents:
task = self.interface.loop.create_task(self._run_mmevent(mmevent.mmid))
self.tasks_created[mmevent.mmid] = task
async def run(self, args: rc.CommandArgs, data: rc.CommandData) -> None:
# Create a new MMEvent and run it
if self.interface.name != "telegram":
raise rc.UnsupportedError(f"{self.interface.prefix}matchmaking funziona solo su Telegram. Per ora.")
author = await data.get_author(error_if_none=True)
try:
timestring, title, description = args.match(r"\[\s*([^]]+)\s*]\s*([^\n]+)\s*\n?\s*(.+)?\s*", re.DOTALL)
except rc.InvalidInputError:
timestring, title, description = args.match(r"\s*(.+?)\s*\n\s*([^\n]+)\s*\n?\s*(.+)?\s*", re.DOTALL)
try:
dt: typing.Optional[datetime.datetime] = dateparser.parse(timestring, settings={
"PREFER_DATES_FROM": "future"
})
except OverflowError:
dt = None
if dt is None:
raise rc.InvalidInputError("La data che hai specificato non è valida.")
if dt <= datetime.datetime.now():
raise rc.InvalidInputError("La data che hai specificato è nel passato.")
if dt - datetime.datetime.now() >= datetime.timedelta(days=366):
raise rc.InvalidInputError("Hai specificato una data tra più di un anno!\n"
"Se volevi scrivere un'orario, ricordati che le ore sono separate da due punti"
" (:) e non da punto semplice!")
mmevent: MMEvent = self.alchemy.get(MMEvent)(creator=author,
datetime=dt,
title=title,
description=description,
interface=self.interface.name)
data.session.add(mmevent)
await data.session_commit()
self.loop.create_task(self._run_mmevent(mmevent.mmid))
await data.reply(f"🚩 Matchmaking creato!")
_mmchoice_values = {
MMChoice.YES: 4,
MMChoice.LATE_SHORT: 3,
MMChoice.LATE_MEDIUM: 2,
MMChoice.LATE_LONG: 1,
MMChoice.MAYBE: 0,
MMChoice.NO: -1
}
def _gen_mm_message(self, mmevent: MMEvent) -> str:
text = f"🚩 [{mmevent.datetime.strftime('%Y-%m-%d %H:%M')}] [b]{mmevent.title}[/b]\n"
if mmevent.description:
text += f"{mmevent.description}\n"
text += "\n"
for response in sorted(mmevent.responses, key=lambda r: -self._mmchoice_values[r.choice]):
response: MMResponse
if response.choice == MMChoice.LATE_SHORT:
td = mmevent.datetime + datetime.timedelta(minutes=10)
time_text = f" [{td.strftime('%H:%M')}]"
elif response.choice == MMChoice.LATE_MEDIUM:
td = mmevent.datetime + datetime.timedelta(minutes=30)
time_text = f" [{td.strftime('%H:%M')}]"
elif response.choice == MMChoice.LATE_LONG:
td = mmevent.datetime + datetime.timedelta(minutes=60)
time_text = f" [{td.strftime('%H:%M')}+]"
else:
time_text = ""
creator_crown = " 👑" if response.user == mmevent.creator else ""
text += f"{response.choice.value} {response.user}{time_text}{creator_crown}\n"
return text
@staticmethod
def _gen_telegram_keyboard(mmevent: MMEvent):
if mmevent.datetime <= datetime.datetime.now():
return None
return InKM([
[
InKB(f"{MMChoice.YES.value} Ci sarò!",
callback_data=f"mm{mmevent.mmid}_YES"),
InKB(f"{MMChoice.MAYBE.value} Forse...",
callback_data=f"mm{mmevent.mmid}_MAYBE"),
InKB(f"{MMChoice.NO.value} Non mi interessa.",
callback_data=f"mm{mmevent.mmid}_NO"),
],
[
InKB(f"{MMChoice.LATE_SHORT.value} 10 min",
callback_data=f"mm{mmevent.mmid}_LATE_SHORT"),
InKB(f"{MMChoice.LATE_MEDIUM.value} 30 min",
callback_data=f"mm{mmevent.mmid}_LATE_MEDIUM"),
InKB(f"{MMChoice.LATE_LONG.value} 60+ min",
callback_data=f"mm{mmevent.mmid}_LATE_LONG"),
],
[
InKB(f"🗑 Elimina",
callback_data=f"mm{mmevent.mmid}_DELETE"),
InKB(f"🚩 Inizia",
callback_data=f"mm{mmevent.mmid}_START"),
]
])
async def _update_telegram_mm_message(self, client: PTBBot, mmevent: MMEvent):
try:
await self.interface.serf.api_call(client.edit_message_text,
chat_id=self.config["Matchmaking"]["mm_chat_id"],
text=telegram_escape(self._gen_mm_message(mmevent)),
message_id=mmevent.interface_data.message_id,
parse_mode="HTML",
disable_web_page_preview=True,
reply_markup=self._gen_telegram_keyboard(mmevent))
except TelegramError:
pass
def _gen_mm_telegram_callback(self, client: PTBBot, mmid: int, choice: MMChoice):
async def callback(data: rc.CommandData):
author = await data.get_author(error_if_none=True)
mmevent: MMEvent = await asyncify(data.session.query(self.alchemy.get(MMEvent)).get, mmid)
mmresponse: MMResponse = await asyncify(
data.session.query(self.alchemy.get(MMResponse)).filter_by(user=author, mmevent=mmevent).one_or_none)
if mmresponse is None:
mmresponse = self.alchemy.get(MMResponse)(user=author, mmevent=mmevent, choice=choice)
data.session.add(mmresponse)
if random.randrange(14) == 0:
await FiorygiTransaction.spawn_fiorygi(data, author, 1, "aver risposto a un matchmaking")
else:
mmresponse.choice = choice
await data.session_commit()
await self._update_telegram_mm_message(client, mmevent)
await data.reply(f"{choice.value} Messaggio ricevuto!")
return callback
def _gen_mm_telegram_delete(self, client, mmid: int):
async def callback(data: rc.CommandData):
author: User = await data.get_author(error_if_none=True)
mmevent: MMEvent = await asyncify(data.session.query(self.alchemy.get(MMEvent)).get, mmid)
if author != mmevent.creator and "admin" not in author.roles:
raise rc.UserError("Non sei il creatore di questo matchmaking!")
await self.queue[mmid].put(Interrupts.MANUAL_DELETE)
await data.reply(f"🗑 Evento eliminato!")
return callback
def _gen_mm_telegram_start(self, client, mmid: int):
async def callback(data: rc.CommandData):
author = await data.get_author(error_if_none=True)
mmevent: MMEvent = await asyncify(data.session.query(self.alchemy.get(MMEvent)).get, mmid)
if author != mmevent.creator and "admin" not in author.roles:
raise rc.UserError("Non sei il creatore di questo matchmaking!")
await self.queue[mmid].put(Interrupts.MANUAL_START)
await data.reply(f"🚩 Evento avviato!")
return callback
async def _set_event_after(self, mmid: int, dt: datetime.datetime):
await sleep_until(dt)
if mmid in self.queue:
await self.queue[mmid].put(Interrupts.TIME_RAN_OUT)
def _gen_event_start_message(self, mmevent: MMEvent):
text = f"🚩 L'evento [b]{mmevent.title}[/b] è iniziato!\n\n"
for response in sorted(mmevent.responses, key=lambda r: -self._mmchoice_values[r.choice]):
response: MMResponse
text += f"{response.choice.value} {response.user}\n"
return text
@staticmethod
def _gen_unauth_message(user: User):
return f"⚠️ Non sono autorizzato a mandare messaggi a [b]{user.username}[/b]!\n" \
f"{user.telegram[0].mention()}, apri una chat privata con me e mandami un messaggio!"
@sentry_async_wrap()
async def _run_mmevent(self, mmid: int):
"""Run a MMEvent."""
# Create the event in the dict
self.queue[mmid] = aio.Queue()
# Open a new Alchemy Session
session = self.alchemy.Session()
# Find the MMEvent with the current session
mmevent: MMEvent = await asyncify(session.query(self.alchemy.get(MMEvent)).get, mmid)
if mmevent is None:
raise ValueError("Invalid mmid.")
# Ensure the MMEvent hasn't already started
if mmevent.datetime <= datetime.datetime.now():
raise ValueError("MMEvent has already started.")
# Ensure the MMEvent interface matches the current one
if mmevent.interface != self.interface.name:
raise ValueError("Invalid interface.")
# If the matchmaking message hasn't been sent yet, do so now
if mmevent.interface_data is None:
if self.interface.name == "telegram":
bot: TelegramBot = self.interface.serf
client: PTBBot = bot.client
# Send the keyboard
message: PTBMessage = await self.interface.serf.api_call(
client.send_message,
chat_id=self.config["Matchmaking"]["mm_chat_id"],
text=telegram_escape(
self._gen_mm_message(mmevent)),
parse_mode="HTML",
disable_webpage_preview=True,
reply_markup=self._gen_telegram_keyboard(mmevent)
)
# Store message data in the interface data object
mmevent.interface_data = MMInterfaceDataTelegram(chat_id=self.config["Matchmaking"]["mm_chat_id"],
message_id=message.message_id)
await asyncify(session.commit)
else:
raise rc.UnsupportedError()
# Register handlers for the keyboard events
if self.interface.name == "telegram":
bot: TelegramBot = self.interface.serf
client: PTBBot = bot.client
bot.register_keyboard_key(f"mm{mmevent.mmid}_YES", key=rc.KeyboardKey(
interface=self.interface,
short=f"{MMChoice.YES.value}",
text="Ci sarò!",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.YES)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_LATE_SHORT", key=rc.KeyboardKey(
interface=self.interface,
short=f"{MMChoice.LATE_SHORT.value}",
text="10 min",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.LATE_SHORT)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_LATE_MEDIUM", key=rc.KeyboardKey(
interface=self.interface,
short=f"{MMChoice.LATE_MEDIUM.value}",
text="30 min",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.LATE_MEDIUM)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_LATE_LONG", key=rc.KeyboardKey(
interface=self.interface,
short=f"{MMChoice.LATE_LONG.value}",
text="60 min",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.LATE_LONG)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_MAYBE", key=rc.KeyboardKey(
interface=self.interface,
short=f"{MMChoice.MAYBE.value}",
text="Forse...",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.MAYBE)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_NO", key=rc.KeyboardKey(
interface=self.interface,
short=f"{MMChoice.NO.value}",
text="Non mi interessa.",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.NO)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_DELETE", key=rc.KeyboardKey(
interface=self.interface,
short=f"🗑",
text="Elimina",
callback=self._gen_mm_telegram_delete(client, mmid)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_START", key=rc.KeyboardKey(
interface=self.interface,
short=f"🚩",
text="Inizia",
callback=self._gen_mm_telegram_start(client, mmid)
))
else:
raise rc.UnsupportedError()
# Sleep until something interrupts
self.loop.create_task(self._set_event_after(mmid, mmevent.datetime))
interrupt = await self.queue[mmid].get()
mmevent.interrupted = True
await asyncify(session.commit)
del self.queue[mmid]
# Notify the positive answers of the event start
if self.interface.name == "telegram":
bot: TelegramBot = self.interface.serf
client: PTBBot = bot.client
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_YES")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_MAYBE")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_LATE_SHORT")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_LATE_MEDIUM")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_LATE_LONG")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_NO")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_DELETE")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_START")
await self.interface.serf.api_call(client.delete_message,
chat_id=mmevent.interface_data.chat_id,
message_id=mmevent.interface_data.message_id)
if interrupt == Interrupts.TIME_RAN_OUT or interrupt == Interrupts.MANUAL_START:
await asyncify(client.send_message,
chat_id=self.config["Telegram"]["main_group_id"],
text=telegram_escape(self._gen_event_start_message(mmevent)),
parse_mode="HTML",
disable_webpage_preview=True)
for response in mmevent.responses:
if response.choice == MMChoice.NO:
return
try:
await asyncify(client.send_message,
chat_id=response.user.telegram[0].tg_id,
text=telegram_escape(self._gen_event_start_message(mmevent)),
parse_mode="HTML",
disable_webpage_preview=True)
except TelegramError:
await self.interface.serf.api_call(
client.send_message,
chat_id=self.config["Telegram"]["main_group_id"],
text=telegram_escape(self._gen_unauth_message(response.user)),
parse_mode="HTML",
disable_webpage_preview=True
)
elif interrupt == Interrupts.MANUAL_DELETE:
await self.interface.serf.api_call(
client.send_message,
chat_id=self.config["Telegram"]["main_group_id"],
text=telegram_escape(f"🗑 L'evento [b]{mmevent.title}[/b] è stato annullato."),
parse_mode="HTML",
disable_webpage_preview=True
)
else:
raise rc.UnsupportedError()
# The end!
await asyncify(session.close)
del self.tasks_created[mmid]