1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00

This might actually work

This commit is contained in:
Steffo 2019-08-18 02:53:59 +03:00
parent d2c1f87798
commit d65f677e21
8 changed files with 152 additions and 185 deletions

View file

@ -2,11 +2,12 @@ import discord
import typing
import logging as _logging
from .generic import GenericBot
from ..utils import asyncify, Call, Command, discord_escape
from ..error import UnregisteredError, NoneFoundError, TooManyFoundError, InvalidConfigError, RoyalnetResponseError
from ..network import RoyalnetConfig, Request, ResponseSuccess, ResponseError
from ..database import DatabaseConfig
from ..audio import playmodes, YtdlDiscord
from ..utils import *
from ..error import *
from ..network import *
from ..database import *
from ..audio import *
from ..commands import *
log = _logging.getLogger(__name__)
@ -24,47 +25,35 @@ class DiscordConfig:
class DiscordBot(GenericBot):
"""A bot that connects to `Discord <https://discordapp.com/>`_."""
interface_name = "discord"
def _init_voice(self):
"""Initialize the variables needed for the connection to voice chat."""
log.debug(f"Creating music_data dict")
self.music_data: typing.Dict[discord.Guild, playmodes.PlayMode] = {}
def _interface_factory(self) -> typing.Type[Call]:
log.debug(f"Creating DiscordCall")
def _interface_factory(self) -> typing.Type[CommandInterface]:
# noinspection PyPep8Naming
GenericInterface = super()._interface_factory()
# noinspection PyMethodParameters
class DiscordCall(Call):
interface_name = self.interface_name
interface_obj = self
interface_prefix = "!"
# noinspection PyMethodParameters,PyAbstractClass
class DiscordInterface(GenericInterface):
name = "discord"
prefix = "!"
alchemy = self.alchemy
return DiscordInterface
async def reply(call, text: str):
# TODO: don't escape characters inside [c][/c] blocks
await call.channel.send(discord_escape(text))
def _data_factory(self) -> typing.Type[CommandData]:
# noinspection PyMethodParameters,PyAbstractClass
class DiscordData(CommandData):
def __init__(data, interface: CommandInterface, message: discord.Message):
data._interface = interface
data.message = message
async def net_request(call, request: Request, destination: str) -> dict:
if self.network is None:
raise InvalidConfigError("Royalnet is not enabled on this bot")
response_dict: dict = await self.network.request(request.to_dict(), destination)
if "type" not in response_dict:
raise RoyalnetResponseError("Response is missing a type")
elif response_dict["type"] == "ResponseSuccess":
response: typing.Union[ResponseSuccess, ResponseError] = ResponseSuccess.from_dict(response_dict)
elif response_dict["type"] == "ResponseError":
response = ResponseError.from_dict(response_dict)
else:
raise RoyalnetResponseError("Response type is unknown")
response.raise_on_error()
return response.data
async def reply(data, text: str):
await data.message.channel.send(discord_escape(text))
async def get_author(call, error_if_none=False):
message: discord.Message = call.kwargs["message"]
user: discord.Member = message.author
query = call.session.query(self.master_table)
async def get_author(data, error_if_none=False):
user: discord.Member = data.message.author
query = data._interface.session.query(self.master_table)
for link in self.identity_chain:
query = query.join(link.mapper.class_)
query = query.filter(self.identity_column == user.id)
@ -73,7 +62,7 @@ class DiscordBot(GenericBot):
raise UnregisteredError("Author is not registered")
return result
return DiscordCall
return DiscordData
def _bot_factory(self) -> typing.Type[discord.Client]:
"""Create a custom DiscordClient class inheriting from :py:class:`discord.Client`."""
@ -107,20 +96,25 @@ class DiscordBot(GenericBot):
if not text:
return
# Skip non-command updates
if not text.startswith(self.command_prefix):
if not text.startswith("!"):
return
# Skip bot messages
author: typing.Union[discord.User] = message.author
if author.bot:
return
# Start typing
with message.channel.typing():
# Find and clean parameters
command_text, *parameters = text.split(" ")
# Don't use a case-sensitive command name
command_name = command_text.lower()
# Find the command
try:
command = self.commands[command_name]
except KeyError:
# Skip the message
return
# Call the command
await self.call(command_name, message.channel, parameters, message=message)
with message.channel.typing():
await command.run(CommandArgs(parameters), self._Data(interface=command.interface, message=message))
async def on_ready(cli):
log.debug("Connection successful, client is ready")
@ -146,12 +140,14 @@ class DiscordBot(GenericBot):
def find_channel_by_name(cli,
name: str,
guild: typing.Optional[discord.Guild] = None) -> discord.abc.GuildChannel:
"""Find the :py:class:`TextChannel`, :py:class:`VoiceChannel` or :py:class:`CategoryChannel` with the specified name.
"""Find the :py:class:`TextChannel`, :py:class:`VoiceChannel` or :py:class:`CategoryChannel` with the
specified name.
Case-insensitive.
Guild is optional, but the method will raise a :py:exc:`TooManyFoundError` if none is specified and there is more than one channel with the same name.
Will also raise a :py:exc:`NoneFoundError` if no channels are found."""
Guild is optional, but the method will raise a :py:exc:`TooManyFoundError` if none is specified and
there is more than one channel with the same name. Will also raise a :py:exc:`NoneFoundError` if no
channels are found. """
if guild is not None:
all_channels = guild.channels
else:
@ -192,16 +188,10 @@ class DiscordBot(GenericBot):
discord_config: DiscordConfig,
royalnet_config: typing.Optional[RoyalnetConfig] = None,
database_config: typing.Optional[DatabaseConfig] = None,
command_prefix: str = "!",
commands: typing.List[typing.Type[Command]] = None,
missing_command: typing.Type[Command] = NullCommand,
error_command: typing.Type[Command] = NullCommand):
commands: typing.List[typing.Type[Command]] = None):
super().__init__(royalnet_config=royalnet_config,
database_config=database_config,
command_prefix=command_prefix,
commands=commands,
missing_command=missing_command,
error_command=error_command)
commands=commands)
self._discord_config = discord_config
self._init_client()
self._init_voice()

