diff --git a/.run/pytest for royalnet.run.xml b/.run/pytest for royalnet.run.xml
index a2499cd5..897635ba 100644
--- a/.run/pytest for royalnet.run.xml
+++ b/.run/pytest for royalnet.run.xml
@@ -3,7 +3,7 @@
-
+
diff --git a/poetry.lock b/poetry.lock
index 0ac43528..0ecee720 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -6,6 +6,14 @@ category = "dev"
optional = false
python-versions = "*"
+[[package]]
+name = "async-timeout"
+version = "3.0.1"
+description = "Timeout context manager for asyncio programs"
+category = "dev"
+optional = false
+python-versions = ">=3.5.3"
+
[[package]]
name = "atomicwrites"
version = "1.4.0"
@@ -409,13 +417,17 @@ socks = ["PySocks (>=1.5.6,<1.5.7 || >1.5.7,<2.0)"]
[metadata]
lock-version = "1.0"
python-versions = "^3.8"
-content-hash = "4363aefc0ea9322445ee375edce430712159829b5ca40561eea7cd74bbf0a7bd"
+content-hash = "e53fe1488633a7fa98cd473258ed6501d33201c07c0169b0a13a5f8f38149ac4"
[metadata.files]
alabaster = [
{file = "alabaster-0.7.12-py2.py3-none-any.whl", hash = "sha256:446438bdcca0e05bd45ea2de1668c1d9b032e1a9154c2c259092d77031ddd359"},
{file = "alabaster-0.7.12.tar.gz", hash = "sha256:a661d72d58e6ea8a57f7a86e37d86716863ee5e92788398526d58b26a4e4dc02"},
]
+async-timeout = [
+ {file = "async-timeout-3.0.1.tar.gz", hash = "sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f"},
+ {file = "async_timeout-3.0.1-py3-none-any.whl", hash = "sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3"},
+]
atomicwrites = [
{file = "atomicwrites-1.4.0-py2.py3-none-any.whl", hash = "sha256:6d1784dea7c0c8d4a5172b6c620f40b6e4cbfdf96d783691f2e1302a7b88e197"},
{file = "atomicwrites-1.4.0.tar.gz", hash = "sha256:ae70396ad1a434f9c7046fd2dd196fc04b12f9e91ffb859164193be8b6168a7a"},
diff --git a/pyproject.toml b/pyproject.toml
index 2df57225..817777a4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -16,6 +16,7 @@ pytest = "^6.1.1"
pytest-asyncio = "^0.14.0"
sphinx = "^3.3.1"
sphinx_rtd_theme = "^0.5.0"
+async-timeout = "^3.0.1"
[build-system]
requires = ["poetry>=0.12"]
diff --git a/royalnet/engineer/sentry/filter.py b/royalnet/engineer/sentry/filter.py
index b52d6ac0..cb01cb61 100644
--- a/royalnet/engineer/sentry/filter.py
+++ b/royalnet/engineer/sentry/filter.py
@@ -25,17 +25,30 @@ class Filter:
"""
Wait until an :class:`object` leaves the queue and passes through the filter, then return it.
- :return: The :class:`object` which entered the queue.
+ :return: The :class:`object` which left the queue.
"""
while True:
try:
- result = await self.func(None)
- except exc.Discard as e:
- log.debug(str(e))
+ return await self.get_single()
+ except exc.Discard:
continue
- else:
- log.debug(f"Dequeued {result}")
- return result
+
+ async def get_single(self) -> Any:
+ """
+ Let one :class:`object` pass through the filter, then either return it or raise an error if the object should be
+ discarded.
+
+ :return: The :class:`object` which left the queue.
+ :raises exc.Discard: If the object was filtered.
+ """
+ try:
+ result = await self.func(None)
+ except exc.Discard as e:
+ log.debug(str(e))
+ raise
+ else:
+ log.debug(f"Dequeued {result}")
+ return result
@staticmethod
def _deco_filter(c: Callable[[Any], bool], *, error: str):
@@ -221,7 +234,7 @@ class Filter:
:param choices: The pattern that should be matched by the text.
:return: A new :class:`Filter` with the new requirements.
"""
- return self.__class__(self._deco_check(lambda o: o in choices, error="Not a valid choice")(self.func))
+ return self.filter(lambda o: o in choices, error="Not a valid choice")
__all__ = (
diff --git a/royalnet/engineer/sentry/sentry.py b/royalnet/engineer/sentry/sentry.py
index 3b1b6514..6d891158 100644
--- a/royalnet/engineer/sentry/sentry.py
+++ b/royalnet/engineer/sentry/sentry.py
@@ -18,18 +18,23 @@ class Sentry:
The size of the object :attr:`.queue`.
"""
- def __init__(self):
- self.queue: asyncio.Queue = asyncio.Queue(maxsize=12)
+ def __init__(self, filter_type: Type[Filter] = Filter):
+ self.queue: asyncio.Queue = asyncio.Queue(maxsize=self.QUEUE_SIZE)
"""
- An object queue where incoming :class:`object` are stored.
+ An object queue where incoming :class:`object` are stored, with a size limit of :attr:`.QUEUE_SIZE`.
+ """
+
+ self.filter_type: Type[Filter] = filter_type
+ """
+ The filter to be used in :meth:`.f` calls, by default :class:`.filters.Filter`.
"""
def __repr__(self):
return f""
- async def f(self):
+ def f(self):
"""
- Create a :class:`.filters.Filter` object, which can be configured through its fluent interface.
+ Create a :attr:`.filter_type` object, which can be configured through its fluent interface.
Remember to call ``.get()`` on the end of the chain to finally get the object.
@@ -43,7 +48,9 @@ class Sentry:
:return: The created :class:`.filters.Filter`.
"""
- return Filter(self.queue.get)
+ async def func(_):
+ return await self.queue.get()
+ return self.filter_type(func)
__all__ = (
diff --git a/royalnet/engineer/tests/test_sentry.py b/royalnet/engineer/tests/test_sentry.py
new file mode 100644
index 00000000..e6038be1
--- /dev/null
+++ b/royalnet/engineer/tests/test_sentry.py
@@ -0,0 +1,70 @@
+import pytest
+import asyncio
+import async_timeout
+from .. import sentry, exc
+
+
+@pytest.fixture
+def s() -> sentry.Sentry:
+ return sentry.Sentry()
+
+
+class TestSentry:
+ def test_creation(self, s: sentry.Sentry):
+ assert s
+ assert isinstance(s, sentry.Sentry)
+
+ @pytest.mark.asyncio
+ async def test_put(self, s: sentry.Sentry):
+ await s.queue.put(None)
+
+ @pytest.mark.asyncio
+ async def test_get(self, s: sentry.Sentry):
+ await s.queue.put(None)
+ assert await s.queue.get() is None
+
+ @pytest.mark.asyncio
+ async def test_f(self, s: sentry.Sentry):
+ await s.queue.put(None)
+ f = s.f()
+ assert f
+ assert isinstance(f, sentry.Filter)
+ assert hasattr(f, "get")
+
+
+@pytest.fixture
+def discarding_filter() -> sentry.Filter:
+ async def discard(_):
+ raise exc.Discard(None, "This filter discards everything!")
+
+ return sentry.Filter(discard)
+
+
+class TestFilter:
+ def test_creation(self):
+ f = sentry.Filter(lambda _: _)
+ assert f
+ assert isinstance(f, sentry.Filter)
+
+ class TestGetSingle:
+ @pytest.mark.asyncio
+ async def test_success(self, s: sentry.Sentry):
+ await s.queue.put(None)
+ assert await s.f().get_single() is None
+
+ @pytest.mark.asyncio
+ async def test_failure(self, discarding_filter: sentry.Filter):
+ with pytest.raises(exc.Discard):
+ await discarding_filter.get_single()
+
+ class TestGet:
+ @pytest.mark.asyncio
+ async def test_success(self, s: sentry.Sentry):
+ await s.queue.put(None)
+ assert await s.f().get() is None
+
+ @pytest.mark.asyncio
+ async def test_timeout(self, s: sentry.Sentry):
+ with pytest.raises(asyncio.TimeoutError):
+ async with async_timeout.timeout(0.05):
+ await s.f().get()