1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00

BREAKING: Update process launcher

This commit is contained in:
Steffo 2020-08-18 17:05:45 +02:00
parent 1081f11fc1
commit 6590a56ab5
5 changed files with 136 additions and 122 deletions

View file

@ -1,10 +1,14 @@
from typing import *
import logging import logging
import multiprocessing import multiprocessing
import time
import click import click
import toml import toml
import royalnet.utils as ru import royalnet.utils as ru
import royalnet.serf as rs
try: try:
import royalnet.serf.telegram as rst import royalnet.serf.telegram as rst
@ -40,12 +44,11 @@ log = logging.getLogger(__name__)
@click.command() @click.command()
@click.option("-c", "--config-filename", default="./config.toml", type=click.Path(exists=True), @click.option("-c", "--config-file", default="./config.toml", type=click.File(encoding="utf8"),
help="The filename of the Royalnet configuration file.") help="The filename of the Royalnet configuration file.")
def run(config_filename: str): def run(config_file: str):
# Read the configuration file # Read the configuration file
with open(config_filename, "r") as t: config: dict = toml.load(config_file)
config: dict = toml.load(t)
ru.init_logging(config["Logging"]) ru.init_logging(config["Logging"])
@ -56,123 +59,126 @@ def run(config_filename: str):
ru.init_sentry(config["Sentry"]) ru.init_sentry(config["Sentry"])
except ImportError: except ImportError:
log.info("Sentry: not installed") log.info("Sentry: not installed")
# Herald Server
herald_cfg = None
herald_process = None
if rh is not None and "Herald" in config:
if "Local" in config["Herald"] and config["Herald"]["Local"]["enabled"]:
# Create a Herald server
herald_server = rh.Server(rh.Config.from_config(name="<server>", **config["Herald"]["Local"]))
# Run the Herald server on a new process
herald_process = multiprocessing.Process(name="Herald.Local",
target=herald_server.run_blocking,
daemon=True,
kwargs={
"logging_cfg": config["Logging"]
})
herald_process.start()
herald_cfg = config["Herald"]["Local"]
log.info("Herald: Enabled (Local)")
elif "Remote" in config["Herald"] and config["Herald"]["Remote"]["enabled"]:
log.info("Herald: Enabled (Remote)")
herald_cfg = config["Herald"]["Remote"]
else: else:
log.info("Herald: Disabled") log.info("Sentry: enabled")
else:
processes: Dict[str, ru.RoyalnetProcess] = {}
"""A list of all processes that the launcher should start and monitor."""
herald_cfg = config.get("Herald")
if rh is None:
log.info("Herald: Not installed")
elif herald_cfg is None:
log.warning("Herald: Not configured")
elif not herald_cfg["enabled"]:
log.info("Herald: Disabled") log.info("Herald: Disabled")
elif herald_cfg["mode"] == "local":
log.info("Herald: Enabled (local server)")
def herald_constructor() -> multiprocessing.Process:
# Create a Herald server
herald_server = rh.Server(rh.Config.from_config(name="<server>", **herald_cfg))
# Run the Herald server on a new process
return multiprocessing.Process(
name="Herald.Local",
target=herald_server.run_blocking,
daemon=True,
kwargs={
"logging_cfg": config["Logging"]
}
)
processes["Herald"] = ru.RoyalnetProcess(herald_constructor, None)
elif herald_cfg["mode"] == "remote":
log.info("Herald: Enabled (remote server)")
else:
log.error(f"Invalid Herald mode: {herald_cfg['mode']}")
# Serfs # Serfs
telegram_process = None serfs_cfg = config.get("Serfs")
if rst is not None and "Telegram" in config["Serfs"] and config["Serfs"]["Telegram"]["enabled"]: if serfs_cfg is None:
telegram_process = multiprocessing.Process(name="Serf.Telegram", log.warning("__serfs__: Not configured")
target=rst.TelegramSerf.run_process,
daemon=True,
kwargs={
"alchemy_cfg": config["Alchemy"],
"herald_cfg": herald_cfg,
"packs_cfg": config["Packs"],
"sentry_cfg": config["Sentry"],
"logging_cfg": config["Logging"],
"serf_cfg": config["Serfs"]["Telegram"],
})
telegram_process.start()
log.info("Serf.Telegram: Started")
else: else:
log.info("Serf.Telegram: Disabled") log.debug("__serfs__: Configured")
discord_process = None def configure_serf(name: str, module, class_: Type[rs.Serf]):
if rsd is not None and "Discord" in config["Serfs"] and config["Serfs"]["Discord"]["enabled"]: serf_cfg = serfs_cfg.get(name)
discord_process = multiprocessing.Process(name="Serf.Discord", if module is None:
target=rsd.DiscordSerf.run_process, log.info(f"Serf.{name}: Not installed")
daemon=True, elif serf_cfg is None:
kwargs={ log.warning(f"Serf.{name}: Not configured")
"alchemy_cfg": config["Alchemy"], elif not serf_cfg["enabled"]:
"herald_cfg": herald_cfg, log.info(f"Serf.{name}: Disabled")
"packs_cfg": config["Packs"], else:
"sentry_cfg": config["Sentry"], def serf_constructor() -> multiprocessing.Process:
"logging_cfg": config["Logging"], return multiprocessing.Process(
"serf_cfg": config["Serfs"]["Discord"], name=f"Serf.{name}",
}) target=class_.run_process,
discord_process.start() daemon=True,
log.info("Serf.Discord: Started") kwargs={
else: "alchemy_cfg": config["Alchemy"],
log.info("Serf.Discord: Disabled") "herald_cfg": herald_cfg,
"packs_cfg": config["Packs"],
"sentry_cfg": config["Sentry"],
"logging_cfg": config["Logging"],
"serf_cfg": serf_cfg,
}
)
processes[f"Serf.{name}"] = ru.RoyalnetProcess(serf_constructor, None)
log.info(f"Serf.{name}: Enabled")
matrix_process = None configure_serf("Telegram", rst, rst.TelegramSerf)
if rsm is not None and "Matrix" in config["Serfs"] and config["Serfs"]["Matrix"]["enabled"]: configure_serf("Discord", rsd, rsd.DiscordSerf)
matrix_process = multiprocessing.Process(name="Serf.Matrix", configure_serf("Matrix", rsm, rsm.MatrixSerf)
target=rsm.MatrixSerf.run_process,
daemon=True,
kwargs={
"alchemy_cfg": config["Alchemy"],
"herald_cfg": herald_cfg,
"packs_cfg": config["Packs"],
"sentry_cfg": config["Sentry"],
"logging_cfg": config["Logging"],
"serf_cfg": config["Serfs"]["Matrix"],
})
matrix_process.start()
log.info("Serf.Matrix: Started")
else:
log.info("Serf.Matrix: Disabled")
# Constellation # Constellation
constellation_process = None constellation_cfg = config.get("Constellation")
if rc is not None and "Constellation" in config and config["Constellation"]["enabled"]: if rc is None:
constellation_process = multiprocessing.Process(name="Constellation", log.info(f"Constellation: Not installed")
target=rc.Constellation.run_process, elif constellation_cfg is None:
daemon=True, log.warning(f"Constellation: Not configured")
kwargs={ elif not constellation_cfg["enabled"]:
"alchemy_cfg": config["Alchemy"], log.info(f"Constellation: Disabled")
"herald_cfg": herald_cfg,
"packs_cfg": config["Packs"],
"sentry_cfg": config["Sentry"],
"logging_cfg": config["Logging"],
"constellation_cfg": config["Constellation"],
})
constellation_process.start()
log.info("Constellation: Started")
else: else:
log.info("Constellation: Disabled") def constellation_constructor() -> multiprocessing.Process:
return multiprocessing.Process(
name="Constellation",
target=rc.Constellation.run_process,
daemon=True,
kwargs={
"alchemy_cfg": config["Alchemy"],
"herald_cfg": herald_cfg,
"packs_cfg": config["Packs"],
"sentry_cfg": config["Sentry"],
"logging_cfg": config["Logging"],
"constellation_cfg": config["Constellation"],
}
)
processes["Constellation"] = ru.RoyalnetProcess(constellation_constructor, None)
log.info("All processes started!") log.info("Constellation: Enabled")
if constellation_process is not None:
log.info("Waiting for Constellation to stop...")
constellation_process.join()
if telegram_process is not None:
log.info("Waiting for Serf.Telegram to stop...")
telegram_process.join()
if discord_process is not None:
log.info("Waiting for Serf.Discord to stop...")
discord_process.join()
if matrix_process is not None:
log.info("Waiting for Serf.Matrix to stop...")
matrix_process.join()
if herald_process is not None:
log.info("Waiting for Herald to stop...")
herald_process.join()
try:
# Monitor processes
while True:
log.debug("Checking process status...")
for name, process in processes.items():
if process.current_process is None:
log.info(f"{name}: Starting...")
process.current_process = process.constructor()
process.current_process.start()
elif not process.current_process.is_alive():
log.error(f"{name}: Process is dead, restarting...")
process.current_process = process.constructor()
process.current_process.start()
log.debug("Done, checking again in 60 seconds.")
time.sleep(60)
except KeyboardInterrupt:
log.info("Received SIGTERM, stopping everything!")
for name, process in processes.items():
log.info(f"{name}: Killing...")
process.current_process.kill()
log.info("Goodbye!")
if __name__ == "__main__": if __name__ == "__main__":
run() run()