View file

@ -2,10 +2,10 @@ import sys
import typing
import asyncio
import logging
from ..utils import NetworkHandler
from ..network import RoyalnetLink, Request, Response, ResponseSuccess, ResponseError, RoyalnetConfig
from ..database import Alchemy, DatabaseConfig, relationshiplinkchain
from ..commands import Command, CommandInterface
from ..utils import *
from ..network import *
from ..database import *
from ..commands import *
from ..error import *
@ -13,32 +13,24 @@ log = logging.getLogger(__name__)
class GenericBot:
"""A generic bot class, to be used as base for the other more specific classes, such as :ref:`royalnet.bots.TelegramBot` and :ref:`royalnet.bots.DiscordBot`."""
"""A generic bot class, to be used as base for the other more specific classes, such as
:ref:`royalnet.bots.TelegramBot` and :ref:`royalnet.bots.DiscordBot`. """
interface_name = NotImplemented
def _init_commands(self,
command_prefix: str,
commands: typing.List[typing.Type[Command]],
missing_command: typing.Type[Command],
error_command: typing.Type[Command]) -> None:
"""Generate the ``commands`` dictionary required to handle incoming messages, and the ``network_handlers`` dictionary required to handle incoming requests."""
log.debug(f"Now generating commands")
self.command_prefix = command_prefix
self.commands: typing.Dict[str, typing.Type[Command]] = {}
def _init_commands(self, commands: typing.List[typing.Type[Command]]) -> None:
"""Generate the ``commands`` dictionary required to handle incoming messages, and the ``network_handlers``
dictionary required to handle incoming requests. """
log.debug(f"Now binding commands")
self._Interface = self._interface_factory()
self._Data = self._data_factory()
self.commands = {}
for SelectedCommand in self.commands:
interface = self._Interface()
self.commands[f"{interface.prefix}{SelectedCommand.name}"] = SelectedCommand(interface)
self.network_handlers: typing.Dict[str, typing.Type[NetworkHandler]] = {}
for command in commands:
lower_command_name = command.command_name.lower()
self.commands[f"{command_prefix}{lower_command_name}"] = command
self.missing_command: typing.Type[Command] = missing_command
self.error_command: typing.Type[Command] = error_command
log.debug(f"Successfully generated commands")
log.debug(f"Successfully bound commands")
def _interface_factory(self) -> typing.Type[CommandInterface]:
"""Create a :py:class:`royalnet.commands.CommandInterface` type and return it.
Returns:
The created :py:class:`royalnet.commands.CommandInterface` type."""
# noinspection PyAbstractClass,PyMethodParameters
class GenericInterface(CommandInterface):
alchemy = self.alchemy
@ -47,6 +39,9 @@ class GenericBot:
def register_net_handler(ci, message_type: str, network_handler: typing.Callable):
self.network_handlers[message_type] = network_handler
def unregister_net_handler(ci, message_type: str):
del self.network_handlers[message_type]
async def net_request(ci, request: Request, destination: str) -> dict:
if self.network is None:
raise InvalidConfigError("Royalnet is not enabled on this bot")
@ -64,10 +59,13 @@ class GenericBot:
return GenericInterface
def _data_factory(self) -> typing.Type[CommandData]:
raise NotImplementedError()
def _init_royalnet(self, royalnet_config: RoyalnetConfig):
"""Create a :py:class:`royalnet.network.RoyalnetLink`, and run it as a :py:class:`asyncio.Task`."""
self.network: RoyalnetLink = RoyalnetLink(royalnet_config.master_uri, royalnet_config.master_secret, self.interface_name,
self._network_handler)
self.network: RoyalnetLink = RoyalnetLink(royalnet_config.master_uri, royalnet_config.master_secret,
self.interface_name, self._network_handler)
log.debug(f"Running RoyalnetLink {self.network}")
self.loop.create_task(self.network.run())
@ -98,14 +96,16 @@ class GenericBot:
_, exc, _ = sys.exc_info()
log.debug(f"Exception {exc} in {network_handler}")
return ResponseError("exception_in_handler",
f"An exception was raised in {network_handler} for {request.handler}. Check extra_info for details.",
f"An exception was raised in {network_handler} for {request.handler}. Check "
f"extra_info for details.",
extra_info={
"type": exc.__class__.__name__,
"str": str(exc)
}).to_dict()
def _init_database(self, commands: typing.List[typing.Type[Command]], database_config: DatabaseConfig):
"""Create an :py:class:`royalnet.database.Alchemy` with the tables required by the commands. Then, find the chain that links the ``master_table`` to the ``identity_table``."""
"""Create an :py:class:`royalnet.database.Alchemy` with the tables required by the commands. Then,
find the chain that links the ``master_table`` to the ``identity_table``. """
log.debug(f"Initializing database")
required_tables = set()
for command in commands:
@ -122,10 +122,7 @@ class GenericBot:
def __init__(self, *,
royalnet_config: typing.Optional[RoyalnetConfig] = None,
database_config: typing.Optional[DatabaseConfig] = None,
command_prefix: str,
commands: typing.List[typing.Type[Command]] = None,
missing_command: typing.Type[Command] = NullCommand,
error_command: typing.Type[Command] = NullCommand,
loop: asyncio.AbstractEventLoop = None):
if loop is None:
self.loop = asyncio.get_event_loop()
@ -140,35 +137,12 @@ class GenericBot:
self._init_database(commands=commands, database_config=database_config)
if commands is None:
commands = []
self._init_commands(command_prefix, commands, missing_command=missing_command, error_command=error_command)
self._Call = self._interface_factory()
self._init_commands(commands)
if royalnet_config is None:
self.network = None
else:
self._init_royalnet(royalnet_config=royalnet_config)
async def call(self, command_name: str, channel, parameters: typing.List[str] = None, **kwargs):
"""Call the command with the specified name.
If it doesn't exist, call ``self.missing_command``.
If an exception is raised during the execution of the command, call ``self.error_command``."""
log.debug(f"Trying to call {command_name}")
if parameters is None:
parameters = []
try:
command: typing.Type[Command] = self.commands[command_name]
except KeyError:
log.debug(f"Calling missing_command because {command_name} does not exist")
command = self.missing_command
try:
await self._Call(channel, command, parameters, **kwargs).run()
except Exception as exc:
log.debug(f"Calling error_command because of an error in {command_name}")
await self._Call(channel, self.error_command,
exception=exc,
previous_command=command, **kwargs).run()
async def run(self):
"""A blocking coroutine that should make the bot start listening to commands and requests."""
raise NotImplementedError()

