Using the asyncio multiple-reader queue#
Queue
, provided by the basic library, is an asyncio queue implementation, but the protocol it implements delivers each item in the queue to only one reader.
In some cases, you may need behavior more like a publish/subscribe queue: multiple readers all see the full contents of the queue, independently.
Safir provides the AsyncMultiQueue
data structure for this use case.
Its API is somewhat inspired by that of Queue
, but it is intended for use as an async iterator rather than by calling a get
method.
The writer should use the queue as follows:
from safir.asyncio import AsyncMultiQueue
queue = AsyncMultiQueue[str]()
queue.put("soemthing")
queue.put("else")
queue.close()
Once close
is called, no more data can be added to the queue, and the iterators for all readers will stop when they reach the point where close
was called.
To reset the queue entirely, use clear
.
queue.clear()
This does the same thing as close
for all existing readers, and then empties the queue and resets it so that new data can be added.
New readers will see a fresh, empty queue.
The type information for AsyncMultiQueue
can be any type.
Note that the writer interface is fully synchronous.
A typical reader looks like this:
async for item in queue:
await do_something(item)
This iterates over the full contents of the queue until close
or clear
is called by the writer.
Readers can also start at any position and specify a timeout. The timeout, if given, is the total length of time the iterator is allowed to run, not the time to wait for the next element.
from datetime import timedelta
timeout = timedelta(seconds=5)
async for item in queue.aiter_from(4, timeout):
await do_something(item)
This reader will ignore all elements until the fourth, and will raise TimeoutError
after five seconds of total time in the iterator.