1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-27 13:34:28 +00:00

#53: Use JSON for Royalnet communication (#56)

#53: Use JSON for Royalnet communication
This commit is contained in:
Steffo 2019-05-22 16:59:57 +02:00 committed by GitHub
commit 990011dadf
22 changed files with 455 additions and 306 deletions

View file

@ -3857,7 +3857,7 @@ jQuery.Deferred.exceptionHook = function( error, stack ) {
// Support: IE 8 - 9 only
// Console exists when dev tools are open, which can happen at any time
if ( window.console && window.console.warn && error && rerrorNames.test( error.name ) ) {
window.console.warn( "jQuery.Deferred exception: " + error.message, error.stack, stack );
window.console.warn( "jQuery.Deferred exception: " + error.data, error.stack, stack );
}
};

File diff suppressed because one or more lines are too long

View file

@ -1,5 +0,0 @@
from . import audio, bots, commands, database, network, utils, error
version = "5.0a7"
__all__ = ["audio", "bots", "commands", "database", "network", "utils", "error"]

View file

@ -5,8 +5,8 @@ import logging as _logging
from .generic import GenericBot
from ..commands import NullCommand
from ..utils import asyncify, Call, Command
from ..error import UnregisteredError, NoneFoundError, TooManyFoundError, InvalidConfigError
from ..network import Message, Reply, RoyalnetConfig
from ..error import UnregisteredError, NoneFoundError, TooManyFoundError, InvalidConfigError, RoyalnetResponseError
from ..network import RoyalnetConfig, Request, Response, ResponseSuccess, ResponseError
from ..database import DatabaseConfig
from ..audio import PlayMode, Playlist, RoyalPCMAudio
@ -62,12 +62,20 @@ class DiscordBot(GenericBot):
.replace("[/p]", "```")
await call.channel.send(escaped_text)
async def net_request(call, message: Message, destination: str):
async def net_request(call, request: Request, destination: str) -> dict:
if self.network is None:
raise InvalidConfigError("Royalnet is not enabled on this bot")
response: Reply = await self.network.request(message, destination)
response_dict: dict = await self.network.request(request.to_dict(), destination)
if "type" not in response_dict:
raise RoyalnetResponseError("Response is missing a type")
elif response_dict["type"] == "ResponseSuccess":
response: typing.Union[ResponseSuccess, ResponseError] = ResponseSuccess.from_dict(response_dict)
elif response_dict["type"] == "ResponseError":
response = ResponseError.from_dict(response_dict)
else:
raise RoyalnetResponseError("Response type is unknown")
response.raise_on_error()
return response
return response.data
async def get_author(call, error_if_none=False):
message: discord.Message = call.kwargs["message"]

View file

@ -4,7 +4,7 @@ import asyncio
import logging
from ..utils import Command, NetworkHandler, Call
from ..commands import NullCommand
from ..network import RoyalnetLink, Message, RoyalnetConfig
from ..network import RoyalnetLink, Request, Response, ResponseSuccess, ResponseError, RoyalnetConfig
from ..database import Alchemy, DatabaseConfig, relationshiplinkchain
@ -24,7 +24,7 @@ class GenericBot:
"""Generate the ``commands`` dictionary required to handle incoming messages, and the ``network_handlers`` dictionary required to handle incoming requests."""
log.debug(f"Now generating commands")
self.commands: typing.Dict[str, typing.Type[Command]] = {}
self.network_handlers: typing.Dict[typing.Type[Message], typing.Type[NetworkHandler]] = {}
self.network_handlers: typing.Dict[str, typing.Type[NetworkHandler]] = {}
for command in commands:
lower_command_name = command.command_name.lower()
self.commands[f"{command_prefix}{lower_command_name}"] = command
@ -47,25 +47,38 @@ class GenericBot:
log.debug(f"Running RoyalnetLink {self.network}")
loop.create_task(self.network.run())
async def _network_handler(self, message: Message) -> Message:
"""Handle a single :py:class:`royalnet.network.Message` received from the :py:class:`royalnet.network.RoyalnetLink`.
async def _network_handler(self, request_dict: dict) -> dict:
"""Handle a single :py:class:`dict` received from the :py:class:`royalnet.network.RoyalnetLink`.
Returns:
Another message, to be sent as :py:class:`royalnet.network.Reply`."""
log.debug(f"Received {message} from the RoyalnetLink")
Another :py:class:`dict`, formatted as a :py:class:`royalnet.network.Response`."""
# Convert the dict to a Request
try:
network_handler = self.network_handlers[message.__class__]
request: Request = Request.from_dict(request_dict)
except TypeError:
log.warning(f"Invalid request received: {request_dict}")
return ResponseError("invalid_request",
f"The Request that you sent was invalid. Check extra_info to see what you sent.",
extra_info={"you_sent": request_dict}).to_dict()
log.debug(f"Received {request} from the RoyalnetLink")
try:
network_handler = self.network_handlers[request.handler]
except KeyError:
_, exc, tb = sys.exc_info()
log.debug(f"Missing network_handler for {message}")
raise Exception(f"Missing network_handler for {message}")
log.warning(f"Missing network_handler for {request.handler}")
return ResponseError("no_handler", f"This Link is missing a network handler for {request.handler}.").to_dict()
try:
log.debug(f"Using {network_handler} as handler for {message}")
return await getattr(network_handler, self.interface_name)(self, message)
log.debug(f"Using {network_handler} as handler for {request.handler}")
response: Response = await getattr(network_handler, self.interface_name)(self, request.data)
return response.to_dict()
except Exception:
_, exc, _ = sys.exc_info()
log.debug(f"Exception {exc} in {network_handler}")
raise
return ResponseError("exception_in_handler",
f"An exception was raised in {network_handler} for {request.handler}. Check extra_info for details.",
extra_info={
"type": exc.__class__.__name__,
"str": str(exc)
}).to_dict()
def _init_database(self, commands: typing.List[typing.Type[Command]], database_config: DatabaseConfig):
"""Create an :py:class:`royalnet.database.Alchemy` with the tables required by the commands. Then, find the chain that links the ``master_table`` to the ``identity_table``."""