View file

@ -3,11 +3,11 @@ import telegram.utils.request
import typing
import logging as _logging
from .generic import GenericBot
from ..utils import asyncify, telegram_escape
from ..error import UnregisteredError, InvalidConfigError, RoyalnetResponseError
from ..network import RoyalnetConfig, Request, ResponseSuccess, ResponseError
from ..database import DatabaseConfig
from ..commands import CommandInterface
from ..utils import *
from ..error import *
from ..network import *
from ..database import *
from ..commands import *
log = _logging.getLogger(__name__)
@ -30,28 +30,35 @@ class TelegramBot(GenericBot):
self._offset: int = -100
def _interface_factory(self) -> typing.Type[CommandInterface]:
# noinspection PyPep8Naming
GenericInterface = super()._interface_factory()
# noinspection PyMethodParameters
# noinspection PyMethodParameters,PyAbstractClass
class TelegramInterface(GenericInterface):
name = "telegram"
prefix = "/"
alchemy = self.alchemy
return TelegramInterface
async def reply(ci, extra: dict, text: str):
await asyncify(ci.channel.send_message, telegram_escape(text),
def _data_factory(self) -> typing.Type[CommandData]:
# noinspection PyMethodParameters,PyAbstractClass
class TelegramData(CommandData):
def __init__(data, interface: CommandInterface, update: telegram.Update):
data._interface = interface
data.update = update
async def reply(data, text: str):
await asyncify(data.update.effective_chat.send_message, telegram_escape(text),
parse_mode="HTML",
disable_web_page_preview=True)
async def get_author(ci, extra: dict, error_if_none=False):
update: telegram.Update = extra["update"]
user: telegram.User = update.effective_user
async def get_author(data, error_if_none=False):
user: telegram.User = data.update.effective_user
if user is None:
if error_if_none:
raise UnregisteredError("No author for this message")
return None
query = ci.session.query(self.master_table)
query = data._interface.session.query(self.master_table)
for link in self.identity_chain:
query = query.join(link.mapper.class_)
query = query.filter(self.identity_column == user.id)
@ -60,22 +67,16 @@ class TelegramBot(GenericBot):
raise UnregisteredError("Author is not registered")
return result
return TelegramCall
return TelegramData
def __init__(self, *,
telegram_config: TelegramConfig,
royalnet_config: typing.Optional[RoyalnetConfig] = None,
database_config: typing.Optional[DatabaseConfig] = None,
command_prefix: str = "/",
commands: typing.List[typing.Type[Command]] = None,
missing_command: typing.Type[Command] = NullCommand,
error_command: typing.Type[Command] = NullCommand):
commands: typing.List[typing.Type[Command]] = None):
super().__init__(royalnet_config=royalnet_config,
database_config=database_config,
command_prefix=command_prefix,
commands=commands,
missing_command=missing_command,
error_command=error_command)
commands=commands)
self._telegram_config = telegram_config
self._init_client()
@ -92,15 +93,21 @@ class TelegramBot(GenericBot):
if text is None:
return
# Skip non-command updates
if not text.startswith(self.command_prefix):
if not text.startswith("/"):
return
# Find and clean parameters
command_text, *parameters = text.split(" ")
command_name = command_text.replace(f"@{self.client.username}", "").lower()
# Send a typing notification
self.client.send_chat_action(update.message.chat, telegram.ChatAction.TYPING)
# Call the command
await self.call(command_name, update.message.chat, parameters, update=update)
# Find the command
try:
command = self.commands[command_name]
except KeyError:
# Skip the message
return
# Run the command
await command.run(CommandArgs(parameters), self._Data(interface=command.interface, update=update))
async def run(self):
while True:
@ -119,11 +126,3 @@ class TelegramBot(GenericBot):
except IndexError:
pass
@property
def botfather_command_string(self) -> str:
"""Generate a string to be pasted in the "Edit Commands" BotFather prompt."""
string = ""
for command_key in self.commands:
command = self.commands[command_key]
string += f"{command.command_name} - {command.command_description}\n"
return string

