1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-30 15:04:18 +00:00

Merge pull request #32 from Steffo99/unity-network

Unity network
This commit is contained in:
Steffo 2019-03-22 12:31:45 +01:00 committed by GitHub
commit 6b1f13c1d8
12 changed files with 419 additions and 10 deletions

2
README.md Normal file
View file

@ -0,0 +1,2 @@
# `royalnet`

View file

@ -1 +1,3 @@
python-telegram-bot>=11.1.0 python-telegram-bot>=11.1.0
websockets>=7.0
pytest>=4.3.1

View file

@ -0,0 +1,3 @@
from .telegram import TelegramBot
__all__ = ["TelegramBot"]

View file

@ -3,19 +3,29 @@ import asyncio
import typing import typing
from ..commands import NullCommand from ..commands import NullCommand
from ..utils import asyncify, Call, Command from ..utils import asyncify, Call, Command
from ..network import RoyalnetLink, Message
async def todo(message: Message):
pass
class TelegramBot: class TelegramBot:
def __init__(self, api_key: str, commands: typing.List[typing.Type[Command]], *, missing_command: Command=NullCommand): def __init__(self,
self.bot = telegram.Bot(api_key) api_key: str,
self.should_run = False master_server_uri: str,
self.offset = -100 master_server_secret: str,
self.commands = commands commands: typing.List[typing.Type[Command]],
self.missing_command: typing.Callable = missing_command missing_command: Command = NullCommand):
self.bot: telegram.Bot = telegram.Bot(api_key)
self.should_run: bool = False
self.offset: int = -100
self.missing_command = missing_command
self.network: RoyalnetLink = RoyalnetLink(master_server_uri, master_server_secret, "telegram", todo)
# Generate commands # Generate commands
self._commands = {} self.commands = {}
for command in self.commands: for command in commands:
self._commands[f"/{command.command_name}"] = command self.commands[f"/{command.command_name}"] = command
class TelegramCall(Call): class TelegramCall(Call):
interface_name = "telegram" interface_name = "telegram"
@ -23,6 +33,11 @@ class TelegramBot:
async def reply(self, text: str): async def reply(self, text: str):
await asyncify(self.channel.send_message, text, parse_mode="HTML") await asyncify(self.channel.send_message, text, parse_mode="HTML")
async def net_request(self, message: Message, destination: str):
response = await self.network.request(message, destination)
return response
self.Call = TelegramCall self.Call = TelegramCall
async def run(self): async def run(self):
@ -56,9 +71,12 @@ class TelegramBot:
command_text.replace(f"@{self.bot.username}", "") command_text.replace(f"@{self.bot.username}", "")
# Find the function # Find the function
try: try:
command = self._commands[command_text] command = self.commands[command_text]
except KeyError: except KeyError:
# Skip inexistent commands # Skip inexistent commands
command = self.missing_command command = self.missing_command
# Call the command # Call the command
return await self.Call(message.chat, command, *parameters).run() return await self.Call(message.chat, command, *parameters).run()
async def handle_net_request(self, message: Message):
pass

View file

@ -0,0 +1,16 @@
from .messages import Message, ErrorMessage, InvalidSecretEM, InvalidDestinationEM, InvalidPackageEM
from .packages import Package
from .royalnetlink import RoyalnetLink, NetworkError, NotConnectedError, NotIdentifiedError
from .royalnetserver import RoyalnetServer
__all__ = ["Message",
"ErrorMessage",
"InvalidSecretEM",
"InvalidDestinationEM",
"InvalidPackageEM",
"RoyalnetLink",
"NetworkError",
"NotConnectedError",
"NotIdentifiedError",
"Package",
"RoyalnetServer"]

View file

@ -0,0 +1,25 @@
class Message:
def __repr__(self):
return f"<{self.__class__.__name__}>"
class IdentifySuccessfulMessage(Message):
pass
class ErrorMessage(Message):
def __init__(self, reason):
super().__init__()
self.reason = reason
class InvalidSecretEM(ErrorMessage):
pass
class InvalidPackageEM(ErrorMessage):
pass
class InvalidDestinationEM(InvalidPackageEM):
pass

View file

@ -0,0 +1,19 @@
import pickle
import uuid
class Package:
def __init__(self, data, destination: str, source: str, *, conversation_id: str = None):
self.data = data
self.destination: str = destination
self.source = source
self.conversation_id = conversation_id or str(uuid.uuid4())
def __repr__(self):
return f"<Package to {self.destination}: {self.data.__class__.__name__}>"
def reply(self, data) -> "Package":
return Package(data, self.source, self.destination, conversation_id=self.conversation_id)
def pickle(self):
return pickle.dumps(self)

View file

