mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-27 13:34:28 +00:00
Start work on the YOSHI REFACTOR
This commit is contained in:
parent
7f023ff60b
commit
04e5204819
12 changed files with 28 additions and 640 deletions
|
@ -1,7 +1,7 @@
|
||||||
bcrypt>=3.1.7
|
bcrypt>=3.1.7
|
||||||
discord.py>=1.2.2
|
discord.py>=1.2.2
|
||||||
python-telegram-bot>=11.1.0
|
python-telegram-bot>=11.1.0
|
||||||
websockets>=6.0
|
royalherald>=5.0b4
|
||||||
pytest>=4.3.1
|
pytest>=4.3.1
|
||||||
psycopg2-binary>=2.8
|
psycopg2-binary>=2.8
|
||||||
aiohttp>=3.5.4
|
aiohttp>=3.5.4
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
from . import audio, bots, database, network, utils, error, web, version
|
from . import audio, bots, database, utils, error, web, version
|
||||||
from royalnet import commands
|
from royalnet import commands
|
||||||
|
|
||||||
__all__ = ["audio", "bots", "commands", "database", "network", "utils", "error", "web", "version"]
|
__all__ = ["audio", "bots", "commands", "database", "utils", "error", "web", "version"]
|
||||||
|
|
|
@ -3,11 +3,11 @@ import asyncio
|
||||||
import logging
|
import logging
|
||||||
import sentry_sdk
|
import sentry_sdk
|
||||||
import keyring
|
import keyring
|
||||||
|
import royalherald as rh
|
||||||
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
|
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
|
||||||
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
|
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
|
||||||
from sentry_sdk.integrations.logging import LoggingIntegration
|
from sentry_sdk.integrations.logging import LoggingIntegration
|
||||||
from ..utils import *
|
from ..utils import *
|
||||||
from ..network import *
|
|
||||||
from ..database import *
|
from ..database import *
|
||||||
from ..commands import *
|
from ..commands import *
|
||||||
from ..error import *
|
from ..error import *
|
||||||
|
@ -62,16 +62,16 @@ class GenericBot:
|
||||||
def unregister_net_handler(ci, message_type: str):
|
def unregister_net_handler(ci, message_type: str):
|
||||||
del self.network_handlers[message_type]
|
del self.network_handlers[message_type]
|
||||||
|
|
||||||
async def net_request(ci, request: Request, destination: str) -> dict:
|
async def net_request(ci, request: rh.Request, destination: str) -> dict:
|
||||||
if self.network is None:
|
if self.network is None:
|
||||||
raise InvalidConfigError("Royalnet is not enabled on this bot")
|
raise InvalidConfigError("Royalnet is not enabled on this bot")
|
||||||
response_dict: dict = await self.network.request(request.to_dict(), destination)
|
response_dict: dict = await self.network.request(request.to_dict(), destination)
|
||||||
if "type" not in response_dict:
|
if "type" not in response_dict:
|
||||||
raise RoyalnetResponseError("Response is missing a type")
|
raise RoyalnetResponseError("Response is missing a type")
|
||||||
elif response_dict["type"] == "ResponseSuccess":
|
elif response_dict["type"] == "ResponseSuccess":
|
||||||
response: typing.Union[ResponseSuccess, ResponseError] = ResponseSuccess.from_dict(response_dict)
|
response: typing.Union[rh.ResponseSuccess, rh.ResponseFailure] = rh.ResponseSuccess.from_dict(response_dict)
|
||||||
elif response_dict["type"] == "ResponseError":
|
elif response_dict["type"] == "ResponseError":
|
||||||
response = ResponseError.from_dict(response_dict)
|
response = rh.ResponseFailure.from_dict(response_dict)
|
||||||
else:
|
else:
|
||||||
raise RoyalnetResponseError("Response type is unknown")
|
raise RoyalnetResponseError("Response type is unknown")
|
||||||
response.raise_on_error()
|
response.raise_on_error()
|
||||||
|
@ -83,25 +83,25 @@ class GenericBot:
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
def _init_network(self):
|
def _init_network(self):
|
||||||
"""Create a :py:class:`royalnet.network.NetworkLink`, and run it as a :py:class:`asyncio.Task`."""
|
"""Create a :py:class:`royalherald.Link`, and run it as a :py:class:`asyncio.Task`."""
|
||||||
if self.uninitialized_network_config is not None:
|
if self.uninitialized_network_config is not None:
|
||||||
self.network: NetworkLink = NetworkLink(self.uninitialized_network_config.master_uri,
|
self.network: rh.Link = rh.Link(self.uninitialized_network_config.master_uri,
|
||||||
self.uninitialized_network_config.master_secret,
|
self.uninitialized_network_config.master_secret,
|
||||||
self.interface_name, self._network_handler)
|
self.interface_name, self._network_handler)
|
||||||
log.debug(f"Running NetworkLink {self.network}")
|
log.debug(f"Running NetworkLink {self.network}")
|
||||||
self.loop.create_task(self.network.run())
|
self.loop.create_task(self.network.run())
|
||||||
|
|
||||||
async def _network_handler(self, request_dict: dict) -> dict:
|
async def _network_handler(self, request_dict: dict) -> dict:
|
||||||
"""Handle a single :py:class:`dict` received from the :py:class:`royalnet.network.NetworkLink`.
|
"""Handle a single :py:class:`dict` received from the :py:class:`royalherald.Link`.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Another :py:class:`dict`, formatted as a :py:class:`royalnet.network.Response`."""
|
Another :py:class:`dict`, formatted as a :py:class:`royalherald.Response`."""
|
||||||
# Convert the dict to a Request
|
# Convert the dict to a Request
|
||||||
try:
|
try:
|
||||||
request: Request = Request.from_dict(request_dict)
|
request: rh.Request = rh.Request.from_dict(request_dict)
|
||||||
except TypeError:
|
except TypeError:
|
||||||
log.warning(f"Invalid request received: {request_dict}")
|
log.warning(f"Invalid request received: {request_dict}")
|
||||||
return ResponseError("invalid_request",
|
return rh.ResponseFailure("invalid_request",
|
||||||
f"The Request that you sent was invalid. Check extra_info to see what you sent.",
|
f"The Request that you sent was invalid. Check extra_info to see what you sent.",
|
||||||
extra_info={"you_sent": request_dict}).to_dict()
|
extra_info={"you_sent": request_dict}).to_dict()
|
||||||
log.debug(f"Received {request} from the NetworkLink")
|
log.debug(f"Received {request} from the NetworkLink")
|
||||||
|
@ -109,15 +109,15 @@ class GenericBot:
|
||||||
network_handler = self.network_handlers[request.handler]
|
network_handler = self.network_handlers[request.handler]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
log.warning(f"Missing network_handler for {request.handler}")
|
log.warning(f"Missing network_handler for {request.handler}")
|
||||||
return ResponseError("no_handler", f"This Link is missing a network handler for {request.handler}.").to_dict()
|
return rh.ResponseFailure("no_handler", f"This Link is missing a network handler for {request.handler}.").to_dict()
|
||||||
try:
|
try:
|
||||||
log.debug(f"Using {network_handler} as handler for {request.handler}")
|
log.debug(f"Using {network_handler} as handler for {request.handler}")
|
||||||
response: Response = await getattr(network_handler, self.interface_name)(self, request.data)
|
response: typing.Union[rh.ResponseSuccess, rh.ResponseFailure] = await getattr(network_handler, self.interface_name)(self, request.data)
|
||||||
return response.to_dict()
|
return response.to_dict()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
sentry_sdk.capture_exception(e)
|
sentry_sdk.capture_exception(e)
|
||||||
log.debug(f"Exception {e} in {network_handler}")
|
log.debug(f"Exception {e} in {network_handler}")
|
||||||
return ResponseError("exception_in_handler",
|
return rh.ResponseError("exception_in_handler",
|
||||||
f"An exception was raised in {network_handler} for {request.handler}. Check "
|
f"An exception was raised in {network_handler} for {request.handler}. Check "
|
||||||
f"extra_info for details.",
|
f"extra_info for details.",
|
||||||
extra_info={
|
extra_info={
|
||||||
|
@ -168,7 +168,7 @@ class GenericBot:
|
||||||
self.loop = self.uninitialized_loop
|
self.loop = self.uninitialized_loop
|
||||||
|
|
||||||
def __init__(self, *,
|
def __init__(self, *,
|
||||||
network_config: typing.Optional[NetworkConfig] = None,
|
network_config: typing.Optional[rh.Config] = None,
|
||||||
database_config: typing.Optional[DatabaseConfig] = None,
|
database_config: typing.Optional[DatabaseConfig] = None,
|
||||||
commands: typing.List[typing.Type[Command]] = None,
|
commands: typing.List[typing.Type[Command]] = None,
|
||||||
sentry_dsn: typing.Optional[str] = None,
|
sentry_dsn: typing.Optional[str] = None,
|
||||||
|
|
|
@ -1,20 +0,0 @@
|
||||||
"""Royalnet (websocket) related classes."""
|
|
||||||
from .request import Request
|
|
||||||
from .response import Response, ResponseSuccess, ResponseError
|
|
||||||
from .package import Package
|
|
||||||
from .networklink import NetworkLink, NetworkError, NotConnectedError, NotIdentifiedError, ConnectionClosedError
|
|
||||||
from .networkserver import NetworkServer
|
|
||||||
from .networkconfig import NetworkConfig
|
|
||||||
|
|
||||||
__all__ = ["NetworkLink",
|
|
||||||
"NetworkError",
|
|
||||||
"NotConnectedError",
|
|
||||||
"NotIdentifiedError",
|
|
||||||
"Package",
|
|
||||||
"NetworkServer",
|
|
||||||
"NetworkConfig",
|
|
||||||
"ConnectionClosedError",
|
|
||||||
"Request",
|
|
||||||
"Response",
|
|
||||||
"ResponseSuccess",
|
|
||||||
"ResponseError"]
|
|
|
@ -1,9 +0,0 @@
|
||||||
class NetworkConfig:
|
|
||||||
def __init__(self,
|
|
||||||
master_uri: str,
|
|
||||||
master_secret: str):
|
|
||||||
if not (master_uri.startswith("ws://")
|
|
||||||
or master_uri.startswith("wss://")):
|
|
||||||
raise ValueError("Invalid protocol (must be ws:// or wss://)")
|
|
||||||
self.master_uri = master_uri
|
|
||||||
self.master_secret = master_secret
|
|
|
@ -1,184 +0,0 @@
|
||||||
import asyncio
|
|
||||||
import websockets
|
|
||||||
import uuid
|
|
||||||
import functools
|
|
||||||
import math
|
|
||||||
import numbers
|
|
||||||
import logging as _logging
|
|
||||||
import typing
|
|
||||||
from .package import Package
|
|
||||||
|
|
||||||
|
|
||||||
log = _logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class NotConnectedError(Exception):
|
|
||||||
"""The :py:class:`royalnet.network.NetworkLink` is not connected to a :py:class:`royalnet.network.NetworkServer`."""
|
|
||||||
|
|
||||||
|
|
||||||
class NotIdentifiedError(Exception):
|
|
||||||
"""The :py:class:`royalnet.network.NetworkLink` has not identified yet to a :py:class:`royalnet.network.NetworkServer`."""
|
|
||||||
|
|
||||||
|
|
||||||
class ConnectionClosedError(Exception):
|
|
||||||
"""The :py:class:`royalnet.network.NetworkLink`'s connection was closed unexpectedly. The link can't be used anymore."""
|
|
||||||
|
|
||||||
|
|
||||||
class InvalidServerResponseError(Exception):
|
|
||||||
"""The :py:class:`royalnet.network.NetworkServer` sent invalid data to the :py:class:`royalnet.network.NetworkLink`."""
|
|
||||||
|
|
||||||
|
|
||||||
class NetworkError(Exception):
|
|
||||||
def __init__(self, error_data: dict, *args):
|
|
||||||
super().__init__(*args)
|
|
||||||
self.error_data: dict = error_data
|
|
||||||
|
|
||||||
|
|
||||||
class PendingRequest:
|
|
||||||
def __init__(self, *, loop: asyncio.AbstractEventLoop = None):
|
|
||||||
if loop is None:
|
|
||||||
self.loop = asyncio.get_event_loop()
|
|
||||||
else:
|
|
||||||
self.loop = loop
|
|
||||||
self.event: asyncio.Event = asyncio.Event(loop=loop)
|
|
||||||
self.data: typing.Optional[dict] = None
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
if self.event.is_set():
|
|
||||||
return f"<PendingRequest: {self.data.__class__.__name__}>"
|
|
||||||
return f"<PendingRequest>"
|
|
||||||
|
|
||||||
def set(self, data):
|
|
||||||
self.data = data
|
|
||||||
self.event.set()
|
|
||||||
|
|
||||||
|
|
||||||
def requires_connection(func):
|
|
||||||
@functools.wraps(func)
|
|
||||||
async def new_func(self, *args, **kwargs):
|
|
||||||
await self.connect_event.wait()
|
|
||||||
return await func(self, *args, **kwargs)
|
|
||||||
return new_func
|
|
||||||
|
|
||||||
|
|
||||||
def requires_identification(func):
|
|
||||||
@functools.wraps(func)
|
|
||||||
async def new_func(self, *args, **kwargs):
|
|
||||||
await self.identify_event.wait()
|
|
||||||
return await func(self, *args, **kwargs)
|
|
||||||
return new_func
|
|
||||||
|
|
||||||
|
|
||||||
class NetworkLink:
|
|
||||||
def __init__(self, master_uri: str, secret: str, link_type: str, request_handler, *,
|
|
||||||
loop: asyncio.AbstractEventLoop = None):
|
|
||||||
if ":" in link_type:
|
|
||||||
raise ValueError("Link types cannot contain colons.")
|
|
||||||
self.master_uri: str = master_uri
|
|
||||||
self.link_type: str = link_type
|
|
||||||
self.nid: str = str(uuid.uuid4())
|
|
||||||
self.secret: str = secret
|
|
||||||
self.websocket: typing.Optional[websockets.WebSocketClientProtocol] = None
|
|
||||||
self.request_handler = request_handler
|
|
||||||
self._pending_requests: typing.Dict[str, PendingRequest] = {}
|
|
||||||
if loop is None:
|
|
||||||
self._loop = asyncio.get_event_loop()
|
|
||||||
else:
|
|
||||||
self._loop = loop
|
|
||||||
self.error_event: asyncio.Event = asyncio.Event(loop=self._loop)
|
|
||||||
self.connect_event: asyncio.Event = asyncio.Event(loop=self._loop)
|
|
||||||
self.identify_event: asyncio.Event = asyncio.Event(loop=self._loop)
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
if self.identify_event.is_set():
|
|
||||||
return f"<{self.__class__.__name__} (identified)>"
|
|
||||||
elif self.connect_event.is_set():
|
|
||||||
return f"<{self.__class__.__name__} (connected)>"
|
|
||||||
elif self.error_event.is_set():
|
|
||||||
return f"<{self.__class__.__name__} (error)>"
|
|
||||||
else:
|
|
||||||
return f"<{self.__class__.__name__} (disconnected)>"
|
|
||||||
|
|
||||||
async def connect(self):
|
|
||||||
"""Connect to the :py:class:`royalnet.network.NetworkServer` at ``self.master_uri``."""
|
|
||||||
log.info(f"Connecting to {self.master_uri}...")
|
|
||||||
self.websocket = await websockets.connect(self.master_uri, loop=self._loop)
|
|
||||||
self.connect_event.set()
|
|
||||||
log.info(f"Connected!")
|
|
||||||
|
|
||||||
@requires_connection
|
|
||||||
async def receive(self) -> Package:
|
|
||||||
"""Recieve a :py:class:`Package` from the :py:class:`royalnet.network.NetworkServer`.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
:py:exc:`royalnet.network.royalnetlink.ConnectionClosedError` if the connection closes."""
|
|
||||||
try:
|
|
||||||
jbytes: bytes = await self.websocket.recv()
|
|
||||||
package: Package = Package.from_json_bytes(jbytes)
|
|
||||||
except websockets.ConnectionClosed:
|
|
||||||
self.error_event.set()
|
|
||||||
self.connect_event.clear()
|
|
||||||
self.identify_event.clear()
|
|
||||||
log.info(f"Connection to {self.master_uri} was closed.")
|
|
||||||
# What to do now? Let's just reraise.
|
|
||||||
raise ConnectionClosedError()
|
|
||||||
if self.identify_event.is_set() and package.destination != self.nid:
|
|
||||||
raise InvalidServerResponseError("Package is not addressed to this NetworkLink.")
|
|
||||||
log.debug(f"Received package: {package}")
|
|
||||||
return package
|
|
||||||
|
|
||||||
@requires_connection
|
|
||||||
async def identify(self) -> None:
|
|
||||||
log.info(f"Identifying to {self.master_uri}...")
|
|
||||||
await self.websocket.send(f"Identify {self.nid}:{self.link_type}:{self.secret}")
|
|
||||||
response: Package = await self.receive()
|
|
||||||
if not response.source == "<server>":
|
|
||||||
raise InvalidServerResponseError("Received a non-service package before identification.")
|
|
||||||
if "type" not in response.data:
|
|
||||||
raise InvalidServerResponseError("Missing 'type' in response data")
|
|
||||||
if response.data["type"] == "error":
|
|
||||||
raise ConnectionClosedError(f"Identification error: {response.data['type']}")
|
|
||||||
assert response.data["type"] == "success"
|
|
||||||
self.identify_event.set()
|
|
||||||
log.info(f"Identified successfully!")
|
|
||||||
|
|
||||||
@requires_identification
|
|
||||||
async def send(self, package: Package):
|
|
||||||
await self.websocket.send(package.to_json_bytes())
|
|
||||||
log.debug(f"Sent package: {package}")
|
|
||||||
|
|
||||||
@requires_identification
|
|
||||||
async def request(self, message, destination):
|
|
||||||
package = Package(message, source=self.nid, destination=destination)
|
|
||||||
request = PendingRequest(loop=self._loop)
|
|
||||||
self._pending_requests[package.source_conv_id] = request
|
|
||||||
await self.send(package)
|
|
||||||
log.debug(f"Sent request: {message} -> {destination}")
|
|
||||||
await request.event.wait()
|
|
||||||
response: dict = request.data
|
|
||||||
log.debug(f"Received response: {request} -> {response}")
|
|
||||||
return response
|
|
||||||
|
|
||||||
async def run(self):
|
|
||||||
"""Blockingly run the Link."""
|
|
||||||
log.debug(f"Running main client loop for {self.nid}.")
|
|
||||||
if self.error_event.is_set():
|
|
||||||
raise ConnectionClosedError("RoyalnetLinks can't be rerun after an error.")
|
|
||||||
while True:
|
|
||||||
if not self.connect_event.is_set():
|
|
||||||
await self.connect()
|
|
||||||
if not self.identify_event.is_set():
|
|
||||||
await self.identify()
|
|
||||||
package: Package = await self.receive()
|
|
||||||
# Package is a response
|
|
||||||
if package.destination_conv_id in self._pending_requests:
|
|
||||||
request = self._pending_requests[package.destination_conv_id]
|
|
||||||
request.set(package.data)
|
|
||||||
continue
|
|
||||||
# Package is a request
|
|
||||||
assert isinstance(package, Package)
|
|
||||||
log.debug(f"Received request {package.source_conv_id}: {package}")
|
|
||||||
response = await self.request_handler(package.data)
|
|
||||||
response_package: Package = package.reply(response)
|
|
||||||
await self.send(response_package)
|
|
||||||
log.debug(f"Replied to request {response_package.source_conv_id}: {response_package}")
|
|
|
@ -1,146 +0,0 @@
|
||||||
import typing
|
|
||||||
import websockets
|
|
||||||
import re
|
|
||||||
import datetime
|
|
||||||
import uuid
|
|
||||||
import asyncio
|
|
||||||
import logging as _logging
|
|
||||||
from .package import Package
|
|
||||||
|
|
||||||
|
|
||||||
log = _logging.getLogger(__name__)
|
|
||||||
|
|
||||||
|
|
||||||
class ConnectedClient:
|
|
||||||
"""The :py:class:`royalnet.network.NetworkServer`-side representation of a connected :py:class:`royalnet.network.NetworkLink`."""
|
|
||||||
def __init__(self, socket: websockets.WebSocketServerProtocol):
|
|
||||||
self.socket: websockets.WebSocketServerProtocol = socket
|
|
||||||
self.nid: typing.Optional[str] = None
|
|
||||||
self.link_type: typing.Optional[str] = None
|
|
||||||
self.connection_datetime: datetime.datetime = datetime.datetime.now()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def is_identified(self) -> bool:
|
|
||||||
"""Has the client sent a valid identification package?"""
|
|
||||||
return bool(self.nid)
|
|
||||||
|
|
||||||
async def send_service(self, msg_type: str, message: str):
|
|
||||||
await self.send(Package({"type": msg_type, "service": message},
|
|
||||||
source="<server>",
|
|
||||||
destination=self.nid))
|
|
||||||
|
|
||||||
async def send(self, package: Package):
|
|
||||||
"""Send a :py:class:`royalnet.network.Package` to the :py:class:`royalnet.network.NetworkLink`."""
|
|
||||||
await self.socket.send(package.to_json_bytes())
|
|
||||||
|
|
||||||
|
|
||||||
class NetworkServer:
|
|
||||||
def __init__(self, address: str, port: int, required_secret: str, *, loop: asyncio.AbstractEventLoop = None):
|
|
||||||
self.address: str = address
|
|
||||||
self.port: int = port
|
|
||||||
self.required_secret: str = required_secret
|
|
||||||
self.identified_clients: typing.List[ConnectedClient] = []
|
|
||||||
self.loop = loop
|
|
||||||
|
|
||||||
def find_client(self, *, nid: str = None, link_type: str = None) -> typing.List[ConnectedClient]:
|
|
||||||
assert not (nid and link_type)
|
|
||||||
if nid:
|
|
||||||
matching = [client for client in self.identified_clients if client.nid == nid]
|
|
||||||
assert len(matching) <= 1
|
|
||||||
return matching
|
|
||||||
if link_type:
|
|
||||||
matching = [client for client in self.identified_clients if client.link_type == link_type]
|
|
||||||
return matching or []
|
|
||||||
|
|
||||||
async def listener(self, websocket: websockets.server.WebSocketServerProtocol, path):
|
|
||||||
log.info(f"{websocket.remote_address} connected to the server.")
|
|
||||||
connected_client = ConnectedClient(websocket)
|
|
||||||
# Wait for identification
|
|
||||||
identify_msg = await websocket.recv()
|
|
||||||
log.debug(f"{websocket.remote_address} identified itself with: {identify_msg}.")
|
|
||||||
if not isinstance(identify_msg, str):
|
|
||||||
await connected_client.send_service("error", "Invalid identification message (not a str)")
|
|
||||||
return
|
|
||||||
identification = re.match(r"Identify ([^:\s]+):([^:\s]+):([^:\s]+)", identify_msg)
|
|
||||||
if identification is None:
|
|
||||||
await connected_client.send_service("error", "Invalid identification message (regex failed)")
|
|
||||||
return
|
|
||||||
secret = identification.group(3)
|
|
||||||
if secret != self.required_secret:
|
|
||||||
await connected_client.send_service("error", "Invalid secret")
|
|
||||||
return
|
|
||||||
# Identification successful
|
|
||||||
connected_client.nid = identification.group(1)
|
|
||||||
connected_client.link_type = identification.group(2)
|
|
||||||
self.identified_clients.append(connected_client)
|
|
||||||
log.debug(f"{websocket.remote_address} identified successfully as {connected_client.nid} ({connected_client.link_type}).")
|
|
||||||
await connected_client.send_service("success", "Identification successful!")
|
|
||||||
log.debug(f"{connected_client.nid}'s identification confirmed.")
|
|
||||||
# Main loop
|
|
||||||
while True:
|
|
||||||
# Receive packages
|
|
||||||
raw_bytes = await websocket.recv()
|
|
||||||
package: Package = Package.from_json_bytes(raw_bytes)
|
|
||||||
log.debug(f"Received package: {package}")
|
|
||||||
# Check if the package destination is the server itself.
|
|
||||||
if package.destination == "<server>":
|
|
||||||
# TODO: do stuff
|
|
||||||
pass
|
|
||||||
# Otherwise, route the package to its destination
|
|
||||||
# noinspection PyAsyncCall
|
|
||||||
self.loop.create_task(self.route_package(package))
|
|
||||||
|
|
||||||
def find_destination(self, package: Package) -> typing.List[ConnectedClient]:
|
|
||||||
"""Find a list of destinations for the package.
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
package: The package to find the destination of.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A :py:class:`list` of :py:class:`ConnectedClients` to send the package to."""
|
|
||||||
# Parse destination
|
|
||||||
# Is it nothing?
|
|
||||||
if package.destination == "<none>":
|
|
||||||
return []
|
|
||||||
# Is it a valid nid?
|
|
||||||
try:
|
|
||||||
destination = str(uuid.UUID(package.destination))
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
return self.find_client(nid=destination)
|
|
||||||
# Is it a link_type?
|
|
||||||
return self.find_client(link_type=package.destination)
|
|
||||||
|
|
||||||
async def route_package(self, package: Package) -> None:
|
|
||||||
"""Executed every time a package is received and must be routed somewhere."""
|
|
||||||
destinations = self.find_destination(package)
|
|
||||||
log.debug(f"Routing package: {package} -> {destinations}")
|
|
||||||
for destination in destinations:
|
|
||||||
# This may have some consequences
|
|
||||||
specific_package = Package(package.data,
|
|
||||||
source=package.source,
|
|
||||||
destination=destination.nid,
|
|
||||||
source_conv_id=package.source_conv_id,
|
|
||||||
destination_conv_id=package.destination_conv_id)
|
|
||||||
await destination.send(specific_package)
|
|
||||||
|
|
||||||
def serve(self):
|
|
||||||
log.debug(f"Serving on ws://{self.address}:{self.port}")
|
|
||||||
server = self.loop.run_until_complete(websockets.serve(self.listener,
|
|
||||||
host=self.address,
|
|
||||||
port=self.port,
|
|
||||||
loop=self.loop))
|
|
||||||
self.loop.run_forever()
|
|
||||||
|
|
||||||
def run_blocking(self, verbose=False):
|
|
||||||
if verbose:
|
|
||||||
core_logger = _logging.getLogger("royalnet")
|
|
||||||
core_logger.setLevel(_logging.DEBUG)
|
|
||||||
stream_handler = _logging.StreamHandler()
|
|
||||||
stream_handler.formatter = _logging.Formatter("{asctime}\t{name}\t{levelname}\t{message}", style="{")
|
|
||||||
core_logger.addHandler(stream_handler)
|
|
||||||
core_logger.debug("Logging setup complete.")
|
|
||||||
if self.loop is None:
|
|
||||||
self.loop = asyncio.get_event_loop()
|
|
||||||
self.serve()
|
|
|
@ -1,111 +0,0 @@
|
||||||
import json
|
|
||||||
import uuid
|
|
||||||
import typing
|
|
||||||
|
|
||||||
|
|
||||||
class Package:
|
|
||||||
"""A Royalnet package, the data type with which a :py:class:`royalnet.network.NetworkLink` communicates with a :py:class:`royalnet.network.NetworkServer` or another link.
|
|
||||||
Contains info about the source and the destination."""
|
|
||||||
|
|
||||||
def __init__(self,
|
|
||||||
data: dict,
|
|
||||||
*,
|
|
||||||
source: str,
|
|
||||||
destination: str,
|
|
||||||
source_conv_id: typing.Optional[str] = None,
|
|
||||||
destination_conv_id: typing.Optional[str] = None):
|
|
||||||
"""Create a Package.
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
data: The data that should be sent. Usually a :py:class:`royalnet.network.Message`.
|
|
||||||
source: The ``nid`` of the node that created this Package.
|
|
||||||
destination: The ``link_type`` of the destination node, or alternatively, the ``nid`` of the node. Can also be the ``NULL`` value to send the message to nobody.
|
|
||||||
source_conv_id: The conversation id of the node that created this package. Akin to the sequence number on IP packets.
|
|
||||||
destination_conv_id: The conversation id of the node that this Package is a reply to."""
|
|
||||||
# TODO: something is not right in these type hints. Check them.
|
|
||||||
self.data: dict = data
|
|
||||||
self.source: str = source
|
|
||||||
self.source_conv_id: str = source_conv_id or str(uuid.uuid4())
|
|
||||||
self.destination: str = destination
|
|
||||||
self.destination_conv_id: typing.Optional[str] = destination_conv_id
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return f"<Package {self.source} ({self.source_conv_id}) to {self.destination} ({self.destination_conv_id}>"
|
|
||||||
|
|
||||||
def __eq__(self, other):
|
|
||||||
if isinstance(other, Package):
|
|
||||||
return (self.data == other.data) and \
|
|
||||||
(self.source == other.source) and \
|
|
||||||
(self.destination == other.destination) and \
|
|
||||||
(self.source_conv_id == other.source_conv_id) and \
|
|
||||||
(self.destination_conv_id == other.destination_conv_id)
|
|
||||||
return False
|
|
||||||
|
|
||||||
def reply(self, data) -> "Package":
|
|
||||||
"""Reply to this Package with another Package.
|
|
||||||
|
|
||||||
Parameters:
|
|
||||||
data: The data that should be sent. Usually a :py:class:`royalnet.network.Message`.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The reply Package."""
|
|
||||||
return Package(data,
|
|
||||||
source=self.destination,
|
|
||||||
destination=self.source,
|
|
||||||
source_conv_id=self.destination_conv_id or str(uuid.uuid4()),
|
|
||||||
destination_conv_id=self.source_conv_id)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_dict(d) -> "Package":
|
|
||||||
"""Create a Package from a dictionary."""
|
|
||||||
if "source" not in d:
|
|
||||||
raise ValueError("Missing source field")
|
|
||||||
if "nid" not in d["source"]:
|
|
||||||
raise ValueError("Missing source.nid field")
|
|
||||||
if "conv_id" not in d["source"]:
|
|
||||||
raise ValueError("Missing source.conv_id field")
|
|
||||||
if "destination" not in d:
|
|
||||||
raise ValueError("Missing destination field")
|
|
||||||
if "nid" not in d["destination"]:
|
|
||||||
raise ValueError("Missing destination.nid field")
|
|
||||||
if "conv_id" not in d["destination"]:
|
|
||||||
raise ValueError("Missing destination.conv_id field")
|
|
||||||
if "data" not in d:
|
|
||||||
raise ValueError("Missing data field")
|
|
||||||
return Package(d["data"],
|
|
||||||
source=d["source"]["nid"],
|
|
||||||
destination=d["destination"]["nid"],
|
|
||||||
source_conv_id=d["source"]["conv_id"],
|
|
||||||
destination_conv_id=d["destination"]["conv_id"])
|
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
|
||||||
"""Convert the Package into a dictionary."""
|
|
||||||
return {
|
|
||||||
"source": {
|
|
||||||
"nid": self.source,
|
|
||||||
"conv_id": self.source_conv_id
|
|
||||||
},
|
|
||||||
"destination": {
|
|
||||||
"nid": self.destination,
|
|
||||||
"conv_id": self.destination_conv_id
|
|
||||||
},
|
|
||||||
"data": self.data
|
|
||||||
}
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_json_string(string: str) -> "Package":
|
|
||||||
"""Create a Package from a JSON string."""
|
|
||||||
return Package.from_dict(json.loads(string))
|
|
||||||
|
|
||||||
def to_json_string(self) -> str:
|
|
||||||
"""Convert the Package into a JSON string."""
|
|
||||||
return json.dumps(self.to_dict())
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_json_bytes(b: bytes) -> "Package":
|
|
||||||
"""Create a Package from UTF8-encoded JSON bytes."""
|
|
||||||
return Package.from_json_string(str(b, encoding="utf8"))
|
|
||||||
|
|
||||||
def to_json_bytes(self) -> bytes:
|
|
||||||
"""Convert the Package into UTF8-encoded JSON bytes."""
|
|
||||||
return bytes(self.to_json_string(), encoding="utf8")
|
|
|
@ -1,24 +0,0 @@
|
||||||
class Request:
|
|
||||||
"""A request sent from a :py:class:`royalnet.network.NetworkLink` to another.
|
|
||||||
|
|
||||||
It contains the name of the requested handler, in addition to the data."""
|
|
||||||
|
|
||||||
def __init__(self, handler: str, data: dict):
|
|
||||||
super().__init__()
|
|
||||||
self.handler: str = handler
|
|
||||||
self.data: dict = data
|
|
||||||
|
|
||||||
def to_dict(self):
|
|
||||||
return self.__dict__
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def from_dict(d: dict):
|
|
||||||
return Request(**d)
|
|
||||||
|
|
||||||
def __eq__(self, other):
|
|
||||||
if isinstance(other, Request):
|
|
||||||
return self.handler == other.handler and self.data == other.data
|
|
||||||
return False
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return f"royalnet.network.Request(handler={self.handler}, data={self.data})"
|
|
|
@ -1,61 +0,0 @@
|
||||||
import typing
|
|
||||||
from ..error import RoyalnetRequestError
|
|
||||||
|
|
||||||
|
|
||||||
class Response:
|
|
||||||
"""A base class to be inherited by all other response types."""
|
|
||||||
|
|
||||||
def to_dict(self) -> dict:
|
|
||||||
"""Prepare the Response to be sent by converting it to a JSONable :py:class:`dict`."""
|
|
||||||
return {
|
|
||||||
"type": self.__class__.__name__,
|
|
||||||
**self.__dict__
|
|
||||||
}
|
|
||||||
|
|
||||||
def __eq__(self, other):
|
|
||||||
if isinstance(other, Response):
|
|
||||||
return self.to_dict() == other.to_dict()
|
|
||||||
return False
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def from_dict(cls, d: dict) -> "Response":
|
|
||||||
"""Recreate the response from a received :py:class:`dict`."""
|
|
||||||
# Ignore type in dict
|
|
||||||
del d["type"]
|
|
||||||
# noinspection PyArgumentList
|
|
||||||
return cls(**d)
|
|
||||||
|
|
||||||
def raise_on_error(self):
|
|
||||||
"""Raise an :py:class:`Exception` if the Response is an error, do nothing otherwise."""
|
|
||||||
raise NotImplementedError("Please override Response.raise_on_error()")
|
|
||||||
|
|
||||||
|
|
||||||
class ResponseSuccess(Response):
|
|
||||||
"""A response to a successful :py:class:`royalnet.network.Request`."""
|
|
||||||
|
|
||||||
def __init__(self, data: typing.Optional[dict] = None):
|
|
||||||
if data is None:
|
|
||||||
self.data = {}
|
|
||||||
else:
|
|
||||||
self.data = data
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return f"royalnet.network.ResponseSuccess(data={self.data})"
|
|
||||||
|
|
||||||
def raise_on_error(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ResponseError(Response):
|
|
||||||
"""A response to a invalid :py:class:`royalnet.network.Request`."""
|
|
||||||
|
|
||||||
def __init__(self, name: str, description: str, extra_info: typing.Optional[dict] = None):
|
|
||||||
self.name: str = name
|
|
||||||
self.description: str = description
|
|
||||||
self.extra_info: typing.Optional[dict] = extra_info
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return f"royalnet.network.ResponseError(name={self.name}, description={self.description}, extra_info={self.extra_info})"
|
|
||||||
|
|
||||||
def raise_on_error(self):
|
|
||||||
raise RoyalnetRequestError(self)
|
|
2
setup.py
2
setup.py
|
@ -16,7 +16,7 @@ setuptools.setup(
|
||||||
packages=setuptools.find_packages(),
|
packages=setuptools.find_packages(),
|
||||||
install_requires=["python-telegram-bot>=11.1.0",
|
install_requires=["python-telegram-bot>=11.1.0",
|
||||||
"discord.py>=1.0.1",
|
"discord.py>=1.0.1",
|
||||||
"websockets>=6.0",
|
"royalherald>=5.0b4",
|
||||||
"psycopg2-binary>=2.8",
|
"psycopg2-binary>=2.8",
|
||||||
"aiohttp>=3.5.4",
|
"aiohttp>=3.5.4",
|
||||||
"sqlalchemy>=1.3.2",
|
"sqlalchemy>=1.3.2",
|
||||||
|
|
|
@ -1,57 +0,0 @@
|
||||||
import pytest
|
|
||||||
import uuid
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
from royalnet.network import Package, NetworkLink, NetworkServer, ConnectionClosedError, Request
|
|
||||||
|
|
||||||
|
|
||||||
log = logging.root
|
|
||||||
stream_handler = logging.StreamHandler()
|
|
||||||
stream_handler.formatter = logging.Formatter("{asctime}\t{name}\t{levelname}\t{message}", style="{")
|
|
||||||
log.addHandler(stream_handler)
|
|
||||||
log.setLevel(logging.WARNING)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def async_loop():
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
yield loop
|
|
||||||
loop.close()
|
|
||||||
|
|
||||||
|
|
||||||
async def echo_request_handler(message):
|
|
||||||
return message
|
|
||||||
|
|
||||||
|
|
||||||
def test_package_serialization():
|
|
||||||
pkg = Package({"ciao": "ciao"},
|
|
||||||
source=str(uuid.uuid4()),
|
|
||||||
destination=str(uuid.uuid4()),
|
|
||||||
source_conv_id=str(uuid.uuid4()),
|
|
||||||
destination_conv_id=str(uuid.uuid4()))
|
|
||||||
assert pkg == Package.from_dict(pkg.to_dict())
|
|
||||||
assert pkg == Package.from_json_string(pkg.to_json_string())
|
|
||||||
assert pkg == Package.from_json_bytes(pkg.to_json_bytes())
|
|
||||||
|
|
||||||
|
|
||||||
def test_request_creation():
|
|
||||||
request = Request("pytest", {"testing": "is fun", "bugs": "are less fun"})
|
|
||||||
assert request == Request.from_dict(request.to_dict())
|
|
||||||
|
|
||||||
|
|
||||||
def test_links(async_loop: asyncio.AbstractEventLoop):
|
|
||||||
address, port = "127.0.0.1", 1235
|
|
||||||
master = NetworkServer(address, port, "test", loop=async_loop)
|
|
||||||
run_server_task = async_loop.create_task(master.run())
|
|
||||||
# Test invalid secret
|
|
||||||
wrong_secret_link = NetworkLink("ws://127.0.0.1:1235", "invalid", "test", echo_request_handler, loop=async_loop)
|
|
||||||
with pytest.raises(ConnectionClosedError):
|
|
||||||
async_loop.run_until_complete(wrong_secret_link.run())
|
|
||||||
# Test regular connection
|
|
||||||
link1 = NetworkLink("ws://127.0.0.1:1235", "test", "one", echo_request_handler, loop=async_loop)
|
|
||||||
async_loop.create_task(link1.run())
|
|
||||||
link2 = NetworkLink("ws://127.0.0.1:1235", "test", "two", echo_request_handler, loop=async_loop)
|
|
||||||
async_loop.create_task(link2.run())
|
|
||||||
message = {"ciao": "ciao"}
|
|
||||||
response = async_loop.run_until_complete(link1.request(message, "two"))
|
|
||||||
assert message == response
|
|
Loading…
Reference in a new issue