View file

@ -1,13 +1,13 @@
import telegram
from telegram.utils.request import Request
import telegram.utils.request
import asyncio
import typing
import logging as _logging
from .generic import GenericBot
from ..commands import NullCommand
from ..utils import asyncify, Call, Command
from ..error import UnregisteredError, InvalidConfigError
from ..network import Message, RoyalnetConfig, Reply
from ..error import UnregisteredError, InvalidConfigError, RoyalnetResponseError
from ..network import RoyalnetConfig, Request, Response, ResponseSuccess, ResponseError
from ..database import DatabaseConfig
loop = asyncio.get_event_loop()
@ -27,7 +27,7 @@ class TelegramBot(GenericBot):
def _init_client(self):
"""Create the :py:class:`telegram.Bot`, and set the starting offset."""
# https://github.com/python-telegram-bot/python-telegram-bot/issues/341
request = Request(5)
request = telegram.utils.request.Request(5)
self.client = telegram.Bot(self._telegram_config.token, request=request)
self._offset: int = -100
@ -55,12 +55,20 @@ class TelegramBot(GenericBot):
.replace("[/p]", "</pre>")
await asyncify(call.channel.send_message, escaped_text, parse_mode="HTML")
async def net_request(call, message: Message, destination: str):
async def net_request(call, request: Request, destination: str) -> dict:
if self.network is None:
raise InvalidConfigError("Royalnet is not enabled on this bot")
response: Reply = await self.network.request(message, destination)
response_dict: dict = await self.network.request(request.to_dict(), destination)
if "type" not in response_dict:
raise RoyalnetResponseError("Response is missing a type")
elif response_dict["type"] == "ResponseSuccess":
response: typing.Union[ResponseSuccess, ResponseError] = ResponseSuccess.from_dict(response_dict)
elif response_dict["type"] == "ResponseError":
response = ResponseError.from_dict(response_dict)
else:
raise RoyalnetResponseError("Response type is unknown")
response.raise_on_error()
return response
return response.data
async def get_author(call, error_if_none=False):
update: telegram.Update = call.kwargs["update"]

View file

@ -1,14 +1,6 @@
import logging as _logging
from ..utils import Command, Call
from ..error import NoneFoundError, \
TooManyFoundError, \
UnregisteredError, \
UnsupportedError, \
InvalidInputError, \
InvalidConfigError, \
RoyalnetError, \
ExternalError
from ..error import *
log = _logging.getLogger(__name__)
@ -41,11 +33,15 @@ class ErrorHandlerCommand(Command):
if isinstance(exception, InvalidConfigError):
await call.reply(f"⚠️ Il bot non è stato configurato correttamente, quindi questo comando non può essere eseguito.\n[p]{exception}[/p]")
return
if isinstance(exception, RoyalnetError):
await call.reply(f"⚠️ La richiesta a Royalnet ha restituito un errore: [p]{exception.exc}[/p]")
if isinstance(exception, RoyalnetRequestError):
await call.reply(f"⚠️ La richiesta a Royalnet ha restituito un errore: [p]{exception.error}[/p]")
return
if isinstance(exception, ExternalError):
await call.reply(f"⚠️ Una risorsa esterna necessaria per l'esecuzione del comando non ha funzionato correttamente, quindi il comando è stato annullato.\n[p]{exception}[/p]")
return
await call.reply(f"❌ Eccezione non gestita durante l'esecuzione del comando:\n[b]{exception.__class__.__name__}[/b]\n[p]{exception}[/p]")
if isinstance(exception, RoyalnetResponseError):
log.warning(f"Invalid response from Royalnet - {exception.__class__.__name__}: {exception}")
await call.reply(f"❌ La risposta ricevuta da Royalnet non è valida: [p]{exception}[/p]")
return
log.error(f"Unhandled exception - {exception.__class__.__name__}: {exception}")
await call.reply(f"❌ Eccezione non gestita durante l'esecuzione del comando:\n[b]{exception.__class__.__name__}[/b]\n[p]{exception}[/p]")

View file

