1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-27 13:34:28 +00:00

Improve mm code

This commit is contained in:
Steffo 2019-09-05 19:29:54 +02:00
parent 3fb7dec8fe
commit 6583823c23
4 changed files with 367 additions and 295 deletions

View file

@ -29,7 +29,7 @@ class TelegramBot(GenericBot):
def _init_client(self): def _init_client(self):
"""Create the :py:class:`telegram.Bot`, and set the starting offset.""" """Create the :py:class:`telegram.Bot`, and set the starting offset."""
# https://github.com/python-telegram-bot/python-telegram-bot/issues/341 # https://github.com/python-telegram-bot/python-telegram-bot/issues/341
request = telegram.utils.request.Request(5) request = telegram.utils.request.Request(20, read_timeout=15)
self.client = telegram.Bot(self._telegram_config.token, request=request) self.client = telegram.Bot(self._telegram_config.token, request=request)
self._offset: int = -100 self._offset: int = -100
@ -62,9 +62,10 @@ class TelegramBot(GenericBot):
data.update = update data.update = update
async def reply(data, text: str): async def reply(data, text: str):
await asyncify(data.update.effective_chat.send_message, telegram_escape(text), await TelegramBot.safe_api_call(data.update.effective_chat.send_message,
parse_mode="HTML", telegram_escape(text),
disable_web_page_preview=True) parse_mode="HTML",
disable_web_page_preview=True)
async def get_author(data, error_if_none=False): async def get_author(data, error_if_none=False):
if data.update.message is not None: if data.update.message is not None:
@ -92,11 +93,11 @@ class TelegramBot(GenericBot):
press_id = uuid.uuid4() press_id = uuid.uuid4()
tg_keyboard.append([telegram.InlineKeyboardButton(key, callback_data=str(press_id))]) tg_keyboard.append([telegram.InlineKeyboardButton(key, callback_data=str(press_id))])
data.interface.register_keyboard_key(key_name=str(press_id), callback=keyboard[key]) data.interface.register_keyboard_key(key_name=str(press_id), callback=keyboard[key])
await asyncify(data.update.effective_chat.send_message, await TelegramBot.safe_api_call(data.update.effective_chat.send_message,
telegram_escape(text), telegram_escape(text),
reply_markup=telegram.InlineKeyboardMarkup(tg_keyboard), reply_markup=telegram.InlineKeyboardMarkup(tg_keyboard),
parse_mode="HTML", parse_mode="HTML",
disable_web_page_preview=True) disable_web_page_preview=True)
return TelegramData return TelegramData
@ -113,6 +114,31 @@ class TelegramBot(GenericBot):
self._telegram_config = telegram_config self._telegram_config = telegram_config
self._init_client() self._init_client()
@staticmethod
async def safe_api_call(f: typing.Callable, *args, **kwargs) -> typing.Optional:
while True:
try:
return await asyncify(f, *args, **kwargs)
except telegram.error.TimedOut as error:
log.debug(f"Timed out during {f.__qualname__} (retrying in 15s): {error}")
await asyncio.sleep(15)
continue
except telegram.error.NetworkError as error:
log.warning(f"Network error during {f.__qualname__} (retrying in 15s): {error}")
await asyncio.sleep(15)
continue
except telegram.error.Unauthorized as error:
log.info(f"Unauthorized to run {f.__qualname__} (skipping): {error}")
break
except telegram.error.RetryAfter as error:
log.warning(f"Rate limited during {f.__qualname__} (retrying in 15s): {error}")
await asyncio.sleep(15)
except telegram.error.TelegramError as error:
log.error(f"{error.__class__.__qualname__} during {f} (skipping): {error}")
sentry_sdk.capture_exception(error)
break
return None
async def _handle_update(self, update: telegram.Update): async def _handle_update(self, update: telegram.Update):
# Skip non-message updates # Skip non-message updates
if update.message is not None: if update.message is not None:
@ -136,7 +162,7 @@ class TelegramBot(GenericBot):
command_text, *parameters = text.split(" ") command_text, *parameters = text.split(" ")
command_name = command_text.replace(f"@{self.client.username}", "").lower() command_name = command_text.replace(f"@{self.client.username}", "").lower()
# Send a typing notification # Send a typing notification
update.message.chat.send_action(telegram.ChatAction.TYPING) await self.safe_api_call(update.message.chat.send_action, telegram.ChatAction.TYPING)
# Find the command # Find the command
try: try:
command = self.commands[command_name] command = self.commands[command_name]
@ -166,40 +192,31 @@ class TelegramBot(GenericBot):
callback = command.interface.keys_callbacks[query.data] callback = command.interface.keys_callbacks[query.data]
break break
if callback is None: if callback is None:
await asyncify(source.edit_reply_markup, reply_markup=None) await self.safe_api_call(source.edit_reply_markup, reply_markup=None)
await asyncify(query.answer, text="⛔️ This keyboard has expired.") await self.safe_api_call(query.answer, text="⛔️ This keyboard has expired.")
return return
try: try:
response = await callback(data=self._Data(interface=command.interface, update=update)) response = await callback(data=self._Data(interface=command.interface, update=update))
except KeyboardExpiredError: except KeyboardExpiredError:
# FIXME: May cause a memory leak, as keys are not deleted after use # FIXME: May cause a memory leak, as keys are not deleted after use
await asyncify(source.edit_reply_markup, reply_markup=None) await self.safe_api_call(source.edit_reply_markup, reply_markup=None)
await asyncify(query.answer, text="⛔️ This keyboard has expired.") await self.safe_api_call(query.answer, text="⛔️ This keyboard has expired.")
return return
except Exception as e: except Exception as e:
error_text = f"⛔️ {e.__class__.__name__}\n" error_text = f"⛔️ {e.__class__.__name__}\n"
error_text += '\n'.join(e.args) error_text += '\n'.join(e.args)
await asyncify(query.answer, text=error_text) await self.safe_api_call(query.answer, text=error_text)
return return
else: else:
await asyncify(query.answer, text=response) await self.safe_api_call(query.answer, text=response)
async def run(self): async def run(self):
while True: while True:
# Get the latest 100 updates # Get the latest 100 updates
try: last_updates: typing.List[telegram.Update] = await self.safe_api_call(self.client.get_updates,
last_updates: typing.List[telegram.Update] = await asyncify(self.client.get_updates, offset=self._offset,
offset=self._offset, timeout=30,
timeout=30, read_latency=5.0)
read_latency=5.0)
except telegram.error.TimedOut as error:
log.debug("getUpdates timed out")
continue
except Exception as error:
log.error(f"Error while getting updates: {error.__class__.__name__} {error.args}")
sentry_sdk.capture_exception(error)
await asyncio.sleep(5)
continue
# Handle updates # Handle updates
for update in last_updates: for update in last_updates:
# noinspection PyAsyncCall # noinspection PyAsyncCall

