mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-23 19:44:20 +00:00
Move MMTask to utils
This commit is contained in:
parent
3260417ebd
commit
76af87725e
3 changed files with 460 additions and 450 deletions
|
@ -1,460 +1,12 @@
|
|||
import contextlib
|
||||
from typing import *
|
||||
import datetime
|
||||
import re
|
||||
import dateparser
|
||||
import typing
|
||||
import random
|
||||
import enum
|
||||
import asyncio as aio
|
||||
from telegram import Message as PTBMessage
|
||||
from telegram import InlineKeyboardMarkup as InKM
|
||||
from telegram import InlineKeyboardButton as InKB
|
||||
from telegram.error import TelegramError
|
||||
import royalnet.commands as rc
|
||||
from royalnet.serf.telegram import escape as telegram_escape
|
||||
from royalnet.utils import asyncify, sleep_until, sentry_async_wrap
|
||||
import logging
|
||||
|
||||
from ..tables import MMEvent, MMResponse, FiorygiTransaction
|
||||
from ..types import MMChoice, MMInterfaceDataTelegram
|
||||
|
||||
|
||||
class Interrupts(enum.Enum):
|
||||
TIME_RAN_OUT = enum.auto()
|
||||
MANUAL_START = enum.auto()
|
||||
MANUAL_DELETE = enum.auto()
|
||||
|
||||
|
||||
mmchoice_sorting = {
|
||||
MMChoice.YES: -4,
|
||||
MMChoice.LATE_SHORT: -3,
|
||||
MMChoice.LATE_MEDIUM: -2,
|
||||
MMChoice.LATE_LONG: -1,
|
||||
MMChoice.MAYBE: 0,
|
||||
MMChoice.NO: 1
|
||||
}
|
||||
|
||||
|
||||
class MMTask:
|
||||
def __init__(self, mmid: int, *, command: rc.Command):
|
||||
log.debug(f"Creating task for: {mmid}")
|
||||
|
||||
self.loop: aio.AbstractEventLoop = command.loop
|
||||
self.task: Optional[aio.Task] = None
|
||||
self.queue: aio.Queue = aio.Queue(loop=self.loop)
|
||||
self.command: rc.Command = command
|
||||
self.mmid: int = mmid
|
||||
|
||||
self._session: Optional = None
|
||||
self._EventT: Optional[Type[MMEvent]] = None
|
||||
self._ResponseT: Optional[Type[MMResponse]] = None
|
||||
self._mmevent: Optional[MMEvent] = None
|
||||
|
||||
@property
|
||||
def is_running(self):
|
||||
return self.task is not None
|
||||
|
||||
def sync(self):
|
||||
self._session.refresh(self._mmevent)
|
||||
|
||||
def get_response_line(self, response: MMResponse):
|
||||
self.sync()
|
||||
|
||||
# noinspection PyListCreation
|
||||
line = []
|
||||
|
||||
# Emoji
|
||||
line.append(f"{response.choice.value}")
|
||||
|
||||
# Mention the user if he said yes, otherwise just display his name
|
||||
if response.choice == MMChoice.NO:
|
||||
line.append(f"{response.user.telegram[0].name()}")
|
||||
else:
|
||||
line.append(f"{response.user.telegram[0].mention()}")
|
||||
|
||||
# Late time
|
||||
if response.choice == MMChoice.LATE_SHORT:
|
||||
td = self._mmevent.datetime + datetime.timedelta(minutes=10)
|
||||
line.append(f"[{td.strftime('%H:%M')}]")
|
||||
|
||||
elif response.choice == MMChoice.LATE_MEDIUM:
|
||||
td = self._mmevent.datetime + datetime.timedelta(minutes=30)
|
||||
line.append(f"[{td.strftime('%H:%M')}]")
|
||||
|
||||
elif response.choice == MMChoice.LATE_LONG:
|
||||
td = self._mmevent.datetime + datetime.timedelta(minutes=60)
|
||||
line.append(f"[{td.strftime('%H:%M')}+]")
|
||||
|
||||
# Creator
|
||||
if response.user == self._mmevent.creator:
|
||||
line.append("👑")
|
||||
|
||||
# Result
|
||||
return " ".join(line)
|
||||
|
||||
@property
|
||||
def channel_text(self) -> str:
|
||||
self.sync()
|
||||
|
||||
# noinspection PyListCreation
|
||||
text = []
|
||||
|
||||
# First line
|
||||
if self._mmevent.datetime is None:
|
||||
text.append(f"🌐 [Prossimamente] [b]{self._mmevent.title}[/b]")
|
||||
else:
|
||||
text.append(f"🚩 [{self._mmevent.datetime.strftime('%Y-%m-%d %H:%M')}] [b]{self._mmevent.title}[/b]")
|
||||
|
||||
# Description
|
||||
if self._mmevent.description:
|
||||
text.append(f"{self._mmevent.description}")
|
||||
|
||||
# Spacer
|
||||
text.append("")
|
||||
|
||||
# Responses
|
||||
responses = sorted(self._mmevent.responses, key=lambda r: mmchoice_sorting[r.choice])
|
||||
for response in responses:
|
||||
text.append(self.get_response_line(response))
|
||||
|
||||
# Result
|
||||
return "\n".join(text)
|
||||
|
||||
@property
|
||||
def start_text(self) -> str:
|
||||
self.sync()
|
||||
|
||||
# noinspection PyListCreation
|
||||
text = []
|
||||
|
||||
# First line
|
||||
if self._mmevent.datetime is None:
|
||||
text.append(f"🌐 Le iscrizioni all'evento [b]{self._mmevent.title}[/b] sono terminate!")
|
||||
else:
|
||||
text.append(f"🚩 L'evento [b]{self._mmevent.title}[/b] è iniziato!")
|
||||
|
||||
# Description
|
||||
if self._mmevent.description:
|
||||
text.append(f"{self._mmevent.description}")
|
||||
|
||||
# Spacer
|
||||
text.append("")
|
||||
|
||||
# Responses
|
||||
responses = sorted(self._mmevent.responses, key=lambda r: mmchoice_sorting[r.choice])
|
||||
for response in responses:
|
||||
text.append(self.get_response_line(response))
|
||||
|
||||
# Result
|
||||
return "\n".join(text)
|
||||
|
||||
@property
|
||||
def delete_text(self) -> str:
|
||||
return f"🗑 L'evento [b]{self._mmevent.title}[/b] è stato eliminato."
|
||||
|
||||
def get_answer_callback(self, choice: MMChoice):
|
||||
async def callback(data: rc.CommandData):
|
||||
# Find the user who clicked on the button
|
||||
user = await data.get_author(error_if_none=True)
|
||||
|
||||
# Get the related MMEvent
|
||||
mmevent: MMEvent = await asyncify(data.session.query(self._EventT).get, self.mmid)
|
||||
|
||||
# Check if the user had already responded
|
||||
mmresponse: MMResponse = await asyncify(
|
||||
data.session.query(self._ResponseT).filter_by(user=user, mmevent=mmevent).one_or_none
|
||||
)
|
||||
|
||||
if mmresponse is None:
|
||||
# If they didn't respond, create a new MMResponse
|
||||
mmresponse = self._ResponseT(user=user, mmevent=mmevent, choice=choice)
|
||||
data.session.add(mmresponse)
|
||||
|
||||
# Drop fiorygi
|
||||
if random.randrange(100) < self.command.config["Matchmaking"]["fiorygi_award_chance"]:
|
||||
await FiorygiTransaction.spawn_fiorygi(data, user, 1, "aver risposto a un matchmaking")
|
||||
else:
|
||||
# Change their response
|
||||
mmresponse.choice = choice
|
||||
await data.session_commit()
|
||||
|
||||
await self.telegram_channel_message_update()
|
||||
|
||||
await data.reply(f"{choice.value} Hai risposto al matchmaking!")
|
||||
return callback
|
||||
|
||||
def get_delete_callback(self):
|
||||
async def callback(data: rc.CommandData):
|
||||
# Find the user who clicked on the button
|
||||
user = await data.get_author(error_if_none=True)
|
||||
|
||||
# Get the related MMEvent
|
||||
mmevent: MMEvent = await asyncify(data.session.query(self._EventT).get, self.mmid)
|
||||
|
||||
# Ensure the user has the required roles to start the matchmaking
|
||||
if user != mmevent.creator and "admin" not in user.roles:
|
||||
raise rc.UserError("Non hai i permessi per eliminare questo matchmaking!")
|
||||
|
||||
# Interrupt the matchmaking with the MANUAL_DELETE reason
|
||||
await self.queue.put(Interrupts.MANUAL_DELETE)
|
||||
|
||||
await data.reply(f"🗑 Evento eliminato!")
|
||||
return callback
|
||||
|
||||
def get_start_callback(self):
|
||||
async def callback(data: rc.CommandData):
|
||||
# Find the user who clicked on the button
|
||||
user = await data.get_author(error_if_none=True)
|
||||
|
||||
# Get the related MMEvent
|
||||
mmevent: MMEvent = await asyncify(data.session.query(self._EventT).get, self.mmid)
|
||||
|
||||
# Ensure the user has the required roles to start the matchmaking
|
||||
if user != mmevent.creator and "admin" not in user.roles:
|
||||
raise rc.UserError("Non hai i permessi per eliminare questo matchmaking!")
|
||||
|
||||
# Interrupt the matchmaking with the MANUAL_DELETE reason
|
||||
await self.queue.put(Interrupts.MANUAL_START)
|
||||
|
||||
await data.reply(f"🚩 Evento avviato!")
|
||||
return callback
|
||||
|
||||
@property
|
||||
def royalnet_keyboard(self):
|
||||
# noinspection PyListCreation
|
||||
rows = []
|
||||
|
||||
rows.append([
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"{MMChoice.YES.value}",
|
||||
text="Ci sarò!",
|
||||
callback=self.get_answer_callback(MMChoice.YES)
|
||||
),
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"{MMChoice.MAYBE.value}",
|
||||
text="Forse...",
|
||||
callback=self.get_answer_callback(MMChoice.MAYBE)
|
||||
),
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"{MMChoice.NO.value}",
|
||||
text="Non mi interessa.",
|
||||
callback=self.get_answer_callback(MMChoice.NO)
|
||||
),
|
||||
])
|
||||
|
||||
if self._mmevent.datetime is not None:
|
||||
rows.append([
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"{MMChoice.LATE_SHORT.value}",
|
||||
text="10 min",
|
||||
callback=self.get_answer_callback(MMChoice.LATE_SHORT)
|
||||
),
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"{MMChoice.LATE_MEDIUM.value}",
|
||||
text="30 min",
|
||||
callback=self.get_answer_callback(MMChoice.LATE_MEDIUM)
|
||||
),
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"{MMChoice.LATE_LONG.value}",
|
||||
text="60 min",
|
||||
callback=self.get_answer_callback(MMChoice.LATE_LONG)
|
||||
)
|
||||
])
|
||||
|
||||
rows.append([
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"🗑",
|
||||
text="Elimina",
|
||||
callback=self.get_delete_callback()
|
||||
),
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"🚩",
|
||||
text="Inizia",
|
||||
callback=self.get_start_callback()
|
||||
),
|
||||
])
|
||||
|
||||
return rows
|
||||
|
||||
@property
|
||||
def telegram_keyboard(self):
|
||||
# noinspection PyListCreation
|
||||
rows = []
|
||||
key_id = 0
|
||||
|
||||
for r_row in self.royalnet_keyboard:
|
||||
row = []
|
||||
for r_key in r_row:
|
||||
# Generate a unique callback string
|
||||
callback_str = f"mm{self.mmid}_{key_id}"
|
||||
|
||||
# Create a InlineKeyboardButton with that callback string
|
||||
row.append(InKB(f"{r_key.short} {r_key.text}", callback_data=callback_str))
|
||||
|
||||
# Increase the key_id
|
||||
key_id += 1
|
||||
rows.append(row)
|
||||
|
||||
# Return the resulting InlineKeyboardMarkup
|
||||
return InKM(rows)
|
||||
|
||||
def register_telegram_keyboard(self, inkm: InKM):
|
||||
# noinspection PyListCreation
|
||||
royalnet_keyboard = self.royalnet_keyboard
|
||||
for x, row in enumerate(inkm.inline_keyboard):
|
||||
for y, key in enumerate(row):
|
||||
key: InKB
|
||||
self.command.interface.serf.register_keyboard_key(key.callback_data, key=royalnet_keyboard[x][y])
|
||||
|
||||
def unregister_telegram_keyboard(self, inkm: InKM):
|
||||
for row in inkm.inline_keyboard:
|
||||
for key in row:
|
||||
key: InKB
|
||||
self.command.interface.serf.unregister_keyboard_key(key.callback_data)
|
||||
|
||||
async def wait_until_due(self):
|
||||
"""When the event is due, interrupt the MMTask with the TIME_RAN_OUT reason."""
|
||||
if self._mmevent.datetime is None:
|
||||
return
|
||||
await sleep_until(self._mmevent.datetime)
|
||||
await self.queue.put(Interrupts.TIME_RAN_OUT)
|
||||
|
||||
@property
|
||||
def telegram_channel_id(self):
|
||||
return self.command.config["Matchmaking"]["mm_telegram_channel_id"]
|
||||
|
||||
@property
|
||||
def telegram_group_id(self):
|
||||
return self.command.config["Matchmaking"]["mm_telegram_group_id"]
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def telegram_channel_message(self):
|
||||
|
||||
# Generate the InlineKeyboardMarkup
|
||||
inkm = self.telegram_keyboard
|
||||
|
||||
# Bind the Royalnet buttons to the Telegram keyboard
|
||||
log.debug(f"Registering keyboard for: {self.mmid}")
|
||||
self.register_telegram_keyboard(inkm)
|
||||
|
||||
# If the event has no associated interface data...
|
||||
if self._mmevent.interface_data is None:
|
||||
# Send the channel message
|
||||
log.debug(f"Sending message for: {self.mmid}")
|
||||
message: PTBMessage = await self.command.interface.serf.api_call(
|
||||
self.command.interface.serf.client.send_message,
|
||||
chat_id=self.telegram_channel_id,
|
||||
text=telegram_escape(self.channel_text),
|
||||
parse_mode="HTML",
|
||||
disable_webpage_preview=True,
|
||||
reply_markup=inkm
|
||||
)
|
||||
|
||||
# Register the interface data on the database
|
||||
self._mmevent.interface_data = MMInterfaceDataTelegram(
|
||||
chat_id=self.telegram_channel_id,
|
||||
message_id=message.message_id
|
||||
)
|
||||
self._session.commit()
|
||||
|
||||
# Wait until the event starts
|
||||
yield
|
||||
|
||||
# Delete the channel message
|
||||
log.debug(f"Deleting message for: {self.mmid}")
|
||||
await self.command.interface.serf.api_call(
|
||||
self.command.interface.serf.client.delete_message,
|
||||
chat_id=self._mmevent.interface_data.chat_id,
|
||||
message_id=self._mmevent.interface_data.message_id
|
||||
)
|
||||
|
||||
# Unregister the Telegram keyboard bindings
|
||||
log.debug(f"Unregistering keyboard for: {self.mmid}")
|
||||
self.unregister_telegram_keyboard(inkm)
|
||||
|
||||
async def telegram_channel_message_update(self):
|
||||
log.debug(f"Updating message for: {self.mmid}")
|
||||
try:
|
||||
await asyncify(
|
||||
self.command.interface.serf.client.edit_message_text,
|
||||
chat_id=self._mmevent.interface_data.chat_id,
|
||||
text=telegram_escape(self.channel_text),
|
||||
message_id=self._mmevent.interface_data.message_id,
|
||||
parse_mode="HTML",
|
||||
disable_web_page_preview=True,
|
||||
reply_markup=self.telegram_keyboard
|
||||
)
|
||||
except TelegramError as e:
|
||||
log.warning(f"TelegramError during update: {e}")
|
||||
|
||||
async def telegram_group_message_start(self):
|
||||
await self.command.interface.serf.api_call(
|
||||
self.command.interface.serf.client.send_message,
|
||||
chat_id=self.telegram_group_id,
|
||||
text=telegram_escape(self.start_text),
|
||||
parse_mode="HTML",
|
||||
disable_webpage_preview=True
|
||||
)
|
||||
|
||||
async def telegram_group_message_delete(self):
|
||||
await self.command.interface.serf.api_call(
|
||||
self.command.interface.serf.client.send_message,
|
||||
chat_id=self.telegram_group_id,
|
||||
text=telegram_escape(self.delete_text),
|
||||
parse_mode="HTML",
|
||||
disable_webpage_preview=True
|
||||
)
|
||||
|
||||
def start(self):
|
||||
log.debug(f"Starting task for: {self.mmid}")
|
||||
self.task = self.loop.create_task(self.run())
|
||||
|
||||
@sentry_async_wrap()
|
||||
async def run(self):
|
||||
log.debug(f"Running task for: {self.mmid}")
|
||||
|
||||
# Create a new session for the MMTask
|
||||
self._session = self.command.alchemy.Session()
|
||||
self._EventT = self.command.alchemy.get(MMEvent)
|
||||
self._ResponseT = self.command.alchemy.get(MMResponse)
|
||||
self._mmevent: MMEvent = self._session.query(self._EventT).get(self.mmid)
|
||||
|
||||
if self._mmevent is None:
|
||||
raise rc.InvalidInputError(f"No event exists with the mmid {self.mmid}.")
|
||||
|
||||
if self._mmevent.interface != "telegram":
|
||||
raise rc.UnsupportedError("Currently only the Telegram interface is supported.")
|
||||
|
||||
async with self.telegram_channel_message():
|
||||
self.loop.create_task(self.wait_until_due())
|
||||
|
||||
# Sleep until something interrupts the task
|
||||
interrupt = await self.queue.get()
|
||||
|
||||
# Mark the event as interrupted
|
||||
self._mmevent.interrupted = True
|
||||
self._session.commit()
|
||||
|
||||
# Send a group notification if the MMEvent wasn't deleted
|
||||
if interrupt != Interrupts.MANUAL_DELETE:
|
||||
await self.telegram_group_message_start()
|
||||
else:
|
||||
await self.telegram_group_message_delete()
|
||||
|
||||
# Close the database session
|
||||
await asyncify(self._session.close)
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
from ..tables import MMEvent
|
||||
from ..utils import MMTask
|
||||
|
||||
|
||||
class MatchmakingCommand(rc.Command):
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
from .royalqueue import RoyalQueue
|
||||
from .royalpool import RoyalPool
|
||||
from .mmtask import MMTask
|
||||
|
||||
__all__ = [
|
||||
"RoyalQueue",
|
||||
"RoyalPool",
|
||||
"MMTask",
|
||||
]
|
||||
|
|
456
royalpack/utils/mmtask.py
Normal file
456
royalpack/utils/mmtask.py
Normal file
|
@ -0,0 +1,456 @@
|
|||
import contextlib
|
||||
import random
|
||||
from typing import *
|
||||
import logging
|
||||
import datetime
|
||||
import enum
|
||||
import asyncio as aio
|
||||
|
||||
from telegram import InlineKeyboardMarkup as InKM
|
||||
from telegram import InlineKeyboardButton as InKB
|
||||
from telegram import Message as PTBMessage
|
||||
from telegram import TelegramError
|
||||
|
||||
import royalnet.commands as rc
|
||||
import royalnet.utils as ru
|
||||
import royalnet.serf.telegram as rst
|
||||
|
||||
from ..types import MMChoice, MMInterfaceDataTelegram
|
||||
from ..tables import MMEvent, MMResponse, FiorygiTransaction
|
||||
|
||||
|
||||
class Interrupts(enum.Enum):
|
||||
TIME_RAN_OUT = enum.auto()
|
||||
MANUAL_START = enum.auto()
|
||||
MANUAL_DELETE = enum.auto()
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
mmchoice_sorting = {
|
||||
MMChoice.YES: -4,
|
||||
MMChoice.LATE_SHORT: -3,
|
||||
MMChoice.LATE_MEDIUM: -2,
|
||||
MMChoice.LATE_LONG: -1,
|
||||
MMChoice.MAYBE: 0,
|
||||
MMChoice.NO: 1
|
||||
}
|
||||
|
||||
|
||||
class MMTask:
|
||||
def __init__(self, mmid: int, *, command: rc.Command):
|
||||
log.debug(f"Creating task for: {mmid}")
|
||||
|
||||
self.loop: aio.AbstractEventLoop = command.loop
|
||||
self.task: Optional[aio.Task] = None
|
||||
self.queue: aio.Queue = aio.Queue(loop=self.loop)
|
||||
self.command: rc.Command = command
|
||||
self.mmid: int = mmid
|
||||
|
||||
self._session: Optional = None
|
||||
self._EventT: Optional[Type[MMEvent]] = None
|
||||
self._ResponseT: Optional[Type[MMResponse]] = None
|
||||
self._mmevent: Optional[MMEvent] = None
|
||||
|
||||
@property
|
||||
def is_running(self):
|
||||
return self.task is not None
|
||||
|
||||
def sync(self):
|
||||
self._session.refresh(self._mmevent)
|
||||
|
||||
def get_response_line(self, response: MMResponse):
|
||||
self.sync()
|
||||
|
||||
# noinspection PyListCreation
|
||||
line = []
|
||||
|
||||
# Emoji
|
||||
line.append(f"{response.choice.value}")
|
||||
|
||||
# Mention the user if he said yes, otherwise just display his name
|
||||
if response.choice == MMChoice.NO:
|
||||
line.append(f"{response.user.telegram[0].name()}")
|
||||
else:
|
||||
line.append(f"{response.user.telegram[0].mention()}")
|
||||
|
||||
# Late time
|
||||
if response.choice == MMChoice.LATE_SHORT:
|
||||
td = self._mmevent.datetime + datetime.timedelta(minutes=10)
|
||||
line.append(f"[{td.strftime('%H:%M')}]")
|
||||
|
||||
elif response.choice == MMChoice.LATE_MEDIUM:
|
||||
td = self._mmevent.datetime + datetime.timedelta(minutes=30)
|
||||
line.append(f"[{td.strftime('%H:%M')}]")
|
||||
|
||||
elif response.choice == MMChoice.LATE_LONG:
|
||||
td = self._mmevent.datetime + datetime.timedelta(minutes=60)
|
||||
line.append(f"[{td.strftime('%H:%M')}+]")
|
||||
|
||||
# Creator
|
||||
if response.user == self._mmevent.creator:
|
||||
line.append("👑")
|
||||
|
||||
# Result
|
||||
return " ".join(line)
|
||||
|
||||
@property
|
||||
def channel_text(self) -> str:
|
||||
self.sync()
|
||||
|
||||
# noinspection PyListCreation
|
||||
text = []
|
||||
|
||||
# First line
|
||||
if self._mmevent.datetime is None:
|
||||
text.append(f"🌐 [Prossimamente] [b]{self._mmevent.title}[/b]")
|
||||
else:
|
||||
text.append(f"🚩 [{self._mmevent.datetime.strftime('%Y-%m-%d %H:%M')}] [b]{self._mmevent.title}[/b]")
|
||||
|
||||
# Description
|
||||
if self._mmevent.description:
|
||||
text.append(f"{self._mmevent.description}")
|
||||
|
||||
# Spacer
|
||||
text.append("")
|
||||
|
||||
# Responses
|
||||
responses = sorted(self._mmevent.responses, key=lambda r: mmchoice_sorting[r.choice])
|
||||
for response in responses:
|
||||
text.append(self.get_response_line(response))
|
||||
|
||||
# Result
|
||||
return "\n".join(text)
|
||||
|
||||
@property
|
||||
def start_text(self) -> str:
|
||||
self.sync()
|
||||
|
||||
# noinspection PyListCreation
|
||||
text = []
|
||||
|
||||
# First line
|
||||
if self._mmevent.datetime is None:
|
||||
text.append(f"🌐 Le iscrizioni all'evento [b]{self._mmevent.title}[/b] sono terminate!")
|
||||
else:
|
||||
text.append(f"🚩 L'evento [b]{self._mmevent.title}[/b] è iniziato!")
|
||||
|
||||
# Description
|
||||
if self._mmevent.description:
|
||||
text.append(f"{self._mmevent.description}")
|
||||
|
||||
# Spacer
|
||||
text.append("")
|
||||
|
||||
# Responses
|
||||
responses = sorted(self._mmevent.responses, key=lambda r: mmchoice_sorting[r.choice])
|
||||
for response in responses:
|
||||
text.append(self.get_response_line(response))
|
||||
|
||||
# Result
|
||||
return "\n".join(text)
|
||||
|
||||
@property
|
||||
def delete_text(self) -> str:
|
||||
return f"🗑 L'evento [b]{self._mmevent.title}[/b] è stato eliminato."
|
||||
|
||||
def get_answer_callback(self, choice: MMChoice):
|
||||
async def callback(data: rc.CommandData):
|
||||
# Find the user who clicked on the button
|
||||
user = await data.get_author(error_if_none=True)
|
||||
|
||||
# Get the related MMEvent
|
||||
mmevent: MMEvent = await ru.asyncify(data.session.query(self._EventT).get, self.mmid)
|
||||
|
||||
# Check if the user had already responded
|
||||
mmresponse: MMResponse = await ru.asyncify(
|
||||
data.session.query(self._ResponseT).filter_by(user=user, mmevent=mmevent).one_or_none
|
||||
)
|
||||
|
||||
if mmresponse is None:
|
||||
# If they didn't respond, create a new MMResponse
|
||||
mmresponse = self._ResponseT(user=user, mmevent=mmevent, choice=choice)
|
||||
data.session.add(mmresponse)
|
||||
|
||||
# Drop fiorygi
|
||||
if random.randrange(100) < self.command.config["Matchmaking"]["fiorygi_award_chance"]:
|
||||
await FiorygiTransaction.spawn_fiorygi(data, user, 1, "aver risposto a un matchmaking")
|
||||
else:
|
||||
# Change their response
|
||||
mmresponse.choice = choice
|
||||
await data.session_commit()
|
||||
|
||||
await self.telegram_channel_message_update()
|
||||
|
||||
await data.reply(f"{choice.value} Hai risposto al matchmaking!")
|
||||
return callback
|
||||
|
||||
def get_delete_callback(self):
|
||||
async def callback(data: rc.CommandData):
|
||||
# Find the user who clicked on the button
|
||||
user = await data.get_author(error_if_none=True)
|
||||
|
||||
# Get the related MMEvent
|
||||
mmevent: MMEvent = await ru.asyncify(data.session.query(self._EventT).get, self.mmid)
|
||||
|
||||
# Ensure the user has the required roles to start the matchmaking
|
||||
if user != mmevent.creator and "admin" not in user.roles:
|
||||
raise rc.UserError("Non hai i permessi per eliminare questo matchmaking!")
|
||||
|
||||
# Interrupt the matchmaking with the MANUAL_DELETE reason
|
||||
await self.queue.put(Interrupts.MANUAL_DELETE)
|
||||
|
||||
await data.reply(f"🗑 Evento eliminato!")
|
||||
return callback
|
||||
|
||||
def get_start_callback(self):
|
||||
async def callback(data: rc.CommandData):
|
||||
# Find the user who clicked on the button
|
||||
user = await data.get_author(error_if_none=True)
|
||||
|
||||
# Get the related MMEvent
|
||||
mmevent: MMEvent = await ru.asyncify(data.session.query(self._EventT).get, self.mmid)
|
||||
|
||||
# Ensure the user has the required roles to start the matchmaking
|
||||
if user != mmevent.creator and "admin" not in user.roles:
|
||||
raise rc.UserError("Non hai i permessi per eliminare questo matchmaking!")
|
||||
|
||||
# Interrupt the matchmaking with the MANUAL_DELETE reason
|
||||
await self.queue.put(Interrupts.MANUAL_START)
|
||||
|
||||
await data.reply(f"🚩 Evento avviato!")
|
||||
return callback
|
||||
|
||||
@property
|
||||
def royalnet_keyboard(self):
|
||||
# noinspection PyListCreation
|
||||
rows = []
|
||||
|
||||
rows.append([
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"{MMChoice.YES.value}",
|
||||
text="Ci sarò!",
|
||||
callback=self.get_answer_callback(MMChoice.YES)
|
||||
),
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"{MMChoice.MAYBE.value}",
|
||||
text="Forse...",
|
||||
callback=self.get_answer_callback(MMChoice.MAYBE)
|
||||
),
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"{MMChoice.NO.value}",
|
||||
text="Non mi interessa.",
|
||||
callback=self.get_answer_callback(MMChoice.NO)
|
||||
),
|
||||
])
|
||||
|
||||
if self._mmevent.datetime is not None:
|
||||
rows.append([
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"{MMChoice.LATE_SHORT.value}",
|
||||
text="10 min",
|
||||
callback=self.get_answer_callback(MMChoice.LATE_SHORT)
|
||||
),
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"{MMChoice.LATE_MEDIUM.value}",
|
||||
text="30 min",
|
||||
callback=self.get_answer_callback(MMChoice.LATE_MEDIUM)
|
||||
),
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"{MMChoice.LATE_LONG.value}",
|
||||
text="60 min",
|
||||
callback=self.get_answer_callback(MMChoice.LATE_LONG)
|
||||
)
|
||||
])
|
||||
|
||||
rows.append([
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"🗑",
|
||||
text="Elimina",
|
||||
callback=self.get_delete_callback()
|
||||
),
|
||||
rc.KeyboardKey(
|
||||
interface=self.command.interface,
|
||||
short=f"🚩",
|
||||
text="Inizia",
|
||||
callback=self.get_start_callback()
|
||||
),
|
||||
])
|
||||
|
||||
return rows
|
||||
|
||||
@property
|
||||
def telegram_keyboard(self):
|
||||
# noinspection PyListCreation
|
||||
rows = []
|
||||
key_id = 0
|
||||
|
||||
for r_row in self.royalnet_keyboard:
|
||||
row = []
|
||||
for r_key in r_row:
|
||||
# Generate a unique callback string
|
||||
callback_str = f"mm{self.mmid}_{key_id}"
|
||||
|
||||
# Create a InlineKeyboardButton with that callback string
|
||||
row.append(InKB(f"{r_key.short} {r_key.text}", callback_data=callback_str))
|
||||
|
||||
# Increase the key_id
|
||||
key_id += 1
|
||||
rows.append(row)
|
||||
|
||||
# Return the resulting InlineKeyboardMarkup
|
||||
return InKM(rows)
|
||||
|
||||
def register_telegram_keyboard(self, inkm: InKM):
|
||||
# noinspection PyListCreation
|
||||
royalnet_keyboard = self.royalnet_keyboard
|
||||
for x, row in enumerate(inkm.inline_keyboard):
|
||||
for y, key in enumerate(row):
|
||||
key: InKB
|
||||
self.command.interface.serf.register_keyboard_key(key.callback_data, key=royalnet_keyboard[x][y])
|
||||
|
||||
def unregister_telegram_keyboard(self, inkm: InKM):
|
||||
for row in inkm.inline_keyboard:
|
||||
for key in row:
|
||||
key: InKB
|
||||
self.command.interface.serf.unregister_keyboard_key(key.callback_data)
|
||||
|
||||
async def wait_until_due(self):
|
||||
"""When the event is due, interrupt the MMTask with the TIME_RAN_OUT reason."""
|
||||
if self._mmevent.datetime is None:
|
||||
return
|
||||
await ru.sleep_until(self._mmevent.datetime)
|
||||
await self.queue.put(Interrupts.TIME_RAN_OUT)
|
||||
|
||||
@property
|
||||
def telegram_channel_id(self):
|
||||
return self.command.config["Matchmaking"]["mm_telegram_channel_id"]
|
||||
|
||||
@property
|
||||
def telegram_group_id(self):
|
||||
return self.command.config["Matchmaking"]["mm_telegram_group_id"]
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def telegram_channel_message(self):
|
||||
|
||||
# Generate the InlineKeyboardMarkup
|
||||
inkm = self.telegram_keyboard
|
||||
|
||||
# Bind the Royalnet buttons to the Telegram keyboard
|
||||
log.debug(f"Registering keyboard for: {self.mmid}")
|
||||
self.register_telegram_keyboard(inkm)
|
||||
|
||||
# If the event has no associated interface data...
|
||||
if self._mmevent.interface_data is None:
|
||||
# Send the channel message
|
||||
log.debug(f"Sending message for: {self.mmid}")
|
||||
message: PTBMessage = await self.command.interface.serf.api_call(
|
||||
self.command.interface.serf.client.send_message,
|
||||
chat_id=self.telegram_channel_id,
|
||||
text=rst.escape(self.channel_text),
|
||||
parse_mode="HTML",
|
||||
disable_webpage_preview=True,
|
||||
reply_markup=inkm
|
||||
)
|
||||
|
||||
# Register the interface data on the database
|
||||
self._mmevent.interface_data = MMInterfaceDataTelegram(
|
||||
chat_id=self.telegram_channel_id,
|
||||
message_id=message.message_id
|
||||
)
|
||||
self._session.commit()
|
||||
|
||||
# Wait until the event starts
|
||||
yield
|
||||
|
||||
# Delete the channel message
|
||||
log.debug(f"Deleting message for: {self.mmid}")
|
||||
await self.command.interface.serf.api_call(
|
||||
self.command.interface.serf.client.delete_message,
|
||||
chat_id=self._mmevent.interface_data.chat_id,
|
||||
message_id=self._mmevent.interface_data.message_id
|
||||
)
|
||||
|
||||
# Unregister the Telegram keyboard bindings
|
||||
log.debug(f"Unregistering keyboard for: {self.mmid}")
|
||||
self.unregister_telegram_keyboard(inkm)
|
||||
|
||||
async def telegram_channel_message_update(self):
|
||||
log.debug(f"Updating message for: {self.mmid}")
|
||||
try:
|
||||
await ru.asyncify(
|
||||
self.command.interface.serf.client.edit_message_text,
|
||||
chat_id=self._mmevent.interface_data.chat_id,
|
||||
text=rst.escape(self.channel_text),
|
||||
message_id=self._mmevent.interface_data.message_id,
|
||||
parse_mode="HTML",
|
||||
disable_web_page_preview=True,
|
||||
reply_markup=self.telegram_keyboard
|
||||
)
|
||||
except TelegramError as e:
|
||||
log.warning(f"TelegramError during update: {e}")
|
||||
|
||||
async def telegram_group_message_start(self):
|
||||
await self.command.interface.serf.api_call(
|
||||
self.command.interface.serf.client.send_message,
|
||||
chat_id=self.telegram_group_id,
|
||||
text=rst.escape(self.start_text),
|
||||
parse_mode="HTML",
|
||||
disable_webpage_preview=True
|
||||
)
|
||||
|
||||
async def telegram_group_message_delete(self):
|
||||
await self.command.interface.serf.api_call(
|
||||
self.command.interface.serf.client.send_message,
|
||||
chat_id=self.telegram_group_id,
|
||||
text=rst.escape(self.delete_text),
|
||||
parse_mode="HTML",
|
||||
disable_webpage_preview=True
|
||||
)
|
||||
|
||||
def start(self):
|
||||
log.debug(f"Starting task for: {self.mmid}")
|
||||
self.task = self.loop.create_task(self.run())
|
||||
|
||||
@ru.sentry_async_wrap()
|
||||
async def run(self):
|
||||
log.debug(f"Running task for: {self.mmid}")
|
||||
|
||||
# Create a new session for the MMTask
|
||||
self._session = self.command.alchemy.Session()
|
||||
self._EventT = self.command.alchemy.get(MMEvent)
|
||||
self._ResponseT = self.command.alchemy.get(MMResponse)
|
||||
self._mmevent: MMEvent = self._session.query(self._EventT).get(self.mmid)
|
||||
|
||||
if self._mmevent is None:
|
||||
raise rc.InvalidInputError(f"No event exists with the mmid {self.mmid}.")
|
||||
|
||||
if self._mmevent.interface != "telegram":
|
||||
raise rc.UnsupportedError("Currently only the Telegram interface is supported.")
|
||||
|
||||
async with self.telegram_channel_message():
|
||||
self.loop.create_task(self.wait_until_due())
|
||||
|
||||
# Sleep until something interrupts the task
|
||||
interrupt = await self.queue.get()
|
||||
|
||||
# Mark the event as interrupted
|
||||
self._mmevent.interrupted = True
|
||||
self._session.commit()
|
||||
|
||||
# Send a group notification if the MMEvent wasn't deleted
|
||||
if interrupt != Interrupts.MANUAL_DELETE:
|
||||
await self.telegram_group_message_start()
|
||||
else:
|
||||
await self.telegram_group_message_delete()
|
||||
|
||||
# Close the database session
|
||||
await ru.asyncify(self._session.close)
|
Loading…
Reference in a new issue