@ -3,9 +3,9 @@ import asyncio
import youtube_dl
import ffmpeg
from ..utils import Command, Call, NetworkHandler, asyncify
from ..network import Message, RequestSuccessful
from ..network import Request, ResponseSuccess
from ..error import TooManyFoundError, NoneFoundError
from ..audio import RoyalPCMAudio, YtdlInfo
from ..audio import RoyalPCMAudio
if typing.TYPE_CHECKING:
from ..bots import DiscordBot
@ -13,26 +13,15 @@ if typing.TYPE_CHECKING:
loop = asyncio.get_event_loop()
class PlayMessage(Message):
def __init__(self, url: str, guild_name: typing.Optional[str] = None):
self.url: str = url
self.guild_name: typing.Optional[str] = guild_name
class PlaySuccessful(RequestSuccessful):
def __init__(self, info_list: typing.List[YtdlInfo]):
self.info_list: typing.List[YtdlInfo] = info_list
class PlayNH(NetworkHandler):
message_type = PlayMessage
message_type = "music_play"
@classmethod
async def discord(cls, bot: "DiscordBot", message: PlayMessage):
async def discord(cls, bot: "DiscordBot", data: dict):
"""Handle a play Royalnet request. That is, add audio to a PlayMode."""
# Find the matching guild
if message.guild_name:
guild = bot.client.find_guild(message.guild_name)
if data["guild_name"]:
guild = bot.client.find_guild(data["guild_name"])
else:
if len(bot.music_data) == 0:
raise NoneFoundError("No voice clients active")
@ -44,12 +33,12 @@ class PlayNH(NetworkHandler):
# TODO: change Exception
raise Exception("No music_data for this guild")
# Start downloading
if message.url.startswith("http://") or message.url.startswith("https://"):
audio_sources: typing.List[RoyalPCMAudio] = await asyncify(RoyalPCMAudio.create_from_url, message.url)
if data["url"].startswith("http://") or data["url"].startswith("https://"):
audio_sources: typing.List[RoyalPCMAudio] = await asyncify(RoyalPCMAudio.create_from_url, data["url"])
else:
audio_sources = await asyncify(RoyalPCMAudio.create_from_ytsearch, message.url)
audio_sources = await asyncify(RoyalPCMAudio.create_from_ytsearch, data["url"])
await bot.add_to_music_data(audio_sources, guild)
return PlaySuccessful(info_list=[source.rpf.info for source in audio_sources])
return ResponseSuccess({"title_list": [source.rpf.info.title for source in audio_sources]})
async def notify_on_timeout(call: Call, url: str, time: float, repeat: bool = False):
@ -70,11 +59,11 @@ class PlayCommand(Command):
@classmethod
async def common(cls, call: Call):
guild, url = call.args.match(r"(?:\[(.+)])?\s*(.+)")
download_task = loop.create_task(call.net_request(PlayMessage(url, guild), "discord"))
guild_name, url = call.args.match(r"(?:\[(.+)])?\s*(.+)")
download_task = loop.create_task(call.net_request(Request("music_play", {"url": url, "guild_name": guild_name}), "discord"))
notify_task = loop.create_task(notify_on_timeout(call, url, time=30, repeat=True))
try:
response: PlaySuccessful = await download_task
data: dict = await download_task
except Exception as exc:
# RoyalPCMFile errors
if isinstance(exc, FileExistsError):
@ -112,5 +101,5 @@ class PlayCommand(Command):
raise
finally:
notify_task.cancel()
for info in response.info_list:
await call.reply(f"⬇️ Download di [i]{info.title}[/i] completato.")
for title in data["title_list"]:
await call.reply(f"⬇️ Download di [i]{title}[/i] completato.")

View file

@ -1,7 +1,7 @@
import typing
import asyncio
from ..utils import Command, Call, NetworkHandler
from ..network import Message, RequestSuccessful
from ..network import Request, ResponseSuccess
from ..error import NoneFoundError, TooManyFoundError
from ..audio import Playlist, Pool
if typing.TYPE_CHECKING:
@ -11,21 +11,15 @@ if typing.TYPE_CHECKING:
loop = asyncio.get_event_loop()
class PlaymodeMessage(Message):
def __init__(self, mode_name: str, guild_name: typing.Optional[str] = None):
self.mode_name: str = mode_name
self.guild_name: typing.Optional[str] = guild_name
class PlaymodeNH(NetworkHandler):
message_type = PlaymodeMessage
message_type = "music_playmode"
@classmethod
async def discord(cls, bot: "DiscordBot", message: PlaymodeMessage):
async def discord(cls, bot: "DiscordBot", data: dict):
"""Handle a playmode Royalnet request. That is, change current PlayMode."""
# Find the matching guild
if message.guild_name:
guild = bot.client.find_guild(message.guild_name)
if data["guild_name"]:
guild = bot.client.find_guild(data["guild_name"])
else:
if len(bot.music_data) == 0:
raise NoneFoundError("No voice clients active")
@ -36,13 +30,13 @@ class PlaymodeNH(NetworkHandler):
if bot.music_data[guild] is not None:
bot.music_data[guild].delete()
# Create the new PlayMode
if message.mode_name == "playlist":
if data["mode_name"] == "playlist":
bot.music_data[guild] = Playlist()
elif message.mode_name == "pool":
elif data["mode_name"] == "pool":
bot.music_data[guild] = Pool()
else:
raise ValueError("No such PlayMode")
return RequestSuccessful()
return ResponseSuccess()
class PlaymodeCommand(Command):
@ -54,6 +48,6 @@ class PlaymodeCommand(Command):
@classmethod
async def common(cls, call: Call):
guild, mode_name = call.args.match(r"(?:\[(.+)])?\s*(\S+)\s*")
await call.net_request(PlaymodeMessage(mode_name, guild), "discord")
await call.reply(f"Richiesto di passare alla modalità di riproduzione [c]{mode_name}[/c].")
guild_name, mode_name = call.args.match(r"(?:\[(.+)])?\s*(\S+)\s*")
await call.net_request(Request("music_playmode", {"mode_name": mode_name, "guild_name": guild_name}), "discord")
await call.reply(f"Modalità di riproduzione [c]{mode_name}[/c].")

View file

