1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 11:34:18 +00:00

This is actually working? I'm amazed

This commit is contained in:
Steffo 2019-11-25 18:21:18 +01:00
parent dbd29ff35a
commit 2505e8caf7
20 changed files with 517 additions and 272 deletions

1
.gitignore vendored
View file

@ -4,3 +4,4 @@
dist/
**/__pycache__/
config.toml
downloads/

3
.idea/.gitignore vendored
View file

@ -1,4 +1,5 @@
# Default ignored files
/workspace.xml
# Datasource local storage ignored files
/dataSources/
/dataSources/
/dataSources.local.xml

View file

@ -1,6 +1,6 @@
__version__ = "5.1a1"
from . import alchemy, bard, commands, constellation, herald, backpack, serf, utils, version
from . import alchemy, bard, commands, constellation, herald, backpack, serf, utils
__version__ = version.semantic
__all__ = [
"alchemy",

View file

@ -1,10 +1,11 @@
import click
import typing
import importlib
import royalnet as r
import multiprocessing
import royalnet.constellation as rc
import royalnet.serf as rs
import royalnet.utils as ru
import royalnet.herald as rh
import toml
from logging import Formatter, StreamHandler, getLogger, Logger
import logging
try:
import coloredlogs
@ -12,7 +13,7 @@ except ImportError:
coloredlogs = None
log = getLogger(__name__)
log = logging.getLogger(__name__)
@click.command()
@ -23,20 +24,104 @@ def run(config_filename: str):
with open(config_filename, "r") as t:
config: dict = toml.load(t)
# Initialize logging
royalnet_log: Logger = getLogger("royalnet")
royalnet_log.setLevel(config["Logging"]["log_level"])
stream_handler = StreamHandler()
if coloredlogs is not None:
stream_handler.formatter = coloredlogs.ColoredFormatter("{asctime}\t| {processName}\t| {name}\t| {message}",
style="{")
else:
stream_handler.formatter = Formatter("{asctime}\t| {processName}\t| {name}\t| {message}",
style="{")
royalnet_log.addHandler(stream_handler)
log.info("Logging: ready")
ru.init_logging(config["Logging"])
if config["Sentry"] is None or not config["Sentry"]["enabled"]:
log.info("Sentry: disabled")
else:
try:
ru.init_sentry(config["Sentry"])
except ImportError:
log.info("Sentry: not installed")
# Herald Server
herald_cfg = None
herald_process = None
if config["Herald"]["Local"]["enabled"]:
# Create a Herald server
herald_server = rh.Server(rh.Config.from_config(name="<server>", **config["Herald"]["Local"]))
# Run the Herald server on a new process
herald_process = multiprocessing.Process(name="Herald.Local",
target=herald_server.run_blocking,
daemon=True,
kwargs={
"logging_cfg": config["Logging"]
})
herald_process.start()
herald_cfg = config["Herald"]["Local"]
log.info("Herald: Enabled (Local)")
elif config["Herald"]["Remote"]["enabled"]:
log.info("Herald: Enabled (Remote)")
herald_cfg = config["Herald"]["Remote"]
else:
log.info("Herald: Disabled")
# Serfs
telegram_process = None
if config["Serfs"]["Telegram"]["enabled"]:
telegram_process = multiprocessing.Process(name="Serf.Telegram",
target=rs.telegram.TelegramSerf.run_process,
daemon=True,
kwargs={
"alchemy_cfg": config["Alchemy"],
"herald_cfg": herald_cfg,
"packs_cfg": config["Packs"],
"sentry_cfg": config["Sentry"],
"logging_cfg": config["Logging"],
"serf_cfg": config["Serfs"]["Telegram"],
})
telegram_process.start()
log.info("Serf.Telegram: Started")
else:
log.info("Serf.Telegram: Disabled")
discord_process = None
if config["Serfs"]["Discord"]["enabled"]:
discord_process = multiprocessing.Process(name="Serf.Discord",
target=rs.discord.DiscordSerf.run_process,
daemon=True,
kwargs={
"alchemy_cfg": config["Alchemy"],
"herald_cfg": herald_cfg,
"packs_cfg": config["Packs"],
"sentry_cfg": config["Sentry"],
"logging_cfg": config["Logging"],
"serf_cfg": config["Serfs"]["Discord"],
})
discord_process.start()
log.info("Serf.Discord: Started")
else:
log.info("Serf.Discord: Disabled")
# Constellation
constellation_process = None
if config["Constellation"]["enabled"]:
constellation_process = multiprocessing.Process(name="Constellation",
target=rc.Constellation.run_process,
daemon=True,
kwargs={
"alchemy_cfg": config["Alchemy"],
"herald_cfg": herald_cfg,
"packs_cfg": config["Packs"],
"sentry_cfg": config["Sentry"],
"logging_cfg": config["Logging"],
"constellation_cfg": config["Constellation"],
})
constellation_process.start()
log.info("Constellation: Started")
else:
log.info("Constellation: Disabled")
log.info("All processes started!")
if constellation_process is not None:
constellation_process.join()
if telegram_process is not None:
telegram_process.join()
if discord_process is not None:
discord_process.join()
if herald_process is not None:
herald_process.join()
...
if __name__ == "__main__":

View file

@ -8,4 +8,6 @@ class ExceptionCommand(Command):
description: str = "Raise an exception in the command."
async def run(self, args: CommandArgs, data: CommandData) -> None:
if not self.interface.cfg["exc_debug"]:
raise UserError(f"{self.interface.prefix}{self.name} is not enabled.")
raise Exception(f"{self.interface.prefix}{self.name} was called")

View file

@ -8,5 +8,7 @@ class ExceventCommand(Command):
description: str = "Call an event that raises an exception."
async def run(self, args: CommandArgs, data: CommandData) -> None:
if not self.interface.cfg["exc_debug"]:
raise UserError(f"{self.interface.prefix}{self.name} is not enabled.")
await self.interface.call_herald_event(self.interface.name, "exception")
await data.reply("✅ Event called!")

View file

@ -5,4 +5,6 @@ class ExceptionEvent(Event):
name = "exception"
def run(self, **kwargs):
if not self.interface.cfg["exc_debug"]:
raise UserError(f"{self.interface.prefix}{self.name} is not enabled.")
raise Exception(f"{self.name} event was called")

View file

@ -6,6 +6,7 @@ if TYPE_CHECKING:
from .command import Command
from ..alchemy import Alchemy
from ..serf import Serf
from ..constellation import Constellation
class CommandInterface:
@ -19,14 +20,21 @@ class CommandInterface:
"""The prefix used by commands on the interface.
Examples:
``/`` on Telegram, ``!`` on Discord"""
``/`` on Telegram, ``!`` on Discord."""
serf: "Serf" = NotImplemented
serf: Optional["Serf"] = None
"""A reference to the Serf that is implementing this :class:`CommandInterface`.
Examples:
Example:
A reference to a :class:`~royalnet.serf.telegram.TelegramSerf`."""
constellation: Optional["Constellation"] = None
"""A reference to the Constellation that is implementing this :class:`CommandInterface`.
Example:
A reference to a :class:`~royalnet.constellation.Constellation`."""
@property
def alchemy(self) -> "Alchemy":
"""A shortcut for :attr:`serf.alchemy`."""

View file

@ -1,7 +1,11 @@
import typing
import logging
import royalnet
import keyring
import importlib
import asyncio as aio
from typing import *
import royalnet.alchemy as ra
import royalnet.herald as rh
import royalnet.utils as ru
import royalnet.commands as rc
from .star import PageStar, ExceptionStar
try:
@ -13,9 +17,6 @@ except ImportError:
try:
import sentry_sdk
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
except ImportError:
sentry_sdk = None
AioHttpIntegration = None
@ -46,138 +47,264 @@ class Constellation:
It also handles the :class:`Alchemy` connection, and it *will eventually* support Herald connections too."""
def __init__(self,
secrets_name: str,
database_uri: str,
page_stars: typing.List[typing.Type[PageStar]] = None,
exc_stars: typing.List[typing.Type[ExceptionStar]] = None,
*,
debug: bool = __debug__,):
alchemy_cfg: Dict[str, Any],
herald_cfg: Dict[str, Any],
packs_cfg: Dict[str, Any],
constellation_cfg: Dict[str, Any],
**_):
if Starlette is None:
raise ImportError("'constellation' extra is not installed")
raise ImportError("`constellation` extra is not installed")
if page_stars is None:
page_stars = []
if exc_stars is None:
exc_stars = []
self.secrets_name: str = secrets_name
"""The secrets_name this Constellation is currently using."""
self.running: bool = False
"""Is the Constellation currently running?"""
log.info(f"Creating Starlette in {'Debug' if __debug__ else 'Production'} mode...")
self.starlette = Starlette(debug=debug)
"""The :class:`starlette.Starlette` app."""
log.debug("Finding required Tables...")
tables = set(royalnet.backpack.available_tables)
for SelectedPageStar in page_stars:
tables = tables.union(SelectedPageStar.tables)
for SelectedExcStar in exc_stars:
tables = tables.union(SelectedExcStar.tables)
log.debug(f"Found Tables: {' '.join([table.__name__ for table in tables])}")
# Import packs
pack_names = packs_cfg["active"]
packs = {}
for pack_name in pack_names:
log.debug(f"Importing pack: {pack_name}")
try:
packs[pack_name] = importlib.import_module(pack_name)
except ImportError as e:
log.error(f"Error during the import of {pack_name}: {e}")
log.info(f"Packs: {len(packs)} imported")
self.alchemy = None
"""The :class:`Alchemy` of this Constellation."""
if database_uri is not None:
log.info(f"Creating Alchemy...")
self.alchemy: royalnet.alchemy.Alchemy = royalnet.alchemy.Alchemy(database_uri=database_uri, tables=tables)
# Alchemy
if ra.Alchemy is None:
log.info("Alchemy: not installed")
elif not alchemy_cfg["enabled"]:
log.info("Alchemy: disabled")
else:
# Find all tables
tables = set()
for pack in packs.values():
try:
tables = tables.union(pack.available_tables)
except AttributeError:
log.warning(f"Pack `{pack}` does not have the `available_tables` attribute.")
continue
# Create the Alchemy
self.alchemy = ra.Alchemy(alchemy_cfg["database_url"], tables)
log.info(f"Alchemy: {self.alchemy}")
log.info("Registering PageStars...")
for SelectedPageStar in page_stars:
log.info(f"Registering: {SelectedPageStar.path} -> {SelectedPageStar.__qualname__}")
# Herald
self.herald: Optional[rh.Link] = None
"""The :class:`Link` object connecting the Serf to the rest of the herald network."""
self.herald_task: Optional[aio.Task] = None
"""A reference to the :class:`asyncio.Task` that runs the :class:`Link`."""
self.Interface: Type[rc.CommandInterface] = self.interface_factory()
"""The :class:`CommandInterface` class of this Constellation."""
self.events: Dict[str, rc.Event] = {}
"""A dictionary containing all :class:`Event` that can be handled by this :class:`Serf`."""
self.starlette = Starlette(debug=__debug__)
"""The :class:`starlette.Starlette` app."""
# Register Events
for pack_name in packs:
pack = packs[pack_name]
pack_cfg = packs_cfg.get(pack_name, {})
try:
page_star_instance = SelectedPageStar(constellation=self)
events = pack.available_events
except AttributeError:
log.warning(f"Pack `{pack}` does not have the `available_events` attribute.")
else:
self.register_events(events, pack_cfg)
log.info(f"Events: {len(self.events)} events")
if rh.Link is None:
log.info("Herald: not installed")
elif not herald_cfg["enabled"]:
log.info("Herald: disabled")
else:
self.init_herald(herald_cfg)
log.info(f"Herald: enabled")
# Register PageStars and ExceptionStars
for pack_name in packs:
pack = packs[pack_name]
pack_cfg = packs_cfg.get(pack_name, {})
try:
page_stars = pack.available_page_stars
except AttributeError:
log.warning(f"Pack `{pack}` does not have the `available_page_stars` attribute.")
else:
self.register_page_stars(page_stars, pack_cfg)
try:
exc_stars = pack.available_exception_stars
except AttributeError:
log.warning(f"Pack `{pack}` does not have the `available_exception_stars` attribute.")
else:
self.register_exc_stars(exc_stars, pack_cfg)
log.info(f"PageStars: {len(self.starlette.routes)} stars")
log.info(f"ExceptionStars: {len(self.starlette.exception_handlers)} stars")
self.running: bool = False
"""Is the Constellation server currently running?"""
self.address: str = constellation_cfg["address"]
"""The address that the Constellation will bind to when run."""
self.port: int = constellation_cfg["port"]
"""The port on which the Constellation will listen for connection on."""
# TODO: is this a good idea?
def interface_factory(self) -> Type[rc.CommandInterface]:
"""Create the :class:`CommandInterface` class for the Constellation."""
# noinspection PyMethodParameters
class GenericInterface(rc.CommandInterface):
alchemy: ra.Alchemy = self.alchemy
constellation = self
async def call_herald_event(ci, destination: str, event_name: str, **kwargs) -> Dict:
"""Send a :class:`royalherald.Request` to a specific destination, and wait for a
:class:`royalherald.Response`."""
if self.herald is None:
raise rc.UnsupportedError("`royalherald` is not enabled on this Constellation.")
request: rh.Request = rh.Request(handler=event_name, data=kwargs)
response: rh.Response = await self.herald.request(destination=destination, request=request)
if isinstance(response, rh.ResponseFailure):
if response.name == "no_event":
raise rc.CommandError(f"There is no event named {event_name} in {destination}.")
elif response.name == "exception_in_event":
# TODO: pretty sure there's a better way to do this
if response.extra_info["type"] == "CommandError":
raise rc.CommandError(response.extra_info["message"])
elif response.extra_info["type"] == "UserError":
raise rc.UserError(response.extra_info["message"])
elif response.extra_info["type"] == "InvalidInputError":
raise rc.InvalidInputError(response.extra_info["message"])
elif response.extra_info["type"] == "UnsupportedError":
raise rc.UnsupportedError(response.extra_info["message"])
elif response.extra_info["type"] == "ConfigurationError":
raise rc.ConfigurationError(response.extra_info["message"])
elif response.extra_info["type"] == "ExternalError":
raise rc.ExternalError(response.extra_info["message"])
else:
raise TypeError(f"Herald action call returned invalid error:\n"
f"[p]{response}[/p]")
elif isinstance(response, rh.ResponseSuccess):
return response.data
else:
raise TypeError(f"Other Herald Link returned unknown response:\n"
f"[p]{response}[/p]")
return GenericInterface
def init_herald(self, herald_cfg: Dict[str, Any]):
"""Create a :class:`Link` and bind :class:`Event`."""
herald_cfg["name"] = "constellation"
self.herald: rh.Link = rh.Link(rh.Config.from_config(**herald_cfg), self.network_handler)
async def network_handler(self, message: Union[rh.Request, rh.Broadcast]) -> rh.Response:
try:
event: rc.Event = self.events[message.handler]
except KeyError:
log.warning(f"No event for '{message.handler}'")
return rh.ResponseFailure("no_event", f"This serf does not have any event for {message.handler}.")
log.debug(f"Event called: {event.name}")
if isinstance(message, rh.Request):
try:
response_data = await event.run(**message.data)
return rh.ResponseSuccess(data=response_data)
except Exception as e:
log.error(f"{e.__class__.__qualname__} during the registration of {SelectedPageStar.__qualname__}!")
raise
ru.sentry_exc(e)
return rh.ResponseFailure("exception_in_event",
f"An exception was raised in the event for '{message.handler}'.",
extra_info={
"type": e.__class__.__qualname__,
"message": str(e)
})
elif isinstance(message, rh.Broadcast):
await event.run(**message.data)
def register_events(self, events: List[Type[rc.Event]], pack_cfg: Dict[str, Any]):
for SelectedEvent in events:
# Create a new interface
interface = self.Interface(cfg=pack_cfg)
# Initialize the event
try:
event = SelectedEvent(interface)
except Exception as e:
log.error(f"Skipping: "
f"{SelectedEvent.__qualname__} - {e.__class__.__qualname__} in the initialization.")
ru.sentry_exc(e)
continue
# Register the event
if SelectedEvent.name in self.events:
log.warning(f"Overriding (already defined): {SelectedEvent.__qualname__} -> {SelectedEvent.name}")
else:
log.debug(f"Registering: {SelectedEvent.__qualname__} -> {SelectedEvent.name}")
self.events[SelectedEvent.name] = event
def register_page_stars(self, page_stars: List[Type[PageStar]], pack_cfg: Dict[str, Any]):
for SelectedPageStar in page_stars:
log.debug(f"Registering: {SelectedPageStar.path} -> {SelectedPageStar.__qualname__}")
try:
page_star_instance = SelectedPageStar(constellation=self, config=pack_cfg)
except Exception as e:
log.error(f"Skipping: "
f"{SelectedPageStar.__qualname__} - {e.__class__.__qualname__} in the initialization.")
ru.sentry_exc(e)
continue
self.starlette.add_route(page_star_instance.path, page_star_instance.page, page_star_instance.methods)
log.info("Registering ExceptionStars...")
for SelectedExcStar in exc_stars:
log.info(f"Registering: {SelectedExcStar.error} -> {SelectedExcStar.__name__}")
def register_exc_stars(self, exc_stars: List[Type[ExceptionStar]], pack_cfg: Dict[str, Any]):
for SelectedPageStar in exc_stars:
log.debug(f"Registering: {SelectedPageStar.error} -> {SelectedPageStar.__qualname__}")
try:
exc_star_instance = SelectedExcStar(constellation=self)
page_star_instance = SelectedPageStar(constellation=self, config=pack_cfg)
except Exception as e:
log.error(f"{e.__class__.__qualname__} during the registration of {SelectedExcStar.__qualname__}!")
raise
self.starlette.add_exception_handler(exc_star_instance.error, exc_star_instance.page)
log.error(f"Skipping: "
f"{SelectedPageStar.__qualname__} - {e.__class__.__qualname__} in the initialization.")
ru.sentry_exc(e)
continue
self.starlette.add_exception_handler(page_star_instance.error, page_star_instance.page)
def get_secret(self, username: str) -> typing.Optional[str]:
"""Get a Royalnet secret from the keyring.
Args:
username: the name of the secret that should be retrieved."""
return keyring.get_password(f"Royalnet/{self.secrets_name}", username)
def run_blocking(self):
log.info(f"Running Constellation on https://{self.address}:{self.port}/...")
loop: aio.AbstractEventLoop = aio.get_event_loop()
self.running = True
# FIXME: might not work as expected
loop.create_task(self.herald.run())
try:
uvicorn.run(self.starlette, host=self.address, port=self.port, log_config=UVICORN_LOGGING_CONFIG)
finally:
self.running = False
@classmethod
def run_process(cls,
address: str,
port: int,
secrets_name: str,
database_uri: str,
page_stars: typing.List[typing.Type[PageStar]] = None,
exc_stars: typing.List[typing.Type[ExceptionStar]] = None,
log_level: str = "WARNING",
*,
debug: bool = __debug__,):
alchemy_cfg: Dict[str, Any],
herald_cfg: Dict[str, Any],
sentry_cfg: Dict[str, Any],
packs_cfg: Dict[str, Any],
constellation_cfg: Dict[str, Any],
logging_cfg: Dict[str, Any]):
"""Blockingly create and run the Constellation.
This should be used as the target of a :class:`multiprocessing.Process`.
This should be used as the target of a :class:`multiprocessing.Process`."""
ru.init_logging(logging_cfg)
Args:
address: The IP address this Constellation should bind to.
port: The port this Constellation should listen for requests on."""
constellation = cls(secrets_name=secrets_name,
database_uri=database_uri,
page_stars=page_stars,
exc_stars=exc_stars,
debug=debug)
# Initialize logging, as Windows doesn't have fork
royalnet_log: logging.Logger = logging.getLogger("royalnet")
royalnet_log.setLevel(log_level)
stream_handler = logging.StreamHandler()
if coloredlogs is not None:
stream_handler.formatter = coloredlogs.ColoredFormatter("{asctime}\t| {processName}\t| {name}\t| {message}",
style="{")
if sentry_cfg is None or not sentry_cfg["enabled"]:
log.info("Sentry: disabled")
else:
stream_handler.formatter = logging.Formatter("{asctime}\t| {processName}\t| {name}\t| {message}",
style="{")
if len(royalnet_log.handlers) < 1:
royalnet_log.addHandler(stream_handler)
log.debug("Logging: ready")
try:
ru.init_sentry(sentry_cfg)
except ImportError:
log.info("Sentry: not installed")
constellation = cls(alchemy_cfg=alchemy_cfg,
herald_cfg=herald_cfg,
packs_cfg=packs_cfg,
constellation_cfg=constellation_cfg)
# Initialize Sentry on the process
if sentry_sdk is None:
log.info("Sentry: not installed")
else:
sentry_dsn = constellation.get_secret("sentry")
if not sentry_dsn:
log.info("Sentry: disabled")
else:
# noinspection PyUnreachableCode
if __debug__:
release = f"Dev"
else:
release = f"{royalnet.__version__}"
log.debug("Initializing Sentry...")
sentry_sdk.init(sentry_dsn,
integrations=[AioHttpIntegration(),
SqlalchemyIntegration(),
LoggingIntegration(event_level=None)],
release=release)
log.info(f"Sentry: enabled (Royalnet {release})")
# Run the server
log.info(f"Running Constellation on https://{address}:{port}/...")
constellation.running = True
try:
uvicorn.run(constellation.starlette, host=address, port=port, log_config=UVICORN_LOGGING_CONFIG)
finally:
constellation.running = False
constellation.run_blocking()
def __repr__(self):
return f"<{self.__class__.__qualname__}: {'running' if self.running else 'inactive'}>"

View file

@ -1,4 +1,4 @@
from typing import Type, TYPE_CHECKING, List, Union
from typing import *
if TYPE_CHECKING:
from .constellation import Constellation
@ -10,10 +10,8 @@ class Star:
"""A Star is a class representing a part of the website.
It shouldn't be used directly: please use :class:`PageStar` and :class:`ExceptionStar` instead!"""
tables: set = {}
"""The set of :mod:`~royalnet.alchemy` table classes required by this :class:`Star` to function."""
def __init__(self, constellation: "Constellation"):
def __init__(self, config: Dict[str, Any], constellation: "Constellation"):
self.config: Dict[str, Any] = config
self.constellation: "Constellation" = constellation
async def page(self, request: "Request") -> "Response":
@ -27,6 +25,7 @@ class Star:
"""A shortcut for the :class:`Alchemy` of the :class:`Constellation`."""
return self.constellation.alchemy
# noinspection PyPep8Naming
@property
def Session(self):
"""A shortcut for the alchemy :class:`Session` of the :class:`Constellation`."""

