1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-30 15:04:18 +00:00
royalnet/royalpack/commands/matchmaking.py

245 lines
14 KiB
Python
Raw Normal View History

2019-11-11 08:56:08 +00:00
import datetime
import re
import dateparser
import typing
from telegram import Bot as PTBBot
from telegram import Message as PTBMessage
from telegram.error import BadRequest, Unauthorized
from telegram import InlineKeyboardMarkup as InKeMa
from telegram import InlineKeyboardButton as InKeBu
from royalnet.commands import *
from royalnet.bots import TelegramBot
from royalnet.utils import telegram_escape, asyncify, sleep_until
from ..tables import MMEvent, MMResponse, User
from ..utils import MMChoice, MMInterfaceDataTelegram
class MatchmakingCommand(Command):
    """Create a matchmaking event and collect the answers of the other users.

    The command stores a new ``MMEvent``, posts a message with an inline
    keyboard in the matchmaking Telegram chat, records every key press as a
    ``MMResponse`` and, at the time of the event, privately notifies the users
    who did not decline.

    Only the ``telegram`` interface is supported.
    """

    name: str = "matchmaking"

    description: str = "Cerca persone per una partita a qualcosa!"

    syntax: str = "[ {ora} ] {nome}\n[descrizione]"

    aliases = ["mm", "lfg"]

    tables = {MMEvent, MMResponse}

    # Telegram chat in which the matchmaking messages are posted and edited.
    _mm_chat_id = -1001287169422

    # Telegram chat that receives the warning sent when a user cannot be
    # messaged privately (telegram.error.Unauthorized).
    _mm_error_chat_id = -1001153723135

    # Keyboard rows as (MMChoice member name, button label), in display order.
    # The member name is also the suffix of the callback data of the key, so
    # the keyboard, the registered handlers and the unregistered handlers are
    # all generated from this single table.
    _mm_keyboard_layout = (
        ("YES", "Ci sarò!"),
        ("MAYBE", "(Forse.)"),
        ("LATE_SHORT", "Arrivo dopo 5-10 min."),
        ("LATE_MEDIUM", "Arrivo dopo 15-35 min."),
        ("LATE_LONG", "Arrivo dopo 40+ min."),
        ("NO_TIME", "Non posso a quell'ora..."),
        ("NO_INTEREST", "Non mi interessa."),
        ("NO_TECH", "Ho un problema!"),
    )

    def __init__(self, interface: CommandInterface):
        """Schedule a task for every stored event of this interface that is still in the future."""
        super().__init__(interface)
        session = self.alchemy.Session()
        try:
            pending = (
                session
                .query(self.alchemy.MMEvent)
                .filter(self.alchemy.MMEvent.interface == self.interface.name,
                        self.alchemy.MMEvent.datetime > datetime.datetime.now())
                .all()
            )
            # Only the ids are needed: _run_mmevent loads the event again in its own session.
            mmids = [mmevent.mmid for mmevent in pending]
        finally:
            # This session is used for the lookup only, so release it right away.
            session.close()
        for mmid in mmids:
            self.interface.loop.create_task(self._run_mmevent(mmid))

    async def run(self, args: CommandArgs, data: CommandData) -> None:
        """Create a new MMEvent from the command arguments and start running it.

        Two input forms are tried, in this order:

        - ``[time] title``, optionally followed by a description;
        - the time on the first line and the title on the second line,
          optionally followed by a description.

        Raises:
            UnsupportedError: if the command is not called from the ``telegram`` interface.
            InvalidInputError: if the arguments match neither input form.
            CommandError: if the time cannot be parsed, is not in the future,
                or is 90 or more days away.
        """
        if self.interface.name != "telegram":
            raise UnsupportedError(f"{self.interface.prefix}matchmaking funziona solo su Telegram. Per ora.")
        author = await data.get_author(error_if_none=True)

        try:
            timestring, title, description = args.match(r"\[\s*([^]]+)\s*]\s*([^\n]+)\s*\n?\s*(.+)?\s*", re.DOTALL)
        except InvalidInputError:
            timestring, title, description = args.match(r"\s*(.+?)\s*\n\s*([^\n]+)\s*\n?\s*(.+)?\s*", re.DOTALL)

        try:
            dt: typing.Optional[datetime.datetime] = dateparser.parse(timestring, settings={
                "PREFER_DATES_FROM": "future"
            })
        except OverflowError:
            # Treated like an unparsable date.
            dt = None
        if dt is None:
            raise CommandError("La data che hai specificato non è valida.")
        if dt <= datetime.datetime.now():
            raise CommandError("La data che hai specificato è nel passato.")
        if dt >= datetime.datetime.now() + datetime.timedelta(days=90):
            raise CommandError("La data che hai specificato è a più di 90 giorni di distanza da oggi.")

        mmevent: MMEvent = self.alchemy.MMEvent(creator=author,
                                                datetime=dt,
                                                title=title,
                                                description=description,
                                                interface=self.interface.name)
        data.session.add(mmevent)
        await data.session_commit()
        self.interface.loop.create_task(self._run_mmevent(mmevent.mmid))
        await data.reply(f"✅ Evento [b]{mmevent.title}[/b] creato!")

    @staticmethod
    def _mm_keyboard_key(mmid: int, choice_name: str) -> str:
        """Return the callback data / keyboard key name of a choice of an event."""
        return f"mm{mmid}_{choice_name}"

    def _gen_mm_message(self, mmevent: MMEvent) -> str:
        """Build the text of the matchmaking message: header, optional description, one line per response."""
        parts = [f"🌐 [{mmevent.datetime.strftime('%Y-%m-%d %H:%M')}] [b]{mmevent.title}[/b]\n"]
        if mmevent.description:
            parts.append(f"{mmevent.description}\n")
        parts.append("\n")
        parts.extend(f"{response.choice.value} {response.user}\n" for response in mmevent.responses)
        return "".join(parts)

    def _gen_telegram_keyboard(self, mmevent: MMEvent):
        """Build the inline keyboard of the event: one button per row, one row per layout entry."""
        return InKeMa([
            [InKeBu(f"{getattr(MMChoice, choice_name).value} {label}",
                    callback_data=self._mm_keyboard_key(mmevent.mmid, choice_name))]
            for choice_name, label in self._mm_keyboard_layout
        ])

    async def _update_telegram_mm_message(self, client: PTBBot, mmevent: MMEvent):
        """Edit the already posted matchmaking message so that it shows the current responses."""
        try:
            await self.interface.bot.safe_api_call(client.edit_message_text,
                                                   chat_id=self._mm_chat_id,
                                                   text=telegram_escape(self._gen_mm_message(mmevent)),
                                                   message_id=mmevent.interface_data.message_id,
                                                   parse_mode="HTML",
                                                   disable_web_page_preview=True,
                                                   reply_markup=self._gen_telegram_keyboard(mmevent))
        except BadRequest:
            # Deliberate best-effort: a failed edit must not fail the key press.
            # NOTE(review): presumably this covers the "message is not modified"
            # error raised when a user presses the same key twice - confirm.
            pass

    def _gen_mm_telegram_callback(self, client: PTBBot, mmid: int, choice: MMChoice):
        """Create the keyboard callback that records ``choice`` as the answer of the pressing user.

        The returned coroutine function creates the MMResponse of the user for
        the event, or updates the existing one, then refreshes the message.
        """
        async def callback(data: CommandData):
            author = await data.get_author(error_if_none=True)
            # The event is loaded with the session of the callback, not with the one of _run_mmevent.
            mmevent: MMEvent = await asyncify(data.session.query(self.alchemy.MMEvent).get, mmid)
            mmresponse: MMResponse = await asyncify(
                data.session.query(self.alchemy.MMResponse).filter_by(user=author, mmevent=mmevent).one_or_none)
            if mmresponse is None:
                mmresponse = self.alchemy.MMResponse(user=author, mmevent=mmevent, choice=choice)
                data.session.add(mmresponse)
            else:
                mmresponse.choice = choice
            await data.session_commit()
            await self._update_telegram_mm_message(client, mmevent)
            return "✅ Messaggio ricevuto!"
        return callback

    def _gen_event_start_message(self, mmevent: MMEvent):
        """Build the text of the notification sent when the event starts, listing every response."""
        parts = [f"🚩 L'evento [b]{mmevent.title}[/b] è iniziato!\n\n"]
        parts.extend(f"{response.choice.value} {response.user}\n" for response in mmevent.responses)
        return "".join(parts)

    def _gen_unauth_message(self, user: User):
        """Build the warning posted when the bot is not allowed to message ``user`` privately."""
        # user.telegram is indexed here exactly as in the notification loop of
        # _run_mmevent, which reads user.telegram[0].tg_id.
        return f"⚠️ Non sono autorizzato a mandare messaggi a [b]{user.username}[/b]!\n" \
               f"{user.telegram[0].mention()}, apri una chat privata con me e mandami un messaggio!"

    async def _run_mmevent(self, mmid: int):
        """Run a MMEvent.

        Posts the matchmaking message if it has not been posted yet, registers
        the keyboard handlers, waits for the time of the event, notifies the
        users who did not decline and finally deletes the matchmaking message.

        Raises:
            ValueError: if the event does not exist, has already started or
                belongs to another interface.
            UnsupportedError: if the current interface is not ``telegram``.
        """
        # Every task uses its own Alchemy Session, closed however the task ends.
        session = self.alchemy.Session()
        try:
            mmevent: MMEvent = await asyncify(session.query(self.alchemy.MMEvent).get, mmid)
            if mmevent is None:
                raise ValueError("Invalid mmid.")
            if mmevent.datetime <= datetime.datetime.now():
                raise ValueError("MMEvent has already started.")
            if mmevent.interface != self.interface.name:
                raise ValueError("Invalid interface.")
            if self.interface.name != "telegram":
                raise UnsupportedError()

            bot: TelegramBot = self.interface.bot
            client: PTBBot = bot.client

            # If the matchmaking message hasn't been sent yet, do so now
            if mmevent.interface_data is None:
                message: PTBMessage = await bot.safe_api_call(client.send_message,
                                                              chat_id=self._mm_chat_id,
                                                              text=telegram_escape(self._gen_mm_message(mmevent)),
                                                              parse_mode="HTML",
                                                              disable_web_page_preview=True,
                                                              reply_markup=self._gen_telegram_keyboard(mmevent))
                # Store where the message is, so that it can be edited and deleted later
                mmevent.interface_data = MMInterfaceDataTelegram(chat_id=self._mm_chat_id,
                                                                 message_id=message.message_id)
                await asyncify(session.commit)

            # Register one handler per keyboard key
            for choice_name, _ in self._mm_keyboard_layout:
                self.interface.register_keyboard_key(
                    self._mm_keyboard_key(mmid, choice_name),
                    callback=self._gen_mm_telegram_callback(client, mmid, getattr(MMChoice, choice_name)))

            # Sleep until the time of the event
            await sleep_until(mmevent.datetime)

            # The event has started: answers are not accepted anymore
            for choice_name, _ in self._mm_keyboard_layout:
                self.interface.unregister_keyboard_key(self._mm_keyboard_key(mmid, choice_name))

            # Notify the positive answers of the event start
            start_text = telegram_escape(self._gen_event_start_message(mmevent))
            for response in mmevent.responses:
                if response.choice in (MMChoice.NO_INTEREST, MMChoice.NO_TIME):
                    # Skip only this user: the others must still be notified
                    # and the message must still be deleted afterwards.
                    continue
                try:
                    await bot.safe_api_call(client.send_message,
                                            chat_id=response.user.telegram[0].tg_id,
                                            text=start_text,
                                            parse_mode="HTML",
                                            disable_web_page_preview=True)
                except Unauthorized:
                    # The user cannot be messaged privately: ask them in the error chat to open a chat with the bot
                    await bot.safe_api_call(client.send_message,
                                            chat_id=self._mm_error_chat_id,
                                            text=telegram_escape(self._gen_unauth_message(response.user)),
                                            parse_mode="HTML",
                                            disable_web_page_preview=True)

            # Delete the event message
            await bot.safe_api_call(client.delete_message,
                                    chat_id=mmevent.interface_data.chat_id,
                                    message_id=mmevent.interface_data.message_id)
        finally:
            # The end!
            await asyncify(session.close)