1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00

Managed to get keyboards working

This commit is contained in:
Steffo 2020-01-16 02:24:59 +01:00
parent 9f491d9d1c
commit 4af07df6a8
12 changed files with 233 additions and 115 deletions

View file

@ -2,13 +2,21 @@
from .version import VersionCommand
from .exception import ExceptionCommand
from .excevent import ExceventCommand
from .keyboardtest import KeyboardtestCommand
# Enter the commands of your Pack here!
available_commands = [
VersionCommand,
]
# noinspection PyUnreachableCode
if __debug__:
available_commands = [
*available_commands,
ExceptionCommand,
ExceventCommand,
]
KeyboardtestCommand,
]
# Don't change this, it should automatically generate __all__
__all__ = [command.__name__ for command in available_commands]

View file

@ -8,6 +8,4 @@ class ExceptionCommand(Command):
description: str = "Raise an exception in the command."
async def run(self, args: CommandArgs, data: CommandData) -> None:
if not self.interface.cfg["exc_debug"]:
raise UserError(f"{self.interface.prefix}{self.name} is not enabled.")
raise Exception(f"{self.interface.prefix}{self.name} was called")

View file

@ -8,7 +8,5 @@ class ExceventCommand(Command):
description: str = "Call an event that raises an exception."
async def run(self, args: CommandArgs, data: CommandData) -> None:
if not self.interface.cfg["exc_debug"]:
raise UserError(f"{self.interface.prefix}{self.name} is not enabled.")
await self.interface.call_herald_event(self.interface.name, "exception")
await data.reply("✅ Event called!")

View file

@ -0,0 +1,28 @@
from typing import *
from royalnet.commands import *
import functools
import asyncio
class KeyboardtestCommand(Command):
name: str = "keyboardtest"
description: str = "Create a new keyboard with the specified keys."
syntax: str = "{keys}+"
@staticmethod
async def echo(data: CommandData, echo: str):
await data.reply(echo)
async def run(self, args: CommandArgs, data: CommandData) -> None:
keys = []
for arg in args:
# noinspection PyTypeChecker
keys.append(KeyboardKey(interface=self.interface,
short=arg[0],
text=arg,
callback=functools.partial(self.echo, echo=arg)))
async with data.keyboard("This is a test keyboard.", keys):
await asyncio.sleep(10)
await data.reply("The keyboard is no longer in scope.")

View file