@ -1,25 +1,20 @@
import typing
import discord
from ..network import Message, RequestSuccessful
from ..network import Request, ResponseSuccess
from ..utils import Command, Call, NetworkHandler
from ..error import TooManyFoundError, NoneFoundError
if typing.TYPE_CHECKING:
from ..bots import DiscordBot
class SkipMessage(Message):
def __init__(self, guild_name: typing.Optional[str] = None):
self.guild_name: typing.Optional[str] = guild_name
class SkipNH(NetworkHandler):
message_type = SkipMessage
message_type = "music_skip"
@classmethod
async def discord(cls, bot: "DiscordBot", message: SkipMessage):
async def discord(cls, bot: "DiscordBot", data: dict):
# Find the matching guild
if message.guild_name:
guild = bot.client.find_guild_by_name(message.guild_name)
if data["guild_name"]:
guild = bot.client.find_guild_by_name(data["guild_name"])
else:
if len(bot.music_data) == 0:
raise NoneFoundError("No voice clients active")
@ -32,7 +27,7 @@ class SkipNH(NetworkHandler):
raise NoneFoundError("Nothing to skip")
# noinspection PyProtectedMember
voice_client._player.stop()
return RequestSuccessful()
return ResponseSuccess()
class SkipCommand(Command):
@ -46,5 +41,5 @@ class SkipCommand(Command):
@classmethod
async def common(cls, call: Call):
guild, = call.args.match(r"(?:\[(.+)])?")
await call.net_request(SkipMessage(guild), "discord")
await call.net_request(Request("music_skip", {"guild_name": guild}), "discord")
await call.reply(f"✅ Richiesto lo skip della canzone attuale.")

View file

@ -2,7 +2,7 @@ import typing
import discord
import asyncio
from ..utils import Command, Call, NetworkHandler
from ..network import Message, RequestSuccessful, RequestError
from ..network import Request, ResponseSuccess
from ..error import NoneFoundError
if typing.TYPE_CHECKING:
from ..bots import DiscordBot
@ -11,24 +11,17 @@ if typing.TYPE_CHECKING:
loop = asyncio.get_event_loop()
class SummonMessage(Message):
def __init__(self, channel_identifier: typing.Union[int, str],
guild_identifier: typing.Optional[typing.Union[int, str]] = None):
self.channel_name = channel_identifier
self.guild_identifier = guild_identifier
class SummonNH(NetworkHandler):
message_type = SummonMessage
message_type = "music_summon"
@classmethod
async def discord(cls, bot: "DiscordBot", message: SummonMessage):
async def discord(cls, bot: "DiscordBot", data: dict):
"""Handle a summon Royalnet request. That is, join a voice channel, or move to a different one if that is not possible."""
channel = bot.client.find_channel_by_name(message.channel_name)
channel = bot.client.find_channel_by_name(data["channel_name"])
if not isinstance(channel, discord.VoiceChannel):
raise NoneFoundError("Channel is not a voice channel")
loop.create_task(bot.client.vc_connect_or_move(channel))
return RequestSuccessful()
return ResponseSuccess()
class SummonCommand(Command):
@ -42,8 +35,7 @@ class SummonCommand(Command):
@classmethod
async def common(cls, call: Call):
channel_name: str = call.args[0].lstrip("#")
response: typing.Union[RequestSuccessful, RequestError] = await call.net_request(SummonMessage(channel_name), "discord")
response.raise_on_error()
await call.net_request(Request("music_summon", {"channel_name": channel_name}), "discord")
await call.reply(f"✅ Mi sono connesso in [c]#{channel_name}[/c].")
@classmethod

View file

@ -1,3 +1,8 @@
import typing
if typing.TYPE_CHECKING:
from .network import ResponseError
class NoneFoundError(Exception):
"""The element that was being looked for was not found."""
@ -22,11 +27,16 @@ class InvalidConfigError(Exception):
"""The bot has not been configured correctly, therefore the command can not function."""
class RoyalnetError(Exception):
class RoyalnetRequestError(Exception):
"""An error was raised while handling the Royalnet request.
This exception contains the exception that was raised during the handling."""
def __init__(self, exc: Exception):
self.exc: Exception = exc
This exception contains the :py:class:`royalnet.network.ResponseError` that was returned by the other Link."""
def __init__(self, error: "ResponseError"):
self.error: "ResponseError" = error
class RoyalnetResponseError(Exception):
"""The :py:class:`royalnet.network.Response` that was received is invalid."""
class ExternalError(Exception):

View file

@ -1,23 +1,20 @@
"""Royalnet realated classes."""
from .messages import Message, ServerErrorMessage, InvalidSecretEM, InvalidDestinationEM, InvalidPackageEM, RequestSuccessful, RequestError, Reply
from .packages import Package
from .royalnetlink import RoyalnetLink, NetworkError, NotConnectedError, NotIdentifiedError
from .request import Request
from .response import Response, ResponseSuccess, ResponseError
from .package import Package
from .royalnetlink import RoyalnetLink, NetworkError, NotConnectedError, NotIdentifiedError, ConnectionClosedError
from .royalnetserver import RoyalnetServer
from .royalnetconfig import RoyalnetConfig
__all__ = ["Message",
"ServerErrorMessage",
"InvalidSecretEM",
"InvalidDestinationEM",
"InvalidPackageEM",
"RoyalnetLink",
__all__ = ["RoyalnetLink",
"NetworkError",
"NotConnectedError",
"NotIdentifiedError",
"Package",
"RoyalnetServer",
"RequestSuccessful",
"RequestError",
"RoyalnetConfig",
"Reply"]
"ConnectionClosedError",
"Request",
"Response",
"ResponseSuccess",
"ResponseError"]

View file

@ -1,78 +0,0 @@
import typing
import pickle
from ..error import RoyalnetError
class Message:
"""A message sent through the Royalnet."""
def __repr__(self):
return f"<{self.__class__.__name__}>"
class IdentifySuccessfulMessage(Message):
"""The Royalnet identification step was successful."""
class ServerErrorMessage(Message):
"""Something went wrong in the connection to the :py:class:`royalnet.network.RoyalnetServer`."""
def __init__(self, reason):
super().__init__()
self.reason = reason
class InvalidSecretEM(ServerErrorMessage):
"""The sent secret was incorrect.
This message terminates connection to the :py:class:`royalnet.network.RoyalnetServer`."""
class InvalidPackageEM(ServerErrorMessage):
"""The sent :py:class:`royalnet.network.Package` was invalid."""
class InvalidDestinationEM(InvalidPackageEM):
"""The :py:class:`royalnet.network.Package` destination was invalid or not found."""
class Reply(Message):
"""A reply to a request sent through the Royalnet."""
def raise_on_error(self) -> None:
"""If the reply is an error, raise an error, otherwise, do nothing.
Raises:
A :py:exc:`RoyalnetError`, if the Reply is an error, otherwise, nothing."""
raise NotImplementedError()
class RequestSuccessful(Reply):
"""The sent request was successful."""
def raise_on_error(self) -> None:
"""If the reply is an error, raise an error, otherwise, do nothing.
Does nothing."""
pass
class RequestError(Reply):
"""The sent request wasn't successful."""
def __init__(self, exc: typing.Optional[Exception] = None):
"""Create a RequestError.
Parameters:
exc: The exception that caused the error in the request."""
try:
pickle.dumps(exc)
except TypeError:
self.exc: Exception = Exception(repr(exc))
else:
self.exc = exc
def raise_on_error(self) -> None:
"""If the reply is an error, raise an error, otherwise, do nothing.
Raises:
Always raises a :py:exc:`royalnet.error.RoyalnetError`, containing the exception that caused the error."""
raise RoyalnetError(exc=self.exc)

