1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-22 19:14:20 +00:00

Overhaul bullets

This commit is contained in:
Steffo 2021-03-31 04:17:37 +02:00
parent 69d7c88e5c
commit 1683147601
Signed by: steffo
GPG key ID: 6965406171929D01
26 changed files with 521 additions and 375 deletions

View file

@ -7,7 +7,7 @@
``bullet``
----------
.. automodule:: royalnet.engineer.bullet
.. automodule:: royalnet.engineer.gunpowder
``command``

View file

@ -6,13 +6,12 @@ All names are inspired by the `Engineer Class of Team Fortress 2 <https://wiki.t
.. image: /_static/engineer_diagram.png
"""
from .bullet import *
from .command import *
from .conversation import *
from .discard import *
from .dispenser import *
from .exc import *
from .magazine import *
from .sentry import *
from .teleporter import *
from .wrench import *
from .bullet import *

View file

@ -1,259 +0,0 @@
"""
Bullets are parts of the data model that :mod:`royalnet.engineer` uses to build a common interface between different
chat apps (*frontends*).
They exclusively use coroutine functions to access data, as it may be required to fetch it from a remote location before
it is available.
**All** coroutine functions can have three different results:
- :exc:`.exc.BulletException` is raised, meaning that something went wrong during the data retrieval.
- :exc:`.exc.NotSupportedError` is raised, meaning that the frontend does not support the feature the requested data
is about (asking for :meth:`.Message.reply_to` in an IRC frontend, for example).
- :data:`None` is returned, meaning that there is no data in that field (if a message is not a reply to anything,
:meth:`Message.reply_to` will be :data:`None`.
- The data is returned.
To instantiate a new :class:`Bullet` from a bullet, you should use the methods of :attr:`.Bullet.mag`.
"""
from __future__ import annotations
import royalnet.royaltyping as t
import abc
import datetime
import sqlalchemy.orm
import async_property as ap
from . import exc
if t.TYPE_CHECKING:
from . import magazine
class Bullet(metaclass=abc.ABCMeta):
"""
The abstract base class for :mod:`~royalnet.engineer.bullet` models.
"""
def __init__(self, mag: "magazine.Magazine"):
"""
Instantiate a new :class:`.Bullet` .
"""
self.mag: "magazine.Magazine" = mag
"""
The :class:`.magazine.Magazine` to use when instantiating new :class:`.Bullet`\\ s.
"""
@abc.abstractmethod
def __hash__(self) -> int:
"""
:return: A value that uniquely identifies the object in this Python interpreter process.
"""
raise NotImplementedError()
class Message(Bullet, metaclass=abc.ABCMeta):
"""
An abstract class representing a chat message.
"""
@ap.async_cached_property
async def text(self) -> t.Optional[str]:
"""
:return: The raw text contents of the message.
"""
raise exc.NotSupportedError()
@ap.async_cached_property
async def timestamp(self) -> t.Optional[datetime.datetime]:
"""
:return: The :class:`datetime.datetime` at which the message was sent.
"""
raise exc.NotSupportedError()
@ap.async_cached_property
async def reply_to(self) -> t.Optional[Message]:
"""
:return: The :class:`.Message` this message is a reply to.
"""
raise exc.NotSupportedError()
@ap.async_cached_property
async def channel(self) -> t.Optional[Channel]:
"""
:return: The :class:`.Channel` this message was sent in.
"""
raise exc.NotSupportedError()
@ap.async_cached_property
async def files(self) -> t.Optional[t.List[t.BinaryIO]]:
"""
:return: A :class:`list` of files attached to the message.
"""
raise exc.NotSupportedError()
@ap.async_cached_property
async def reactions(self) -> t.List[ReactionButton]:
"""
:return: A :class:`list` of reaction buttons attached to the message.
"""
async def reply(self, *,
text: str = None,
files: t.List[t.BinaryIO] = None) -> t.Optional[Message]:
"""
Reply to this message in the same channel it was sent in.
:param text: The text to reply with.
:param files: A :class:`list` of files to attach to the message. The file type should be detected automatically
by the frontend, and sent in the best format possible (if all files are photos, they should be
sent as a photo album, etc.).
:return: The sent reply message.
"""
raise exc.NotSupportedError()
class Channel(Bullet, metaclass=abc.ABCMeta):
"""
An abstract class representing a channel where messages can be sent.
"""
@ap.async_cached_property
async def name(self) -> t.Optional[str]:
"""
:return: The name of the message channel, such as the chat title.
"""
raise exc.NotSupportedError()
@ap.async_cached_property
async def topic(self) -> t.Optional[str]:
"""
:return: The topic (description) of the message channel.
"""
raise exc.NotSupportedError()
@ap.async_cached_property
async def users(self) -> t.List[User]:
"""
:return: A :class:`list` of :class:`.User` who can read messages sent in the channel.
"""
raise exc.NotSupportedError()
async def send_message(self, *,
text: str = None,
files: t.List[t.BinaryIO] = None) -> t.Optional[Message]:
"""
Send a message in the channel.
:param text: The text to send in the message.
:param files: A :class:`list` of files to attach to the message. The file type should be detected automatically
by the frontend, and sent in the best format possible (if all files are photos, they should be
sent as a photo album, etc.).
:return: The sent message.
"""
raise exc.NotSupportedError()
class User(Bullet, metaclass=abc.ABCMeta):
"""
An abstract class representing a user who can read or send messages in the chat.
"""
@ap.async_cached_property
async def name(self) -> t.Optional[str]:
"""
:return: The user's name.
"""
raise exc.NotSupportedError()
@ap.async_cached_property
async def database(self, session: sqlalchemy.orm.Session) -> t.Any:
"""
:param session: A :class:`sqlalchemy.orm.Session` instance to use to fetch the database entry.
:return: The database entry for this user.
"""
raise exc.NotSupportedError()
async def slide(self) -> Channel:
"""
Slide into the DMs of the user and get the private channel they share with with the bot.
:return: The private channel where you can talk to the user.
"""
raise exc.NotSupportedError()
class Button(Bullet, metaclass=abc.ABCMeta):
"""
An abstract class representing a clickable button.
"""
@ap.async_cached_property
async def text(self) -> t.Optional[str]:
"""
:return: The text displayed on the button.
"""
raise exc.NotSupportedError()
class ReactionButton(Button, metaclass=abc.ABCMeta):
"""
An abstract class representing a clickable reaction to a message.
"""
@ap.async_property
async def reactions(self) -> t.List[Reaction]:
"""
:return: The list of reactions generated by this button. It may vary every time this property is accessed,
based on the users who have reacted to the button at the time of access.
"""
raise exc.NotSupportedError()
@ap.async_property
async def count(self) -> int:
"""
:return: The count of reactions that this button generated. It may vary every time this property is accessed,
based on how many users have reacted to the button at the time of access.
"""
raise exc.NotSupportedError()
@ap.async_cached_property
async def message(self) -> t.Optional[Message]:
"""
:return: The message this button is attached to. Can be :data:`None`, if the button hasn't been attached to a
message yet.
"""
raise exc.NotSupportedError()
class Reaction(Bullet, metaclass=abc.ABCMeta):
"""
An abstract class representing a reaction of a single user to a message, generated by clicking on a ReactionButton.
"""
@ap.async_cached_property
async def user(self) -> User:
"""
:return: The user who reacted to the message.
"""
raise exc.NotSupportedError()
@ap.async_cached_property
async def button(self) -> ReactionButton:
"""
:return: The ReactionButton that the user pressed to generate this reaction.
"""
raise exc.NotSupportedError()
__all__ = (
"Bullet",
"Message",
"Channel",
"User",
"Button",
"ReactionButton",
"Reaction",
)

View file

@ -0,0 +1,4 @@
from .exc import *
from .casing import *
from .contents import *
from .projectiles import *

View file

@ -0,0 +1,40 @@
"""
Casings are parts of the data model that :mod:`royalnet.engineer` uses to build a common interface between
different applications (implemented by individual *PDAs*).
They exclusively use coroutine functions to access data, as it may be required to fetch it from a remote location before
it is available.
**All** coroutine functions can have three different results:
- :exc:`.exc.CasingException` is raised, meaning that something went wrong during the data retrieval.
- :exc:`.exc.NotSupportedError` is raised, meaning that the frontend does not support the feature the requested data
is about (asking for :meth:`.Message.reply_to` in an IRC frontend, for example).
- :data:`None` is returned, meaning that there is no data in that field (if a message is not a reply to anything,
:meth:`Message.reply_to` will be :data:`None`.
- The data is returned.
To instantiate a new :class:`Bullet` from a bullet, you should use the methods of :attr:`.Bullet.mag`.
"""
from __future__ import annotations
import abc
class Casing(metaclass=abc.ABCMeta):
"""
The abstract base class for :mod:`~royalnet.engineer.casing` models.
"""
def __init__(self):
"""
Instantiate a new instance of this class.
"""
@abc.abstractmethod
def __hash__(self) -> int:
"""
:return: A value that uniquely identifies the object in this Python interpreter process.
"""
raise NotImplementedError()

View file

@ -0,0 +1,6 @@
from ._base import *
from .button import *
from .button_reaction import *
from .channel import *
from .message import *
from .user import *

View file

@ -0,0 +1,16 @@
from __future__ import annotations
import abc
from .. import casing
class BulletContents(casing.Casing, metaclass=abc.ABCMeta):
"""
Abstract base class for bullet contents.
"""
__all__ = (
"BulletContents",
)

View file

@ -0,0 +1,16 @@
import royalnet.royaltyping as t
import sqlalchemy.orm as so
import abc
from .. import exc
from ._base import BulletContents
__all__ = (
"t",
"so",
"abc",
"exc",
"BulletContents",
)

View file

@ -0,0 +1,19 @@
from __future__ import annotations
from ._imports import *
class Button(BulletContents, metaclass=abc.ABCMeta):
"""
An abstract class representing a clickable button.
"""
async def text(self) -> t.Optional[str]:
"""
:return: The text displayed on the button.
"""
raise exc.NotSupportedError()
__all__ = (
"Button",
)

View file

@ -0,0 +1,40 @@
from __future__ import annotations
from ._imports import *
from .button import Button
if t.TYPE_CHECKING:
from ..projectiles.reaction import Reaction
from .message import Message
class ButtonReaction(Button, metaclass=abc.ABCMeta):
"""
An abstract class representing a clickable reaction to a message.
"""
async def reactions(self) -> t.List["Reaction"]:
"""
:return: The list of reactions generated by this button. It may vary every time this property is accessed,
based on the users who have reacted to the button at the time of access.
"""
raise exc.NotSupportedError()
async def count(self) -> int:
"""
:return: The count of reactions that this button generated. It may vary every time this property is accessed,
based on how many users have reacted to the button at the time of access.
"""
raise exc.NotSupportedError()
async def message(self) -> t.Optional["Message"]:
"""
:return: The message this button is attached to. Can be :data:`None`, if the button hasn't been attached to a
message yet.
"""
raise exc.NotSupportedError()
__all__ = (
"ButtonReaction",
)

View file

@ -0,0 +1,49 @@
from __future__ import annotations
from ._imports import *
if t.TYPE_CHECKING:
from .message import Message
from .user import User
class Channel(BulletContents, metaclass=abc.ABCMeta):
"""
An abstract class representing a channel where messages can be sent.
"""
async def name(self) -> t.Optional[str]:
"""
:return: The name of the message channel, such as the chat title.
"""
raise exc.NotSupportedError()
async def topic(self) -> t.Optional[str]:
"""
:return: The topic (description) of the message channel.
"""
raise exc.NotSupportedError()
async def users(self) -> t.List["User"]:
"""
:return: A :class:`list` of :class:`.User` who can read messages sent in the channel.
"""
raise exc.NotSupportedError()
async def send_message(self, *,
text: str = None,
files: t.List[t.BinaryIO] = None) -> t.Optional["Message"]:
"""
Send a message in the channel.
:param text: The text to send in the message.
:param files: A :class:`list` of files to attach to the message. The file type should be detected automatically
by the frontend, and sent in the best format possible (if all files are photos, they should be
sent as a photo album, etc.).
:return: The sent message.
"""
raise exc.NotSupportedError()
__all__ = (
"Channel",
)

View file

@ -0,0 +1,68 @@
from __future__ import annotations
from ._imports import *
import datetime
if t.TYPE_CHECKING:
from .channel import Channel
from .button_reaction import ButtonReaction
class Message(BulletContents, metaclass=abc.ABCMeta):
"""
An abstract class representing a chat message.
"""
async def text(self) -> t.Optional[str]:
"""
:return: The raw text contents of the message.
"""
raise exc.NotSupportedError()
async def timestamp(self) -> t.Optional[datetime.datetime]:
"""
:return: The :class:`datetime.datetime` at which the message was sent.
"""
raise exc.NotSupportedError()
async def reply_to(self) -> t.Optional[Message]:
"""
:return: The :class:`.Message` this message is a reply to.
"""
raise exc.NotSupportedError()
async def channel(self) -> t.Optional["Channel"]:
"""
:return: The :class:`.Channel` this message was sent in.
"""
raise exc.NotSupportedError()
async def files(self) -> t.Optional[t.List[t.BinaryIO]]:
"""
:return: A :class:`list` of files attached to the message.
"""
raise exc.NotSupportedError()
async def reactions(self) -> t.List["ButtonReaction"]:
"""
:return: A :class:`list` of reaction buttons attached to the message.
"""
async def reply(self, *,
text: str = None,
files: t.List[t.BinaryIO] = None) -> t.Optional[Message]:
"""
Reply to this message in the same channel it was sent in.
:param text: The text to reply with.
:param files: A :class:`list` of files to attach to the message. The file type should be detected automatically
by the frontend, and sent in the best format possible (if all files are photos, they should be
sent as a photo album, etc.).
:return: The sent reply message.
"""
raise exc.NotSupportedError()
__all__ = (
"Message",
)

View file

@ -0,0 +1,37 @@
from __future__ import annotations
from ._imports import *
if t.TYPE_CHECKING:
from .channel import Channel
class User(BulletContents, metaclass=abc.ABCMeta):
"""
An abstract class representing a user who can read or send messages in the chat.
"""
async def name(self) -> t.Optional[str]:
"""
:return: The user's name.
"""
raise exc.NotSupportedError()
async def database(self, session: so.Session) -> t.Any:
"""
:param session: A :class:`sqlalchemy.orm.Session` instance to use to fetch the database entry.
:return: The database entry for this user.
"""
raise exc.NotSupportedError()
async def slide(self) -> "Channel":
"""
Slide into the DMs of the user and get the private channel they share with with the bot.
:return: The private channel where you can talk to the user.
"""
raise exc.NotSupportedError()
__all__ = (
"User",
)

View file

@ -0,0 +1,29 @@
"""
The exceptions which can happen in bullets.
"""
from .. import exc
class BulletException(exc.EngineerException):
"""
The base class for errors in :mod:`royalnet.engineer.bullet`.
"""
class FrontendError(BulletException):
"""
An error occoured while performing a frontend operation, such as sending a message.
"""
class NotSupportedError(FrontendError, NotImplementedError):
"""
The requested property isn't available on the current frontend.
"""
class ForbiddenError(FrontendError):
"""
The bot user does not have sufficient permissions to perform a frontend operation.
"""

View file

@ -0,0 +1,4 @@
from ._base import *
from .message import *
from .reaction import *
from .user import *

View file

@ -0,0 +1,16 @@
from __future__ import annotations
import abc
from .. import casing
class Projectile(casing.Casing, metaclass=abc.ABCMeta):
"""
Abstract base class for external events which can be inserted in a dispenser.
"""
__all__ = (
"Projectile",
)

View file

@ -0,0 +1,16 @@
import royalnet.royaltyping as t
import sqlalchemy.orm as so
import abc
from .. import exc
from ._base import Projectile
__all__ = (
"t",
"so",
"abc",
"exc",
"Projectile",
)

View file

@ -0,0 +1,48 @@
from __future__ import annotations
from ._imports import *
if t.TYPE_CHECKING:
from ..contents.message import Message
class MessageReceived(Projectile, metaclass=abc.ABCMeta):
"""
An abstract class representing the reception of a single message.
"""
async def message(self) -> "Message":
"""
:return: The received Message.
"""
raise exc.NotSupportedError()
class MessageEdited(Projectile, metaclass=abc.ABCMeta):
"""
An abstract class representing the editing of a single message.
"""
async def message(self) -> "Message":
"""
:return: The edited Message.
"""
raise exc.NotSupportedError()
class MessageDeleted(Projectile, metaclass=abc.ABCMeta):
"""
An abstract class representing the deletion of a single message.
"""
async def message(self) -> "Message":
"""
:return: The edited Message.
"""
raise exc.NotSupportedError()
__all__ = (
"MessageReceived",
"MessageEdited",
"MessageDeleted",
)

View file

@ -0,0 +1,29 @@
from __future__ import annotations
from ._imports import *
if t.TYPE_CHECKING:
from ..contents.user import User
from ..contents.button_reaction import ButtonReaction
class Reaction(Projectile, metaclass=abc.ABCMeta):
"""
An abstract class representing a reaction of a single user to a message, generated by clicking on a ButtonReaction.
"""
async def user(self) -> "User":
"""
:return: The user who reacted to the message.
"""
raise exc.NotSupportedError()
async def button(self) -> "ButtonReaction":
"""
:return: The ButtonReaction that the user pressed to generate this reaction.
"""
raise exc.NotSupportedError()
__all__ = (
"Reaction",
)

View file

@ -0,0 +1,46 @@
from __future__ import annotations
from ._imports import *
if t.TYPE_CHECKING:
from ..contents.user import User
class UserJoined(Projectile, metaclass=abc.ABCMeta):
"""
An abstract class representing an user who just joined the chat channel.
"""
async def user(self) -> "User":
"""
:return: The user who joined.
"""
raise exc.NotSupportedError()
class UserLeft(Projectile, metaclass=abc.ABCMeta):
"""
An abstract class representing an user who just left the chat channel.
"""
async def user(self) -> "User":
"""
:return: The user who left.
"""
raise exc.NotSupportedError()
class UserUpdate(Projectile, metaclass=abc.ABCMeta):
"""
An abstract class representing a change in status of an user in the chat channel.
"""
async def user(self) -> "User":
"""
:return: The user who joined.
"""
raise exc.NotSupportedError()
__all__ = (
"User",
)

View file

@ -80,18 +80,23 @@ class Command(c.Conversation):
async def run(self, *, _sentry: s.Sentry, **base_kwargs) -> t.Optional[c.ConversationProtocol]:
log.debug(f"Awaiting a bullet...")
bullet: b.Bullet = await _sentry
projectile: b.Projectile = await _sentry
log.debug(f"Received: {bullet!r}")
log.debug(f"Received: {projectile!r}")
log.debug(f"Ensuring a message was received: {bullet!r}")
if not isinstance(bullet, b.Message):
log.debug(f"Returning: {bullet!r} is not a message")
log.debug(f"Ensuring a message was received: {projectile!r}")
if not isinstance(projectile, b.MessageReceived):
log.debug(f"Returning: {projectile!r} is not a message")
return
log.debug(f"Getting message text of: {bullet!r}")
if not (text := await bullet.text()):
log.debug(f"Returning: {bullet!r} has no text")
log.debug(f"Getting message of: {projectile!r}")
if not (msg := await projectile.message()):
log.warning(f"Returning: {projectile!r} has no message")
return
log.debug(f"Getting message text of: {msg!r}")
if not (text := await msg.text()):
log.debug(f"Returning: {msg!r} has no text")
return
log.debug(f"Searching for pattern: {text!r}")
@ -107,11 +112,11 @@ class Command(c.Conversation):
with _sentry.dispenser().lock(self):
log.debug(f"Passing args to function: {message_kwargs!r}")
return await super().run(_sentry=_sentry, _msg=bullet, **base_kwargs, **message_kwargs)
return await super().run(_sentry=_sentry, _proj=projectile, **base_kwargs, **message_kwargs)
else:
log.debug(f"Passing args to function: {message_kwargs!r}")
return await super().run(_sentry=_sentry, _msg=bullet, **base_kwargs, **message_kwargs)
return await super().run(_sentry=_sentry, _proj=projectile, **base_kwargs, **message_kwargs)
def help(self) -> t.Optional[str]:
"""

View file

@ -11,6 +11,7 @@ import contextlib
from .sentry import SentrySource
from .conversation import Conversation
from .exc import LockedDispenserError
from .bullet.projectiles import Projectile
log = logging.getLogger(__name__)
@ -29,11 +30,11 @@ class Dispenser:
.. seealso:: :meth:`.lock`
"""
async def put(self, item: t.Any) -> None:
async def put(self, item: Projectile) -> None:
"""
Insert a new item in the queues of all the running sentries.
Insert a new projectile in the queues of all the running sentries.
:param item: The item to insert.
:param item: The projectile to insert.
"""
log.debug(f"Putting {item}...")
for sentry in self.sentries:

View file

@ -37,30 +37,6 @@ class OutTeleporterError(TeleporterError):
"""
class BulletException(EngineerException):
"""
The base class for errors in :mod:`royalnet.engineer.bullet`.
"""
class FrontendError(BulletException):
"""
An error occoured while performing a frontend operation, such as sending a message.
"""
class NotSupportedError(FrontendError, NotImplementedError):
"""
The requested property isn't available on the current frontend.
"""
class ForbiddenError(FrontendError):
"""
The bot user does not have sufficient permissions to perform a frontend operation.
"""
class DispenserException(EngineerException):
"""
The base class for errors in :mod:`royalnet.engineer.dispenser`.
@ -83,10 +59,6 @@ __all__ = (
"TeleporterError",
"InTeleporterError",
"OutTeleporterError",
"BulletException",
"FrontendError",
"NotSupportedError",
"ForbiddenError",
"DispenserException",
"LockedDispenserError",
)

View file

@ -1,58 +0,0 @@
# Module docstring
"""
Magazines are references to the bullet classes used by a specific frontend; they allow bullets to instance each other
without tying the instantiations to specific classes.
"""
# Special imports
from __future__ import annotations
import royalnet.royaltyping as t
# External imports
import functools
import logging
# Internal imports
from . import bullet
# Special global objects
log = logging.getLogger(__name__)
# Code
# noinspection PyPep8Naming
class Magazine:
"""
A reference to all types of bullets to be used when instancing bullets from a bullet.
"""
_BULLET = bullet.Bullet
_USER = bullet.User
_MESSAGE = bullet.Message
_CHANNEL = bullet.Channel
@property
def Bullet(self) -> functools.partial[bullet.Bullet]:
log.debug(f"Extracting Bullet from Magazine: {self._BULLET!r}")
return functools.partial(self._BULLET, mag=self)
@property
def User(self) -> functools.partial[bullet.User]:
log.debug(f"Extracting User from Magazine: {self._USER!r}")
return functools.partial(self._USER, mag=self)
@property
def Message(self) -> functools.partial[bullet.Message]:
log.debug(f"Extracting Message from Magazine: {self._MESSAGE!r}")
return functools.partial(self._MESSAGE, mag=self)
@property
def Channel(self) -> functools.partial[bullet.Channel]:
log.debug(f"Extracting Channel from Magazine: {self._CHANNEL!r}")
return functools.partial(self._CHANNEL, mag=self)
# Objects exported by this module
__all__ = (
"Magazine",
)

View file

@ -1,5 +1,5 @@
"""
Sentries are asynchronous receivers for events (usually :class:`bullet.Bullet`) incoming from Dispensers.
Sentries are asynchronous receivers for events (usually :class:`bullet.Projectile`) incoming from Dispensers.
They support event filtering through Wrenches and coroutine functions.
"""
@ -15,7 +15,7 @@ from . import discard
if t.TYPE_CHECKING:
from .dispenser import Dispenser
from .conversation import Conversation
from .bullet import Projectile, Casing
log = logging.getLogger(__name__)
@ -32,9 +32,9 @@ class Sentry(metaclass=abc.ABCMeta):
@abc.abstractmethod
def get_nowait(self):
"""
Try to get a single :class:`~.bullet.Bullet` from the pipeline, without blocking or handling discards.
Try to get a single :class:`~.bullet.Projectile` from the pipeline, without blocking or handling discards.
:return: The **returned** :class:`~.bullet.Bullet`.
:return: The **returned** :class:`~.bullet.Projectile`.
:raises asyncio.QueueEmpty: If the queue is empty.
:raises .discard.Discard: If the object was **discarded** by the pipeline.
:raises Exception: If an exception was **raised** in the pipeline.
@ -44,10 +44,10 @@ class Sentry(metaclass=abc.ABCMeta):
@abc.abstractmethod
async def get(self):
"""
Try to get a single :class:`~.bullet.Bullet` from the pipeline, blocking until something is available, but
Try to get a single :class:`~.bullet.Projectile` from the pipeline, blocking until something is available, but
without handling discards.
:return: The **returned** :class:`~.bullet.Bullet`.
:return: The **returned** :class:`~.bullet.Projectile`.
:raises .discard.Discard: If the object was **discarded** by the pipeline.
:raises Exception: If an exception was **raised** in the pipeline.
"""
@ -55,10 +55,10 @@ class Sentry(metaclass=abc.ABCMeta):
async def wait(self):
"""
Try to get a single :class:`~.bullet.Bullet` from the pipeline, blocking until something is available and is not
discarded.
Try to get a single :class:`~.bullet.Projectile` from the pipeline, blocking until something is available and is
not discarded.
:return: The **returned** :class:`~.bullet.Bullet`.
:return: The **returned** :class:`~.bullet.Projectile`.
:raises Exception: If an exception was **raised** in the pipeline.
"""
while True:
@ -77,7 +77,7 @@ class Sentry(metaclass=abc.ABCMeta):
return self.get().__await__()
@abc.abstractmethod
async def put(self, item: t.Any) -> None:
async def put(self, item: "Projectile") -> None:
"""
Insert a new item in the queue.
@ -85,7 +85,7 @@ class Sentry(metaclass=abc.ABCMeta):
"""
raise NotImplementedError()
def filter(self, wrench: t.Callable[[t.Any], t.Awaitable[t.Any]]) -> SentryFilter:
def filter(self, wrench: t.WrenchLike) -> SentryFilter:
"""
Chain a new filter to the pipeline.
@ -100,7 +100,7 @@ class Sentry(metaclass=abc.ABCMeta):
else:
raise TypeError("wrench must be either a Wrench or a coroutine function")
def __or__(self, other: t.Callable[[t.Any], t.Awaitable[t.Any]]) -> SentryFilter:
def __or__(self, other: t.WrenchLike) -> SentryFilter:
"""
A unix-pipe-like interface for :meth:`.filter`.
@ -129,13 +129,13 @@ class SentryFilter(Sentry):
A non-root node of the filtering pipeline.
"""
def __init__(self, previous: Sentry, wrench: t.Callable[[t.Any], t.Awaitable[t.Any]]):
def __init__(self, previous: Sentry, wrench: t.WrenchLike):
self.previous: Sentry = previous
"""
The previous node of the pipeline.
"""
self.wrench: t.Callable[[t.Any], t.Awaitable[t.Any]] = wrench
self.wrench: t.WrenchLike = wrench
"""
The coroutine function to apply to all objects passing through this node.
"""

View file

@ -31,3 +31,6 @@ JSON = Union[
A recursive JSON value: either a :data:`.JSONScalar`, or a :class:`list` of :data:`.JSON` objects, or a :class:`dict`
of :class:`str` to :data:`.JSON` mappings.
"""
WrenchLike = Callable[[Any], Awaitable[Any]]