undr.task#

Multi-process task management.

Overview#

Classes#

Chain

A sequence of tasks that must run sequentially.

CloseRequest

Special task used to request a worker thread shutdown.

Manager

Schedules and keeps track of tasks.

NullManager

Manager placeholder that ignores messages and raises an error if one attempts to use it to schedule more tasks.

ProcessManager

Implements a manager that controls a pool of worker processes.

Task

A processing task to be performed by a worker.

Functions#

receive_bytes

Reads TCP bytes until enough are received to generate a full type and a raw message.

receive_message

Reads TCP bytes until enough are received to generate a full type and message.

receive_type

Reads TCP bytes until an acknowledge type is received.

send_bytes

Packs and sends the bytes of a type and message.

send_message

Packs and sends the bytes of a type and an unencoded message.

send_type

Packs and sends the bytes of a type that does not require a message.

Module Contents#

class undr.task.Chain(tasks: Sequence[Task])#

Bases: Task

A sequence of tasks that must run sequentially.

Parameters:

tasks (Sequence[Task]) – The list of tasks to run sequentially in the given order.

run(session: requests.Session, manager: Manager)#
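The sequential behaviour of Chain can be illustrated with a minimal sketch. The Task, Record, and Chain classes below are hypothetical stand-ins written for this example, not the library's code, and the session and manager arguments are replaced with None because the example tasks do not use them.

```python
from typing import Any, List, Sequence


class Task:
    """Stand-in for undr.task.Task (hypothetical, for illustration only)."""

    def run(self, session: Any, manager: Any) -> None:
        raise NotImplementedError()


class Record(Task):
    """Hypothetical task that appends its name to a shared log."""

    def __init__(self, name: str, log: List[str]):
        self.name = name
        self.log = log

    def run(self, session: Any, manager: Any) -> None:
        self.log.append(self.name)


class Chain(Task):
    """Runs the wrapped tasks one after the other, in the given order."""

    def __init__(self, tasks: Sequence[Task]):
        self.tasks = tasks

    def run(self, session: Any, manager: Any) -> None:
        for task in self.tasks:
            task.run(session, manager)


log: List[str] = []
chain = Chain([Record("index", log), Record("download", log), Record("decompress", log)])
chain.run(session=None, manager=None)
print(log)
```

Because a chain is itself a task, a manager would treat it as a single unit of work, so its sub-tasks all run on the same worker.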

class undr.task.CloseRequest#

Special task used to request a worker thread shutdown.

class undr.task.Manager#

Schedules and keeps track of tasks.

This is an abstract class. Use one of its implementations, such as ProcessManager, to create objects.

abstract schedule(task: Task, priority: int = 1) → None#

Runs a task with the given priority.

Tasks with lower priorities are scheduled first. The maximum priority level depends on the implementation. At least two levels, 0 (highest priority) and 1, must be supported by all implementations.

Parameters:
  • task (Task) – The task that this manager must run (possibly on a different thread).

  • priority (int, optional) – Priority level. Defaults to 1.

abstract send_message(message: Any) → None#

Queues a message in the manager’s “inbox”.

A manager is responsible for collecting messages from all tasks, which are potentially running on different threads or processes, and serving these messages in a single-threaded fashion to a reader.

This function is meant to be called by tasks, which have access to the manager in their Task.run() function.

Parameters:

message (Any) – Any object. Currently implemented managers require the message to be compatible with the pickle module.
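The pickle requirement can be checked before a message is sent. The helper below is a hypothetical sketch (is_picklable is not part of undr.task) that tests whether an object survives a pickle round trip; the payload is made up for the example.

```python
import pickle
from typing import Any


def is_picklable(message: Any) -> bool:
    """Returns True if the message survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(message))
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


progress = {"path": "a/b.bin", "bytes": 1024}  # hypothetical payload
print(is_picklable(progress))
print(is_picklable(lambda: None))  # lambdas cannot be pickled by the pickle module
```

Plain data such as dictionaries, tuples, strings, and numbers is a safe choice for messages, whereas lambdas, locks, and open sockets are not.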

class undr.task.NullManager#

Bases: Manager

Manager placeholder that ignores messages and raises an error if one attempts to use it to schedule more tasks.

This manager can be used with one-off tasks whose progress need not be monitored and which do not generate more tasks.

send_message(message: Any)#

Queues a message in the manager’s “inbox”.

A manager is responsible for collecting messages from all tasks, which are potentially running on different threads or processes, and serving these messages in a single-threaded fashion to a reader.

This function is meant to be called by tasks, which have access to the manager in their Task.run() function.

Parameters:

message (Any) – Any object. Currently implemented managers require the message to be compatible with the pickle module.

class undr.task.ProcessManager(workers: int = multiprocessing.cpu_count() * 2, priority_levels: int = 2, log_directory: pathlib.Path | None = None)#

Bases: Manager

Implements a manager that controls a pool of worker processes.

This class is similar to multiprocessing.pool.Pool but it has better support for user-initiated shutdowns (SIGINT / CTRL-C) and worker-initiated shutdowns (exceptions). It also supports priority levels.

Whenever a worker is idle, the manager scans its (the manager’s) task queues in order of priority until it finds a non-empty queue, and sends the first task from that queue to the worker. Hence, tasks with lower priorities are scheduled first. However, since a task may asynchronously spawn more tasks with arbitrary priority levels, there is no guarantee that all tasks with priority 0 spawned by a program overall are executed before all tasks with priority 1. In particular, tasks are never cancelled, even if a task with a lower priority level (i.e. more urgent) becomes available while a worker is already running a task with a higher priority level (i.e. less urgent).

Parameters:
  • workers (int, optional) – Number of parallel workers (threads). Defaults to twice multiprocessing.cpu_count().

  • priority_levels (int, optional) – Number of priority queues. Defaults to 2.

  • log_directory (Optional[pathlib.Path], optional) – Directory to store log files. Logs are not generated if this is None. Defaults to None.
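The scheduling rule described above (scan the queues in order of priority and take the first task of the first non-empty queue) can be sketched as follows. PriorityQueues is a hypothetical single-process container written for this example; it is not the data structure used by ProcessManager.

```python
import collections
from typing import Any, Deque, List, Optional


class PriorityQueues:
    """Hypothetical container with one FIFO queue per priority level."""

    def __init__(self, priority_levels: int = 2):
        self.queues: List[Deque[Any]] = [
            collections.deque() for _ in range(priority_levels)
        ]

    def schedule(self, task: Any, priority: int = 1) -> None:
        if not 0 <= priority < len(self.queues):
            raise ValueError(f"priority must be in [0, {len(self.queues) - 1}]")
        self.queues[priority].append(task)

    def next_task(self) -> Optional[Any]:
        # Scan from priority 0 (most urgent) and stop at the first non-empty queue.
        for queue in self.queues:
            if queue:
                return queue.popleft()
        return None


queues = PriorityQueues(priority_levels=2)
queues.schedule("download", priority=1)
queues.schedule("index", priority=0)
queues.schedule("decompress", priority=1)
order = [queues.next_task() for _ in range(4)]
print(order)
```

The task with priority 0 is served first even though it was scheduled second, and tasks within one level keep their insertion order. The final None corresponds to an idle worker finding every queue empty.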

class ClosePolicy(*args, **kwds)#

Bases: enum.Enum

Strategy used to terminate worker threads.

CANCEL = 1#

Shut down threads without consuming buffered messages.

This should be used to stop thread workers after user-initiated cancellation (CTRL-C).

JOIN = 0#

Consume all messages and shut down threads.

This should be used to wait for the end of the program normally.

KILL = 2#

Kill threads without consuming buffered messages.

This should be used after a thread raises an error, to stop the remaining worker threads.

class Proxy(server_port: int)#

Bases: Manager

Manager interface that can be sent to workers.

Since ProcessManager implements a custom message passing system and owns message queues, it cannot be shared between processes. Worker processes require a handle to the manager to send messages and schedule new tasks. However, the handle does not have to be the actual manager; it is merely a means to pass around the two functions of its public API. This proxy pretends to be the manager but forwards messages to the actual manager using TCP. See send_bytes() for a description of message encoding.

Parameters:

server_port (int) – Port of the manager’s TCP server used to send messages between workers and the manager.

acknowledge_and_next_task() → None | Task | CloseRequest#

Called by a worker to indicate that they completed the current task and are asking for a new one.

Returns:

None if there are no tasks waiting (but more tasks may become available in the future), a Task instance if the manager returned a task for this worker, and a CloseRequest instance if the worker must shut down.

Return type:

Union[None, Task, CloseRequest]

next_task() → None | Task | CloseRequest#

Called by a worker to receive the next task.

Returns:

None if there are no tasks waiting (but more tasks may become available in the future), a Task instance if the manager returned a task for this worker, and a CloseRequest instance if the worker must shut down.

Return type:

Union[None, Task, CloseRequest]
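The three possible return values of next_task() and acknowledge_and_next_task() suggest a worker loop along the following lines. This is a sketch under stated assumptions: Task, CloseRequest, and ScriptedProxy are hypothetical stand-ins that replay a fixed list of responses without any TCP traffic, and the actual worker implementation (ProcessManager.target) may differ.

```python
from typing import Any, List, Union


class Task:
    """Stand-in for undr.task.Task (hypothetical)."""

    def __init__(self, name: str):
        self.name = name

    def run(self, session: Any, manager: Any) -> None:
        manager.completed.append(self.name)


class CloseRequest:
    """Stand-in for undr.task.CloseRequest (hypothetical)."""


Response = Union[None, Task, CloseRequest]


class ScriptedProxy:
    """Hypothetical proxy that replays a fixed list of manager responses."""

    def __init__(self, responses: List[Response]):
        self.responses = list(responses)
        self.completed: List[str] = []

    def next_task(self) -> Response:
        return self.responses.pop(0)

    def acknowledge_and_next_task(self) -> Response:
        return self.responses.pop(0)


def worker_loop(proxy: ScriptedProxy) -> None:
    task = proxy.next_task()
    while True:
        if task is None:
            # No task available yet: ask again (a real worker would wait first).
            task = proxy.next_task()
        elif isinstance(task, CloseRequest):
            # The manager asked this worker to shut down.
            break
        else:
            task.run(session=None, manager=proxy)
            # Report completion and fetch the next task in a single call.
            task = proxy.acknowledge_and_next_task()


proxy = ScriptedProxy([Task("a"), None, Task("b"), CloseRequest()])
worker_loop(proxy)
print(proxy.completed)
```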

schedule(task: Task, priority: int = 1)#

Runs a task with the given priority.

Tasks with lower priorities are scheduled first. The maximum priority level depends on the implementation. At least two levels, 0 (highest priority) and 1, must be supported by all implementations.

Parameters:
  • task (Task) – The task that this manager must run (possibly on a different thread).

  • priority (int, optional) – Priority level. Defaults to 1.

send_message(message: Any)#

Queues a message in the manager’s “inbox”.

A manager is responsible for collecting messages from all tasks, which are potentially running on different threads or processes, and serving these messages in a single-threaded fashion to a reader.

This function is meant to be called by tasks, which have access to the manager in their Task.run() function.

Parameters:

message (Any) – Any object. Currently implemented managers require the message to be compatible with the pickle module.

setup()#

Called by each worker to create the TCP connection with the actual manager.

class RequestHandler(request, client_address, server)#

Bases: socketserver.BaseRequestHandler

Processes TCP requests for the actual manager (TCP server).

handle()#

Processes a TCP request.

See send_bytes() for a description of message encoding.

__enter__()#

Enables the use of the “with” statement.

Returns:

A process manager context that calls close() on exit.

Return type:

ProcessManager

__exit__(type: Type[BaseException] | None, value: BaseException | None, traceback: types.TracebackType | None)#

Enables the use of the “with” statement.

This function calls close() with the policy ProcessManager.ClosePolicy.CANCEL if there is no active exception (typically caused by a soft cancellation) and with the policy ProcessManager.ClosePolicy.KILL if there is an active exception.

Parameters:
  • type (Optional[Type[BaseException]]) – None if the context exits without an exception, and the raised exception’s class otherwise.

  • value (Optional[BaseException]) – None if the context exits without an exception, and the raised exception otherwise.

  • traceback (Optional[types.TracebackType]) – None if the context exits without an exception, and the raised exception’s traceback otherwise.
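The policy selection described above can be sketched with a small context manager. RecordingManager is a hypothetical class that only records the policy passed to close(); the enum values mirror ProcessManager.ClosePolicy, and letting the exception propagate out of the "with" block is an assumption of this example.

```python
import enum
from typing import List


class ClosePolicy(enum.Enum):
    """Mirrors the values documented for ProcessManager.ClosePolicy."""

    JOIN = 0
    CANCEL = 1
    KILL = 2


class RecordingManager:
    """Hypothetical manager that records the policy passed to close()."""

    def __init__(self):
        self.policies: List[ClosePolicy] = []

    def close(self, policy: ClosePolicy) -> None:
        self.policies.append(policy)

    def __enter__(self) -> "RecordingManager":
        return self

    def __exit__(self, type, value, traceback) -> None:
        # Returning None lets any active exception propagate to the caller.
        if type is None:
            self.close(ClosePolicy.CANCEL)
        else:
            self.close(ClosePolicy.KILL)


manager = RecordingManager()
with manager:
    pass  # the context exits without an exception
try:
    with manager:
        raise RuntimeError("worker failed")
except RuntimeError:
    pass
print(manager.policies)
```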

close(policy: ProcessManager.ClosePolicy)#

Terminates the manager.

Depending on the value of policy, this function will return almost immediately or block until all the tasks complete. See ProcessManager.ClosePolicy for details.

Parameters:

policy (ProcessManager.ClosePolicy) – Termination policy for the manager and its workers.

messages() → Iterable[Any]#

Iterates over the messages sent by all workers until all the tasks are complete.

The thread that iterates over the messages has access to the manager and may use it to schedule new tasks.

Returns:

Iterator over messages from all workers.

Return type:

Iterable[Any]

schedule(task: Task, priority: int = 1)#

Runs a task with the given priority.

Tasks with lower priorities are scheduled first. The maximum priority level depends on the implementation. At least two levels, 0 (highest priority) and 1, must be supported by all implementations.

Parameters:
  • task (Task) – The task that this manager must run (possibly on a different thread).

  • priority (int, optional) – Priority level. Defaults to 1.

send_message(message: Any)#

Queues a message in the manager’s “inbox”.

A manager is responsible for collecting messages from all tasks, which are potentially running on different threads or processes, and serving these messages in a single-threaded fashion to a reader.

This function is meant to be called by tasks, which have access to the manager in their Task.run() function.

Parameters:

message (Any) – Any object. Currently implemented managers require the message to be compatible with the pickle module.

serve()#

Server thread implementation.

static target(proxy: ProcessManager.Proxy, log_directory: pathlib.Path | None)#

Worker thread implementation.

Parameters:
  • proxy (ProcessManager.Proxy) – The manager proxy to request tasks, spawn new tasks, and send messages.

  • log_directory (Optional[pathlib.Path]) – Directory to store log files. Logs are not generated if this is None.

class undr.task.Task#

A processing task to be performed by a worker.

__repr__() → str#

Return repr(self).

abstract run(session: requests.Session, manager: Manager)#

exception undr.task.WorkerException(traceback_exception: traceback.TracebackException)#

Bases: Exception

An exception wrapper that can be sent across threads.

This exception captures the stack trace of the thread that raised it to improve error reporting.

Parameters:

traceback_exception (traceback.TracebackException) – Traceback of the original exception. It can be obtained with traceback.TracebackException.from_exception().
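A traceback.TracebackException can be captured and formatted with the standard library alone, as in the following sketch. The capture helper is hypothetical and written for this example; it does not construct a WorkerException.

```python
import traceback
from typing import Callable


def capture(function: Callable[[], object]) -> traceback.TracebackException:
    """Runs function and returns a summary of the exception that it raises."""
    try:
        function()
    except Exception as exception:
        return traceback.TracebackException.from_exception(exception)
    raise RuntimeError("function did not raise")


traceback_exception = capture(lambda: 1 / 0)
# format() yields the same lines that the interpreter would print.
report = "".join(traceback_exception.format())
print(report)
```

The resulting object stores a lightweight summary of the stack rather than live frames, which is why it is a reasonable payload for an exception that must leave the thread or process that raised it.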

__str__()#

Return str(self).

undr.task.receive_bytes(client: socket.socket) → tuple[bytes, bytes]#

Reads TCP bytes until enough are received to generate a full type and a raw message.

Parameters:

client (socket.socket) – TCP client used to send messages between workers and the manager.

Returns:

The type’s bytes and the raw message’s bytes.

Return type:

tuple[bytes, bytes]

undr.task.receive_message(client: socket.socket, unpickle: bool = True) → tuple[bytes, Any]#

Reads TCP bytes until enough are received to generate a full type and message.

Parameters:
  • client (socket.socket) – TCP client used to send messages between workers and the manager.

  • unpickle (bool, optional) – Whether to pass the message bytes to pickle.loads(). Defaults to True.

Returns:

The type’s bytes and the decoded message. The type’s bytes and the raw message’s bytes are returned instead if unpickle is False.

Return type:

tuple[bytes, Any]

undr.task.receive_type(client: socket.socket, expected_type: bytes)#

Reads TCP bytes until an acknowledge type is received.

Parameters:
  • client (socket.socket) – TCP client used to send messages between workers and the manager.

  • expected_type (bytes) – The expected acknowledge type.

undr.task.send_bytes(client: socket.socket, type: bytes, message: bytes)#

Packs and sends the bytes of a type and message.

This message encoding scheme is used internally by ProcessManager.

Parameters:
  • client (socket.socket) – TCP client used to send messages between workers and the manager.

  • type (bytes) – Encoded type bytes. See send_type() for a description of type encoding.

  • message (bytes) – Pickled message bytes.
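The exact byte layout used by send_bytes() and receive_bytes() is not specified here. The sketch below shows one common way to frame a type and a message over TCP, assuming a one-byte type followed by an 8-byte big-endian length and the payload. The layout, HEADER, send_frame, receive_frame, and receive_exactly are all assumptions made for this example, not the library's wire format.

```python
import socket
import struct
from typing import Tuple

# Hypothetical header layout: 1 type byte followed by an 8-byte payload length.
HEADER = struct.Struct(">cQ")


def receive_exactly(client: socket.socket, size: int) -> bytes:
    """Calls recv() until exactly `size` bytes have been read."""
    buffer = bytearray()
    while len(buffer) < size:
        chunk = client.recv(size - len(buffer))
        if not chunk:
            raise ConnectionError("socket closed before the message was complete")
        buffer.extend(chunk)
    return bytes(buffer)


def send_frame(client: socket.socket, type: bytes, message: bytes) -> None:
    """Packs the header and sends it together with the payload."""
    client.sendall(HEADER.pack(type, len(message)) + message)


def receive_frame(client: socket.socket) -> Tuple[bytes, bytes]:
    """Reads one header, then reads the number of payload bytes it announces."""
    type, size = HEADER.unpack(receive_exactly(client, HEADER.size))
    return type, receive_exactly(client, size)


# A connected socket pair stands in for the worker and manager connections.
left, right = socket.socketpair()
with left, right:
    send_frame(left, b"m", b"hello")
    received = receive_frame(right)
print(received)
```

The loop in receive_exactly matters because recv() may return fewer bytes than requested, which is the reason the receive functions are described as reading "until enough are received".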

undr.task.send_message(client: socket.socket, type: bytes, message: Any)#

Packs and sends the bytes of a type and an unencoded message.

This message encoding scheme is used internally by ProcessManager.

Parameters:
  • client (socket.socket) – TCP client used to send messages between workers and the manager.

  • type (bytes) – Encoded type bytes. See send_type() for a description of type encoding.

  • message (Any) – Any object compatible with the pickle module.

undr.task.send_type(client: socket.socket, type: bytes)#

Packs and sends the bytes of a type that does not require a message.

Types encode the following information.

Messages sent by a worker to the manager.

  • b"n": Reports that the worker started a task, must not have an attached message.

  • b"t": Reports that the worker completed a task and is idle, must not have an attached message.

  • b"m": Generic message that must be forwarded to the “inbox”, must have an attached message.

  • >= 0x80: Tells the manager to spawn a new task (a worker may do this multiple times per task). The new task priority is type - 0x80. This scheme supports up to 128 priority levels. The current implementation uses 2 by default.

Messages sent by the manager to a worker.

  • b"t": Tells the worker to start a new task, must have an attached message. The task may be an instance of CloseRequest, which tells the worker to shut down.

  • b"m": Acknowledges a generic message (response to a worker's b"m" request).

  • b"s": Acknowledges a task message (response to a worker's >= 0x80 request).

Parameters:
  • client (socket.socket) – TCP client used to send messages between workers and the manager.

  • type (bytes) – Encoded type bytes.
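The priority encoding listed above for worker messages (type - 0x80) can be illustrated with two small helpers. Both functions are hypothetical and assume that a type is encoded as a single byte, which the 128-level limit suggests but which is not stated explicitly.

```python
def task_type(priority: int) -> bytes:
    """Encodes a "spawn task" type for the given priority level."""
    if not 0 <= priority < 128:
        raise ValueError("priority must be in [0, 127]")
    return bytes([0x80 + priority])


def describe_worker_type(type: bytes) -> str:
    """Describes a type sent by a worker to the manager."""
    if type == b"n":
        return "task started"
    if type == b"t":
        return "task completed"
    if type == b"m":
        return "generic message"
    if len(type) == 1 and type[0] >= 0x80:
        return f"spawn task with priority {type[0] - 0x80}"
    raise ValueError(f"unknown type {type!r}")


print(task_type(0), task_type(1))
print(describe_worker_type(task_type(1)))
```

The ASCII letters used by the other types are all below 0x80, so setting the high bit is enough to tell a "spawn task" type apart from them.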