111
royalnet/network/package.py Normal file
View file

@ -0,0 +1,111 @@
import json
import uuid
import typing
class Package:
"""A Royalnet package, the data type with which a :py:class:`royalnet.network.RoyalnetLink` communicates with a :py:class:`royalnet.network.RoyalnetServer` or another link.
Contains info about the source and the destination."""
def __init__(self,
data: dict,
*,
source: str,
destination: str,
source_conv_id: typing.Optional[str] = None,
destination_conv_id: typing.Optional[str] = None):
"""Create a Package.
Parameters:
data: The data that should be sent. Usually a :py:class:`royalnet.network.Message`.
source: The ``nid`` of the node that created this Package.
destination: The ``link_type`` of the destination node, or alternatively, the ``nid`` of the node. Can also be the ``NULL`` value to send the message to nobody.
source_conv_id: The conversation id of the node that created this package. Akin to the sequence number on IP packets.
destination_conv_id: The conversation id of the node that this Package is a reply to."""
# TODO: something is not right in these type hints. Check them.
self.data: dict = data
self.source: str = source
self.source_conv_id: str = source_conv_id or str(uuid.uuid4())
self.destination: str = destination
self.destination_conv_id: typing.Optional[str] = destination_conv_id
def __repr__(self):
return f"<Package {self.source} ({self.source_conv_id}) to {self.destination} ({self.destination_conv_id}>"
def __eq__(self, other):
if isinstance(other, Package):
return (self.data == other.data) and \
(self.source == other.source) and \
(self.destination == other.destination) and \
(self.source_conv_id == other.source_conv_id) and \
(self.destination_conv_id == other.destination_conv_id)
return False
def reply(self, data) -> "Package":
"""Reply to this Package with another Package.
Parameters:
data: The data that should be sent. Usually a :py:class:`royalnet.network.Message`.
Returns:
The reply Package."""
return Package(data,
source=self.destination,
destination=self.source,
source_conv_id=self.destination_conv_id or str(uuid.uuid4()),
destination_conv_id=self.source_conv_id)
@staticmethod
def from_dict(d) -> "Package":
"""Create a Package from a dictionary."""
if "source" not in d:
raise ValueError("Missing source field")
if "nid" not in d["source"]:
raise ValueError("Missing source.nid field")
if "conv_id" not in d["source"]:
raise ValueError("Missing source.conv_id field")
if "destination" not in d:
raise ValueError("Missing destination field")
if "nid" not in d["destination"]:
raise ValueError("Missing destination.nid field")
if "conv_id" not in d["destination"]:
raise ValueError("Missing destination.conv_id field")
if "data" not in d:
raise ValueError("Missing data field")
return Package(d["data"],
source=d["source"]["nid"],
destination=d["destination"]["nid"],
source_conv_id=d["source"]["conv_id"],
destination_conv_id=d["destination"]["conv_id"])
def to_dict(self) -> dict:
"""Convert the Package into a dictionary."""
return {
"source": {
"nid": self.source,
"conv_id": self.source_conv_id
},
"destination": {
"nid": self.destination,
"conv_id": self.destination_conv_id
},
"data": self.data
}
@staticmethod
def from_json_string(string: str) -> "Package":
"""Create a Package from a JSON string."""
return Package.from_dict(json.loads(string))
def to_json_string(self) -> str:
"""Convert the Package into a JSON string."""
return json.dumps(self.to_dict())
@staticmethod
def from_json_bytes(b: bytes) -> "Package":
"""Create a Package from UTF8-encoded JSON bytes."""
return Package.from_json_string(str(b, encoding="utf8"))
def to_json_bytes(self) -> bytes:
"""Convert the Package into UTF8-encoded JSON bytes."""
return bytes(self.to_json_string(), encoding="utf8")

View file

