datumaro.util.multi_procs_util

Functions

consumer_generator(producer_generator[, ...])

Context manager that creates a generator to consume items produced by another generator.

Classes

ProducerMessage(value)

An enumeration.

datumaro.util.multi_procs_util.consumer_generator(producer_generator: Iterator[Item], queue_size: int = 100, enqueue_timeout: float = 5.0, join_timeout: float | None = 10.0) -> Generator[Iterator[Item], None, None]

Context manager that creates a generator to consume items produced by another generator.

This context manager sets up a producer thread that generates items from the producer_generator and enqueues them to be consumed by the consumer generator, which is also created by this function.

Parameters:
  • producer_generator – A generator that produces items.

  • queue_size – The maximum size of the shared queue between the producer and consumer.

  • enqueue_timeout – The maximum time to wait for enqueuing an item to the queue if it’s full.

  • join_timeout – The maximum time to wait for the producer thread to finish when exiting the context. If None, wait until the producer thread terminates.

Returns:

A context for iterating over the generated items.

Return type:

Iterator
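
The producer/consumer arrangement described above can be sketched with the standard library alone. This is an illustration of the pattern, not Datumaro's implementation: consumer_generator_sketch, the _END sentinel, and the stop event are hypothetical names, and the real function may signal completion and handle timeouts differently.

```python
import queue
import threading
from contextlib import contextmanager
from typing import Iterator, Optional, TypeVar

Item = TypeVar("Item")
_END = object()  # hypothetical sentinel marking the end of production


@contextmanager
def consumer_generator_sketch(
    producer_generator: Iterator[Item],
    queue_size: int = 100,
    enqueue_timeout: float = 5.0,
    join_timeout: Optional[float] = 10.0,
):
    shared = queue.Queue(maxsize=queue_size)
    stop = threading.Event()  # set when the consumer leaves the context

    def _put(item) -> bool:
        # Retry a bounded put until it succeeds or the consumer has left.
        while not stop.is_set():
            try:
                shared.put(item, timeout=enqueue_timeout)
                return True
            except queue.Full:
                continue
        return False

    def _produce() -> None:
        try:
            for item in producer_generator:
                if not _put(item):
                    return
        finally:
            _put(_END)  # tell the consumer that nothing more will arrive

    def _consume() -> Iterator[Item]:
        while True:
            item = shared.get()
            if item is _END:
                return
            yield item

    thread = threading.Thread(target=_produce, daemon=True)
    thread.start()
    try:
        yield _consume()
    finally:
        stop.set()
        thread.join(timeout=join_timeout)


# Usage: items are produced in a background thread and consumed here.
with consumer_generator_sketch(iter(range(5)), queue_size=2) as items:
    result = list(items)
```

A small queue_size bounds how far the producer can run ahead of the consumer, which is the main reason to use a bounded queue in this pattern.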

class datumaro.util.multi_procs_util.Condition(lock=None)

Bases: object

Class that implements a condition variable.

A condition variable allows one or more threads to wait until they are notified by another thread.

If the lock argument is given and not None, it must be a Lock or RLock object, and it is used as the underlying lock. Otherwise, a new RLock object is created and used as the underlying lock.

wait(timeout=None)

Wait until notified or until a timeout occurs.

If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised.

This method releases the underlying lock, and then blocks until it is awakened by a notify() or notify_all() call for the same condition variable in another thread, or until the optional timeout occurs. Once awakened or timed out, it re-acquires the lock and returns.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof).

When the underlying lock is an RLock, it is not released using its release() method, since this may not actually unlock the lock when it was acquired multiple times recursively. Instead, an internal interface of the RLock class is used, which really unlocks it even when it has been recursively acquired several times. Another internal interface is then used to restore the recursion level when the lock is reacquired.

wait_for(predicate, timeout=None)

Wait until a condition evaluates to True.

predicate should be a callable whose result will be interpreted as a boolean value. A timeout may be provided giving the maximum time to wait.

notify(n=1)

Wake up one or more threads waiting on this condition, if any.

If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised.

This method wakes up at most n of the threads waiting for the condition variable; it is a no-op if no threads are waiting.
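
A minimal sketch of wait_for() paired with notify(). It assumes that this Condition is the standard library's threading.Condition (the documentation above matches it) and imports it from threading to stay self-contained. The names items, consumer, and producer are illustrative only.

```python
import threading

cond = threading.Condition()
items = []      # shared state protected by cond
received = []


def consumer():
    with cond:  # the lock must be held before waiting
        # Blocks until the predicate is true or 2 seconds have passed.
        if cond.wait_for(lambda: len(items) > 0, timeout=2.0):
            received.append(items.pop())


def producer():
    with cond:  # the lock must be held before notifying
        items.append("ready")
        cond.notify()  # wake at most one waiting thread


t = threading.Thread(target=consumer)
t.start()
producer()
t.join()
```

Because wait_for() checks the predicate before blocking, the sketch behaves the same whether the producer or the consumer acquires the lock first.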

notify_all()

Wake up all threads waiting on this condition.

If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised.

notifyAll()

Wake up all threads waiting on this condition.

This method is deprecated, use notify_all() instead.

exception datumaro.util.multi_procs_util.Full

Bases: Exception

Exception raised by Queue.put(block=0)/put_nowait().

class datumaro.util.multi_procs_util.IntEnum(value)

Bases: int, Enum

Enum where members are also (and must be) ints
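
A short sketch of what "members are also ints" means in practice. It assumes this IntEnum is the standard library's enum.IntEnum; the Signal class is a hypothetical enumeration defined only for this illustration.

```python
from enum import IntEnum


class Signal(IntEnum):  # hypothetical enumeration for illustration
    START = 0
    END = 1


is_int = isinstance(Signal.END, int)   # members are real int instances
equals_plain_int = Signal.END == 1     # and compare equal to plain ints
total = Signal.START + Signal.END      # arithmetic yields a plain int
by_value = Signal(1)                   # lookup by value returns the member
```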

class datumaro.util.multi_procs_util.ProducerMessage(value)

Bases: IntEnum

An enumeration.

START = 0
END = 1

class datumaro.util.multi_procs_util.Queue(maxsize=0)

Bases: object

Create a queue object with a given maximum size.

If maxsize is <= 0, the queue size is infinite.

task_done()

Indicate that a formerly enqueued task is complete.

Used by Queue consumer threads. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

join()

Blocks until all items in the Queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate the item was retrieved and all work on it is complete.

When the count of unfinished tasks drops to zero, join() unblocks.
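
The interplay of task_done() and join() can be sketched as follows, assuming this Queue is the standard library's queue.Queue. The worker function and the None stop marker are illustrative choices, not part of the API.

```python
import queue
import threading

q = queue.Queue()
processed = []


def worker():
    while True:
        item = q.get()
        if item is None:      # illustrative stop marker
            q.task_done()
            break
        processed.append(item * 2)
        q.task_done()         # one task_done() per successful get()


t = threading.Thread(target=worker)
t.start()

for n in range(3):
    q.put(n)

q.join()      # returns once task_done() has been called for all three items
q.put(None)   # ask the worker to exit
t.join()
```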

qsize()

Return the approximate size of the queue (not reliable!).

empty()

Return True if the queue is empty, False otherwise (not reliable!).

This method is likely to be removed at some point. Use qsize() == 0 as a direct substitute, but be aware that either approach risks a race condition where a queue can grow before the result of empty() or qsize() can be used.

To create code that needs to wait for all queued tasks to be completed, the preferred technique is to use the join() method.

full()

Return True if the queue is full, False otherwise (not reliable!).

This method is likely to be removed at some point. Use qsize() >= n as a direct substitute, but be aware that either approach risks a race condition where a queue can shrink before the result of full() or qsize() can be used.

put(item, block=True, timeout=None)

Put an item into the queue.

If optional args ‘block’ is true and ‘timeout’ is None (the default), block if necessary until a free slot is available. If ‘timeout’ is a non-negative number, it blocks at most ‘timeout’ seconds and raises the Full exception if no free slot was available within that time. Otherwise (‘block’ is false), put an item on the queue if a free slot is immediately available, else raise the Full exception (‘timeout’ is ignored in that case).
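
A brief sketch of the timeout behavior described above, assuming the standard library's queue.Queue and queue.Full. The outcome variable is illustrative.

```python
import queue

q = queue.Queue(maxsize=1)
q.put("first")  # fills the only slot

try:
    # Blocks for at most 0.1 seconds, then gives up because no slot is free.
    q.put("second", timeout=0.1)
    outcome = "enqueued"
except queue.Full:
    outcome = "full"
```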

get(block=True, timeout=None)

Remove and return an item from the queue.

If optional args ‘block’ is true and ‘timeout’ is None (the default), block if necessary until an item is available. If ‘timeout’ is a non-negative number, it blocks at most ‘timeout’ seconds and raises the Empty exception if no item was available within that time. Otherwise (‘block’ is false), return an item if one is immediately available, else raise the Empty exception (‘timeout’ is ignored in that case).

put_nowait(item)

Put an item into the queue without blocking.

Only enqueue the item if a free slot is immediately available. Otherwise raise the Full exception.

get_nowait()

Remove and return an item from the queue without blocking.

Only get an item if one is immediately available. Otherwise raise the Empty exception.
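
One common use of get_nowait() is draining whatever is currently queued without blocking. The sketch below assumes the standard library's queue.Queue and queue.Empty; the drained list is illustrative.

```python
import queue

q = queue.Queue()
for n in (1, 2, 3):
    q.put(n)

drained = []
while True:
    try:
        drained.append(q.get_nowait())
    except queue.Empty:
        break  # nothing is immediately available
```

Catching Empty avoids the race that a separate empty() check followed by get() would introduce when other threads use the same queue.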

class datumaro.util.multi_procs_util.Thread(group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)

Bases: object

A class that represents a thread of control.

This class can be safely subclassed in a limited fashion. There are two ways to specify the activity: by passing a callable object to the constructor, or by overriding the run() method in a subclass.

This constructor should always be called with keyword arguments. Arguments are:

group should be None; reserved for future extension when a ThreadGroup class is implemented.

target is the callable object to be invoked by the run() method. Defaults to None, meaning nothing is called.

name is the thread name. By default, a unique name is constructed of the form “Thread-N” where N is a small decimal number.

args is the argument tuple for the target invocation. Defaults to ().

kwargs is a dictionary of keyword arguments for the target invocation. Defaults to {}.

If a subclass overrides the constructor, it must make sure to invoke the base class constructor (Thread.__init__()) before doing anything else to the thread.

start()

Start the thread’s activity.

It must be called at most once per thread object. It arranges for the object’s run() method to be invoked in a separate thread of control.

This method will raise a RuntimeError if called more than once on the same thread object.

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
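
The two ways of specifying a thread's activity can be sketched side by side, assuming this Thread is the standard library's threading.Thread. The record function and the Recorder subclass are hypothetical names used only here.

```python
import threading

results = []


def record(label, repeat=1):
    results.append(label * repeat)


# Option 1: pass a callable plus its arguments to the constructor.
t1 = threading.Thread(target=record, args=("a",), kwargs={"repeat": 2})


# Option 2: subclass Thread and override run().
class Recorder(threading.Thread):
    def __init__(self, label):
        super().__init__()  # call the base constructor first
        self.label = label

    def run(self):
        results.append(self.label)


t2 = Recorder("b")

t1.start()
t1.join()
t2.start()
t2.join()
```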

join(timeout=None)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread, as that would cause a deadlock. It is also an error to join() a thread before it has been started, and attempting to do so raises the same exception.
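
Since join() always returns None, a timeout is detected by checking is_alive() afterwards. A minimal sketch, assuming the standard library's threading.Thread; the release event is an illustrative way to keep the worker running.

```python
import threading

release = threading.Event()
worker = threading.Thread(target=release.wait)  # runs until release is set
worker.start()

worker.join(timeout=0.1)
timed_out = worker.is_alive()  # still alive, so the join timed out

release.set()                  # let the worker finish
worker.join()                  # no timeout: blocks until it terminates
finished = not worker.is_alive()
```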

property name

A string used for identification purposes only.

It has no semantics. Multiple threads may be given the same name. The initial name is set by the constructor.

property ident

Thread identifier of this thread or None if it has not been started.

This is a nonzero integer. See the get_ident() function. Thread identifiers may be recycled when a thread exits and another thread is created. The identifier is available even after the thread has exited.

property native_id

Native integral thread ID of this thread, or None if it has not been started.

This is a non-negative integer. See the get_native_id() function. This represents the Thread ID as reported by the kernel.

is_alive()

Return whether the thread is alive.

This method returns True just before the run() method starts until just after the run() method terminates. See also the module function enumerate().

property daemon

A boolean value indicating whether this thread is a daemon thread.

This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.

The entire Python program exits when only daemon threads are left.

isDaemon()

Return whether this thread is a daemon.

This method is deprecated, use the daemon attribute instead.

setDaemon(daemonic)

Set whether this thread is a daemon.

This method is deprecated, use the .daemon property instead.

getName()

Return a string used for identification purposes only.

This method is deprecated, use the name attribute instead.

setName(name)

Set the name string for this thread.

This method is deprecated, use the name attribute instead.

class datumaro.util.multi_procs_util.TypeVar(name, *constraints, bound=None, covariant=False, contravariant=False)

Bases: _Final, _Immutable, _TypeVarLike

Type variable.

Usage:

T = TypeVar('T')  # Can be anything
A = TypeVar('A', str, bytes)  # Must be str or bytes

Type variables exist primarily for the benefit of static type checkers. They serve as the parameters for generic types as well as for generic function definitions. See class Generic for more information on generic types. Generic functions work as follows:

def repeat(x: T, n: int) -> List[T]:
    '''Return a list containing n references to x.'''
    return [x]*n

def longest(x: A, y: A) -> A:
    '''Return the longest of two strings.'''
    return x if len(x) >= len(y) else y

The latter example’s signature is essentially the overloading of (str, str) -> str and (bytes, bytes) -> bytes. Also note that if the arguments are instances of some subclass of str, the return type is still plain str.

At runtime, isinstance(x, T) and issubclass(C, T) will raise TypeError.

Type variables defined with covariant=True or contravariant=True can be used to declare covariant or contravariant generic types. See PEP 484 for more details. By default generic types are invariant in all type variables.

Type variables can be introspected, e.g.:

T.__name__ == 'T'
T.__constraints__ == ()
T.__covariant__ == False
T.__contravariant__ == False
A.__constraints__ == (str, bytes)

Note that only type variables defined in global scope can be pickled.
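
The usage notes above can be collected into one runnable sketch, assuming this TypeVar is the standard library's typing.TypeVar. The raised flag is an illustrative way to show the runtime TypeError.

```python
from typing import List, TypeVar

T = TypeVar("T")               # can be anything
A = TypeVar("A", str, bytes)   # must be str or bytes


def repeat(x: T, n: int) -> List[T]:
    """Return a list containing n references to x."""
    return [x] * n


def longest(x: A, y: A) -> A:
    """Return the longer of two strings."""
    return x if len(x) >= len(y) else y


# Type variables are for static checkers; runtime checks against them fail.
try:
    isinstance(1, T)
    raised = False
except TypeError:
    raised = True
```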

datumaro.util.multi_procs_util.contextmanager(func)

@contextmanager decorator.

Typical usage:

@contextmanager
def some_generator(<arguments>):
    <setup>
    try:
        yield <value>
    finally:
        <cleanup>

This makes this:

with some_generator(<arguments>) as <variable>:
    <body>

equivalent to this:

<setup>
try:
    <variable> = <value>
    <body>
finally:
    <cleanup>
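
A concrete instance of the template above, assuming this decorator is the standard library's contextlib.contextmanager. The tracked function and the events list are hypothetical names used to make the order of setup, body, and cleanup visible.

```python
from contextlib import contextmanager

events = []


@contextmanager
def tracked(name):
    events.append(f"setup {name}")         # <setup>
    try:
        yield name.upper()                 # <value> bound by "as"
    finally:
        events.append(f"cleanup {name}")   # <cleanup>, runs even on error


with tracked("job") as value:
    events.append(f"body {value}")

# Cleanup still runs when the body raises.
try:
    with tracked("failing"):
        raise ValueError("boom")
except ValueError:
    pass
```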