@ -0,0 +1,146 @@
import asyncio
import websockets
import uuid
import functools
import typing
import pickle
import logging
from .messages import Message, ErrorMessage
from .packages import Package
loop = asyncio.get_event_loop()
log = logging.getLogger(__name__)
class NotConnectedError(Exception):
pass
class NotIdentifiedError(Exception):
pass
class NetworkError(Exception):
def __init__(self, error_msg: ErrorMessage, *args):
super().__init__(*args)
self.error_msg: ErrorMessage = error_msg
class PendingRequest:
def __init__(self):
self.event: asyncio.Event = asyncio.Event()
self.data: Message = None
def __repr__(self):
if self.event.is_set():
return f"<PendingRequest: {self.data.__class__.__name__}>"
return f"<PendingRequest>"
def set(self, data):
self.data = data
self.event.set()
def requires_connection(func):
@functools.wraps(func)
async def new_func(self, *args, **kwargs):
await self._connect_event.wait()
return await func(self, *args, **kwargs)
return new_func
def requires_identification(func):
@functools.wraps(func)
async def new_func(self, *args, **kwargs):
await self._identify_event.wait()
return await func(self, *args, **kwargs)
return new_func
class RoyalnetLink:
def __init__(self, master_uri: str, secret: str, link_type: str, request_handler):
assert ":" not in link_type
self.master_uri: str = master_uri
self.link_type: str = link_type
self.nid: str = str(uuid.uuid4())
self.secret: str = secret
self.websocket: typing.Optional[websockets.WebSocketClientProtocol] = None
self.request_handler = request_handler
self._pending_requests: typing.Dict[typing.Optional[Message]] = {}
self._connect_event: asyncio.Event = asyncio.Event()
self.identify_event: asyncio.Event = asyncio.Event()
async def connect(self):
log.info(f"Connecting to {self.master_uri}...")
self.websocket = await websockets.connect(self.master_uri)
self._connect_event.set()
log.info(f"Connected!")
@requires_connection
async def receive(self) -> Package:
try:
raw_pickle = await self.websocket.recv()
except websockets.ConnectionClosed:
self.websocket = None
self._connect_event.clear()
self.identify_event.clear()
log.info(f"Connection to {self.master_uri} was closed.")
# What to do now? Let's just reraise.
raise
package: typing.Union[Package, Package] = pickle.loads(raw_pickle)
assert package.destination == self.nid
log.debug(f"Received package: {package}")
return package
@requires_connection
async def identify(self) -> None:
log.info(f"Identifying to {self.master_uri}...")
await self.websocket.send(f"Identify {self.nid}:{self.link_type}:{self.secret}")
response_package = await self.receive()
response = response_package.data
if isinstance(response, ErrorMessage):
raise NetworkError(response, "Server returned error while identifying self")
self.identify_event.set()
log.info(f"Identified successfully!")
@requires_identification
async def send(self, package: Package):
raw_pickle: bytes = pickle.dumps(package)
await self.websocket.send(raw_pickle)
log.debug(f"Sent package: {package}")
@requires_identification
async def request(self, message, destination):
package = Package(message, destination, self.nid)
request = PendingRequest()
self._pending_requests[package.conversation_id] = request
await self.send(package)
log.debug(f"Sent request: {message} -> {destination}")
await request.event.wait()
result: Message = request.data
log.debug(f"Received response: {request} -> {result}")
if isinstance(result, ErrorMessage):
raise NetworkError(result, "Server returned error while requesting something")
return result
async def run(self):
log.debug(f"Running main client loop for {self.nid}.")
while True:
if self.websocket is None:
await self.connect()
if not self.identify_event.is_set():
await self.identify()
package: Package = await self.receive()
# Package is a response
if package.conversation_id in self._pending_requests:
request = self._pending_requests[package.conversation_id]
request.set(package.data)
continue
# Package is a request
assert isinstance(package, Package)
log.debug(f"Received request {package.conversation_id}: {package}")
response = await self.request_handler(package.data)
if response is not None:
response_package: Package = package.reply(response)
await self.send(response_package)
log.debug(f"Replied to request {response_package.conversation_id}: {response_package}")

View file

