1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-22 19:14:20 +00:00

Implement filtering sentry-queue

This commit is contained in:
Steffo 2020-12-13 01:16:08 +01:00
parent 4f84746736
commit 0a7607c3c3
6 changed files with 254 additions and 21 deletions

View file

@ -21,6 +21,14 @@
----------------------------------------------
.. automodule:: royalnet.engineer.sentry
:imported-members:
``filter`` - Fluent filtering asyncio queue
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. automodule:: royalnet.engineer.sentry.filter
:imported-members:
``dispenser`` - Function parameter validation

View file

@ -46,3 +46,30 @@ class OutTeleporterError(TeleporterError):
"""
The return value validation failed.
"""
class SentryError(EngineerException):
"""
An error related to the :mod:`royalnet.engineer.sentry`.
"""
class FilterError(SentryError):
"""
An error related to the :class:`royalnet.engineer.sentry.Filter`.
"""
class Discard(FilterError):
"""
Discard the object from the queue.
"""
def __init__(self, obj, message):
self.obj = obj
self.message = message
def __repr__(self):
return f"<Discard>"
def __str__(self):
return f"Discarded {self.obj}: {self.message}"

View file

@ -1,21 +0,0 @@
from royalnet.royaltyping import *
import logging
import asyncio
log = logging.getLogger(__name__)
class Sentry:
"""
A class that allows using the ``await`` keyword to suspend a command execution until a new message is received.
"""
def __init__(self):
self.queue = asyncio.queues.Queue()
def __repr__(self):
return f"<Sentry, {self.queue.qsize()} items queued>"
async def wait_for_item(self) -> Any:
log.debug("Waiting for an item...")
return await self.queue.get()

View file

@ -0,0 +1 @@
from .sentry import *

View file

@ -0,0 +1,167 @@
"""
.. note:: I'm not sure about this module. It doesn't seem to be really pythonic. It will probably be deprecated in the
future...
"""
from __future__ import annotations
from royalnet.royaltyping import *
import functools
import logging
from engineer import exc, blueprints
log = logging.getLogger(__name__)
class Filter:
"""
A fluent interface for filtering data.
"""
def __init__(self, func: Callable):
self.func: Callable = func
async def get(self) -> Any:
"""
Wait until an :class:`object` leaves the queue and passes through the filter, then return it.
:return: The :class:`object` which entered the queue.
"""
while True:
try:
result = await self.func(None)
except exc.Discard as e:
log.debug(str(e))
continue
else:
log.debug(f"Dequeued {result}")
return result
@staticmethod
def _deco_type(t: type):
def decorator(func):
@functools.wraps(func)
def decorated(obj):
result: Any = func(obj)
if not isinstance(result, t):
raise exc.Discard(result, f"Not instance of type {t}")
return result
return decorated
return decorator
def type(self, t: type) -> Filter:
"""
:exc:`exc.Discard` all objects that are not an instance of ``t``.
:param t: The type that objects should be instances of.
:return: A new :class:`Filter` with the new requirements.
"""
return self.__class__(self._deco_type(t)(self.func))
def msg(self) -> Filter:
"""
:exc:`exc.Discard` all objects that are not an instance of :class:`.blueprints.Message`.
:return: A new :class:`Filter` with the new requirements.
"""
return self.__class__(self._deco_type(blueprints.Message)(self.func))
@staticmethod
def _deco_requires(*fields):
def decorator(func):
@functools.wraps(func)
def decorated(obj):
result: blueprints.Blueprint = func(obj)
try:
result.requires(*fields)
except exc.NotAvailableError:
raise exc.Discard(result, "Missing data")
except AttributeError:
raise exc.Discard(result, "Missing .requires() method")
return result
return decorated
return decorator
def requires(self, *fields) -> Filter:
"""
Test an object's fields by using its ``.requires()`` method (expecting it to be
:meth:`.blueprints.Blueprint.requires`) and discard everything that does not pass the check.
:param fields: The fields to test for.
:return: A new :class:`Filter` with the new requirements.
"""
return self.__class__(self._deco_requires(*fields)(self.func))
@staticmethod
def _deco_text():
def decorator(func):
@functools.wraps(func)
def decorated(obj):
result: blueprints.Message = func(obj)
try:
text = result.text()
except exc.NotAvailableError:
raise exc.Discard(result, "No text")
except AttributeError:
raise exc.Discard(result, "Missing text method")
return text
return decorated
return decorator
def text(self) -> Filter:
"""
Get the text of the passed object by using its ``.text()`` method (expecting it to be
:meth:`.blueprints.Message.text`), while discarding all objects that don't have a text.
:return: A new :class:`Filter` with the new requirements.
"""
return self.__class__(self._deco_text()(self.func))
@staticmethod
def _deco_regex(pattern: Pattern):
def decorator(func):
@functools.wraps(func)
def decorated(obj):
result: str = func(obj)
if match := pattern.match(result):
return match
else:
raise exc.Discard(result, f"Text didn't match pattern {pattern}")
return decorated
return decorator
def regex(self, pattern: Pattern):
"""
Apply a regex over an object's text (obtained through its ``.text()`` method, expecting it to be
:meth:`.blueprints.Message.text`) and discard the object if it does not match.
:param pattern: The pattern that should be matched by the text.
:return: A new :class:`Filter` with the new requirements.
"""
return self.__class__(self._deco_regex(pattern)(self.func))
@staticmethod
def _deco_choices(*choices):
def decorator(func):
@functools.wraps(func)
def decorated(obj: blueprints.Message):
result = func(obj)
if result not in choices:
raise exc.Discard(result, "Not a valid choice")
return result
return decorated
return decorator
def choices(self, *choices):
"""
Ensure an object is in the ``choices`` list, discarding the object otherwise.
:param choices: The pattern that should be matched by the text.
:return: A new :class:`Filter` with the new requirements.
"""
return self.__class__(self._deco_choices(*choices)(self.func))
__all__ = (
"Filter",
)

View file

@ -0,0 +1,51 @@
from __future__ import annotations
from royalnet.royaltyping import *
import logging
import asyncio
from .filter import Filter
log = logging.getLogger(__name__)
class Sentry:
"""
A class that allows using the ``await`` keyword to suspend a command execution until a new message is received.
"""
QUEUE_SIZE = 12
"""
The size of the object :attr:`.queue`.
"""
def __init__(self):
self.queue: asyncio.Queue = asyncio.Queue(maxsize=12)
"""
An object queue where incoming :class:`object` are stored.
"""
def __repr__(self):
return f"<Sentry>"
async def get(self, *_, **__) -> Any:
"""
Wait until an :class:`object` leaves the queue, then return it.
:return: The :class:`object` which entered the queue.
"""
return await self.queue.get()
async def filter(self):
"""
Create a :class:`.filters.Filter` object, which can be configured through its fluent interface.
Remember to call ``.get()`` on the end of the chain.
:return: The created :class:`.filters.Filter`.
"""
return Filter(self.get)
__all__ = (
"Sentry",
)