@ -1,44 +0,0 @@
import pickle
import uuid
class Package:
"""A Royalnet package, the data type with which a :py:class:`royalnet.network.RoyalnetLink` communicates with a :py:class:`royalnet.network.RoyalnetServer` or another link. """
def __init__(self, data, destination: str, source: str, *, source_conv_id: str = None, destination_conv_id: str = None):
"""Create a Package.
Parameters:
data: The data that should be sent. Usually a :py:class:`royalnet.network.Message`.
destination: The ``link_type`` of the destination node, or alternatively, the ``nid`` of the node. Can also be the ``NULL`` value to send the message to nobody.
source: The ``nid`` of the node that created this Package.
source_conv_id: The conversation id of the node that created this package. Akin to the sequence number on IP packets.
destination_conv_id: The conversation id of the node that this Package is a reply to."""
# TODO: something is not right in these type hints. Check them.
self.data = data
self.destination: str = destination
self.source: str = source
self.source_conv_id: str = source_conv_id or str(uuid.uuid4())
self.destination_conv_id: str = destination_conv_id
def __repr__(self):
return f"<Package to {self.destination}: {self.data.__class__.__name__}>"
def reply(self, data) -> "Package":
"""Reply to this Package with another Package.
Parameters:
data: The data that should be sent. Usually a :py:class:`royalnet.network.Message`.
Returns:
The reply Package."""
return Package(data, self.source, self.destination,
source_conv_id=str(uuid.uuid4()),
destination_conv_id=self.source_conv_id)
def pickle(self) -> bytes:
""":py:mod:`pickle` this Package.
Returns:
The pickled package in form of bytes."""
return pickle.dumps(self)

View file

@ -0,0 +1,24 @@
class Request:
"""A request sent from a :py:class:`royalnet.network.RoyalnetLink` to another.
It contains the name of the requested handler, in addition to the data."""
def __init__(self, handler: str, data: dict):
super().__init__()
self.handler: str = handler
self.data: dict = data
def to_dict(self):
return self.__dict__
@staticmethod
def from_dict(d: dict):
return Request(**d)
def __eq__(self, other):
if isinstance(other, Request):
return self.handler == other.handler and self.data == other.data
return False
def __repr__(self):
return f"royalnet.network.Request(handler={self.handler}, data={self.data})"

View file

@ -0,0 +1,61 @@
import typing
from ..error import RoyalnetRequestError
class Response:
"""A base class to be inherited by all other response types."""
def to_dict(self) -> dict:
"""Prepare the Response to be sent by converting it to a JSONable :py:class:`dict`."""
return {
"type": self.__class__.__name__,
**self.__dict__
}
def __eq__(self, other):
if isinstance(other, Response):
return self.to_dict() == other.to_dict()
return False
@classmethod
def from_dict(cls, d: dict) -> "Response":
"""Recreate the response from a received :py:class:`dict`."""
# Ignore type in dict
del d["type"]
# noinspection PyArgumentList
return cls(**d)
def raise_on_error(self):
"""Raise an :py:class:`Exception` if the Response is an error, do nothing otherwise."""
raise NotImplementedError("Please override Response.raise_on_error()")
class ResponseSuccess(Response):
"""A response to a successful :py:class:`royalnet.network.Request`."""
def __init__(self, data: typing.Optional[dict] = None):
if data is None:
self.data = {}
else:
self.data = data
def __repr__(self):
return f"royalnet.network.ResponseSuccess(data={self.data})"
def raise_on_error(self):
pass
class ResponseError(Response):
"""A response to a invalid :py:class:`royalnet.network.Request`."""
def __init__(self, name: str, description: str, extra_info: typing.Optional[dict] = None):
self.name: str = name
self.description: str = description
self.extra_info: typing.Optional[dict] = extra_info
def __repr__(self):
return f"royalnet.network.ResponseError(name={self.name}, description={self.description}, extra_info={self.extra_info})"
def raise_on_error(self):
raise RoyalnetRequestError(self)

View file

