diff --git a/royalnet/__main__.py b/royalnet/__main__.py index b788ccda..b2009519 100644 --- a/royalnet/__main__.py +++ b/royalnet/__main__.py @@ -1,10 +1,14 @@ +from typing import * + import logging import multiprocessing +import time import click import toml import royalnet.utils as ru +import royalnet.serf as rs try: import royalnet.serf.telegram as rst @@ -40,12 +44,11 @@ log = logging.getLogger(__name__) @click.command() -@click.option("-c", "--config-filename", default="./config.toml", type=click.Path(exists=True), +@click.option("-c", "--config-file", default="./config.toml", type=click.File(encoding="utf8"), help="The filename of the Royalnet configuration file.") -def run(config_filename: str): +def run(config_file: str): # Read the configuration file - with open(config_filename, "r") as t: - config: dict = toml.load(t) + config: dict = toml.load(config_file) ru.init_logging(config["Logging"]) @@ -56,123 +59,126 @@ def run(config_filename: str): ru.init_sentry(config["Sentry"]) except ImportError: log.info("Sentry: not installed") - - # Herald Server - herald_cfg = None - herald_process = None - if rh is not None and "Herald" in config: - if "Local" in config["Herald"] and config["Herald"]["Local"]["enabled"]: - # Create a Herald server - herald_server = rh.Server(rh.Config.from_config(name="", **config["Herald"]["Local"])) - # Run the Herald server on a new process - herald_process = multiprocessing.Process(name="Herald.Local", - target=herald_server.run_blocking, - daemon=True, - kwargs={ - "logging_cfg": config["Logging"] - }) - herald_process.start() - herald_cfg = config["Herald"]["Local"] - log.info("Herald: Enabled (Local)") - elif "Remote" in config["Herald"] and config["Herald"]["Remote"]["enabled"]: - log.info("Herald: Enabled (Remote)") - herald_cfg = config["Herald"]["Remote"] else: - log.info("Herald: Disabled") - else: + log.info("Sentry: enabled") + + processes: Dict[str, ru.RoyalnetProcess] = {} + """A list of all processes that the launcher should start and monitor.""" + + herald_cfg = config.get("Herald") + if rh is None: + log.info("Herald: Not installed") + elif herald_cfg is None: + log.warning("Herald: Not configured") + elif not herald_cfg["enabled"]: log.info("Herald: Disabled") + elif herald_cfg["mode"] == "local": + log.info("Herald: Enabled (local server)") + + def herald_constructor() -> multiprocessing.Process: + # Create a Herald server + herald_server = rh.Server(rh.Config.from_config(name="", **herald_cfg)) + # Run the Herald server on a new process + return multiprocessing.Process( + name="Herald.Local", + target=herald_server.run_blocking, + daemon=True, + kwargs={ + "logging_cfg": config["Logging"] + } + ) + + processes["Herald"] = ru.RoyalnetProcess(herald_constructor, None) + elif herald_cfg["mode"] == "remote": + log.info("Herald: Enabled (remote server)") + else: + log.error(f"Invalid Herald mode: {herald_cfg['mode']}") # Serfs - telegram_process = None - if rst is not None and "Telegram" in config["Serfs"] and config["Serfs"]["Telegram"]["enabled"]: - telegram_process = multiprocessing.Process(name="Serf.Telegram", - target=rst.TelegramSerf.run_process, - daemon=True, - kwargs={ - "alchemy_cfg": config["Alchemy"], - "herald_cfg": herald_cfg, - "packs_cfg": config["Packs"], - "sentry_cfg": config["Sentry"], - "logging_cfg": config["Logging"], - "serf_cfg": config["Serfs"]["Telegram"], - }) - telegram_process.start() - log.info("Serf.Telegram: Started") + serfs_cfg = config.get("Serfs") + if serfs_cfg is None: + log.warning("__serfs__: Not configured") else: - log.info("Serf.Telegram: Disabled") + log.debug("__serfs__: Configured") - discord_process = None - if rsd is not None and "Discord" in config["Serfs"] and config["Serfs"]["Discord"]["enabled"]: - discord_process = multiprocessing.Process(name="Serf.Discord", - target=rsd.DiscordSerf.run_process, - daemon=True, - kwargs={ - "alchemy_cfg": config["Alchemy"], - "herald_cfg": herald_cfg, - "packs_cfg": config["Packs"], - "sentry_cfg": config["Sentry"], - "logging_cfg": config["Logging"], - "serf_cfg": config["Serfs"]["Discord"], - }) - discord_process.start() - log.info("Serf.Discord: Started") - else: - log.info("Serf.Discord: Disabled") + def configure_serf(name: str, module, class_: Type[rs.Serf]): + serf_cfg = serfs_cfg.get(name) + if module is None: + log.info(f"Serf.{name}: Not installed") + elif serf_cfg is None: + log.warning(f"Serf.{name}: Not configured") + elif not serf_cfg["enabled"]: + log.info(f"Serf.{name}: Disabled") + else: + def serf_constructor() -> multiprocessing.Process: + return multiprocessing.Process( + name=f"Serf.{name}", + target=class_.run_process, + daemon=True, + kwargs={ + "alchemy_cfg": config["Alchemy"], + "herald_cfg": herald_cfg, + "packs_cfg": config["Packs"], + "sentry_cfg": config["Sentry"], + "logging_cfg": config["Logging"], + "serf_cfg": serf_cfg, + } + ) + processes[f"Serf.{name}"] = ru.RoyalnetProcess(serf_constructor, None) + log.info(f"Serf.{name}: Enabled") - matrix_process = None - if rsm is not None and "Matrix" in config["Serfs"] and config["Serfs"]["Matrix"]["enabled"]: - matrix_process = multiprocessing.Process(name="Serf.Matrix", - target=rsm.MatrixSerf.run_process, - daemon=True, - kwargs={ - "alchemy_cfg": config["Alchemy"], - "herald_cfg": herald_cfg, - "packs_cfg": config["Packs"], - "sentry_cfg": config["Sentry"], - "logging_cfg": config["Logging"], - "serf_cfg": config["Serfs"]["Matrix"], - }) - matrix_process.start() - log.info("Serf.Matrix: Started") - else: - log.info("Serf.Matrix: Disabled") + configure_serf("Telegram", rst, rst.TelegramSerf) + configure_serf("Discord", rsd, rsd.DiscordSerf) + configure_serf("Matrix", rsm, rsm.MatrixSerf) # Constellation - constellation_process = None - if rc is not None and "Constellation" in config and config["Constellation"]["enabled"]: - constellation_process = multiprocessing.Process(name="Constellation", - target=rc.Constellation.run_process, - daemon=True, - kwargs={ - "alchemy_cfg": config["Alchemy"], - "herald_cfg": herald_cfg, - "packs_cfg": config["Packs"], - "sentry_cfg": config["Sentry"], - "logging_cfg": config["Logging"], - "constellation_cfg": config["Constellation"], - }) - constellation_process.start() - log.info("Constellation: Started") + constellation_cfg = config.get("Constellation") + if rc is None: + log.info(f"Constellation: Not installed") + elif constellation_cfg is None: + log.warning(f"Constellation: Not configured") + elif not constellation_cfg["enabled"]: + log.info(f"Constellation: Disabled") else: - log.info("Constellation: Disabled") + def constellation_constructor() -> multiprocessing.Process: + return multiprocessing.Process( + name="Constellation", + target=rc.Constellation.run_process, + daemon=True, + kwargs={ + "alchemy_cfg": config["Alchemy"], + "herald_cfg": herald_cfg, + "packs_cfg": config["Packs"], + "sentry_cfg": config["Sentry"], + "logging_cfg": config["Logging"], + "constellation_cfg": config["Constellation"], + } + ) + processes["Constellation"] = ru.RoyalnetProcess(constellation_constructor, None) - log.info("All processes started!") - if constellation_process is not None: - log.info("Waiting for Constellation to stop...") - constellation_process.join() - if telegram_process is not None: - log.info("Waiting for Serf.Telegram to stop...") - telegram_process.join() - if discord_process is not None: - log.info("Waiting for Serf.Discord to stop...") - discord_process.join() - if matrix_process is not None: - log.info("Waiting for Serf.Matrix to stop...") - matrix_process.join() - if herald_process is not None: - log.info("Waiting for Herald to stop...") - herald_process.join() + log.info("Constellation: Enabled") + try: + # Monitor processes + while True: + log.debug("Checking process status...") + for name, process in processes.items(): + if process.current_process is None: + log.info(f"{name}: Starting...") + process.current_process = process.constructor() + process.current_process.start() + elif not process.current_process.is_alive(): + log.error(f"{name}: Process is dead, restarting...") + process.current_process = process.constructor() + process.current_process.start() + log.debug("Done, checking again in 60 seconds.") + time.sleep(60) + except KeyboardInterrupt: + log.info("Received SIGTERM, stopping everything!") + for name, process in processes.items(): + log.info(f"{name}: Killing...") + process.current_process.kill() + log.info("Goodbye!") if __name__ == "__main__": run() diff --git a/royalnet/herald/config.py b/royalnet/herald/config.py index e74a8577..c96f779c 100644 --- a/royalnet/herald/config.py +++ b/royalnet/herald/config.py @@ -53,15 +53,16 @@ class Config: return f"" @classmethod - def from_config(cls, *, - name: str, - address: str, - port: int, - secret: str, - secure: bool = False, - path: str = "/", - enabled: ... = ... - ): + def from_config( + cls, *, + name: str, + address: str, + port: int, + secret: str, + secure: bool = False, + path: str = "/", + **_, + ): return cls( name=name, address=address, diff --git a/royalnet/serf/discord/__init__.py b/royalnet/serf/discord/__init__.py index 9e1eb224..6e9ff0d8 100644 --- a/royalnet/serf/discord/__init__.py +++ b/royalnet/serf/discord/__init__.py @@ -10,12 +10,8 @@ Install it with: :: from .discordserf import DiscordSerf from .escape import escape -from .playable import Playable -from .voiceplayer import VoicePlayer __all__ = [ "escape", "DiscordSerf", - "Playable", - "VoicePlayer", ] diff --git a/royalnet/utils/__init__.py b/royalnet/utils/__init__.py index fa7c78b9..979f7e49 100644 --- a/royalnet/utils/__init__.py +++ b/royalnet/utils/__init__.py @@ -8,6 +8,7 @@ from .sleep_until import sleep_until from .strip_tabs import strip_tabs from .taskslist import TaskList from .urluuid import to_urluuid, from_urluuid +from .royalnetprocess import RoyalnetProcess __all__ = [ "asyncify", @@ -28,4 +29,5 @@ __all__ = [ "JSON", "strip_tabs", "TaskList", + "RoyalnetProcess", ] diff --git a/royalnet/utils/royalnetprocess.py b/royalnet/utils/royalnetprocess.py new file mode 100644 index 00000000..b9acc1a0 --- /dev/null +++ b/royalnet/utils/royalnetprocess.py @@ -0,0 +1,9 @@ +from typing import * +import dataclasses +import multiprocessing + + +@dataclasses.dataclass() +class RoyalnetProcess: + constructor: Callable[[], multiprocessing.Process] + current_process: Optional[multiprocessing.Process]