View file

@ -1,4 +1,3 @@
import typing
import datetime import datetime
import dateparser import dateparser
import os import os
@ -24,59 +23,322 @@ class MmCommand(Command):
require_alchemy_tables = {MMEvent, MMDecision, MMResponse} require_alchemy_tables = {MMEvent, MMDecision, MMResponse}
@staticmethod _cycle_duration = 5
def _decision_string(mmevent: MMEvent) -> str:
return f"⚫️ Hai detto che forse parteciperai a [b]{mmevent.title}[/b] alle [b]{mmevent.datetime.strftime('%H:%M')}[/b].\n" \
f"Confermi di volerci essere?"
@staticmethod @staticmethod
def _decision_keyboard(mmevent: MMEvent) -> telegram.InlineKeyboardMarkup: def _main_keyboard(mmevent: MMEvent) -> typing.Optional[telegram.InlineKeyboardMarkup]:
return telegram.InlineKeyboardMarkup([ if mmevent.state == "WAITING":
[telegram.InlineKeyboardButton("🔵 Ci sarò!", callback_data=f"mm_{mmevent.mmid}_d_YES"), return telegram.InlineKeyboardMarkup([
telegram.InlineKeyboardButton("🔴 Non mi interessa...", callback_data=f"mm_{mmevent.mmid}_d_NO")] [telegram.InlineKeyboardButton("🔵 Ci sarò!", callback_data=f"mm_{mmevent.mmid}_d_YES")],
]) [telegram.InlineKeyboardButton("⚫️ Forse...", callback_data=f"mm_{mmevent.mmid}_d_MAYBE")],
[telegram.InlineKeyboardButton("🔴 Non mi interessa.", callback_data=f"mm_{mmevent.mmid}_d_NO")]
@staticmethod ])
def _response_string(mmevent: MMEvent, count: int = 0) -> str: elif mmevent.state == "DECISION":
if count == 0: return telegram.InlineKeyboardMarkup([
return f"🚩 E' ora di [b]{mmevent.title}[/b]!\n" \ [telegram.InlineKeyboardButton("🔵 Ci sarò!", callback_data=f"mm_{mmevent.mmid}_d_YES"),
f"Sei pronto?" telegram.InlineKeyboardButton("🔴 Non mi interessa...", callback_data=f"mm_{mmevent.mmid}_d_NO")]
])
elif mmevent.state == "READY_CHECK":
return telegram.InlineKeyboardMarkup([
[telegram.InlineKeyboardButton("🚩 Avvia la partita", callback_data=f"mm_{mmevent.mmid}_start")]
])
elif mmevent.state == "STARTED":
return None
else: else:
return f"🕒 Sei in ritardo di [b]{count * 5}[/b] minuti per [b]{mmevent.title}[/b]...\n" \ raise ValueError(f"state is of an unknown value ({mmevent.state})")
f"Sei pronto?"
@staticmethod @staticmethod
def _response_keyboard(mmevent: MMEvent) -> telegram.InlineKeyboardMarkup: def _main_text(mmevent: MMEvent):
return telegram.InlineKeyboardMarkup([ text = f"🌐 [b]{mmevent.title}[/b] - [b]{mmevent.datetime.strftime('%Y-%m-%d %H:%M')}[/b]\n"
[telegram.InlineKeyboardButton("✅ Ci sono!", callback_data=f"mm_{mmevent.mmid}_r_YES")], if mmevent.description:
[telegram.InlineKeyboardButton("🕒 Aspettatemi 5 minuti!", callback_data=f"mm_{mmevent.mmid}_r_LATER")], text += f"{mmevent.description}\n"
[telegram.InlineKeyboardButton("❌ Non ci sono più, mi spiace.", callback_data=f"mm_{mmevent.mmid}_r_NO")] text += "\n"
]) if mmevent.state == "WAITING" or mmevent.state == "DECISION":
for mmdecision in sorted(mmevent.decisions, key=lambda mmd: mmd.royal.username):
mmdecision: "MMDecision"
if mmdecision.decision == "YES":
text += "🔵 "
elif mmdecision.decision == "MAYBE":
text += "⚫️ "
elif mmdecision.decision == "NO":
text += "🔴 "
else:
raise ValueError(f"decision is of an unknown value ({mmdecision.decision})")
text += f"{mmdecision.royal}\n"
elif mmevent.state == "READY_CHECK":
for mmresponse in sorted(mmevent.responses, key=lambda mmr: mmr.royal.username):
mmresponse: "MMResponse"
if mmresponse.response is None:
text += ""
elif mmresponse.response == "YES":
text += ""
elif mmresponse.response == "LATER":
text += "🕒 "
elif mmresponse.response == "NO":
text += ""
else:
raise ValueError(f"response is of an unknown value ({mmresponse.response})")
text += f"{mmresponse.royal}\n"
elif mmevent.state == "STARTED":
for mmresponse in sorted(mmevent.responses, key=lambda mmr: mmr.response, reverse=True):
if mmresponse.response == "YES":
text += f"{mmresponse.royal}\n"
elif mmresponse.response == "NO":
text += "{mmresponse.royal}\n"
return text
async def _update_message(self, mmevent: MMEvent) -> None: async def _run_mm(self, mmevent: MMEvent) -> None:
client: telegram.Bot = self.interface.bot.client client: telegram.Bot = self.interface.bot.client
try:
await asyncify(client.edit_message_text, async def update_message() -> None:
text=telegram_escape(str(mmevent)), try:
chat_id=os.environ["MM_CHANNEL_ID"], await self.interface.bot.safe_api_call(client.edit_message_text,
message_id=mmevent.message_id, text=telegram_escape(self._main_text(mmevent)),
parse_mode="HTML", chat_id=os.environ["MM_CHANNEL_ID"],
disable_web_page_preview=True, message_id=mmevent.message_id,
reply_markup=mmevent.main_keyboard()) parse_mode="HTML",
except telegram.error.BadRequest: disable_web_page_preview=True,
pass reply_markup=self._main_keyboard(mmevent))
except telegram.error.BadRequest:
pass
decision_string = f"⚫️ Hai detto che forse parteciperai a [b]{mmevent.title}[/b]" \
f" alle [b]{mmevent.datetime.strftime('%H:%M')}[/b].\n" \
f"Confermi di volerci essere? (Metti sì anche se arrivi un po' in ritardo!)"
decision_keyboard = telegram.InlineKeyboardMarkup([
[telegram.InlineKeyboardButton("🔵 Ci sarò!", callback_data=f"mm_{mmevent.mmid}_d_YES"),
telegram.InlineKeyboardButton("🔴 Non mi interessa più.", callback_data=f"mm_{mmevent.mmid}_d_NO")]
])
async def decision_yes(data: CommandData):
royal = await data.get_author()
mmdecision: MMDecision = await asyncify(
self.interface.session.query(self.interface.alchemy.MMDecision).filter_by(mmevent=mmevent,
royal=royal).one_or_none)
if mmdecision is None:
mmdecision: MMDecision = self.interface.alchemy.MMDecision(royal=royal,
mmevent=mmevent,
decision="YES")
self.interface.session.add(mmdecision)
else:
mmdecision.decision = "YES"
await asyncify(self.interface.session.commit)
await update_message()
return "🔵 Hai detto che ci sarai!"
async def decision_maybe(data: CommandData):
royal = await data.get_author()
mmdecision: MMDecision = await asyncify(
self.interface.session.query(self.interface.alchemy.MMDecision).filter_by(mmevent=mmevent,
royal=royal).one_or_none)
if mmdecision is None:
mmdecision: MMDecision = self.interface.alchemy.MMDecision(royal=royal,
mmevent=mmevent,
decision="MAYBE")
self.interface.session.add(mmdecision)
else:
mmdecision.decision = "MAYBE"
# Can't asyncify this
self.interface.session.commit()
await update_message()
return "⚫️ Hai detto che forse ci sarai. Rispondi al messaggio di conferma 5 minuti prima dell'inizio!"
async def decision_no(data: CommandData):
royal = await data.get_author()
mmdecision: MMDecision = await asyncify(
self.interface.session.query(self.interface.alchemy.MMDecision).filter_by(mmevent=mmevent,
royal=royal).one_or_none)
if mmdecision is None:
mmdecision: MMDecision = self.interface.alchemy.MMDecision(royal=royal,
mmevent=mmevent,
decision="NO")
self.interface.session.add(mmdecision)
else:
mmdecision.decision = "NO"
# Can't asyncify this
self.interface.session.commit()
await update_message()
return "🔴 Hai detto che non ti interessa."
def response_string() -> str:
delay = (datetime.datetime.now() - mmevent.datetime).total_seconds()
if delay > 60:
return f"🚩 E' ora di [b]{mmevent.title}[/b]!\n" \
f"Sei pronto?"
return f"🕒 Sei in ritardo di [b]{int(delay / 60)} minuti[/b] per [b]{mmevent.title}[/b]...\n" \
f"Sei pronto?"
response_keyboard = telegram.InlineKeyboardMarkup([
[telegram.InlineKeyboardButton("✅ Ci sono!",
callback_data=f"mm_{mmevent.mmid}_r_YES")],
[telegram.InlineKeyboardButton("🕒 Aspettatemi ancora un po'!",
callback_data=f"mm_{mmevent.mmid}_r_LATER")],
[telegram.InlineKeyboardButton("❌ Non vengo più, mi spiace.",
callback_data=f"mm_{mmevent.mmid}_r_NO")]
])
async def response_yes(data: CommandData):
royal = await data.get_author()
mmresponse: MMResponse = await asyncify(
self.interface.session.query(self.interface.alchemy.MMResponse).filter_by(mmevent=mmevent,
royal=royal).one_or_none)
mmresponse.response = "YES"
# Can't asyncify this
self.interface.session.commit()
await update_message()
return "✅ Sei pronto!"
async def response_later(data: CommandData):
royal = await data.get_author()
mmresponse: MMResponse = await asyncify(
self.interface.session.query(self.interface.alchemy.MMResponse).filter_by(mmevent=mmevent,
royal=royal).one_or_none)
mmresponse.response = "LATER"
# Can't asyncify this
self.interface.session.commit()
await update_message()
return f"🕒 Hai chiesto agli altri di aspettarti {self._cycle_duration} minuti."
async def response_no(data: CommandData):
royal = await data.get_author()
mmresponse: MMResponse = await asyncify(
self.interface.session.query(self.interface.alchemy.MMResponse).filter_by(mmevent=mmevent,
royal=royal).one_or_none)
mmresponse.response = "NO"
# Can't asyncify this
self.interface.session.commit()
await update_message()
return "❌ Hai detto che non ci sarai."
def started_string():
text = f"🚩 L'evento [b]{mmevent.title}[/b] è iniziato!\n\n" \
f"Parteciperanno:"
for mmresponse in sorted(mmevent.responses, key=lambda mmr: mmr.response, reverse=True):
if mmresponse.response == "YES":
text += f"{mmresponse.royal}\n"
elif mmresponse.response == "NO":
text += "{mmresponse.royal}\n"
return text
started_without_you_string = f"🚩 Non hai confermato la tua presenza in tempo e [b]{mmevent.title}[/b] è" \
f" iniziato senza di te.\n" \
f"Mi dispiace!"
async def start_event():
mmevent.state = "STARTED"
for mmresponse in mmevent.responses:
if mmresponse.response is None:
mmresponse.response = "NO"
if mmresponse.response == "LATER":
mmresponse.response = "NO"
if mmresponse.response == "YES":
await self.interface.bot.safe_api_call(client.send_message,
chat_id=mmresponse.royal.telegram[0].tg_id,
text=telegram_escape(started_string()),
parse_mode="HTML",
disable_webpage_preview=True,
reply_markup=response_keyboard)
else:
await self.interface.bot.safe_api_call(client.send_message,
chat_id=mmresponse.royal.telegram[0].tg_id,
text=telegram_escape(started_without_you_string),
parse_mode="HTML",
disable_webpage_preview=True,
reply_markup=response_keyboard)
await asyncify(self.interface.session.commit)
await update_message()
async def start_key(data: CommandData):
royal = await data.get_author()
if royal == mmevent.creator:
await start_event()
if mmevent.state == "WAITING":
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_d_YES", decision_yes)
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_d_MAYBE", decision_maybe)
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_d_NO", decision_no)
await sleep_until(mmevent.datetime - datetime.timedelta(minutes=10))
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_d_YES")
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_d_MAYBE")
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_d_NO")
mmevent.state = "DECISION"
for mmdecision in mmevent.decisions:
mmdecision: MMDecision
if mmdecision.decision == "MAYBE":
await self.interface.bot.safe_api_call(client.send_message,
chat_id=mmdecision.royal.telegram[0].tg_id,
text=telegram_escape(decision_string),
parse_mode="HTML",
disable_webpage_preview=True,
reply_markup=decision_keyboard)
await asyncify(self.interface.session.commit)
await update_message()
if mmevent.state == "DECISION":
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_d_YES", decision_yes)
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_d_NO", decision_no)
await sleep_until(mmevent.datetime)
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_d_YES")
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_d_NO")
mmevent.state = "READY_CHECK"
for mmdecision in mmevent.decisions:
if mmdecision.decision == "MAYBE":
mmdecision.decision = "NO"
elif mmdecision.decision == "YES":
mmresponse: MMResponse = self.interface.alchemy.MMResponse(royal=mmdecision.royal, mmevent=mmevent)
self.interface.session.add(mmresponse)
await asyncify(self.interface.session.commit)
await update_message()
if mmevent.state == "READY_CHECK":
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_r_YES", response_yes)
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_r_LATER", response_later)
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_r_NO", response_no)
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_forcestart", start_key)
cycle = 0
while True:
for mmresponse in mmevent.responses:
# Send messages
if mmresponse.response is None:
await self.interface.bot.safe_api_call(client.send_message,
chat_id=mmresponse.royal.telegram[0].tg_id,
text=telegram_escape(response_string()),
parse_mode="HTML",
disable_webpage_preview=True,
reply_markup=response_keyboard)
# Wait
await asyncio.sleep(60 * self._cycle_duration)
# Advance cycle
for mmresponse in mmevent.responses:
if mmresponse.response is None:
mmresponse.response = "NO"
if mmresponse.response == "LATER":
mmresponse.response = None
# Check if the event can start
for mmresponse in mmevent.responses:
if mmresponse.response is None:
break
else:
break
cycle += 1
await start_event()
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_r_YES")
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_r_LATER")
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_r_NO")
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_start")
async def run(self, args: CommandArgs, data: CommandData) -> None: async def run(self, args: CommandArgs, data: CommandData) -> None:
if self.interface.name != "telegram": if self.interface.name != "telegram":
raise UnsupportedError("mm is supported only on Telegram") raise UnsupportedError("mm is supported only on Telegram")
client: telegram.Bot = self.interface.bot.client client: telegram.Bot = self.interface.bot.client
creator = await data.get_author(error_if_none=True) creator = await data.get_author(error_if_none=True)
try: try:
timestring, title, description = args.match(r"\[\s*([^]]+)\s*]\s*([^\n]+)\s*\n?\s*(.+)?\s*") timestring, title, description = args.match(r"\[\s*([^]]+)\s*]\s*([^\n]+)\s*\n?\s*(.+)?\s*")
except InvalidInputError: except InvalidInputError:
timestring, title, description = args.match(r"\s*(.+?)\s*\n\s*([^\n]+)\s*\n?\s*(.+)?\s*") timestring, title, description = args.match(r"\s*(.+?)\s*\n\s*([^\n]+)\s*\n?\s*(.+)?\s*")
try: try:
dt: typing.Optional[datetime.datetime] = dateparser.parse(timestring) dt: typing.Optional[datetime.datetime] = dateparser.parse(timestring)
except OverflowError: except OverflowError:
@ -87,7 +349,6 @@ class MmCommand(Command):
if dt <= datetime.datetime.now(): if dt <= datetime.datetime.now():
await data.reply("⚠️ La data che hai specificato è nel passato.") await data.reply("⚠️ La data che hai specificato è nel passato.")
return return
mmevent: MMEvent = self.interface.alchemy.MMEvent(creator=creator, mmevent: MMEvent = self.interface.alchemy.MMEvent(creator=creator,
datetime=dt, datetime=dt,
title=title, title=title,
@ -96,183 +357,15 @@ class MmCommand(Command):
self.interface.session.add(mmevent) self.interface.session.add(mmevent)
await asyncify(self.interface.session.commit) await asyncify(self.interface.session.commit)
async def decision_yes(data: CommandData):
royal = await data.get_author()
mmdecision: MMDecision = await asyncify(self.interface.session.query(self.interface.alchemy.MMDecision).filter_by(mmevent=mmevent, royal=royal).one_or_none)
if mmdecision is None:
mmdecision: MMDecision = self.interface.alchemy.MMDecision(royal=royal,
mmevent=mmevent,
decision="YES")
self.interface.session.add(mmdecision)
else:
mmdecision.decision = "YES"
# Can't asyncify this
self.interface.session.commit()
await self._update_message(mmevent)
return "🔵 Hai detto che ci sarai!"
async def decision_maybe(data: CommandData):
royal = await data.get_author()
mmdecision: MMDecision = await asyncify(self.interface.session.query(self.interface.alchemy.MMDecision).filter_by(mmevent=mmevent, royal=royal).one_or_none)
if mmdecision is None:
mmdecision: MMDecision = self.interface.alchemy.MMDecision(royal=royal,
mmevent=mmevent,
decision="MAYBE")
self.interface.session.add(mmdecision)
else:
mmdecision.decision = "MAYBE"
# Can't asyncify this
self.interface.session.commit()
await self._update_message(mmevent)
return "⚫️ Hai detto che forse ci sarai. Rispondi al messaggio di conferma 5 minuti prima dell'inizio!"
async def decision_no(data: CommandData):
royal = await data.get_author()
mmdecision: MMDecision = await asyncify(self.interface.session.query(self.interface.alchemy.MMDecision).filter_by(mmevent=mmevent, royal=royal).one_or_none)
if mmdecision is None:
mmdecision: MMDecision = self.interface.alchemy.MMDecision(royal=royal,
mmevent=mmevent,
decision="NO")
self.interface.session.add(mmdecision)
else:
mmdecision.decision = "NO"
# Can't asyncify this
self.interface.session.commit()
await self._update_message(mmevent)
return "🔴 Hai detto che non ti interessa."
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_d_YES", decision_yes)
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_d_MAYBE", decision_maybe)
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_d_NO", decision_no)
message: telegram.Message = await asyncify(client.send_message, message: telegram.Message = await asyncify(client.send_message,
chat_id=os.environ["MM_CHANNEL_ID"], chat_id=os.environ["MM_CHANNEL_ID"],
text=telegram_escape(str(mmevent)), text=telegram_escape(self._main_text(mmevent)),
parse_mode="HTML", parse_mode="HTML",
disable_webpage_preview=True, disable_webpage_preview=True,
reply_markup=mmevent.main_keyboard()) reply_markup=self._main_keyboard(mmevent))
mmevent.message_id = message.message_id mmevent.message_id = message.message_id
# Can't asyncify this # Can't asyncify this
self.interface.session.commit()
await sleep_until(dt - datetime.timedelta(minutes=10))
mmevent.state = "DECISION"
await asyncify(self.interface.session.commit)
for mmdecision in mmevent.decisions:
mmdecision: MMDecision
if mmdecision.decision == "MAYBE":
await asyncify(client.send_message,
chat_id=mmdecision.royal.telegram[0].tg_id,
text=telegram_escape(self._decision_string(mmevent)),
parse_mode="HTML",
disable_webpage_preview=True,
reply_markup=self._decision_keyboard(mmevent))
await self._update_message(mmevent)
await sleep_until(dt)
async def response_yes(data: CommandData):
royal = await data.get_author()
mmresponse: MMResponse = await asyncify(
self.interface.session.query(self.interface.alchemy.MMResponse).filter_by(mmevent=mmevent, royal=royal).one_or_none)
mmresponse.response = "YES"
# Can't asyncify this
self.interface.session.commit()
await self._update_message(mmevent)
return "✅ Sei pronto!"
async def response_later(data: CommandData):
royal = await data.get_author()
mmresponse: MMResponse = await asyncify(
self.interface.session.query(self.interface.alchemy.MMResponse).filter_by(mmevent=mmevent, royal=royal).one_or_none)
mmresponse.response = "LATER"
# Can't asyncify this
self.interface.session.commit()
await self._update_message(mmevent)
return "🕒 Hai chiesto agli altri di aspettarti 5 minuti."
async def response_no(data: CommandData):
royal = await data.get_author()
mmresponse: MMResponse = await asyncify(
self.interface.session.query(self.interface.alchemy.MMResponse).filter_by(mmevent=mmevent, royal=royal).one_or_none)
mmresponse.response = "NO"
# Can't asyncify this
self.interface.session.commit()
await self._update_message(mmevent)
return "❌ Hai detto che non ci sarai."
async def start_now():
mmevent.state = "STARTED"
for mmresponse in mmevent.responses:
if mmresponse.response is None:
mmresponse.response = "NO"
if mmresponse.response == "LATER":
mmresponse.response = "NO"
await self._update_message(mmevent)
await asyncify(self.interface.session.commit)
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_r_YES")
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_r_LATER")
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_r_NO")
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_start")
async def start_key(data: CommandData):
royal = await data.get_author()
if royal == creator:
await start_now()
mmevent.state = "READY_CHECK"
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_d_YES")
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_d_MAYBE")
self.interface.unregister_keyboard_key(f"mm_{mmevent.mmid}_d_NO")
for mmdecision in mmevent.decisions:
if mmdecision.decision == "MAYBE":
mmdecision.decision = "NO"
elif mmdecision.decision == "YES":
mmresponse: MMResponse = self.interface.alchemy.MMResponse(royal=mmdecision.royal, mmevent=mmevent)
self.interface.session.add(mmresponse)
await asyncify(self.interface.session.commit) await asyncify(self.interface.session.commit)
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_r_YES", response_yes) await self._run_mm(mmevent)
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_r_LATER", response_later)
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_r_NO", response_no)
self.interface.register_keyboard_key(f"mm_{mmevent.mmid}_start", start_key)
count = 0
while True:
for mmresponse in mmevent.responses:
# Send messages
if mmresponse.response is None:
await asyncify(client.send_message,
chat_id=mmresponse.royal.telegram[0].tg_id,
text=telegram_escape(self._response_string(mmevent, count=count)),
parse_mode="HTML",
disable_webpage_preview=True,
reply_markup=self._response_keyboard(mmevent))
await self._update_message(mmevent)
# Wait
await asyncio.sleep(300)
if mmevent.state == "STARTED":
return
# Advance cycle
for mmresponse in mmevent.responses:
if mmresponse.response is None:
mmresponse.response = "NO"
if mmresponse.response == "LATER":
mmresponse.response = None
# Check if the event can start
for mmresponse in mmevent.responses:
if mmresponse.response is None:
break
else:
break
count += 1
await start_now()

View file

@ -54,60 +54,3 @@ class MMEvent:
def __repr__(self): def __repr__(self):
return f"<MMEvent {self.mmid}: {self.title}>" return f"<MMEvent {self.mmid}: {self.title}>"
def __str__(self):
text = f"🌐 [b]{self.title}[/b] - [b]{self.datetime.strftime('%Y-%m-%d %H:%M')}[/b]\n"
if self.description:
text += f"{self.description}\n"
text += "\n"
if self.state == "WAITING" or self.state == "DECISION":
for mmdecision in self.decisions:
mmdecision: "MMDecision"
if mmdecision.decision == "YES":
text += "🔵 "
elif mmdecision.decision == "MAYBE":
text += "⚫️ "
elif mmdecision.decision == "NO":
text += "🔴 "
else:
raise ValueError(f"decision is of an unknown value ({mmdecision.decision})")
text += f"{mmdecision.royal}\n"
elif self.state == "READY_CHECK":
for mmresponse in self.responses:
mmresponse: "MMResponse"
if mmresponse.response is None:
text += ""
elif mmresponse.response == "YES":
text += ""
elif mmresponse.response == "LATER":
text += "🕒 "
elif mmresponse.response == "NO":
text += ""
else:
raise ValueError(f"response is of an unknown value ({mmresponse.response})")
text += f"{mmresponse.royal}\n"
elif self.state == "STARTED":
for mmresponse in self.responses:
if mmresponse.response == "YES":
text += f"{mmresponse.royal}\n"
return text
def main_keyboard(self) -> typing.Optional[telegram.InlineKeyboardMarkup]:
if self.state == "WAITING":
return telegram.InlineKeyboardMarkup([
[telegram.InlineKeyboardButton("🔵 Ci sarò!", callback_data=f"mm_{self.mmid}_d_YES")],
[telegram.InlineKeyboardButton("⚫️ Forse...", callback_data=f"mm_{self.mmid}_d_MAYBE")],
[telegram.InlineKeyboardButton("🔴 Non mi interessa.", callback_data=f"mm_{self.mmid}_d_NO")]
])
elif self.state == "DECISION":
return telegram.InlineKeyboardMarkup([
[telegram.InlineKeyboardButton("🔵 Ci sarò!", callback_data=f"mm_{self.mmid}_d_YES"),
telegram.InlineKeyboardButton("🔴 Non mi interessa...", callback_data=f"mm_{self.mmid}_d_NO")]
])
elif self.state == "READY_CHECK":
return telegram.InlineKeyboardMarkup([
[telegram.InlineKeyboardButton("🚩 Avvia la partita", callback_data=f"mm_{self.mmid}_start")]
])
elif self.state == "STARTED":
return None
else:
raise ValueError(f"state is of an unknown value ({self.state})")

View file

@ -24,6 +24,25 @@ sentry_dsn = os.environ.get("SENTRY_DSN")
# noinspection PyUnreachableCode # noinspection PyUnreachableCode
if __debug__: if __debug__:
commands = [ commands = [
CiaoruoziCommand,
ColorCommand,
CvCommand,
DiarioCommand,
Mp3Command,
PauseCommand,
PingCommand,
PlayCommand,
PlaymodeCommand,
QueueCommand,
RageCommand,
ReminderCommand,
ShipCommand,
SkipCommand,
SmecdsCommand,
SummonCommand,
VideochannelCommand,
TriviaCommand,
MmCommand
] ]
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
else: else: