undr.task#
Multi-process task management.
Overview#
Classes#
Chain | A sequence of tasks that must run sequentially.
CloseRequest | Special task used to request a worker thread shutdown.
Manager | Schedules and keeps track of tasks.
NullManager | Manager placeholder that ignores messages and raises errors if one attempts to use it to schedule more tasks.
ProcessManager | Implements a manager that controls a pool of worker processes.
Task | A processing task to be performed by a worker.
Functions#
receive_bytes | Reads TCP bytes until enough are received to generate a full type and raw message.
receive_message | Reads TCP bytes until enough are received to generate a full type and message.
receive_type | Reads TCP bytes until an acknowledge type is received.
send_bytes | Packs and sends the bytes of a type and message.
send_message | Packs and sends the bytes of a type and an unencoded message.
send_type | Packs and sends the bytes of a type that does not require a message.
Module Contents#
- class undr.task.Chain(tasks: Sequence[Task])#
Bases:
Task
A sequence of tasks that must run sequentially.
- run(session: requests.Session, manager: Manager)#
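The sequential behaviour of a chain can be illustrated with a minimal sketch. SketchTask, Record, and SketchChain are hypothetical stand-ins written for this example; they are not the undr classes, and the session and manager arguments are simply passed through as None.

```python
from typing import Any, List, Sequence


class SketchTask:
    """Hypothetical stand-in for undr.task.Task."""

    def run(self, session: Any, manager: Any) -> None:
        raise NotImplementedError()


class Record(SketchTask):
    """Hypothetical task that appends its name to a shared log."""

    def __init__(self, name: str, log: List[str]):
        self.name = name
        self.log = log

    def run(self, session: Any, manager: Any) -> None:
        self.log.append(self.name)


class SketchChain(SketchTask):
    """Runs its child tasks one after the other, in the given order."""

    def __init__(self, tasks: Sequence[SketchTask]):
        self.tasks = tasks

    def run(self, session: Any, manager: Any) -> None:
        for task in self.tasks:
            task.run(session=session, manager=manager)


log: List[str] = []
chain = SketchChain([Record("download", log), Record("decompress", log)])
chain.run(session=None, manager=None)
```

Because the chain is itself a task, a manager would treat it as a single unit of work handled by one worker.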
- class undr.task.CloseRequest#
Special task used to request a worker thread shutdown.
- class undr.task.Manager#
Schedules and keeps track of tasks.
This is an abstract class; use one of its implementations, such as ProcessManager, to create objects.
- abstract schedule(task: Task, priority: int = 1) None #
Runs a task with the given priority.
Tasks with lower priority values are scheduled first. The maximum priority level depends on the implementation. At least two levels, 0 (highest priority) and 1, must be supported by all implementations.
- abstract send_message(message: Any) None #
Queues a message in the manager’s “inbox”.
A manager is responsible for collecting messages from all tasks, which are potentially running on different threads or processes, and serving these messages in a single-threaded fashion to a reader.
This function is meant to be called by tasks, which have access to the manager in their Task.run() function.
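The "inbox" idea described above (many producers, one reader) can be sketched with the standard library alone. This is an illustration that assumes a thread-safe queue as the inbox; the worker function and the message tuples are hypothetical, and the actual managers may collect messages differently.

```python
import queue
import threading

inbox = queue.Queue()  # thread-safe, so several workers can write concurrently


def worker(index: int) -> None:
    # Stand-in for a task calling manager.send_message(...) from its run() function.
    inbox.put((index, "done"))


threads = [threading.Thread(target=worker, args=(index,)) for index in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# A single reader drains the inbox and sees the messages one at a time.
received = []
while not inbox.empty():
    received.append(inbox.get())
```

The order in which messages arrive depends on thread scheduling, so a reader should not rely on it.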
- class undr.task.NullManager#
Bases:
Manager
Manager placeholder that ignores messages and raises errors if one attempts to use it to schedule more tasks.
This manager can be used with one-off tasks whose progress need not be monitored and which do not generate more tasks.
- send_message(message: Any)#
Queues a message in the manager’s “inbox”.
A manager is responsible for collecting messages from all tasks, which are potentially running on different threads or processes, and serving these messages in a single-threaded fashion to a reader.
This function is meant to be called by tasks, which have access to the manager in their Task.run() function.
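The placeholder behaviour can be sketched as follows. SketchNullManager is a hypothetical class written for this example, and the use of RuntimeError is an assumption: the documentation only states that scheduling raises an error, not which one.

```python
from typing import Any


class SketchNullManager:
    """Hypothetical placeholder manager, not the undr implementation."""

    def schedule(self, task: Any, priority: int = 1) -> None:
        # Assumption: the real exception type may differ.
        raise RuntimeError("a null manager cannot schedule tasks")

    def send_message(self, message: Any) -> None:
        pass  # messages are silently dropped


manager = SketchNullManager()
manager.send_message({"progress": 0.5})  # ignored

try:
    manager.schedule(task=object())
    scheduled = True
except RuntimeError:
    scheduled = False
```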
- class undr.task.ProcessManager(workers: int = multiprocessing.cpu_count() * 2, priority_levels: int = 2, log_directory: pathlib.Path | None = None)#
Bases:
Manager
Implements a manager that controls a pool of worker processes.
This class is similar to multiprocessing.pool.Pool but it has better support for user-initiated shutdowns (SIGINT / CTRL-C) and worker-initiated shutdowns (exceptions). It also supports priority levels.
Whenever a worker is idle, the manager scans its own task queues in order of priority until it finds a non-empty queue, and sends the first task from that queue to the worker. Hence, tasks with lower priority values are scheduled first. However, since a task may asynchronously spawn more tasks with arbitrary priority levels, there is no guarantee that all tasks with priority 0 spawned by a program overall are executed before all tasks with priority 1. In particular, tasks are never cancelled, even if a task with a lower priority level (i.e. more urgent) becomes available while a worker is already running a task with a higher priority level (i.e. less urgent).
- Parameters:
workers (int, optional) – Number of parallel workers (threads). Defaults to twice multiprocessing.cpu_count().
priority_levels (int, optional) – Number of priority queues. Defaults to 2.
log_directory (Optional[pathlib.Path], optional) – Directory to store log files. Logs are not generated if this is None. Defaults to None.
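The queue scan described for ProcessManager can be sketched with one deque per priority level. The module-level schedule and next_task functions below are hypothetical helpers for this illustration; tasks are plain strings and no worker processes are involved.

```python
import collections
from typing import Optional

PRIORITY_LEVELS = 2  # the documented default for priority_levels
queues = [collections.deque() for _ in range(PRIORITY_LEVELS)]


def schedule(task: str, priority: int = 1) -> None:
    queues[priority].append(task)


def next_task() -> Optional[str]:
    # Scan from priority 0 (most urgent) upwards and take the first task
    # of the first non-empty queue.
    for tasks in queues:
        if tasks:
            return tasks.popleft()
    return None  # every queue is empty


schedule("index", priority=1)
schedule("download", priority=1)
schedule("urgent", priority=0)
order = [next_task(), next_task(), next_task(), next_task()]
```

Here "urgent" is served first even though it was scheduled last, and tasks within one level keep their insertion order.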
- class ClosePolicy(*args, **kwds)#
Bases:
enum.Enum
Strategy used to terminate worker threads.
- CANCEL = 1#
Shut down threads without consuming buffered messages.
This should be used to stop thread workers after user-initiated cancellation (CTRL-C).
- JOIN = 0#
Consume all messages and shut down threads.
This should be used to wait for the end of the program normally.
- KILL = 2#
Kill threads without consuming buffered messages.
This should be used after a thread raises an error, to stop the remaining worker threads.
- class Proxy(server_port: int)#
Bases:
Manager
Manager interface that can be sent to workers.
Since ProcessManager implements a custom message passing system and owns message queues, it cannot be shared between processes. Worker processes require a handle to the manager to send messages and schedule new tasks. However, the handle does not have to be the actual manager; it is merely a means to pass around the two functions of its public API. This proxy pretends to be the manager but forwards messages to the actual manager using TCP. See send_bytes() for a description of message encoding.
- Parameters:
server_port (int) – Port of the manager’s TCP server used to send messages between workers and the manager.
- acknowledge_and_next_task() None | Task | CloseRequest #
Called by a worker to indicate that it has completed the current task and is asking for a new one.
- Returns:
None if there are no tasks waiting (but more tasks may become available in the future), a Task instance if the manager returned a task for this worker, and CloseRequest if the worker must shut down.
- Return type:
Union[None, Task, CloseRequest]
- next_task() None | Task | CloseRequest #
Called by a worker to receive the next task.
- Returns:
None if there are no tasks waiting (but more tasks may become available in the future), a Task instance if the manager returned a task for this worker, and CloseRequest if the worker must shut down.
- Return type:
Union[None, Task, CloseRequest]
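The three possible return values suggest a worker loop along the following lines. This is a sketch under stated assumptions: SketchCloseRequest and worker_loop are hypothetical, tasks are plain strings, and the manager is replaced by a scripted iterator.

```python
from typing import Callable, List, Union


class SketchCloseRequest:
    """Hypothetical stand-in for undr.task.CloseRequest."""


def worker_loop(next_task: Callable[[], Union[None, str, SketchCloseRequest]]) -> List[str]:
    completed = []
    while True:
        task = next_task()
        if task is None:
            continue  # nothing to do yet; a real worker would wait before asking again
        if isinstance(task, SketchCloseRequest):
            break  # the manager asked this worker to shut down
        completed.append(task)  # stand-in for task.run(session, manager)
    return completed


# Scripted replies standing in for the manager.
scripted = iter(["a", None, "b", SketchCloseRequest()])
completed = worker_loop(lambda: next(scripted))
```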
- schedule(task: Task, priority: int = 1)#
Runs a task with the given priority.
Tasks with lower priority values are scheduled first. The maximum priority level depends on the implementation. At least two levels, 0 (highest priority) and 1, must be supported by all implementations.
- send_message(message: Any)#
Queues a message in the manager’s “inbox”.
A manager is responsible for collecting messages from all tasks, which are potentially running on different threads or processes, and serving these messages in a single-threaded fashion to a reader.
This function is meant to be called by tasks, which have access to the manager in their Task.run() function.
- setup()#
Called by each worker to create the TCP connection with the actual manager.
- class RequestHandler(request, client_address, server)#
Bases:
socketserver.BaseRequestHandler
Processes TCP requests for the actual manager (TCP server).
- handle()#
Processes a TCP request.
See send_bytes() for a description of message encoding.
- __enter__()#
Enables the use of the “with” statement.
- Returns:
A process manager context that calls close() on exit.
- Return type:
- __exit__(type: Type[BaseException] | None, value: BaseException | None, traceback: types.TracebackType | None)#
Enables the use of the “with” statement.
This function calls close() with the policy ProcessManager.ClosePolicy.CANCEL if there is no active exception (typically caused by a soft cancellation) and with the policy ProcessManager.ClosePolicy.KILL if there is an active exception.
- Parameters:
type (Optional[Type[BaseException]]) – None if the context exits without an exception, and the raised exception’s class otherwise.
value (Optional[BaseException]) – None if the context exits without an exception, and the raised exception otherwise.
traceback (Optional[types.TracebackType]) – None if the context exits without an exception, and the raised exception’s traceback otherwise.
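The policy selection performed on exit can be sketched with a small context manager. SketchManager is hypothetical and only records the policy it was closed with; the enum mirrors the documented ClosePolicy values. Returning False from __exit__ (so that the exception propagates) is an assumption of this sketch.

```python
import enum


class ClosePolicy(enum.Enum):
    JOIN = 0
    CANCEL = 1
    KILL = 2


class SketchManager:
    """Hypothetical context manager that only records the close policy."""

    def __init__(self):
        self.closed_with = None

    def close(self, policy: ClosePolicy) -> None:
        self.closed_with = policy

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        # No active exception: CANCEL. Active exception: KILL.
        self.close(ClosePolicy.CANCEL if type is None else ClosePolicy.KILL)
        return False  # assumption: the exception is not swallowed


with SketchManager() as clean:
    pass

failed = SketchManager()
try:
    with failed:
        raise ValueError("worker failure")
except ValueError:
    pass
```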
- close(policy: ProcessManager.ClosePolicy)#
Terminates the manager.
Depending on the value of policy, this function will return almost immediately or block until all the tasks complete. See ProcessManager.ClosePolicy for details.
- Parameters:
policy (ProcessManager.ClosePolicy) – Termination policy for the manager and its workers.
- messages() Iterable[Any] #
Iterates over the messages sent by all workers until all the tasks are complete.
The thread that iterates over the messages has access to the manager and may use it to schedule new tasks.
- schedule(task: Task, priority: int = 1)#
Runs a task with the given priority.
Tasks with lower priority values are scheduled first. The maximum priority level depends on the implementation. At least two levels, 0 (highest priority) and 1, must be supported by all implementations.
- send_message(message: Any)#
Queues a message in the manager’s “inbox”.
A manager is responsible for collecting messages from all tasks, which are potentially running on different threads or processes, and serving these messages in a single-threaded fashion to a reader.
This function is meant to be called by tasks, which have access to the manager in their Task.run() function.
- serve()#
Server thread implementation.
- static target(proxy: ProcessManager.Proxy, log_directory: pathlib.Path | None)#
Worker thread implementation.
- Parameters:
proxy (ProcessManager.Proxy) – The manager proxy to request tasks, spawn new tasks, and send messages.
log_directory (Optional[pathlib.Path]) – Directory to store log files. Logs are not generated if this is None.
- class undr.task.Task#
A processing task to be performed by a worker.
- abstract run(session: requests.Session, manager: Manager)#
- exception undr.task.WorkerException(traceback_exception: traceback.TracebackException)#
Bases:
Exception
An exception wrapper that can be sent across threads.
This exception captures the stack trace of the thread that raised it to improve error reporting.
- Parameters:
traceback_exception (traceback.TracebackException) – Traceback of the original exception, which can be obtained with traceback.TracebackException.from_exception().
- __str__()#
Return str(self).
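Capturing a stack trace with traceback.TracebackException.from_exception() can be sketched as follows. SketchWorkerException and failing_task are hypothetical; the wrapper mimics the documented constructor argument but is not the undr class.

```python
import traceback


class SketchWorkerException(Exception):
    """Hypothetical wrapper that keeps a snapshot of the original stack trace."""

    def __init__(self, traceback_exception: traceback.TracebackException):
        self.traceback_exception = traceback_exception

    def __str__(self) -> str:
        # format() yields the lines that would normally be printed for the exception.
        return "".join(self.traceback_exception.format())


def failing_task() -> None:
    raise KeyError("missing")


try:
    failing_task()
except KeyError as error:
    wrapped = SketchWorkerException(traceback.TracebackException.from_exception(error))

report = str(wrapped)
```

The report names both the exception type and the function that raised it, which is what makes the wrapper useful once the original frames are gone.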
- undr.task.receive_bytes(client: socket.socket) tuple[bytes, bytes] #
Reads TCP bytes until enough are received to generate a full type and raw message.
- Parameters:
client (socket.socket) – TCP client used to send messages between workers and the manager.
- Returns:
The type’s bytes and the raw message’s bytes.
- Return type:
tuple[bytes, bytes]
- undr.task.receive_message(client: socket.socket, unpickle: bool = True) tuple[bytes, Any] #
Reads TCP bytes until enough are received to generate a full type and message.
- Parameters:
client (socket.socket) – TCP client used to send messages between workers and the manager.
unpickle (bool, optional) – Whether to pass the message bytes to pickle.loads(). Defaults to True.
- Returns:
The type’s bytes and the decoded message. The type’s bytes and the raw message’s bytes are returned instead if unpickle is False.
- Return type:
tuple[bytes, Any]
- undr.task.receive_type(client: socket.socket, expected_type: bytes)#
Reads TCP bytes until an acknowledge type is received.
- Parameters:
client (socket.socket) – TCP client used to send messages between workers and the manager.
expected_type (bytes) – The expected acknowledge type.
- undr.task.send_bytes(client: socket.socket, type: bytes, message: bytes)#
Packs and sends the bytes of a type and message.
This message encoding scheme is used internally by ProcessManager.
- Parameters:
client (socket.socket) – TCP client used to send messages between workers and the manager.
type (bytes) – Encoded type bytes. See send_type() for a description of type encoding.
message (bytes) – Pickled message bytes.
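The framing performed by send_bytes() and its counterpart receive_bytes() can be illustrated with a local socket pair. The wire layout used here (one type byte, an 8-byte little-endian length, then the payload) is an assumption made for this sketch; the page does not document the exact byte layout. The sketch_ functions and read_exactly are hypothetical names.

```python
import socket
import struct
from typing import Tuple


def sketch_send_bytes(client: socket.socket, type: bytes, message: bytes) -> None:
    # Assumed layout: 1 type byte, 8-byte little-endian length, payload.
    client.sendall(type + struct.pack("<Q", len(message)) + message)


def read_exactly(client: socket.socket, size: int) -> bytes:
    # recv may return fewer bytes than requested, so loop until enough are received.
    buffer = bytearray()
    while len(buffer) < size:
        chunk = client.recv(size - len(buffer))
        if len(chunk) == 0:
            raise ConnectionError("socket closed before the message was complete")
        buffer.extend(chunk)
    return bytes(buffer)


def sketch_receive_bytes(client: socket.socket) -> Tuple[bytes, bytes]:
    header = read_exactly(client, 9)
    (size,) = struct.unpack("<Q", header[1:])
    return header[:1], read_exactly(client, size)


worker_side, manager_side = socket.socketpair()
sketch_send_bytes(worker_side, b"m", b"hello")
received_type, received_message = sketch_receive_bytes(manager_side)
worker_side.close()
manager_side.close()
```

A length prefix is one common way to delimit messages on a TCP stream, which has no message boundaries of its own.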
- undr.task.send_message(client: socket.socket, type: bytes, message: Any)#
Packs and sends the bytes of a type and an unencoded message.
This message encoding scheme is used internally by ProcessManager.
- Parameters:
client (socket.socket) – TCP client used to send messages between workers and the manager.
type (bytes) – Encoded type bytes. See send_type() for a description of type encoding.
message (Any) – Any object compatible with the pickle module.
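The difference between an unencoded message and its raw bytes comes down to a pickle round trip, sketched below. The decode helper and the example dictionary are hypothetical; the unpickle flag mirrors the parameter documented for receive_message().

```python
import pickle
from typing import Any


def decode(payload: bytes, unpickle: bool = True) -> Any:
    # With unpickle=False the raw bytes are handed back untouched.
    return pickle.loads(payload) if unpickle else payload


message = {"path": "dataset/recording", "progress": 0.25}  # hypothetical message
payload = pickle.dumps(message)  # the bytes that would travel over TCP

decoded = decode(payload)
raw = decode(payload, unpickle=False)
```

As with any use of pickle, payloads should only be loaded when they come from a trusted peer.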
- undr.task.send_type(client: socket.socket, type: bytes)#
Packs and sends the bytes of a type that does not require a message.
Types encode the following information.
Messages sent by a worker to the manager.
- b"n": Reports that the worker started a task, must not have an attached message.
- b"t": Reports that the worker completed a task and is idle, must not have an attached message.
- b"m": Generic message that must be forwarded to the “inbox”, must have an attached message.
- >= 0x80: Tells the manager to spawn a new task (a worker may do this multiple times per task). The new task priority is type - 0x80. This scheme supports up to 128 priority levels. The current implementation uses 2 by default.
Messages sent by the manager to a worker.
- b"t": Tells the worker to start a new task, must have an attached message. The task may be an instance of CloseRequest, which tells the worker to shut down.
- b"m": Acknowledges a generic message (message to worker b"m" request).
- b"s": Acknowledges a task message (message to worker >= 0x80 request).
- Parameters:
client (socket.socket) – TCP client used to send messages between workers and the manager.
type (bytes) – Encoded type bytes.
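The worker-to-manager type encoding can be sketched with two small helpers. task_type and describe are hypothetical names for this illustration, and the sketch assumes that a type is a single byte, which is consistent with the 0x80 offset and the limit of 128 priority levels.

```python
def task_type(priority: int) -> bytes:
    # Priorities are stored as an offset from 0x80, so a single byte
    # can carry 128 levels (0x80 to 0xFF).
    if not 0 <= priority < 128:
        raise ValueError("priority must be in the range [0, 127]")
    return bytes([0x80 + priority])


def describe(type: bytes) -> str:
    # Interprets a type sent by a worker to the manager.
    value = type[0]
    if value >= 0x80:
        return f"spawn task with priority {value - 0x80}"
    return {
        b"n": "task started",
        b"t": "task completed",
        b"m": "generic message",
    }[type]
```

The ASCII letters used for the fixed types are all below 0x80, so the two families of types cannot collide.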