1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-27 05:24:20 +00:00

🗑 Remove extensions, introduce bolts

This commit is contained in:
Steffo 2021-04-19 03:18:58 +02:00
parent 0c0a62cfe8
commit becd9ce3f5
Signed by: steffo
GPG key ID: 6965406171929D01
7 changed files with 80 additions and 228 deletions

View file

@ -16,3 +16,4 @@ from .wrench import *
from .bullet import * from .bullet import *
from .pda import * from .pda import *
from .router import * from .router import *
from .bolts import *

View file

@ -0,0 +1,50 @@
"""
This module contains **bolts**, utility decorators which can be used to enhance
:class:`~royalnet.engineer.conversation.Conversation`\\ s.
"""
from __future__ import annotations
import typing as t
import logging
import sqlalchemy.orm
import functools
import royalnet.lazy
log = logging.getLogger(__name__)
async def use_database(session_class: t.Union[t.Type[sqlalchemy.orm.Session], royalnet.lazy.Lazy], *args, **kwargs):
"""
Decorator factory which allows a :class:`~royalnet.engineer.conversation.Conversation` to use a
:class:`sqlalchemy.orm.Session` created from the passed :class:`sqlalchemy.orm.sessionmaker` .
The session is automatically opened and closed, and will be available in the `_session` kwarg.
:param session_class: The :class:`sqlalchemy.orm.Session` class to use when creating the session.
It can also be provided wrapped in a :class:`royalnet.lazy.Lazy` object, from which it will
be evaluated.
:return: The decorator to use to decorate the function.
"""
if isinstance(session_class, royalnet.lazy.Lazy):
session_class = session_class.evaluate()
def decorator(f):
@functools.wraps(f)
async def decorated(**f_kwargs):
log.debug(f"Opening database session from {session_class!r}...")
with session_class(*args, **kwargs) as session:
log.debug(f"Opened database session {session!r} successfully!")
result = await f(**f_kwargs, _session=session)
log.debug(f"Closing database session {session!r}...")
log.debug(f"Closed database session from {session_class!r} successfully!")
# Shouldn't be necessary, conversations return None anyways
return result
return decorated
return decorator
__all__ = (
"use_database",
)

View file

@ -1,3 +1,2 @@
from .base import * from .base import *
from .extensions import *
from .implementations import * from .implementations import *

View file

@ -1,6 +0,0 @@
"""
.. todo:: Document this.
"""
from .base import *
from .database import *

View file

@ -1,31 +0,0 @@
"""
This module contains the base :class:`~royalnet.engineer.pda.extensions.base.PDAExtension`\\ .
"""
import royalnet.royaltyping as t
import abc
import contextlib
class PDAExtension(metaclass=abc.ABCMeta):
"""
A :class:`.PDAExtension` is an object which extends a
:class:`~royalnet.engineer.pda.implementations.base.PDAImplementation` by providing additional kwargs to
:class:`~royalnet.engineer.conversation.Conversation`\\ s.
"""
@abc.abstractmethod
@contextlib.asynccontextmanager
async def kwargs(self, kwargs: t.Kwargs) -> t.Kwargs:
"""
An :func:`~contextlib.asynccontextmanager` which takes the kwargs that would be passed to the
:class:`~royalnet.engineer.conversation.Conversation`\\ , modifies them (for example by adding new items) and
yields them, then performs cleanup operations.
"""
yield NotImplemented
__all__ = (
"PDAExtension",
)

View file

