1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 11:34:18 +00:00

Merge branch 'getting-rid-of-interfaces' into master

# Conflicts:
#	royalnet/backpack/commands/royalnetversion.py
#	royalnet/version.py
This commit is contained in:
Steffo 2020-08-05 02:07:33 +02:00
commit e990180743
22 changed files with 293 additions and 437 deletions

60
poetry.lock generated
View file

@ -851,6 +851,15 @@ optional = true
python-versions = ">= 3.5"
version = "6.0.4"
[[package]]
category = "main"
description = "Backported and Experimental Type Hints for Python 3.5+"
marker = "python_version >= \"3.6\""
name = "typing-extensions"
optional = true
python-versions = "*"
version = "3.7.4.2"
[[package]]
category = "main"
description = "tzinfo object for the local timezone"
@ -876,7 +885,7 @@ description = "HTTP library with thread-safe connection pooling, file post, and
name = "urllib3"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, <4"
version = "1.25.9"
version = "1.25.10"
[package.extras]
brotli = ["brotlipy (>=0.6.0)"]
@ -930,11 +939,12 @@ marker = "python_version >= \"3.6\""
name = "yarl"
optional = true
python-versions = ">=3.5"
version = "1.4.2"
version = "1.5.0"
[package.dependencies]
idna = ">=2.0"
multidict = ">=4.0"
typing-extensions = ">=3.7.4"
[[package]]
category = "main"
@ -957,7 +967,8 @@ sentry = ["sentry_sdk"]
telegram = ["python_telegram_bot"]
[metadata]
content-hash = "5a703d07774f43406d8426b2945eada118bfa44bab2f979df7c3cf2bba2e224d"
content-hash = "341cfeb2b18909f4d9298ab7ce059c4fdbf2283f01d8eb7a5af210697d25a419"
lock-version = "1.0"
python-versions = "^3.8"
[metadata.files]
@ -1500,6 +1511,11 @@ tornado = [
{file = "tornado-6.0.4-cp38-cp38-win_amd64.whl", hash = "sha256:c58d56003daf1b616336781b26d184023ea4af13ae143d9dda65e31e534940b9"},
{file = "tornado-6.0.4.tar.gz", hash = "sha256:0fe2d45ba43b00a41cd73f8be321a44936dc1aba233dee979f17a042b83eb6dc"},
]
typing-extensions = [
{file = "typing_extensions-3.7.4.2-py2-none-any.whl", hash = "sha256:f8d2bd89d25bc39dabe7d23df520442fa1d8969b82544370e03d88b5a591c392"},
{file = "typing_extensions-3.7.4.2-py3-none-any.whl", hash = "sha256:6e95524d8a547a91e08f404ae485bbb71962de46967e1b71a0cb89af24e761c5"},
{file = "typing_extensions-3.7.4.2.tar.gz", hash = "sha256:79ee589a3caca649a9bfd2a8de4709837400dfa00b6cc81962a1e6a1815969ae"},
]
tzlocal = [
{file = "tzlocal-2.1-py2.py3-none-any.whl", hash = "sha256:e2cb6c6b5b604af38597403e9852872d7f534962ae2954c7f35efcb1ccacf4a4"},
{file = "tzlocal-2.1.tar.gz", hash = "sha256:643c97c5294aedc737780a49d9df30889321cbe1204eac2c2ec6134035a92e44"},
@ -1509,8 +1525,8 @@ unpaddedbase64 = [
{file = "unpaddedbase64-1.1.0-py2.py3-none-any.whl", hash = "sha256:81cb4eaaa28cc6a282dd3f2c3855eaa1fbaafa736b5ee64df69889e20540a339"},
]
urllib3 = [
{file = "urllib3-1.25.9-py2.py3-none-any.whl", hash = "sha256:88206b0eb87e6d677d424843ac5209e3fb9d0190d0ee169599165ec25e9d9115"},
{file = "urllib3-1.25.9.tar.gz", hash = "sha256:3018294ebefce6572a474f0604c2021e33b3fd8006ecd11d62107a5d2a963527"},
{file = "urllib3-1.25.10-py2.py3-none-any.whl", hash = "sha256:e7983572181f5e1522d9c98453462384ee92a0be7fac5f1413a1e35c56cc0461"},
{file = "urllib3-1.25.10.tar.gz", hash = "sha256:91056c15fa70756691db97756772bb1eb9678fa585d9184f24534b100dc60f4a"},
]
uvicorn = [
{file = "uvicorn-0.10.9-py3-none-any.whl", hash = "sha256:dc7119b28e15c4c737315c5a570081b0a5a7d8d5c1e8a70a7be70043d88b23a7"},
@ -1556,23 +1572,23 @@ websockets = [
{file = "websockets-8.1.tar.gz", hash = "sha256:5c65d2da8c6bce0fca2528f69f44b2f977e06954c8512a952222cea50dad430f"},
]
yarl = [
{file = "yarl-1.4.2-cp35-cp35m-macosx_10_13_x86_64.whl", hash = "sha256:3ce3d4f7c6b69c4e4f0704b32eca8123b9c58ae91af740481aa57d7857b5e41b"},
{file = "yarl-1.4.2-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:a4844ebb2be14768f7994f2017f70aca39d658a96c786211be5ddbe1c68794c1"},
{file = "yarl-1.4.2-cp35-cp35m-win32.whl", hash = "sha256:d8cdee92bc930d8b09d8bd2043cedd544d9c8bd7436a77678dd602467a993080"},
{file = "yarl-1.4.2-cp35-cp35m-win_amd64.whl", hash = "sha256:c2b509ac3d4b988ae8769901c66345425e361d518aecbe4acbfc2567e416626a"},
{file = "yarl-1.4.2-cp36-cp36m-macosx_10_13_x86_64.whl", hash = "sha256:308b98b0c8cd1dfef1a0311dc5e38ae8f9b58349226aa0533f15a16717ad702f"},
{file = "yarl-1.4.2-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:944494be42fa630134bf907714d40207e646fd5a94423c90d5b514f7b0713fea"},
{file = "yarl-1.4.2-cp36-cp36m-win32.whl", hash = "sha256:5b10eb0e7f044cf0b035112446b26a3a2946bca9d7d7edb5e54a2ad2f6652abb"},
{file = "yarl-1.4.2-cp36-cp36m-win_amd64.whl", hash = "sha256:a161de7e50224e8e3de6e184707476b5a989037dcb24292b391a3d66ff158e70"},
{file = "yarl-1.4.2-cp37-cp37m-macosx_10_13_x86_64.whl", hash = "sha256:26d7c90cb04dee1665282a5d1a998defc1a9e012fdca0f33396f81508f49696d"},
{file = "yarl-1.4.2-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:0c2ab325d33f1b824734b3ef51d4d54a54e0e7a23d13b86974507602334c2cce"},
{file = "yarl-1.4.2-cp37-cp37m-win32.whl", hash = "sha256:e15199cdb423316e15f108f51249e44eb156ae5dba232cb73be555324a1d49c2"},
{file = "yarl-1.4.2-cp37-cp37m-win_amd64.whl", hash = "sha256:2098a4b4b9d75ee352807a95cdf5f10180db903bc5b7270715c6bbe2551f64ce"},
{file = "yarl-1.4.2-cp38-cp38-macosx_10_13_x86_64.whl", hash = "sha256:c9959d49a77b0e07559e579f38b2f3711c2b8716b8410b320bf9713013215a1b"},
{file = "yarl-1.4.2-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:25e66e5e2007c7a39541ca13b559cd8ebc2ad8fe00ea94a2aad28a9b1e44e5ae"},
{file = "yarl-1.4.2-cp38-cp38-win32.whl", hash = "sha256:6faa19d3824c21bcbfdfce5171e193c8b4ddafdf0ac3f129ccf0cdfcb083e462"},
{file = "yarl-1.4.2-cp38-cp38-win_amd64.whl", hash = "sha256:0ca2f395591bbd85ddd50a82eb1fde9c1066fafe888c5c7cc1d810cf03fd3cc6"},
{file = "yarl-1.4.2.tar.gz", hash = "sha256:58cd9c469eced558cd81aa3f484b2924e8897049e06889e8ff2510435b7ef74b"},
{file = "yarl-1.5.0-cp35-cp35m-macosx_10_14_x86_64.whl", hash = "sha256:2657716c1fc998f5f2675c0ee6ce91282e0da0ea9e4a94b584bb1917e11c1559"},
{file = "yarl-1.5.0-cp35-cp35m-manylinux1_x86_64.whl", hash = "sha256:cf5eb664910d759bbae0b76d060d6e21f8af5098242d66c448bbebaf2a7bfa70"},
{file = "yarl-1.5.0-cp35-cp35m-win32.whl", hash = "sha256:875b2a741ce0208f3b818008a859ab5d0f461e98a32bbdc6af82231a9e761c55"},
{file = "yarl-1.5.0-cp35-cp35m-win_amd64.whl", hash = "sha256:1707230e1ea48ea06a3e20acb4ce05a38d2465bd9566c21f48f6212a88e47536"},
{file = "yarl-1.5.0-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:9a592c4aa642249e9bdaf76897d90feeb08118626b363a6be8788a9b300274b5"},
{file = "yarl-1.5.0-cp36-cp36m-manylinux1_x86_64.whl", hash = "sha256:f058b6541477022c7b54db37229f87dacf3b565de4f901ff5a0a78556a174fea"},
{file = "yarl-1.5.0-cp36-cp36m-win32.whl", hash = "sha256:5d410f69b4f92c5e1e2a8ffb73337cd8a274388c6975091735795588a538e605"},
{file = "yarl-1.5.0-cp36-cp36m-win_amd64.whl", hash = "sha256:5bbcb195da7de57f4508b7508c33f7593e9516e27732d08b9aad8586c7b8c384"},
{file = "yarl-1.5.0-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:a1772068401d425e803999dada29a6babf041786e08be5e79ef63c9ecc4c9575"},
{file = "yarl-1.5.0-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:1f269e8e6676193a94635399a77c9059e1826fb6265c9204c9e5a8ccd36006e1"},
{file = "yarl-1.5.0-cp37-cp37m-win32.whl", hash = "sha256:431faa6858f0ea323714d8b7b4a7da1db2eeb9403607f0eaa3800ab2c5a4b627"},
{file = "yarl-1.5.0-cp37-cp37m-win_amd64.whl", hash = "sha256:b325fefd574ebef50e391a1072d1712a60348ca29c183e1d546c9d87fec2cd32"},
{file = "yarl-1.5.0-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:b065a5c3e050395ae563019253cc6c769a50fd82d7fa92d07476273521d56b7c"},
{file = "yarl-1.5.0-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:66b4f345e9573e004b1af184bc00431145cf5e089a4dcc1351505c1f5750192c"},
{file = "yarl-1.5.0-cp38-cp38-win32.whl", hash = "sha256:9a3266b047d15e78bba38c8455bf68b391c040231ca5965ef867f7cbbc60bde5"},
{file = "yarl-1.5.0-cp38-cp38-win_amd64.whl", hash = "sha256:f5cfed0766837303f688196aa7002730d62c5cc802d98c6395ea1feb87252727"},
{file = "yarl-1.5.0.tar.gz", hash = "sha256:5c82f5b1499342339f22c83b97dbe2b8a09e47163fab86cd934a8dd46620e0fb"},
]
youtube-dl = [
{file = "youtube_dl-2020.6.16.1-py2.py3-none-any.whl", hash = "sha256:e54b307048bb18164729fb278013af6d5477c69c3d995147205a16f22a61296b"},

View file

@ -5,7 +5,7 @@
[tool.poetry]
name = "royalnet"
version = "5.10.4"
version = "5.11.0"
description = "A multipurpose bot and web framework"
authors = ["Stefano Pigozzi <ste.pigozzi@gmail.com>"]
license = "AGPL-3.0+"

View file

@ -1,7 +1,5 @@
from typing import *
import royalnet
import royalnet.commands as rc
import royalnet.utils as ru
from ..tables import User

View file

@ -1,7 +1,5 @@
from typing import *
import royalnet
import royalnet.commands as rc
import royalnet.utils as ru
from ..tables import User

View file

@ -2,6 +2,9 @@ from typing import *
import royalnet
import royalnet.commands as rc
import royalnet.utils as ru
import royalnet.serf.telegram as rst
import royalnet.serf.discord as rsd
import royalnet.serf.matrix as rsm
from ..tables.telegram import Telegram
from ..tables.discord import Discord
@ -27,7 +30,7 @@ class RoyalnetsyncCommand(rc.Command):
if not successful:
raise rc.InvalidInputError(f"Invalid password!")
if self.interface.name == "telegram":
if isinstance(self.serf, rst.TelegramSerf):
import telegram
message: telegram.Message = data.message
from_user: telegram.User = message.from_user
@ -53,7 +56,7 @@ class RoyalnetsyncCommand(rc.Command):
await data.session_commit()
await data.reply(f"↔️ Account {tg_user} synced to {author}!")
elif self.interface.name == "discord":
elif isinstance(self.serf, rsd.DiscordSerf):
import discord
message: discord.Message = data.message
author: discord.User = message.author
@ -79,8 +82,8 @@ class RoyalnetsyncCommand(rc.Command):
await data.session_commit()
await data.reply(f"↔️ Account {ds_user} synced to {author}!")
elif self.interface.name == "matrix":
elif isinstance(self.serf, rsm.MatrixSerf):
raise rc.UnsupportedError(f"{self} hasn't been implemented for Matrix yet")
else:
raise rc.UnsupportedError(f"Unknown interface: {self.interface.name}")
raise rc.UnsupportedError(f"Unknown interface: {self.serf.__class__.__qualname__}")

View file

@ -1,4 +1,4 @@
import royalnet
import pkg_resources
from royalnet.commands import *
@ -7,12 +7,17 @@ class RoyalnetversionCommand(Command):
description: str = "Display the current Royalnet version."
@property
def royalnet_version(self):
return pkg_resources.get_distribution("royalnet").version
async def run(self, args: CommandArgs, data: CommandData) -> None:
# noinspection PyUnreachableCode
if __debug__:
message = f" Royalnet [url=https://github.com/Steffo99/royalnet/]Unreleased[/url]\n"
else:
message = f" Royalnet [url=https://github.com/Steffo99/royalnet/releases/tag/{royalnet.__version__}]{royalnet.__version__}[/url]\n"
if "69" in royalnet.__version__:
message = f" Royalnet [url=https://github.com/Steffo99/royalnet/releases/tag/{self.royalnet_version}]" \
f"{self.royalnet_version}[/url]\n"
if "69" in royalnet.version.semantic:
message += "(Nice.)"
await data.reply(message)

View file

@ -1,10 +1,10 @@
from royalnet.commands import *
class ExceptionEvent(Event):
class ExceptionEvent(HeraldEvent):
name = "exception"
def run(self, **kwargs):
if not self.interface.config["exc_debug"]:
raise UserError(f"{self.interface.prefix}{self.name} is not enabled.")
if not self.config["exc_debug"]:
raise UserError(f"{self.__class__.__name__} is not enabled.")
raise Exception(f"{self.name} event was called")

View file

@ -1,17 +1,15 @@
"""The subpackage providing all classes related to Royalnet commands."""
from .commandinterface import CommandInterface
from .command import Command
from .commanddata import CommandData
from .commandargs import CommandArgs
from .event import Event
from .heraldevent import HeraldEvent
from .errors import \
CommandError, InvalidInputError, UnsupportedError, ConfigurationError, ExternalError, UserError, ProgramError
from .keyboardkey import KeyboardKey
from .configdict import ConfigDict
__all__ = [
"CommandInterface",
"Command",
"CommandData",
"CommandArgs",
@ -22,7 +20,7 @@ __all__ = [
"ExternalError",
"UserError",
"ProgramError",
"Event",
"HeraldEvent",
"KeyboardKey",
"ConfigDict",
]

View file

@ -1,10 +1,13 @@
import abc
from typing import *
from .commandinterface import CommandInterface
from .commandargs import CommandArgs
from .commanddata import CommandData
if TYPE_CHECKING:
from ..serf import Serf
class Command:
class Command(metaclass=abc.ABCMeta):
name: str = NotImplemented
"""The main name of the command.
@ -24,31 +27,23 @@ class Command:
"""The syntax of the command, to be displayed when a :py:exc:`InvalidInputError` is raised,
in the format ``(required_arg) [optional_arg]``."""
def __init__(self, interface: CommandInterface):
self.interface = interface
def __init__(self, serf: "Serf", config):
self.serf: "Serf" = serf
self.config = config
def __str__(self):
return f"[c]{self.interface.prefix}{self.name}[/c]"
@property
def serf(self):
"""A shortcut for :attr:`.interface.serf`."""
return self.interface.serf
return f"[c]{self.serf.prefix}{self.name}[/c]"
@property
def alchemy(self):
"""A shortcut for :attr:`.interface.alchemy`."""
return self.interface.alchemy
return self.serf.alchemy
@property
def loop(self):
"""A shortcut for :attr:`.interface.loop`."""
return self.interface.loop
@property
def config(self):
"""A shortcut for :attr:`.interface.config`."""
return self.interface.config
return self.serf.loop
@abc.abstractmethod
async def run(self, args: CommandArgs, data: CommandData) -> None:
raise NotImplementedError()

View file

@ -5,32 +5,33 @@ import asyncio as aio
import royalnet.utils as ru
import io
from .errors import UnsupportedError
from .commandinterface import CommandInterface
from royalnet.backpack.tables.aliases import Alias
from royalnet.backpack.tables.users import User
if TYPE_CHECKING:
from .keyboardkey import KeyboardKey
from royalnet.backpack.tables.users import User
log = logging.getLogger(__name__)
class CommandData:
def __init__(self, interface: CommandInterface, loop: aio.AbstractEventLoop):
self.loop: aio.AbstractEventLoop = loop
self._interface: CommandInterface = interface
def __init__(self, command):
self.command = command
self._session = None
# TODO: make this asyncronous... somehow?
@property
def session(self):
if self._session is None:
if self._interface.alchemy is None:
if self.command.serf.alchemy is None:
raise UnsupportedError("'alchemy' is not enabled on this Royalnet instance")
log.debug("Creating Session...")
self._session = self._interface.alchemy.Session()
self._session = self.command.serf.alchemy.Session()
return self._session
@property
def loop(self):
return self.command.serf.loop
async def session_commit(self):
"""Asyncronously commit the :attr:`.session` of this object."""
if self._session:
@ -83,7 +84,7 @@ class CommandData:
Parameters:
alias: the Alias to search for."""
return await Alias.find_user(self._interface.alchemy, self.session, alias)
return await User.find(self.command.serf.alchemy, self.session, alias)
@contextlib.asynccontextmanager
async def keyboard(self, text, keys: List["KeyboardKey"]):

View file

@ -1,76 +0,0 @@
from typing import *
import asyncio as aio
from .errors import UnsupportedError
from .configdict import ConfigDict
if TYPE_CHECKING:
from .event import Event
from .command import Command
from ..alchemy import Alchemy
from ..serf import Serf
from ..constellation import Constellation
class CommandInterface:
name: str = NotImplemented
"""The name of the :class:`CommandInterface` that's being implemented.
Examples:
``telegram``, ``discord``, ``console``..."""
prefix: str = NotImplemented
"""The prefix used by commands on the interface.
Examples:
``/`` on Telegram, ``!`` on Discord."""
serf: Optional["Serf"] = None
"""A reference to the :class:`~royalnet.serf.Serf` that is implementing this :class:`CommandInterface`.
Example:
A reference to a :class:`~royalnet.serf.telegram.TelegramSerf`."""
constellation: Optional["Constellation"] = None
"""A reference to the Constellation that is implementing this :class:`CommandInterface`.
Example:
A reference to a :class:`~royalnet.constellation.Constellation`."""
def __init__(self, config: Dict[str, Any]):
self.config: ConfigDict[str, Any] = ConfigDict.convert(config)
"""The config section for the pack of the command."""
# Will be bound after the command/event has been created
self.command: Optional[Command] = None
self.event: Optional[Event] = None
@property
def alchemy(self) -> "Alchemy":
"""A shortcut for :attr:`.serf.alchemy`."""
return self.serf.alchemy
@property
def table(self) -> "Callable":
"""A shortcut for :func:`.serf.alchemy.get`.
Raises:
UnsupportedError: if :attr:`.alchemy` is :const:`None`."""
if self.alchemy is None:
raise UnsupportedError("'alchemy' is not enabled on this Royalnet instance")
return self.alchemy.get
@property
def loop(self) -> aio.AbstractEventLoop:
"""A shortcut for :attr:`.serf.loop`."""
if self.serf:
return self.serf.loop
raise UnsupportedError("This command is not being run in a serf.")
async def call_herald_event(self, destination: str, event_name: str, **kwargs) -> dict:
"""Call an event function on a different :class:`~royalnet.serf.Serf`.
Example:
You can run a function on a :class:`~royalnet.serf.discord.DiscordSerf` from a
:class:`~royalnet.serf.telegram.TelegramSerf`.
"""
raise UnsupportedError(f"{self.call_herald_event.__name__} is not supported on this platform.")

View file

@ -1,41 +0,0 @@
import asyncio as aio
from .commandinterface import CommandInterface
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from serf import Serf
class Event:
"""A remote procedure call triggered by a :mod:`royalnet.herald` request."""
name = NotImplemented
"""The event_name that will trigger this event."""
def __init__(self, interface: CommandInterface):
"""Bind the event to a :class:`~royalnet.serf.Serf`."""
self.interface: CommandInterface = interface
"""The :class:`CommandInterface` available to this :class:`Event`."""
@property
def serf(self) -> "Serf":
"""A shortcut for :attr:`.interface.serf`."""
return self.interface.serf
@property
def alchemy(self):
"""A shortcut for :attr:`.interface.alchemy`."""
return self.interface.alchemy
@property
def loop(self) -> aio.AbstractEventLoop:
"""A shortcut for :attr:`.interface.loop`."""
return self.interface.loop
@property
def config(self) -> dict:
"""A shortcut for :attr:`.interface.config`."""
return self.interface.config
async def run(self, **kwargs):
raise NotImplementedError()

View file

@ -0,0 +1,29 @@
import asyncio as aio
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..serf import Serf
class HeraldEvent:
"""A remote procedure call triggered by a :mod:`royalnet.herald` request."""
name = NotImplemented
"""The event_name that will trigger this event."""
def __init__(self, serf: "Serf", config):
self.serf: "Serf" = serf
self.config = config
@property
def alchemy(self):
"""A shortcut for :attr:`.interface.alchemy`."""
return self.serf.alchemy
@property
def loop(self) -> aio.AbstractEventLoop:
"""A shortcut for :attr:`.interface.loop`."""
return self.serf.loop
async def run(self, **kwargs):
raise NotImplementedError()

View file

@ -1,15 +1,12 @@
from typing import *
from .commandinterface import CommandInterface
from .commanddata import CommandData
class KeyboardKey:
def __init__(self,
interface: CommandInterface,
short: str,
text: str,
callback: Callable[[CommandData], Awaitable[None]]):
self.interface: CommandInterface = interface
self.short: str = short
self.text: str = text
self.callback: Callable[[CommandData], Awaitable[None]] = callback

View file

@ -92,10 +92,7 @@ class Constellation:
self.herald_task: Optional[aio.Task] = None
"""A reference to the :class:`aio.Task` that runs the :class:`rh.Link`."""
self.Interface: Type[rc.CommandInterface] = self.interface_factory()
"""The :class:`~rc.CommandInterface` class of this :class:`Constellation`."""
self.events: Dict[str, rc.Event] = {}
self.events: Dict[str, rc.HeraldEvent] = {}
"""A dictionary containing all :class:`~rc.Event` that can be handled by this :class:`Constellation`."""
self.starlette = starlette.applications.Starlette(debug=__debug__)
@ -149,66 +146,55 @@ class Constellation:
Because of how :mod:`uvicorn` runs, it will stay :const:`None` until the first page is requested."""
# TODO: is this a good idea?
def interface_factory(self) -> Type[rc.CommandInterface]:
"""Create the :class:`rc.CommandInterface` class for the :class:`Constellation`."""
# noinspection PyMethodParameters
class GenericInterface(rc.CommandInterface):
alchemy: ra.Alchemy = self.alchemy
constellation = self
async def call_herald_event(ci, destination: str, event_name: str, **kwargs) -> Dict:
"""Send a :class:`royalherald.Request` to a specific destination, and wait for a
:class:`royalherald.Response`."""
if self.herald is None:
raise rc.UnsupportedError("`royalherald` is not enabled on this serf.")
request: rh.Request = rh.Request(handler=event_name, data=kwargs)
response: rh.Response = await self.herald.request(destination=destination, request=request)
if isinstance(response, rh.ResponseFailure):
if response.name == "no_event":
raise rc.ProgramError(f"There is no event named {event_name} in {destination}.")
elif response.name == "error_in_event":
if response.extra_info["type"] == "CommandError":
raise rc.CommandError(response.extra_info["message"])
elif response.extra_info["type"] == "UserError":
raise rc.UserError(response.extra_info["message"])
elif response.extra_info["type"] == "InvalidInputError":
raise rc.InvalidInputError(response.extra_info["message"])
elif response.extra_info["type"] == "UnsupportedError":
raise rc.UnsupportedError(response.extra_info["message"])
elif response.extra_info["type"] == "ConfigurationError":
raise rc.ConfigurationError(response.extra_info["message"])
elif response.extra_info["type"] == "ExternalError":
raise rc.ExternalError(response.extra_info["message"])
else:
raise rc.ProgramError(f"Invalid error in Herald event '{event_name}':\n"
f"[b]{response.extra_info['type']}[/b]\n"
f"{response.extra_info['message']}")
elif response.name == "unhandled_exception_in_event":
raise rc.ProgramError(f"Unhandled exception in Herald event '{event_name}':\n"
f"[b]{response.extra_info['type']}[/b]\n"
f"{response.extra_info['message']}")
else:
raise rc.ProgramError(f"Unknown response in Herald event '{event_name}':\n"
f"[b]{response.name}[/b]"
f"[p]{response}[/p]")
elif isinstance(response, rh.ResponseSuccess):
return response.data
else:
raise rc.ProgramError(f"Other Herald Link returned unknown response:\n"
f"[p]{response}[/p]")
return GenericInterface
def init_herald(self, herald_cfg: Dict[str, Any]):
"""Create a :class:`rh.Link`."""
herald_cfg["name"] = "constellation"
self.herald: rh.Link = rh.Link(rh.Config.from_config(**herald_cfg), self.network_handler)
async def call_herald_event(self, destination: str, event_name: str, **kwargs) -> Dict:
"""Send a :class:`royalherald.Request` to a specific destination, and wait for a
:class:`royalherald.Response`."""
if self.herald is None:
raise rc.UnsupportedError("`royalherald` is not enabled on this serf.")
request: rh.Request = rh.Request(handler=event_name, data=kwargs)
response: rh.Response = await self.herald.request(destination=destination, request=request)
if isinstance(response, rh.ResponseFailure):
if response.name == "no_event":
raise rc.ProgramError(f"There is no event named {event_name} in {destination}.")
elif response.name == "error_in_event":
if response.extra_info["type"] == "CommandError":
raise rc.CommandError(response.extra_info["message"])
elif response.extra_info["type"] == "UserError":
raise rc.UserError(response.extra_info["message"])
elif response.extra_info["type"] == "InvalidInputError":
raise rc.InvalidInputError(response.extra_info["message"])
elif response.extra_info["type"] == "UnsupportedError":
raise rc.UnsupportedError(response.extra_info["message"])
elif response.extra_info["type"] == "ConfigurationError":
raise rc.ConfigurationError(response.extra_info["message"])
elif response.extra_info["type"] == "ExternalError":
raise rc.ExternalError(response.extra_info["message"])
else:
raise rc.ProgramError(f"Invalid error in Herald event '{event_name}':\n"
f"[b]{response.extra_info['type']}[/b]\n"
f"{response.extra_info['message']}")
elif response.name == "unhandled_exception_in_event":
raise rc.ProgramError(f"Unhandled exception in Herald event '{event_name}':\n"
f"[b]{response.extra_info['type']}[/b]\n"
f"{response.extra_info['message']}")
else:
raise rc.ProgramError(f"Unknown response in Herald event '{event_name}':\n"
f"[b]{response.name}[/b]"
f"[p]{response}[/p]")
elif isinstance(response, rh.ResponseSuccess):
return response.data
else:
raise rc.ProgramError(f"Other Herald Link returned unknown response:\n"
f"[p]{response}[/p]")
async def network_handler(self, message: Union[rh.Request, rh.Broadcast]) -> rh.Response:
try:
event: rc.Event = self.events[message.handler]
event: rc.HeraldEvent = self.events[message.handler]
except KeyError:
log.warning(f"No event for '{message.handler}'")
return rh.ResponseFailure("no_event", f"This serf does not have any event for {message.handler}.")
@ -228,13 +214,11 @@ class Constellation:
elif isinstance(message, rh.Broadcast):
await event.run(**message.data)
def register_events(self, events: List[Type[rc.Event]], pack_cfg: Dict[str, Any]):
def register_events(self, events: List[Type[rc.HeraldEvent]], pack_cfg: Dict[str, Any]):
for SelectedEvent in events:
# Create a new interface
interface = self.Interface(config=pack_cfg)
# Initialize the event
try:
event = SelectedEvent(interface)
event = SelectedEvent(serf=self, config=pack_cfg)
except Exception as e:
log.error(f"Skipping: "
f"{SelectedEvent.__qualname__} - {e.__class__.__qualname__} in the initialization.")
@ -266,7 +250,7 @@ class Constellation:
for SelectedPageStar in page_stars:
log.debug(f"Registering: {SelectedPageStar.path} -> {SelectedPageStar.__qualname__}")
try:
page_star_instance = SelectedPageStar(interface=self.Interface(pack_cfg))
page_star_instance = SelectedPageStar(constellation=self, config=pack_cfg)
except Exception as e:
log.error(f"Skipping: "
f"{SelectedPageStar.__qualname__} - {e.__class__.__qualname__} in the initialization.")
@ -277,7 +261,6 @@ class Constellation:
def run_blocking(self):
log.info(f"Running Constellation on https://{self.address}:{self.port}/...")
loop: aio.AbstractEventLoop = aio.get_event_loop()
self.running = True
try:
uvicorn.run(self.starlette, host=self.address, port=self.port, log_config=UVICORN_LOGGING_CONFIG)

View file

@ -1,7 +1,6 @@
from typing import *
from starlette.requests import Request
from starlette.responses import Response
from royalnet.commands import CommandInterface
if TYPE_CHECKING:
from .constellation import Constellation
@ -11,8 +10,9 @@ class Star:
"""A Star is a class representing a part of the website.
It shouldn't be used directly: please use :class:`PageStar` and :class:`ExceptionStar` instead!"""
def __init__(self, interface: CommandInterface):
self.interface: CommandInterface = interface
def __init__(self, constellation: "Constellation", config):
self.constellation: "Constellation" = constellation
self.config = config
async def page(self, request: Request) -> Response:
"""The function generating the :class:`~starlette.Response` to a web :class:`~starlette.Request`.
@ -20,31 +20,21 @@ class Star:
If it raises an error, the corresponding :class:`ExceptionStar` will be used to handle the request instead."""
raise NotImplementedError()
@property
def constellation(self) -> "Constellation":
"""A shortcut for the :class:`Constellation`."""
return self.interface.constellation
@property
def alchemy(self):
"""A shortcut for the :class:`~royalnet.alchemy.Alchemy` of the :class:`Constellation`."""
return self.interface.constellation.alchemy
return self.constellation.alchemy
# noinspection PyPep8Naming
@property
def Session(self):
"""A shortcut for the :class:`~royalnet.alchemy.Alchemy` :class:`Session` of the :class:`Constellation`."""
return self.interface.constellation.alchemy.Session
return self.constellation.alchemy.Session
@property
def session_acm(self):
"""A shortcut for :func:`.alchemy.session_acm` of the :class:`Constellation`."""
return self.interface.constellation.alchemy.session_acm
@property
def config(self) -> Dict[str, Any]:
"""A shortcut for the Pack configuration of the :class:`Constellation`."""
return self.interface.config
return self.constellation.alchemy.session_acm
def __repr__(self):
return f"<{self.__class__.__qualname__}>"

View file

@ -1,6 +1,5 @@
import asyncio as aio
import logging
import warnings
import io
import sys
from typing import *
@ -19,6 +18,7 @@ log = logging.getLogger(__name__)
class DiscordSerf(Serf):
"""A :class:`Serf` that connects to `Discord <https://discordapp.com/>`_ as a bot."""
interface_name = "discord"
prefix = "!"
_identity_table = rbt.Discord
_identity_column = "discord_id"
@ -55,26 +55,14 @@ class DiscordSerf(Serf):
self.Data: Type[rc.CommandData] = self.data_factory()
def interface_factory(self) -> Type[rc.CommandInterface]:
# noinspection PyPep8Naming
GenericInterface = super().interface_factory()
# noinspection PyMethodParameters,PyAbstractClass
class DiscordInterface(GenericInterface):
name = self.interface_name
prefix = "!"
return DiscordInterface
def data_factory(self) -> Type[rc.CommandData]:
# noinspection PyMethodParameters,PyAbstractClass
class DiscordData(rc.CommandData):
def __init__(data,
interface: rc.CommandInterface,
loop: aio.AbstractEventLoop,
command: rc.Command,
message: "discord.Message"):
super().__init__(interface=interface, loop=loop)
data.message = message
super().__init__(command=command)
data.message: "discord.Message" = message
async def reply(data, text: str):
await data.message.channel.send(escape(text))
@ -130,7 +118,8 @@ class DiscordSerf(Serf):
else:
session = None
# Prepare data
data = self.Data(interface=command.interface, loop=self.loop, message=message)
# noinspection PyArgumentList
data = self.Data(command=command, message=message)
# Call the command
await self.call(command, data, parameters)
# Close the alchemy session
@ -141,6 +130,7 @@ class DiscordSerf(Serf):
"""Create a custom class inheriting from :py:class:`discord.Client`."""
# noinspection PyMethodParameters
class DiscordClient(discord.Client):
# noinspection PyMethodMayBeStatic
async def on_message(cli, message: "discord.Message") -> None:
"""Handle messages received by passing them to the handle_message method of the bot."""
# TODO: keep reference to these tasks somewhere
@ -151,6 +141,7 @@ class DiscordSerf(Serf):
log.debug("Discord client is ready!")
await cli.change_presence(status=discord.Status.online, activity=None)
# noinspection PyMethodMayBeStatic
async def on_resume(cli) -> None:
log.debug("Discord client resumed connection.")

View file

@ -15,6 +15,7 @@ log = logging.getLogger(__name__)
class MatrixSerf(Serf):
"""A serf that connects to `Matrix <https://matrix.org/>`_ as an user."""
interface_name = "matrix"
prefix = "!"
_identity_table = rb.tables.Matrix
_identity_column = "matrix_id"
@ -47,26 +48,14 @@ class MatrixSerf(Serf):
self.Data: Type[rc.CommandData] = self.data_factory()
def interface_factory(self) -> Type[rc.CommandInterface]:
# noinspection PyPep8Naming
GenericInterface = super().interface_factory()
# noinspection PyMethodParameters,PyAbstractClass
class DiscordInterface(GenericInterface):
name = self.interface_name
prefix = "!"
return DiscordInterface
def data_factory(self) -> Type[rc.CommandData]:
# noinspection PyMethodParameters,PyAbstractClass
class MatrixData(rc.CommandData):
def __init__(data,
interface: rc.CommandInterface,
loop: aio.AbstractEventLoop,
command: rc.Command,
room: nio.MatrixRoom,
event: nio.Event):
super().__init__(interface=interface, loop=loop)
super().__init__(command=command)
data.room: nio.MatrixRoom = room
data.event: nio.Event = event
@ -118,7 +107,8 @@ class MatrixSerf(Serf):
else:
session = None
# Prepare data
data = self.Data(interface=command.interface, loop=self.loop, room=room, event=event)
# noinspection PyArgumentList
data = self.Data(command=command, room=room, event=event)
# Call the command
await self.call(command, data, parameters)
# Close the alchemy session

View file

@ -4,7 +4,7 @@ import asyncio as aio
import sys
from typing import *
from sqlalchemy.schema import Table
from royalnet.commands import *
import royalnet.commands as rc
import royalnet.utils as ru
import royalnet.alchemy as ra
import royalnet.backpack.tables as rbt
@ -12,7 +12,6 @@ import royalnet.herald as rh
import traceback
import abc
log = logging.getLogger(__name__)
@ -20,6 +19,7 @@ class Serf(abc.ABC):
"""An abstract class, to be used as base to implement Royalnet bots on multiple interfaces (such as Telegram or
Discord)."""
interface_name = NotImplemented
prefix = NotImplemented
_master_table: type = rbt.User
_identity_table: type = NotImplemented
@ -41,10 +41,10 @@ class Serf(abc.ABC):
log.debug(f"Importing pack: {pack_name}")
try:
packs[pack_name] = {
"commands": importlib.import_module(f"{pack_name}.commands"),
"events": importlib.import_module(f"{pack_name}.events"),
"stars": importlib.import_module(f"{pack_name}.stars"),
"tables": importlib.import_module(f"{pack_name}.tables"),
"commands": importlib.import_module(f".commands", pack_name),
"events": importlib.import_module(f".events", pack_name),
"stars": importlib.import_module(f".stars", pack_name),
"tables": importlib.import_module(f".tables", pack_name),
}
except ImportError as e:
log.error(f"{e.__class__.__name__} during the import of {pack_name}:\n"
@ -88,13 +88,10 @@ class Serf(abc.ABC):
self.herald_task: Optional[aio.Task] = None
"""A reference to the :class:`asyncio.Task` that runs the :class:`Link`."""
self.events: Dict[str, Event] = {}
self.events: Dict[str, rc.HeraldEvent] = {}
"""A dictionary containing all :class:`Event` that can be handled by this :class:`Serf`."""
self.Interface: Type[CommandInterface] = self.interface_factory()
"""The :class:`CommandInterface` class of this Serf."""
self.commands: Dict[str, Command] = {}
self.commands: Dict[str, rc.Command] = {}
"""The :class:`dict` connecting each command name to its :class:`Command` object."""
for pack_name in packs:
@ -138,104 +135,86 @@ class Serf(abc.ABC):
"""Find a relationship path starting from the master table and ending at the identity table, and return it."""
return ra.table_dfs(self.master_table, self.identity_table)
def interface_factory(self) -> Type[CommandInterface]:
"""Create the :class:`CommandInterface` class for the Serf."""
# noinspection PyMethodParameters
class GenericInterface(CommandInterface):
alchemy: ra.Alchemy = self.alchemy
serf: "Serf" = self
async def call_herald_event(ci, destination: str, event_name: str, **kwargs) -> Dict:
"""Send a :class:`royalherald.Request` to a specific destination, and wait for a
:class:`royalherald.Response`."""
if self.herald is None:
raise UnsupportedError("`royalherald` is not enabled on this serf.")
request: rh.Request = rh.Request(handler=event_name, data=kwargs)
response: rh.Response = await self.herald.request(destination=destination, request=request)
if isinstance(response, rh.ResponseFailure):
if response.name == "no_event":
raise ProgramError(f"There is no event named {event_name} in {destination}.")
elif response.name == "error_in_event":
if response.extra_info["type"] == "CommandError":
raise CommandError(response.extra_info["message"])
elif response.extra_info["type"] == "UserError":
raise UserError(response.extra_info["message"])
elif response.extra_info["type"] == "InvalidInputError":
raise InvalidInputError(response.extra_info["message"])
elif response.extra_info["type"] == "UnsupportedError":
raise UnsupportedError(response.extra_info["message"])
elif response.extra_info["type"] == "ConfigurationError":
raise ConfigurationError(response.extra_info["message"])
elif response.extra_info["type"] == "ExternalError":
raise ExternalError(response.extra_info["message"])
else:
raise ProgramError(f"Invalid error in Herald event '{event_name}':\n"
f"[b]{response.extra_info['type']}[/b]\n"
f"{response.extra_info['message']}")
elif response.name == "unhandled_exception_in_event":
raise ProgramError(f"Unhandled exception in Herald event '{event_name}':\n"
f"[b]{response.extra_info['type']}[/b]\n"
f"{response.extra_info['message']}")
else:
raise ProgramError(f"Unknown response in Herald event '{event_name}':\n"
f"[b]{response.name}[/b]"
f"[p]{response}[/p]")
elif isinstance(response, rh.ResponseSuccess):
return response.data
async def call_herald_event(self, destination: str, event_name: str, **kwargs) -> Dict:
"""Send a :class:`royalherald.Request` to a specific destination, and wait for a
:class:`royalherald.Response`."""
if self.herald is None:
raise rc.UnsupportedError("`royalherald` is not enabled on this serf.")
request: rh.Request = rh.Request(handler=event_name, data=kwargs)
response: rh.Response = await self.herald.request(destination=destination, request=request)
if isinstance(response, rh.ResponseFailure):
if response.name == "no_event":
raise rc.ProgramError(f"There is no event named {event_name} in {destination}.")
elif response.name == "error_in_event":
if response.extra_info["type"] == "CommandError":
raise rc.CommandError(response.extra_info["message"])
elif response.extra_info["type"] == "UserError":
raise rc.UserError(response.extra_info["message"])
elif response.extra_info["type"] == "InvalidInputError":
raise rc.InvalidInputError(response.extra_info["message"])
elif response.extra_info["type"] == "UnsupportedError":
raise rc.UnsupportedError(response.extra_info["message"])
elif response.extra_info["type"] == "ConfigurationError":
raise rc.ConfigurationError(response.extra_info["message"])
elif response.extra_info["type"] == "ExternalError":
raise rc.ExternalError(response.extra_info["message"])
else:
raise ProgramError(f"Other Herald Link returned unknown response:\n"
f"[p]{response}[/p]")
raise rc.ProgramError(f"Invalid error in Herald event '{event_name}':\n"
f"[b]{response.extra_info['type']}[/b]\n"
f"{response.extra_info['message']}")
elif response.name == "unhandled_exception_in_event":
raise rc.ProgramError(f"Unhandled exception in Herald event '{event_name}':\n"
f"[b]{response.extra_info['type']}[/b]\n"
f"{response.extra_info['message']}")
else:
raise rc.ProgramError(f"Unknown response in Herald event '{event_name}':\n"
f"[b]{response.name}[/b]"
f"[p]{response}[/p]")
elif isinstance(response, rh.ResponseSuccess):
return response.data
else:
raise rc.ProgramError(f"Other Herald Link returned unknown response:\n"
f"[p]{response}[/p]")
return GenericInterface
def register_commands(self, commands: List[Type[Command]], pack_cfg: Dict[str, Any]) -> None:
def register_commands(self, commands: List[Type[rc.Command]], pack_cfg: Dict[str, Any]) -> None:
"""Initialize and register all commands passed as argument."""
# Instantiate the Commands
for SelectedCommand in commands:
# Create a new interface
interface = self.Interface(config=pack_cfg)
# Try to instantiate the command
try:
command = SelectedCommand(interface)
command = SelectedCommand(serf=self, config=pack_cfg)
except Exception as e:
log.error(f"Skipping: "
f"{SelectedCommand.__qualname__} - {e.__class__.__qualname__} in the initialization.")
ru.sentry_exc(e)
continue
# Link the interface to the command
interface.command = command
# Warn if the command would be overriding something
if f"{self.Interface.prefix}{SelectedCommand.name}" in self.commands:
if SelectedCommand.name in self.commands:
log.info(f"Overriding (already defined): "
f"{SelectedCommand.__qualname__} -> {self.Interface.prefix}{SelectedCommand.name}")
f"{SelectedCommand.__qualname__} -> {SelectedCommand.name}")
else:
log.debug(f"Registering: "
f"{SelectedCommand.__qualname__} -> {self.Interface.prefix}{SelectedCommand.name}")
f"{SelectedCommand.__qualname__} -> {SelectedCommand.name}")
# Register the command in the commands dict
self.commands[f"{interface.prefix}{SelectedCommand.name}"] = command
self.commands[SelectedCommand.name] = command
# Register aliases, but don't override anything
for alias in SelectedCommand.aliases:
if f"{interface.prefix}{alias}" not in self.commands:
log.debug(f"Aliasing: {SelectedCommand.__qualname__} -> {interface.prefix}{alias}")
self.commands[f"{interface.prefix}{alias}"] = \
self.commands[f"{interface.prefix}{SelectedCommand.name}"]
if alias not in self.commands:
log.debug(f"Aliasing: {SelectedCommand.__qualname__} -> {alias}")
self.commands[alias] = self.commands[SelectedCommand.name]
else:
log.warning(
f"Ignoring (already defined): {SelectedCommand.__qualname__} -> {interface.prefix}{alias}")
log.warning(f"Ignoring (already defined): {SelectedCommand.__qualname__} -> {alias}")
def init_herald(self, herald_cfg: Dict[str, Any]):
"""Create a :class:`Link` and bind :class:`Event`."""
herald_cfg["name"] = self.interface_name
self.herald: rh.Link = rh.Link(rh.Config.from_config(**herald_cfg), self.network_handler)
def register_events(self, events: List[Type[Event]], pack_cfg: Dict[str, Any]):
def register_events(self, events: List[Type[rc.HeraldEvent]], pack_cfg: Dict[str, Any]):
for SelectedEvent in events:
# Create a new interface
interface = self.Interface(config=pack_cfg)
# Initialize the event
try:
event = SelectedEvent(interface)
event = SelectedEvent(serf=self, config=pack_cfg)
except Exception as e:
log.error(f"Skipping: "
f"{SelectedEvent.__qualname__} - {e.__class__.__qualname__} in the initialization.")
@ -250,7 +229,7 @@ class Serf(abc.ABC):
async def network_handler(self, message: Union[rh.Request, rh.Broadcast]) -> rh.Response:
try:
event: Event = self.events[message.handler]
event: rc.HeraldEvent = self.events[message.handler]
except KeyError:
log.warning(f"No event for '{message.handler}'")
return rh.ResponseFailure("no_event", f"This serf does not have any event for {message.handler}.")
@ -259,7 +238,7 @@ class Serf(abc.ABC):
try:
response_data = await event.run(**message.data)
return rh.ResponseSuccess(data=response_data)
except CommandError as e:
except rc.CommandError as e:
return rh.ResponseFailure("error_in_event",
f"The event '{message.handler}' raised a {e.__class__.__qualname__}.",
extra_info={
@ -278,25 +257,25 @@ class Serf(abc.ABC):
elif isinstance(message, rh.Broadcast):
await event.run(**message.data)
async def call(self, command: Command, data: CommandData, parameters: List[str]):
async def call(self, command: rc.Command, data: rc.CommandData, parameters: List[str]):
log.info(f"Calling command: {command.name}")
try:
# Run the command
await command.run(CommandArgs(parameters), data)
except InvalidInputError as e:
await command.run(rc.CommandArgs(parameters), data)
except rc.InvalidInputError as e:
await data.reply(f"⚠️ {e.message}\n"
f"Syntax: [c]{command.interface.prefix}{command.name} {command.syntax}[/c]")
except UserError as e:
f"Syntax: [c]{self.prefix}{command.name} {command.syntax}[/c]")
except rc.UserError as e:
await data.reply(f"⚠️ {e.message}")
except UnsupportedError as e:
except rc.UnsupportedError as e:
await data.reply(f"⚠️ {e.message}")
except ExternalError as e:
except rc.ExternalError as e:
await data.reply(f"⚠️ {e.message}")
except ConfigurationError as e:
except rc.ConfigurationError as e:
await data.reply(f"⚠️ {e.message}")
except ProgramError as e:
except rc.ProgramError as e:
await data.reply(f"⛔️ {e.message}")
except CommandError as e:
except rc.CommandError as e:
await data.reply(f"⚠️ {e.message}")
except Exception as e:
ru.sentry_exc(e)
@ -304,23 +283,23 @@ class Serf(abc.ABC):
finally:
await data.session_close()
async def press(self, key: KeyboardKey, data: CommandData):
async def press(self, key: rc.KeyboardKey, data: rc.CommandData):
log.info(f"Calling key_callback: {repr(key)}")
try:
await key.press(data)
except InvalidInputError as e:
except rc.InvalidInputError as e:
await data.reply(f"⚠️ {e.message}")
except UserError as e:
except rc.UserError as e:
await data.reply(f"⚠️ {e.message}")
except UnsupportedError as e:
except rc.UnsupportedError as e:
await data.reply(f"⚠️ {e.message}")
except ExternalError as e:
except rc.ExternalError as e:
await data.reply(f"⚠️ {e.message}")
except ConfigurationError as e:
except rc.ConfigurationError as e:
await data.reply(f"⚠️ {e.message}")
except ProgramError as e:
except rc.ProgramError as e:
await data.reply(f"⛔️ {e.message}")
except CommandError as e:
except rc.CommandError as e:
await data.reply(f"⚠️ {e.message}")
except Exception as e:
ru.sentry_exc(e)

View file

@ -1,11 +1,13 @@
from typing import *
import re
def escape(string: str) -> str:
def escape(string: Optional[str]) -> Optional[str]:
"""Escape a string to be sent through Telegram (as HTML), and format it using RoyalCode.
Warning:
Currently escapes everything, even items in code blocks."""
url_pattern = re.compile(r"\[url=(.*?)](.*?)\[/url]")
url_replacement = r'<a href="\1">\2</a>'

View file

@ -1,17 +1,19 @@
from typing import *
import contextlib
import logging
import asyncio as aio
import uuid
from typing import *
import royalnet.commands as rc
import royalnet.utils as ru
import royalnet.backpack.tables as rbt
from .escape import escape
from ..serf import Serf
import io
import telegram
import urllib3
from telegram.utils.request import Request as TRequest
from dataclasses import dataclass
import royalnet.commands as rc
import royalnet.utils as ru
import royalnet.backpack.tables as rbt
from .escape import escape
from ..serf import Serf
try:
from sqlalchemy.orm.session import Session
@ -21,9 +23,16 @@ except ImportError:
log = logging.getLogger(__name__)
@dataclass(frozen=True)
class TelegramKeyCallback:
command: rc.Command
key: rc.KeyboardKey
class TelegramSerf(Serf):
"""A Serf that connects to `Telegram <https://telegram.org/>`_ as a bot."""
interface_name = "telegram"
prefix = "/"
_identity_table = rbt.Telegram
_identity_column = "tg_id"
@ -54,7 +63,7 @@ class TelegramSerf(Serf):
self.update_offset: int = -100
"""The current `update offset <https://core.telegram.org/bots/api#getupdates>`_."""
self.key_callbacks: Dict[str, rc.KeyboardKey] = {}
self.key_callbacks: Dict[str, TelegramKeyCallback] = {}
self.MessageData: Type[rc.CommandData] = self.message_data_factory()
self.CallbackData: Type[rc.CommandData] = self.callback_data_factory()
@ -90,25 +99,13 @@ class TelegramSerf(Serf):
break
return None
def interface_factory(self) -> Type[rc.CommandInterface]:
# noinspection PyPep8Naming
GenericInterface = super().interface_factory()
# noinspection PyMethodParameters
class TelegramInterface(GenericInterface):
name = self.interface_name
prefix = "/"
return TelegramInterface
def message_data_factory(self) -> Type[rc.CommandData]:
# noinspection PyMethodParameters
class TelegramMessageData(rc.CommandData):
def __init__(data,
interface: rc.CommandInterface,
loop: aio.AbstractEventLoop,
command: rc.Command,
message: telegram.Message):
super().__init__(interface=interface, loop=loop)
super().__init__(command=command)
data.message: telegram.Message = message
async def reply(data, text: str):
@ -117,7 +114,7 @@ class TelegramSerf(Serf):
parse_mode="HTML",
disable_web_page_preview=True)
async def reply_image(data, image: io.IOBase, caption: Optional[str] = None) -> None:
async def reply_image(data, image: "BinaryIO", caption: Optional[str] = None) -> None:
await self.api_call(data.message.chat.send_photo,
photo=image,
caption=escape(caption) if caption is not None else None,
@ -149,7 +146,7 @@ class TelegramSerf(Serf):
for key in keys:
uid: str = str(uuid.uuid4())
key_uids.append(uid)
self.register_keyboard_key(uid, key)
data.register_keyboard_key(uid, key)
tg_button: telegram.InlineKeyboardButton = telegram.InlineKeyboardButton(key.text,
callback_data=uid)
tg_row: List[telegram.InlineKeyboardButton] = [tg_button]
@ -163,7 +160,13 @@ class TelegramSerf(Serf):
yield message
await self.api_call(message.edit_reply_markup, reply_markup=None)
for uid in key_uids:
self.unregister_keyboard_key(uid)
data.unregister_keyboard_key(uid)
def register_keyboard_key(data, identifier: str, key: rc.KeyboardKey):
self.key_callbacks[identifier] = TelegramKeyCallback(key=key, command=data.command)
def unregister_keyboard_key(data, identifier: str):
del self.key_callbacks[identifier]
return TelegramMessageData
@ -171,10 +174,9 @@ class TelegramSerf(Serf):
# noinspection PyMethodParameters
class TelegramKeyboardData(rc.CommandData):
def __init__(data,
interface: rc.CommandInterface,
loop: aio.AbstractEventLoop,
command: rc.Command,
cbq: telegram.CallbackQuery):
super().__init__(interface=interface, loop=loop)
super().__init__(command=command)
data.cbq: telegram.CallbackQuery = cbq
async def reply(data, text: str):
@ -250,7 +252,8 @@ class TelegramSerf(Serf):
# Send a typing notification
await self.api_call(message.chat.send_action, telegram.ChatAction.TYPING)
# Prepare data
data = self.MessageData(interface=command.interface, loop=self.loop, message=message)
# noinspection PyArgumentList
data = self.MessageData(command=command, message=message)
# Call the command
await self.call(command, data, parameters)
@ -259,15 +262,10 @@ class TelegramSerf(Serf):
if uid not in self.key_callbacks:
await self.api_call(cbq.answer, text="⚠️ This keyboard has expired.", show_alert=True)
return
key: rc.KeyboardKey = self.key_callbacks[uid]
data: rc.CommandData = self.CallbackData(interface=key.interface, loop=self.loop, cbq=cbq)
await self.press(key, data)
def register_keyboard_key(self, identifier: str, key: rc.KeyboardKey):
self.key_callbacks[identifier] = key
def unregister_keyboard_key(self, identifier: str):
del self.key_callbacks[identifier]
cbd = self.key_callbacks[uid]
# noinspection PyArgumentList
data: rc.CommandData = self.CallbackData(command=cbd.command, cbq=cbq)
await self.press(cbd.key, data)
async def run(self):
await super().run()

View file