noether.data

Submodules

Attributes

Classes

Dataset

Noether dataset implementation, which is a wrapper around torch.utils.data.Dataset that can hold a dataset_config_provider.

DatasetWrapper

Wrapper around arbitrary noether.data.Dataset instances to generically change something about the dataset.

Subset

Wrapper around arbitrary noether.data.Dataset instances to only use a subset of the samples, similar to

PropertySubsetWrapper

Wrapper around arbitrary noether.data.Dataset instances to make __getitem__ load the properties that are defined

RepeatWrapper

Repeats the wrapped dataset repetitions times.

ShuffleWrapper

Shuffles the dataset, optionally with seed.

SubsetWrapper

Wraps the dataset with a noether.data.Subset using indices generated by the properties from the constructor.

BatchProcessor

Collator

Base object that uses torch.utils.data.default_collate in its __call__ function. Derived classes can overwrite

MultiStagePipeline

A Collator that processes the list of samples into a batch in multiple stages:

SampleProcessor

ComposePreProcess

Compose multiple transforms and support inversion by reversing the sequence and inverting each transform.

PreProcessor

Base class for all data preprocessors.

InterleavedSampler

Sampler to allow efficient dataloading by using a single large dataset containing train/test/... datasets all at

InterleavedSamplerConfig

SamplerIntervalConfig

Configuration dataclass for setting up the dataloading pipeline, which is structured to load data from a "main"

Functions

with_normalizers([_func_or_key])

Decorator to apply a normalizer to the output of a getitem_* function of the implemented Dataset class.

to_tensor(data)

Helper function to convert input data to a PyTorch tensor if it is not already one.

Package Contents

class noether.data.Dataset(dataset_config)

Bases: torch.utils.data.Dataset

Noether dataset implementation, which is a wrapper around torch.utils.data.Dataset that can hold a dataset_config_provider. A dataset should map a key (i.e., an index) to its corresponding data. Each sub-class should implement individual getitem_* methods, where * is the name of an item in the dataset. Each getitem_* method loads an individual tensor/data sample from disk. For example, if your dataset consists of images and targets/labels (stored as tensors), a getitem_image(idx) and a getitem_target(idx) method should be implemented in the dataset subclass. The __getitem__ method of this class loops over all the individual getitem_* methods implemented by the child class and returns their results. Optionally, it is possible to configure which getitem methods are called.

Example: Car aerodynamics dataset

class CarAeroDynamicsDataset(Dataset):
    def __init__(self, dataset_config, dataset_normalizers, **kwargs):
        super().__init__(dataset_config=dataset_config, **kwargs)
        self.path = dataset_config.path

    def __len__(self):
        return 100  # Example length

    def getitem_surface_pressure(self, idx):
        # Load surface pressure tensor
        return torch.load(f"{self.path}/surface_pressure_tensor/{idx}.pt")

    def getitem_surface_geometry(self, idx):
        # Load surface geometry tensor
        return torch.load(f"{self.path}/surface_geometry_tensor/{idx}.pt")


dataset = CarAeroDynamicsDataset(dataset_config)  # dataset_config provides .path
sample0 = dataset[0]
surface_pressure_0 = sample0["surface_pressure"]
surface_geometry_0 = sample0["surface_geometry"]

Data from a getitem method should be normalized in many cases. To apply normalization, add the with_normalizers decorator to the getitem method. For example:

@with_normalizers("surface_pressure")
def getitem_surface_pressure(self, idx):
    # Load surface pressure tensor
    return torch.load(f"{self.path}/surface_pressure_tensor/{idx}.pt")

"surface_pressure" is the key in the self.normalizers dictionary; this key maps to a preprocessor that implements the appropriate data normalization.

Example configuration for dataset normalizers:

# dummy example configuration for the dataset above
dataset:
    kind: noether.data.datasets.CarAeroDynamicsDataset
    pipeline:  # configure the data pipeline to collate individual samples into batches
    dataset_normalizers:
        surface_pressure:
            - kind: noether.data.preprocessors.normalizers.MeanStdNormalization
              mean: [1., 2., 3.]
              std: [0.1, 0.2, 0.3]
Parameters:

dataset_config (noether.core.schemas.dataset.DatasetBaseConfig) – Configuration for the dataset. See DatasetBaseConfig for available options including dataset normalizers.

logger
config
normalizers: dict[str, noether.data.preprocessors.ComposePreProcess]
compute_statistics = False
property pipeline: noether.data.pipeline.Collator | None

Returns the pipeline for the dataset.

Return type:

noether.data.pipeline.Collator | None

get_all_getitem_names()

Returns all names of getitem functions that are implemented. E.g., image classification has getitem_x and getitem_class -> the result will be ["x", "class"].

Return type:

list[str]
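The name-discovery mechanism can be illustrated with a small, self-contained plain-Python sketch (ToyDataset and its methods are hypothetical; this is not the actual noether implementation):

```python
# Illustrative sketch of getitem_* discovery via reflection.
class ToyDataset:
    def getitem_x(self, idx):
        return idx * 2

    def getitem_cls(self, idx):
        return idx % 3

    def get_all_getitem_names(self):
        # Collect method names starting with "getitem_" and strip the prefix.
        prefix = "getitem_"
        return sorted(
            name[len(prefix):]
            for name in dir(self)
            if name.startswith(prefix) and callable(getattr(self, name))
        )

    def __getitem__(self, idx):
        # Loop over all getitem_* methods and gather their results.
        return {name: getattr(self, f"getitem_{name}")(idx)
                for name in self.get_all_getitem_names()}

ds = ToyDataset()
print(ds.get_all_getitem_names())  # ['cls', 'x']
print(ds[4])                       # {'cls': 1, 'x': 8}
```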

class noether.data.DatasetWrapper(dataset)

Wrapper around arbitrary noether.data.Dataset instances to generically change something about the dataset. For example:

  • Create a subset of the dataset (noether.data.Subset)

  • Define which properties/items to load from the dataset, i.e., which getitem_* methods to call (noether.data.ModeWrapper)

What exactly is changed depends on the specific implementation of the DatasetWrapper child class.

Parameters:

dataset (noether.data.base.dataset.Dataset | DatasetWrapper) – base dataset to be wrapped

dataset
class noether.data.Subset(dataset, indices)

Bases: noether.data.base.wrapper.DatasetWrapper

Wrapper around arbitrary noether.data.Dataset instances to only use a subset of the samples, similar to torch.utils.data.Subset, but with support for individual getitem_* methods instead of the __getitem__ method.

Example:

from noether.data import Subset, Dataset

len(dataset)  # 10
subset = Subset(dataset=dataset, indices=[0, 2, 5, 7])
len(subset)  # 4
subset[3]  # returns dataset[7]

Initializes the Subset wrapper.

Parameters:
indices
noether.data.with_normalizers(_func_or_key=None)

Decorator to apply a normalizer to the output of a getitem_* function of the implemented Dataset class.

This decorator will look for a normalizer registered under the specified key and apply it to the output of the decorated function. If no key is provided, the key is automatically inferred from the function name by removing the 'getitem_' prefix.

Example usage:

# Inferred key: "surface_pressure"
@with_normalizers
def getitem_surface_pressure(self, idx):
    return torch.load(f"{self.path}/surface_pressure/{idx}.pt")


# Explicit key: "pressure"
@with_normalizers("pressure")
def getitem_surface_pressure(self, idx):
    return torch.load(f"{self.path}/surface_pressure/{idx}.pt")
Parameters:

_func_or_key (str | Any | None) – The normalizer key (str) or the function being decorated. If used as @with_normalizers (no arguments), this will be the decorated function. If used as @with_normalizers("key"), this will be the string key.

Returns:

The decorated function with normalization applied.

Raises:
  • ValueError – If the normalizer key cannot be resolved from the function name.

  • AttributeError – If the class instance does not have a 'normalizers' attribute.

  • KeyError – If the requested normalizer key is not found in the 'normalizers' dictionary.
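The decorator's dual calling convention (bare vs. with an explicit key) can be sketched in plain Python. Everything below is illustrative; the real noether implementation may differ:

```python
import functools

# Illustrative sketch of the pattern behind with_normalizers (not noether code).
def with_normalizers(_func_or_key=None):
    def make_wrapper(func, key):
        @functools.wraps(func)
        def wrapper(self, idx):
            normalizer = self.normalizers[key]  # KeyError if the key is missing
            return normalizer(func(self, idx))
        return wrapper

    if callable(_func_or_key):
        # Used as @with_normalizers: infer the key from the function name.
        func = _func_or_key
        return make_wrapper(func, func.__name__.removeprefix("getitem_"))
    # Used as @with_normalizers("key"): explicit key.
    return lambda func: make_wrapper(func, _func_or_key)

class ToyDataset:
    def __init__(self):
        # Normalizers map keys to callables.
        self.normalizers = {"pressure": lambda x: x / 10.0}

    @with_normalizers("pressure")
    def getitem_surface_pressure(self, idx):
        return float(idx)

ds = ToyDataset()
print(ds.getitem_surface_pressure(5))  # 0.5
```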

class noether.data.PropertySubsetWrapper(dataset, properties)

Bases: noether.data.base.DatasetWrapper

Wrapper around arbitrary noether.data.Dataset instances to make __getitem__ load the properties that are defined in the properties attribute of this wrapper. For example, if we have a dataset that contains three kinds of items: "x", "y", and "z" (i.e., the dataset implements getitem_x, getitem_y, and getitem_z methods), we can create a PropertySubsetWrapper around that dataset with properties={"x", "y"} to only load "x" and "y" when __getitem__ is called. This is useful to avoid loading unnecessary data from disk. For example, you might need different items from the same dataset during training and validation: during training only "x" and "y", while during validation "x", "y", and "z". By using a PropertySubsetWrapper, you can create two different datasets for training and validation that each load only the necessary items.

Example:

from noether.data import PropertySubsetWrapper, Dataset


class DummyDataset(Dataset):
    def __init__(self):
        self.data = torch.arange(10)

    def getitem_x(self, idx):
        return self.data[idx] * 2

    def getitem_y(self, idx):
        return self.data[idx] + 3

    def getitem_z(self, idx):
        return self.data[idx] - 5

    def __len__(self):
        return len(self.data)


dataset = DummyDataset()
wrapper = PropertySubsetWrapper(dataset=dataset, properties={"x", "y"})
sample = wrapper[4]  # calls dataset.getitem_x(4) and dataset.getitem_y(4), getitem_z is not called
sample  # {"x": 8, "y": 7}
wrapper.properties  # {"x", "y"}
Parameters:
Raises:
  • TypeError – If properties is not a set.

  • ValueError – If properties is empty or if any property does not correspond to a getitem_* method of the wrapped dataset.

properties
classmethod from_included_excluded(dataset, included_properties, excluded_properties)

Creates a PropertySubsetWrapper from included and excluded properties.

Parameters:
  • dataset (noether.data.base.Dataset) – Base dataset to be wrapped.

  • included_properties (set[str] | None) – If defined, only these properties are included.

  • excluded_properties (set[str] | None) – If defined, these properties are excluded.

Returns:

The created PropertySubsetWrapper.

Return type:

PropertySubsetWrapper
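One plausible resolution of included_properties/excluded_properties, sketched as a standalone function. The mutual-exclusion check is an assumption, not documented above, and this is not the actual noether implementation:

```python
# Illustrative sketch of included/excluded property resolution.
def resolve_properties(all_props, included=None, excluded=None):
    # Assumption: passing both included and excluded is an error.
    if included is not None and excluded is not None:
        raise ValueError("included and excluded are mutually exclusive")
    if included is not None:
        return set(included)          # only these properties are loaded
    if excluded is not None:
        return set(all_props) - set(excluded)  # everything except these
    return set(all_props)             # no restriction

print(sorted(resolve_properties({"x", "y", "z"}, excluded={"z"})))  # ['x', 'y']
```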

class noether.data.RepeatWrapper(config, dataset)

Bases: noether.data.base.Subset

Repeats the wrapped dataset repetitions times.

Example:

from noether.data import RepeatWrapper

# dataset: any noether.data.Dataset with 10 samples
len(dataset)  # 10
repeat_dataset = RepeatWrapper(dataset, repetitions=3)
len(repeat_dataset)  # 30
Parameters:
Raises:

ValueError – If repetitions is less than 2 or if the dataset is empty. This wrapper is unnecessary for repetitions < 2.

repetitions
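Since RepeatWrapper derives from Subset, the repetition amounts to an index mapping into the base dataset. A minimal sketch of that mapping (illustrative only):

```python
# Illustrative sketch: repetition as an index mapping into the base dataset.
def repeat_indices(dataset_len, repetitions):
    # Index i of the repeated dataset maps to i % dataset_len of the base.
    return [i % dataset_len for i in range(dataset_len * repetitions)]

print(repeat_indices(3, 2))  # [0, 1, 2, 0, 1, 2]
```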
class noether.data.ShuffleWrapper(config, dataset)

Bases: noether.data.base.Subset

Shuffles the dataset, optionally with seed.

Parameters:
Raises:

ValueError – If the dataset is not an instance of noether.data.Dataset or DatasetWrapper, or if the seed is not an integer or None.

seed
class noether.data.SubsetWrapper(config, dataset)

Bases: noether.data.base.Subset

Wraps the dataset with a noether.data.Subset using indices generated by the properties from the constructor.

Parameters:
Raises:
class noether.data.BatchProcessor
abstractmethod denormalize(key, value)

Inverts the normalization from the __call__ method of a single item in the batch. If nothing needs to be done for the denormalization, this method should simply return the passed key/value.

Parameters:
  • key (str) – The name of the item.

  • value (torch.Tensor) – The value of the item.

Returns:

The (potentially) back-mapped name and the (potentially) denormalized value.

Return type:

(key, value)
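A minimal sketch of a __call__/denormalize pair that round-trips a mean/std normalization. MeanStdBatchProcessor is hypothetical and operates on scalars for brevity; the real interface works on tensors:

```python
# Illustrative sketch of a denormalize that exactly inverts __call__.
class MeanStdBatchProcessor:
    def __init__(self, mean, std):
        self.mean, self.std = mean, std

    def __call__(self, key, value):
        # Normalize the item; the key is passed through unchanged here.
        return key, (value - self.mean) / self.std

    def denormalize(self, key, value):
        # Exact inverse of __call__.
        return key, value * self.std + self.mean

p = MeanStdBatchProcessor(mean=2.0, std=4.0)
key, norm = p("pressure", 10.0)
print(p.denormalize(key, norm))  # ('pressure', 10.0)
```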

class noether.data.Collator

Base object that uses torch.utils.data.default_collate in its __call__ function. Derived classes can override the __call__ implementation to implement a custom collate function. The collator can be passed to torch.utils.data.DataLoader via the collate_fn argument (DataLoader(dataset, batch_size=2, collate_fn=Collator())).

Example

>>> collator = Collator()
>>> num_samples = 2
>>> samples = [{"data": torch.randn(3, 256, 256)} for _ in range(num_samples)]
>>> batch = collator(samples)
>>> batch["data"].shape  # torch.Size([2, 3, 256, 256])
class noether.data.MultiStagePipeline(collators=None, sample_processors=None, batch_processors=None)

Bases: noether.data.pipeline.collator.Collator

A Collator that processes the list of samples into a batch in multiple stages:
  • sample_processors: Processing the data before collation on a per-sample level.

  • collators: Conversion from a list of samples into a batch (dict of usually tensors).

  • batch_processors: Processing after collation on a batch-level.

Most of the work is usually done by the sample_processors; often only one or two collators are required, and batch processors are frequently not needed. However, this depends on the use case.

Example:

>>> sample_processors = [MySampleProcessor1(), MySampleProcessor2()]
>>> collators = [MyCollator1(), MyCollator2()]
>>> batch_processors = [MyBatchProcessor1(), MyBatchProcessor2()]
>>> multistage_pipeline = MultiStagePipeline(
...     sample_processors=sample_processors,
...     collators=collators,
...     batch_processors=batch_processors,
... )
>>> batch = multistage_pipeline(samples)
Parameters:
  • sample_processors (dict[str, SampleProcessorType] | list[SampleProcessorType] | None) – A list of callables that will be applied sequentially to pre-process on a per-sample level (e.g., subsample a pointcloud).

  • collators (dict[str, noether.data.pipeline.collator.CollatorType] | list[noether.data.pipeline.collator.CollatorType] | None) – A list of callables that will be applied sequentially to convert the list of individual samples into a batched format. If None, the default PyTorch collator will be used.

  • batch_processors (dict[str, BatchProcessorType] | list[BatchProcessorType] | None) – A list of callables that will be applied sequentially to process on a per-batch level.

sample_processors = []
batch_processors = []
get_sample_processor(predicate)

Retrieves a sample processor by a predicate function. Examples:

  • Search by type (assumes the sample processor type occurs only once in the list of sample processors):

    pipeline.get_sample_processor(lambda p: isinstance(p, MySampleProcessorType))

  • Search by type and member:

    pipeline.get_sample_processor(lambda p: isinstance(p, PointSamplingSampleProcessor) and "input_pos" in p.items)

Parameters:

predicate (collections.abc.Callable[[Any], bool]) – A function that is called for each processor and selects if this is the right one.

Returns:

The matching sample processor.

Return type:

Any

Raises:

ValueError – If no matching sample processor is found, if multiple matching sample processors are found, or if there are no sample processors at all.
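The lookup and its error conditions can be sketched as a standalone function (illustrative; the real method searches the pipeline's own list of sample processors):

```python
# Illustrative sketch of predicate-based lookup and its error conditions.
def get_sample_processor(sample_processors, predicate):
    if not sample_processors:
        raise ValueError("there are no sample processors")
    matches = [p for p in sample_processors if predicate(p)]
    if len(matches) != 1:
        raise ValueError(f"expected exactly one match, found {len(matches)}")
    return matches[0]

processors = [1.5, "subsample", 7]
print(get_sample_processor(processors, lambda p: isinstance(p, str)))  # subsample
```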

class noether.data.SampleProcessor
abstractmethod inverse(key, value)

Inverts the transformation from the __call__ method of a single item in the batch. Should only be implemented if the SampleProcessor is invertible or if the identity function is valid.

Parameters:
  • key (str) – The name of the item.

  • value (torch.Tensor) – The value of the item.

Returns:

The (potentially) back-mapped name and the (potentially) denormalized value.

Return type:

(key, value)

static save_copy(obj)

Make a deep copy of an object to avoid modifying the original object.

Parameters:

obj (T) – Any object that should be copied.

Returns:

A deep copy of the input object.

Return type:

T

class noether.data.ComposePreProcess(normalization_key, preprocessors)

Compose multiple transforms and support inversion by reversing the sequence and inverting each transform.

Example:

from noether.data.preprocessors.compose import ComposePreProcess

normalizer = ComposePreProcess(
    normalization_key="image",
    preprocessors=[
        MyPreProcessor1(),
        MyPreProcessor2(),
    ],
)
processed_data = normalizer(input_data)
original_data = normalizer.inverse(processed_data)
Parameters:
Raises:
  • TypeError – If preprocessors is not a list or if any item in the list is not an instance of PreProcessor.

  • ValueError – If the preprocessors list is empty.

normalization_key
transforms
inverse(x)

Return a transform that applies the inverse transformations in reverse order.

Parameters:

x (Any) – The input to be denormalized.

Return type:

Any
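The compose-then-invert behavior can be illustrated with two toy transforms. Scale and Shift are hypothetical; the point is that inversion applies denormalize in reverse order, mirroring the description above (not the actual noether code):

```python
# Illustrative sketch of compose/inverse with toy transforms.
class Scale:
    def __init__(self, factor):
        self.factor = factor
    def __call__(self, x):
        return x * self.factor
    def denormalize(self, x):
        return x / self.factor

class Shift:
    def __init__(self, offset):
        self.offset = offset
    def __call__(self, x):
        return x + self.offset
    def denormalize(self, x):
        return x - self.offset

def compose(transforms, x):
    # Apply transforms in order.
    for t in transforms:
        x = t(x)
    return x

def inverse(transforms, x):
    # Invert by applying each denormalize in reverse order.
    for t in reversed(transforms):
        x = t.denormalize(x)
    return x

ts = [Scale(2.0), Shift(1.0)]
y = compose(ts, 3.0)   # (3 * 2) + 1 = 7.0
print(inverse(ts, y))  # 3.0
```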

class noether.data.PreProcessor(normalization_key)

Base class for all data preprocessors. Example:

class MyPreProcessor(PreProcessor):
    def __init__(self, normalization_key: str = "image"):
        super().__init__(normalization_key=normalization_key)

    def __call__(self, x):
        # Example processing: normalize to [0, 1]
        return x / 255.0

    def denormalize(self, x):
        # Example denormalization: scale back to [0, 255]
        return x * 255.0
Parameters:

normalization_key (str) – key to identify on which getitem_ in the dataset/tensor the preprocessor is applied.

Raises:

TypeError – If normalization_key is not a string.

normalization_key
abstractmethod denormalize(x)

Denormalizes the input data. This method should be overridden by subclasses if denormalization is supported. If denormalization is not supported, it raises NotImplementedError; alternatively, subclasses may implement the identity function.

Parameters:

x (torch.Tensor) – The input tensor to denormalize.

Return type:

torch.Tensor | numpy.typing.NDArray

noether.data.ScalarOrSequence
noether.data.to_tensor(data)

Helper function to convert input data to a PyTorch tensor if it is not already one.

Parameters:

data (noether.data.preprocessors.types.ScalarOrSequence) – The input data to convert. Can be a sequence of floats, a torch.Tensor, or None.

Return type:

torch.Tensor

Returns:

The input data as a torch.Tensor if it was a sequence, the original tensor if it was already a torch.Tensor, or None if the input was None.

Raises:

TypeError – If the input data is of an unsupported type.

class noether.data.InterleavedSampler(train_sampler, config, train_collator=None, callback_samplers=None)

Sampler to allow efficient dataloading by using a single large dataset containing train/test/... datasets all at once. The sampler will sample from different regions in the dataset according to its specification. For example, consider a training dataset of length 100 and a test dataset of length 10. If the sampler is configured with a RandomSampler of the training dataset indices as main_sampler, it will repeatedly iterate over the training dataset. If the test dataset is configured with a sequential sampler that should be invoked after every epoch, the sampler will first return indices for the 100 training samples (randomly sampled) and then indices for the 10 test samples (in sequential order).

Parameters:
  • train_sampler (noether.core.utils.common.SizedIterable) – Sampler that is invoked by default (e.g., randomly sample from the trainset)

  • config (InterleavedSamplerConfig) – Configuration for the InterleavedSampler.

  • train_collator (collections.abc.Callable | None) – Collator used to collate samples from indices sampled from the train sampler.

  • callback_samplers (list[SamplerIntervalConfig] | None) – Configurations when the train_sampler should be paused and indices from other samplers (e.g., from a testset) should be returned. Also configures the interval and optionally a different batch_size to use for the interleaved batches.

config
main_sampler
extra_samplers = []
index_offsets = []
dataset
collator
batch_sampler
batch_size
static calculate_start(config, sampler_len)
Parameters:
get_data_loader(num_workers=0, pin_memory=False)

Creates the DataLoader that uses the InterleavedSampler with the accordingly configured dataset.

Parameters:
  • num_workers (int) – Number of workers to use.

  • pin_memory (bool) – Whether to use pin memory.

Returns:

DataLoader that uses the InterleavedSampler with the accordingly configured dataset.

Return type:

torch.utils.data.DataLoader
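The core interleaving idea can be sketched as a generator: yield one pass over the main indices, then the extra sampler's indices shifted by the offset of its dataset within the concatenated dataset. This is heavily simplified (no batching, intervals, or multiple extra samplers) and not the actual noether implementation:

```python
# Heavily simplified sketch of interleaving (illustrative only).
def interleave(main_indices, extra_indices, index_offset, epochs):
    for _ in range(epochs):
        # One full pass over the "main" (training) sampler ...
        yield from main_indices
        # ... then the extra (e.g., test) sampler, offset because the extra
        # dataset is appended after the main one in the combined dataset.
        yield from (index_offset + i for i in extra_indices)

# Main dataset of length 3, extra dataset of length 2 appended at offset 3.
out = list(interleave([0, 1, 2], [0, 1], index_offset=3, epochs=2))
print(out)  # [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
```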

class noether.data.InterleavedSamplerConfig(/, **data)

Bases: pydantic.BaseModel

Parameters:

data (Any)

batch_size: int

batch_size to use for creating batches of the main_sampler indices.

drop_last: bool = True

Whether to drop the last non-full batch of the main_sampler.

max_epochs: int | None = None

How many epochs to sample at most from the main_sampler. Whatever limit is reached first (epochs/updates/samples) will stop the sampling.

max_updates: int | None = None

How many updates to sample at most from the main_sampler. Whatever limit is reached first (epochs/updates/samples) will stop the sampling.

max_samples: int | None = None

How many samples to sample at most from the main_sampler. Whatever limit is reached first (epochs/updates/samples) will stop the sampling.

start_epoch: int | None = None

At which epoch to start (used for resuming training). Mutually exclusive with start_update and start_sample.

start_update: int | None = None

At which update to start (used for resuming training). Mutually exclusive with start_epoch and start_sample.

start_sample: int | None = None

At which sample to start (used for resuming training). Mutually exclusive with start_epoch and start_update.

evaluation: bool = False

If True, the sampler is used for evaluation and will only iterate over the interleaved samplers once without iterating over the main sampler.

classmethod check_positive_values(v)

Ensures that all integer-based frequency and batch size fields are positive.

Parameters:

v (int | None)

Return type:

int | None

validate_stop()

Ensures that at least one stopping criterion (max_epochs, max_updates, or max_samples) is specified.

Return type:

InterleavedSamplerConfig

validate_start()

Ensures that at least one start ('start_*') is specified.

Return type:

InterleavedSamplerConfig

class noether.data.SamplerIntervalConfig

Configuration dataclass for setting up the dataloading pipeline, which is structured to load data from a “main” dataset (i.e., the dataset used for training), which is interleaved by iterations over other datasets (e.g., a test dataset to calculate a metric in a callback) in regular intervals.

Parameters:
  • sampler (SizedIterable) – Any sampler that would be used in torch.utils.data.DataLoader(sampler=...). Examples: RandomSampler for a training dataset or SequentialSampler for evaluation.

  • every_n_epochs (int | None) – Epoch-based interval. Invokes the callback after every n epochs. Mutually exclusive with other intervals.

  • every_n_updates (int | None) – Update-based interval. Invokes the callback after every n updates. Mutually exclusive with other intervals.

  • every_n_samples (int | None) – Sample-based interval. Invokes the callback after every n samples. Mutually exclusive with other intervals.

  • pipeline (Optional[callable]) – Any function that would be used in torch.utils.data.DataLoader(collate_fn=...).

  • batch_size (int | None) – Batch size to use for this callback. Default: None (which will use the same batch_size as used for the “main” sampler, i.e., the one used for training).

sampler: noether.core.utils.common.SizedIterable
pipeline: collections.abc.Callable | None
every_n_epochs: int | None = None
every_n_updates: int | None = None
every_n_samples: int | None = None
batch_size: int | None = None
validate_frequency()

Ensures that exactly one frequency ('every_n_*') is specified and that 'batch_size' is present if 'every_n_samples' is used.

Return type:

SamplerIntervalConfig
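The validation rule can be sketched as a standalone check (illustrative; the real implementation is a pydantic model validator):

```python
# Illustrative sketch of the "exactly one every_n_*" rule (not the actual
# pydantic validator).
def validate_frequency(every_n_epochs=None, every_n_updates=None,
                       every_n_samples=None, batch_size=None):
    intervals = [every_n_epochs, every_n_updates, every_n_samples]
    if sum(v is not None for v in intervals) != 1:
        raise ValueError("exactly one every_n_* interval must be specified")
    if every_n_samples is not None and batch_size is None:
        raise ValueError("batch_size is required when using every_n_samples")

validate_frequency(every_n_epochs=1)                      # OK
validate_frequency(every_n_samples=1024, batch_size=32)   # OK
```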

classmethod check_positive_values(v)

Ensures that all integer-based frequency and batch size fields are positive.

Parameters:

v (int | None)

Return type:

int | None