View file

@ -1,4 +1,5 @@
from .commandinterface import CommandInterface
from .command import Command
from .commanddata import CommandData
__all__ = ["CommandInterface", "Command"]
__all__ = ["CommandInterface", "Command", "CommandData"]

View file

@ -2,6 +2,7 @@ import typing
from ..error import UnsupportedError
from .commandinterface import CommandInterface
from .commandargs import CommandArgs
from .commanddata import CommandData
class Command:
@ -22,5 +23,5 @@ class Command:
def __init__(self, interface: CommandInterface):
self.interface = interface
async def run(self, args: CommandArgs, **extra) -> None:
async def run(self, args: CommandArgs, data: CommandData) -> None:
raise UnsupportedError(f"Command {self.name} can't be called on {self.interface.name}.")

View file

@ -0,0 +1,18 @@
class CommandData:
async def reply(self, text: str) -> None:
"""Send a text message to the channel where the call was made.
Parameters:
text: The text to be sent, possibly formatted in the weird undescribed markup that I'm using."""
raise NotImplementedError()
async def get_author(self, error_if_none: bool = False):
"""Try to find the identifier of the user that sent the message.
That probably means, the database row identifying the user.
Parameters:
error_if_none: Raise a :py:exc:`royalnet.error.UnregisteredError` if this is True and the call has no author.
Raises:
:py:exc:`royalnet.error.UnregisteredError` if ``error_if_none`` is set to True and no author is found."""
raise NotImplementedError()

