1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 11:34:18 +00:00

First commit

This commit is contained in:
Steffo 2019-10-15 11:07:04 +02:00
commit c6c9b28616
12 changed files with 663 additions and 0 deletions

13
.gitignore vendored Normal file
View file

@ -0,0 +1,13 @@
config.ini
.idea/
.vscode/
__pycache__
downloads/
ignored/
markovmodels/
logs/
royalnet.egg-info/
.pytest_cache/
dist/
build/
venv/

4
README.md Normal file
View file

@ -0,0 +1,4 @@
# `herald` [![PyPI](https://img.shields.io/pypi/v/herald.svg)](https://pypi.org/project/royalnet/)
A websocket communication protocol for [`royalnet`](https://github.com/Steffo99/royalnet)!

22
royalherald/__init__.py Normal file
View file

@ -0,0 +1,22 @@
from .config import Config
from .errors import HeraldError, ConnectionClosedError, LinkError, InvalidServerResponseError, ServerError
from .link import Link
from .package import Package
from .request import Request
from .response import Response
from .server import Server
__all__ = [
"Config",
"HeraldError",
"ConnectionClosedError",
"LinkError",
"InvalidServerResponseError",
"ServerError",
"Link",
"Package",
"Request",
"Response",
"Server",
]

14
royalherald/config.py Normal file
View file

@ -0,0 +1,14 @@
class Config:
__slots__ = "master_uri", "master_secret"
def __init__(self,
master_uri: str,
master_secret: str):
if not (master_uri.startswith("ws://")
or master_uri.startswith("wss://")):
raise ValueError("Invalid protocol (must be ws:// or wss://)")
self.master_uri = master_uri
self.master_secret = master_secret
def __repr__(self):
return f"{self.__class__.__qualname__}(master_uri={self.master_uri}, master_secret={self.master_secret})"

22
royalherald/errors.py Normal file
View file

@ -0,0 +1,22 @@
class HeraldError(Exception):
"""A generic :py:mod:`royalherald` error."""
class LinkError(HeraldError):
"""An error for something that happened in a :py:class:`Link`."""
class ServerError(HeraldError):
"""An error for something that happened in a :py:class:`Server`."""
class ConnectionClosedError(LinkError):
"""The :py:class:`Link`'s connection was closed unexpectedly. The link can't be used anymore."""
class InvalidServerResponseError(LinkError):
"""The :py:class:`Server` sent invalid data to the :py:class:`Link`."""
class ResponseError(LinkError):
"""The :py:class:`Response` was an error, and raise_on_error is :py:const:`True`."""

163
royalherald/link.py Normal file
View file

@ -0,0 +1,163 @@
import asyncio
import websockets
import uuid
import functools
import math
import numbers
import logging as _logging
import typing
from .package import Package
from .errors import ConnectionClosedError, InvalidServerResponseError
log = _logging.getLogger(__name__)
class PendingRequest:
def __init__(self, *, loop: asyncio.AbstractEventLoop = None):
if loop is None:
self.loop = asyncio.get_event_loop()
else:
self.loop = loop
self.event: asyncio.Event = asyncio.Event(loop=loop)
self.data: typing.Optional[dict] = None
def __repr__(self):
if self.event.is_set():
return f"<{self.__class__.__qualname__}: {self.data.__class__.__name__}>"
return f"<{self.__class__.__qualname__}>"
def set(self, data):
self.data = data
self.event.set()
def requires_connection(func):
@functools.wraps(func)
async def new_func(self, *args, **kwargs):
await self.connect_event.wait()
return await func(self, *args, **kwargs)
return new_func
def requires_identification(func):
@functools.wraps(func)
async def new_func(self, *args, **kwargs):
await self.identify_event.wait()
return await func(self, *args, **kwargs)
return new_func
class Link:
def __init__(self, master_uri: str, secret: str, link_type: str, request_handler, *,
loop: asyncio.AbstractEventLoop = None):
if ":" in link_type:
raise ValueError("Link types cannot contain colons.")
self.master_uri: str = master_uri
self.link_type: str = link_type
self.nid: str = str(uuid.uuid4())
self.secret: str = secret
self.websocket: typing.Optional[websockets.WebSocketClientProtocol] = None
self.request_handler = request_handler
self._pending_requests: typing.Dict[str, PendingRequest] = {}
if loop is None:
self._loop = asyncio.get_event_loop()
else:
self._loop = loop
self.error_event: asyncio.Event = asyncio.Event(loop=self._loop)
self.connect_event: asyncio.Event = asyncio.Event(loop=self._loop)
self.identify_event: asyncio.Event = asyncio.Event(loop=self._loop)
def __repr__(self):
if self.identify_event.is_set():
return f"<{self.__class__.__qualname__} (identified)>"
elif self.connect_event.is_set():
return f"<{self.__class__.__qualname__} (connected)>"
elif self.error_event.is_set():
return f"<{self.__class__.__qualname__} (error)>"
else:
return f"<{self.__class__.__qualname__} (disconnected)>"
async def connect(self):
"""Connect to the :py:class:`royalnet.network.NetworkServer` at ``self.master_uri``."""
log.info(f"Connecting to {self.master_uri}...")
self.websocket = await websockets.connect(self.master_uri, loop=self._loop)
self.connect_event.set()
log.info(f"Connected!")
@requires_connection
async def receive(self) -> Package:
"""Recieve a :py:class:`Package` from the :py:class:`Server`.
Raises:
:py:exc:`royalnet.network.royalnetlink.ConnectionClosedError` if the connection closes."""
try:
jbytes: bytes = await self.websocket.recv()
package: Package = Package.from_json_bytes(jbytes)
except websockets.ConnectionClosed:
self.error_event.set()
self.connect_event.clear()
self.identify_event.clear()
log.info(f"Connection to {self.master_uri} was closed.")
# What to do now? Let's just reraise.
raise ConnectionClosedError()
if self.identify_event.is_set() and package.destination != self.nid:
raise InvalidServerResponseError("Package is not addressed to this NetworkLink.")
log.debug(f"Received package: {package}")
return package
@requires_connection
async def identify(self) -> None:
log.info(f"Identifying to {self.master_uri}...")
await self.websocket.send(f"Identify {self.nid}:{self.link_type}:{self.secret}")
response: Package = await self.receive()
if not response.source == "<server>":
raise InvalidServerResponseError("Received a non-service package before identification.")
if "type" not in response.data:
raise InvalidServerResponseError("Missing 'type' in response data")
if response.data["type"] == "error":
raise ConnectionClosedError(f"Identification error: {response.data['type']}")
assert response.data["type"] == "success"
self.identify_event.set()
log.info(f"Identified successfully!")
@requires_identification
async def send(self, package: Package):
await self.websocket.send(package.to_json_bytes())
log.debug(f"Sent package: {package}")
@requires_identification
async def request(self, message, destination):
package = Package(message, source=self.nid, destination=destination)
request = PendingRequest(loop=self._loop)
self._pending_requests[package.source_conv_id] = request
await self.send(package)
log.debug(f"Sent request: {message} -> {destination}")
await request.event.wait()
response: dict = request.data
log.debug(f"Received response: {request} -> {response}")
return response
async def run(self):
"""Blockingly run the Link."""
log.debug(f"Running main client loop for {self.nid}.")
if self.error_event.is_set():
raise ConnectionClosedError("RoyalnetLinks can't be rerun after an error.")
while True:
if not self.connect_event.is_set():
await self.connect()
if not self.identify_event.is_set():
await self.identify()
package: Package = await self.receive()
# Package is a response
if package.destination_conv_id in self._pending_requests:
request = self._pending_requests[package.destination_conv_id]
request.set(package.data)
continue
# Package is a request
assert isinstance(package, Package)
log.debug(f"Received request {package.source_conv_id}: {package}")
response = await self.request_handler(package.data)
response_package: Package = package.reply(response)
await self.send(response_package)
log.debug(f"Replied to request {response_package.source_conv_id}: {response_package}")

113
royalherald/package.py Normal file
View file

@ -0,0 +1,113 @@
import json
import uuid
import typing
class Package:
"""A ``royalherald`` package, the data type with which a :py:class:`Link` communicates with a :py:class:`Server` or
another Link.
Contains info about the source and the destination."""
def __init__(self,
data: dict,
*,
source: str,
destination: str,
source_conv_id: typing.Optional[str] = None,
destination_conv_id: typing.Optional[str] = None):
"""Create a Package.
Parameters:
data: The data that should be sent.
source: The ``nid`` of the node that created this Package.
destination: The ``link_type`` of the destination node, or alternatively, the ``nid`` of the node. Can also be the ``NULL`` value to send the message to nobody.
source_conv_id: The conversation id of the node that created this package. Akin to the sequence number on IP packets.
destination_conv_id: The conversation id of the node that this Package is a reply to."""
# TODO: something is not right in these type hints. Check them.
self.data: dict = data
self.source: str = source
self.source_conv_id: str = source_conv_id or str(uuid.uuid4())
self.destination: str = destination
self.destination_conv_id: typing.Optional[str] = destination_conv_id
def __repr__(self):
return f"<{self.__class__.__qualname__} {self.source} ({self.source_conv_id}) to {self.destination} ({self.destination_conv_id}>"
def __eq__(self, other):
if isinstance(other, Package):
return (self.data == other.data) and \
(self.source == other.source) and \
(self.destination == other.destination) and \
(self.source_conv_id == other.source_conv_id) and \
(self.destination_conv_id == other.destination_conv_id)
return False
def reply(self, data) -> "Package":
"""Reply to this Package with another Package.
Parameters:
data: The data that should be sent. Usually a :py:class:`royalnet.network.Message`.
Returns:
The reply Package."""
return Package(data,
source=self.destination,
destination=self.source,
source_conv_id=self.destination_conv_id or str(uuid.uuid4()),
destination_conv_id=self.source_conv_id)
@staticmethod
def from_dict(d) -> "Package":
"""Create a Package from a dictionary."""
if "source" not in d:
raise ValueError("Missing source field")
if "nid" not in d["source"]:
raise ValueError("Missing source.nid field")
if "conv_id" not in d["source"]:
raise ValueError("Missing source.conv_id field")
if "destination" not in d:
raise ValueError("Missing destination field")
if "nid" not in d["destination"]:
raise ValueError("Missing destination.nid field")
if "conv_id" not in d["destination"]:
raise ValueError("Missing destination.conv_id field")
if "data" not in d:
raise ValueError("Missing data field")
return Package(d["data"],
source=d["source"]["nid"],
destination=d["destination"]["nid"],
source_conv_id=d["source"]["conv_id"],
destination_conv_id=d["destination"]["conv_id"])
def to_dict(self) -> dict:
"""Convert the Package into a dictionary."""
return {
"source": {
"nid": self.source,
"conv_id": self.source_conv_id
},
"destination": {
"nid": self.destination,
"conv_id": self.destination_conv_id
},
"data": self.data
}
@staticmethod
def from_json_string(string: str) -> "Package":
"""Create a Package from a JSON string."""
return Package.from_dict(json.loads(string))
def to_json_string(self) -> str:
"""Convert the Package into a JSON string."""
return json.dumps(self.to_dict())
@staticmethod
def from_json_bytes(b: bytes) -> "Package":
"""Create a Package from UTF8-encoded JSON bytes."""
return Package.from_json_string(str(b, encoding="utf8"))
def to_json_bytes(self) -> bytes:
"""Convert the Package into UTF8-encoded JSON bytes."""
return bytes(self.to_json_string(), encoding="utf8")

24
royalherald/request.py Normal file
View file

@ -0,0 +1,24 @@
class Request:
"""A request sent from a :py:class:`Link` to another.
It contains the name of the requested handler, in addition to the data."""
def __init__(self, handler: str, data: dict):
super().__init__()
self.handler: str = handler
self.data: dict = data
def to_dict(self):
return self.__dict__
@staticmethod
def from_dict(d: dict):
return Request(**d)
def __eq__(self, other):
if isinstance(other, Request):
return self.handler == other.handler and self.data == other.data
return False
def __repr__(self):
return f"{self.__class__.__qualname__}(handler={self.handler}, data={self.data})"

61
royalherald/response.py Normal file
View file

@ -0,0 +1,61 @@
import typing
from .errors import ResponseError
class Response:
"""A base class to be inherited by all other response types."""
def to_dict(self) -> dict:
"""Prepare the Response to be sent by converting it to a JSONable :py:class:`dict`."""
return {
"type": self.__class__.__name__,
**self.__dict__
}
def __eq__(self, other):
if isinstance(other, Response):
return self.to_dict() == other.to_dict()
return False
@classmethod
def from_dict(cls, d: dict) -> "Response":
"""Recreate the response from a received :py:class:`dict`."""
# Ignore type in dict
del d["type"]
# noinspection PyArgumentList
return cls(**d)
def raise_on_error(self):
"""Raise an :py:exc:`ResponseError` if the Response is an error, do nothing otherwise."""
raise NotImplementedError("Please override Response.raise_on_error()")
class ResponseSuccess(Response):
"""A response to a successful :py:class:`Request`."""
def __init__(self, data: typing.Optional[dict] = None):
if data is None:
self.data = {}
else:
self.data = data
def __repr__(self):
return f"{self.__class__.__qualname__}(data={self.data})"
def raise_on_error(self):
pass
class ResponseFailure(Response):
"""A response to a invalid :py:class:`Request`."""
def __init__(self, name: str, description: str, extra_info: typing.Optional[dict] = None):
self.name: str = name
self.description: str = description
self.extra_info: typing.Optional[dict] = extra_info
def __repr__(self):
return f"{self.__class__.__qualname__}(name={self.name}, description={self.description}, extra_info={self.extra_info})"
def raise_on_error(self):
raise ResponseError(self)

143
royalherald/server.py Normal file
View file

@ -0,0 +1,143 @@
import typing
import websockets
import re
import datetime
import uuid
import asyncio
import logging as _logging
from .package import Package
log = _logging.getLogger(__name__)
class ConnectedClient:
"""The :py:class:`Server`-side representation of a connected :py:class:`Link`."""
def __init__(self, socket: websockets.WebSocketServerProtocol):
self.socket: websockets.WebSocketServerProtocol = socket
self.nid: typing.Optional[str] = None
self.link_type: typing.Optional[str] = None
self.connection_datetime: datetime.datetime = datetime.datetime.now()
@property
def is_identified(self) -> bool:
"""Has the client sent a valid identification package?"""
return bool(self.nid)
async def send_service(self, msg_type: str, message: str):
await self.send(Package({"type": msg_type, "service": message},
source="<server>",
destination=self.nid))
async def send(self, package: Package):
"""Send a :py:class:`Package` to the :py:class:`Link`."""
await self.socket.send(package.to_json_bytes())
class Server:
def __init__(self, address: str, port: int, required_secret: str, *, loop: asyncio.AbstractEventLoop = None):
self.address: str = address
self.port: int = port
self.required_secret: str = required_secret
self.identified_clients: typing.List[ConnectedClient] = []
self.loop = loop
def find_client(self, *, nid: str = None, link_type: str = None) -> typing.List[ConnectedClient]:
assert not (nid and link_type)
if nid:
matching = [client for client in self.identified_clients if client.nid == nid]
assert len(matching) <= 1
return matching
if link_type:
matching = [client for client in self.identified_clients if client.link_type == link_type]
return matching or []
async def listener(self, websocket: websockets.server.WebSocketServerProtocol):
log.info(f"{websocket.remote_address} connected to the server.")
connected_client = ConnectedClient(websocket)
# Wait for identification
identify_msg = await websocket.recv()
log.debug(f"{websocket.remote_address} identified itself with: {identify_msg}.")
if not isinstance(identify_msg, str):
await connected_client.send_service("error", "Invalid identification message (not a str)")
return
identification = re.match(r"Identify ([^:\s]+):([^:\s]+):([^:\s]+)", identify_msg)
if identification is None:
await connected_client.send_service("error", "Invalid identification message (regex failed)")
return
secret = identification.group(3)
if secret != self.required_secret:
await connected_client.send_service("error", "Invalid secret")
return
# Identification successful
connected_client.nid = identification.group(1)
connected_client.link_type = identification.group(2)
self.identified_clients.append(connected_client)
log.debug(f"{websocket.remote_address} identified successfully as {connected_client.nid}"
f" ({connected_client.link_type}).")
await connected_client.send_service("success", "Identification successful!")
log.debug(f"{connected_client.nid}'s identification confirmed.")
# Main loop
while True:
# Receive packages
raw_bytes = await websocket.recv()
package: Package = Package.from_json_bytes(raw_bytes)
log.debug(f"Received package: {package}")
# Check if the package destination is the server itself.
if package.destination == "<server>":
# TODO: do stuff
pass
# Otherwise, route the package to its destination
# noinspection PyAsyncCall
self.loop.create_task(self.route_package(package))
def find_destination(self, package: Package) -> typing.List[ConnectedClient]:
"""Find a list of destinations for the package.
Parameters:
package: The package to find the destination of.
Returns:
A :py:class:`list` of :py:class:`ConnectedClient` to send the package to."""
# Parse destination
# Is it nothing?
if package.destination == "<none>":
return []
# Is it a valid nid?
try:
destination = str(uuid.UUID(package.destination))
except ValueError:
pass
else:
return self.find_client(nid=destination)
# Is it a link_type?
return self.find_client(link_type=package.destination)
async def route_package(self, package: Package) -> None:
"""Executed every time a package is received and must be routed somewhere."""
destinations = self.find_destination(package)
log.debug(f"Routing package: {package} -> {destinations}")
for destination in destinations:
# This may have some consequences
specific_package = Package(package.data,
source=package.source,
destination=destination.nid,
source_conv_id=package.source_conv_id,
destination_conv_id=package.destination_conv_id)
await destination.send(specific_package)
def serve(self):
log.debug(f"Serving on ws://{self.address}:{self.port}")
self.loop.run_until_complete(self.run())
self.loop.run_forever()
async def run(self):
await websockets.serve(self.listener,
host=self.address,
port=self.port,
loop=self.loop)
def run_blocking(self):
if self.loop is None:
self.loop = asyncio.get_event_loop()
self.serve()

24
setup.py Normal file
View file

@ -0,0 +1,24 @@
import setuptools
with open("README.md", "r") as f:
long_description = f.read()
setuptools.setup(
name="royalherald",
version="5.0b1",
author="Stefano Pigozzi",
author_email="ste.pigozzi@gmail.com",
description="A websocket communication protocol",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/Steffo99/royalherald",
packages=setuptools.find_packages(),
install_requires=[],
python_requires=">=3.7",
classifiers=[
"Development Status :: 4 - Beta",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.7",
"License :: OSI Approved :: MIT License"
]
)

60
tests/test_network.py Normal file
View file

@ -0,0 +1,60 @@
import pytest
import uuid
import asyncio
import logging
import royalherald as h
log = logging.root
stream_handler = logging.StreamHandler()
stream_handler.formatter = logging.Formatter("{asctime}\t{name}\t{levelname}\t{message}", style="{")
log.addHandler(stream_handler)
log.setLevel(logging.WARNING)
@pytest.fixture
def async_loop():
loop = asyncio.get_event_loop()
yield loop
loop.close()
async def echo_request_handler(message):
return message
def test_package_serialization():
pkg = h.Package({"ciao": "ciao"},
source=str(uuid.uuid4()),
destination=str(uuid.uuid4()),
source_conv_id=str(uuid.uuid4()),
destination_conv_id=str(uuid.uuid4()))
assert pkg == h.Package.from_dict(pkg.to_dict())
assert pkg == h.Package.from_json_string(pkg.to_json_string())
assert pkg == h.Package.from_json_bytes(pkg.to_json_bytes())
def test_request_creation():
request = h.Request("pytest", {"testing": "is fun", "bugs": "are less fun"})
assert request == h.Request.from_dict(request.to_dict())
# Broken!
#
# def test_links(async_loop: asyncio.AbstractEventLoop):
# address, port = "127.0.0.1", 1234
# master = h.Server(address, port, "test", loop=async_loop)
# async_loop.create_task(master.run())
# async_loop.run_until_complete(asyncio.sleep(5))
# # Test invalid secret
# wrong_secret_link = h.Link(f"ws://{address}:{port}", "invalid", "test", echo_request_handler, loop=async_loop)
# with pytest.raises(h.ConnectionClosedError):
# async_loop.run_until_complete(wrong_secret_link.run())
# # Test regular connection
# link1 = h.Link("ws://127.0.0.1:1235", "test", "one", echo_request_handler, loop=async_loop)
# async_loop.create_task(link1.run())
# link2 = h.Link("ws://127.0.0.1:1235", "test", "two", echo_request_handler, loop=async_loop)
# async_loop.create_task(link2.run())
# message = {"ciao": "ciao"}
# response = async_loop.run_until_complete(link1.request(message, "two"))
# assert message == response