mirror of
https://github.com/RYGhub/royalnet.git
synced 2024-11-23 11:34:18 +00:00
✨ Restructure PDA
This commit is contained in:
parent
98846f5c56
commit
ccaaf4fb34
11 changed files with 415 additions and 286 deletions
|
@ -1,6 +1,6 @@
|
||||||
[tool.poetry]
|
[tool.poetry]
|
||||||
name = "royalnet"
|
name = "royalnet"
|
||||||
version = "6.1.4"
|
version = "6.2.0"
|
||||||
description = "A multipurpose bot framework"
|
description = "A multipurpose bot framework"
|
||||||
authors = ["Stefano Pigozzi <me@steffo.eu>"]
|
authors = ["Stefano Pigozzi <me@steffo.eu>"]
|
||||||
license = "AGPL-3.0-or-later"
|
license = "AGPL-3.0-or-later"
|
||||||
|
|
18
royalnet.iml
18
royalnet.iml
|
@ -2,8 +2,24 @@
|
||||||
<module type="PYTHON_MODULE" version="4">
|
<module type="PYTHON_MODULE" version="4">
|
||||||
<component name="NewModuleRootManager" inherit-compiler-output="true">
|
<component name="NewModuleRootManager" inherit-compiler-output="true">
|
||||||
<exclude-output />
|
<exclude-output />
|
||||||
<content url="file://$MODULE_DIR$" />
|
<content url="file://$MODULE_DIR$">
|
||||||
|
<sourceFolder url="file://$MODULE_DIR$/docs/source" isTestSource="false" />
|
||||||
|
<sourceFolder url="file://$MODULE_DIR$/docs/source/_static" type="java-resource" />
|
||||||
|
<sourceFolder url="file://$MODULE_DIR$/royalnet" isTestSource="false" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/.pytest_cache" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/dist" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/docs/build" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/royalnet.egg-info" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/royalnet/.pytest_cache" />
|
||||||
|
</content>
|
||||||
<orderEntry type="jdk" jdkName="Poetry (royalnet)" jdkType="Python SDK" />
|
<orderEntry type="jdk" jdkName="Poetry (royalnet)" jdkType="Python SDK" />
|
||||||
<orderEntry type="sourceFolder" forTests="false" />
|
<orderEntry type="sourceFolder" forTests="false" />
|
||||||
</component>
|
</component>
|
||||||
|
<component name="TemplatesService">
|
||||||
|
<option name="TEMPLATE_FOLDERS">
|
||||||
|
<list>
|
||||||
|
<option value="$MODULE_DIR$/docs/source/_templates" />
|
||||||
|
</list>
|
||||||
|
</option>
|
||||||
|
</component>
|
||||||
</module>
|
</module>
|
|
@ -1,284 +0,0 @@
|
||||||
import abc
|
|
||||||
import asyncio
|
|
||||||
import contextlib
|
|
||||||
import sqlalchemy.orm
|
|
||||||
import sqlalchemy.engine
|
|
||||||
import royalnet.royaltyping as t
|
|
||||||
|
|
||||||
if t.TYPE_CHECKING:
|
|
||||||
from royalnet.engineer.dispenser import Dispenser
|
|
||||||
from royalnet.engineer.conversation import ConversationProtocol
|
|
||||||
from royalnet.engineer.bullet.projectiles import Projectile
|
|
||||||
from royalnet.engineer.command import PartialCommand, FullCommand
|
|
||||||
DispenserKey = t.TypeVar("DispenserKey")
|
|
||||||
|
|
||||||
|
|
||||||
class PDA(metaclass=abc.ABCMeta):
|
|
||||||
"""
|
|
||||||
A :class:`.PDA` is an implementation of the :mod:`royalnet.engineer` stack for a specific chat platform.
|
|
||||||
|
|
||||||
This is the :class:`abc.ABC` that new PDAs should use to implement their chat platform features.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.dispensers: dict["DispenserKey", "Dispenser"] = self._create_dispensers()
|
|
||||||
"""
|
|
||||||
A :class:`dict` where the :class:`~royalnet.engineer.dispenser.Dispenser`\\ s of the :class:`.PDA` are mapped
|
|
||||||
to a implementation-specific key.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
return f"<{self.__class__.__qualname__} ({len(self.dispensers)} dispensers)>"
|
|
||||||
|
|
||||||
def _create_dispensers(self) -> dict[t.Any, "Dispenser"]:
|
|
||||||
"""
|
|
||||||
Create the :attr:`.dispensers` dictionary of the PDA.
|
|
||||||
|
|
||||||
:return: The created dictionary (empty by default).
|
|
||||||
"""
|
|
||||||
|
|
||||||
return {}
|
|
||||||
|
|
||||||
def _get_dispenser(self, key: "DispenserKey") -> t.Optional["Dispenser"]:
|
|
||||||
"""
|
|
||||||
Get a :class:`~royalnet.engineer.dispenser.Dispenser` from :attr:`.dispenser` knowing its key.
|
|
||||||
|
|
||||||
:param key: The key to get the dispenser with.
|
|
||||||
:return: The retrieved dispenser.
|
|
||||||
|
|
||||||
.. seealso:: :meth:`dict.get`
|
|
||||||
"""
|
|
||||||
|
|
||||||
return self.dispensers.get(key)
|
|
||||||
|
|
||||||
def _make_dispenser(self) -> "Dispenser":
|
|
||||||
"""
|
|
||||||
Create a new dispenser.
|
|
||||||
|
|
||||||
:return: The created dispenser.
|
|
||||||
"""
|
|
||||||
|
|
||||||
return Dispenser()
|
|
||||||
|
|
||||||
async def _hook_pre_put(self, key: "DispenserKey", dispenser, projectile) -> bool:
|
|
||||||
"""
|
|
||||||
**Hook** called before putting a :class:`~royalnet.engineer.bullet.projectile.Projectile` in a
|
|
||||||
:class:`~royalnet.engineer.dispenser.Dispenser`.
|
|
||||||
|
|
||||||
:param key: The key identifying the :class:`~royalnet.engineer.dispenser.Dispenser` among the other
|
|
||||||
:attr:`.dispensers`.
|
|
||||||
:param dispenser: The :class:`~royalnet.engineer.dispenser.Dispenser` the projectile is being
|
|
||||||
:class:`~royalnet.engineer.dispenser.Dispenser.put` in.
|
|
||||||
:param projectile: The :class:`~royalnet.engineer.bullet.projectile.Projectile` which is being inserted.
|
|
||||||
:return: A :class:`bool`, which will cancel the :meth:`.put` if is :data:`False`.
|
|
||||||
"""
|
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def _hook_post_put(self, key, dispenser, projectile) -> None:
|
|
||||||
"""
|
|
||||||
**Hook** called after putting a :class:`~royalnet.engineer.bullet.projectile.Projectile` in a
|
|
||||||
:class:`~royalnet.engineer.dispenser.Dispenser`.
|
|
||||||
|
|
||||||
:param key: The key identifying the :class:`~royalnet.engineer.dispenser.Dispenser` among the other
|
|
||||||
:attr:`.dispensers`.
|
|
||||||
:param dispenser: The :class:`~royalnet.engineer.dispenser.Dispenser` the projectile was
|
|
||||||
:class:`~royalnet.engineer.dispenser.Dispenser.put` in.
|
|
||||||
:param projectile: The :class:`~royalnet.engineer.bullet.projectile.Projectile` which was inserted.
|
|
||||||
"""
|
|
||||||
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def _asyncio_step(self) -> None:
|
|
||||||
"""
|
|
||||||
Perform an iteration of the event loop.
|
|
||||||
|
|
||||||
Equivalent to ``asyncio.sleep(0)``.
|
|
||||||
"""
|
|
||||||
|
|
||||||
await asyncio.sleep(0)
|
|
||||||
|
|
||||||
async def put(self, key: "DispenserKey", projectile: "Projectile") -> None:
|
|
||||||
"""
|
|
||||||
Put a :class:`~royalnet.engineer.bullet.projectile.Projectile` in the
|
|
||||||
:class:`~royalnet.engineer.dispenser.Dispenser` with the specified key.
|
|
||||||
|
|
||||||
:param key: The key identifying the :class:`~royalnet.engineer.dispenser.Dispenser` among the other
|
|
||||||
:attr:`.dispensers`.
|
|
||||||
:param projectile: The :class:`~royalnet.engineer.bullet.projectile.Projectile` to insert.
|
|
||||||
|
|
||||||
.. seealso:: :meth:`._hook_pre_put`\\ , :meth:`.hook_post_put`
|
|
||||||
"""
|
|
||||||
|
|
||||||
dispenser = self._get_dispenser(key)
|
|
||||||
|
|
||||||
if dispenser is None:
|
|
||||||
dispenser = self._make_dispenser()
|
|
||||||
self.dispensers[key] = dispenser
|
|
||||||
|
|
||||||
go_on = await self._hook_pre_put(key=key, dispenser=dispenser, projectile=projectile)
|
|
||||||
await self._asyncio_step()
|
|
||||||
|
|
||||||
if go_on:
|
|
||||||
return
|
|
||||||
|
|
||||||
await dispenser.put(projectile)
|
|
||||||
await self._asyncio_step()
|
|
||||||
|
|
||||||
await self._hook_post_put(key=key, dispenser=dispenser, projectile=projectile)
|
|
||||||
await self._asyncio_step()
|
|
||||||
|
|
||||||
|
|
||||||
class ConversationListPDA(PDA, metaclass=abc.ABCMeta):
|
|
||||||
"""
|
|
||||||
A :class:`.PDA` which instantiates multiple :class:`~royalnet.engineer.conversation.Conversation`\\ s before putting
|
|
||||||
a :class:`~royalnet.engineer.bullet.projectile.Projectile` in a :class:`~royalnet.engineer.dispenser.Dispenser` .
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, conversation_kwargs: dict[str, t.Any]):
|
|
||||||
super().__init__()
|
|
||||||
|
|
||||||
self.conversations: list["ConversationProtocol"] = self._create_conversations()
|
|
||||||
self.conversation_kwargs: dict[str, t.Any] = conversation_kwargs
|
|
||||||
self._conversation_coro: list[t.Awaitable[t.Any]] = []
|
|
||||||
|
|
||||||
def _create_conversations(self) -> list["ConversationProtocol"]:
|
|
||||||
"""
|
|
||||||
Create the :attr:`.conversations` :class:`list` of the :class:`.ConversationListPDA`\\ .
|
|
||||||
|
|
||||||
:return: The created :class:`list`\\ .
|
|
||||||
"""
|
|
||||||
|
|
||||||
return []
|
|
||||||
|
|
||||||
@contextlib.asynccontextmanager
|
|
||||||
async def _conversation_kwargs(self, conv: "ConversationProtocol") -> dict[str, t.Any]:
|
|
||||||
"""
|
|
||||||
:func:`contextlib.asynccontextmanager` factory which yields the arguments to pass to newly created
|
|
||||||
:class:`~royalnet.engineer.conversation.Conversation`\\ s .
|
|
||||||
|
|
||||||
By default, the following arguments are passed:
|
|
||||||
- ``_pda``: contains this :class:`.PDA`
|
|
||||||
- ``_conv``: contains the :class:`~royalnet.engineer.conversation.Conversation` which was just created.
|
|
||||||
|
|
||||||
:param conv: The :class:`~royalnet.engineer.conversation.Conversation` to create the args for.
|
|
||||||
:return: The corresponding :func:`contextlib.asynccontextmanager`\\ .
|
|
||||||
"""
|
|
||||||
|
|
||||||
yield {
|
|
||||||
"_pda": self,
|
|
||||||
"_conv": conv,
|
|
||||||
**self.conversation_kwargs,
|
|
||||||
}
|
|
||||||
|
|
||||||
async def _run_conversation(self, dispenser: "Dispenser", conv: "ConversationProtocol", **override_kwargs) -> None:
|
|
||||||
"""
|
|
||||||
Run a :class:`~royalnet.engineer.conversation.Conversation` with its proper kwargs.
|
|
||||||
|
|
||||||
:param dispenser: The :class:`~royalnet.engineer.dispenser.Dispenser` to run the
|
|
||||||
:class:`~royalnet.engineer.conversation.Conversation` in.
|
|
||||||
:param conv: The :class:`~royalnet.engineer.conversation.Conversation` to run.
|
|
||||||
:param override_kwargs: Kwargs to be passed to the conversation in addition to the ones generated by
|
|
||||||
:meth:`_conversation_kwargs`\\ .
|
|
||||||
"""
|
|
||||||
async with self._conversation_kwargs(conv=conv) as default_kwargs:
|
|
||||||
coro = dispenser.run(conv, **default_kwargs, **override_kwargs)
|
|
||||||
|
|
||||||
self._conversation_coro.append(coro)
|
|
||||||
try:
|
|
||||||
await coro
|
|
||||||
finally:
|
|
||||||
self._conversation_coro.remove(coro)
|
|
||||||
|
|
||||||
async def _hook_pre_put(self, key, dispenser, projectile):
|
|
||||||
await super()._hook_pre_put(key=key, dispenser=dispenser, projectile=projectile)
|
|
||||||
for conv in self.conversations:
|
|
||||||
asyncio.create_task(self._run_conversation(dispenser=dispenser, conv=conv))
|
|
||||||
|
|
||||||
async def _hook_post_put(self, key, dispenser, projectile):
|
|
||||||
await super()._hook_post_put(key=key, dispenser=dispenser, projectile=projectile)
|
|
||||||
|
|
||||||
def register_conversation(self, conversation: "ConversationProtocol") -> None:
|
|
||||||
"""
|
|
||||||
Register a new :class:`~royalnet.engineer.conversation.Conversation` to be run when a new
|
|
||||||
:class:`~royalnet.engineer.bullet.projectile.Projectile` is :meth:`.put`\\ .
|
|
||||||
|
|
||||||
:param conversation: The :class:`~royalnet.engineer.conversation.Conversation` to register.
|
|
||||||
"""
|
|
||||||
|
|
||||||
self.conversations.append(conversation)
|
|
||||||
|
|
||||||
def unregister_conversation(self, conversation: "ConversationProtocol") -> None:
|
|
||||||
"""
|
|
||||||
Unregister a :class:`~royalnet.engineer.conversation.Conversation`, stopping it from being run when a new
|
|
||||||
:class:`~royalnet.engineer.bullet.projectile.Projectile` is :meth:`.put`\\ .
|
|
||||||
|
|
||||||
:param conversation: The :class:`~royalnet.engineer.conversation.Conversation` to unregister.
|
|
||||||
"""
|
|
||||||
|
|
||||||
self.conversations.remove(conversation)
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
|
||||||
def _make_partialcommand_pattern(self, partial: "PartialCommand") -> str:
|
|
||||||
"""
|
|
||||||
The pattern to use when :meth:`.complete_partialcommand` is called.
|
|
||||||
|
|
||||||
:param partial: The :class:`~royalnet.engineer.command.PartialCommand` to complete.
|
|
||||||
:return: A :class:`str` to use as pattern.
|
|
||||||
"""
|
|
||||||
|
|
||||||
raise NotImplementedError()
|
|
||||||
|
|
||||||
def complete_partialcommand(self, partial: "PartialCommand", names: list[str]) -> "FullCommand":
|
|
||||||
"""
|
|
||||||
Complete a :class:`~royalnet.engineer.command.PartialCommand` with its missing fields.
|
|
||||||
|
|
||||||
:param partial: The :class:`~royalnet.engineer.command.PartialCommand` to complete.
|
|
||||||
:param names: The :attr:`~royalnet.engineer.command.FullCommand.names` of that the command should have.
|
|
||||||
:return: The completed :class:`~royalnet.engineer.command.FulLCommand` .
|
|
||||||
"""
|
|
||||||
|
|
||||||
return partial.complete(names=names, pattern=self._make_partialcommand_pattern(partial))
|
|
||||||
|
|
||||||
def register_partialcommand(self, partial: "PartialCommand", names: list[str]) -> "FullCommand":
|
|
||||||
"""
|
|
||||||
A combination of :meth:`.register_conversation` and :meth:`.complete_partialcommand` .
|
|
||||||
|
|
||||||
:param partial: The :class:`~royalnet.engineer.command.PartialCommand` to complete.
|
|
||||||
:param names: The :attr:`~royalnet.engineer.command.FullCommand.names` of that the command should have.
|
|
||||||
:return: The completed :class:`~royalnet.engineer.command.FulLCommand` .
|
|
||||||
"""
|
|
||||||
|
|
||||||
full = self.complete_partialcommand(partial=partial, names=names)
|
|
||||||
self.register_conversation(full)
|
|
||||||
return full
|
|
||||||
|
|
||||||
|
|
||||||
class SQLAlchemyPDA(ConversationListPDA, metaclass=abc.ABCMeta):
|
|
||||||
"""
|
|
||||||
Extends :class:`.ConversationListPDA` with database support provided by :mod:`sqlalchemy`\\ by adding the
|
|
||||||
``_session`` kwarg to the :meth:`._conversation_kwargs`.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, engine: sqlalchemy.engine.Engine, conversation_kwargs: dict[str, t.Any]):
|
|
||||||
super().__init__(conversation_kwargs)
|
|
||||||
self.engine: sqlalchemy.engine.Engine = engine
|
|
||||||
self.Session: sqlalchemy.orm.sessionmaker = sqlalchemy.orm.sessionmaker(bind=self.engine)
|
|
||||||
|
|
||||||
@contextlib.asynccontextmanager
|
|
||||||
async def _conversation_kwargs(self, conv: "ConversationProtocol") -> dict[str, t.Any]:
|
|
||||||
|
|
||||||
with self.Session(future=True) as session:
|
|
||||||
|
|
||||||
yield {
|
|
||||||
"_session": session,
|
|
||||||
**super()._conversation_kwargs(conv=conv),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
__all__ = (
|
|
||||||
"PDA",
|
|
||||||
"ConversationListPDA",
|
|
||||||
"SQLAlchemyPDA",
|
|
||||||
|
|
||||||
)
|
|
3
royalnet/engineer/pda/__init__.py
Normal file
3
royalnet/engineer/pda/__init__.py
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
from .base import *
|
||||||
|
from .extensions import *
|
||||||
|
from .implementations import *
|
33
royalnet/engineer/pda/base.py
Normal file
33
royalnet/engineer/pda/base.py
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
"""
|
||||||
|
This module contains the base :class:`.PDA` class.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import abc
|
||||||
|
import royalnet.royaltyping as t
|
||||||
|
|
||||||
|
if t.TYPE_CHECKING:
|
||||||
|
from royalnet.engineer.pda.implementations.base import PDAImplementation
|
||||||
|
DispenserKey = t.TypeVar("DispenserKey")
|
||||||
|
|
||||||
|
|
||||||
|
class PDA(metaclass=abc.ABCMeta):
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, implementations: list["PDAImplementation"]):
|
||||||
|
self.implementations: dict[str, "PDAImplementation"] = {}
|
||||||
|
for implementation in implementations:
|
||||||
|
implementation.bind(pda=self)
|
||||||
|
self.implementations[implementation.name] = implementation
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"<{self.__class__.__qualname__} implementing {', '.join(self.implementations.keys())}>"
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return len(self.implementations)
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = (
|
||||||
|
"PDA",
|
||||||
|
)
|
6
royalnet/engineer/pda/extensions/__init__.py
Normal file
6
royalnet/engineer/pda/extensions/__init__.py
Normal file
|
@ -0,0 +1,6 @@
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from .base import *
|
||||||
|
from .database import *
|
30
royalnet/engineer/pda/extensions/base.py
Normal file
30
royalnet/engineer/pda/extensions/base.py
Normal file
|
@ -0,0 +1,30 @@
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import royalnet.royaltyping as t
|
||||||
|
import abc
|
||||||
|
import contextlib
|
||||||
|
|
||||||
|
|
||||||
|
class PDAExtension(metaclass=abc.ABCMeta):
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
@contextlib.asynccontextmanager
|
||||||
|
async def kwargs(self, kwargs: t.Kwargs) -> t.Kwargs:
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
yield NotImplemented
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = (
|
||||||
|
"PDAExtension",
|
||||||
|
)
|
35
royalnet/engineer/pda/extensions/database.py
Normal file
35
royalnet/engineer/pda/extensions/database.py
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import royalnet.royaltyping as t
|
||||||
|
import sqlalchemy
|
||||||
|
import sqlalchemy.orm
|
||||||
|
import contextlib
|
||||||
|
|
||||||
|
from . import base
|
||||||
|
|
||||||
|
|
||||||
|
class SQLAlchemyExtension(base.PDAExtension):
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, engine: sqlalchemy.engine.Engine, session_kwargs: t.Kwargs = None):
|
||||||
|
super().__init__()
|
||||||
|
self.engine: sqlalchemy.engine.Engine = engine
|
||||||
|
self.Session: sqlalchemy.orm.sessionmaker = sqlalchemy.orm.sessionmaker(bind=self.engine)
|
||||||
|
self.session_kwargs: t.Kwargs = {"future": True, **(session_kwargs or {})}
|
||||||
|
|
||||||
|
@contextlib.asynccontextmanager
|
||||||
|
async def kwargs(self, kwargs: dict[str, t.Any]) -> dict[str, t.Any]:
|
||||||
|
with self.Session(**self.session_kwargs) as session:
|
||||||
|
yield {
|
||||||
|
**kwargs,
|
||||||
|
"_session": session,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = (
|
||||||
|
"SQLAlchemyExtension",
|
||||||
|
)
|
5
royalnet/engineer/pda/implementations/__init__.py
Normal file
5
royalnet/engineer/pda/implementations/__init__.py
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from .base import *
|
282
royalnet/engineer/pda/implementations/base.py
Normal file
282
royalnet/engineer/pda/implementations/base.py
Normal file
|
@ -0,0 +1,282 @@
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import royalnet.royaltyping as t
|
||||||
|
import abc
|
||||||
|
import contextlib
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
if t.TYPE_CHECKING:
|
||||||
|
from royalnet.engineer.conversation import ConversationProtocol
|
||||||
|
from royalnet.engineer.dispenser import Dispenser
|
||||||
|
from royalnet.engineer.pda.extensions.base import PDAExtension
|
||||||
|
from royalnet.engineer.pda.base import PDA
|
||||||
|
from royalnet.engineer.command import PartialCommand, FullCommand
|
||||||
|
from royalnet.engineer.bullet.projectiles import Projectile
|
||||||
|
|
||||||
|
DispenserKey = t.TypeVar("DispenserKey")
|
||||||
|
|
||||||
|
|
||||||
|
class PDAImplementation(metaclass=abc.ABCMeta):
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, name: str, extensions: list["PDAExtension"] = None):
|
||||||
|
self.name: str = f"{self.namespace}.{name}"
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.extensions: list["PDAExtension"] = extensions or []
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.bound_to: t.Optional["PDA"] = None
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return f"<PDAImplementation {self.name}>"
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return self.name
|
||||||
|
|
||||||
|
def bind(self, pda: "PDA") -> None:
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if self.bound_to is not None:
|
||||||
|
raise ImplementationAlreadyBound()
|
||||||
|
self.bound_to = pda
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
@property
|
||||||
|
def namespace(self):
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
||||||
|
class ImplementationException(Exception):
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class ImplementationAlreadyBound(ImplementationException):
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class ConversationListImplementation(PDAImplementation, metaclass=abc.ABCMeta):
|
||||||
|
"""
|
||||||
|
A :class:`.PDAImplementation` which instantiates multiple :class:`~royalnet.engineer.conversation.Conversation`\\ s
|
||||||
|
before putting a :class:`~royalnet.engineer.bullet.projectile.Projectile` in a
|
||||||
|
:class:`~royalnet.engineer.dispenser.Dispenser` .
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, name: str):
|
||||||
|
super().__init__(name)
|
||||||
|
|
||||||
|
self.conversations: list["ConversationProtocol"] = self._create_conversations()
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.dispensers: dict[DispenserKey, "Dispenser"] = self._create_dispensers()
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _create_conversations(self) -> list["ConversationProtocol"]:
|
||||||
|
"""
|
||||||
|
Create the :attr:`.conversations` :class:`list` of the :class:`.ConversationListPDA`\\ .
|
||||||
|
|
||||||
|
:return: The created :class:`list`\\ .
|
||||||
|
"""
|
||||||
|
|
||||||
|
return []
|
||||||
|
|
||||||
|
def _create_dispensers(self) -> dict[t.Any, "Dispenser"]:
|
||||||
|
"""
|
||||||
|
Create the :attr:`.dispensers` dictionary of the PDA.
|
||||||
|
|
||||||
|
:return: The created dictionary (empty by default).
|
||||||
|
"""
|
||||||
|
|
||||||
|
return {}
|
||||||
|
|
||||||
|
def get_dispenser(self, key: "DispenserKey") -> t.Optional["Dispenser"]:
|
||||||
|
"""
|
||||||
|
Get a :class:`~royalnet.engineer.dispenser.Dispenser` from :attr:`.dispenser` knowing its key.
|
||||||
|
|
||||||
|
:param key: The key to get the dispenser with.
|
||||||
|
:return: The retrieved dispenser.
|
||||||
|
|
||||||
|
.. seealso:: :meth:`dict.get`
|
||||||
|
"""
|
||||||
|
|
||||||
|
return self.dispensers.get(key)
|
||||||
|
|
||||||
|
def _create_dispenser(self) -> "Dispenser":
|
||||||
|
"""
|
||||||
|
Create a new dispenser.
|
||||||
|
|
||||||
|
:return: The created dispenser.
|
||||||
|
"""
|
||||||
|
|
||||||
|
return Dispenser()
|
||||||
|
|
||||||
|
def get_or_create_dispenser(self, key: "DispenserKey") -> "Dispenser":
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if key not in self.dispensers:
|
||||||
|
self.dispensers[key] = self._create_dispenser()
|
||||||
|
return self.get_dispenser(key=key)
|
||||||
|
|
||||||
|
@contextlib.asynccontextmanager
|
||||||
|
async def kwargs(self, conv: "ConversationProtocol") -> t.Kwargs:
|
||||||
|
"""
|
||||||
|
:func:`contextlib.asynccontextmanager` factory which yields the arguments to pass to newly created
|
||||||
|
:class:`~royalnet.engineer.conversation.Conversation`\\ s .
|
||||||
|
|
||||||
|
By default, the following arguments are passed:
|
||||||
|
- ``_pda``: contains the :class:`.PDA` this implementation is bound to.
|
||||||
|
- ``_imp``: contains this :class:`.PDAImplementation` .
|
||||||
|
- ``_conv``: contains the :class:`~royalnet.engineer.conversation.Conversation` which was just created.
|
||||||
|
|
||||||
|
:param conv: The :class:`~royalnet.engineer.conversation.Conversation` to create the args for.
|
||||||
|
:return: The corresponding :func:`contextlib.asynccontextmanager`\\ .
|
||||||
|
"""
|
||||||
|
|
||||||
|
default_kwargs = {
|
||||||
|
"_pda": self.bound_to,
|
||||||
|
"_imp": self,
|
||||||
|
"_conv": conv,
|
||||||
|
}
|
||||||
|
|
||||||
|
async with self._kwargs(default_kwargs, self.extensions) as kwargs:
|
||||||
|
yield kwargs
|
||||||
|
|
||||||
|
@contextlib.asynccontextmanager
|
||||||
|
async def _kwargs(self, kwargs: t.Kwargs, remaining: list["PDAExtension"]) -> t.Kwargs:
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
extension: "PDAExtension" = remaining.pop(0)
|
||||||
|
|
||||||
|
async with extension.kwargs(kwargs) as kwargs:
|
||||||
|
if not remaining:
|
||||||
|
yield kwargs
|
||||||
|
else:
|
||||||
|
async with self._kwargs(kwargs=kwargs, remaining=remaining) as kwargs:
|
||||||
|
yield kwargs
|
||||||
|
|
||||||
|
def register_conversation(self, conversation: "ConversationProtocol") -> None:
|
||||||
|
"""
|
||||||
|
Register a new :class:`~royalnet.engineer.conversation.Conversation` to be run when a new
|
||||||
|
:class:`~royalnet.engineer.bullet.projectile.Projectile` is :meth:`.put`\\ .
|
||||||
|
|
||||||
|
:param conversation: The :class:`~royalnet.engineer.conversation.Conversation` to register.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.conversations.append(conversation)
|
||||||
|
|
||||||
|
def unregister_conversation(self, conversation: "ConversationProtocol") -> None:
|
||||||
|
"""
|
||||||
|
Unregister a :class:`~royalnet.engineer.conversation.Conversation`, stopping it from being run when a new
|
||||||
|
:class:`~royalnet.engineer.bullet.projectile.Projectile` is :meth:`.put`\\ .
|
||||||
|
|
||||||
|
:param conversation: The :class:`~royalnet.engineer.conversation.Conversation` to unregister.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.conversations.remove(conversation)
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def _partialcommand_pattern(self, partial: "PartialCommand") -> str:
|
||||||
|
"""
|
||||||
|
The pattern to use when :meth:`.complete_partialcommand` is called.
|
||||||
|
|
||||||
|
:param partial: The :class:`~royalnet.engineer.command.PartialCommand` to complete.
|
||||||
|
:return: A :class:`str` to use as pattern.
|
||||||
|
"""
|
||||||
|
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def complete_partialcommand(self, partial: "PartialCommand", names: list[str]) -> "FullCommand":
|
||||||
|
"""
|
||||||
|
Complete a :class:`~royalnet.engineer.command.PartialCommand` with its missing fields.
|
||||||
|
|
||||||
|
:param partial: The :class:`~royalnet.engineer.command.PartialCommand` to complete.
|
||||||
|
:param names: The :attr:`~royalnet.engineer.command.FullCommand.names` of that the command should have.
|
||||||
|
:return: The completed :class:`~royalnet.engineer.command.FulLCommand` .
|
||||||
|
"""
|
||||||
|
|
||||||
|
return partial.complete(names=names, pattern=self._partialcommand_pattern(partial))
|
||||||
|
|
||||||
|
def register_partialcommand(self, partial: "PartialCommand", names: list[str]) -> "FullCommand":
|
||||||
|
"""
|
||||||
|
A combination of :meth:`.register_conversation` and :meth:`.complete_partialcommand` .
|
||||||
|
|
||||||
|
:param partial: The :class:`~royalnet.engineer.command.PartialCommand` to complete.
|
||||||
|
:param names: The :attr:`~royalnet.engineer.command.FullCommand.names` of that the command should have.
|
||||||
|
:return: The completed :class:`~royalnet.engineer.command.FulLCommand` .
|
||||||
|
"""
|
||||||
|
|
||||||
|
full = self.complete_partialcommand(partial=partial, names=names)
|
||||||
|
self.register_conversation(full)
|
||||||
|
return full
|
||||||
|
|
||||||
|
async def _run_conversation(self, dispenser: "Dispenser", conv: "ConversationProtocol") -> None:
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
async with self.kwargs(conv=conv) as kwargs:
|
||||||
|
await dispenser.run(conv=conv, **kwargs)
|
||||||
|
|
||||||
|
def _run_all_conversations(self, dispenser: "Dispenser") -> list[asyncio.Task]:
|
||||||
|
"""
|
||||||
|
.. todo:: Document this.
|
||||||
|
"""
|
||||||
|
|
||||||
|
tasks: list[asyncio.Task] = []
|
||||||
|
for conv in self.conversations:
|
||||||
|
task = asyncio.create_task(self._run_conversation(dispenser=dispenser, conv=conv))
|
||||||
|
tasks.append(task)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
return tasks
|
||||||
|
|
||||||
|
async def put_projectile(self, key: DispenserKey, projectile: "Projectile") -> None:
|
||||||
|
"""
|
||||||
|
Put a :class:`~royalnet.engineer.bullet.projectile.Projectile` in the
|
||||||
|
:class:`~royalnet.engineer.dispenser.Dispenser` with the specified key.
|
||||||
|
|
||||||
|
:param key: The key identifying the :class:`~royalnet.engineer.dispenser.Dispenser` among the other
|
||||||
|
:attr:`.dispensers`.
|
||||||
|
:param projectile: The :class:`~royalnet.engineer.bullet.projectile.Projectile` to insert.
|
||||||
|
"""
|
||||||
|
|
||||||
|
dispenser = self.get_or_create_dispenser(key=key)
|
||||||
|
self._run_all_conversations(dispenser=dispenser)
|
||||||
|
await dispenser.put(projectile)
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = (
|
||||||
|
"PDAImplementation",
|
||||||
|
"ImplementationException",
|
||||||
|
"ImplementationAlreadyBound",
|
||||||
|
"ConversationListImplementation",
|
||||||
|
)
|
|
@ -34,3 +34,6 @@ of :class:`str` to :data:`.JSON` mappings.
|
||||||
|
|
||||||
|
|
||||||
WrenchLike = Callable[[Any], Awaitable[Any]]
|
WrenchLike = Callable[[Any], Awaitable[Any]]
|
||||||
|
|
||||||
|
Args = Collection[Any]
|
||||||
|
Kwargs = Mapping[str, Any]
|
||||||
|
|
Loading…
Reference in a new issue