1
Fork 0
mirror of https://github.com/RYGhub/royalnet.git synced 2024-11-23 03:24:20 +00:00

Merge pull request #1 from Steffo99/filters

💥 Implement engineers.sentry.filters
This commit is contained in:
Steffo 2020-12-19 11:17:12 +01:00 committed by GitHub
commit 116d8217f5
Signed by: github
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 459 additions and 106 deletions

View file

@ -3,7 +3,7 @@
<module name="royalnet-6" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<option name="SDK_HOME" value="" />
<option name="SDK_HOME" value="$USER_HOME$/.cache/pypoetry/virtualenvs/royalnet-1-y9ycdx-py3.9/bin/python" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />

14
poetry.lock generated
View file

@ -6,6 +6,14 @@ category = "dev"
optional = false
python-versions = "*"
[[package]]
name = "async-timeout"
version = "3.0.1"
description = "Timeout context manager for asyncio programs"
category = "dev"
optional = false
python-versions = ">=3.5.3"
[[package]]
name = "atomicwrites"
version = "1.4.0"
@ -409,13 +417,17 @@ socks = ["PySocks (>=1.5.6,<1.5.7 || >1.5.7,<2.0)"]
[metadata]
lock-version = "1.0"
python-versions = "^3.8"
content-hash = "4363aefc0ea9322445ee375edce430712159829b5ca40561eea7cd74bbf0a7bd"
content-hash = "e53fe1488633a7fa98cd473258ed6501d33201c07c0169b0a13a5f8f38149ac4"
[metadata.files]
alabaster = [
{file = "alabaster-0.7.12-py2.py3-none-any.whl", hash = "sha256:446438bdcca0e05bd45ea2de1668c1d9b032e1a9154c2c259092d77031ddd359"},
{file = "alabaster-0.7.12.tar.gz", hash = "sha256:a661d72d58e6ea8a57f7a86e37d86716863ee5e92788398526d58b26a4e4dc02"},
]
async-timeout = [
{file = "async-timeout-3.0.1.tar.gz", hash = "sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f"},
{file = "async_timeout-3.0.1-py3-none-any.whl", hash = "sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3"},
]
atomicwrites = [
{file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"},
{file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"},

View file

@ -16,6 +16,7 @@ pytest = "^6.1.1"
pytest-asyncio = "^0.14.0"
sphinx = "^3.3.1"
sphinx_rtd_theme = "^0.5.0"
async-timeout = "^3.0.1"
[build-system]
requires = ["poetry>=0.12"]

View file

@ -42,12 +42,11 @@ class Blueprint(metaclass=abc.ABCMeta):
"""
@abc.abstractmethod
def __init__(self):
"""
:return: The created object.
"""
raise NotImplementedError()
pass
@abc.abstractmethod
def __hash__(self):
@ -56,7 +55,7 @@ class Blueprint(metaclass=abc.ABCMeta):
"""
raise NotImplementedError()
def requires(self, *fields) -> None:
def requires(self, *fields) -> True:
"""
Ensure that this blueprint has the specified fields, re-raising the highest priority exception raised between
all of them.
@ -64,7 +63,7 @@ class Blueprint(metaclass=abc.ABCMeta):
.. code-block::
def print_msg(message: Message):
message.requires(Message.text, Message.timestamp)
message.requires("text", "timestamp")
print(f"{message.timestamp().isoformat()}: {message.text()}")
:raises .exc.NeverAvailableError: If at least one of the fields raised a :exc:`.exc.NeverAvailableError`.
@ -76,7 +75,7 @@ class Blueprint(metaclass=abc.ABCMeta):
for field in fields:
try:
field(self)
self.__getattribute__(field)()
except exc.NeverAvailableError as ex:
exceptions.append(ex)
except exc.NotAvailableError as ex:
@ -85,6 +84,8 @@ class Blueprint(metaclass=abc.ABCMeta):
if len(exceptions) > 0:
raise max(exceptions, key=lambda e: e.priority)
return True
__all__ = (
"Blueprint",

View file

@ -8,7 +8,7 @@ from royalnet.royaltyping import *
import functools
import logging
from engineer import exc, blueprints
from .. import exc, blueprints
log = logging.getLogger(__name__)
@ -25,132 +25,220 @@ class Filter:
"""
Wait until an :class:`object` leaves the queue and passes through the filter, then return it.
:return: The :class:`object` which entered the queue.
:return: The :class:`object` which left the queue.
"""
while True:
try:
result = await self.func(None)
except exc.Discard as e:
log.debug(str(e))
return await self.get_single()
except exc.Discard:
continue
else:
log.debug(f"Dequeued {result}")
return result
async def get_single(self) -> Any:
"""
Let one :class:`object` pass through the filter, then either return it or raise an error if the object should be
discarded.
:return: The :class:`object` which left the queue.
:raises exc.Discard: If the object was filtered.
"""
try:
result = await self.func(None)
except exc.Discard as e:
log.debug(str(e))
raise
else:
log.debug(f"Dequeued {result}")
return result
@staticmethod
def _deco_type(t: type):
def _deco_filter(c: Callable[[Any], bool], *, error: str):
"""
A decorator which checks the condition ``c`` on all objects transiting through the queue:
- If the check **passes**, the object itself is returned;
- If the check **fails**, :exc:`.exc.Discard` is raised, with the object and the ``error`` string as parameters;
- If an error is raised, propagate the error upwards.
.. warning:: Raising :exc:`.exc.Discard` in ``c`` will automatically cause the object to be discarded, as if
:data:`False` was returned.
:param c: A function that takes in input an enqueued object and returns either the same object or a new one to
pass to the next filter in the queue.
:param error: The string that :exc:`.exc.Discard` should display if the object is discarded.
"""
def decorator(func):
@functools.wraps(func)
def decorated(obj):
result: Any = func(obj)
if not isinstance(result, t):
raise exc.Discard(result, f"Not instance of type {t}")
return result
async def decorated(obj):
result: Any = await func(obj)
if c(result):
return result
else:
raise exc.Discard(obj=result, message=error)
return decorated
return decorator
def filter(self, c: Callable[[Any], bool], error: str) -> Filter:
"""
Check the condition ``c`` on all objects transiting through the queue:
- If the check **passes**, the object goes on to the next filter;
- If the check **fails**, the object is discarded, with ``error`` as reason;
- If an error is raised, propagate the error upwards.
.. seealso:: :meth:`._deco_filter`, :func:`filter`
:param c: A function that takes in input an object and performs a check on it, returning either :data:`True`
or :data:`False`.
:param error: The reason for which objects should be discarded.
:return: A new :class:`Filter` with this new condition.
"""
return self.__class__(self._deco_filter(c, error=error)(self.func))
@staticmethod
def _deco_map(c: Callable[[Any], object]):
"""
A decorator which applies the function ``c`` on all objects transiting through the queue:
- If the function **returns**, return its return value;
- If the function **raises** an error, it is propagated upwards.
.. seealso:: :func:`map`
:param c: A function that takes in input an enqueued object and returns either the same object or something
else.
"""
def decorator(func):
@functools.wraps(func)
async def decorated(obj):
result: Any = await func(obj)
return c(result)
return decorated
return decorator
def map(self, c: Callable[[Any], object]) -> Filter:
"""
Apply the function ``c`` on all objects transiting through the queue:
- If the function **returns**, its return value replaces the object in the queue;
- If the function **raises** :exc:`.exc.Discard`, the object is discarded;
- If the function **raises another error**, propagate the error upwards.
.. seealso:: :meth:`._deco_map`, :func:`filter`
:param c: A function that takes in input an enqueued object and returns either the same object or something
else.
:return: A new :class:`Filter` with this new condition.
"""
return self.__class__(self._deco_map(c)(self.func))
def type(self, t: type) -> Filter:
"""
:exc:`exc.Discard` all objects that are not an instance of ``t``.
Check if an object passing through the queue :func:`isinstance` of the type ``t``.
:param t: The type that objects should be instances of.
:return: A new :class:`Filter` with the new requirements.
:return: A new :class:`Filter` with this new condition.
"""
return self.__class__(self._deco_type(t)(self.func))
return self.filter(lambda o: isinstance(o, t), error=f"Not instance of type {t}")
def msg(self) -> Filter:
"""
:exc:`exc.Discard` all objects that are not an instance of :class:`.blueprints.Message`.
Check if an object passing through the queue :func:`isinstance` of :class:`.blueprints.Message`.
:return: A new :class:`Filter` with the new requirements.
:return: A new :class:`Filter` with this new condition.
"""
return self.__class__(self._deco_type(blueprints.Message)(self.func))
return self.type(blueprints.Message)
@staticmethod
def _deco_requires(*fields):
def decorator(func):
@functools.wraps(func)
def decorated(obj):
result: blueprints.Blueprint = func(obj)
try:
result.requires(*fields)
except exc.NotAvailableError:
raise exc.Discard(result, "Missing data")
except AttributeError:
raise exc.Discard(result, "Missing .requires() method")
return result
return decorated
return decorator
def requires(self, *fields) -> Filter:
def requires(self, *fields,
propagate_not_available=False,
propagate_never_available=True) -> Filter:
"""
Test an object's fields by using its ``.requires()`` method (expecting it to be
:meth:`.blueprints.Blueprint.requires`) and discard everything that does not pass the check.
Test a :class:`.blueprints.Blueprint`'s fields by using its ``.requires()`` method:
- If the :class:`.blueprints.Blueprint` has the appropriate fields, return it;
- If the :class:`.blueprints.Blueprint` doesn't have data for at least one of the fields, the object is
discarded;
- the :class:`.blueprints.Blueprint` never has data for at least one of the fields,
:exc:`.exc.NotAvailableError` is propagated upwards.
.. seealso:: :meth:`.blueprints.Blueprint.requires`, :meth:`.filter`
:param fields: The fields to test for.
:param propagate_not_available: If :exc:`.exc.NotAvailableError` should be propagated
instead of discarding the errored object.
:param propagate_never_available: If :exc:`.exc.NeverAvailableError` should be propagated
instead of discarding the errored object.
:return: A new :class:`Filter` with this new condition.
"""
def check(obj):
try:
return obj.requires(*fields)
except exc.NotAvailableError:
if propagate_not_available:
raise
raise exc.Discard(obj, "Data is not available")
except exc.NeverAvailableError:
if propagate_never_available:
raise
raise exc.Discard(obj, "Data is never available")
return self.filter(check, error=".requires() method returned False")
def field(self, field: str,
propagate_not_available=False,
propagate_never_available=True) -> Filter:
"""
Replace a :class:`.blueprints.Blueprint` with the value of one of its fields.
.. seealso:: :meth:`.map`
:param field: The field to access.
:param propagate_not_available: If :exc:`.exc.NotAvailableError` should be propagated
instead of discarding the errored object.
:param propagate_never_available: If :exc:`.exc.NeverAvailableError` should be propagated
instead of discarding the errored object.
:return: A new :class:`Filter` with the new requirements.
"""
return self.__class__(self._deco_requires(*fields)(self.func))
def replace(obj):
try:
return obj.__getattribute__(field)()
except exc.NotAvailableError:
if propagate_not_available:
raise
raise exc.Discard(obj, "Data is not available")
except exc.NeverAvailableError:
if propagate_never_available:
raise
raise exc.Discard(obj, "Data is never available")
@staticmethod
def _deco_text():
def decorator(func):
@functools.wraps(func)
def decorated(obj):
result: blueprints.Message = func(obj)
try:
text = result.text()
except exc.NotAvailableError:
raise exc.Discard(result, "No text")
except AttributeError:
raise exc.Discard(result, "Missing text method")
return text
return decorated
return decorator
return self.map(replace)
def text(self) -> Filter:
def startswith(self, prefix: str):
"""
Get the text of the passed object by using its ``.text()`` method (expecting it to be
:meth:`.blueprints.Message.text`), while discarding all objects that don't have a text.
Check if an object starts with the specified prefix and discard the objects that do not.
:param prefix: The prefix object should start with.
:return: A new :class:`Filter` with the new requirements.
"""
return self.__class__(self._deco_text()(self.func))
return self.filter(lambda x: x.startswith(prefix), error=f"Text didn't start with {prefix}")
@staticmethod
def _deco_regex(pattern: Pattern):
def decorator(func):
@functools.wraps(func)
def decorated(obj):
result: str = func(obj)
if match := pattern.match(result):
return match
else:
raise exc.Discard(result, f"Text didn't match pattern {pattern}")
return decorated
return decorator
def endswith(self, suffix: str):
"""
Check if an object ends with the specified suffix and discard the objects that do not.
:param suffix: The prefix object should start with.
:return: A new :class:`Filter` with the new requirements.
"""
return self.filter(lambda x: x.endswith(suffix), error=f"Text didn't end with {suffix}")
def regex(self, pattern: Pattern):
"""
Apply a regex over an object's text (obtained through its ``.text()`` method, expecting it to be
:meth:`.blueprints.Message.text`) and discard the object if it does not match.
Apply a regex over an object and discard the object if it does not match.
:param pattern: The pattern that should be matched by the text.
:return: A new :class:`Filter` with the new requirements.
"""
return self.__class__(self._deco_regex(pattern)(self.func))
def mapping(x):
if match := pattern.match(x):
return match
else:
raise exc.Discard(x, f"Text didn't match pattern {pattern}")
@staticmethod
def _deco_choices(*choices):
def decorator(func):
@functools.wraps(func)
def decorated(obj: blueprints.Message):
result = func(obj)
if result not in choices:
raise exc.Discard(result, "Not a valid choice")
return result
return decorated
return decorator
return self.map(mapping)
def choices(self, *choices):
"""
@ -159,7 +247,7 @@ class Filter:
:param choices: The pattern that should be matched by the text.
:return: A new :class:`Filter` with the new requirements.
"""
return self.__class__(self._deco_choices(*choices)(self.func))
return self.filter(lambda o: o in choices, error="Not a valid choice")
__all__ = (

View file

@ -18,32 +18,39 @@ class Sentry:
The size of the object :attr:`.queue`.
"""
def __init__(self):
self.queue: asyncio.Queue = asyncio.Queue(maxsize=12)
def __init__(self, filter_type: Type[Filter] = Filter):
self.queue: asyncio.Queue = asyncio.Queue(maxsize=self.QUEUE_SIZE)
"""
An object queue where incoming :class:`object` are stored.
An object queue where incoming :class:`object` are stored, with a size limit of :attr:`.QUEUE_SIZE`.
"""
self.filter_type: Type[Filter] = filter_type
"""
The filter to be used in :meth:`.f` calls, by default :class:`.filters.Filter`.
"""
def __repr__(self):
return f"<Sentry>"
async def get(self, *_, **__) -> Any:
def f(self):
"""
Wait until an :class:`object` leaves the queue, then return it.
Create a :attr:`.filter_type` object, which can be configured through its fluent interface.
:return: The :class:`object` which entered the queue.
"""
return await self.queue.get()
Remember to call ``.get()`` on the end of the chain to finally get the object.
async def filter(self):
"""
Create a :class:`.filters.Filter` object, which can be configured through its fluent interface.
To get any object, call:
Remember to call ``.get()`` on the end of the chain.
.. code-block::
await sentry.f().get()
.. seealso:: :class:`.filters.Filter`
:return: The created :class:`.filters.Filter`.
"""
return Filter(self.get)
async def func(_):
return await self.queue.get()
return self.filter_type(func)
__all__ = (

View file

@ -0,0 +1,230 @@
import pytest
import asyncio
import async_timeout
import re
from royalnet.engineer import sentry, exc, blueprints
@pytest.fixture
def s() -> sentry.Sentry:
return sentry.Sentry()
class TestSentry:
def test_creation(self, s: sentry.Sentry):
assert s
assert isinstance(s, sentry.Sentry)
@pytest.mark.asyncio
async def test_put(self, s: sentry.Sentry):
await s.queue.put(None)
@pytest.mark.asyncio
async def test_get(self, s: sentry.Sentry):
await s.queue.put(None)
assert await s.queue.get() is None
@pytest.mark.asyncio
async def test_f(self, s: sentry.Sentry):
await s.queue.put(None)
f = s.f()
assert f
assert isinstance(f, sentry.Filter)
assert hasattr(f, "get")
@pytest.fixture
def discarding_filter() -> sentry.Filter:
async def discard(_):
raise exc.Discard(None, "This filter discards everything!")
return sentry.Filter(discard)
class ErrorTest(Exception):
pass
def error_test(*_, **__):
raise ErrorTest("This was raised by error_raiser.")
class TestFilter:
def test_creation(self):
f = sentry.Filter(lambda _: _)
assert f
assert isinstance(f, sentry.Filter)
class TestGetSingle:
@pytest.mark.asyncio
async def test_success(self, s: sentry.Sentry):
await s.queue.put(None)
assert await s.f().get_single() is None
@pytest.mark.asyncio
async def test_failure(self, discarding_filter: sentry.Filter):
with pytest.raises(exc.Discard):
await discarding_filter.get_single()
class TestGet:
@pytest.mark.asyncio
async def test_success(self, s: sentry.Sentry):
await s.queue.put(None)
assert await s.f().get() is None
@pytest.mark.asyncio
async def test_timeout(self, s: sentry.Sentry):
with pytest.raises(asyncio.TimeoutError):
async with async_timeout.timeout(0.001):
await s.f().get()
@pytest.mark.asyncio
async def test_filter(self, s: sentry.Sentry):
await s.queue.put(None)
await s.queue.put(None)
await s.queue.put(None)
assert await s.f().filter(lambda x: x is None, "Is not None").get_single() is None
with pytest.raises(exc.Discard):
await s.f().filter(lambda x: isinstance(x, type), error="Is not type").get_single()
with pytest.raises(ErrorTest):
await s.f().filter(error_test, error="Is error").get_single()
@pytest.mark.asyncio
async def test_map(self, s: sentry.Sentry):
await s.queue.put(None)
await s.queue.put(None)
assert await s.f().map(lambda x: 1).get_single() == 1
with pytest.raises(ErrorTest):
await s.f().map(error_test).get_single()
@pytest.mark.asyncio
async def test_type(self, s: sentry.Sentry):
await s.queue.put(1)
await s.queue.put("no")
assert await s.f().type(int).get_single() == 1
with pytest.raises(exc.Discard):
await s.f().type(int).get_single()
@pytest.mark.asyncio
async def test_msg(self, s: sentry.Sentry):
class ExampleMessage(blueprints.Message):
def __hash__(self):
return 1
msg = ExampleMessage()
await s.queue.put(msg)
await s.queue.put("no")
assert await s.f().msg().get_single() is msg
with pytest.raises(exc.Discard):
await s.f().msg().get_single()
class AvailableMessage(blueprints.Message):
def __hash__(self):
return 1
def text(self) -> str:
return "1"
class NotAvailableMessage(blueprints.Message):
def __hash__(self):
return 2
def text(self) -> str:
raise exc.NotAvailableError()
class NeverAvailableMessage(blueprints.Message):
def __hash__(self):
return 3
@pytest.mark.asyncio
async def test_requires(self, s: sentry.Sentry):
avmsg = self.AvailableMessage()
await s.queue.put(avmsg)
assert await s.f().requires("text").get_single() is avmsg
await s.queue.put(self.NotAvailableMessage())
with pytest.raises(exc.Discard):
await s.f().requires("text").get_single()
await s.queue.put(self.NeverAvailableMessage())
with pytest.raises(exc.NeverAvailableError):
await s.f().requires("text").get_single()
await s.queue.put(self.NotAvailableMessage())
with pytest.raises(exc.NotAvailableError):
await s.f().requires("text", propagate_not_available=True).get_single()
await s.queue.put(self.NeverAvailableMessage())
with pytest.raises(exc.Discard):
await s.f().requires("text", propagate_never_available=False).get_single()
@pytest.mark.asyncio
async def test_field(self, s: sentry.Sentry):
avmsg = self.AvailableMessage()
await s.queue.put(avmsg)
assert await s.f().field("text").get_single() == "1"
await s.queue.put(self.NotAvailableMessage())
with pytest.raises(exc.Discard):
await s.f().field("text").get_single()
await s.queue.put(self.NeverAvailableMessage())
with pytest.raises(exc.NeverAvailableError):
await s.f().field("text").get_single()
await s.queue.put(self.NotAvailableMessage())
with pytest.raises(exc.NotAvailableError):
await s.f().field("text", propagate_not_available=True).get_single()
await s.queue.put(self.NeverAvailableMessage())
with pytest.raises(exc.Discard):
await s.f().field("text", propagate_never_available=False).get_single()
@pytest.mark.asyncio
async def test_startswith(self, s: sentry.Sentry):
await s.queue.put("yarrharr")
await s.queue.put("yohoho")
assert await s.f().startswith("yarr").get_single() == "yarrharr"
with pytest.raises(exc.Discard):
await s.f().startswith("yarr").get_single()
@pytest.mark.asyncio
async def test_endswith(self, s: sentry.Sentry):
await s.queue.put("yarrharr")
await s.queue.put("yohoho")
assert await s.f().endswith("harr").get_single() == "yarrharr"
with pytest.raises(exc.Discard):
await s.f().endswith("harr").get_single()
@pytest.mark.asyncio
async def test_regex(self, s: sentry.Sentry):
await s.queue.put("yarrharr")
await s.queue.put("yohoho")
assert isinstance(await s.f().regex(re.compile(r"[yh]arr")).get_single(), re.Match)
with pytest.raises(exc.Discard):
await s.f().regex(re.compile(r"[yh]arr")).get_single()
@pytest.mark.asyncio
async def test_choices(self, s: sentry.Sentry):
await s.queue.put("yarrharr")
await s.queue.put("yohoho")
assert await s.f().choices("yarrharr", "banana").get_single() == "yarrharr"
with pytest.raises(exc.Discard):
await s.f().choices("yarrharr", "banana").get_single()

View file

@ -0,0 +1,14 @@
import asyncio
import pytest
async def sleep_and_raise():
await asyncio.sleep(0.001)
raise Exception("Please except this gift!")
@pytest.mark.asyncio
class TestAsyncio:
async def test_exception(self):
with pytest.raises(Exception):
await sleep_and_raise()