@ -1,121 +0,0 @@
"""
This module contains the :class:`~royalnet.engineer.pda.extensions.base.PDAExtension`\\ s that allow
:class:`~royalnet.engineer.conversation.Conversation`\\ s to access a database.
"""
import royalnet.royaltyping as t
import sqlalchemy
import sqlalchemy.orm
import sqlalchemy.ext.asyncio as sea
import contextlib
import logging
from . import base
log = logging.getLogger(__name__)
class SQLAlchemyExtension(base.PDAExtension):
"""
Extends a :class:`~royalnet.engineer.pda.implementations.base.PDAImplementation` by adding a :mod:`sqlalchemy`
session to conversations through the ``_session`` kwarg.
"""
def __init__(self, engine: sqlalchemy.engine.Engine, session_kwargs: t.Kwargs = None, kwarg_name: str = "_session"):
super().__init__()
self.engine: sqlalchemy.engine.Engine = engine
"""
The :class:`sqlalchemy.engine.Engine` to use.
"""
self.Session: sqlalchemy.orm.sessionmaker = sqlalchemy.orm.sessionmaker(bind=self.engine)
"""
The :class:`sqlalchemy.orm.sessionmaker` to use when creating new sessions.
"""
self.session_kwargs: t.Kwargs = {"future": True, **(session_kwargs or {})}
"""
Additional kwargs to be passed to the :class:`sqlalchemy.orm.sessionmaker` when instantiating a new Session.
Defaults to ``{"future": True}`` .
"""
self.kwarg_name: str = kwarg_name
"""
The name of the kwarg to add.
Defaults to ``"_session"``.
"""
def __repr__(self):
return f"<{self.__class__.__qualname__} with engine {self.engine}>"
@contextlib.asynccontextmanager
async def kwargs(self, kwargs: t.Kwargs) -> t.Kwargs:
log.debug(f"{self!r}: Creating session...")
with self.Session(**self.session_kwargs) as session:
log.debug(f"{self!r}: Yielding kwargs...")
yield {
**kwargs,
self.kwarg_name: session,
}
log.debug(f"{self!r}: Closing session...")
log.debug(f"{self!r}: Session closed!")
class AsyncSQLAlchemyExtension(base.PDAExtension):
"""
Extends a :class:`~royalnet.engineer.pda.implementations.base.PDAImplementation` by adding an asyncronous
:mod:`sqlalchemy` session to conversations through the ``_asession`` kwarg.
"""
def __init__(self, engine: sea.AsyncEngine, session_kwargs: t.Kwargs = None, kwarg_name: str = "_asession"):
super().__init__()
self.engine: sea.AsyncEngine = engine
"""
The :class:`sqlalchemy.engine.Engine` to use.
"""
self.AsyncSession: sqlalchemy.orm.sessionmaker = sqlalchemy.orm.sessionmaker(
bind=self.engine,
expire_on_commit=False,
class_=sea.AsyncSession,
)
"""
The :class:`sqlalchemy.orm.sessionmaker` to use when creating new sessions.
"""
self.session_kwargs: t.Kwargs = {"future": True, **(session_kwargs or {})}
"""
Additional kwargs to be passed to the :class:`sqlalchemy.orm.sessionmaker` when instantiating a new Session.
Defaults to ``{"future": True}`` .
"""
self.kwarg_name: str = kwarg_name
"""
The name of the kwarg to add.
Defaults to ``"_asession"``.
"""
def __repr__(self):
return f"<{self.__class__.__qualname__} with engine {self.engine}>"
@contextlib.asynccontextmanager
async def kwargs(self, kwargs: t.Kwargs) -> t.Kwargs:
log.debug(f"{self!r}: Creating session...")
async with self.AsyncSession(**self.session_kwargs) as session:
log.debug(f"{self!r}: Yielding kwargs...")
yield {
**kwargs,
self.kwarg_name: session,
}
log.debug(f"{self!r}: Closing session...")
log.debug(f"{self!r}: Session closed!")
__all__ = (
"SQLAlchemyExtension",
"AsyncSQLAlchemyExtension",
)

View file