@ -2,11 +2,11 @@ import asyncio
import websockets
import uuid
import functools
import typing
import pickle
import math
import numbers
import logging as _logging
from .messages import Message, ServerErrorMessage, RequestError
from .packages import Package
import typing
from .package import Package
default_loop = asyncio.get_event_loop()
log = _logging.getLogger(__name__)
@ -20,16 +20,24 @@ class NotIdentifiedError(Exception):
"""The :py:class:`royalnet.network.RoyalnetLink` has not identified yet to a :py:class:`royalnet.network.RoyalnetServer`."""
class ConnectionClosedError(Exception):
"""The :py:class:`royalnet.network.RoyalnetLink`'s connection was closed unexpectedly. The link can't be used anymore."""
class InvalidServerResponseError(Exception):
"""The :py:class:`royalnet.network.RoyalnetServer` sent invalid data to the :py:class:`royalnet.network.RoyalnetLink`."""
class NetworkError(Exception):
def __init__(self, error_msg: ServerErrorMessage, *args):
def __init__(self, error_data: dict, *args):
super().__init__(*args)
self.error_msg: ServerErrorMessage = error_msg
self.error_data: dict = error_data
class PendingRequest:
def __init__(self, *, loop=default_loop):
self.event: asyncio.Event = asyncio.Event(loop=loop)
self.data: typing.Optional[Message] = None
self.data: typing.Optional[dict] = None
def __repr__(self):
if self.event.is_set():
@ -44,7 +52,7 @@ class PendingRequest:
def requires_connection(func):
@functools.wraps(func)
async def new_func(self, *args, **kwargs):
await self._connect_event.wait()
await self.connect_event.wait()
return await func(self, *args, **kwargs)
return new_func
@ -67,30 +75,37 @@ class RoyalnetLink:
self.secret: str = secret
self.websocket: typing.Optional[websockets.WebSocketClientProtocol] = None
self.request_handler = request_handler
self._pending_requests: typing.Dict[str, typing.Optional[Message]] = {}
self._pending_requests: typing.Dict[str, PendingRequest] = {}
self._loop: asyncio.AbstractEventLoop = loop
self._connect_event: asyncio.Event = asyncio.Event(loop=self._loop)
self.error_event: asyncio.Event = asyncio.Event(loop=self._loop)
self.connect_event: asyncio.Event = asyncio.Event(loop=self._loop)
self.identify_event: asyncio.Event = asyncio.Event(loop=self._loop)
async def connect(self):
"""Connect to the :py:class:`royalnet.network.RoyalnetServer` at ``self.master_uri``."""
log.info(f"Connecting to {self.master_uri}...")
self.websocket = await websockets.connect(self.master_uri, loop=self._loop)
self._connect_event.set()
self.connect_event.set()
log.info(f"Connected!")
@requires_connection
async def receive(self) -> Package:
"""Recieve a :py:class:`Package` from the :py:class:`royalnet.network.RoyalnetServer`.
Raises:
:py:exc:`royalnet.network.royalnetlink.ConnectionClosedError` if the connection closes."""
try:
raw_pickle = await self.websocket.recv()
jbytes: bytes = await self.websocket.recv()
package: Package = Package.from_json_bytes(jbytes)
except websockets.ConnectionClosed:
self.websocket = None
self._connect_event.clear()
self.error_event.set()
self.connect_event.clear()
self.identify_event.clear()
log.info(f"Connection to {self.master_uri} was closed.")
# What to do now? Let's just reraise.
raise
package: typing.Union[Package, Package] = pickle.loads(raw_pickle)
assert package.destination == self.nid
raise ConnectionClosedError()
if self.identify_event.is_set() and package.destination != self.nid:
raise InvalidServerResponseError("Package is not addressed to this RoyalnetLink.")
log.debug(f"Received package: {package}")
return package
@ -98,37 +113,42 @@ class RoyalnetLink:
async def identify(self) -> None:
log.info(f"Identifying to {self.master_uri}...")
await self.websocket.send(f"Identify {self.nid}:{self.link_type}:{self.secret}")
response_package = await self.receive()
response = response_package.data
if isinstance(response, ServerErrorMessage):
raise NetworkError(response, "Server returned error while identifying self")
response: Package = await self.receive()
if not response.source == "<server>":
raise InvalidServerResponseError("Received a non-service package before identification.")
if "type" not in response.data:
raise InvalidServerResponseError("Missing 'type' in response data")
if response.data["type"] == "error":
raise ConnectionClosedError(f"Identification error: {response.data['type']}")
assert response.data["type"] == "success"
self.identify_event.set()
log.info(f"Identified successfully!")
@requires_identification
async def send(self, package: Package):
raw_pickle: bytes = pickle.dumps(package)
await self.websocket.send(raw_pickle)
await self.websocket.send(package.to_json_bytes())
log.debug(f"Sent package: {package}")
@requires_identification
async def request(self, message, destination):
package = Package(message, destination, self.nid)
package = Package(message, source=self.nid, destination=destination)
request = PendingRequest(loop=self._loop)
self._pending_requests[package.source_conv_id] = request
await self.send(package)
log.debug(f"Sent request: {message} -> {destination}")
await request.event.wait()
result: Message = request.data
log.debug(f"Received response: {request} -> {result}")
if isinstance(result, ServerErrorMessage):
raise NetworkError(result, "Server returned error while requesting something")
return result
response: dict = request.data
log.debug(f"Received response: {request} -> {response}")
return response
async def run(self):
async def run(self, loops: numbers.Real = math.inf):
"""Blockingly run the Link."""
log.debug(f"Running main client loop for {self.nid}.")
while True:
if self.websocket is None:
if self.error_event.is_set():
raise ConnectionClosedError("RoyalnetLinks can't be rerun after an error.")
while loops:
loops -= 1
if not self.connect_event.is_set():
await self.connect()
if not self.identify_event.is_set():
await self.identify()
@ -141,11 +161,7 @@ class RoyalnetLink:
# Package is a request
assert isinstance(package, Package)
log.debug(f"Received request {package.source_conv_id}: {package}")
try:
response = await self.request_handler(package.data)
assert isinstance(response, Message)
except Exception as exc:
response = RequestError(exc=exc)
response = await self.request_handler(package.data)
response_package: Package = package.reply(response)
await self.send(response_package)
log.debug(f"Replied to request {response_package.source_conv_id}: {response_package}")

View file

