From c6c9b28616bac0c65b66e7853bf56d35b9834f7b Mon Sep 17 00:00:00 2001 From: Stefano Pigozzi Date: Tue, 15 Oct 2019 11:07:04 +0200 Subject: [PATCH] First commit --- .gitignore | 13 ++++ README.md | 4 + royalherald/__init__.py | 22 ++++++ royalherald/config.py | 14 ++++ royalherald/errors.py | 22 ++++++ royalherald/link.py | 163 ++++++++++++++++++++++++++++++++++++++++ royalherald/package.py | 113 ++++++++++++++++++++++++++++ royalherald/request.py | 24 ++++++ royalherald/response.py | 61 +++++++++++++++ royalherald/server.py | 143 +++++++++++++++++++++++++++++++++++ setup.py | 24 ++++++ tests/test_network.py | 60 +++++++++++++++ 12 files changed, 663 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 royalherald/__init__.py create mode 100644 royalherald/config.py create mode 100644 royalherald/errors.py create mode 100644 royalherald/link.py create mode 100644 royalherald/package.py create mode 100644 royalherald/request.py create mode 100644 royalherald/response.py create mode 100644 royalherald/server.py create mode 100644 setup.py create mode 100644 tests/test_network.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..2ea4e4db --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +config.ini +.idea/ +.vscode/ +__pycache__ +downloads/ +ignored/ +markovmodels/ +logs/ +royalnet.egg-info/ +.pytest_cache/ +dist/ +build/ +venv/ diff --git a/README.md b/README.md new file mode 100644 index 00000000..d775f90c --- /dev/null +++ b/README.md @@ -0,0 +1,4 @@ +# `herald` [![PyPI](https://img.shields.io/pypi/v/herald.svg)](https://pypi.org/project/royalnet/) + + +A websocket communication protocol for [`royalnet`](https://github.com/Steffo99/royalnet)! diff --git a/royalherald/__init__.py b/royalherald/__init__.py new file mode 100644 index 00000000..dbf8a66d --- /dev/null +++ b/royalherald/__init__.py @@ -0,0 +1,22 @@ +from .config import Config +from .errors import HeraldError, ConnectionClosedError, LinkError, InvalidServerResponseError, ServerError +from .link import Link +from .package import Package +from .request import Request +from .response import Response +from .server import Server + + +__all__ = [ + "Config", + "HeraldError", + "ConnectionClosedError", + "LinkError", + "InvalidServerResponseError", + "ServerError", + "Link", + "Package", + "Request", + "Response", + "Server", +] diff --git a/royalherald/config.py b/royalherald/config.py new file mode 100644 index 00000000..82253400 --- /dev/null +++ b/royalherald/config.py @@ -0,0 +1,14 @@ +class Config: + __slots__ = "master_uri", "master_secret" + + def __init__(self, + master_uri: str, + master_secret: str): + if not (master_uri.startswith("ws://") + or master_uri.startswith("wss://")): + raise ValueError("Invalid protocol (must be ws:// or wss://)") + self.master_uri = master_uri + self.master_secret = master_secret + + def __repr__(self): + return f"{self.__class__.__qualname__}(master_uri={self.master_uri}, master_secret={self.master_secret})" diff --git a/royalherald/errors.py b/royalherald/errors.py new file mode 100644 index 00000000..b6b32317 --- /dev/null +++ b/royalherald/errors.py @@ -0,0 +1,22 @@ +class HeraldError(Exception): + """A generic :py:mod:`royalherald` error.""" + + +class LinkError(HeraldError): + """An error for something that happened in a :py:class:`Link`.""" + + +class ServerError(HeraldError): + """An error for something that happened in a :py:class:`Server`.""" + + +class ConnectionClosedError(LinkError): + """The :py:class:`Link`'s connection was closed unexpectedly. The link can't be used anymore.""" + + +class InvalidServerResponseError(LinkError): + """The :py:class:`Server` sent invalid data to the :py:class:`Link`.""" + + +class ResponseError(LinkError): + """The :py:class:`Response` was an error, and raise_on_error is :py:const:`True`.""" diff --git a/royalherald/link.py b/royalherald/link.py new file mode 100644 index 00000000..463f8cc4 --- /dev/null +++ b/royalherald/link.py @@ -0,0 +1,163 @@ +import asyncio +import websockets +import uuid +import functools +import math +import numbers +import logging as _logging +import typing +from .package import Package +from .errors import ConnectionClosedError, InvalidServerResponseError + + +log = _logging.getLogger(__name__) + + +class PendingRequest: + def __init__(self, *, loop: asyncio.AbstractEventLoop = None): + if loop is None: + self.loop = asyncio.get_event_loop() + else: + self.loop = loop + self.event: asyncio.Event = asyncio.Event(loop=loop) + self.data: typing.Optional[dict] = None + + def __repr__(self): + if self.event.is_set(): + return f"<{self.__class__.__qualname__}: {self.data.__class__.__name__}>" + return f"<{self.__class__.__qualname__}>" + + def set(self, data): + self.data = data + self.event.set() + + +def requires_connection(func): + @functools.wraps(func) + async def new_func(self, *args, **kwargs): + await self.connect_event.wait() + return await func(self, *args, **kwargs) + return new_func + + +def requires_identification(func): + @functools.wraps(func) + async def new_func(self, *args, **kwargs): + await self.identify_event.wait() + return await func(self, *args, **kwargs) + return new_func + + +class Link: + def __init__(self, master_uri: str, secret: str, link_type: str, request_handler, *, + loop: asyncio.AbstractEventLoop = None): + if ":" in link_type: + raise ValueError("Link types cannot contain colons.") + self.master_uri: str = master_uri + self.link_type: str = link_type + self.nid: str = str(uuid.uuid4()) + self.secret: str = secret + self.websocket: typing.Optional[websockets.WebSocketClientProtocol] = None + self.request_handler = request_handler + self._pending_requests: typing.Dict[str, PendingRequest] = {} + if loop is None: + self._loop = asyncio.get_event_loop() + else: + self._loop = loop + self.error_event: asyncio.Event = asyncio.Event(loop=self._loop) + self.connect_event: asyncio.Event = asyncio.Event(loop=self._loop) + self.identify_event: asyncio.Event = asyncio.Event(loop=self._loop) + + def __repr__(self): + if self.identify_event.is_set(): + return f"<{self.__class__.__qualname__} (identified)>" + elif self.connect_event.is_set(): + return f"<{self.__class__.__qualname__} (connected)>" + elif self.error_event.is_set(): + return f"<{self.__class__.__qualname__} (error)>" + else: + return f"<{self.__class__.__qualname__} (disconnected)>" + + async def connect(self): + """Connect to the :py:class:`royalnet.network.NetworkServer` at ``self.master_uri``.""" + log.info(f"Connecting to {self.master_uri}...") + self.websocket = await websockets.connect(self.master_uri, loop=self._loop) + self.connect_event.set() + log.info(f"Connected!") + + @requires_connection + async def receive(self) -> Package: + """Recieve a :py:class:`Package` from the :py:class:`Server`. + + Raises: + :py:exc:`royalnet.network.royalnetlink.ConnectionClosedError` if the connection closes.""" + try: + jbytes: bytes = await self.websocket.recv() + package: Package = Package.from_json_bytes(jbytes) + except websockets.ConnectionClosed: + self.error_event.set() + self.connect_event.clear() + self.identify_event.clear() + log.info(f"Connection to {self.master_uri} was closed.") + # What to do now? Let's just reraise. + raise ConnectionClosedError() + if self.identify_event.is_set() and package.destination != self.nid: + raise InvalidServerResponseError("Package is not addressed to this NetworkLink.") + log.debug(f"Received package: {package}") + return package + + @requires_connection + async def identify(self) -> None: + log.info(f"Identifying to {self.master_uri}...") + await self.websocket.send(f"Identify {self.nid}:{self.link_type}:{self.secret}") + response: Package = await self.receive() + if not response.source == "": + raise InvalidServerResponseError("Received a non-service package before identification.") + if "type" not in response.data: + raise InvalidServerResponseError("Missing 'type' in response data") + if response.data["type"] == "error": + raise ConnectionClosedError(f"Identification error: {response.data['type']}") + assert response.data["type"] == "success" + self.identify_event.set() + log.info(f"Identified successfully!") + + @requires_identification + async def send(self, package: Package): + await self.websocket.send(package.to_json_bytes()) + log.debug(f"Sent package: {package}") + + @requires_identification + async def request(self, message, destination): + package = Package(message, source=self.nid, destination=destination) + request = PendingRequest(loop=self._loop) + self._pending_requests[package.source_conv_id] = request + await self.send(package) + log.debug(f"Sent request: {message} -> {destination}") + await request.event.wait() + response: dict = request.data + log.debug(f"Received response: {request} -> {response}") + return response + + async def run(self): + """Blockingly run the Link.""" + log.debug(f"Running main client loop for {self.nid}.") + if self.error_event.is_set(): + raise ConnectionClosedError("RoyalnetLinks can't be rerun after an error.") + while True: + if not self.connect_event.is_set(): + await self.connect() + if not self.identify_event.is_set(): + await self.identify() + package: Package = await self.receive() + # Package is a response + if package.destination_conv_id in self._pending_requests: + request = self._pending_requests[package.destination_conv_id] + request.set(package.data) + continue + # Package is a request + assert isinstance(package, Package) + log.debug(f"Received request {package.source_conv_id}: {package}") + response = await self.request_handler(package.data) + response_package: Package = package.reply(response) + await self.send(response_package) + log.debug(f"Replied to request {response_package.source_conv_id}: {response_package}") diff --git a/royalherald/package.py b/royalherald/package.py new file mode 100644 index 00000000..4f162233 --- /dev/null +++ b/royalherald/package.py @@ -0,0 +1,113 @@ +import json +import uuid +import typing + + +class Package: + """A ``royalherald`` package, the data type with which a :py:class:`Link` communicates with a :py:class:`Server` or + another Link. + + Contains info about the source and the destination.""" + + def __init__(self, + data: dict, + *, + source: str, + destination: str, + source_conv_id: typing.Optional[str] = None, + destination_conv_id: typing.Optional[str] = None): + """Create a Package. + + Parameters: + data: The data that should be sent. + source: The ``nid`` of the node that created this Package. + destination: The ``link_type`` of the destination node, or alternatively, the ``nid`` of the node. Can also be the ``NULL`` value to send the message to nobody. + source_conv_id: The conversation id of the node that created this package. Akin to the sequence number on IP packets. + destination_conv_id: The conversation id of the node that this Package is a reply to.""" + # TODO: something is not right in these type hints. Check them. + self.data: dict = data + self.source: str = source + self.source_conv_id: str = source_conv_id or str(uuid.uuid4()) + self.destination: str = destination + self.destination_conv_id: typing.Optional[str] = destination_conv_id + + def __repr__(self): + return f"<{self.__class__.__qualname__} {self.source} ({self.source_conv_id}) to {self.destination} ({self.destination_conv_id}>" + + def __eq__(self, other): + if isinstance(other, Package): + return (self.data == other.data) and \ + (self.source == other.source) and \ + (self.destination == other.destination) and \ + (self.source_conv_id == other.source_conv_id) and \ + (self.destination_conv_id == other.destination_conv_id) + return False + + def reply(self, data) -> "Package": + """Reply to this Package with another Package. + + Parameters: + data: The data that should be sent. Usually a :py:class:`royalnet.network.Message`. + + Returns: + The reply Package.""" + return Package(data, + source=self.destination, + destination=self.source, + source_conv_id=self.destination_conv_id or str(uuid.uuid4()), + destination_conv_id=self.source_conv_id) + + @staticmethod + def from_dict(d) -> "Package": + """Create a Package from a dictionary.""" + if "source" not in d: + raise ValueError("Missing source field") + if "nid" not in d["source"]: + raise ValueError("Missing source.nid field") + if "conv_id" not in d["source"]: + raise ValueError("Missing source.conv_id field") + if "destination" not in d: + raise ValueError("Missing destination field") + if "nid" not in d["destination"]: + raise ValueError("Missing destination.nid field") + if "conv_id" not in d["destination"]: + raise ValueError("Missing destination.conv_id field") + if "data" not in d: + raise ValueError("Missing data field") + return Package(d["data"], + source=d["source"]["nid"], + destination=d["destination"]["nid"], + source_conv_id=d["source"]["conv_id"], + destination_conv_id=d["destination"]["conv_id"]) + + def to_dict(self) -> dict: + """Convert the Package into a dictionary.""" + return { + "source": { + "nid": self.source, + "conv_id": self.source_conv_id + }, + "destination": { + "nid": self.destination, + "conv_id": self.destination_conv_id + }, + "data": self.data + } + + @staticmethod + def from_json_string(string: str) -> "Package": + """Create a Package from a JSON string.""" + return Package.from_dict(json.loads(string)) + + def to_json_string(self) -> str: + """Convert the Package into a JSON string.""" + return json.dumps(self.to_dict()) + + @staticmethod + def from_json_bytes(b: bytes) -> "Package": + """Create a Package from UTF8-encoded JSON bytes.""" + return Package.from_json_string(str(b, encoding="utf8")) + + def to_json_bytes(self) -> bytes: + """Convert the Package into UTF8-encoded JSON bytes.""" + return bytes(self.to_json_string(), encoding="utf8") diff --git a/royalherald/request.py b/royalherald/request.py new file mode 100644 index 00000000..76500801 --- /dev/null +++ b/royalherald/request.py @@ -0,0 +1,24 @@ +class Request: + """A request sent from a :py:class:`Link` to another. + + It contains the name of the requested handler, in addition to the data.""" + + def __init__(self, handler: str, data: dict): + super().__init__() + self.handler: str = handler + self.data: dict = data + + def to_dict(self): + return self.__dict__ + + @staticmethod + def from_dict(d: dict): + return Request(**d) + + def __eq__(self, other): + if isinstance(other, Request): + return self.handler == other.handler and self.data == other.data + return False + + def __repr__(self): + return f"{self.__class__.__qualname__}(handler={self.handler}, data={self.data})" diff --git a/royalherald/response.py b/royalherald/response.py new file mode 100644 index 00000000..58074bc3 --- /dev/null +++ b/royalherald/response.py @@ -0,0 +1,61 @@ +import typing +from .errors import ResponseError + + +class Response: + """A base class to be inherited by all other response types.""" + + def to_dict(self) -> dict: + """Prepare the Response to be sent by converting it to a JSONable :py:class:`dict`.""" + return { + "type": self.__class__.__name__, + **self.__dict__ + } + + def __eq__(self, other): + if isinstance(other, Response): + return self.to_dict() == other.to_dict() + return False + + @classmethod + def from_dict(cls, d: dict) -> "Response": + """Recreate the response from a received :py:class:`dict`.""" + # Ignore type in dict + del d["type"] + # noinspection PyArgumentList + return cls(**d) + + def raise_on_error(self): + """Raise an :py:exc:`ResponseError` if the Response is an error, do nothing otherwise.""" + raise NotImplementedError("Please override Response.raise_on_error()") + + +class ResponseSuccess(Response): + """A response to a successful :py:class:`Request`.""" + + def __init__(self, data: typing.Optional[dict] = None): + if data is None: + self.data = {} + else: + self.data = data + + def __repr__(self): + return f"{self.__class__.__qualname__}(data={self.data})" + + def raise_on_error(self): + pass + + +class ResponseFailure(Response): + """A response to a invalid :py:class:`Request`.""" + + def __init__(self, name: str, description: str, extra_info: typing.Optional[dict] = None): + self.name: str = name + self.description: str = description + self.extra_info: typing.Optional[dict] = extra_info + + def __repr__(self): + return f"{self.__class__.__qualname__}(name={self.name}, description={self.description}, extra_info={self.extra_info})" + + def raise_on_error(self): + raise ResponseError(self) diff --git a/royalherald/server.py b/royalherald/server.py new file mode 100644 index 00000000..7a5e3ba3 --- /dev/null +++ b/royalherald/server.py @@ -0,0 +1,143 @@ +import typing +import websockets +import re +import datetime +import uuid +import asyncio +import logging as _logging +from .package import Package + + +log = _logging.getLogger(__name__) + + +class ConnectedClient: + """The :py:class:`Server`-side representation of a connected :py:class:`Link`.""" + def __init__(self, socket: websockets.WebSocketServerProtocol): + self.socket: websockets.WebSocketServerProtocol = socket + self.nid: typing.Optional[str] = None + self.link_type: typing.Optional[str] = None + self.connection_datetime: datetime.datetime = datetime.datetime.now() + + @property + def is_identified(self) -> bool: + """Has the client sent a valid identification package?""" + return bool(self.nid) + + async def send_service(self, msg_type: str, message: str): + await self.send(Package({"type": msg_type, "service": message}, + source="", + destination=self.nid)) + + async def send(self, package: Package): + """Send a :py:class:`Package` to the :py:class:`Link`.""" + await self.socket.send(package.to_json_bytes()) + + +class Server: + def __init__(self, address: str, port: int, required_secret: str, *, loop: asyncio.AbstractEventLoop = None): + self.address: str = address + self.port: int = port + self.required_secret: str = required_secret + self.identified_clients: typing.List[ConnectedClient] = [] + self.loop = loop + + def find_client(self, *, nid: str = None, link_type: str = None) -> typing.List[ConnectedClient]: + assert not (nid and link_type) + if nid: + matching = [client for client in self.identified_clients if client.nid == nid] + assert len(matching) <= 1 + return matching + if link_type: + matching = [client for client in self.identified_clients if client.link_type == link_type] + return matching or [] + + async def listener(self, websocket: websockets.server.WebSocketServerProtocol): + log.info(f"{websocket.remote_address} connected to the server.") + connected_client = ConnectedClient(websocket) + # Wait for identification + identify_msg = await websocket.recv() + log.debug(f"{websocket.remote_address} identified itself with: {identify_msg}.") + if not isinstance(identify_msg, str): + await connected_client.send_service("error", "Invalid identification message (not a str)") + return + identification = re.match(r"Identify ([^:\s]+):([^:\s]+):([^:\s]+)", identify_msg) + if identification is None: + await connected_client.send_service("error", "Invalid identification message (regex failed)") + return + secret = identification.group(3) + if secret != self.required_secret: + await connected_client.send_service("error", "Invalid secret") + return + # Identification successful + connected_client.nid = identification.group(1) + connected_client.link_type = identification.group(2) + self.identified_clients.append(connected_client) + log.debug(f"{websocket.remote_address} identified successfully as {connected_client.nid}" + f" ({connected_client.link_type}).") + await connected_client.send_service("success", "Identification successful!") + log.debug(f"{connected_client.nid}'s identification confirmed.") + # Main loop + while True: + # Receive packages + raw_bytes = await websocket.recv() + package: Package = Package.from_json_bytes(raw_bytes) + log.debug(f"Received package: {package}") + # Check if the package destination is the server itself. + if package.destination == "": + # TODO: do stuff + pass + # Otherwise, route the package to its destination + # noinspection PyAsyncCall + self.loop.create_task(self.route_package(package)) + + def find_destination(self, package: Package) -> typing.List[ConnectedClient]: + """Find a list of destinations for the package. + + Parameters: + package: The package to find the destination of. + + Returns: + A :py:class:`list` of :py:class:`ConnectedClient` to send the package to.""" + # Parse destination + # Is it nothing? + if package.destination == "": + return [] + # Is it a valid nid? + try: + destination = str(uuid.UUID(package.destination)) + except ValueError: + pass + else: + return self.find_client(nid=destination) + # Is it a link_type? + return self.find_client(link_type=package.destination) + + async def route_package(self, package: Package) -> None: + """Executed every time a package is received and must be routed somewhere.""" + destinations = self.find_destination(package) + log.debug(f"Routing package: {package} -> {destinations}") + for destination in destinations: + # This may have some consequences + specific_package = Package(package.data, + source=package.source, + destination=destination.nid, + source_conv_id=package.source_conv_id, + destination_conv_id=package.destination_conv_id) + await destination.send(specific_package) + + def serve(self): + log.debug(f"Serving on ws://{self.address}:{self.port}") + self.loop.run_until_complete(self.run()) + self.loop.run_forever() + + async def run(self): + await websockets.serve(self.listener, + host=self.address, + port=self.port, + loop=self.loop) + + def run_blocking(self): + if self.loop is None: + self.loop = asyncio.get_event_loop() + self.serve() diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..22aebc82 --- /dev/null +++ b/setup.py @@ -0,0 +1,24 @@ +import setuptools + +with open("README.md", "r") as f: + long_description = f.read() + +setuptools.setup( + name="royalherald", + version="5.0b1", + author="Stefano Pigozzi", + author_email="ste.pigozzi@gmail.com", + description="A websocket communication protocol", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/Steffo99/royalherald", + packages=setuptools.find_packages(), + install_requires=[], + python_requires=">=3.7", + classifiers=[ + "Development Status :: 4 - Beta", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3.7", + "License :: OSI Approved :: MIT License" + ] +) diff --git a/tests/test_network.py b/tests/test_network.py new file mode 100644 index 00000000..75a913cd --- /dev/null +++ b/tests/test_network.py @@ -0,0 +1,60 @@ +import pytest +import uuid +import asyncio +import logging +import royalherald as h + + +log = logging.root +stream_handler = logging.StreamHandler() +stream_handler.formatter = logging.Formatter("{asctime}\t{name}\t{levelname}\t{message}", style="{") +log.addHandler(stream_handler) +log.setLevel(logging.WARNING) + + +@pytest.fixture +def async_loop(): + loop = asyncio.get_event_loop() + yield loop + loop.close() + + +async def echo_request_handler(message): + return message + + +def test_package_serialization(): + pkg = h.Package({"ciao": "ciao"}, + source=str(uuid.uuid4()), + destination=str(uuid.uuid4()), + source_conv_id=str(uuid.uuid4()), + destination_conv_id=str(uuid.uuid4())) + assert pkg == h.Package.from_dict(pkg.to_dict()) + assert pkg == h.Package.from_json_string(pkg.to_json_string()) + assert pkg == h.Package.from_json_bytes(pkg.to_json_bytes()) + + +def test_request_creation(): + request = h.Request("pytest", {"testing": "is fun", "bugs": "are less fun"}) + assert request == h.Request.from_dict(request.to_dict()) + + +# Broken! +# +# def test_links(async_loop: asyncio.AbstractEventLoop): +# address, port = "127.0.0.1", 1234 +# master = h.Server(address, port, "test", loop=async_loop) +# async_loop.create_task(master.run()) +# async_loop.run_until_complete(asyncio.sleep(5)) +# # Test invalid secret +# wrong_secret_link = h.Link(f"ws://{address}:{port}", "invalid", "test", echo_request_handler, loop=async_loop) +# with pytest.raises(h.ConnectionClosedError): +# async_loop.run_until_complete(wrong_secret_link.run()) +# # Test regular connection +# link1 = h.Link("ws://127.0.0.1:1235", "test", "one", echo_request_handler, loop=async_loop) +# async_loop.create_task(link1.run()) +# link2 = h.Link("ws://127.0.0.1:1235", "test", "two", echo_request_handler, loop=async_loop) +# async_loop.create_task(link2.run()) +# message = {"ciao": "ciao"} +# response = async_loop.run_until_complete(link1.request(message, "two")) +# assert message == response