1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 19:44:20 +00:00
royalnet/royalnet_console/pda.py

121 lines
3.6 KiB
Python
Raw Normal View History

2021-01-10 17:37:18 +00:00
# Module docstring
"""
The PDA ("main" class) for the :mod:`royalnet_console` frontend.
"""
# Special imports
from __future__ import annotations
import royalnet.royaltyping as t
# External imports
import logging
import asyncio
import royalnet.engineer as engi
# Internal imports
from . import magazine
# Special global objects
log = logging.getLogger(__name__)
# Code
class ConsolePDA:
"""
.. todo:: Document the :class:`.ConsolePDA` class.
"""
def __init__(self):
log.debug(f"Creating new ConsolePDA...")
log.debug(f"Creating new magazine...")
self.mag = magazine.ConsoleMagazine()
self.dispenser: t.Optional[engi.Dispenser] = None
"""
The dispenser for this PDA.
"""
self.conversations: t.List[engi.Conversation] = []
"""
A :class:`list` of conversations to run before a new event is :meth:`.put` in a
:class:`~royalnet.engineer.dispenser.Dispenser`.
"""
async def run(self):
while True:
message = input()
log.debug(f"Received a new message: {message!r}")
log.debug(f"Creating ConsoleMessage from: {message!r}")
bullet = self.mag.Message(_text=message)
log.debug(f"Putting bullet: {bullet!r}")
await self.put_bullet(bullet=bullet)
def register_conversation(self, conv: engi.Conversation) -> None:
"""
Register a new conversation in the PDA.
:param conv: The conversation to register.
"""
log.info(f"Registering conversation: {conv!r}")
self.conversations.append(conv)
def unregister_conversation(self, conv: engi.Conversation) -> None:
"""
Unregister a conversation from the PDA.
:param conv: The conversation to unregister.
"""
log.info(f"Unregistering conversation: {conv!r}")
self.conversations.remove(conv)
def register_partial(self, part: engi.PartialCommand, names: t.List[str]) -> engi.Command:
"""
Register a new :class:`PartialCommand` in the PDA, converting it to a :class:`Command` in the process.
:param part: The :class:`PartialCommand` to register.
:param names: The :attr:`~royalnet.engineer.Command.names` to register the command as.
:return: The resulting :class:`Command`.
"""
log.debug(f"Completing partial: {part!r}")
if part.syntax:
2021-01-10 18:54:47 +00:00
command = part.complete(pattern=r"^{name}\s+{syntax}$", names=names)
2021-01-10 17:37:18 +00:00
else:
2021-01-10 18:54:47 +00:00
command = part.complete(pattern=r"^{name}$", names=names)
2021-01-10 17:37:18 +00:00
self.register_conversation(command)
return command
async def put_bullet(self, bullet: engi.Bullet) -> None:
"""
Insert a new bullet into the dispenser.
:param bullet: The bullet to put in the dispenser.
"""
if not self.dispenser:
log.debug(f"Dispenser not found, creating one...")
self.dispenser = engi.Dispenser()
log.debug("Getting running loop...")
loop = asyncio.get_running_loop()
for conversation in self.conversations:
log.debug(f"Creating run task for: {conversation!r}")
loop.create_task(self.dispenser.run(conversation), name=f"{repr(conversation)}")
log.debug("Running a event loop cycle...")
await asyncio.sleep(0)
log.debug(f"Putting bullet {bullet!r} in dispenser {self.dispenser!r}...")
await self.dispenser.put(bullet)
log.debug("Awaiting another loop cycle")
await asyncio.sleep(0)
2021-01-10 17:37:18 +00:00
# Objects exported by this module
__all__ = (
"ConsolePDA",
)