@ -2,12 +2,10 @@ import typing
import websockets
import re
import datetime
import pickle
import uuid
import asyncio
import logging as _logging
from .messages import InvalidPackageEM, InvalidSecretEM, IdentifySuccessfulMessage
from .packages import Package
from .package import Package
default_loop = asyncio.get_event_loop()
log = _logging.getLogger(__name__)
@ -26,9 +24,14 @@ class ConnectedClient:
"""Has the client sent a valid identification package?"""
return bool(self.nid)
async def send_service(self, msg_type: str, message: str):
await self.send(Package({"type": msg_type, "service": message},
source="<server>",
destination=self.nid))
async def send(self, package: Package):
"""Send a :py:class:`royalnet.network.Package` to the :py:class:`royalnet.network.RoyalnetLink`."""
await self.socket.send(package.pickle())
await self.socket.send(package.to_json_bytes())
class RoyalnetServer:
@ -49,38 +52,38 @@ class RoyalnetServer:
matching = [client for client in self.identified_clients if client.link_type == link_type]
return matching or []
async def listener(self, websocket: websockets.server.WebSocketServerProtocol, request_uri: str):
async def listener(self, websocket: websockets.server.WebSocketServerProtocol, path):
log.info(f"{websocket.remote_address} connected to the server.")
connected_client = ConnectedClient(websocket)
# Wait for identification
identify_msg = await websocket.recv()
log.debug(f"{websocket.remote_address} identified itself with: {identify_msg}.")
if not isinstance(identify_msg, str):
await websocket.send(InvalidPackageEM("Invalid identification message (not a str)"))
await connected_client.send_service("error", "Invalid identification message (not a str)")
return
identification = re.match(r"Identify ([^:\s]+):([^:\s]+):([^:\s]+)", identify_msg)
if identification is None:
await websocket.send(InvalidPackageEM("Invalid identification message (regex failed)"))
await connected_client.send_service("error", "Invalid identification message (regex failed)")
return
secret = identification.group(3)
if secret != self.required_secret:
await websocket.send(InvalidSecretEM("Invalid secret"))
await connected_client.send_service("error", "Invalid secret")
return
# Identification successful
connected_client.nid = identification.group(1)
connected_client.link_type = identification.group(2)
self.identified_clients.append(connected_client)
log.debug(f"{websocket.remote_address} identified successfully as {connected_client.nid} ({connected_client.link_type}).")
await connected_client.send(Package(IdentifySuccessfulMessage(), connected_client.nid, "__master__"))
await connected_client.send_service("success", "Identification successful!")
log.debug(f"{connected_client.nid}'s identification confirmed.")
# Main loop
while True:
# Receive packages
raw_pickle = await websocket.recv()
package: Package = pickle.loads(raw_pickle)
raw_bytes = await websocket.recv()
package: Package = Package.from_json_bytes(raw_bytes)
log.debug(f"Received package: {package}")
# Check if the package destination is the server itself.
if package.destination == "__master__":
if package.destination == "<server>":
# TODO: do stuff
pass
# Otherwise, route the package to its destination
@ -97,7 +100,7 @@ class RoyalnetServer:
A :py:class:`list` of :py:class:`ConnectedClients` to send the package to."""
# Parse destination
# Is it nothing?
if package.destination == "NULL":
if package.destination == "<none>":
return []
# Is it a valid nid?
try:
@ -114,7 +117,10 @@ class RoyalnetServer:
destinations = self.find_destination(package)
log.debug(f"Routing package: {package} -> {destinations}")
for destination in destinations:
specific_package = Package(package.data, destination.nid, package.source,
# This may have some consequences
specific_package = Package(package.data,
source=package.source,
destination=destination.nid,
source_conv_id=package.source_conv_id,
destination_conv_id=package.destination_conv_id)
await destination.send(specific_package)
@ -123,7 +129,7 @@ class RoyalnetServer:
await websockets.serve(self.listener, host=self.address, port=self.port)
async def start(self):
log.debug(f"Starting main server loop for __master__ on ws://{self.address}:{self.port}")
log.debug(f"Starting main server loop for <server> on ws://{self.address}:{self.port}")
# noinspection PyAsyncCall
self._loop.create_task(self.serve())
# Just to be sure it has started on Linux

View file

@ -1,6 +1,5 @@
import typing
import asyncio
from ..network import Message, Reply
from .command import Command
from .commandargs import CommandArgs
if typing.TYPE_CHECKING:
@ -26,7 +25,7 @@ class Call:
text: The text to be sent, possibly formatted in the weird undescribed markup that I'm using."""
raise NotImplementedError()
async def net_request(self, message, destination: str) -> Reply:
async def net_request(self, message, destination: str) -> dict:
"""Send data through a :py:class:`royalnet.network.RoyalnetLink` and wait for a :py:class:`royalnet.network.Reply`.
Parameters:

57
tests/test_network.py Normal file
View file

@ -0,0 +1,57 @@
import pytest
import uuid
import asyncio
import logging
from royalnet.network import Package, RoyalnetLink, RoyalnetServer, ConnectionClosedError, Request
log = logging.root
stream_handler = logging.StreamHandler()
stream_handler.formatter = logging.Formatter("{asctime}\t{name}\t{levelname}\t{message}", style="{")
log.addHandler(stream_handler)
log.setLevel(logging.WARNING)
@pytest.fixture
def async_loop():
loop = asyncio.get_event_loop()
yield loop
loop.close()
async def echo_request_handler(message):
return message
def test_package_serialization():
pkg = Package({"ciao": "ciao"},
source=str(uuid.uuid4()),
destination=str(uuid.uuid4()),
source_conv_id=str(uuid.uuid4()),
destination_conv_id=str(uuid.uuid4()))
assert pkg == Package.from_dict(pkg.to_dict())
assert pkg == Package.from_json_string(pkg.to_json_string())
assert pkg == Package.from_json_bytes(pkg.to_json_bytes())
def test_request_creation():
request = Request("pytest", {"testing": "is fun", "bugs": "are less fun"})
assert request == Request.from_dict(request.to_dict())
def test_links(async_loop: asyncio.AbstractEventLoop):
address, port = "127.0.0.1", 1235
master = RoyalnetServer(address, port, "test")
async_loop.run_until_complete(master.start())
# Test invalid secret
wrong_secret_link = RoyalnetLink("ws://127.0.0.1:1235", "invalid", "test", echo_request_handler, loop=async_loop)
with pytest.raises(ConnectionClosedError):
async_loop.run_until_complete(wrong_secret_link.run())
# Test regular connection
link1 = RoyalnetLink("ws://127.0.0.1:1235", "test", "one", echo_request_handler, loop=async_loop)
async_loop.create_task(link1.run())
link2 = RoyalnetLink("ws://127.0.0.1:1235", "test", "two", echo_request_handler, loop=async_loop)
async_loop.create_task(link2.run())
message = {"ciao": "ciao"}
response = async_loop.run_until_complete(link1.request(message, "two"))
assert message == response