diff --git a/README.md b/README.md new file mode 100644 index 00000000..bcd7d6b6 --- /dev/null +++ b/README.md @@ -0,0 +1,2 @@ +# `royalnet` + diff --git a/requirements.txt b/requirements.txt index 506efd03..483e63f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,3 @@ python-telegram-bot>=11.1.0 +websockets>=7.0 +pytest>=4.3.1 diff --git a/royalnet/bots/__init__.py b/royalnet/bots/__init__.py index e69de29b..83599e7e 100644 --- a/royalnet/bots/__init__.py +++ b/royalnet/bots/__init__.py @@ -0,0 +1,3 @@ +from .telegram import TelegramBot + +__all__ = ["TelegramBot"] diff --git a/royalnet/bots/telegram.py b/royalnet/bots/telegram.py index cc7eb58f..b56e23be 100644 --- a/royalnet/bots/telegram.py +++ b/royalnet/bots/telegram.py @@ -3,19 +3,29 @@ import asyncio import typing from ..commands import NullCommand from ..utils import asyncify, Call, Command +from ..network import RoyalnetLink, Message + + +async def todo(message: Message): + pass class TelegramBot: - def __init__(self, api_key: str, commands: typing.List[typing.Type[Command]], *, missing_command: Command=NullCommand): - self.bot = telegram.Bot(api_key) - self.should_run = False - self.offset = -100 - self.commands = commands - self.missing_command: typing.Callable = missing_command + def __init__(self, + api_key: str, + master_server_uri: str, + master_server_secret: str, + commands: typing.List[typing.Type[Command]], + missing_command: Command = NullCommand): + self.bot: telegram.Bot = telegram.Bot(api_key) + self.should_run: bool = False + self.offset: int = -100 + self.missing_command = missing_command + self.network: RoyalnetLink = RoyalnetLink(master_server_uri, master_server_secret, "telegram", todo) # Generate commands - self._commands = {} - for command in self.commands: - self._commands[f"/{command.command_name}"] = command + self.commands = {} + for command in commands: + self.commands[f"/{command.command_name}"] = command class TelegramCall(Call): interface_name = "telegram" @@ -23,6 +33,11 @@ class TelegramBot: async def reply(self, text: str): await asyncify(self.channel.send_message, text, parse_mode="HTML") + + async def net_request(self, message: Message, destination: str): + response = await self.network.request(message, destination) + return response + self.Call = TelegramCall async def run(self): @@ -56,9 +71,12 @@ class TelegramBot: command_text.replace(f"@{self.bot.username}", "") # Find the function try: - command = self._commands[command_text] + command = self.commands[command_text] except KeyError: # Skip inexistent commands command = self.missing_command # Call the command return await self.Call(message.chat, command, *parameters).run() + + async def handle_net_request(self, message: Message): + pass diff --git a/royalnet/network/__init__.py b/royalnet/network/__init__.py new file mode 100644 index 00000000..9015ede6 --- /dev/null +++ b/royalnet/network/__init__.py @@ -0,0 +1,16 @@ +from .messages import Message, ErrorMessage, InvalidSecretEM, InvalidDestinationEM, InvalidPackageEM +from .packages import Package +from .royalnetlink import RoyalnetLink, NetworkError, NotConnectedError, NotIdentifiedError +from .royalnetserver import RoyalnetServer + +__all__ = ["Message", + "ErrorMessage", + "InvalidSecretEM", + "InvalidDestinationEM", + "InvalidPackageEM", + "RoyalnetLink", + "NetworkError", + "NotConnectedError", + "NotIdentifiedError", + "Package", + "RoyalnetServer"] diff --git a/royalnet/network/messages.py b/royalnet/network/messages.py new file mode 100644 index 00000000..99696f02 --- /dev/null +++ b/royalnet/network/messages.py @@ -0,0 +1,25 @@ +class Message: + def __repr__(self): + return f"<{self.__class__.__name__}>" + + +class IdentifySuccessfulMessage(Message): + pass + + +class ErrorMessage(Message): + def __init__(self, reason): + super().__init__() + self.reason = reason + + +class InvalidSecretEM(ErrorMessage): + pass + + +class InvalidPackageEM(ErrorMessage): + pass + + +class InvalidDestinationEM(InvalidPackageEM): + pass diff --git a/royalnet/network/packages.py b/royalnet/network/packages.py new file mode 100644 index 00000000..499ca0a4 --- /dev/null +++ b/royalnet/network/packages.py @@ -0,0 +1,19 @@ +import pickle +import uuid + + +class Package: + def __init__(self, data, destination: str, source: str, *, conversation_id: str = None): + self.data = data + self.destination: str = destination + self.source = source + self.conversation_id = conversation_id or str(uuid.uuid4()) + + def __repr__(self): + return f"" + + def reply(self, data) -> "Package": + return Package(data, self.source, self.destination, conversation_id=self.conversation_id) + + def pickle(self): + return pickle.dumps(self) diff --git a/royalnet/network/royalnetlink.py b/royalnet/network/royalnetlink.py new file mode 100644 index 00000000..557cb1e2 --- /dev/null +++ b/royalnet/network/royalnetlink.py @@ -0,0 +1,146 @@ +import asyncio +import websockets +import uuid +import functools +import typing +import pickle +import logging +from .messages import Message, ErrorMessage +from .packages import Package + +loop = asyncio.get_event_loop() +log = logging.getLogger(__name__) + + +class NotConnectedError(Exception): + pass + + +class NotIdentifiedError(Exception): + pass + + +class NetworkError(Exception): + def __init__(self, error_msg: ErrorMessage, *args): + super().__init__(*args) + self.error_msg: ErrorMessage = error_msg + + +class PendingRequest: + def __init__(self): + self.event: asyncio.Event = asyncio.Event() + self.data: Message = None + + def __repr__(self): + if self.event.is_set(): + return f"" + return f"" + + def set(self, data): + self.data = data + self.event.set() + + +def requires_connection(func): + @functools.wraps(func) + async def new_func(self, *args, **kwargs): + await self._connect_event.wait() + return await func(self, *args, **kwargs) + return new_func + + +def requires_identification(func): + @functools.wraps(func) + async def new_func(self, *args, **kwargs): + await self._identify_event.wait() + return await func(self, *args, **kwargs) + return new_func + + +class RoyalnetLink: + def __init__(self, master_uri: str, secret: str, link_type: str, request_handler): + assert ":" not in link_type + self.master_uri: str = master_uri + self.link_type: str = link_type + self.nid: str = str(uuid.uuid4()) + self.secret: str = secret + self.websocket: typing.Optional[websockets.WebSocketClientProtocol] = None + self.request_handler = request_handler + self._pending_requests: typing.Dict[typing.Optional[Message]] = {} + self._connect_event: asyncio.Event = asyncio.Event() + self.identify_event: asyncio.Event = asyncio.Event() + + async def connect(self): + log.info(f"Connecting to {self.master_uri}...") + self.websocket = await websockets.connect(self.master_uri) + self._connect_event.set() + log.info(f"Connected!") + + @requires_connection + async def receive(self) -> Package: + try: + raw_pickle = await self.websocket.recv() + except websockets.ConnectionClosed: + self.websocket = None + self._connect_event.clear() + self.identify_event.clear() + log.info(f"Connection to {self.master_uri} was closed.") + # What to do now? Let's just reraise. + raise + package: typing.Union[Package, Package] = pickle.loads(raw_pickle) + assert package.destination == self.nid + log.debug(f"Received package: {package}") + return package + + @requires_connection + async def identify(self) -> None: + log.info(f"Identifying to {self.master_uri}...") + await self.websocket.send(f"Identify {self.nid}:{self.link_type}:{self.secret}") + response_package = await self.receive() + response = response_package.data + if isinstance(response, ErrorMessage): + raise NetworkError(response, "Server returned error while identifying self") + self.identify_event.set() + log.info(f"Identified successfully!") + + @requires_identification + async def send(self, package: Package): + raw_pickle: bytes = pickle.dumps(package) + await self.websocket.send(raw_pickle) + log.debug(f"Sent package: {package}") + + @requires_identification + async def request(self, message, destination): + package = Package(message, destination, self.nid) + request = PendingRequest() + self._pending_requests[package.conversation_id] = request + await self.send(package) + log.debug(f"Sent request: {message} -> {destination}") + await request.event.wait() + result: Message = request.data + log.debug(f"Received response: {request} -> {result}") + if isinstance(result, ErrorMessage): + raise NetworkError(result, "Server returned error while requesting something") + return result + + async def run(self): + log.debug(f"Running main client loop for {self.nid}.") + while True: + if self.websocket is None: + await self.connect() + if not self.identify_event.is_set(): + await self.identify() + package: Package = await self.receive() + # Package is a response + if package.conversation_id in self._pending_requests: + request = self._pending_requests[package.conversation_id] + request.set(package.data) + continue + # Package is a request + assert isinstance(package, Package) + log.debug(f"Received request {package.conversation_id}: {package}") + response = await self.request_handler(package.data) + if response is not None: + response_package: Package = package.reply(response) + await self.send(response_package) + log.debug(f"Replied to request {response_package.conversation_id}: {response_package}") diff --git a/royalnet/network/royalnetserver.py b/royalnet/network/royalnetserver.py new file mode 100644 index 00000000..208d805e --- /dev/null +++ b/royalnet/network/royalnetserver.py @@ -0,0 +1,111 @@ +import typing +import websockets +import re +import datetime +import pickle +import uuid +import asyncio +import logging +from .messages import Message, ErrorMessage, InvalidPackageEM, InvalidSecretEM, IdentifySuccessfulMessage +from .packages import Package + +loop = asyncio.get_event_loop() +log = logging.getLogger(__name__) + + +class ConnectedClient: + def __init__(self, socket: websockets.WebSocketServerProtocol): + self.socket: websockets.WebSocketServerProtocol = socket + self.nid: str = None + self.link_type: str = None + self.connection_datetime: datetime.datetime = datetime.datetime.now() + + @property + def is_identified(self) -> bool: + return bool(self.nid) + + async def send(self, package: Package): + await self.socket.send(package.pickle()) + + +class RoyalnetServer: + def __init__(self, address: str, port: int, required_secret: str): + self.address: str = address + self.port: int = port + self.required_secret: str = required_secret + self.identified_clients: typing.List[ConnectedClient] = [] + + def find_client(self, *, nid: str = None, link_type: str = None) -> typing.List[ConnectedClient]: + assert not (nid and link_type) + if nid: + matching = [client for client in self.identified_clients if client.nid == nid] + assert len(matching) <= 1 + return matching + if link_type: + matching = [client for client in self.identified_clients if client.link_type == link_type] + return matching or [] + + async def listener(self, websocket: websockets.server.WebSocketServerProtocol, request_uri: str): + log.info(f"{websocket.remote_address} connected to the server.") + connected_client = ConnectedClient(websocket) + # Wait for identification + identify_msg = await websocket.recv() + log.debug(f"{websocket.remote_address} identified itself with: {identify_msg}.") + if not isinstance(identify_msg, str): + websocket.send(InvalidPackageEM("Invalid identification message (not a str)")) + return + identification = re.match(r"Identify ([A-Za-z0-9\-]+):([a-z]+):([A-Za-z0-9\-]+)", identify_msg) + if identification is None: + websocket.send(InvalidPackageEM("Invalid identification message (regex failed)")) + return + secret = identification.group(3) + if secret != self.required_secret: + websocket.send(InvalidSecretEM("Invalid secret")) + return + # Identification successful + connected_client.nid = identification.group(1) + connected_client.link_type = identification.group(2) + self.identified_clients.append(connected_client) + log.debug(f"{websocket.remote_address} identified successfully as {connected_client.nid} ({connected_client.link_type}).") + await connected_client.send(Package(IdentifySuccessfulMessage(), connected_client.nid, "__master__")) + log.debug(f"{connected_client.nid}'s identification confirmed.") + # Main loop + while True: + # Receive packages + raw_pickle = await websocket.recv() + package: Package = pickle.loads(raw_pickle) + log.debug(f"Received package: {package}") + # Check if the package destination is the server itself. + if package.destination == "__master__": + # TODO: do stuff + pass + # Otherwise, route the package to its destination + loop.create_task(self.route_package(package)) + + def find_destination(self, package: Package) -> typing.List[ConnectedClient]: + """Find a list of destinations for the sent packages""" + # Parse destination + # Is it nothing? + if package.destination == "NULL": + return [] + # Is it a valid nid? + try: + destination = str(uuid.UUID(package.destination)) + except ValueError: + pass + else: + return self.find_client(nid=destination) + # Is it a link_type? + return self.find_client(link_type=package.destination) + + async def route_package(self, package: Package) -> None: + """Executed every time a package is received and must be routed somewhere.""" + destinations = self.find_destination(package) + log.debug(f"Routing package: {package} -> {destinations}") + for destination in destinations: + specific_package = Package(package.data, destination.nid, package.source, conversation_id=package.conversation_id) + await destination.send(specific_package) + + async def run(self): + log.debug(f"Running main server loop for __master__ on ws://{self.address}:{self.port}") + await websockets.serve(self.listener, host=self.address, port=self.port) diff --git a/royalnet/utils/call.py b/royalnet/utils/call.py index e7862200..a3181168 100644 --- a/royalnet/utils/call.py +++ b/royalnet/utils/call.py @@ -1,3 +1,5 @@ +import typing +from ..network.messages import Message from .command import Command, CommandArgs @@ -12,6 +14,11 @@ class Call: """Send a text message to the channel the call was made.""" raise NotImplementedError() + async def net_request(self, message: Message, destination: str): + """Send data to the rest of the Royalnet, and optionally wait for an answer. + The data must be pickleable.""" + raise NotImplementedError() + # These parameters / methods should be left alone def __init__(self, channel, command: Command, *args, **kwargs): self.channel = channel diff --git a/setup.py b/setup.py new file mode 100644 index 00000000..6595a41a --- /dev/null +++ b/setup.py @@ -0,0 +1,27 @@ +import setuptools + +with open("README.md", "r") as f: + long_description = f.read() + +setuptools.setup( + name="royalnet", + version="5.0a1", + author="Stefano Pigozzi", + author_email="ste.pigozzi@gmail.com", + description="The great bot network of the Royal Games community", + long_description=long_description, + long_description_content_type="text/markdown", + url="https://github.com/Steffo99/royalnet", + packages=setuptools.find_packages(), + install_requires=[ + "python-telegram-bot>=11.1.0", + "websockets>=7.0" + ], + python_requires=">=3.7", + classifiers=[ + "Development Status :: 3 - Alpha", + "Operating System :: OS Independent", + "Programming Language :: Python :: 3.7", + "Topic :: Internet" + ] +) diff --git a/tests/test_network.py b/tests/test_network.py new file mode 100644 index 00000000..75c2880c --- /dev/null +++ b/tests/test_network.py @@ -0,0 +1,33 @@ +import pytest +import asyncio +from royalnet.network import RoyalnetLink, RoyalnetServer +from royalnet.network import Message + + +async def echo(message: Message): + return message + + +def test_connection(): + loop = asyncio.SelectorEventLoop() + server = RoyalnetServer("localhost", 1234, "testing") + link = RoyalnetLink("ws://localhost:1234", "testing", "testing", echo) + loop.create_task(server.run()) + loop.run_until_complete(link.run()) + assert link.websocket is not None + assert link.identify_event.is_set() + assert len(server.identified_clients) == 1 + assert server.identified_clients[0].link_type == "testing" + + +def test_request(): + loop = asyncio.SelectorEventLoop() + server = RoyalnetServer("localhost", 1234, "testing") + link1 = RoyalnetLink("ws://localhost:1234", "testing", "testing1", echo) + link2 = RoyalnetLink("ws://localhost:1234", "testing", "testing2", echo) + loop.create_task(server.run()) + loop.create_task(link1.run()) + loop.create_task(link2.run()) + message = Message() + response = loop.run_until_complete(link1.request(message, "testing2")) + assert message is response