undr#

Submodules#

Overview#

Classes#

ApsFile

A file that contains luminance (grey levels) frames.

Configuration

Represents a dataset configuration (TOML).

Directory

A file or directory in a dataset.

DirectoryScanned

Reports information on a local directory.

DvsFile

A file that contains DVS (polarity) events.

File

Represents a local or remote file.

ImuFile

A file that contains IMU events.

Index

Downloads an index file (-index.json).

IndexStatus

Keeps track of the indexing progress for a dataset.

IndexesStatuses

Maps dataset names to index statuses.

Manager

Schedules and keeps track of tasks.

Mode

Download strategy for a dataset.

ProcessFile

Generic task for file processing.

ProcessFilesRecursive

Spawns a processing task for each file in the given directory.

ProcessManager

Implements a manager that controls a pool of worker processes.

ReadOnlyStore

Stores the IDs of processed tasks.

Selector

Delegate called to pick an action for each file.

Store

Stores the IDs of processed tasks.

Switch

Calls specialized file handlers while iterating a dataset.

Task

A processing task to be performed by a worker.

Functions#

configuration_from_path

Reads the configuration (TOML) with the given path.

default_datasets

Generates a list of the default datasets’ names.

install

Downloads (and optionally decompresses) a dataset.

Attributes#

SendMessage

Callback channel for messages generated by a file handler during data iteration.

__version__

UNDR’s version, which conforms to Semantic Versioning 2.0.0.

Package Contents#

class undr.ApsFile#

Bases: undr.path.File

A file that contains luminance (grey levels) frames.

Active-pixel sensors (APS) describe, strictly speaking, any sensor with pixels that use MOSFET amplifiers. However, the term is commonly used to refer to the integrating pixels found in non-CCD conventional cameras. In the context of Neuromorphic Engineering, APS is used to describe DAVIS frames.

height: int#

Frame height (y direction) in pixels.

width: int#

Frame width (x direction) in pixels.

packets() Iterable[numpy.ndarray]#

Iterates over the file data.

This function streams the file from the remote server if it is not available locally, and decompresses the file in memory if it is locally available but compressed.

Returns:

Iterator over the file’s data converted into numpy arrays with dtype undr.raw.aps_dtype().

Return type:

Iterable[numpy.ndarray]

word_size() int#

The size of an entry in this file, in bytes.

This can be used to ensure that entries (events, frames…) are not split while reading. A decoded file’s size in bytes must be a multiple of the value returned by this function.

Returns:

Number of bytes used by each entry.

Return type:

int
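
A minimal sketch of packets() in use (the configuration path "undr.toml" and the dataset name are placeholder assumptions):

    import undr

    configuration = undr.configuration_from_path("undr.toml")
    for path in configuration.dataset("dataset-name").iter(recursive=True):
        if isinstance(path, undr.ApsFile):
            for packet in path.packets():
                # each packet is a numpy array with dtype undr.raw.aps_dtype()
                print(path.width, path.height, len(packet))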

class undr.Configuration#

Represents a dataset configuration (TOML).

directory: pathlib.Path#

Local path of the root datasets directory (usually called datasets).

name_to_dataset_settings: dict[str, DatasetSettings]#

Maps dataset names to their parameters.

bibtex(show_display: bool, workers: int, force: bool, bibtex_timeout: float, log_directory: pathlib.Path | None) str#

Downloads index files and BibTeX references for enabled datasets.

Parameters:
  • show_display (bool) – Whether to show progress in the terminal.

  • workers (int) – Number of parallel workers (threads).

  • force (bool) – Whether to re-download resources even if they are already present locally.

  • bibtex_timeout (float) – Timeout for requests to https://dx.doi.org/.

  • log_directory (Optional[pathlib.Path]) – Directory to store log files. Logs are not generated if this is None.

Raises:

task.WorkerException – if a worker raises an error.

Returns:

BibTeX references as a string.

Return type:

str

dataset(name: str) undr.path_directory.Directory#

Returns the dataset with the given name.

Parameters:

name (str) – The dataset name.

Raises:

ValueError – if the dataset exists but is disabled.

Returns:

The dataset’s root directory.

Return type:

path_directory.Directory

display(download_tag: undr.display.Tag = display.Tag(label='download', icon='↓'), process_tag: undr.display.Tag = display.Tag(label='process', icon='⚛')) undr.display.Display#

Returns a display that shows download and process progress for enabled datasets.

Parameters:
  • download_tag (display.Tag, optional) – Label and icon for download. Defaults to display.Tag(label="download", icon="↓").

  • process_tag (display.Tag, optional) – Label and icon for process. Defaults to display.Tag(label="process", icon="⚛").

Returns:

Controller for the display thread.

Return type:

display.Display

enabled_datasets_settings() list[DatasetSettings]#

The settings of enabled datasets.

The list always contains at least one item (the function otherwise raises an error).

Raises:

RuntimeError – if all the datasets are disabled or there are no datasets.

Returns:

The settings of the datasets that are enabled, in the same order as the configuration file.

Return type:

list[DatasetSettings]

indexes_statuses(selector: undr.json_index_tasks.Selector) IndexesStatuses#

Builds an indexing report for enabled datasets.

Parameters:

selector (json_index_tasks.Selector) – The selector used to index the dataset.

Returns:

Index status for enabled datasets, in the same order as the configuration file.

Return type:

IndexesStatuses

install(show_display: bool, workers: int, force: bool, log_directory: pathlib.Path | None)#

Downloads index files and data files and decompresses data files.

The action (index only, download, download and decompress) may be different for each dataset and is controlled by undr.install_mode.Mode.

Parameters:
  • show_display (bool) – Whether to show progress in the terminal.

  • workers (int) – Number of parallel workers (threads).

  • force (bool) – Whether to re-download resources even if they are already present locally.

  • log_directory (Optional[pathlib.Path]) – Directory to store log files. Logs are not generated if this is None.

Raises:

task.WorkerException – if a worker raises an error.

iter(recursive: bool = False) Iterable[undr.path.Path]#

Iterates the files in the dataset.

Parameters:

recursive (bool, optional) – Whether to recursively search child directories. Defaults to False.

Returns:

Iterator over the child paths. If recursive is false, the iterator yields the direct children (files and directories) of the root dataset directory. If recursive is true, the iterator yields all the children (files and directories) of the dataset.

Return type:

Iterable[path.Path]

map(switch: undr.formats.Switch, store: undr.persist.Store | None = None, show_display: bool = True, workers: int = multiprocessing.cpu_count() * 2, log_directory: pathlib.Path | None = None) Iterable[Any]#

Applies a function to each file in a dataset.

Parameters:
  • switch (formats.Switch) – Specifies the action to perform on each file type.

  • store (Optional[persist.Store], optional) – Saves progress, makes it possible to resume interrupted processing. Defaults to None.

  • show_display (bool, optional) – Whether to show progress in the terminal. Defaults to True.

  • workers (int, optional) – Number of parallel workers (threads). Defaults to twice multiprocessing.cpu_count().

  • log_directory (Optional[pathlib.Path], optional) – Directory to store log files. Logs are not generated if this is None. Defaults to None.

Raises:

task.WorkerException – if a worker raises an error.

Returns:

Iterator over the non-error messages generated by the workers.

Return type:

Iterable[Any]
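
For example, a sketch that counts DVS events across a whole configuration (the configuration path is a placeholder, and it assumes that Switch accepts its handlers as keyword arguments):

    import undr

    def handle_dvs(file: undr.DvsFile, send_message: undr.SendMessage):
        # count the events in this file and report the total as a message
        send_message(sum(len(packet) for packet in file.packets()))

    configuration = undr.configuration_from_path("undr.toml")
    switch = undr.Switch(
        handle_aps=None, handle_dvs=handle_dvs, handle_imu=None, handle_other=None
    )
    total = sum(configuration.map(switch))
    print(f"{total} DVS events")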

mktree(root: str | os.PathLike, parents: bool = False, exist_ok: bool = False)#

Creates a copy of the datasets’ file hierarchy without the index or data files.

This function can be combined with map() to implement a map-reduce algorithm over entire datasets.
  1. Use mktree to create an empty copy of the file hierarchy.

  2. Use Configuration.map() to create a result file in the new hierarchy for each data file in the original hierarchy (for instance, a file that contains an algorithm’s performance measure as a single number).

  3. Collect the results (“reduce”) by reading the result files in the new hierarchy.

This approach has several benefits. The most expensive step, step 2, runs in parallel and can be interrupted and resumed. Result files are stored in a different directory and can easily be deleted without altering the original data. The new file hierarchy prevents name clashes as long as result files are named after data files, and workers do not need to worry about directory existence since mktree runs first. A sketch of this pattern follows the parameter list below.

Parameters:
  • root (Union[str, os.PathLike]) – Directory where the new file hierarchy is created.

  • parents (bool, optional) – Whether to create the parents of the new directory, if they do not exist. Defaults to False.

  • exist_ok (bool, optional) – Whether to silence exceptions if the root directory already exists. Defaults to False.
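
A sketch of the map-reduce pattern described above (same Switch assumptions as the map() sketch; the file.path_id attribute, used here to name result files after data files, is also an assumption):

    import pathlib
    import undr

    configuration = undr.configuration_from_path("undr.toml")
    results = pathlib.Path("results")
    configuration.mktree(results, parents=True, exist_ok=True)  # step 1

    def handle_dvs(file: undr.DvsFile, send_message: undr.SendMessage):
        # step 2 ("map"): write one result file per data file
        count = sum(len(packet) for packet in file.packets())
        (results / file.path_id).with_suffix(".txt").write_text(str(count))

    switch = undr.Switch(
        handle_aps=None, handle_dvs=handle_dvs, handle_imu=None, handle_other=None
    )
    for _ in configuration.map(switch):
        pass

    # step 3 ("reduce"): collect the results
    total = sum(int(path.read_text()) for path in results.rglob("*.txt"))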

class undr.Directory#

Bases: undr.path.Path

A file or directory in a dataset.

A path can point to a local resource or represent a remote resource.

doi_and_metadata_loaded: bool#
__getattribute__(__name: str)#

Return getattr(self, name).

__truediv__(other: str) undr.path.Path#

Concatenates this path with a string to create a new path.

Parameters:

other (str) – Suffix to append to this path.

Returns:

The concatenated result.

Return type:

Path

iter(recursive: bool = False) Iterable[undr.path.Path]#
class undr.DirectoryScanned#

Reports information on a local directory.

download_bytes: IndexProgress#

Total size of the compressed files in this directory, in bytes.

This size does not include -index.json.

final_count: int#

Total number of files in this directory (“files” and “other_files”).

This count does not include -index.json.

index_bytes: IndexProgress#

Size of the index file (-index.json) in bytes.

initial_download_count: int#

Number of files already downloaded when the action started (“files” and “other_files”).

This count does not include -index.json.

initial_process_count: int#

Number of files already processed when the action started (“files” and “other_files”).

This count does not include -index.json.

path_id: pathlib.PurePosixPath#

Path ID of the directory.

process_bytes: IndexProgress#

Total size of the files in this directory, in bytes.

This size does not include -index.json.

class undr.DvsFile#

Bases: undr.path.File

A file that contains DVS (polarity) events.

Dynamic Vision Sensor events, often called polarity events, contain a timestamp, pixel coordinates, and a polarity (ON or OFF) that indicates whether luminance increased or decreased.

height: int#

Frame height (y direction) in pixels.

width: int#

Frame width (x direction) in pixels.

packets() Iterable[numpy.ndarray]#

Iterates over the file data.

This function streams the file from the remote server if it is not available locally, and decompresses the file in memory if it is locally available but compressed.

Returns:

Iterator over the file’s data converted into numpy arrays with dtype undr.raw.DVS_DTYPE.

Return type:

Iterable[numpy.ndarray]

word_size()#

The size of an entry in this file, in bytes.

This can be used to ensure that entries (events, frames…) are not split while reading. A decoded file’s size in bytes must be a multiple of the value returned by this function.

Returns:

Number of bytes used by each entry.

Return type:

int

class undr.File#

Bases: Path

Represents a local or remote file.

compressions: tuple[undr.decode.Compression, ...]#

List of compressions available on the server.

hash: str#

The decompressed file hash (SHA3-224).

manager: undr.task.Manager#

Can be called to schedule new tasks and report updates.

session: requests.Session | None#

An open session that can be used to download resources.

size: int#

The decompressed file size in bytes.

__truediv__(other: str) Path#

Concatenates this path with a string to create a new path.

Parameters:

other (str) – Suffix to append to this path.

Returns:

The concatenated result.

Return type:

Path

_chunks(word_size: int) Iterable[bytes]#

Returns an iterator over the file’s decompressed bytes.

Users should prefer chunks() since files know their word size.

Parameters:

word_size (int) – size of an entry (events, frames…) in the file.

Raises:

RemainingBytesError – if the decompressed file’s size is not a multiple of word_size.

Returns:

Iterator over the file’s decompressed bytes.

Return type:

Iterable[bytes]

attach_manager(manager: undr.task.Manager | None)#

Binds a manager to this file.

The file sends all subsequent updates (download and processing) to the manager.

Parameters:

manager (Optional[task.Manager]) – The manager to use to keep track of progress.

attach_session(session: requests.Session | None)#

Binds a session to this file.

The session is used for all subsequent downloads.

Parameters:

session (Optional[requests.Session]) – An open session to use for downloads.

static attributes_from_dict(data: dict[str, Any], parent: undr.path_directory.Directory) dict[str, Any]#

Converts -index.json data to a dict of this class’s arguments.

The returned dict can be used to initialise an instance of this class.

Parameters:
  • data (dict[str, Any]) – Deserialized -index.json data describing this file.

  • parent (undr.path_directory.Directory) – The directory that contains this file.

Returns:

Data that can be used to initialize this class.

Return type:

dict[str, Any]

best_compression() undr.decode.Compression#

Returns the best compression supported by the remote server for this file.

Best is defined here as “smallest encoded size”.

Returns:

Compression format that yields the smallest version of this file.

Return type:

decode.Compression

chunks() Iterable[bytes]#

Returns an iterator over the file’s decompressed bytes.

Returns:

Iterator over the decompressed file’s bytes. The size of the chunks may vary.

Return type:

Iterable[bytes]

classmethod from_dict(data: dict[str, Any], parent: undr.path_directory.Directory)#

Converts -index.json data to an instance of this class.

Parameters:
  • data (dict[str, Any]) – Deserialized -index.json data describing this file.

  • parent (undr.path_directory.Directory) – The directory that contains this file.

Returns:

The file represented by the given data.

Return type:

File

word_size() int#

The size of an entry in this file, in bytes.

This can be used to ensure that entries (events, frames…) are not split while reading. A decoded file’s size in bytes must be a multiple of the value returned by this function.

Returns:

Number of bytes used by each entry.

Return type:

int
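
A sketch of chunks() and word_size() in use, given a File instance obtained from a dataset iteration:

    # reassemble the decompressed content; chunks() streams the file from the
    # remote server if it is not available locally
    payload = bytearray()
    for chunk in file.chunks():
        payload.extend(chunk)
    # the decoded size must be a multiple of the entry size
    assert len(payload) % file.word_size() == 0
    entries = len(payload) // file.word_size()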

class undr.ImuFile#

Bases: undr.path.File

A file that contains IMU events.

Inertial Measurement Unit (IMU) events are produced by an accelerometer / gyroscope / magnetometer.

packets() Iterable[numpy.ndarray]#

Iterates over the file data.

This function streams the file from the remote server if it is not available locally, and decompresses the file in memory if it is locally available but compressed.

Returns:

Iterator over the file’s data converted into numpy arrays with dtype undr.raw.IMU_DTYPE.

Return type:

Iterable[numpy.ndarray]

word_size()#

The size of an entry in this file, in bytes.

This can be used to ensure that entries (events, frames…) are not split while reading. A decoded file’s size in bytes must be a multiple of the value returned by this function.

Returns:

Number of bytes used by each entry.

Return type:

int

class undr.Index(path_root: pathlib.Path, path_id: pathlib.PurePosixPath, server: undr.remote.Server, selector: Selector, priority: int, force: bool, directory_doi: bool)#

Bases: undr.remote.DownloadFile

Downloads an index file (-index.json).

Parameters:
  • path_root (pathlib.Path) – The root path used to generate local file paths.

  • path_id (pathlib.PurePosixPath) – The path ID of the directory that will be searched recursively.

  • server (remote.Server) – The remote server to download resources.

  • selector (Selector) – A selector that defines the files to process.

  • priority (int) – Priority of this task (tasks with lower priorities are scheduled first).

  • force (bool) – Download the index file even if it is already present locally.

  • directory_doi (bool) – Whether to dispatch Doi messages while reading the index.

run(session: requests.Session, manager: undr.task.Manager)#
class undr.IndexStatus#

Keeps track of the indexing progress for a dataset.

current_index_files: int#

Number of index files parsed.

The dataset has been indexed if current_index_files and final_index_files are equal.

dataset_settings: DatasetSettings#

User-specified dataset settings.

downloaded_and_processed: bool#

Whether the dataset has been fully downloaded and processed.

final_index_files: int#

Total number of index files.

selector: undr.json_index_tasks.Selector#

Selector to choose actions while indexing.

server: undr.remote.Server#

The remote server for this dataset.

push(message: Any) tuple[bool, IndexStatus | None]#

Updates the status based on the message.

Ignores messages that are not undr.json_index_tasks.IndexLoaded or undr.json_index_tasks.DirectoryScanned.

Returns:

A tuple whose first element indicates whether the dataset has been fully indexed, and whose second element is self if the status was updated and None otherwise.

Return type:

tuple[bool, Optional[“IndexStatus”]]

class undr.IndexesStatuses#

Maps dataset names to index statuses.

name_to_status: dict[str, IndexStatus]#

Inner dict.

push(message: Any) tuple[bool, IndexStatus | None]#

Processes relevant messages.

This function updates the indexing status and returns it if message is an undr.json_index_tasks.IndexLoaded or undr.json_index_tasks.DirectoryScanned object. If the message was the last indexing message for this dataset, the first element of the returned tuple is True.

class undr.Manager#

Schedules and keeps track of tasks.

This is an abstract class; use one of its implementations, such as ProcessManager, to create objects.

abstract schedule(task: Task, priority: int = 1) None#

Runs a task with the given priority.

Tasks with lower priorities are scheduled first. The maximum priority level depends on the implementation. At least two levels, 0 (highest priority) and 1, must be supported by all implementations.

Parameters:
  • task (Task) – The task that this manager must run (possibly on a different thread).

  • priority (int, optional) – Priority level. Defaults to 1.

abstract send_message(message: Any) None#

Queues a message in the manager’s “inbox”.

A manager is responsible for collecting messages from all tasks, which are potentially running on different threads or processes, and serving these messages in a single-threaded fashion to a reader.

This function is meant to be called by tasks, which have access to the manager in their Task.run() function.

Parameters:

message (Any) – Any object. Currently implemented managers require the message to be compatible with the pickle module.

class undr.Mode(*args, **kwds)#

Bases: enum.Enum

Download strategy for a dataset.

DISABLED = 'disabled'#

The dataset is ignored by all actions.

LOCAL = 'local'#

Download all the dataset files locally but do not decompress them.

Most datasets are stored as Brotli archives (https://github.com/google/brotli/). UNDR stream-decompresses files before processing, making this option a good trade-off between disk usage and processing speed.

RAW = 'raw'#

Downloads all the dataset files locally and decompresses them.

Decompressed files use a relatively inefficient plain binary file format. This option requires vast amounts of disk space (3 to 5 times as much as the Brotli archives). However, the plain binary format facilitates processing from languages such as Matlab or C++.

REMOTE = 'remote'#

Only download the dataset’s index files.

UNDR can process the dataset files as if they were locally available, by streaming them from the server. This option is particularly useful for large datasets that do not fit on the hard drive but it requires a fast internet connection since files are re-downloaded every time.

class undr.ProcessFile(file: undr.path.File)#

Bases: undr.task.Task

Generic task for file processing.

Parameters:

file (path.File) – The file (remote or local) to process.

class undr.ProcessFilesRecursive(path_root: pathlib.Path, path_id: pathlib.PurePosixPath, server: undr.remote.Server, selector: Selector, process_file_class: Type[ProcessFileType], process_file_args: Iterable[Any], process_file_kwargs: Mapping[str, Any], priority: int)#

Bases: undr.task.Task

Spawns a processing task for each file in the given directory.

Subdirectories are recursively searched as well.

Parameters:
  • path_root (pathlib.Path) – The root path used to generate local file paths.

  • path_id (pathlib.PurePosixPath) – The path ID of the directory that will be scanned recursively.

  • server (remote.Server) – The remote server used to download resources.

  • selector (Selector) – A selector that defines the files to process.

  • process_file_class (Type[ProcessFileType]) – The class of the task to run on each selected file. Must be a subclass of ProcessFile.

  • process_file_args (Iterable[Any]) – Positional arguments passed to the constructor of process_file_class.

  • process_file_kwargs (Mapping[str, Any]) – Keyword arguments passed to the constructor of process_file_class. The keyword argument file is automatically added by ProcessFilesRecursive after the positional arguments and before other keyword arguments.

  • priority (int) – Priority of this task and all recursively created tasks (tasks with lower priorities are scheduled first).

run(session: requests.Session, manager: undr.task.Manager) None#
class undr.ProcessManager(workers: int = multiprocessing.cpu_count() * 2, priority_levels: int = 2, log_directory: pathlib.Path | None = None)#

Bases: Manager

Implements a manager that controls a pool of worker processes.

This class is similar to multiprocessing.pool.Pool but has better support for user-initiated shutdowns (SIGINT / CTRL-C) and worker-initiated shutdowns (exceptions). It also supports priority levels.

Whenever a worker is idle, the manager scans its (the manager’s) task queues in order of priority until it finds a non-empty queue, and sends the first task from that queue to the worker. Hence, tasks with lower priorities are scheduled first. However, since a task may asynchronously spawn more tasks with arbitrary priority levels, there is no guarantee that all tasks with priority 0 spawned by a program overall are executed before all tasks with priority 1. In particular, tasks are never cancelled, even if a task with a lower priority level (i.e. more urgent) becomes available while a worker is already running a task with a higher priority level (i.e. less urgent).

Parameters:
  • workers (int, optional) – Number of parallel workers (threads). Defaults to twice multiprocessing.cpu_count().

  • priority_levels (int, optional) – Number of priority queues. Defaults to 2.

  • log_directory (Optional[pathlib.Path], optional) – Directory to store log files. Logs are not generated if this is None. Defaults to None.
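
A minimal usage sketch (the task body is a placeholder):

    import undr

    class PrintTask(undr.Task):
        def __init__(self, index: int):
            self.index = index

        def run(self, session, manager):
            # runs in a worker process; tasks must be picklable
            manager.send_message(f"task {self.index} complete")

    if __name__ == "__main__":
        with undr.ProcessManager(workers=4) as manager:
            for index in range(10):
                manager.schedule(PrintTask(index), priority=1)
            for message in manager.messages():
                print(message)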

class ClosePolicy(*args, **kwds)#

Bases: enum.Enum

Strategy used to terminate worker threads.

CANCEL = 1#

Shut down threads without consuming buffered messages.

This should be used to stop thread workers after user-initiated cancellation (CTRL-C).

JOIN = 0#

Consume all messages and shut down threads.

This should be used to wait for the end of the program normally.

KILL = 2#

Kill threads without consuming buffered messages.

This should be used after a thread raises an error, to stop the remaining worker threads.

class Proxy(server_port: int)#

Bases: Manager

Manager interface that can be sent to workers.

Since ProcessManager implements a custom message passing system and owns message queues, it cannot be shared between processes. Worker processes require a handle to the manager to send messages and schedule new tasks. However, the handle does not have to be the actual manager; it is merely a means to pass around the two functions of its public API. This proxy pretends to be the manager but forwards messages to the actual manager over TCP. See send_bytes() for a description of message encoding.

Parameters:

server_port (int) – Port of the manager’s TCP server used to send messages between workers and the manager.

acknowledge_and_next_task() None | Task | CloseRequest#

Called by a worker to indicate that they completed the current task and are asking for a new one.

Returns:

None if there are no tasks waiting (but more tasks may become available in the future), a Task instance if the manager returned a task for this worker, and a CloseRequest instance if the worker must shut down.

Return type:

Union[None, Task, CloseRequest]

next_task() None | Task | CloseRequest#

Called by a worker to receive the next task.

Returns:

None if there are no tasks waiting (but more tasks may become available in the future), a Task instance if the manager returned a task for this worker, and a CloseRequest instance if the worker must shut down.

Return type:

Union[None, Task, CloseRequest]

schedule(task: Task, priority: int = 1)#

Runs a task with the given priority.

Tasks with lower priorities are scheduled first. The maximum priority level depends on the implementation. At least two levels, 0 (highest priority) and 1, must be supported by all implementations.

Parameters:
  • task (Task) – The task that this manager must run (possibly on a different thread).

  • priority (int, optional) – Priority level. Defaults to 1.

send_message(message: Any)#

Queues a message in the manager’s “inbox”.

A manager is responsible for collecting messages from all tasks, which are potentially running on different threads or processes, and serving these messages in a single-threaded fashion to a reader.

This function is meant to be called by tasks, which have access to the manager in their Task.run() function.

Parameters:

message (Any) – Any object. Currently implemented managers require the message to be compatible with the pickle module.

setup()#

Called by each worker to create the TCP connection with the actual manager.

class RequestHandler(request, client_address, server)#

Bases: socketserver.BaseRequestHandler

Processes TCP requests for the actual manager (TCP server).

handle()#

Processes a TCP request.

See send_bytes() for a description of message encoding.

__enter__()#

Enables the use of the “with” statement.

Returns:

A process manager context that calls close() on exit.

Return type:

ProcessManager

__exit__(type: Type[BaseException] | None, value: BaseException | None, traceback: types.TracebackType | None)#

Enables the use of the “with” statement.

This function calls close() with the policy ProcessManager.ClosePolicy.CANCEL if there is no active exception (typically caused by a soft cancellation) and with the policy ProcessManager.ClosePolicy.KILL if there is an active exception.

Parameters:
  • type (Optional[Type[BaseException]]) – None if the context exits without an exception, and the raised exception’s class otherwise.

  • value (Optional[BaseException]) – None if the context exits without an exception, and the raised exception otherwise.

  • traceback (Optional[types.TracebackType]) – None if the context exits without an exception, and the raised exception’s traceback otherwise.

close(policy: ProcessManager.ClosePolicy)#

Terminates the manager.

Depending on the value of policy, this function will return almost immediately or block until all the tasks complete. See ProcessManager.ClosePolicy for details.

Parameters:

policy (ProcessManager.ClosePolicy) – Termination policy for the manager and its workers.

messages() Iterable[Any]#

Iterates over the messages sent by all workers until all the tasks are complete.

The thread that iterates over the messages has access to the manager and may use it to schedule new tasks.

Returns:

Iterator over messages from all workers.

Return type:

Iterable[Any]

schedule(task: Task, priority: int = 1)#

Runs a task with the given priority.

Tasks with lower priorities are scheduled first. The maximum priority level depends on the implementation. At least two levels, 0 (highest priority) and 1, must be supported by all implementations.

Parameters:
  • task (Task) – The task that this manager must run (possibly on a different thread).

  • priority (int, optional) – Priority level. Defaults to 1.

send_message(message: Any)#

Queues a message in the manager’s “inbox”.

A manager is responsible for collecting messages from all tasks, which are potentially running on different threads or processes, and serving these messages in a single-threaded fashion to a reader.

This function is meant to be called by tasks, which have access to the manager in their Task.run() function.

Parameters:

message (Any) – Any object. Currently implemented managers require the message to be compatible with the pickle module.

serve()#

Server thread implementation.

static target(proxy: ProcessManager, log_directory: pathlib.Path | None)#

Worker thread implementation.

Parameters:
  • proxy (ProcessManager.Proxy) – The manager proxy to request tasks, spawn new tasks, and send messages.

  • log_directory (Optional[pathlib.Path]) – Directory to store log files. Logs are not generated if this is None.

class undr.ReadOnlyStore(path: str | os.PathLike)#

Stores the IDs of processed tasks.

This store provides a method to check whether a task has been performed but it cannot be modified. Most users will probably prefer the writable Store.

Parameters:

path (Union[str, os.PathLike]) – Path of the SQLite database file with extension “.db”.

__contains__(id: str)#

Whether the given ID has been processed.

Parameters:

id (str) – The ID to check.

Returns:

True if the ID is in the store, which means that the corresponding task has been processed.

Return type:

bool

__enter__()#

Enables the use of the “with” statement.

Returns:

A store context that calls close() on exit.

Return type:

ReadOnlyStore

__exit__(type: Type[BaseException] | None, value: BaseException | None, traceback: types.TracebackType | None)#

Enables the use of the “with” statement.

Parameters:
  • type (Optional[Type[BaseException]]) – None if the context exits without an exception, and the raised exception’s class otherwise.

  • value (Optional[BaseException]) – None if the context exits without an exception, and the raised exception otherwise.

  • traceback (Optional[types.TracebackType]) – None if the context exits without an exception, and the raised exception’s traceback otherwise.

__getstate__()#

Helper for pickle.

__setstate__(state: pathlib.Path)#
close()#

Closes the store’s database.

exception undr.RemainingBytesError(word_size: int, buffer: bytes)#

Bases: Exception

Raised if the number of bytes in the decompressed resource is not a multiple of its word size.

Parameters:
  • word_size (int) – The resource’s word size.

  • buffer (bytes) – The remaining bytes. Their length is larger than zero and smaller than the word size.

class undr.Selector#

Delegate called to pick an action for each file.

Selectors are used during the indexing phase to calculate the number of bytes to download and/or process, and during the processing phase to choose the action to perform.

class Action(*args, **kwds)#

Bases: enum.Enum

Specifies the operation to perform for a given file.

The action also determines whether the file’s bytes should be accounted for during the indexing phase. This is useful to report non-zero progress after resuming a job while skipping the actual processing.

DECOMPRESS = 5#

Downloads, decompresses, and reports.

DOI = 1#

Skips this file and does not report it, but publishes its own DOIs.

DOWNLOAD = 4#

Downloads and reports.

DOWNLOAD_SKIP = 3#

Skips operations on this file but reports it as downloaded.

IGNORE = 0#

Skips this file and does not report it.

PROCESS = 6#

Downloads, decompresses, processes, and reports.

SKIP = 2#

Skips this file but reports it as downloaded and processed.

INSTALL_IGNORE_ACTIONS#

The set of actions that ignore the file for reporting purposes.

REPORT_DOWNLOAD_ACTIONS#

The set of actions that (at least) download the file.

REPORT_PROCESS_ACTIONS#

The set of actions that download and process the file.

SKIP_ACTIONS#

The set of actions that skip all operations on the file.

action(file: undr.path.File) Selector.Action#

Returns the action to apply to the given file.

Called by Index, InstallFilesRecursive and ProcessFilesRecursive. The default implementation returns Selector.Action.PROCESS.

scan_filesystem(directory: undr.path_directory.Directory) bool#

Whether to scan the filesystem.

Called by Index to decide whether it needs to scan the file system. This function may return False if action() returns a skip action (one of SKIP_ACTIONS) for every file in the directory.
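
A sketch of a custom selector that only downloads DVS files and ignores everything else:

    import undr

    class DvsOnlySelector(undr.Selector):
        def action(self, file: undr.File) -> undr.Selector.Action:
            # download DVS files, skip everything else without reporting it
            if isinstance(file, undr.DvsFile):
                return undr.Selector.Action.DOWNLOAD
            return undr.Selector.Action.IGNORE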

undr.SendMessage#

Callback channel for messages generated by a file handler during data iteration.

class undr.Store(path: str | os.PathLike, commit_maximum_delay: float = 0.1, commit_maximum_inserts: int = 100)#

Bases: ReadOnlyStore

Stores the IDs of processed tasks.

Parameters:
  • path (Union[str, os.PathLike]) – Path of the SQLite database file with extension “.db”.

  • commit_maximum_delay (float, optional) – How often changes are committed to the disk, in seconds. Defaults to 0.1.

  • commit_maximum_inserts (int, optional) – Maximum number of changes before committing them to the disk. Defaults to 100.

class Commit#

Message requesting a commit (changes are immediately persisted to the disk).

class Reset#

Message requesting a reset of the database (existing entries are dropped).

__getstate__()#

Helper for pickle.

__setstate__(state: Tuple[pathlib.Path, int, int])#
add(id: str)#

Adds a row to the database.

The action is ignored if the entry is already in the database.

Parameters:

id (str) – Entry to store in the database.

close()#

Closes the store’s database.

commit()#

Immediately persists changes to the disk.

reset()#

Drops all entries from the database.

target()#

Worker thread implementation.
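
A sketch of resumable processing with a store (the paths, the handler body, and the Switch keyword arguments are assumptions):

    import undr

    def handle_dvs(file: undr.DvsFile, send_message: undr.SendMessage):
        ...  # process the file

    configuration = undr.configuration_from_path("undr.toml")
    switch = undr.Switch(
        handle_aps=None, handle_dvs=handle_dvs, handle_imu=None, handle_other=None
    )
    with undr.Store("progress.db") as store:
        # interrupting and re-running this script resumes where it left off
        for message in configuration.map(switch, store=store):
            print(message)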

class undr.Switch#

Calls specialized file handlers while iterating a dataset.

If a handler is None, the corresponding files are ignored by the iterator.

handle_aps: Callable[[ApsFile, SendMessage], None] | None#
handle_dvs: Callable[[DvsFile, SendMessage], None] | None#
handle_imu: Callable[[ImuFile, SendMessage], None] | None#
handle_other: Callable[[undr.path.File, SendMessage], None] | None#
enabled_types() set[Any]#

Lists the file types that have a non-None handler.

Returns:

The set of file classes that will be handled.

Return type:

set[Any]

handle_file(file: undr.path.File, send_message: SendMessage)#

Calls the specialized file handler for the file, if the handler is non-None.

Parameters:
  • file (path.File) – The file to process.

  • send_message (SendMessage) – Callback channel for messages.

Raises:

RuntimeError – if the file type is not supported by this function.

class undr.Task#

A processing task to be performed by a worker.

__repr__() str#

Return repr(self).

abstract run(session: requests.Session, manager: Manager)#
exception undr.WorkerException(traceback_exception: traceback.TracebackException)#

Bases: Exception

An exception wrapper that can be sent across threads.

This exception captures the stack trace of the thread that raised it to improve error reporting.

Parameters:

traceback_exception (traceback.TracebackException) – Traceback of the original exception; it can be obtained with traceback.TracebackException.from_exception().

__str__()#

Return str(self).

undr.__version__ = '1.0.1'#

UNDR’s version, which conforms to Semantic Versioning 2.0.0.

undr.configuration_from_path(path: str | os.PathLike) Configuration#

Reads the configuration (TOML) with the given path.

Parameters:

path (Union[str, os.PathLike]) – Configuration file path.

Raises:

RuntimeError – if two datasets have the same name in the configuration.

Returns:

The parsed TOML configuration.

Return type:

Configuration
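
For example, a sketch that loads a configuration file (the path "undr.toml" is a placeholder):

    import undr

    configuration = undr.configuration_from_path("undr.toml")
    print(configuration.directory)  # local root of the datasets
    for settings in configuration.enabled_datasets_settings():
        print(settings)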

undr.default_datasets() list[str]#

Generates a list of the default datasets’ names.

This function calls name_to_url() and has the same caveats regarding caching.

Returns:

The names of the default datasets.

Return type:

list[str]

undr.install(name: str, url: str | None = None, timeout: float = constants.DEFAULT_TIMEOUT, mode: str | undr.install_mode.Mode = install_mode.Mode.LOCAL, directory: str | pathlib.Path = 'datasets', show_display: bool = True, workers: int = multiprocessing.cpu_count() * 2, force: bool = False, log_directory: pathlib.Path | None = None)#

Downloads (and optionally decompresses) a dataset.

See undr.install_mode.Mode for details on the different installation strategies.

Parameters:
  • name (str) – Name of the dataset to install. Unless url is provided, it must be one of the keys returned by name_to_url().

  • url (Optional[str], optional) – URL of the dataset. Defaults to None.

  • timeout (float, optional) – Request timeout in seconds. Defaults to undr.constants.DEFAULT_TIMEOUT.

  • mode (Union[str, install_mode.Mode], optional) – Installation strategy. Defaults to undr.install_mode.Mode.LOCAL.

  • directory (Union[str, pathlib.Path], optional) – Path of the local directory to store datasets. Defaults to “datasets”.

  • show_display (bool, optional) – Whether to show a progress bar. Defaults to True.

  • workers (int, optional) – Number of parallel workers (threads). Defaults to twice multiprocessing.cpu_count().

  • force (bool, optional) – Whether to re-download files even if they are already present locally. Defaults to False.

  • log_directory (Optional[pathlib.Path], optional) – Directory to store log files. Logs are not generated if this is None. Defaults to None.
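
For example, a minimal sketch (the dataset name is a placeholder; valid names are the keys returned by name_to_url()):

    import undr

    # download the index files only; data files are streamed on demand
    undr.install(name="dataset-name", mode=undr.Mode.REMOTE)

    # download and decompress every file locally
    undr.install(name="dataset-name", mode=undr.Mode.RAW)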