@ -5,13 +5,14 @@ This module contains the base :class:`.PDAImplementation` and its basic implemen
import royalnet.royaltyping as t import royalnet.royaltyping as t
import abc import abc
import contextlib import sys
import asyncio import asyncio
import logging import logging
import types
import traceback
from royalnet.engineer.dispenser import Dispenser from royalnet.engineer.dispenser import Dispenser
if t.TYPE_CHECKING: if t.TYPE_CHECKING:
from royalnet.engineer.pda.extensions.base import PDAExtension
from royalnet.engineer.pda.base import PDA from royalnet.engineer.pda.base import PDA
from royalnet.engineer.bullet.projectiles import Projectile from royalnet.engineer.bullet.projectiles import Projectile
@ -23,17 +24,12 @@ class PDAImplementation(metaclass=abc.ABCMeta):
.. todo:: Document this. .. todo:: Document this.
""" """
def __init__(self, name: str, extensions: list["PDAExtension"] = None): def __init__(self, name: str):
self.name: str = f"{self.namespace}.{name}" self.name: str = f"{self.namespace}.{name}"
""" """
.. todo:: Document this. .. todo:: Document this.
""" """
self.extensions: list["PDAExtension"] = extensions or []
"""
.. todo:: Document this.
"""
self.bound_to: t.Optional["PDA"] = None self.bound_to: t.Optional["PDA"] = None
""" """
.. todo:: Document this. .. todo:: Document this.
@ -106,8 +102,8 @@ class ConversationListImplementation(PDAImplementation, metaclass=abc.ABCMeta):
:class:`~royalnet.engineer.dispenser.Dispenser` . :class:`~royalnet.engineer.dispenser.Dispenser` .
""" """
def __init__(self, name: str, extensions: list["PDAExtension"] = None): def __init__(self, name: str):
super().__init__(name=name, extensions=extensions) super().__init__(name=name)
self.conversations: list[t.ConversationProtocol] = self._create_conversations() self.conversations: list[t.ConversationProtocol] = self._create_conversations()
""" """
@ -179,56 +175,6 @@ class ConversationListImplementation(PDAImplementation, metaclass=abc.ABCMeta):
self.dispensers[key] = self._create_dispenser() self.dispensers[key] = self._create_dispenser()
return self.get(key=key) return self.get(key=key)
@contextlib.asynccontextmanager
async def kwargs(self, conv: t.ConversationProtocol) -> t.Kwargs:
"""
:func:`contextlib.asynccontextmanager` factory which yields the arguments to pass to newly created
:class:`~royalnet.engineer.conversation.Conversation`\\ s .
By default, the following arguments are passed:
- ``_pda``: contains the :class:`.PDA` this implementation is bound to.
- ``_imp``: contains this :class:`.PDAImplementation` .
- ``_conv``: contains the :class:`~royalnet.engineer.conversation.Conversation` which was just created.
:param conv: The :class:`~royalnet.engineer.conversation.Conversation` to create the args for.
:return: The corresponding :func:`contextlib.asynccontextmanager`\\ .
"""
self.log.debug(f"Creating kwargs for: {conv!r}")
default_kwargs = {
"_pda": self.bound_to,
"_imp": self,
"_conv": conv,
}
async with self._kwargs(default_kwargs, self.extensions) as kwargs:
self.log.info(f"Yielding kwargs for {conv!r}: {kwargs!r}")
yield kwargs
@contextlib.asynccontextmanager
async def _kwargs(self, kwargs: t.Kwargs, remaining: list["PDAExtension"]) -> t.Kwargs:
"""
:func:`contextlib.asynccontextmanager` factory used internally to recurse the generation and cleanup of
:meth:`kwargs` .
:param kwargs: The current ``kwargs`` .
:param remaining: The extensions that haven't been processed yet.
:return: The corresponding :func:`contextlib.asynccontextmanager`\\ .
"""
if len(remaining) == 0:
self.log.debug(f"Kwargs recursion ended!")
yield kwargs
else:
extension = remaining[0]
self.log.debug(f"Getting kwargs from {extension}, {len(remaining)} left...")
async with extension.kwargs(kwargs) as kwargs:
self.log.debug(f"Recursing...")
async with self._kwargs(kwargs=kwargs, remaining=remaining[1:]) as kwargs:
self.log.debug(f"Bubbling up yields...")
yield kwargs
def register_conversation(self, conversation: t.ConversationProtocol) -> None: def register_conversation(self, conversation: t.ConversationProtocol) -> None:
""" """
Register a new :class:`~royalnet.engineer.conversation.Conversation` to be run when a new Register a new :class:`~royalnet.engineer.conversation.Conversation` to be run when a new
@ -254,8 +200,11 @@ class ConversationListImplementation(PDAImplementation, metaclass=abc.ABCMeta):
async def _run_conversation(self, dispenser: "Dispenser", conv: t.ConversationProtocol) -> None: async def _run_conversation(self, dispenser: "Dispenser", conv: t.ConversationProtocol) -> None:
""" """
Run the passed :class:`~royalnet.engineer.conversation.Conversation` in the passed Run the passed :class:`~royalnet.engineer.conversation.Conversation` in the passed
:class:`~royalnet.engineer.dispenser.Dispenser`\\ , while passing the :meth:`.kwargs` provided by the :class:`~royalnet.engineer.dispenser.Dispenser` with the following kwargs:
:class:`.PDA` .
- ``_pda``: contains the :class:`.PDA` this implementation is bound to.
- ``_imp``: contains this :class:`.PDAImplementation` .
- ``_conv``: contains the :class:`~royalnet.engineer.conversation.Conversation` which was just created.
:param dispenser: The :class:`~royalnet.engineer.dispenser.Dispenser` to run the :param dispenser: The :class:`~royalnet.engineer.dispenser.Dispenser` to run the
:class:`~royalnet.engineer.conversation.Conversation` in. :class:`~royalnet.engineer.conversation.Conversation` in.
@ -263,12 +212,15 @@ class ConversationListImplementation(PDAImplementation, metaclass=abc.ABCMeta):
""" """
try: try:
async with self.kwargs(conv=conv) as kwargs: self.log.debug(f"Running {conv!r} in {dispenser!r}...")
self.log.debug(f"Running {conv!r} in {dispenser!r}...") await dispenser.run(conv=conv, _conv=conv, _pda=self.bound_to, _imp=self)
await dispenser.run(conv=conv, **kwargs) except Exception:
except Exception as exception:
try: try:
await self._handle_conversation_exc(dispenser=dispenser, conv=conv, exception=exception) await self._handle_conversation_exc(
dispenser=dispenser,
conv=conv,
*sys.exc_info(),
)
except Exception as exception: except Exception as exception:
self.log.error(f"Failed to handle conversation exception: {exception!r}") self.log.error(f"Failed to handle conversation exception: {exception!r}")
@ -276,17 +228,25 @@ class ConversationListImplementation(PDAImplementation, metaclass=abc.ABCMeta):
self, self,
dispenser: "Dispenser", dispenser: "Dispenser",
conv: t.ConversationProtocol, conv: t.ConversationProtocol,
exception: Exception etype: t.Type[Exception],
exception: Exception,
etb: types.TracebackType,
) -> None: ) -> None:
""" """
Handle :exc:`Exception`\\ s that were not caught by :class:`~royalnet.engineer.conversation.Conversation`\\ s. Handle :exc:`Exception`\\ s that were not caught by :class:`~royalnet.engineer.conversation.Conversation`\\ s.
:param dispenser: The dispenser which hosted the :class:`~royalnet.engineer.conversation.Conversation`\\ . :param dispenser: The dispenser which hosted the :class:`~royalnet.engineer.conversation.Conversation`\\ .
:param conv: The :class:`~royalnet.engineer.conversation.Conversation` which didn't catch the error. :param conv: The :class:`~royalnet.engineer.conversation.Conversation` which didn't catch the error.
:param etype: The class of the :class:`Exception` that was raised.
:param exception: The :class:`Exception` that was raised. :param exception: The :class:`Exception` that was raised.
:param etb: A :class:`types.TracebackType` object containing the traceback of the :class:`Exception`.
""" """
self.log.error(f"Unhandled {exception} in {conv} run in {dispenser}!") msg = [
f"Unhandled {etype.__qualname__} in {conv!r} running in {dispenser!r}: {exception!r}",
traceback.format_exc(etb)
]
self.log.error("\n".join(msg))
async def _schedule_conversations(self, dispenser: "Dispenser") -> list[asyncio.Task]: async def _schedule_conversations(self, dispenser: "Dispenser") -> list[asyncio.Task]:
""" """