View file

@ -8,9 +8,7 @@ class Config:
port: int,
secret: str,
secure: bool = False,
path: str = "/",
*,
enabled: ... = ..., # Ignored, but useful to allow creating a config from the config dict
path: str = "/"
):
if ":" in name:
raise ValueError("Herald names cannot contain colons (:)")
@ -53,3 +51,22 @@ class Config:
def __repr__(self):
return f"<HeraldConfig for {self.url}>"
@classmethod
def from_config(cls, *,
name: str,
address: str,
port: int,
secret: str,
secure: bool = False,
path: str = "/",
enabled: ... = ...
):
return cls(
name=name,
address=address,
port=port,
secret=secret,
secure=secure,
path=path
)

View file

@ -1,10 +1,10 @@
import logging
import typing
from typing import *
import re
import datetime
import uuid
import asyncio
import logging as _logging
import royalnet.utils as ru
from .package import Package
from .config import Config
@ -26,8 +26,8 @@ class ConnectedClient:
"""The :py:class:`Server`-side representation of a connected :py:class:`Link`."""
def __init__(self, socket: "websockets.WebSocketServerProtocol"):
self.socket: "websockets.WebSocketServerProtocol" = socket
self.nid: typing.Optional[str] = None
self.link_type: typing.Optional[str] = None
self.nid: Optional[str] = None
self.link_type: Optional[str] = None
self.connection_datetime: datetime.datetime = datetime.datetime.now()
def __repr__(self):
@ -51,13 +51,13 @@ class ConnectedClient:
class Server:
def __init__(self, config: Config, *, loop: asyncio.AbstractEventLoop = None):
self.config: Config = config
self.identified_clients: typing.List[ConnectedClient] = []
self.identified_clients: List[ConnectedClient] = []
self.loop = loop
def __repr__(self):
return f"<{self.__class__.__qualname__}>"
def find_client(self, *, nid: str = None, link_type: str = None) -> typing.List[ConnectedClient]:
def find_client(self, *, nid: str = None, link_type: str = None) -> List[ConnectedClient]:
assert not (nid and link_type)
if nid:
matching = [client for client in self.identified_clients if client.nid == nid]
@ -108,7 +108,7 @@ class Server:
# noinspection PyAsyncCall
self.loop.create_task(self.route_package(package))
def find_destination(self, package: Package) -> typing.List[ConnectedClient]:
def find_destination(self, package: Package) -> List[ConnectedClient]:
"""Find a list of destinations for the package.
Parameters:
@ -162,20 +162,8 @@ class Server:
port=self.config.port,
loop=self.loop)
def run_blocking(self, log_level):
# Initialize logging, as Windows doesn't have fork
royalnet_log: logging.Logger = logging.getLogger("royalnet")
royalnet_log.setLevel(log_level)
stream_handler = logging.StreamHandler()
if coloredlogs is not None:
stream_handler.formatter = coloredlogs.ColoredFormatter("{asctime}\t| {processName}\t| {name}\t| {message}",
style="{")
else:
stream_handler.formatter = logging.Formatter("{asctime}\t| {processName}\t| {name}\t| {message}",
style="{")
if len(royalnet_log.handlers) < 1:
royalnet_log.addHandler(stream_handler)
log.debug("Logging: ready")
def run_blocking(self, logging_cfg: Dict[str, Any]):
ru.init_logging(logging_cfg)
if self.loop is None:
self.loop = asyncio.get_event_loop()
self.serve()

View file

@ -42,7 +42,8 @@ class DiscordSerf(Serf):
herald_cfg: Dict[str, Any],
sentry_cfg: Dict[str, Any],
packs_cfg: Dict[str, Any],
serf_cfg: Dict[str, Any]):
serf_cfg: Dict[str, Any],
**_):
if discord is None:
raise ImportError("'discord' extra is not installed")

View file

@ -1,14 +1,12 @@
import logging
import sys
import traceback
import importlib
import asyncio as aio
from typing import *
from sqlalchemy.schema import Table
from royalnet import __version__
from royalnet.commands import *
import royalnet.utils as ru
import royalnet.alchemy as ra
import royalnet.backpack as rb
import royalnet.herald as rh
@ -44,9 +42,8 @@ class Serf:
def __init__(self,
alchemy_cfg: Dict[str, Any],
herald_cfg: Dict[str, Any],
sentry_cfg: Dict[str, Any],
packs_cfg: Dict[str, Any],
serf_cfg: Dict[str, Any]):
**_):
# Import packs
pack_names = packs_cfg["active"]
@ -57,30 +54,6 @@ class Serf:
packs[pack_name] = importlib.import_module(pack_name)
except ImportError as e:
log.error(f"Error during the import of {pack_name}: {e}")
# pack_commands = []
# try:
# pack_commands = pack.available_commands
# except AttributeError:
# log.warning(f"No commands in pack: {pack_name}")
# else:
# log.debug(f"Imported: {len(pack_commands)} commands")
# commands = [*commands, *pack_commands]
# pack_events = []
# try:
# pack_events = pack.available_events
# except AttributeError:
# log.warning(f"No events in pack: {pack_name}")
# else:
# log.debug(f"Imported: {len(pack_events)} events")
# events = [*events, *pack_events]
# pack_tables = []
# try:
# pack_tables = pack.available_events
# except AttributeError:
# log.warning(f"No tables in pack: {pack_name}")
# else:
# log.debug(f"Imported: {len(pack_tables)} tables")
# tables = [*tables, *pack_tables]
log.info(f"Packs: {len(packs)} imported")
self.alchemy: Optional[ra.Alchemy] = None
@ -96,21 +69,22 @@ class Serf:
# TODO: I'm not sure what this is either
self.identity_column: Optional[str] = None
# Find all tables
tables = set()
for pack in packs.values():
try:
tables = tables.union(pack.available_tables)
except AttributeError:
log.warning(f"Pack `{pack}` does not have the `available_tables` attribute.")
continue
# Alchemy
if ra.Alchemy is None:
log.info("Alchemy: not installed")
elif not alchemy_cfg["enabled"]:
log.info("Alchemy: disabled")
else:
self.init_alchemy(alchemy_cfg["database_url"], tables)
# Find all tables
tables = set()
for pack in packs.values():
try:
tables = tables.union(pack.available_tables)
except AttributeError:
log.warning(f"Pack `{pack}` does not have the `available_tables` attribute.")
continue
# Create the Alchemy
self.init_alchemy(alchemy_cfg, tables)
log.info(f"Alchemy: {self.alchemy}")
self.herald: Optional[rh.Link] = None
@ -133,7 +107,7 @@ class Serf:
for pack_name in packs:
pack = packs[pack_name]
pack_cfg = packs_cfg.get(pack_name, default={})
pack_cfg = packs_cfg.get(pack_name, {})
try:
events = pack.available_events
except AttributeError:
@ -146,7 +120,7 @@ class Serf:
log.warning(f"Pack `{pack}` does not have the `available_commands` attribute.")
else:
self.register_commands(commands, pack_cfg)
log.info(f"Events: {len(self.commands)} events")
log.info(f"Events: {len(self.events)} events")
log.info(f"Commands: {len(self.commands)} commands")
if rh.Link is None:
@ -160,9 +134,6 @@ class Serf:
self.loop: Optional[aio.AbstractEventLoop] = None
"""The event loop this Serf is running on."""
self.sentry_dsn: Optional[str] = sentry_cfg["dsn"] if sentry_cfg["enabled"] else None
"""The Sentry DSN / Token. If :const:`None`, Sentry is disabled."""
def init_alchemy(self, alchemy_cfg: Dict[str, Any], tables: Set[type]) -> None:
"""Create and initialize the :class:`Alchemy` with the required tables, and find the link between the master
table and the identity table."""
@ -190,7 +161,7 @@ class Serf:
"""Send a :class:`royalherald.Request` to a specific destination, and wait for a
:class:`royalherald.Response`."""
if self.herald is None:
raise UnsupportedError("`royalherald` is not enabled on this bot.")
raise UnsupportedError("`royalherald` is not enabled on this serf.")
request: rh.Request = rh.Request(handler=event_name, data=kwargs)
response: rh.Response = await self.herald.request(destination=destination, request=request)
if isinstance(response, rh.ResponseFailure):
@ -211,13 +182,13 @@ class Serf:
elif response.extra_info["type"] == "ExternalError":
raise ExternalError(response.extra_info["message"])
else:
raise TypeError(f"Herald action call returned invalid error:\n"
f"[p]{response}[/p]")
raise ValueError(f"Herald action call returned invalid error:\n"
f"[p]{response}[/p]")
elif isinstance(response, rh.ResponseSuccess):
return response.data
else:
raise TypeError(f"Other Herald Link returned unknown response:\n"
f"[p]{response}[/p]")
raise ValueError(f"Other Herald Link returned unknown response:\n"
f"[p]{response}[/p]")
return GenericInterface
@ -237,7 +208,7 @@ class Serf:
except Exception as e:
log.error(f"Skipping: "
f"{SelectedCommand.__qualname__} - {e.__class__.__qualname__} in the initialization.")
self.sentry_exc(e)
ru.sentry_exc(e)
continue
# Link the interface to the command
interface.command = command
@ -263,7 +234,7 @@ class Serf:
def init_herald(self, herald_cfg: Dict[str, Any]):
"""Create a :class:`Link` and bind :class:`Event`."""
herald_cfg["name"] = self.interface_name
self.herald: rh.Link = rh.Link(rh.Config(**herald_cfg), self.network_handler)
self.herald: rh.Link = rh.Link(rh.Config.from_config(**herald_cfg), self.network_handler)
def register_events(self, events: List[Type[Event]], pack_cfg: Dict[str, Any]):
for SelectedEvent in events:
@ -275,7 +246,7 @@ class Serf:
except Exception as e:
log.error(f"Skipping: "
f"{SelectedEvent.__qualname__} - {e.__class__.__qualname__} in the initialization.")
self.sentry_exc(e)
ru.sentry_exc(e)
continue
# Register the event
if SelectedEvent.name in self.events:
@ -296,7 +267,7 @@ class Serf:
response_data = await event.run(**message.data)
return rh.ResponseSuccess(data=response_data)
except Exception as e:
self.sentry_exc(e)
ru.sentry_exc(e)
return rh.ResponseFailure("exception_in_event",
f"An exception was raised in the event for '{message.handler}'.",
extra_info={
@ -306,31 +277,6 @@ class Serf:
elif isinstance(message, rh.Broadcast):
await event.run(**message.data)
@staticmethod
def init_sentry(dsn):
log.debug("Initializing Sentry...")
release = f"royalnet@{__version__}"
sentry_sdk.init(dsn,
integrations=[AioHttpIntegration(),
SqlalchemyIntegration(),
LoggingIntegration(event_level=None)],
release=release)
log.info(f"Sentry: {release}")
# noinspection PyUnreachableCode
@staticmethod
def sentry_exc(exc: Exception,
level: str = "error"):
if sentry_sdk is not None:
with sentry_sdk.configure_scope() as scope:
scope.set_level(level)
sentry_sdk.capture_exception(exc)
log.log(level, f"Captured {level}: {exc}")
# If started in debug mode (without -O), raise the exception, allowing you to see its source
if __debug__:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_traceback)
async def call(self, command: Command, data: CommandData, parameters: List[str]):
log.info(f"Calling command: {command.name}")
try:
@ -350,7 +296,7 @@ class Serf:
except CommandError as e:
await data.reply(f"⚠️ {e.message}")
except Exception as e:
self.sentry_exc(e)
ru.sentry_exc(e)
error_message = f"⛔️ [b]{e.__class__.__name__}[/b]\n" + '\n'.join(e.args)
await data.reply(error_message)
@ -360,34 +306,24 @@ class Serf:
# OVERRIDE THIS METHOD!
@classmethod
def run_process(cls, *args, log_level: str = "WARNING", **kwargs):
def run_process(cls, **kwargs):
"""Blockingly create and run the Serf.
This should be used as the target of a :class:`multiprocessing.Process`."""
serf = cls(*args, **kwargs)
ru.init_logging(kwargs["logging_cfg"])
royalnet_log: logging.Logger = logging.getLogger("royalnet")
royalnet_log.setLevel(log_level)
stream_handler = logging.StreamHandler()
if coloredlogs is not None:
stream_handler.formatter = coloredlogs.ColoredFormatter("{asctime}\t| {processName}\t| {name}\t| {message}",
style="{")
else:
stream_handler.formatter = logging.Formatter("{asctime}\t| {processName}\t| {name}\t| {message}",
style="{")
if len(royalnet_log.handlers) < 1:
royalnet_log.addHandler(stream_handler)
log.debug("Logging: ready")
if sentry_sdk is None:
log.info("Sentry: not installed")
elif serf.sentry_dsn is None:
if kwargs["sentry_cfg"] is None or not kwargs["sentry_cfg"]["enabled"]:
log.info("Sentry: disabled")
else:
serf.init_sentry(serf.sentry_dsn)
try:
ru.init_sentry(kwargs["sentry_cfg"])
except ImportError:
log.info("Sentry: not installed")
serf = cls(**kwargs)
serf.loop = aio.get_event_loop()
try:
serf.loop.run_until_complete(serf.run())
except Exception as e:
serf.sentry_exc(e, level="fatal")
ru.sentry_exc(e, level="fatal")

View file

@ -38,7 +38,8 @@ class TelegramSerf(Serf):
herald_cfg: Dict[str, Any],
sentry_cfg: Dict[str, Any],
packs_cfg: Dict[str, Any],
serf_cfg: Dict[str, Any]):
serf_cfg: Dict[str, Any],
**_):
if telegram is None:
raise ImportError("'telegram' extra is not installed")

View file

@ -5,6 +5,8 @@ from .formatters import andformat, underscorize, ytdldateformat, numberemojiform
from .urluuid import to_urluuid, from_urluuid
from .multilock import MultiLock
from .fileaudiosource import FileAudioSource
from .sentry import init_sentry, sentry_exc
from .log import init_logging
__all__ = [
"asyncify",
@ -19,4 +21,7 @@ __all__ = [
"from_urluuid",
"MultiLock",
"FileAudioSource",
"init_sentry",
"sentry_exc",
"init_logging",
]

23
royalnet/utils/log.py Normal file
View file

@ -0,0 +1,23 @@
from typing import *
import logging
try:
import coloredlogs
except ImportError:
coloredlogs = None
log_format = "{asctime}\t| {processName}\t| {name}\t| {message}"
def init_logging(logging_cfg: Dict[str, Any]):
royalnet_log: logging.Logger = logging.getLogger("royalnet")
royalnet_log.setLevel(logging_cfg["log_level"])
stream_handler = logging.StreamHandler()
if coloredlogs is not None:
stream_handler.formatter = coloredlogs.ColoredFormatter(log_format, style="{")
else:
stream_handler.formatter = logging.Formatter(log_format, style="{")
if len(royalnet_log.handlers) < 1:
royalnet_log.addHandler(stream_handler)
royalnet_log.debug("Logging: ready")

47
royalnet/utils/sentry.py Normal file
View file

@ -0,0 +1,47 @@
import logging
import sys
import traceback
from typing import *
from royalnet.version import semantic
try:
import sentry_sdk
from sentry_sdk.integrations.aiohttp import AioHttpIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
except ImportError:
sentry_sdk = None
AioHttpIntegration = None
SqlalchemyIntegration = None
LoggingIntegration = None
log = logging.getLogger(__name__)
def init_sentry(sentry_cfg: Dict[str, Any]):
if sentry_sdk is None:
raise ImportError("`sentry` extra is not installed")
log.debug("Initializing Sentry...")
release = f"royalnet@{semantic}"
sentry_sdk.init(sentry_cfg["dsn"],
integrations=[AioHttpIntegration(),
SqlalchemyIntegration(),
LoggingIntegration(event_level=None)],
release=release)
log.info(f"Sentry: {release}")
# noinspection PyUnreachableCode
def sentry_exc(exc: Exception,
level: str = "ERROR"):
if sentry_sdk is not None:
with sentry_sdk.configure_scope() as scope:
scope.set_level(level.lower())
sentry_sdk.capture_exception(exc)
level_int: int = logging._nameToLevel[level.upper()]
log.log(level_int, f"Captured {level.capitalize()}: {exc}")
# If started in debug mode (without -O), raise the exception, allowing you to see its source
if __debug__:
exc_type, exc_value, exc_traceback = sys.exc_info()
traceback.print_exception(exc_type, exc_value, exc_traceback)

1
royalnet/version.py Normal file
View file

@ -0,0 +1 @@
semantic = "5.1a1"

View file

@ -83,8 +83,8 @@ token = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
[Logging]
# Print to stderr all logging events of an equal or greater level than this
# Possible values are "debug", "info", "warning", "error", "fatal"
log_level = "info"
# Possible values are "DEBUG", "INFO", "WARNING", "ERROR", "FATAL"
log_level = "INFO"
# Optional: install the `coloredlogs` extra for colored output!
[Sentry]
@ -104,7 +104,6 @@ active = [
]
# Configuration settings for specific packs
# Be aware that packs have access to the whole config file
[Packs."royalnet.backpack"]
# Enable exception debug commands and stars
exc_debug = false