View file

@ -10,38 +10,22 @@ class CommandInterface:
alchemy: "Alchemy" = NotImplemented
bot: "GenericBot" = NotImplemented
def __init__(self, alias: str):
def __init__(self):
self.session = self.alchemy.Session()
def register_net_handler(self, message_type: str, network_handler: typing.Callable):
"""Register a new handler for messages received through Royalnet."""
raise NotImplementedError()
async def reply(self, extra: dict, text: str) -> None:
"""Send a text message to the channel where the call was made.
Parameters:
extra: The ``extra`` dict passed to the Command
text: The text to be sent, possibly formatted in the weird undescribed markup that I'm using."""
def unregister_net_handler(self, message_type: str):
"""Remove a Royalnet handler."""
raise NotImplementedError()
async def net_request(self, extra: dict, message, destination: str) -> dict:
"""Send data through a :py:class:`royalnet.network.RoyalnetLink` and wait for a :py:class:`royalnet.network.Reply`.
async def net_request(self, message, destination: str) -> dict:
"""Send data through a :py:class:`royalnet.network.RoyalnetLink` and wait for a
:py:class:`royalnet.network.Reply`.
Parameters:
extra: The ``extra`` dict passed to the Command
message: The data to be sent. Must be :py:mod:`pickle`-able.
destination: The destination of the request, either in UUID format or node name."""
raise NotImplementedError()
async def get_author(self, extra: dict, error_if_none: bool = False):
"""Try to find the identifier of the user that sent the message.
That probably means, the database row identifying the user.
Parameters:
extra: The ``extra`` dict passed to the Command
error_if_none: Raise a :py:exc:`royalnet.error.UnregisteredError` if this is True and the call has no author.
Raises:
:py:exc:`royalnet.error.UnregisteredError` if ``error_if_none`` is set to True and no author is found."""
raise NotImplementedError()

View file

@ -1,21 +1,21 @@
import asyncio
from ..utils import Command, Call
from ..error import InvalidInputError
import typing
from ..command import Command
from ..commandinterface import CommandInterface
from ..commandargs import CommandArgs
from ..commanddata import CommandData
class PingCommand(Command):
name: str = "ping"
command_name = "ping"
command_description = "Ping pong dopo un po' di tempo!"
command_syntax = "[time_to_wait]"
description: str = "Replies with a Pong!"
@classmethod
async def common(cls, call: Call):
try:
time = int(call.args[0])
except InvalidInputError:
time = 0
except ValueError:
raise InvalidInputError("time_to_wait is not a number")
await asyncio.sleep(time)
await call.reply("🏓 Pong!")
syntax: str = ""
require_alchemy_tables: typing.Set = set()
def __init__(self, interface: CommandInterface):
super().__init__(interface)
async def run(self, args: CommandArgs, data: CommandData) -> None:
await data.reply("Pong!")