1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00

publish: 5.9.5

This commit is contained in:
Steffo 2020-07-02 21:40:33 +02:00
parent fdacd61093
commit 6c7cfeb668
Signed by: steffo
GPG key ID: 896A80F55F7C97F0
8 changed files with 494 additions and 342 deletions

20
poetry.lock generated
View file

@ -59,7 +59,7 @@ description = "Extensible memoizing collections and decorators"
name = "cachetools"
optional = false
python-versions = "~=3.5"
version = "4.1.0"
version = "4.1.1"
[[package]]
category = "main"
@ -269,7 +269,7 @@ description = "Internationalized Domain Names in Applications (IDNA)"
name = "idna"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*"
version = "2.9"
version = "2.10"
[[package]]
category = "main"
@ -442,7 +442,7 @@ description = "A multipurpose bot and web framework"
name = "royalnet"
optional = false
python-versions = ">=3.8,<4.0"
version = "5.10.0"
version = "5.10.3"
[package.dependencies]
dateparser = ">=0.7.2,<0.8.0"
@ -712,7 +712,7 @@ python-versions = "*"
version = "2020.6.16.1"
[metadata]
content-hash = "a88c104db6c8828bc307d3db357aa02b6afd2ada8f6788a004bdb42417ad7207"
content-hash = "b0649ccbfdffd5947cbb0d4930182bd3b5edbf39181880db20e7b73ee85c9b02"
python-versions = "^3.8"
[metadata.files]
@ -759,8 +759,8 @@ bcrypt = [
{file = "bcrypt-3.1.7.tar.gz", hash = "sha256:0b0069c752ec14172c5f78208f1863d7ad6755a6fae6fe76ec2c80d13be41e42"},
]
cachetools = [
{file = "cachetools-4.1.0-py3-none-any.whl", hash = "sha256:de5d88f87781602201cde465d3afe837546663b168e8b39df67411b0bf10cefc"},
{file = "cachetools-4.1.0.tar.gz", hash = "sha256:1d057645db16ca7fe1f3bd953558897603d6f0b9c51ed9d11eb4d071ec4e2aab"},
{file = "cachetools-4.1.1-py3-none-any.whl", hash = "sha256:513d4ff98dd27f85743a8dc0e92f55ddb1b49e060c2d5961512855cda2c01a98"},
{file = "cachetools-4.1.1.tar.gz", hash = "sha256:bbaa39c3dede00175df2dc2b03d0cf18dd2d32a7de7beb68072d13043c9edb20"},
]
certifi = [
{file = "certifi-2020.6.20-py2.py3-none-any.whl", hash = "sha256:8fc0819f1f30ba15bdb34cceffb9ef04d99f420f68eb75d901e9560b8749fc41"},
@ -877,8 +877,8 @@ humanfriendly = [
{file = "humanfriendly-8.2.tar.gz", hash = "sha256:bf52ec91244819c780341a3438d5d7b09f431d3f113a475147ac9b7b167a3d12"},
]
idna = [
{file = "idna-2.9-py2.py3-none-any.whl", hash = "sha256:a068a21ceac8a4d63dbfd964670474107f541babbd2250d61922f029858365fa"},
{file = "idna-2.9.tar.gz", hash = "sha256:7588d1c14ae4c77d74036e8c22ff447b26d0fde8f007354fd48a7814db15b7cb"},
{file = "idna-2.10-py2.py3-none-any.whl", hash = "sha256:b97d804b1e9b523befed77c48dacec60e6dcb0b5391d57af6a65a312a90648c0"},
{file = "idna-2.10.tar.gz", hash = "sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6"},
]
multidict = [
{file = "multidict-4.7.6-cp35-cp35m-macosx_10_14_x86_64.whl", hash = "sha256:275ca32383bc5d1894b6975bb4ca6a7ff16ab76fa622967625baeebcf8079000"},
@ -1045,8 +1045,8 @@ riotwatcher = [
{file = "riotwatcher-2.7.1.tar.gz", hash = "sha256:5349271c7e00637b7619491a6070e66603705db60558ea2a690e7016f6e6d9a4"},
]
royalnet = [
{file = "royalnet-5.10.0-py3-none-any.whl", hash = "sha256:4d4d5360b71d509ec0ef64edd5c3767e378a7ce48ac1a224a0fa9fa3ea524aed"},
{file = "royalnet-5.10.0.tar.gz", hash = "sha256:7fb7bd4b02f4f5a89b5085cfed0a1a3072d2081a3832ff5b949f06ad79e90635"},
{file = "royalnet-5.10.3-py3-none-any.whl", hash = "sha256:1042a7383e26dbc76d03289d73f3c1b4d167cf860e272397b88ea54db0af513d"},
{file = "royalnet-5.10.3.tar.gz", hash = "sha256:0232279024a3fd92c374770904443d5a0d7537dd0bf360e6c3f4e47068719fc0"},
]
royalspells = [
{file = "royalspells-3.2.tar.gz", hash = "sha256:2bd4a9a66514532e35c02c3907425af48c7cb292364c4843c795719a82b25dfe"},

View file

@ -25,7 +25,7 @@
steam = "*"
[tool.poetry.dependencies.royalnet]
version = "~5.10.0"
version = "~5.10.3"
# Maybe... there is a way to make these selectable?
extras = [
"telegram",

View file

@ -53,7 +53,7 @@ from .lazyfunkwhalealbum import LazyfunkwhalealbumCommand
from .matchmaking import MatchmakingCommand
from .cvstats import CvstatsCommand
from .elevatormusic import ElevatormusicCommand
from .royalpack import RoyalpackCommand
from .royalpackversion import RoyalpackCommand
from .givefiorygi import GivefiorygiCommand
from .help import HelpCommand
from .pug import PugCommand

View file

@ -1,3 +1,4 @@
import contextlib
from typing import *
import datetime
import re
@ -6,16 +7,14 @@ import typing
import random
import enum
import asyncio as aio
from telegram import Bot as PTBBot
from telegram import Message as PTBMessage
from telegram import InlineKeyboardMarkup as InKM
from telegram import InlineKeyboardButton as InKB
from telegram.error import TelegramError
import royalnet.commands as rc
from royalnet.serf.telegram import TelegramSerf as TelegramBot
from royalnet.serf.telegram import escape as telegram_escape
from royalnet.utils import asyncify, sleep_until, sentry_async_wrap
from royalnet.backpack.tables import User
import logging
from ..tables import MMEvent, MMResponse, FiorygiTransaction
from ..types import MMChoice, MMInterfaceDataTelegram
@ -27,59 +26,492 @@ class Interrupts(enum.Enum):
MANUAL_DELETE = enum.auto()
mmchoice_sorting = {
MMChoice.YES: -4,
MMChoice.LATE_SHORT: -3,
MMChoice.LATE_MEDIUM: -2,
MMChoice.LATE_LONG: -1,
MMChoice.MAYBE: 0,
MMChoice.NO: 1
}
class MMTask:
def __init__(self, mmid: int, *, command: rc.Command):
log.debug(f"Creating task for: {mmid}")
self.loop: aio.AbstractEventLoop = command.loop
self.task: Optional[aio.Task] = None
self.queue: aio.Queue = aio.Queue(loop=self.loop)
self.command: rc.Command = command
self.mmid: int = mmid
self._session: Optional = None
self._EventT: Optional[Type[MMEvent]] = None
self._ResponseT: Optional[Type[MMResponse]] = None
self._mmevent: Optional[MMEvent] = None
@property
def is_running(self):
return self.task is not None
def get_response_line(self, response: MMResponse):
# noinspection PyListCreation
line = []
# Emoji
line.append(f"{response.choice.value}")
# Mention the user if he said yes, otherwise just display his name
if response.choice == MMChoice.NO:
line.append(f"{response.user.telegram[0].name()}")
else:
line.append(f"{response.user.telegram[0].mention()}")
# Late time
if response.choice == MMChoice.LATE_SHORT:
td = self._mmevent.datetime + datetime.timedelta(minutes=10)
line.append(f"[{td.strftime('%H:%M')}]")
elif response.choice == MMChoice.LATE_MEDIUM:
td = self._mmevent.datetime + datetime.timedelta(minutes=30)
line.append(f"[{td.strftime('%H:%M')}]")
elif response.choice == MMChoice.LATE_LONG:
td = self._mmevent.datetime + datetime.timedelta(minutes=60)
line.append(f"[{td.strftime('%H:%M')}+]")
# Creator
if response.user == self._mmevent.creator:
line.append("👑")
# Result
return " ".join(line)
@property
def channel_text(self) -> str:
# noinspection PyListCreation
text = []
# First line
if self._mmevent.datetime is None:
text.append(f"🌐 [Prossimamente] [b]{self._mmevent.title}[/b]")
else:
text.append(f"🚩 [{self._mmevent.datetime.strftime('%Y-%m-%d %H:%M')}] [b]{self._mmevent.title}[/b]")
# Description
if self._mmevent.description:
text.append(f"{self._mmevent.description}")
# Spacer
text.append("")
# Responses
responses = sorted(self._mmevent.responses, key=lambda r: mmchoice_sorting[r.choice])
for response in responses:
text.append(self.get_response_line(response))
# Result
return "\n".join(text)
@property
def start_text(self) -> str:
# noinspection PyListCreation
text = []
# First line
if self._mmevent.datetime is None:
text.append(f"🌐 Le iscrizioni all'evento [b]{self._mmevent.title}[/b] sono terminate!")
else:
text.append(f"🚩 L'evento [b]{self._mmevent.title}[/b] è iniziato!")
# Description
if self._mmevent.description:
text.append(f"{self._mmevent.description}")
# Spacer
text.append("")
# Responses
responses = sorted(self._mmevent.responses, key=lambda r: mmchoice_sorting[r.choice])
for response in responses:
text.append(self.get_response_line(response))
# Result
return "\n".join(text)
@property
def delete_text(self) -> str:
return f"🗑 L'evento [b]{self._mmevent.title}[/b] è stato eliminato."
def get_answer_callback(self, choice: MMChoice):
async def callback(data: rc.CommandData):
# Find the user who clicked on the button
user = await data.get_author(error_if_none=True)
# Get the related MMEvent
mmevent: MMEvent = await asyncify(data.session.query(self._EventT).get, self.mmid)
# Check if the user had already responded
mmresponse: MMResponse = await asyncify(
data.session.query(self._ResponseT).filter_by(user=user, mmevent=mmevent).one_or_none
)
if mmresponse is None:
# If they didn't respond, create a new MMResponse
mmresponse = self._ResponseT(user=user, mmevent=mmevent, choice=choice)
data.session.add(mmresponse)
# Drop fiorygi
if random.randrange(100) < self.command.config["Matchmaking"]["fiorygi_award_chance"]:
await FiorygiTransaction.spawn_fiorygi(data, user, 1, "aver risposto a un matchmaking")
else:
# Change their response
mmresponse.choice = choice
await data.session_commit()
await self.telegram_channel_message_update()
await data.reply(f"{choice.value} Hai risposto al matchmaking!")
return callback
def get_delete_callback(self):
async def callback(data: rc.CommandData):
# Find the user who clicked on the button
user = await data.get_author(error_if_none=True)
# Get the related MMEvent
mmevent: MMEvent = await asyncify(data.session.query(self._EventT).get, self.mmid)
# Ensure the user has the required roles to start the matchmaking
if user != mmevent.creator and "admin" not in user.roles:
raise rc.UserError("Non hai i permessi per eliminare questo matchmaking!")
# Interrupt the matchmaking with the MANUAL_DELETE reason
await self.queue.put(Interrupts.MANUAL_DELETE)
await data.reply(f"🗑 Evento eliminato!")
return callback
def get_start_callback(self):
async def callback(data: rc.CommandData):
# Find the user who clicked on the button
user = await data.get_author(error_if_none=True)
# Get the related MMEvent
mmevent: MMEvent = await asyncify(data.session.query(self._EventT).get, self.mmid)
# Ensure the user has the required roles to start the matchmaking
if user != mmevent.creator and "admin" not in user.roles:
raise rc.UserError("Non hai i permessi per eliminare questo matchmaking!")
# Interrupt the matchmaking with the MANUAL_DELETE reason
await self.queue.put(Interrupts.MANUAL_START)
await data.reply(f"🚩 Evento avviato!")
return callback
@property
def royalnet_keyboard(self):
# noinspection PyListCreation
rows = []
rows.append([
rc.KeyboardKey(
interface=self.command.interface,
short=f"{MMChoice.YES.value}",
text="Ci sarò!",
callback=self.get_answer_callback(MMChoice.YES)
),
rc.KeyboardKey(
interface=self.command.interface,
short=f"{MMChoice.MAYBE.value}",
text="Forse...",
callback=self.get_answer_callback(MMChoice.MAYBE)
),
rc.KeyboardKey(
interface=self.command.interface,
short=f"{MMChoice.NO.value}",
text="Non mi interessa.",
callback=self.get_answer_callback(MMChoice.NO)
),
])
if self._mmevent.datetime is not None:
rows.append([
rc.KeyboardKey(
interface=self.command.interface,
short=f"{MMChoice.LATE_SHORT.value}",
text="10 min",
callback=self.get_answer_callback(MMChoice.LATE_SHORT)
),
rc.KeyboardKey(
interface=self.command.interface,
short=f"{MMChoice.LATE_MEDIUM.value}",
text="30 min",
callback=self.get_answer_callback(MMChoice.LATE_MEDIUM)
),
rc.KeyboardKey(
interface=self.command.interface,
short=f"{MMChoice.LATE_LONG.value}",
text="60 min",
callback=self.get_answer_callback(MMChoice.LATE_LONG)
)
])
rows.append([
rc.KeyboardKey(
interface=self.command.interface,
short=f"🗑",
text="Elimina",
callback=self.get_delete_callback()
),
rc.KeyboardKey(
interface=self.command.interface,
short=f"🚩",
text="Inizia",
callback=self.get_start_callback()
),
])
return rows
@property
def telegram_keyboard(self):
# noinspection PyListCreation
rows = []
key_id = 0
for r_row in self.royalnet_keyboard:
row = []
for r_key in r_row:
# Generate a unique callback string
callback_str = f"mm{self.mmid}_{key_id}"
# Create a InlineKeyboardButton with that callback string
row.append(InKB(f"{r_key.short} {r_key.text}", callback_data=callback_str))
# Increase the key_id
key_id += 1
rows.append(row)
# Return the resulting InlineKeyboardMarkup
return InKM(rows)
def register_telegram_keyboard(self, inkm: InKM):
# noinspection PyListCreation
royalnet_keyboard = self.royalnet_keyboard
for x, row in enumerate(inkm.inline_keyboard):
for y, key in enumerate(row):
key: InKB
self.command.interface.serf.register_keyboard_key(key.callback_data, key=royalnet_keyboard[x][y])
def unregister_telegram_keyboard(self, inkm: InKM):
for row in inkm.inline_keyboard:
for key in row:
key: InKB
self.command.interface.serf.unregister_keyboard_key(key.callback_data)
async def wait_until_due(self):
"""When the event is due, interrupt the MMTask with the TIME_RAN_OUT reason."""
if self._mmevent.datetime is None:
return
await sleep_until(self._mmevent.datetime)
await self.queue.put(Interrupts.TIME_RAN_OUT)
@property
def telegram_channel_id(self):
return self.command.config["Matchmaking"]["mm_telegram_channel_id"]
@property
def telegram_group_id(self):
return self.command.config["Matchmaking"]["mm_telegram_group_id"]
@contextlib.asynccontextmanager
async def telegram_channel_message(self):
# Generate the InlineKeyboardMarkup
inkm = self.telegram_keyboard
# Bind the Royalnet buttons to the Telegram keyboard
log.debug(f"Registering keyboard for: {self.mmid}")
self.register_telegram_keyboard(inkm)
# If the event has no associated interface data...
if self._mmevent.interface_data is None:
# Send the channel message
log.debug(f"Sending message for: {self.mmid}")
message: PTBMessage = await self.command.interface.serf.api_call(
self.command.interface.serf.client.send_message,
chat_id=self.telegram_channel_id,
text=telegram_escape(self.channel_text),
parse_mode="HTML",
disable_webpage_preview=True,
reply_markup=inkm
)
# Register the interface data on the database
self._mmevent.interface_data = MMInterfaceDataTelegram(
chat_id=self.telegram_channel_id,
message_id=message.message_id
)
self._session.commit()
# Wait until the event starts
yield
# Delete the channel message
log.debug(f"Deleting message for: {self.mmid}")
await self.command.interface.serf.api_call(
self.command.interface.serf.client.delete_message,
chat_id=self._mmevent.interface_data.chat_id,
message_id=self._mmevent.interface_data.message_id
)
# Unregister the Telegram keyboard bindings
log.debug(f"Unregistering keyboard for: {self.mmid}")
self.unregister_telegram_keyboard(inkm)
async def telegram_channel_message_update(self):
try:
await self.command.interface.serf.api_call(
self.command.interface.serf.client.edit_message_text,
chat_id=self._mmevent.interface_data.chat_id,
text=telegram_escape(self.channel_text),
message_id=self._mmevent.interface_data.message_id,
parse_mode="HTML",
disable_web_page_preview=True,
reply_markup=self.telegram_keyboard
)
except TelegramError as e:
log.warning(f"TelegramError during update: {e}")
async def telegram_group_message_start(self):
await self.command.interface.serf.api_call(
self.command.interface.serf.client.send_message,
chat_id=self.telegram_group_id,
text=telegram_escape(self.start_text),
parse_mode="HTML",
disable_webpage_preview=True
)
async def telegram_group_message_delete(self):
await self.command.interface.serf.api_call(
self.command.interface.serf.client.send_message,
chat_id=self.telegram_group_id,
text=telegram_escape(self.delete_text),
parse_mode="HTML",
disable_webpage_preview=True
)
def start(self):
log.debug(f"Starting task for: {self.mmid}")
self.task = self.loop.create_task(self.run())
@sentry_async_wrap()
async def run(self):
log.debug(f"Running task for: {self.mmid}")
# Create a new session for the MMTask
self._session = self.command.alchemy.Session()
self._EventT = self.command.alchemy.get(MMEvent)
self._ResponseT = self.command.alchemy.get(MMResponse)
self._mmevent: MMEvent = self._session.query(self._EventT).get(self.mmid)
if self._mmevent is None:
raise rc.InvalidInputError(f"No event exists with the mmid {self.mmid}.")
if self._mmevent.interface != "telegram":
raise rc.UnsupportedError("Currently only the Telegram interface is supported.")
async with self.telegram_channel_message():
self.loop.create_task(self.wait_until_due())
# Sleep until something interrupts the task
interrupt = await self.queue.get()
# Mark the event as interrupted
self._mmevent.interrupted = True
await self._session.commit()
# Send a group notification if the MMEvent wasn't deleted
if interrupt != Interrupts.MANUAL_DELETE:
await self.telegram_group_message_start()
else:
await self.telegram_group_message_delete()
# Close the database session
await asyncify(self._session.close)
log = logging.getLogger(__name__)
class MatchmakingCommand(rc.Command):
name: str = "matchmaking"
description: str = "Cerca persone per una partita a qualcosa!"
syntax: str = "[ {ora} ] {nome}\n[descrizione]"
syntax: str = ""
aliases = ["mm", "lfg"]
tables = {MMEvent, MMResponse}
def __init__(self, interface: rc.CommandInterface):
super().__init__(interface)
# Find all relevant MMEvents and run them
# Find all active MMEvents and run the tasks for them
session = self.alchemy.Session()
mmevents = (
session
.query(self.alchemy.get(MMEvent))
.filter(self.alchemy.get(MMEvent).interface == self.interface.name,
self.alchemy.get(MMEvent).datetime > datetime.datetime.now(),
self.alchemy.get(MMEvent).interrupted == False)
.all()
)
self.tasks_created = {}
self.queue: Dict[int, aio.queues.Queue] = {}
for mmevent in mmevents:
task = self.interface.loop.create_task(self._run_mmevent(mmevent.mmid))
self.tasks_created[mmevent.mmid] = task
# Create a new MMEvent and run it
if self.interface.name == "telegram":
MMEventT = self.alchemy.get(MMEvent)
active_mmevents = (
session
.query(MMEventT)
.filter(
MMEventT.interface == self.interface.name,
MMEventT.interrupted == False
)
.all()
)
for mmevent in active_mmevents:
task = MMTask(mmevent.mmid, command=self)
task.start()
@staticmethod
def _parse_args(args) -> Tuple[Optional[datetime.datetime], str, str]:
"""Parse command arguments, either using the standard syntax or the Proto syntax."""
try:
timestring, title, description = args.match(r"(?:\[\s*([^]]+)\s*]\s*)?([^\n]+)\s*\n?\s*(.+)?\s*", re.DOTALL)
except rc.InvalidInputError:
timestring, title, description = args.match(r"(?:\s*(.+?)\s*\n\s*)?([^\n]+)\s*\n?\s*(.+)?\s*", re.DOTALL)
if timestring is not None:
try:
dt: typing.Optional[datetime.datetime] = dateparser.parse(timestring, settings={
"PREFER_DATES_FROM": "future"
})
except OverflowError:
dt = None
if dt is None:
raise rc.InvalidInputError("La data che hai specificato non è valida.")
if dt <= datetime.datetime.now():
raise rc.InvalidInputError("La data che hai specificato è nel passato.")
if dt - datetime.datetime.now() >= datetime.timedelta(days=366):
raise rc.InvalidInputError("Hai specificato una data tra più di un anno!\n"
"Se volevi scrivere un'orario, ricordati che le ore sono separate da "
"due punti (:) e non da punto semplice!")
else:
dt = None
return dt, title, description
async def run(self, args: rc.CommandArgs, data: rc.CommandData) -> None:
# Create a new MMEvent and run it
if self.interface.name != "telegram":
raise rc.UnsupportedError(f"{self.interface.prefix}matchmaking funziona solo su Telegram. Per ora.")
"""Handle a matchmaking command call."""
author = await data.get_author(error_if_none=True)
try:
timestring, title, description = args.match(r"\[\s*([^]]+)\s*]\s*([^\n]+)\s*\n?\s*(.+)?\s*", re.DOTALL)
except rc.InvalidInputError:
timestring, title, description = args.match(r"\s*(.+?)\s*\n\s*([^\n]+)\s*\n?\s*(.+)?\s*", re.DOTALL)
try:
dt: typing.Optional[datetime.datetime] = dateparser.parse(timestring, settings={
"PREFER_DATES_FROM": "future"
})
except OverflowError:
dt = None
if dt is None:
raise rc.InvalidInputError("La data che hai specificato non è valida.")
if dt <= datetime.datetime.now():
raise rc.InvalidInputError("La data che hai specificato è nel passato.")
if dt - datetime.datetime.now() >= datetime.timedelta(days=366):
raise rc.InvalidInputError("Hai specificato una data tra più di un anno!\n"
"Se volevi scrivere un'orario, ricordati che le ore sono separate da due punti"
" (:) e non da punto semplice!")
# Parse the arguments, either with the standard syntax or with the Proto syntax
dt, title, description = self._parse_args(args)
# Add the MMEvent to the database
mmevent: MMEvent = self.alchemy.get(MMEvent)(creator=author,
datetime=dt,
title=title,
@ -87,289 +519,9 @@ class MatchmakingCommand(rc.Command):
interface=self.interface.name)
data.session.add(mmevent)
await data.session_commit()
self.loop.create_task(self._run_mmevent(mmevent.mmid))
# Create and run a task for the newly created MMEvent
task = MMTask(mmevent.mmid, command=self)
task.start()
await data.reply(f"🚩 Matchmaking creato!")
_mmchoice_values = {
MMChoice.YES: 4,
MMChoice.LATE_SHORT: 3,
MMChoice.LATE_MEDIUM: 2,
MMChoice.LATE_LONG: 1,
MMChoice.MAYBE: 0,
MMChoice.NO: -1
}
def _gen_mm_message(self, mmevent: MMEvent) -> str:
text = f"🚩 [{mmevent.datetime.strftime('%Y-%m-%d %H:%M')}] [b]{mmevent.title}[/b]\n"
if mmevent.description:
text += f"{mmevent.description}\n"
text += "\n"
for response in sorted(mmevent.responses, key=lambda r: -self._mmchoice_values[r.choice]):
response: MMResponse
if response.choice == MMChoice.LATE_SHORT:
td = mmevent.datetime + datetime.timedelta(minutes=10)
time_text = f" [{td.strftime('%H:%M')}]"
elif response.choice == MMChoice.LATE_MEDIUM:
td = mmevent.datetime + datetime.timedelta(minutes=30)
time_text = f" [{td.strftime('%H:%M')}]"
elif response.choice == MMChoice.LATE_LONG:
td = mmevent.datetime + datetime.timedelta(minutes=60)
time_text = f" [{td.strftime('%H:%M')}+]"
else:
time_text = ""
creator_crown = " 👑" if response.user == mmevent.creator else ""
text += f"{response.choice.value} {response.user}{time_text}{creator_crown}\n"
return text
@staticmethod
def _gen_telegram_keyboard(mmevent: MMEvent):
if mmevent.datetime <= datetime.datetime.now():
return None
return InKM([
[
InKB(f"{MMChoice.YES.value} Ci sarò!",
callback_data=f"mm{mmevent.mmid}_YES"),
InKB(f"{MMChoice.MAYBE.value} Forse...",
callback_data=f"mm{mmevent.mmid}_MAYBE"),
InKB(f"{MMChoice.NO.value} Non mi interessa.",
callback_data=f"mm{mmevent.mmid}_NO"),
],
[
InKB(f"{MMChoice.LATE_SHORT.value} 10 min",
callback_data=f"mm{mmevent.mmid}_LATE_SHORT"),
InKB(f"{MMChoice.LATE_MEDIUM.value} 30 min",
callback_data=f"mm{mmevent.mmid}_LATE_MEDIUM"),
InKB(f"{MMChoice.LATE_LONG.value} 60+ min",
callback_data=f"mm{mmevent.mmid}_LATE_LONG"),
],
[
InKB(f"🗑 Elimina",
callback_data=f"mm{mmevent.mmid}_DELETE"),
InKB(f"🚩 Inizia",
callback_data=f"mm{mmevent.mmid}_START"),
]
])
async def _update_telegram_mm_message(self, client: PTBBot, mmevent: MMEvent):
try:
await self.interface.serf.api_call(client.edit_message_text,
chat_id=self.config["Matchmaking"]["mm_chat_id"],
text=telegram_escape(self._gen_mm_message(mmevent)),
message_id=mmevent.interface_data.message_id,
parse_mode="HTML",
disable_web_page_preview=True,
reply_markup=self._gen_telegram_keyboard(mmevent))
except TelegramError:
pass
def _gen_mm_telegram_callback(self, client: PTBBot, mmid: int, choice: MMChoice):
async def callback(data: rc.CommandData):
author = await data.get_author(error_if_none=True)
mmevent: MMEvent = await asyncify(data.session.query(self.alchemy.get(MMEvent)).get, mmid)
mmresponse: MMResponse = await asyncify(
data.session.query(self.alchemy.get(MMResponse)).filter_by(user=author, mmevent=mmevent).one_or_none)
if mmresponse is None:
mmresponse = self.alchemy.get(MMResponse)(user=author, mmevent=mmevent, choice=choice)
data.session.add(mmresponse)
if random.randrange(14) == 0:
await FiorygiTransaction.spawn_fiorygi(data, author, 1, "aver risposto a un matchmaking")
else:
mmresponse.choice = choice
await data.session_commit()
await self._update_telegram_mm_message(client, mmevent)
await data.reply(f"{choice.value} Messaggio ricevuto!")
return callback
def _gen_mm_telegram_delete(self, client, mmid: int):
async def callback(data: rc.CommandData):
author: User = await data.get_author(error_if_none=True)
mmevent: MMEvent = await asyncify(data.session.query(self.alchemy.get(MMEvent)).get, mmid)
if author != mmevent.creator and "admin" not in author.roles:
raise rc.UserError("Non sei il creatore di questo matchmaking!")
await self.queue[mmid].put(Interrupts.MANUAL_DELETE)
await data.reply(f"🗑 Evento eliminato!")
return callback
def _gen_mm_telegram_start(self, client, mmid: int):
async def callback(data: rc.CommandData):
author = await data.get_author(error_if_none=True)
mmevent: MMEvent = await asyncify(data.session.query(self.alchemy.get(MMEvent)).get, mmid)
if author != mmevent.creator and "admin" not in author.roles:
raise rc.UserError("Non sei il creatore di questo matchmaking!")
await self.queue[mmid].put(Interrupts.MANUAL_START)
await data.reply(f"🚩 Evento avviato!")
return callback
async def _set_event_after(self, mmid: int, dt: datetime.datetime):
await sleep_until(dt)
if mmid in self.queue:
await self.queue[mmid].put(Interrupts.TIME_RAN_OUT)
def _gen_event_start_message(self, mmevent: MMEvent):
text = f"🚩 L'evento [b]{mmevent.title}[/b] è iniziato!\n\n"
for response in sorted(mmevent.responses, key=lambda r: -self._mmchoice_values[r.choice]):
response: MMResponse
text += f"{response.choice.value} {response.user}\n"
return text
@staticmethod
def _gen_unauth_message(user: User):
return f"⚠️ Non sono autorizzato a mandare messaggi a [b]{user.username}[/b]!\n" \
f"{user.telegram[0].mention()}, apri una chat privata con me e mandami un messaggio!"
@sentry_async_wrap()
async def _run_mmevent(self, mmid: int):
"""Run a MMEvent."""
# Create the event in the dict
self.queue[mmid] = aio.Queue()
# Open a new Alchemy Session
session = self.alchemy.Session()
# Find the MMEvent with the current session
mmevent: MMEvent = await asyncify(session.query(self.alchemy.get(MMEvent)).get, mmid)
if mmevent is None:
raise ValueError("Invalid mmid.")
# Ensure the MMEvent hasn't already started
if mmevent.datetime <= datetime.datetime.now():
raise ValueError("MMEvent has already started.")
# Ensure the MMEvent interface matches the current one
if mmevent.interface != self.interface.name:
raise ValueError("Invalid interface.")
# If the matchmaking message hasn't been sent yet, do so now
if mmevent.interface_data is None:
if self.interface.name == "telegram":
bot: TelegramBot = self.interface.serf
client: PTBBot = bot.client
# Send the keyboard
message: PTBMessage = await self.interface.serf.api_call(
client.send_message,
chat_id=self.config["Matchmaking"]["mm_chat_id"],
text=telegram_escape(
self._gen_mm_message(mmevent)),
parse_mode="HTML",
disable_webpage_preview=True,
reply_markup=self._gen_telegram_keyboard(mmevent)
)
# Store message data in the interface data object
mmevent.interface_data = MMInterfaceDataTelegram(chat_id=self.config["Matchmaking"]["mm_chat_id"],
message_id=message.message_id)
await asyncify(session.commit)
else:
raise rc.UnsupportedError()
# Register handlers for the keyboard events
if self.interface.name == "telegram":
bot: TelegramBot = self.interface.serf
client: PTBBot = bot.client
bot.register_keyboard_key(f"mm{mmevent.mmid}_YES", key=rc.KeyboardKey(
interface=self.interface,
short=f"{MMChoice.YES.value}",
text="Ci sarò!",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.YES)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_LATE_SHORT", key=rc.KeyboardKey(
interface=self.interface,
short=f"{MMChoice.LATE_SHORT.value}",
text="10 min",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.LATE_SHORT)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_LATE_MEDIUM", key=rc.KeyboardKey(
interface=self.interface,
short=f"{MMChoice.LATE_MEDIUM.value}",
text="30 min",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.LATE_MEDIUM)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_LATE_LONG", key=rc.KeyboardKey(
interface=self.interface,
short=f"{MMChoice.LATE_LONG.value}",
text="60 min",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.LATE_LONG)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_MAYBE", key=rc.KeyboardKey(
interface=self.interface,
short=f"{MMChoice.MAYBE.value}",
text="Forse...",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.MAYBE)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_NO", key=rc.KeyboardKey(
interface=self.interface,
short=f"{MMChoice.NO.value}",
text="Non mi interessa.",
callback=self._gen_mm_telegram_callback(client, mmid, MMChoice.NO)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_DELETE", key=rc.KeyboardKey(
interface=self.interface,
short=f"🗑",
text="Elimina",
callback=self._gen_mm_telegram_delete(client, mmid)
))
bot.register_keyboard_key(f"mm{mmevent.mmid}_START", key=rc.KeyboardKey(
interface=self.interface,
short=f"🚩",
text="Inizia",
callback=self._gen_mm_telegram_start(client, mmid)
))
else:
raise rc.UnsupportedError()
# Sleep until something interrupts
self.loop.create_task(self._set_event_after(mmid, mmevent.datetime))
interrupt = await self.queue[mmid].get()
mmevent.interrupted = True
await asyncify(session.commit)
del self.queue[mmid]
# Notify the positive answers of the event start
if self.interface.name == "telegram":
bot: TelegramBot = self.interface.serf
client: PTBBot = bot.client
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_YES")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_MAYBE")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_LATE_SHORT")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_LATE_MEDIUM")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_LATE_LONG")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_NO")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_DELETE")
bot.unregister_keyboard_key(f"mm{mmevent.mmid}_START")
await self.interface.serf.api_call(client.delete_message,
chat_id=mmevent.interface_data.chat_id,
message_id=mmevent.interface_data.message_id)
if interrupt == Interrupts.TIME_RAN_OUT or interrupt == Interrupts.MANUAL_START:
await asyncify(client.send_message,
chat_id=self.config["Telegram"]["main_group_id"],
text=telegram_escape(self._gen_event_start_message(mmevent)),
parse_mode="HTML",
disable_webpage_preview=True)
for response in mmevent.responses:
if response.choice == MMChoice.NO:
return
try:
await asyncify(client.send_message,
chat_id=response.user.telegram[0].tg_id,
text=telegram_escape(self._gen_event_start_message(mmevent)),
parse_mode="HTML",
disable_webpage_preview=True)
except TelegramError:
await self.interface.serf.api_call(
client.send_message,
chat_id=self.config["Telegram"]["main_group_id"],
text=telegram_escape(self._gen_unauth_message(response.user)),
parse_mode="HTML",
disable_webpage_preview=True
)
elif interrupt == Interrupts.MANUAL_DELETE:
await self.interface.serf.api_call(
client.send_message,
chat_id=self.config["Telegram"]["main_group_id"],
text=telegram_escape(f"🗑 L'evento [b]{mmevent.title}[/b] è stato annullato."),
parse_mode="HTML",
disable_webpage_preview=True
)
else:
raise rc.UnsupportedError()
# The end!
await asyncify(session.close)
del self.tasks_created[mmid]

View file

@ -5,7 +5,7 @@ from ..version import semantic
class RoyalpackCommand(rc.Command):
name: str = "royalpack"
name: str = "royalpackversion"
description: str = "Visualizza la versione attuale di Royalpack."

View file

@ -21,7 +21,7 @@ class MMEvent:
@declared_attr
def datetime(self):
return Column(DateTime, nullable=False)
return Column(DateTime)
@declared_attr
def title(self):

View file

@ -6,5 +6,5 @@ class MMChoice(enum.Enum):
LATE_SHORT = "🕐"
LATE_MEDIUM = "🕒"
LATE_LONG = "🕗"
MAYBE = ""
MAYBE = ""
NO = ""

View file

@ -1 +1 @@
semantic = "5.9.4"
semantic = "5.9.5"