import datetime import re import dateparser import typing from telegram import Bot as PTBBot from telegram import Message as PTBMessage from telegram.error import BadRequest, Unauthorized from telegram import InlineKeyboardMarkup as InKeMa from telegram import InlineKeyboardButton as InKeBu from royalnet.commands import * from royalnet.bots import TelegramBot from royalnet.utils import telegram_escape, asyncify, sleep_until from ..tables import MMEvent, MMResponse, User from ..utils import MMChoice, MMInterfaceDataTelegram class MatchmakingCommand(Command): name: str = "matchmaking" description: str = "Cerca persone per una partita a qualcosa!" syntax: str = "[ {ora} ] {nome}\n[descrizione]" aliases = ["mm", "lfg"] tables = {MMEvent, MMResponse} def __init__(self, interface: CommandInterface): super().__init__(interface) # Find all relevant MMEvents and run them session = self.alchemy.Session() mmevents = ( session .query(self.alchemy.MMEvent) .filter(self.alchemy.MMEvent.interface == self.interface.name, self.alchemy.MMEvent.datetime > datetime.datetime.now()) .all() ) for mmevent in mmevents: self.interface.loop.create_task(self._run_mmevent(mmevent.mmid)) async def run(self, args: CommandArgs, data: CommandData) -> None: # Create a new MMEvent and run it if self.interface.name != "telegram": raise UnsupportedError(f"{self.interface.prefix}matchmaking funziona solo su Telegram. Per ora.") author = await data.get_author(error_if_none=True) try: timestring, title, description = args.match(r"\[\s*([^]]+)\s*]\s*([^\n]+)\s*\n?\s*(.+)?\s*", re.DOTALL) except InvalidInputError: timestring, title, description = args.match(r"\s*(.+?)\s*\n\s*([^\n]+)\s*\n?\s*(.+)?\s*", re.DOTALL) try: dt: typing.Optional[datetime.datetime] = dateparser.parse(timestring, settings={ "PREFER_DATES_FROM": "future" }) except OverflowError: dt = None if dt is None: raise CommandError("La data che hai specificato non è valida.") if dt <= datetime.datetime.now(): raise CommandError("La data che hai specificato è nel passato.") if dt >= datetime.datetime.now() + datetime.timedelta(days=90): raise CommandError("La data che hai specificato è a più di 90 giorni di distanza da oggi.") mmevent: MMEvent = self.interface.alchemy.MMEvent(creator=author, datetime=dt, title=title, description=description, interface=self.interface.name) data.session.add(mmevent) await data.session_commit() self.interface.loop.create_task(self._run_mmevent(mmevent.mmid)) await data.reply(f"✅ Evento [b]{mmevent.title}[/b] creato!") _mm_chat_id = -1001287169422 _mm_error_chat_id = -1001153723135 def _gen_mm_message(self, mmevent: MMEvent) -> str: text = f"🌐 [{mmevent.datetime.strftime('%Y-%m-%d %H:%M')}] [b]{mmevent.title}[/b]\n" if mmevent.description: text += f"{mmevent.description}\n" text += "\n" for response in mmevent.responses: response: MMResponse text += f"{response.choice.value} {response.user}\n" return text def _gen_telegram_keyboard(self, mmevent: MMEvent): return InKeMa([ [InKeBu(f"{MMChoice.YES.value} Ci sarò!", callback_data=f"mm{mmevent.mmid}_YES")], [InKeBu(f"{MMChoice.MAYBE.value} (Forse.)", callback_data=f"mm{mmevent.mmid}_MAYBE")], [InKeBu(f"{MMChoice.LATE_SHORT.value} Arrivo dopo 5-10 min.", callback_data=f"mm{mmevent.mmid}_LATE_SHORT")], [InKeBu(f"{MMChoice.LATE_MEDIUM.value} Arrivo dopo 15-35 min.", callback_data=f"mm{mmevent.mmid}_LATE_MEDIUM")], [InKeBu(f"{MMChoice.LATE_LONG.value} Arrivo dopo 40+ min.", callback_data=f"mm{mmevent.mmid}_LATE_LONG")], [InKeBu(f"{MMChoice.NO_TIME.value} Non posso a quell'ora...", callback_data=f"mm{mmevent.mmid}_NO_TIME")], [InKeBu(f"{MMChoice.NO_INTEREST.value} Non mi interessa.", callback_data=f"mm{mmevent.mmid}_NO_INTEREST")], [InKeBu(f"{MMChoice.NO_TECH.value} Ho un problema!", callback_data=f"mm{mmevent.mmid}_NO_TECH")], ]) async def _update_telegram_mm_message(self, client: PTBBot, mmevent: MMEvent): try: await self.interface.bot.safe_api_call(client.edit_message_text, chat_id=self._mm_chat_id, text=telegram_escape(self._gen_mm_message(mmevent)), message_id=mmevent.interface_data.message_id, parse_mode="HTML", disable_web_page_preview=True, reply_markup=self._gen_telegram_keyboard(mmevent)) except BadRequest: pass def _gen_mm_telegram_callback(self, client: PTBBot, mmid: int, choice: MMChoice): async def callback(data: CommandData): author = await data.get_author(error_if_none=True) # Find the MMEvent with the current session mmevent: MMEvent = await asyncify(data.session.query(self.alchemy.MMEvent).get, mmid) mmresponse: MMResponse = await asyncify( data.session.query(self.alchemy.MMResponse).filter_by(user=author, mmevent=mmevent).one_or_none) if mmresponse is None: mmresponse = self.alchemy.MMResponse(user=author, mmevent=mmevent, choice=choice) data.session.add(mmresponse) else: mmresponse.choice = choice await data.session_commit() await self._update_telegram_mm_message(client, mmevent) return f"✅ Messaggio ricevuto!" return callback def _gen_event_start_message(self, mmevent: MMEvent): text = f"🚩 L'evento [b]{mmevent.title}[/b] è iniziato!\n\n" for response in mmevent.responses: response: MMResponse text += f"{response.choice.value} {response.user}\n" return text def _gen_unauth_message(self, user: User): return f"⚠️ Non sono autorizzato a mandare messaggi a [b]{user.username}[/b]!\n" \ f"{user.telegram.mention()}, apri una chat privata con me e mandami un messaggio!" async def _run_mmevent(self, mmid: int): """Run a MMEvent.""" # Open a new Alchemy Session session = self.alchemy.Session() # Find the MMEvent with the current session mmevent: MMEvent = await asyncify(session.query(self.alchemy.MMEvent).get, mmid) if mmevent is None: raise ValueError("Invalid mmid.") # Ensure the MMEvent hasn't already started if mmevent.datetime <= datetime.datetime.now(): raise ValueError("MMEvent has already started.") # Ensure the MMEvent interface matches the current one if mmevent.interface != self.interface.name: raise ValueError("Invalid interface.") # If the matchmaking message hasn't been sent yet, do so now if mmevent.interface_data is None: if self.interface.name == "telegram": bot: TelegramBot = self.interface.bot client: PTBBot = bot.client # Build the Telegram keyboard # Send the keyboard message: PTBMessage = await self.interface.bot.safe_api_call(client.send_message, chat_id=self._mm_chat_id, text=telegram_escape( self._gen_mm_message(mmevent)), parse_mode="HTML", disable_webpage_preview=True, reply_markup=self._gen_telegram_keyboard( mmevent)) # Store message data in the interface data object mmevent.interface_data = MMInterfaceDataTelegram(chat_id=self._mm_chat_id, message_id=message.message_id) await asyncify(session.commit) else: raise UnsupportedError() # Register handlers for the keyboard events if self.interface.name == "telegram": bot: TelegramBot = self.interface.bot client: PTBBot = bot.client self.interface.register_keyboard_key(f"mm{mmevent.mmid}_YES", callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.YES)) self.interface.register_keyboard_key(f"mm{mmevent.mmid}_MAYBE", callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.MAYBE)) self.interface.register_keyboard_key(f"mm{mmevent.mmid}_LATE_SHORT", callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.LATE_SHORT)) self.interface.register_keyboard_key(f"mm{mmevent.mmid}_LATE_MEDIUM", callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.LATE_MEDIUM)) self.interface.register_keyboard_key(f"mm{mmevent.mmid}_LATE_LONG", callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.LATE_LONG)) self.interface.register_keyboard_key(f"mm{mmevent.mmid}_NO_TIME", callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.NO_TIME)) self.interface.register_keyboard_key(f"mm{mmevent.mmid}_NO_INTEREST", callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.NO_INTEREST)) self.interface.register_keyboard_key(f"mm{mmevent.mmid}_NO_TECH", callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.NO_TECH)) else: raise UnsupportedError() # Sleep until the time of the event await sleep_until(mmevent.datetime) # Notify the positive answers of the event start if self.interface.name == "telegram": bot: TelegramBot = self.interface.bot client: PTBBot = bot.client self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_YES") self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_MAYBE") self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_LATE_SHORT") self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_LATE_MEDIUM") self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_LATE_LONG") self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_NO_TIME") self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_NO_INTEREST") self.interface.unregister_keyboard_key(f"mm{mmevent.mmid}_NO_TECH") for response in mmevent.responses: if response.choice == MMChoice.NO_INTEREST or response.choice == MMChoice.NO_TIME: return try: await self.interface.bot.safe_api_call(client.send_message, chat_id=response.user.telegram[0].tg_id, text=telegram_escape(self._gen_event_start_message(mmevent)), parse_mode="HTML", disable_webpage_preview=True) except Unauthorized: await self.interface.bot.safe_api_call(client.send_message, chat_id=self._mm_error_chat_id, text=telegram_escape( self._gen_unauth_message(response.user)), parse_mode="HTML", disable_webpage_preview=True) else: raise UnsupportedError() # Delete the event message if self.interface.name == "telegram": await self.interface.bot.safe_api_call(client.delete_message, chat_id=mmevent.interface_data.chat_id, message_id=mmevent.interface_data.message_id) # The end! await asyncify(session.close)