@ -0,0 +1,111 @@
import typing
import websockets
import re
import datetime
import pickle
import uuid
import asyncio
import logging
from .messages import Message, ErrorMessage, InvalidPackageEM, InvalidSecretEM, IdentifySuccessfulMessage
from .packages import Package
loop = asyncio.get_event_loop()
log = logging.getLogger(__name__)
class ConnectedClient:
def __init__(self, socket: websockets.WebSocketServerProtocol):
self.socket: websockets.WebSocketServerProtocol = socket
self.nid: str = None
self.link_type: str = None
self.connection_datetime: datetime.datetime = datetime.datetime.now()
@property
def is_identified(self) -> bool:
return bool(self.nid)
async def send(self, package: Package):
await self.socket.send(package.pickle())
class RoyalnetServer:
def __init__(self, address: str, port: int, required_secret: str):
self.address: str = address
self.port: int = port
self.required_secret: str = required_secret
self.identified_clients: typing.List[ConnectedClient] = []
def find_client(self, *, nid: str = None, link_type: str = None) -> typing.List[ConnectedClient]:
assert not (nid and link_type)
if nid:
matching = [client for client in self.identified_clients if client.nid == nid]
assert len(matching) <= 1
return matching
if link_type:
matching = [client for client in self.identified_clients if client.link_type == link_type]
return matching or []
async def listener(self, websocket: websockets.server.WebSocketServerProtocol, request_uri: str):
log.info(f"{websocket.remote_address} connected to the server.")
connected_client = ConnectedClient(websocket)
# Wait for identification
identify_msg = await websocket.recv()
log.debug(f"{websocket.remote_address} identified itself with: {identify_msg}.")
if not isinstance(identify_msg, str):
websocket.send(InvalidPackageEM("Invalid identification message (not a str)"))
return
identification = re.match(r"Identify ([A-Za-z0-9\-]+):([a-z]+):([A-Za-z0-9\-]+)", identify_msg)
if identification is None:
websocket.send(InvalidPackageEM("Invalid identification message (regex failed)"))
return
secret = identification.group(3)
if secret != self.required_secret:
websocket.send(InvalidSecretEM("Invalid secret"))
return
# Identification successful
connected_client.nid = identification.group(1)
connected_client.link_type = identification.group(2)
self.identified_clients.append(connected_client)
log.debug(f"{websocket.remote_address} identified successfully as {connected_client.nid} ({connected_client.link_type}).")
await connected_client.send(Package(IdentifySuccessfulMessage(), connected_client.nid, "__master__"))
log.debug(f"{connected_client.nid}'s identification confirmed.")
# Main loop
while True:
# Receive packages
raw_pickle = await websocket.recv()
package: Package = pickle.loads(raw_pickle)
log.debug(f"Received package: {package}")
# Check if the package destination is the server itself.
if package.destination == "__master__":
# TODO: do stuff
pass
# Otherwise, route the package to its destination
loop.create_task(self.route_package(package))
def find_destination(self, package: Package) -> typing.List[ConnectedClient]:
"""Find a list of destinations for the sent packages"""
# Parse destination
# Is it nothing?
if package.destination == "NULL":
return []
# Is it a valid nid?
try:
destination = str(uuid.UUID(package.destination))
except ValueError:
pass
else:
return self.find_client(nid=destination)
# Is it a link_type?
return self.find_client(link_type=package.destination)
async def route_package(self, package: Package) -> None:
"""Executed every time a package is received and must be routed somewhere."""
destinations = self.find_destination(package)
log.debug(f"Routing package: {package} -> {destinations}")
for destination in destinations:
specific_package = Package(package.data, destination.nid, package.source, conversation_id=package.conversation_id)
await destination.send(specific_package)
async def run(self):
log.debug(f"Running main server loop for __master__ on ws://{self.address}:{self.port}")
await websockets.serve(self.listener, host=self.address, port=self.port)

View file

@ -1,3 +1,5 @@
import typing
from ..network.messages import Message
from .command import Command, CommandArgs from .command import Command, CommandArgs
@ -12,6 +14,11 @@ class Call:
"""Send a text message to the channel the call was made.""" """Send a text message to the channel the call was made."""
raise NotImplementedError() raise NotImplementedError()
async def net_request(self, message: Message, destination: str):
"""Send data to the rest of the Royalnet, and optionally wait for an answer.
The data must be pickleable."""
raise NotImplementedError()
# These parameters / methods should be left alone # These parameters / methods should be left alone
def __init__(self, channel, command: Command, *args, **kwargs): def __init__(self, channel, command: Command, *args, **kwargs):
self.channel = channel self.channel = channel

27
setup.py Normal file
View file

@ -0,0 +1,27 @@
import setuptools
with open("README.md", "r") as f:
long_description = f.read()
setuptools.setup(
name="royalnet",
version="5.0a1",
author="Stefano Pigozzi",
author_email="ste.pigozzi@gmail.com",
description="The great bot network of the Royal Games community",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/Steffo99/royalnet",
packages=setuptools.find_packages(),
install_requires=[
"python-telegram-bot>=11.1.0",
"websockets>=7.0"
],
python_requires=">=3.7",
classifiers=[
"Development Status :: 3 - Alpha",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.7",
"Topic :: Internet"
]
)

33
tests/test_network.py Normal file
View file

@ -0,0 +1,33 @@
import pytest
import asyncio
from royalnet.network import RoyalnetLink, RoyalnetServer
from royalnet.network import Message
async def echo(message: Message):
return message
def test_connection():
loop = asyncio.SelectorEventLoop()
server = RoyalnetServer("localhost", 1234, "testing")
link = RoyalnetLink("ws://localhost:1234", "testing", "testing", echo)
loop.create_task(server.run())
loop.run_until_complete(link.run())
assert link.websocket is not None
assert link.identify_event.is_set()
assert len(server.identified_clients) == 1
assert server.identified_clients[0].link_type == "testing"
def test_request():
loop = asyncio.SelectorEventLoop()
server = RoyalnetServer("localhost", 1234, "testing")
link1 = RoyalnetLink("ws://localhost:1234", "testing", "testing1", echo)
link2 = RoyalnetLink("ws://localhost:1234", "testing", "testing2", echo)
loop.create_task(server.run())
loop.create_task(link1.run())
loop.create_task(link2.run())
message = Message()
response = loop.run_until_complete(link1.request(message, "testing2"))
assert message is response