View file

@ -53,15 +53,16 @@ class Config:
return f"<HeraldConfig for {self.url}>" return f"<HeraldConfig for {self.url}>"
@classmethod @classmethod
def from_config(cls, *, def from_config(
name: str, cls, *,
address: str, name: str,
port: int, address: str,
secret: str, port: int,
secure: bool = False, secret: str,
path: str = "/", secure: bool = False,
enabled: ... = ... path: str = "/",
): **_,
):
return cls( return cls(
name=name, name=name,
address=address, address=address,

View file

@ -10,12 +10,8 @@ Install it with: ::
from .discordserf import DiscordSerf from .discordserf import DiscordSerf
from .escape import escape from .escape import escape
from .playable import Playable
from .voiceplayer import VoicePlayer
__all__ = [ __all__ = [
"escape", "escape",
"DiscordSerf", "DiscordSerf",
"Playable",
"VoicePlayer",
] ]

View file

@ -8,6 +8,7 @@ from .sleep_until import sleep_until
from .strip_tabs import strip_tabs from .strip_tabs import strip_tabs
from .taskslist import TaskList from .taskslist import TaskList
from .urluuid import to_urluuid, from_urluuid from .urluuid import to_urluuid, from_urluuid
from .royalnetprocess import RoyalnetProcess
__all__ = [ __all__ = [
"asyncify", "asyncify",
@ -28,4 +29,5 @@ __all__ = [
"JSON", "JSON",
"strip_tabs", "strip_tabs",
"TaskList", "TaskList",
"RoyalnetProcess",
] ]

View file

@ -0,0 +1,9 @@
from typing import *
import dataclasses
import multiprocessing
@dataclasses.dataclass()
class RoyalnetProcess:
constructor: Callable[[], multiprocessing.Process]
current_process: Optional[multiprocessing.Process]