From 3d01e4a984c790795f0208c9a9fe67b48e5fad95 Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Fri, 15 Mar 2019 16:46:47 +0100 Subject: [PATCH 1/9] Unfinished network stuff --- royalnet/bots/telegram.py | 25 +++++++++++++++------- royalnet/utils/call.py | 6 ++++++ royalnet/utils/networkdict.py | 40 +++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 8 deletions(-) create mode 100644 royalnet/utils/networkdict.py diff --git a/royalnet/bots/telegram.py b/royalnet/bots/telegram.py index cc7eb58f..63827995 100644 --- a/royalnet/bots/telegram.py +++ b/royalnet/bots/telegram.py @@ -1,21 +1,27 @@ import telegram import asyncio import typing +import multiprocessing from ..commands import NullCommand from ..utils import asyncify, Call, Command class TelegramBot: - def __init__(self, api_key: str, commands: typing.List[typing.Type[Command]], *, missing_command: Command=NullCommand): - self.bot = telegram.Bot(api_key) - self.should_run = False - self.offset = -100 - self.commands = commands + def __init__(self, + api_key: str, + *, + commands: typing.List[typing.Type[Command]], + missing_command: Command = NullCommand, + network: multiprocessing.connection.Connection): + self.bot: telegram.Bot = telegram.Bot(api_key) + self.should_run: bool = False + self.offset: int = -100 self.missing_command: typing.Callable = missing_command + self.network: multiprocessing.connection.Connection = network # Generate commands - self._commands = {} - for command in self.commands: - self._commands[f"/{command.command_name}"] = command + self.commands = {} + for command in commands: + self.commands[f"/{command.command_name}"] = command class TelegramCall(Call): interface_name = "telegram" @@ -23,6 +29,9 @@ class TelegramBot: async def reply(self, text: str): await asyncify(self.channel.send_message, text, parse_mode="HTML") + + async def network(self, data): + self.network.send self.Call = TelegramCall async def run(self): diff --git a/royalnet/utils/call.py b/royalnet/utils/call.py index e7862200..61694b5b 100644 --- a/royalnet/utils/call.py +++ b/royalnet/utils/call.py @@ -1,3 +1,4 @@ +import typing from .command import Command, CommandArgs @@ -12,6 +13,11 @@ class Call: """Send a text message to the channel the call was made.""" raise NotImplementedError() + async def network(self, data): + """Send data to the rest of the Royalnet, and optionally wait for an answer. + The data must be pickleable.""" + raise NotImplementedError() + # These parameters / methods should be left alone def __init__(self, channel, command: Command, *args, **kwargs): self.channel = channel diff --git a/royalnet/utils/networkdict.py b/royalnet/utils/networkdict.py new file mode 100644 index 00000000..2711e1b2 --- /dev/null +++ b/royalnet/utils/networkdict.py @@ -0,0 +1,40 @@ +import uuid +import typing +from asyncio import Event + + +class RoyalnetData: + """A class to hold data to be sent to the Royalnet.""" + def __init__(self, data): + self.uuid = str(uuid.uuid4()) + self.request = data + self.event = Event() + self.response = None + + def send(self): + """TODO EVERYTHING""" + + + +class RoyalnetWait: + """A class that represents a data request sent to the Royalnet.""" + def __init__(self): + self.event = Event() + self.data = None + + def receive(self, data): + self.data = data + self.event.set() + + async def get(self): + await self.event.wait() + return self.data + + +class RoyalnetDict: + """A dictionary used to asyncrounosly hold data received from the Royalnet.""" + + def __init__(self): + self.dict: typing.Dict[str, RoyalnetRequest] = {} + + async def request(self, data: RoyalnetWait): From 94d0c21ff299b86a9aaafa4e9ef5005886cb0400 Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Sun, 17 Mar 2019 19:10:36 +0100 Subject: [PATCH 2/9] Create network client --- requirements.txt | 1 + royalnet/network/__init__.py | 0 royalnet/network/messages.py | 16 +++++ royalnet/network/packages.py | 24 +++++++ royalnet/network/royalnetlink.py | 120 +++++++++++++++++++++++++++++++ royalnet/utils/networkdict.py | 40 ----------- 6 files changed, 161 insertions(+), 40 deletions(-) create mode 100644 royalnet/network/__init__.py create mode 100644 royalnet/network/messages.py create mode 100644 royalnet/network/packages.py create mode 100644 royalnet/network/royalnetlink.py delete mode 100644 royalnet/utils/networkdict.py diff --git a/requirements.txt b/requirements.txt index 506efd03..b4556fcd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ python-telegram-bot>=11.1.0 +websockets>=7.0 diff --git a/royalnet/network/__init__.py b/royalnet/network/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/royalnet/network/messages.py b/royalnet/network/messages.py new file mode 100644 index 00000000..8d1b5b48 --- /dev/null +++ b/royalnet/network/messages.py @@ -0,0 +1,16 @@ +class Message: + pass + + +class IdentifySuccessfulMessage(Message): + pass + + +class ErrorMessage(Message): + def __init__(self, reason): + super().__init__() + self.reason = reason + + +class InvalidSecretErrorMessage(ErrorMessage): + pass diff --git a/royalnet/network/packages.py b/royalnet/network/packages.py new file mode 100644 index 00000000..ff9d28b8 --- /dev/null +++ b/royalnet/network/packages.py @@ -0,0 +1,24 @@ +import pickle +import uuid + + +class Package: + def __init__(self, data, destination: str, *, conversation_id: str = None): + self.data = data + self.destination: str = destination + self.conversation_id = conversation_id or str(uuid.uuid4()) + + def pickle(self): + return pickle.dumps(self) + + +class TwoWayPackage(Package): + def __init__(self, data, destination: str, source: str, *, conversation_id: str = None): + super().__init__(data, destination, conversation_id=conversation_id) + self.source = source + + def reply(self, data) -> Package: + return Package(data, self.source, conversation_id=self.conversation_id) + + def two_way_reply(self, data) -> "TwoWayPackage": + return TwoWayPackage(data, self.source, self.destination, conversation_id=self.conversation_id) diff --git a/royalnet/network/royalnetlink.py b/royalnet/network/royalnetlink.py new file mode 100644 index 00000000..4d62e19c --- /dev/null +++ b/royalnet/network/royalnetlink.py @@ -0,0 +1,120 @@ +import asyncio +from asyncio import Event +import websockets +import uuid +import functools +import typing +import pickle +from .messages import Message, IdentifyMessage, ErrorMessage +from .packages import Package, TwoWayPackage +loop = asyncio.get_event_loop() + + +class NotConnectedError(Exception): + pass + + +class NotIdentifiedError(Exception): + pass + + +class NetworkError(Exception): + def __init__(self, error_msg: ErrorMessage, *args): + super().__init__(*args) + self.error_msg = error_msg + + +class PendingRequest: + def __init__(self): + self.event = Event() + self.data = None + + def set(self, data): + self.data = data + self.event.set() + + +class RoyalnetLink: + def __init__(self, master_uri: str, request_handler): + self.master_uri: str = master_uri + self.nid: str = str(uuid.uuid4()) + self.websocket: typing.Optional[websockets.WebSocketClientProtocol] = None + self.identified: bool = False + self.request_handler = request_handler + self._pending_requests: typing.Dict[typing.Optional[Message]] = {} + + async def connect(self): + self.websocket = await websockets.connect(self.master_uri) + + def requires_connection(self, func): + @functools.wraps(func) + def new_func(*args, **kwargs): + if self.websocket is None: + raise NotConnectedError("Tried to call a method which @requires_connection while not connected") + return func(*args, **kwargs) + return new_func + + @requires_connection + async def receive(self) -> Package: + try: + raw_pickle = await self.websocket.recv() + except websockets.ConnectionClosed: + self.websocket = None + self.identified = False + # What to do now? Let's just reraise. + raise + package: typing.Union[Package, TwoWayPackage] = pickle.loads(raw_pickle) + assert package.destination == self.nid + return package + + @requires_connection + async def identify(self, secret) -> None: + await self.websocket.send(f"Identify: {self.nid}:{secret}") + response_package = await self.receive() + response = response_package.data + if isinstance(response, ErrorMessage): + raise NetworkError(response, "Server returned error while identifying self") + self.identified = True + + def requires_identification(self, func): + @functools.wraps(func) + def new_func(*args, **kwargs): + if not self.identified: + raise NotIdentifiedError("Tried to call a method which @requires_identification while not identified") + return func(*args, **kwargs) + return new_func + + @requires_identification + async def send(self, package: Package): + raw_pickle: bytes = pickle.dumps(package) + await self.websocket.send(raw_pickle) + + @requires_identification + async def request(self, message, destination): + package = TwoWayPackage(message, destination, self.nid) + request = PendingRequest() + self._pending_requests[package.conversation_id] = request + await self.send(package) + await request.event.wait() + result = request.data + if isinstance(result, ErrorMessage): + raise NetworkError(result, "Server returned error while requesting something") + return result + + async def run_link(self): + while True: + if self.websocket is None: + await self.connect() + if not self.identified: + await self.identify() + package: Package = self.receive() + # Package is a response + if package.conversation_id in self._pending_requests: + request = self._pending_requests[package.conversation_id] + request.set(package.data) + continue + # Package is a request + assert isinstance(package, TwoWayPackage) + response = await self.request_handler(package.data) + response_package: Package = package.reply(response) + await self.send(response_package) diff --git a/royalnet/utils/networkdict.py b/royalnet/utils/networkdict.py deleted file mode 100644 index 2711e1b2..00000000 --- a/royalnet/utils/networkdict.py +++ /dev/null @@ -1,40 +0,0 @@ -import uuid -import typing -from asyncio import Event - - -class RoyalnetData: - """A class to hold data to be sent to the Royalnet.""" - def __init__(self, data): - self.uuid = str(uuid.uuid4()) - self.request = data - self.event = Event() - self.response = None - - def send(self): - """TODO EVERYTHING""" - - - -class RoyalnetWait: - """A class that represents a data request sent to the Royalnet.""" - def __init__(self): - self.event = Event() - self.data = None - - def receive(self, data): - self.data = data - self.event.set() - - async def get(self): - await self.event.wait() - return self.data - - -class RoyalnetDict: - """A dictionary used to asyncrounosly hold data received from the Royalnet.""" - - def __init__(self): - self.dict: typing.Dict[str, RoyalnetRequest] = {} - - async def request(self, data: RoyalnetWait): From 2b2432b4d53b7f31fdaf911ad9ef9b8417cb8e15 Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Mon, 18 Mar 2019 09:40:45 +0100 Subject: [PATCH 3/9] Implement RoyalnetLink on TelegramBot --- royalnet/bots/telegram.py | 18 ++++++++++++------ royalnet/network/__init__.py | 6 ++++++ royalnet/network/royalnetlink.py | 5 +++-- royalnet/utils/call.py | 3 ++- 4 files changed, 23 insertions(+), 9 deletions(-) diff --git a/royalnet/bots/telegram.py b/royalnet/bots/telegram.py index 63827995..4d4087d1 100644 --- a/royalnet/bots/telegram.py +++ b/royalnet/bots/telegram.py @@ -4,20 +4,24 @@ import typing import multiprocessing from ..commands import NullCommand from ..utils import asyncify, Call, Command +from ..network import RoyalnetLink, Message + + +async def null(message: Message): + pass class TelegramBot: def __init__(self, api_key: str, - *, + master_server_uri: str, commands: typing.List[typing.Type[Command]], - missing_command: Command = NullCommand, - network: multiprocessing.connection.Connection): + missing_command: Command = NullCommand): self.bot: telegram.Bot = telegram.Bot(api_key) self.should_run: bool = False self.offset: int = -100 self.missing_command: typing.Callable = missing_command - self.network: multiprocessing.connection.Connection = network + self.network: RoyalnetLink = RoyalnetLink(master_server_uri, "Telegram", null) # Generate commands self.commands = {} for command in commands: @@ -30,8 +34,10 @@ class TelegramBot: async def reply(self, text: str): await asyncify(self.channel.send_message, text, parse_mode="HTML") - async def network(self, data): - self.network.send + async def net_request(self, message: Message, destination: str): + response = await self.network.request(message, destination) + return response + self.Call = TelegramCall async def run(self): diff --git a/royalnet/network/__init__.py b/royalnet/network/__init__.py index e69de29b..8393cb8f 100644 --- a/royalnet/network/__init__.py +++ b/royalnet/network/__init__.py @@ -0,0 +1,6 @@ +from .messages import Message, ErrorMessage, InvalidSecretErrorMessage +from .royalnetlink import RoyalnetLink, NetworkError, NotConnectedError, NotIdentifiedError +from .packages import Package, TwoWayPackage + +__all__ = ["Message", "ErrorMessage", "InvalidSecretErrorMessage", "RoyalnetLink", "NetworkError", "NotConnectedError", + "NotIdentifiedError", "Package", "TwoWayPackage"] diff --git a/royalnet/network/royalnetlink.py b/royalnet/network/royalnetlink.py index 4d62e19c..08e38bab 100644 --- a/royalnet/network/royalnetlink.py +++ b/royalnet/network/royalnetlink.py @@ -5,7 +5,7 @@ import uuid import functools import typing import pickle -from .messages import Message, IdentifyMessage, ErrorMessage +from .messages import Message, ErrorMessage from .packages import Package, TwoWayPackage loop = asyncio.get_event_loop() @@ -35,8 +35,9 @@ class PendingRequest: class RoyalnetLink: - def __init__(self, master_uri: str, request_handler): + def __init__(self, master_uri: str, link_type: str, request_handler): self.master_uri: str = master_uri + self.link_type: str = link_type self.nid: str = str(uuid.uuid4()) self.websocket: typing.Optional[websockets.WebSocketClientProtocol] = None self.identified: bool = False diff --git a/royalnet/utils/call.py b/royalnet/utils/call.py index 61694b5b..a3181168 100644 --- a/royalnet/utils/call.py +++ b/royalnet/utils/call.py @@ -1,4 +1,5 @@ import typing +from ..network.messages import Message from .command import Command, CommandArgs @@ -13,7 +14,7 @@ class Call: """Send a text message to the channel the call was made.""" raise NotImplementedError() - async def network(self, data): + async def net_request(self, message: Message, destination: str): """Send data to the rest of the Royalnet, and optionally wait for an answer. The data must be pickleable.""" raise NotImplementedError() From 3f9e2a229ca0fbbe440084a909933eb2f824a02c Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Mon, 18 Mar 2019 09:44:30 +0100 Subject: [PATCH 4/9] Bugfix in telegram.py --- royalnet/bots/telegram.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/royalnet/bots/telegram.py b/royalnet/bots/telegram.py index 4d4087d1..b37e3667 100644 --- a/royalnet/bots/telegram.py +++ b/royalnet/bots/telegram.py @@ -1,7 +1,6 @@ import telegram import asyncio import typing -import multiprocessing from ..commands import NullCommand from ..utils import asyncify, Call, Command from ..network import RoyalnetLink, Message @@ -71,7 +70,7 @@ class TelegramBot: command_text.replace(f"@{self.bot.username}", "") # Find the function try: - command = self._commands[command_text] + command = self.commands[command_text] except KeyError: # Skip inexistent commands command = self.missing_command From 3bc8046bc405a778b1f32b4a4af41c652faf7090 Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Mon, 18 Mar 2019 11:45:44 +0100 Subject: [PATCH 5/9] Some progress in the Royalnet server --- royalnet/bots/telegram.py | 2 +- royalnet/network/messages.py | 6 +++- royalnet/network/royalnetlink.py | 3 +- royalnet/network/royalnetserver.py | 54 ++++++++++++++++++++++++++++++ 4 files changed, 62 insertions(+), 3 deletions(-) create mode 100644 royalnet/network/royalnetserver.py diff --git a/royalnet/bots/telegram.py b/royalnet/bots/telegram.py index b37e3667..9d191876 100644 --- a/royalnet/bots/telegram.py +++ b/royalnet/bots/telegram.py @@ -20,7 +20,7 @@ class TelegramBot: self.should_run: bool = False self.offset: int = -100 self.missing_command: typing.Callable = missing_command - self.network: RoyalnetLink = RoyalnetLink(master_server_uri, "Telegram", null) + self.network: RoyalnetLink = RoyalnetLink(master_server_uri, "telegram", null) # Generate commands self.commands = {} for command in commands: diff --git a/royalnet/network/messages.py b/royalnet/network/messages.py index 8d1b5b48..59824eb2 100644 --- a/royalnet/network/messages.py +++ b/royalnet/network/messages.py @@ -12,5 +12,9 @@ class ErrorMessage(Message): self.reason = reason -class InvalidSecretErrorMessage(ErrorMessage): +class BadMessage(ErrorMessage): + pass + + +class InvalidSecretErrorMessage(BadMessage): pass diff --git a/royalnet/network/royalnetlink.py b/royalnet/network/royalnetlink.py index 08e38bab..d07baf4e 100644 --- a/royalnet/network/royalnetlink.py +++ b/royalnet/network/royalnetlink.py @@ -36,6 +36,7 @@ class PendingRequest: class RoyalnetLink: def __init__(self, master_uri: str, link_type: str, request_handler): + assert ":" not in link_type self.master_uri: str = master_uri self.link_type: str = link_type self.nid: str = str(uuid.uuid4()) @@ -70,7 +71,7 @@ class RoyalnetLink: @requires_connection async def identify(self, secret) -> None: - await self.websocket.send(f"Identify: {self.nid}:{secret}") + await self.websocket.send(f"Identify {self.nid}:{self.link_type}:{secret}") response_package = await self.receive() response = response_package.data if isinstance(response, ErrorMessage): diff --git a/royalnet/network/royalnetserver.py b/royalnet/network/royalnetserver.py new file mode 100644 index 00000000..ec9b4cb3 --- /dev/null +++ b/royalnet/network/royalnetserver.py @@ -0,0 +1,54 @@ +import typing +import websockets +import re +import datetime +from .messages import Message, ErrorMessage, BadMessage, InvalidSecretErrorMessage, IdentifySuccessfulMessage +from .packages import Package, TwoWayPackage + + +class ConnectedClient: + def __init__(self, socket: websockets.WebSocketServerProtocol): + self.socket: websockets.WebSocketServerProtocol = socket + self.nid: str = None + self.link_type: str = None + self.connection_datetime: datetime.datetime = datetime.datetime.now() + + @property + def is_identified(self) -> bool: + return bool(self.nid) + + +class RoyalnetServer: + def __init__(self, required_secret: str): + self.required_secret: str = required_secret + self.connected_clients: typing.List[ConnectedClient] = {} + self.server: websockets.server.WebSocketServer = websockets.server + + def find_client_by_nid(self, nid: str): + return [client for client in self.connected_clients if client.nid == nid][0] + + async def listener(self, websocket: websockets.server.WebSocketServerProtocol, request_uri: str): + connected_client = ConnectedClient(websocket) + # Wait for identification + identify_msg = websocket.recv() + if not isinstance(identify_msg, str): + websocket.send(BadMessage("Invalid identification message (not a str)")) + return + identification = re.match(r"Identify ([A-Za-z0-9\-]+):([a-z]+):([A-Za-z0-9\-])", identify_msg) + if identification is None: + websocket.send(BadMessage("Invalid identification message (regex failed)")) + return + secret = identification.group(3) + if secret != self.required_secret: + websocket.send(InvalidSecretErrorMessage("Invalid secret")) + return + # Identification successful + connected_client.nid = identification.group(1) + connected_client.link_type = identification.group(2) + self.connected_clients.append(connected_client) + # Main loop + while True: + pass + + + From 11bbb77afee29125f94f4aa5d5b3143febb8ddfb Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Mon, 18 Mar 2019 15:32:48 +0100 Subject: [PATCH 6/9] More progress --- royalnet/bots/telegram.py | 5 ++- royalnet/network/__init__.py | 18 ++++++-- royalnet/network/messages.py | 10 ++++- royalnet/network/packages.py | 17 +++---- royalnet/network/royalnetlink.py | 30 +++++++------ royalnet/network/royalnetserver.py | 72 +++++++++++++++++++++++++----- 6 files changed, 106 insertions(+), 46 deletions(-) diff --git a/royalnet/bots/telegram.py b/royalnet/bots/telegram.py index 9d191876..866d9eb1 100644 --- a/royalnet/bots/telegram.py +++ b/royalnet/bots/telegram.py @@ -14,13 +14,14 @@ class TelegramBot: def __init__(self, api_key: str, master_server_uri: str, + master_server_secret: str, commands: typing.List[typing.Type[Command]], missing_command: Command = NullCommand): self.bot: telegram.Bot = telegram.Bot(api_key) self.should_run: bool = False self.offset: int = -100 - self.missing_command: typing.Callable = missing_command - self.network: RoyalnetLink = RoyalnetLink(master_server_uri, "telegram", null) + self.missing_command = missing_command + self.network: RoyalnetLink = RoyalnetLink(master_server_uri, master_server_secret, "telegram", null) # Generate commands self.commands = {} for command in commands: diff --git a/royalnet/network/__init__.py b/royalnet/network/__init__.py index 8393cb8f..9015ede6 100644 --- a/royalnet/network/__init__.py +++ b/royalnet/network/__init__.py @@ -1,6 +1,16 @@ -from .messages import Message, ErrorMessage, InvalidSecretErrorMessage +from .messages import Message, ErrorMessage, InvalidSecretEM, InvalidDestinationEM, InvalidPackageEM +from .packages import Package from .royalnetlink import RoyalnetLink, NetworkError, NotConnectedError, NotIdentifiedError -from .packages import Package, TwoWayPackage +from .royalnetserver import RoyalnetServer -__all__ = ["Message", "ErrorMessage", "InvalidSecretErrorMessage", "RoyalnetLink", "NetworkError", "NotConnectedError", - "NotIdentifiedError", "Package", "TwoWayPackage"] +__all__ = ["Message", + "ErrorMessage", + "InvalidSecretEM", + "InvalidDestinationEM", + "InvalidPackageEM", + "RoyalnetLink", + "NetworkError", + "NotConnectedError", + "NotIdentifiedError", + "Package", + "RoyalnetServer"] diff --git a/royalnet/network/messages.py b/royalnet/network/messages.py index 59824eb2..6b13cfa2 100644 --- a/royalnet/network/messages.py +++ b/royalnet/network/messages.py @@ -12,9 +12,15 @@ class ErrorMessage(Message): self.reason = reason -class BadMessage(ErrorMessage): +class InvalidSecretEM(ErrorMessage): pass -class InvalidSecretErrorMessage(BadMessage): +class InvalidPackageEM(ErrorMessage): pass + + +class InvalidDestinationEM(InvalidPackageEM): + pass + + diff --git a/royalnet/network/packages.py b/royalnet/network/packages.py index ff9d28b8..9e2239c1 100644 --- a/royalnet/network/packages.py +++ b/royalnet/network/packages.py @@ -3,22 +3,15 @@ import uuid class Package: - def __init__(self, data, destination: str, *, conversation_id: str = None): + def __init__(self, data, destination: str, source: str, *, conversation_id: str = None): self.data = data self.destination: str = destination + self.source, = source self.conversation_id = conversation_id or str(uuid.uuid4()) + def reply(self, data) -> "Package": + return Package(data, self.source, self.destination, conversation_id=self.conversation_id) + def pickle(self): return pickle.dumps(self) - -class TwoWayPackage(Package): - def __init__(self, data, destination: str, source: str, *, conversation_id: str = None): - super().__init__(data, destination, conversation_id=conversation_id) - self.source = source - - def reply(self, data) -> Package: - return Package(data, self.source, conversation_id=self.conversation_id) - - def two_way_reply(self, data) -> "TwoWayPackage": - return TwoWayPackage(data, self.source, self.destination, conversation_id=self.conversation_id) diff --git a/royalnet/network/royalnetlink.py b/royalnet/network/royalnetlink.py index d07baf4e..c40419b6 100644 --- a/royalnet/network/royalnetlink.py +++ b/royalnet/network/royalnetlink.py @@ -6,7 +6,7 @@ import functools import typing import pickle from .messages import Message, ErrorMessage -from .packages import Package, TwoWayPackage +from .packages import Package loop = asyncio.get_event_loop() @@ -35,11 +35,12 @@ class PendingRequest: class RoyalnetLink: - def __init__(self, master_uri: str, link_type: str, request_handler): + def __init__(self, master_uri: str, secret: str, link_type: str, request_handler): assert ":" not in link_type self.master_uri: str = master_uri self.link_type: str = link_type self.nid: str = str(uuid.uuid4()) + self.secret: str = secret self.websocket: typing.Optional[websockets.WebSocketClientProtocol] = None self.identified: bool = False self.request_handler = request_handler @@ -48,12 +49,12 @@ class RoyalnetLink: async def connect(self): self.websocket = await websockets.connect(self.master_uri) - def requires_connection(self, func): + def requires_connection(func): @functools.wraps(func) - def new_func(*args, **kwargs): + def new_func(self, *args, **kwargs): if self.websocket is None: raise NotConnectedError("Tried to call a method which @requires_connection while not connected") - return func(*args, **kwargs) + return func(self, *args, **kwargs) return new_func @requires_connection @@ -65,7 +66,7 @@ class RoyalnetLink: self.identified = False # What to do now? Let's just reraise. raise - package: typing.Union[Package, TwoWayPackage] = pickle.loads(raw_pickle) + package: typing.Union[Package, Package] = pickle.loads(raw_pickle) assert package.destination == self.nid return package @@ -78,12 +79,12 @@ class RoyalnetLink: raise NetworkError(response, "Server returned error while identifying self") self.identified = True - def requires_identification(self, func): + def requires_identification(func): @functools.wraps(func) - def new_func(*args, **kwargs): + def new_func(self, *args, **kwargs): if not self.identified: raise NotIdentifiedError("Tried to call a method which @requires_identification while not identified") - return func(*args, **kwargs) + return func(self, *args, **kwargs) return new_func @requires_identification @@ -93,7 +94,7 @@ class RoyalnetLink: @requires_identification async def request(self, message, destination): - package = TwoWayPackage(message, destination, self.nid) + package = Package(message, destination, self.nid) request = PendingRequest() self._pending_requests[package.conversation_id] = request await self.send(package) @@ -103,7 +104,7 @@ class RoyalnetLink: raise NetworkError(result, "Server returned error while requesting something") return result - async def run_link(self): + async def run(self): while True: if self.websocket is None: await self.connect() @@ -116,7 +117,8 @@ class RoyalnetLink: request.set(package.data) continue # Package is a request - assert isinstance(package, TwoWayPackage) + assert isinstance(package, Package) response = await self.request_handler(package.data) - response_package: Package = package.reply(response) - await self.send(response_package) + if response is not None: + response_package: Package = package.reply(response) + await self.send(response_package) diff --git a/royalnet/network/royalnetserver.py b/royalnet/network/royalnetserver.py index ec9b4cb3..0452d5ea 100644 --- a/royalnet/network/royalnetserver.py +++ b/royalnet/network/royalnetserver.py @@ -2,8 +2,11 @@ import typing import websockets import re import datetime -from .messages import Message, ErrorMessage, BadMessage, InvalidSecretErrorMessage, IdentifySuccessfulMessage -from .packages import Package, TwoWayPackage +import pickle +import asyncio +import uuid +from .messages import Message, ErrorMessage, InvalidPackageEM, InvalidSecretEM, IdentifySuccessfulMessage +from .packages import Package class ConnectedClient: @@ -17,38 +20,83 @@ class ConnectedClient: def is_identified(self) -> bool: return bool(self.nid) + async def send(self, package: Package): + self.socket.send(package.pickle()) + class RoyalnetServer: - def __init__(self, required_secret: str): + def __init__(self, address: str, port: int, required_secret: str): + self.address: str = address + self.port: int = port self.required_secret: str = required_secret - self.connected_clients: typing.List[ConnectedClient] = {} - self.server: websockets.server.WebSocketServer = websockets.server + self.identified_clients: typing.List[ConnectedClient] = {} - def find_client_by_nid(self, nid: str): - return [client for client in self.connected_clients if client.nid == nid][0] + def find_client(self, *, nid: str=None, link_type: str=None) -> typing.List[ConnectedClient]: + assert not (nid and link_type) + if nid: + matching = [client for client in self.identified_clients if client.nid == nid] + assert len(matching) <= 1 + return matching + if link_type: + matching = [client for client in self.identified_clients if client.link_type == link_type] + return matching or [] async def listener(self, websocket: websockets.server.WebSocketServerProtocol, request_uri: str): connected_client = ConnectedClient(websocket) # Wait for identification identify_msg = websocket.recv() if not isinstance(identify_msg, str): - websocket.send(BadMessage("Invalid identification message (not a str)")) + websocket.send(InvalidPackageEM("Invalid identification message (not a str)")) return identification = re.match(r"Identify ([A-Za-z0-9\-]+):([a-z]+):([A-Za-z0-9\-])", identify_msg) if identification is None: - websocket.send(BadMessage("Invalid identification message (regex failed)")) + websocket.send(InvalidPackageEM("Invalid identification message (regex failed)")) return secret = identification.group(3) if secret != self.required_secret: - websocket.send(InvalidSecretErrorMessage("Invalid secret")) + websocket.send(InvalidSecretEM("Invalid secret")) return # Identification successful connected_client.nid = identification.group(1) connected_client.link_type = identification.group(2) - self.connected_clients.append(connected_client) + self.identified_clients.append(connected_client) + await connected_client.send(Package(IdentifySuccessfulMessage(), connected_client.nid, "__master__")) # Main loop while True: + # Receive packages + raw_pickle = await websocket.recv() + package: Package = pickle.loads(raw_pickle) + # Check if the package destination is the server itself. + if package.destination == "__master__": + # TODO: do stuff + pass + # Otherwise, route the package to its destination + asyncio.create_task(self.route_package(package)) + + def find_destination(self, package: Package) -> typing.List[ConnectedClient]: + """Find a list of destinations for the sent packages""" + # Parse destination + # Is it nothing? + if package.destination == "NULL": + return [] + # Is it the wildcard? + if package.destination == "*": + return self.identified_clients + # Is it a valid nid? + try: + destination = str(uuid.UUID(package.destination)) + except ValueError: pass + else: + return self.find_client(nid=destination) + # Is it a link_type? + return self.find_client(link_type=package.destination) + async def route_package(self, package: Package) -> None: + """Executed every time a package is received and must be routed somewhere.""" + destinations = self.find_destination(package) + for destination in destinations: + await destination.send(package) - + async def run(self): + websockets.serve(self.listener, host=self.address, port=self.port) From 879c0ce9533a44a85dcbe25e620ab13f743f090d Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Tue, 19 Mar 2019 12:26:32 +0100 Subject: [PATCH 7/9] Basically complete network branch --- royalnet/network/messages.py | 5 +- royalnet/network/packages.py | 6 ++- royalnet/network/royalnetlink.py | 78 +++++++++++++++++++----------- royalnet/network/royalnetserver.py | 30 ++++++++---- 4 files changed, 77 insertions(+), 42 deletions(-) diff --git a/royalnet/network/messages.py b/royalnet/network/messages.py index 6b13cfa2..99696f02 100644 --- a/royalnet/network/messages.py +++ b/royalnet/network/messages.py @@ -1,5 +1,6 @@ class Message: - pass + def __repr__(self): + return f"<{self.__class__.__name__}>" class IdentifySuccessfulMessage(Message): @@ -22,5 +23,3 @@ class InvalidPackageEM(ErrorMessage): class InvalidDestinationEM(InvalidPackageEM): pass - - diff --git a/royalnet/network/packages.py b/royalnet/network/packages.py index 9e2239c1..499ca0a4 100644 --- a/royalnet/network/packages.py +++ b/royalnet/network/packages.py @@ -6,12 +6,14 @@ class Package: def __init__(self, data, destination: str, source: str, *, conversation_id: str = None): self.data = data self.destination: str = destination - self.source, = source + self.source = source self.conversation_id = conversation_id or str(uuid.uuid4()) + def __repr__(self): + return f"" + def reply(self, data) -> "Package": return Package(data, self.source, self.destination, conversation_id=self.conversation_id) def pickle(self): return pickle.dumps(self) - diff --git a/royalnet/network/royalnetlink.py b/royalnet/network/royalnetlink.py index c40419b6..65aa68ab 100644 --- a/royalnet/network/royalnetlink.py +++ b/royalnet/network/royalnetlink.py @@ -1,13 +1,15 @@ import asyncio -from asyncio import Event import websockets import uuid import functools import typing import pickle +import logging from .messages import Message, ErrorMessage from .packages import Package + loop = asyncio.get_event_loop() +log = logging.getLogger(__name__) class NotConnectedError(Exception): @@ -21,19 +23,40 @@ class NotIdentifiedError(Exception): class NetworkError(Exception): def __init__(self, error_msg: ErrorMessage, *args): super().__init__(*args) - self.error_msg = error_msg + self.error_msg: ErrorMessage = error_msg class PendingRequest: def __init__(self): - self.event = Event() - self.data = None + self.event: asyncio.Event = asyncio.Event() + self.data: Message = None + + def __repr__(self): + if self.event.is_set(): + return f"" + return f"" def set(self, data): self.data = data self.event.set() +def requires_connection(func): + @functools.wraps(func) + async def new_func(self, *args, **kwargs): + await self._connect_event.wait() + return await func(self, *args, **kwargs) + return new_func + + +def requires_identification(func): + @functools.wraps(func) + async def new_func(self, *args, **kwargs): + await self._identify_event.wait() + return await func(self, *args, **kwargs) + return new_func + + class RoyalnetLink: def __init__(self, master_uri: str, secret: str, link_type: str, request_handler): assert ":" not in link_type @@ -42,20 +65,16 @@ class RoyalnetLink: self.nid: str = str(uuid.uuid4()) self.secret: str = secret self.websocket: typing.Optional[websockets.WebSocketClientProtocol] = None - self.identified: bool = False self.request_handler = request_handler self._pending_requests: typing.Dict[typing.Optional[Message]] = {} + self._connect_event: asyncio.Event = asyncio.Event() + self._identify_event: asyncio.Event = asyncio.Event() async def connect(self): + log.info(f"Connecting to {self.master_uri}...") self.websocket = await websockets.connect(self.master_uri) - - def requires_connection(func): - @functools.wraps(func) - def new_func(self, *args, **kwargs): - if self.websocket is None: - raise NotConnectedError("Tried to call a method which @requires_connection while not connected") - return func(self, *args, **kwargs) - return new_func + self._connect_event.set() + log.info(f"Connected!") @requires_connection async def receive(self) -> Package: @@ -63,34 +82,32 @@ class RoyalnetLink: raw_pickle = await self.websocket.recv() except websockets.ConnectionClosed: self.websocket = None - self.identified = False + self._connect_event.clear() + self._identify_event.clear() + log.info(f"Connection to {self.master_uri} was closed.") # What to do now? Let's just reraise. raise package: typing.Union[Package, Package] = pickle.loads(raw_pickle) assert package.destination == self.nid + log.debug(f"Received package: {package}") return package @requires_connection - async def identify(self, secret) -> None: - await self.websocket.send(f"Identify {self.nid}:{self.link_type}:{secret}") + async def identify(self) -> None: + log.info(f"Identifying to {self.master_uri}...") + await self.websocket.send(f"Identify {self.nid}:{self.link_type}:{self.secret}") response_package = await self.receive() response = response_package.data if isinstance(response, ErrorMessage): raise NetworkError(response, "Server returned error while identifying self") - self.identified = True - - def requires_identification(func): - @functools.wraps(func) - def new_func(self, *args, **kwargs): - if not self.identified: - raise NotIdentifiedError("Tried to call a method which @requires_identification while not identified") - return func(self, *args, **kwargs) - return new_func + self._identify_event.set() + log.info(f"Identified successfully!") @requires_identification async def send(self, package: Package): raw_pickle: bytes = pickle.dumps(package) await self.websocket.send(raw_pickle) + log.debug(f"Sent package: {package}") @requires_identification async def request(self, message, destination): @@ -98,19 +115,22 @@ class RoyalnetLink: request = PendingRequest() self._pending_requests[package.conversation_id] = request await self.send(package) + log.debug(f"Sent request: {message} -> {destination}") await request.event.wait() - result = request.data + result: Message = request.data + log.debug(f"Received response: {request} -> {result}") if isinstance(result, ErrorMessage): raise NetworkError(result, "Server returned error while requesting something") return result async def run(self): + log.debug(f"Running main client loop for {self.nid}.") while True: if self.websocket is None: await self.connect() - if not self.identified: + if not self._identify_event.is_set(): await self.identify() - package: Package = self.receive() + package: Package = await self.receive() # Package is a response if package.conversation_id in self._pending_requests: request = self._pending_requests[package.conversation_id] @@ -118,7 +138,9 @@ class RoyalnetLink: continue # Package is a request assert isinstance(package, Package) + log.debug(f"Received request: {package.source} -> {package.data}") response = await self.request_handler(package.data) if response is not None: response_package: Package = package.reply(response) await self.send(response_package) + log.debug(f"Replied to request: {response_package.data} -> {response_package.destination}") diff --git a/royalnet/network/royalnetserver.py b/royalnet/network/royalnetserver.py index 0452d5ea..54500c4f 100644 --- a/royalnet/network/royalnetserver.py +++ b/royalnet/network/royalnetserver.py @@ -3,11 +3,15 @@ import websockets import re import datetime import pickle -import asyncio import uuid +import asyncio +import logging from .messages import Message, ErrorMessage, InvalidPackageEM, InvalidSecretEM, IdentifySuccessfulMessage from .packages import Package +loop = asyncio.get_event_loop() +log = logging.getLogger(__name__) + class ConnectedClient: def __init__(self, socket: websockets.WebSocketServerProtocol): @@ -21,7 +25,7 @@ class ConnectedClient: return bool(self.nid) async def send(self, package: Package): - self.socket.send(package.pickle()) + await self.socket.send(package.pickle()) class RoyalnetServer: @@ -29,9 +33,9 @@ class RoyalnetServer: self.address: str = address self.port: int = port self.required_secret: str = required_secret - self.identified_clients: typing.List[ConnectedClient] = {} + self.identified_clients: typing.List[ConnectedClient] = [] - def find_client(self, *, nid: str=None, link_type: str=None) -> typing.List[ConnectedClient]: + def find_client(self, *, nid: str = None, link_type: str = None) -> typing.List[ConnectedClient]: assert not (nid and link_type) if nid: matching = [client for client in self.identified_clients if client.nid == nid] @@ -42,13 +46,15 @@ class RoyalnetServer: return matching or [] async def listener(self, websocket: websockets.server.WebSocketServerProtocol, request_uri: str): + log.info(f"{websocket.remote_address} connected to the server.") connected_client = ConnectedClient(websocket) # Wait for identification - identify_msg = websocket.recv() + identify_msg = await websocket.recv() + log.debug(f"{websocket.remote_address} identified itself with: {identify_msg}.") if not isinstance(identify_msg, str): websocket.send(InvalidPackageEM("Invalid identification message (not a str)")) return - identification = re.match(r"Identify ([A-Za-z0-9\-]+):([a-z]+):([A-Za-z0-9\-])", identify_msg) + identification = re.match(r"Identify ([A-Za-z0-9\-]+):([a-z]+):([A-Za-z0-9\-]+)", identify_msg) if identification is None: websocket.send(InvalidPackageEM("Invalid identification message (regex failed)")) return @@ -60,18 +66,21 @@ class RoyalnetServer: connected_client.nid = identification.group(1) connected_client.link_type = identification.group(2) self.identified_clients.append(connected_client) + log.debug(f"{websocket.remote_address} identified successfully as {connected_client.nid} ({connected_client.link_type}).") await connected_client.send(Package(IdentifySuccessfulMessage(), connected_client.nid, "__master__")) + log.debug(f"{connected_client.nid}'s identification confirmed.") # Main loop while True: # Receive packages raw_pickle = await websocket.recv() package: Package = pickle.loads(raw_pickle) + log.debug(f"Received package: {package}") # Check if the package destination is the server itself. if package.destination == "__master__": # TODO: do stuff pass # Otherwise, route the package to its destination - asyncio.create_task(self.route_package(package)) + loop.create_task(self.route_package(package)) def find_destination(self, package: Package) -> typing.List[ConnectedClient]: """Find a list of destinations for the sent packages""" @@ -95,8 +104,11 @@ class RoyalnetServer: async def route_package(self, package: Package) -> None: """Executed every time a package is received and must be routed somewhere.""" destinations = self.find_destination(package) + log.debug(f"Routing package: {package} -> {destinations}") for destination in destinations: - await destination.send(package) + specific_package = Package(package.data, destination.nid, package.source, conversation_id=package.conversation_id) + await destination.send(specific_package) async def run(self): - websockets.serve(self.listener, host=self.address, port=self.port) + log.debug(f"Running main server loop for __master__ on ws://{self.address}:{self.port}") + await websockets.serve(self.listener, host=self.address, port=self.port) From efd18fc3a7d924c9feadd88e9d0553326583f96c Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Tue, 19 Mar 2019 14:01:00 +0100 Subject: [PATCH 8/9] Implement net_requests handler on Telegram bot --- royalnet/bots/telegram.py | 7 +++---- royalnet/network/royalnetlink.py | 4 ++-- royalnet/network/royalnetserver.py | 3 --- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/royalnet/bots/telegram.py b/royalnet/bots/telegram.py index 866d9eb1..d3c46c33 100644 --- a/royalnet/bots/telegram.py +++ b/royalnet/bots/telegram.py @@ -6,10 +6,6 @@ from ..utils import asyncify, Call, Command from ..network import RoyalnetLink, Message -async def null(message: Message): - pass - - class TelegramBot: def __init__(self, api_key: str, @@ -77,3 +73,6 @@ class TelegramBot: command = self.missing_command # Call the command return await self.Call(message.chat, command, *parameters).run() + + async def handle_net_request(self, message: Message): + pass diff --git a/royalnet/network/royalnetlink.py b/royalnet/network/royalnetlink.py index 65aa68ab..6f6f679b 100644 --- a/royalnet/network/royalnetlink.py +++ b/royalnet/network/royalnetlink.py @@ -138,9 +138,9 @@ class RoyalnetLink: continue # Package is a request assert isinstance(package, Package) - log.debug(f"Received request: {package.source} -> {package.data}") + log.debug(f"Received request {package.conversation_id}: {package}") response = await self.request_handler(package.data) if response is not None: response_package: Package = package.reply(response) await self.send(response_package) - log.debug(f"Replied to request: {response_package.data} -> {response_package.destination}") + log.debug(f"Replied to request {response_package.conversation_id}: {response_package}") diff --git a/royalnet/network/royalnetserver.py b/royalnet/network/royalnetserver.py index 54500c4f..208d805e 100644 --- a/royalnet/network/royalnetserver.py +++ b/royalnet/network/royalnetserver.py @@ -88,9 +88,6 @@ class RoyalnetServer: # Is it nothing? if package.destination == "NULL": return [] - # Is it the wildcard? - if package.destination == "*": - return self.identified_clients # Is it a valid nid? try: destination = str(uuid.UUID(package.destination)) From 1b00ea8ae6d40b08ed65f9d3b7bfcdad7b9a2b28 Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Fri, 22 Mar 2019 11:22:21 +0100 Subject: [PATCH 9/9] Create network tests --- README.md | 2 ++ requirements.txt | 1 + royalnet/bots/__init__.py | 3 +++ royalnet/bots/telegram.py | 6 +++++- royalnet/network/royalnetlink.py | 8 ++++---- setup.py | 27 ++++++++++++++++++++++++++ tests/test_network.py | 33 ++++++++++++++++++++++++++++++++ 7 files changed, 75 insertions(+), 5 deletions(-) create mode 100644 README.md create mode 100644 setup.py create mode 100644 tests/test_network.py diff --git a/README.md b/README.md new file mode 100644 index 00000000..bcd7d6b6 --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# `royalnet` + diff --git a/requirements.txt b/requirements.txt index b4556fcd..483e63f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ python-telegram-bot>=11.1.0 websockets>=7.0 +pytest>=4.3.1 diff --git a/royalnet/bots/__init__.py b/royalnet/bots/__init__.py index e69de29b..83599e7e 100644 --- a/royalnet/bots/__init__.py +++ b/royalnet/bots/__init__.py @@ -0,0 +1,3 @@ +from .telegram import TelegramBot + +__all__ = ["TelegramBot"] diff --git a/royalnet/bots/telegram.py b/royalnet/bots/telegram.py index d3c46c33..b56e23be 100644 --- a/royalnet/bots/telegram.py +++ b/royalnet/bots/telegram.py @@ -6,6 +6,10 @@ from ..utils import asyncify, Call, Command from ..network import RoyalnetLink, Message +async def todo(message: Message): + pass + + class TelegramBot: def __init__(self, api_key: str, @@ -17,7 +21,7 @@ class TelegramBot: self.should_run: bool = False self.offset: int = -100 self.missing_command = missing_command - self.network: RoyalnetLink = RoyalnetLink(master_server_uri, master_server_secret, "telegram", null) + self.network: RoyalnetLink = RoyalnetLink(master_server_uri, master_server_secret, "telegram", todo) # Generate commands self.commands = {} for command in commands: diff --git a/royalnet/network/royalnetlink.py b/royalnet/network/royalnetlink.py index 6f6f679b..557cb1e2 100644 --- a/royalnet/network/royalnetlink.py +++ b/royalnet/network/royalnetlink.py @@ -68,7 +68,7 @@ class RoyalnetLink: self.request_handler = request_handler self._pending_requests: typing.Dict[typing.Optional[Message]] = {} self._connect_event: asyncio.Event = asyncio.Event() - self._identify_event: asyncio.Event = asyncio.Event() + self.identify_event: asyncio.Event = asyncio.Event() async def connect(self): log.info(f"Connecting to {self.master_uri}...") @@ -83,7 +83,7 @@ class RoyalnetLink: except websockets.ConnectionClosed: self.websocket = None self._connect_event.clear() - self._identify_event.clear() + self.identify_event.clear() log.info(f"Connection to {self.master_uri} was closed.") # What to do now? Let's just reraise. raise @@ -100,7 +100,7 @@ class RoyalnetLink: response = response_package.data if isinstance(response, ErrorMessage): raise NetworkError(response, "Server returned error while identifying self") - self._identify_event.set() + self.identify_event.set() log.info(f"Identified successfully!") @requires_identification @@ -128,7 +128,7 @@ class RoyalnetLink: while True: if self.websocket is None: await self.connect() - if not self._identify_event.is_set(): + if not self.identify_event.is_set(): await self.identify() package: Package = await self.receive() # Package is a response diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..6595a41a --- /dev/null +++ b/setup.py @@ -0,0 +1,27 @@ +import setuptools + +with open("README.md", "r") as f: + long_description = f.read() + +setuptools.setup( + name="royalnet", + version="5.0a1", + author="Stefano Pigozzi", + author_email="ste.pigozzi@gmail.com", + description="The great bot network of the Royal Games community", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/Steffo99/royalnet", + packages=setuptools.find_packages(), + install_requires=[ + "python-telegram-bot>=11.1.0", + "websockets>=7.0" + ], + python_requires=">=3.7", + classifiers=[ + "Development Status :: 3 - Alpha", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3.7", + "Topic :: Internet" + ] +) diff --git a/tests/test_network.py b/tests/test_network.py new file mode 100644 index 00000000..75c2880c --- /dev/null +++ b/tests/test_network.py @@ -0,0 +1,33 @@ +import pytest +import asyncio +from royalnet.network import RoyalnetLink, RoyalnetServer +from royalnet.network import Message + + +async def echo(message: Message): + return message + + +def test_connection(): + loop = asyncio.SelectorEventLoop() + server = RoyalnetServer("localhost", 1234, "testing") + link = RoyalnetLink("ws://localhost:1234", "testing", "testing", echo) + loop.create_task(server.run()) + loop.run_until_complete(link.run()) + assert link.websocket is not None + assert link.identify_event.is_set() + assert len(server.identified_clients) == 1 + assert server.identified_clients[0].link_type == "testing" + + +def test_request(): + loop = asyncio.SelectorEventLoop() + server = RoyalnetServer("localhost", 1234, "testing") + link1 = RoyalnetLink("ws://localhost:1234", "testing", "testing1", echo) + link2 = RoyalnetLink("ws://localhost:1234", "testing", "testing2", echo) + loop.create_task(server.run()) + loop.create_task(link1.run()) + loop.create_task(link2.run()) + message = Message() + response = loop.run_until_complete(link1.request(message, "testing2")) + assert message is response