@ -10,6 +10,7 @@ from .errors import CommandError, \
ExternalError, \
UserError, \
ProgramError
from .keyboardkey import KeyboardKey
__all__ = [
"CommandInterface",
@ -23,5 +24,6 @@ __all__ = [
"ExternalError",
"UserError",
"ProgramError",
"Event"
"Event",
"KeyboardKey",
]

View file

@ -1,37 +1,47 @@
from asyncio import AbstractEventLoop
from typing import Optional, TYPE_CHECKING
import contextlib
import logging
import asyncio as aio
from typing import *
from sqlalchemy.orm.session import Session
from .errors import UnsupportedError
from .commandinterface import CommandInterface
from ..utils import asyncify
from sqlalchemy.orm.session import Session
import royalnet.utils as ru
if TYPE_CHECKING:
from .keyboardkey import KeyboardKey
log = logging.getLogger(__name__)
class CommandData:
def __init__(self, interface: CommandInterface, session: Optional[Session], loop: AbstractEventLoop):
def __init__(self, interface: CommandInterface, loop: aio.AbstractEventLoop):
self._interface: CommandInterface = interface
self._session: Optional[Session] = session
self.loop: AbstractEventLoop = loop
self.loop: aio.AbstractEventLoop = loop
self._session = None
@property
def session(self) -> Session:
"""Get the :class:`~royalnet.alchemy.Alchemy` :class:`Session`, if it is available.
Raises:
UnsupportedError: if no session is available."""
def session(self):
if self._session is None:
raise UnsupportedError("'session' is not supported")
if self._interface.alchemy is None:
raise UnsupportedError("'alchemy' is not enabled on this Royalnet instance")
self._session = ru.asyncify(self._interface.alchemy.Session)
return self._session
async def session_commit(self):
"""Commit the changes to the session."""
await asyncify(self.session.commit)
if self._session:
log.warning("Session had to be created to be committed")
# noinspection PyUnresolvedReferences
await ru.asyncify(self.session.commit)
async def session_close(self):
if self._session is not None:
await ru.asyncify(self._session.close)
async def reply(self, text: str) -> None:
"""Send a text message to the channel where the call was made.
Parameters:
text: The text to be sent, possibly formatted in the weird undescribed markup that I'm using."""
raise UnsupportedError("'reply' is not supported")
raise UnsupportedError(f"'{self.reply.__name__}' is not supported")
async def get_author(self, error_if_none: bool = False):
"""Try to find the identifier of the user that sent the message.
@ -39,7 +49,7 @@ class CommandData:
Parameters:
error_if_none: Raise an exception if this is True and the call has no author."""
raise UnsupportedError("'get_author' is not supported")
raise UnsupportedError(f"'{self.get_author.__name__}' is not supported")
async def delete_invoking(self, error_if_unavailable=False) -> None:
"""Delete the invoking message, if supported by the interface.
@ -49,4 +59,9 @@ class CommandData:
Parameters:
error_if_unavailable: if True, raise an exception if the message cannot been deleted."""
if error_if_unavailable:
raise UnsupportedError("'delete_invoking' is not supported")
raise UnsupportedError(f"'{self.delete_invoking.__name__}' is not supported")
@contextlib.asynccontextmanager
async def keyboard(self, text, keys: List["KeyboardKey"]):
yield
raise UnsupportedError(f"{self.keyboard.__name__} is not supported")

View file

@ -1,5 +1,5 @@
from typing import *
from asyncio import AbstractEventLoop
import asyncio as aio
from .errors import UnsupportedError
if TYPE_CHECKING:
from .event import Event
@ -40,7 +40,7 @@ class CommandInterface:
return self.serf.alchemy
@property
def loop(self) -> AbstractEventLoop:
def loop(self) -> aio.AbstractEventLoop:
"""A shortcut for :attr:`.serf.loop`."""
if self.serf:
return self.serf.loop
@ -63,4 +63,4 @@ class CommandInterface:
You can run a function on a :class:`~royalnet.serf.discord.DiscordSerf` from a
:class:`~royalnet.serf.telegram.TelegramSerf`.
"""
raise UnsupportedError(f"{self.call_herald_event.__name__} is not supported on this platform")
raise UnsupportedError(f"{self.call_herald_event.__name__} is not supported on this platform.")

View file

@ -0,0 +1,18 @@
from typing import *
from .commandinterface import CommandInterface
from .commanddata import CommandData
class KeyboardKey:
def __init__(self,
interface: CommandInterface,
short: str,
text: str,
callback: Callable[[CommandData], Awaitable[None]]):
self.interface: CommandInterface = interface
self.short: str = short
self.text: str = text
self.callback: Callable[[CommandData], Awaitable[None]] = callback
async def press(self, data: CommandData):
await self.callback(data)

View file

@ -3,7 +3,7 @@ import logging
import warnings
from typing import *
import royalnet.backpack as rb
from royalnet.commands import *
import royalnet.commands as rc
from royalnet.utils import asyncify
from royalnet.serf import Serf
from .escape import escape
@ -65,7 +65,9 @@ class DiscordSerf(Serf):
self.voice_players: List[VoicePlayer] = []
"""A :class:`list` of the :class:`VoicePlayer` in use by this :class:`DiscordSerf`."""
def interface_factory(self) -> Type[CommandInterface]:
self.Data: Type[rc.CommandData] = self.data_factory()
def interface_factory(self) -> Type[rc.CommandInterface]:
# noinspection PyPep8Naming
GenericInterface = super().interface_factory()
@ -76,15 +78,14 @@ class DiscordSerf(Serf):
return DiscordInterface
def data_factory(self) -> Type[CommandData]:
def data_factory(self) -> Type[rc.CommandData]:
# noinspection PyMethodParameters,PyAbstractClass
class DiscordData(CommandData):
class DiscordData(rc.CommandData):
def __init__(data,
interface: CommandInterface,
session,
interface: rc.CommandInterface,
loop: aio.AbstractEventLoop,
message: "discord.Message"):
super().__init__(interface=interface, session=session, loop=loop)
super().__init__(interface=interface, loop=loop)
data.message = message
async def reply(data, text: str):
@ -98,7 +99,7 @@ class DiscordSerf(Serf):
query = query.filter(self.identity_column == user.id)
result = await asyncify(query.one_or_none)
if result is None and error_if_none:
raise CommandError("You must be registered to use this command.")
raise rc.CommandError("You must be registered to use this command.")
return result
async def delete_invoking(data, error_if_unavailable=False):
@ -138,7 +139,7 @@ class DiscordSerf(Serf):
else:
session = None
# Prepare data
data = self.Data(interface=command.interface, session=session, loop=self.loop, message=message)
data = self.Data(interface=command.interface, loop=self.loop, message=message)
# Call the command
await self.call(command, data, parameters)
# Close the alchemy session

View file

@ -51,6 +51,8 @@ class MatrixSerf(Serf):
self._started_timestamp: Optional[int] = None
self.Data: Type[rc.CommandData] = self.data_factory()
def interface_factory(self) -> Type[rc.CommandInterface]:
# noinspection PyPep8Naming
GenericInterface = super().interface_factory()
@ -64,14 +66,13 @@ class MatrixSerf(Serf):
def data_factory(self) -> Type[rc.CommandData]:
# noinspection PyMethodParameters,PyAbstractClass
class DiscordData(rc.CommandData):
class MatrixData(rc.CommandData):
def __init__(data,
interface: rc.CommandInterface,
session,
loop: aio.AbstractEventLoop,
room: nio.MatrixRoom,
event: nio.Event):
super().__init__(interface=interface, session=session, loop=loop)
super().__init__(interface=interface, loop=loop)
data.room: nio.MatrixRoom = room
data.event: nio.Event = event
@ -94,7 +95,7 @@ class MatrixSerf(Serf):
# Delete invoking does not really make sense on Matrix
return DiscordData
return MatrixData
async def handle_message(self, room: "nio.MatrixRoom", event: "nio.RoomMessageText"):
# Skip events happened before the startup of the Serf
@ -123,7 +124,7 @@ class MatrixSerf(Serf):
else:
session = None
# Prepare data
data = self.Data(interface=command.interface, session=session, loop=self.loop, room=room, event=event)
data = self.Data(interface=command.interface, loop=self.loop, room=room, event=event)
# Call the command
await self.call(command, data, parameters)
# Close the alchemy session

View file

@ -102,9 +102,6 @@ class Serf:
self.Interface: Type[CommandInterface] = self.interface_factory()
"""The :class:`CommandInterface` class of this Serf."""
self.Data: Type[CommandData] = self.data_factory()
"""The :class:`CommandData` class of this Serf."""
self.commands: Dict[str, Command] = {}
"""The :class:`dict` connecting each command name to its :class:`Command` object."""
@ -200,10 +197,6 @@ class Serf:
return GenericInterface
def data_factory(self) -> Type[CommandData]:
"""Create the :class:`CommandData` for the Serf."""
raise NotImplementedError()
def register_commands(self, commands: List[Type[Command]], pack_cfg: Dict[str, Any]) -> None:
"""Initialize and register all commands passed as argument."""
# Instantiate the Commands
@ -316,6 +309,34 @@ class Serf:
except Exception as e:
ru.sentry_exc(e)
await data.reply(f"⛔️ [b]{e.__class__.__name__}[/b]\n" + '\n'.join(e.args))
finally:
await data.session_close()
async def press(self, key: KeyboardKey, data: CommandData):
log.info(f"Calling key_callback: {repr(key)}")
try:
await key.press(data)
except InvalidInputError as e:
await data.reply(f"⚠️ {e.message}\n"
f"Syntax: [c]{command.interface.prefix}{command.name} {command.syntax}[/c]")
except UserError as e:
await data.reply(f"⚠️ {e.message}")
except UnsupportedError as e:
await data.reply(f"⚠️ {e.message}")
except ExternalError as e:
await data.reply(f"⚠️ {e.message}")
except ConfigurationError as e:
await data.reply(f"⚠️ {e.message}")
except ProgramError as e:
await data.reply(f"⛔️ {e.message}")
except CommandError as e:
await data.reply(f"⚠️ {e.message}")
except Exception as e:
ru.sentry_exc(e)
await data.reply(f"⛔️ [b]{e.__class__.__name__}[/b]\n" + '\n'.join(e.args))
finally:
await data.session_close()
async def run(self):
"""A coroutine that starts the event loop and handles command calls."""

View file

@ -1,8 +1,10 @@
import contextlib
import logging
import asyncio as aio
import uuid
from typing import *
from royalnet.commands import *
from royalnet.utils import asyncify
import royalnet.commands as rc
import royalnet.utils as ru
import royalnet.backpack as rb
from .escape import escape
from ..serf import Serf
@ -57,6 +59,11 @@ class TelegramSerf(Serf):
self.update_offset: int = -100
"""The current `update offset <https://core.telegram.org/bots/api#getupdates>`_."""
self.key_callbacks: Dict[str, rc.KeyboardKey] = {}
self.MessageData: Type[rc.CommandData] = self.message_data_factory()
self.CallbackData: Type[rc.CommandData] = self.callback_data_factory()
@staticmethod
async def api_call(f: Callable, *args, **kwargs) -> Optional:
"""Call a :class:`telegram.Bot` method safely, without getting a mess of errors raised.
@ -64,7 +71,7 @@ class TelegramSerf(Serf):
The method may return None if it was decided that the call should be skipped."""
while True:
try:
return await asyncify(f, *args, **kwargs)
return await ru.asyncify(f, *args, **kwargs)
except telegram.error.TimedOut as error:
log.debug(f"Timed out during {f.__qualname__} (retrying immediatly): {error}")
continue
@ -84,11 +91,11 @@ class TelegramSerf(Serf):
continue
except Exception as error:
log.error(f"{error.__class__.__qualname__} during {f} (skipping): {error}")
TelegramSerf.sentry_exc(error)
ru.sentry_exc(error)
break
return None
def interface_factory(self) -> Type[CommandInterface]:
def interface_factory(self) -> Type[rc.CommandInterface]:
# noinspection PyPep8Naming
GenericInterface = super().interface_factory()
@ -99,54 +106,103 @@ class TelegramSerf(Serf):
return TelegramInterface
def data_factory(self) -> Type[CommandData]:
def message_data_factory(self) -> Type[rc.CommandData]:
# noinspection PyMethodParameters
class TelegramData(CommandData):
class TelegramMessageData(rc.CommandData):
def __init__(data,
interface: CommandInterface,
session,
interface: rc.CommandInterface,
loop: aio.AbstractEventLoop,
update: telegram.Update):
super().__init__(interface=interface, session=session, loop=loop)
data.update = update
message: telegram.Message):
super().__init__(interface=interface, loop=loop)
data.message: telegram.Message = message
async def reply(data, text: str):
await self.api_call(data.update.effective_chat.send_message,
await self.api_call(data.message.chat.send_message,
escape(text),
parse_mode="HTML",
disable_web_page_preview=True)
async def get_author(data, error_if_none=False):
if data.update.message is not None:
user: telegram.User = data.update.message.from_user
elif data.update.callback_query is not None:
user: telegram.User = data.update.callback_query.from_user
else:
raise CommandError("Command caller can not be determined")
user: Optional[telegram.User] = data.message.from_user
if user is None:
if error_if_none:
raise CommandError("No command caller for this message")
raise rc.CommandError("No command caller for this message")
return None
query = data.session.query(self.master_table)
for link in self.identity_chain:
query = query.join(link.mapper.class_)
query = query.filter(self.identity_column == user.id)
result = await asyncify(query.one_or_none)
result = await ru.asyncify(query.one_or_none)
if result is None and error_if_none:
raise CommandError("Command caller is not registered")
raise rc.CommandError("Command caller is not registered")
return result
async def delete_invoking(data, error_if_unavailable=False) -> None:
message: telegram.Message = data.update.message
await self.api_call(message.delete)
await self.api_call(data.message.delete)
return TelegramData
@contextlib.asynccontextmanager
async def keyboard(data, text: str, keys: List[rc.KeyboardKey]):
tg_rows = []
key_uids = []
for key in keys:
uid: str = str(uuid.uuid4())
key_uids.append(uid)
self.key_callbacks[uid] = key
tg_button: telegram.InlineKeyboardButton = telegram.InlineKeyboardButton(key.text,
callback_data=uid)
tg_row: List[telegram.InlineKeyboardButton] = [tg_button]
tg_rows.append(tg_row)
tg_markup: telegram.InlineKeyboardMarkup = telegram.InlineKeyboardMarkup(tg_rows)
message: telegram.Message = await self.api_call(data.message.chat.send_message,
escape(text),
parse_mode="HTML",
disable_web_page_preview=True,
reply_markup=tg_markup)
yield
await self.api_call(message.edit_reply_markup, reply_markup=None)
for uid in key_uids:
del self.key_callbacks[uid]
return TelegramMessageData
def callback_data_factory(self) -> Type[rc.CommandData]:
# noinspection PyMethodParameters
class TelegramKeyboardData(rc.CommandData):
def __init__(data,
interface: rc.CommandInterface,
loop: aio.AbstractEventLoop,
cbq: telegram.CallbackQuery):
super().__init__(interface=interface, loop=loop)
data.cbq: telegram.CallbackQuery = cbq
async def reply(data, text: str):
await self.api_call(data.cbq.answer,
escape(text))
async def get_author(data, error_if_none=False):
user = data.cbq.from_user
if user is None:
if error_if_none:
raise rc.CommandError("No command caller for this message")
return None
query = data.session.query(self.master_table)
for link in self.identity_chain:
query = query.join(link.mapper.class_)
query = query.filter(self.identity_column == user.id)
result = await ru.asyncify(query.one_or_none)
if result is None and error_if_none:
raise rc.CommandError("Command caller is not registered")
return result
return TelegramKeyboardData
async def answer_cbq(self, cbq, text, alert=False):
await self.api_call(cbq.answer, text=text, show_alert=alert)
async def handle_update(self, update: telegram.Update):
"""Delegate :class:`telegram.Update` handling to the correct message type submethod."""
if update.message is not None:
await self.handle_message(update)
await self.handle_message(update.message)
elif update.edited_message is not None:
pass
elif update.channel_post is not None:
@ -158,7 +214,7 @@ class TelegramSerf(Serf):
elif update.chosen_inline_result is not None:
pass
elif update.callback_query is not None:
pass
await self.handle_callback_query(update.callback_query)
elif update.shipping_query is not None:
pass
elif update.pre_checkout_query is not None:
@ -168,9 +224,8 @@ class TelegramSerf(Serf):
else:
log.warning(f"Unknown update type: {update}")
async def handle_message(self, update: telegram.Update):
async def handle_message(self, message: telegram.Message):
"""What should be done when a :class:`telegram.Message` is received?"""
message: telegram.Message = update.message
text: str = message.text
# Try getting the caption instead
if text is None:
@ -191,46 +246,19 @@ class TelegramSerf(Serf):
# Skip the message
return
# Send a typing notification
await self.api_call(update.message.chat.send_action, telegram.ChatAction.TYPING)
await self.api_call(message.chat.send_action, telegram.ChatAction.TYPING)
# Prepare data
if self.alchemy is not None:
session = await asyncify(self.alchemy.Session)
else:
session = None
# Prepare data
data = self.Data(interface=command.interface, session=session, loop=self.loop, update=update)
data = self.MessageData(interface=command.interface, loop=self.loop, message=message)
# Call the command
await self.call(command, data, parameters)
# Close the alchemy session
if session is not None:
await asyncify(session.close)
async def handle_edited_message(self, update: telegram.Update):
pass
async def handle_channel_post(self, update: telegram.Update):
pass
async def handle_edited_channel_post(self, update: telegram.Update):
pass
async def handle_inline_query(self, update: telegram.Update):
pass
async def handle_chosen_inline_result(self, update: telegram.Update):
pass
async def handle_callback_query(self, update: telegram.Update):
pass
async def handle_shipping_query(self, update: telegram.Update):
pass
async def handle_pre_checkout_query(self, update: telegram.Update):
pass
async def handle_poll(self, update: telegram.Update):
pass
async def handle_callback_query(self, cbq: telegram.CallbackQuery):
uid = cbq.data
if uid not in self.key_callbacks:
await self.api_call(cbq.answer, text="⚠️ This keyboard has expired.", show_alert=True)
key: rc.KeyboardKey = self.key_callbacks[uid]
data: rc.CommandData = self.CallbackData(interface=key.interface, loop=self.loop, cbq=cbq)
await self.press(key, data)
async def run(self):
await super().run()