AsyncMultiQueue

class safir.asyncio.AsyncMultiQueue

Bases: Generic[T]

An asyncio multiple reader, multiple writer queue.

Provides a generic queue for asyncio that supports multiple readers (via async iterator) and multiple writers. Readers can start reading at any time and will begin reading from the start of the queue. There is no maximum size of the queue; new items will be added subject only to the limits of available memory.

This data structure is not thread-safe. It uses only asyncio locking, not thread-safe locking.

The ellipsis object (...) is used internally as a placeholder to mark the end of the queue, so it cannot itself be put on the queue.
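The behavior described above can be illustrated with a small stand-in written against plain asyncio. This is a minimal sketch, not Safir's implementation: the class name SketchMultiQueue, its internals, and the use of RuntimeError (where Safir documents AsyncMultiQueueError) are assumptions made for illustration. It shows that a reader started late still sees the queue from the beginning, that closing ends every iterator, and why the ellipsis cannot be a queue item.

```python
import asyncio
from collections.abc import AsyncIterator


class SketchMultiQueue:
    """Hypothetical stand-in for illustration; not Safir's implementation."""

    def __init__(self) -> None:
        self._contents: list[object] = []
        self._wakeup = asyncio.Event()

    def put(self, item: object) -> None:
        # The ellipsis marks the end of the data, so nothing may follow it.
        if self._contents and self._contents[-1] is ...:
            raise RuntimeError("put called after close")
        self._contents.append(item)
        self._notify()

    def close(self) -> None:
        if not (self._contents and self._contents[-1] is ...):
            self._contents.append(...)
            self._notify()

    def _notify(self) -> None:
        # Wake every waiting reader, then install a fresh event for later waits.
        old, self._wakeup = self._wakeup, asyncio.Event()
        old.set()

    async def aiter_from(self, start: int) -> AsyncIterator[object]:
        position = start
        while True:
            while position >= len(self._contents):
                await self._wakeup.wait()
            item = self._contents[position]
            if item is ...:
                return  # end-of-queue marker reached
            yield item
            position += 1


async def collect(queue: SketchMultiQueue) -> list[object]:
    return [item async for item in queue.aiter_from(0)]


async def demo() -> tuple[list[object], list[object], bool]:
    queue = SketchMultiQueue()
    queue.put("a")
    early = asyncio.create_task(collect(queue))
    await asyncio.sleep(0)  # let the early reader consume "a" and start waiting
    queue.put("b")
    late = asyncio.create_task(collect(queue))  # starts after both puts
    queue.close()
    try:
        queue.put("c")
        rejected = False
    except RuntimeError:
        rejected = True
    return await early, await late, rejected


early_items, late_items, put_after_close_rejected = asyncio.run(demo())
```

Both readers end up with the same two items even though one of them started after the second put, and the put attempted after close is refused.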

Attributes Summary

finished

Whether close has been called on the queue.

Methods Summary

aiter_from(start[, timeout])

Return an async iterator over the queue.

clear()

Empty the contents of the queue.

close()

Mark the end of the queue data.

put(item)

Add an item to the queue.

qsize()

Return the number of items currently in the queue.

Attributes Documentation

finished

Whether close has been called on the queue.

If this property is True, the contents of the queue are finalized and no new items will be added unless the queue is cleared with clear.

Methods Documentation

aiter_from(start, timeout=None)

Return an async iterator over the queue.

Each call to this function returns a separate iterator over the same underlying contents. Each iterator keeps its own position and is notified of new items independently of the others.

Parameters:
  • start (int) – Starting position in the queue. This can be larger than the current queue size, in which case no items are returned until the queue passes the given starting position.

  • timeout (timedelta | None, default: None) – If given, the total length of time the iterator may run. This is not a timeout on waiting for the next item; it limits the total execution time of the iterator.

Returns:

An async iterator over the contents of the queue.

Return type:

AsyncIterator

Raises:

TimeoutError – Raised when the timeout is reached.
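The difference between a total timeout and a per-item timeout can be sketched with plain asyncio. The following is an illustration under stated assumptions, not Safir's code: iterate_with_deadline is a hypothetical helper, and asyncio.Queue stands in for the underlying data source. The producer delivers an item every 20 ms, so a 100 ms per-item timeout would never fire, yet the iterator still raises TimeoutError once 100 ms have elapsed in total.

```python
import asyncio
from collections.abc import AsyncIterator
from datetime import timedelta


async def iterate_with_deadline(
    source: asyncio.Queue, timeout: timedelta
) -> AsyncIterator[int]:
    """Hypothetical helper: yield items until a total deadline passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout.total_seconds()
    while True:
        # The budget shrinks with every item; it is never reset.
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise TimeoutError
        try:
            item = await asyncio.wait_for(source.get(), remaining)
        except asyncio.TimeoutError:
            # Normalize to the built-in exception on older Python versions.
            raise TimeoutError from None
        yield item


async def demo() -> tuple[list[int], bool]:
    source: asyncio.Queue = asyncio.Queue()

    async def producer() -> None:
        for n in range(1000):
            source.put_nowait(n)
            await asyncio.sleep(0.02)

    task = asyncio.create_task(producer())
    received: list[int] = []
    timed_out = False
    try:
        async for item in iterate_with_deadline(source, timedelta(seconds=0.1)):
            received.append(item)
    except TimeoutError:
        timed_out = True
    finally:
        task.cancel()
    return received, timed_out


received, timed_out = asyncio.run(demo())
```

The iterator stops with TimeoutError after receiving only the first few items, even though no single wait came close to the limit.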

clear()

Empty the contents of the queue.

Any existing readers will still see all items put on the queue before the clear, but will become detached from the queue and will not see any new items added after the clear.

Return type:

None

close()

Mark the end of the queue data.

This is similar to clear in that any existing readers of the queue will see the end of the iterator, but the data is not deleted and new readers can still read all of the data in the queue.

After close is called, clear must be called before any subsequent put.

Return type:

None

put(item)

Add an item to the queue.

Parameters:

item (TypeVar(T)) – Item to add.

Raises:

AsyncMultiQueueError – Raised if put was called after close without an intervening call to clear.

Return type:

None

qsize()

Return the number of items currently in the queue.

Return type:

int