noether.data¶
Submodules¶
Attributes¶
- ScalarOrSequence

Classes¶
- Dataset: Noether dataset implementation, which is a wrapper around torch.utils.data.Dataset that can hold a dataset_config_provider.
- DatasetWrapper: Wrapper around arbitrary noether.data.Dataset instances to generically change something about the dataset.
- Subset: Wrapper around arbitrary noether.data.Dataset instances to only use a subset of the samples, similar to torch.utils.data.Subset.
- PropertySubsetWrapper: Wrapper around arbitrary noether.data.Dataset instances to make __getitem__ load only the properties that are defined in the properties attribute of this wrapper.
- RepeatWrapper: Repeats the wrapped dataset repetitions times.
- ShuffleWrapper: Shuffles the dataset, optionally with a seed.
- SubsetWrapper: Wraps the dataset with a noether.data.Subset using indices generated by the properties from the constructor.
- Collator: Base object that uses torch.utils.data.default_collate in its __call__ function; derived classes can overwrite the __call__ implementation.
- MultiStagePipeline: A Collator that processes the list of samples into a batch in multiple stages.
- ComposePreProcess: Compose multiple transforms and support inversion by reversing the sequence and inverting each transform.
- PreProcessor: Base class for all data preprocessors.
- InterleavedSampler: Sampler to allow efficient dataloading by using a single large dataset containing train/test/... datasets all at once.
- SamplerIntervalConfig: Configuration dataclass for setting up the dataloading pipeline, interleaving a "main" dataset with other datasets at regular intervals.

Functions¶
- with_normalizers: Decorator to apply a normalizer to the output of a getitem_* function of the implemented Dataset class.
- to_tensor: Helper function to convert input data to a PyTorch tensor if it is not already one.
Package Contents¶
- class noether.data.Dataset(dataset_config)¶
Bases: torch.utils.data.Dataset

Noether dataset implementation, which is a wrapper around torch.utils.data.Dataset that can hold a dataset_config_provider. A dataset should map a key (i.e., an index) to its corresponding data. Each sub-class should implement individual getitem_* methods, where * is the name of an item in the dataset. Each getitem_* method loads an individual tensor/data sample from disk. For example, if your dataset consists of images and targets/labels (stored as tensors), a getitem_image(idx) and a getitem_target(idx) method should be implemented in the dataset subclass. The __getitem__ method of this class will loop over all the individual getitem_* methods implemented by the child class and return their results. Optionally, it is possible to configure which getitem_* methods are called.
Example: Car aerodynamics dataset
class CarAeroDynamicsDataset(Dataset):
    def __init__(self, dataset_config, dataset_normalizers, **kwargs):
        super().__init__(dataset_config=dataset_config, **kwargs)
        self.path = dataset_config.path

    def __len__(self):
        return 100  # Example length

    def getitem_surface_pressure(self, idx):
        # Load surface pressure tensor
        return torch.load(f"{self.path}/surface_pressure_tensor/{idx}.pt")

    def getitem_surface_geometry(self, idx):
        # Load surface geometry tensor
        return torch.load(f"{self.path}/surface_geometry_tensor/{idx}.pt")

dataset = CarAeroDynamicsDataset("path/to/dataset")
sample0 = dataset[0]
surface_pressure_0 = sample0["surface_pressure"]
surface_geometry_0 = sample0["surface_geometry"]
Data from a getitem method should be normalized in many cases. To apply normalization, add the with_normalizers decorator to the getitem method. For example:
@with_normalizers("surface_pressure")
def getitem_surface_pressure(self, idx):
    # Load surface pressure tensor
    return torch.load(f"{self.path}/surface_pressure_tensor/{idx}.pt")
"surface_pressure" is the key in the self.normalizers dictionary; this key maps to a preprocessor that implements the correct data normalization.
Example configuration for dataset normalizers:
# dummy example configuration for a car aerodynamics dataset:
kind: noether.data.datasets.CarAeroDynamicsDataset
pipeline: # configure the data pipeline to collate individual samples into batches
  dataset_normalizers:
    surface_pressure:
      - kind: noether.data.preprocessors.normalizers.MeanStdNormalization
        mean: [1., 2., 3.]
        std: [0.1, 0.2, 0.3]
- Parameters:
dataset_config (noether.core.schemas.dataset.DatasetBaseConfig) – Configuration for the dataset. See DatasetBaseConfig for available options, including dataset normalizers.
- logger¶
- config¶
- normalizers: dict[str, noether.data.preprocessors.ComposePreProcess]¶
- compute_statistics = False¶
- property pipeline: noether.data.pipeline.Collator | None¶
Returns the pipeline for the dataset.
- Return type:
noether.data.pipeline.Collator | None
- class noether.data.DatasetWrapper(dataset)¶
Wrapper around arbitrary noether.data.Dataset instances to generically change something about the dataset. For example:
- Create a subset of the dataset (noether.data.Subset)
- Define which properties/items to load from the dataset, i.e., which getitem_* methods to call (noether.data.PropertySubsetWrapper)
What exactly is changed depends on the specific implementation of the DatasetWrapper child class.
- Parameters:
dataset (noether.data.base.dataset.Dataset | DatasetWrapper) – base dataset to be wrapped
- dataset¶
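The wrapper pattern itself is plain delegation: the wrapper stores the base dataset and forwards everything it does not change. A minimal sketch of the idea, independent of the noether API (ListBackedDataset and DoubleLengthWrapper are hypothetical names used only for illustration):

```python
class ListBackedDataset:
    """Hypothetical stand-in for a base dataset."""

    def __init__(self, items):
        self.items = items

    def __len__(self):
        return len(self.items)

    def __getitem__(self, idx):
        return self.items[idx]


class DoubleLengthWrapper:
    """Hypothetical wrapper that changes one thing: it repeats the dataset twice."""

    def __init__(self, dataset):
        self.dataset = dataset  # keep a handle to the wrapped dataset

    def __len__(self):
        return 2 * len(self.dataset)

    def __getitem__(self, idx):
        # Delegate to the wrapped dataset, mapping the index back into range.
        return self.dataset[idx % len(self.dataset)]


wrapped = DoubleLengthWrapper(ListBackedDataset([10, 20, 30]))
```

Concrete noether wrappers follow the same shape, with the changed behavior depending on the subclass.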
- class noether.data.Subset(dataset, indices)¶
Bases: noether.data.base.wrapper.DatasetWrapper

Wrapper around arbitrary noether.data.Dataset instances to only use a subset of the samples, similar to torch.utils.data.Subset, but with support for individual getitem_* methods instead of the __getitem__ method.
Example:
from noether.data import Subset, Dataset

len(dataset)  # 10
subset = Subset(dataset=dataset, indices=[0, 2, 5, 7])
len(subset)  # 4
subset[3]  # returns dataset[7]
Initializes the Subset wrapper.
- Parameters:
dataset (noether.data.base.dataset.Dataset) – The base dataset to be wrapped
indices (collections.abc.Sequence[int] | numpy.typing.NDArray[numpy.integer]) – valid indices of the wrapped dataset (list, tuple, or numpy array)
- indices¶
- noether.data.with_normalizers(_func_or_key=None)¶
Decorator to apply a normalizer to the output of a getitem_* function of the implemented Dataset class.
This decorator will look for a normalizer registered under the specified key and apply it to the output of the decorated function. If no key is provided, the key is automatically inferred from the function name by removing the ‘getitem_’ prefix.
Example usage:
# Inferred key: "surface_pressure"
@with_normalizers
def getitem_surface_pressure(self, idx):
    return torch.load(f"{self.path}/surface_pressure/{idx}.pt")

# Explicit key: "pressure"
@with_normalizers("pressure")
def getitem_surface_pressure(self, idx):
    return torch.load(f"{self.path}/surface_pressure/{idx}.pt")
- Parameters:
_func_or_key (str | Any | None) – The normalizer key (str) or the function being decorated. If used as @with_normalizers (no arguments), this will be the decorated function. If used as @with_normalizers(“key”), this will be the string key.
- Returns:
The decorated function with normalization applied.
- Raises:
ValueError – If the normalizer key cannot be resolved from the function name.
AttributeError – If the class instance does not have a ‘normalizers’ attribute.
KeyError – If the requested normalizer key is not found in the ‘normalizers’ dictionary.
- class noether.data.PropertySubsetWrapper(dataset, properties)¶
Bases: noether.data.base.DatasetWrapper

Wrapper around arbitrary noether.data.Dataset instances to make __getitem__ load only the properties that are defined in the properties attribute of this wrapper. For example, if we have a dataset that contains three kinds of items, "x", "y", and "z" (i.e., the dataset implements getitem_x, getitem_y, and getitem_z methods), we can create a PropertySubsetWrapper around that dataset with properties={"x", "y"} to only load "x" and "y" when __getitem__ is called. This is useful to avoid loading unnecessary data from disk. For example, you might need different items from the same dataset during training and validation: during training only "x" and "y", while during validation "x", "y", and "z". By using a PropertySubsetWrapper, you can create two different datasets for training and validation that only load the necessary items.
Example:
from noether.data import PropertySubsetWrapper, Dataset

class DummyDataset(Dataset):
    def __init__(self):
        self.data = torch.arange(10)

    def getitem_x(self, idx):
        return self.data[idx] * 2

    def getitem_y(self, idx):
        return self.data[idx] + 3

    def getitem_z(self, idx):
        return self.data[idx] - 5

    def __len__(self):
        return len(self.data)

dataset = DummyDataset()
wrapper = PropertySubsetWrapper(dataset=dataset, properties={"x", "y"})
sample = wrapper[4]  # calls dataset.getitem_x(4) and dataset.getitem_y(4); getitem_z is not called
sample  # {"x": 8, "y": 7}
wrapper.properties  # {"x", "y"}
- Parameters:
dataset (noether.data.base.Dataset | noether.data.base.DatasetWrapper) – Base dataset to be wrapped. Can be a dataset or another dataset wrapper.
properties (set[str]) – Which properties to load from the wrapped dataset when __getitem__ is called.
- Raises:
TypeError – If properties is not a set.
ValueError – If properties is empty or if any property does not correspond to a getitem_* method of the wrapped dataset.
- properties¶
- classmethod from_included_excluded(dataset, included_properties, excluded_properties)¶
Creates a PropertySubsetWrapper from included and excluded properties.
- Parameters:
dataset (noether.data.base.Dataset) – Base dataset to be wrapped.
included_properties (set[str] | None) – If defined, only these properties are included.
excluded_properties (set[str] | None) – If defined, these properties are excluded.
- Returns:
The created PropertySubsetWrapper.
- Return type:
PropertySubsetWrapper
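The included/excluded semantics reduce to a small set operation: start from the included properties (or all of them if none are given), then drop the excluded ones. A hypothetical helper mirroring that behavior (resolve_properties is illustrative, not part of the noether API):

```python
def resolve_properties(all_properties, included=None, excluded=None):
    # Start from the explicitly included properties, or from all of them.
    properties = set(included) if included is not None else set(all_properties)
    # Then drop anything explicitly excluded.
    if excluded is not None:
        properties -= set(excluded)
    return properties


resolve_properties({"x", "y", "z"}, included={"x", "y"})  # {"x", "y"}
resolve_properties({"x", "y", "z"}, excluded={"z"})       # {"x", "y"}
```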
- class noether.data.RepeatWrapper(config, dataset)¶
Bases: noether.data.base.Subset

Repeats the wrapped dataset repetitions times.
Example:
from noether.data import Dataset as ListDataset

dataset = ListDataset([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
len(dataset)  # 10
repeat_dataset = RepeatWrapper(dataset, repetitions=3)
len(repeat_dataset)  # 30
- Parameters:
config (noether.core.schemas.dataset.RepeatWrapperConfig) – Configuration for the RepeatWrapper. See RepeatWrapperConfig for available options.
dataset (noether.data.base.Dataset) – The dataset to repeat.
- Raises:
ValueError – If repetitions is less than 2 or if the dataset is empty. You don’t need to use this wrapper with repetitions < 2.
- repetitions¶
- class noether.data.ShuffleWrapper(config, dataset)¶
Bases: noether.data.base.Subset

Shuffles the dataset, optionally with a seed.
- Parameters:
config (noether.core.schemas.dataset.ShuffleWrapperConfig) – Configuration for the ShuffleWrapper. See ShuffleWrapperConfig for available options.
dataset (noether.data.base.Dataset | noether.data.base.DatasetWrapper) – The dataset to shuffle. Can be a base dataset or an already wrapped dataset.
- Raises:
ValueError – If the dataset is not an instance of noether.data.Dataset or DatasetWrapper, or if the seed is not an integer or None.
- seed¶
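Since ShuffleWrapper derives from noether.data.base.Subset, the shuffle can be thought of as generating a seeded permutation of indices and using it as the subset's index list. A minimal sketch of that idea, independent of the noether API (shuffled_indices is a hypothetical helper):

```python
import random

def shuffled_indices(length, seed=None):
    # Build a permutation of [0, length); a fixed seed makes it reproducible.
    rng = random.Random(seed)
    indices = list(range(length))
    rng.shuffle(indices)
    return indices


perm_a = shuffled_indices(10, seed=42)
perm_b = shuffled_indices(10, seed=42)
# The same seed yields the same permutation of all 10 indices.
```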
- class noether.data.SubsetWrapper(config, dataset)¶
Bases: noether.data.base.Subset

Wraps the dataset with a noether.data.Subset using indices generated by the properties from the constructor.
- Parameters:
config (noether.core.schemas.dataset.SubsetWrapperConfig) – The configuration to use. See SubsetWrapperConfig for available options.
dataset (noether.data.base.Dataset | noether.data.base.wrapper.DatasetWrapper) – The dataset to wrap.
- Raises:
ValueError – If the input parameters are invalid.
RuntimeError – If no valid indices are provided.
- class noether.data.BatchProcessor¶
- abstractmethod denormalize(key, value)¶
Inverts the normalization from the __call__ method of a single item in the batch. If nothing needs to be done for the denormalization, this method should simply return the passed key/value.
- Parameters:
key (str) – The name of the item.
value (torch.Tensor) – The value of the item.
- Returns:
The (potentially) back-mapped name and the (potentially) denormalized value.
- Return type:
(key, value)
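The denormalize contract can be illustrated with a minimal, hypothetical batch processor: __call__ scales a value down and denormalize undoes it, with the key passed through unchanged. ScaleBatchProcessor and the plain-float values are illustrative only, not part of noether:

```python
class ScaleBatchProcessor:
    """Hypothetical processor: __call__ divides by a constant, denormalize multiplies back."""

    def __init__(self, scale=255.0):
        self.scale = scale

    def __call__(self, key, value):
        # Normalize one item of the batch.
        return key, value / self.scale

    def denormalize(self, key, value):
        # Invert the normalization; the key is returned unchanged here.
        return key, value * self.scale


proc = ScaleBatchProcessor()
key, value = proc("image", 51.0)
key, restored = proc.denormalize(key, value)
```

A processor with nothing to undo would simply return the passed key/value from denormalize, as the docstring above describes.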
- class noether.data.Collator¶
Base object that uses torch.utils.data.default_collate in its __call__ function. Derived classes can overwrite the __call__ implementation to implement a custom collate function. The collator can be passed to torch.utils.data.DataLoader via the collate_fn argument (DataLoader(dataset, batch_size=2, collate_fn=Collator())).
Example
>>> collator = Collator()
>>> num_samples = 2
>>> samples = [{"data": torch.randn(3, 256, 256)} for _ in range(num_samples)]
>>> batch = collator(samples)
>>> batch["data"].shape  # torch.Size([2, 3, 256, 256])
- class noether.data.MultiStagePipeline(collators=None, sample_processors=None, batch_processors=None)¶
Bases: noether.data.pipeline.collator.Collator

A Collator that processes the list of samples into a batch in multiple stages:
- sample_processors: Processing the data before collation on a per-sample level.
- collators: Conversion from a list of samples into a batch (dict of usually tensors).
- batch_processors: Processing after collation on a batch-level.
Most of the work is usually done by the sample_processors; extra collators and batch processors are often not needed, though this depends on the use case.

Example
>>> sample_processors = [MySampleProcessor1(), MySampleProcessor2()]
>>> collators = [MyCollator1(), MyCollator2()]
>>> batch_processors = [MyBatchProcessor1(), MyBatchProcessor2()]
>>> multistage_pipeline = MultiStagePipeline(
...     sample_processors=sample_processors,
...     collators=collators,
...     batch_processors=batch_processors,
... )
>>> batch = multistage_pipeline(samples)
- Parameters:
sample_processors (dict[str, SampleProcessorType] | list[SampleProcessorType] | None) – A list of callables that will be applied sequentially to pre-process on a per-sample level (e.g., subsample a pointcloud).
collators (dict[str, noether.data.pipeline.collator.CollatorType] | list[noether.data.pipeline.collator.CollatorType] | None) – A list of callables that will be applied sequentially to convert the list of individual samples into a batched format. If None, the default PyTorch collator will be used.
batch_processors (dict[str, BatchProcessorType] | list[BatchProcessorType] | None) – A list of callables that will be applied sequentially to process on a per-batch level.
- sample_processors = []¶
- batch_processors = []¶
- get_sample_processor(predicate)¶
Retrieves a sample processor by a predicate function.

Examples:

- Search by type (assumes the sample processor type only occurs once in the list of sample processors):

pipeline.get_sample_processor(lambda p: isinstance(p, MySampleProcessorType))

- Search by type and member:

pipeline.get_sample_processor(lambda p: isinstance(p, PointSamplingSampleProcessor) and "input_pos" in p.items)
- Parameters:
predicate (collections.abc.Callable[[Any], bool]) – A function that is called for each processor and selects if this is the right one.
- Returns:
The matching sample processor.
- Return type:
Any
- Raises:
ValueError – If no matching sample processor is found, if multiple matching sample processors are found, or if there are no sample processors.
- class noether.data.SampleProcessor¶
- abstractmethod inverse(key, value)¶
Inverts the transformation from the __call__ method of a single item in the batch. Should only be implemented if the SampleProcessor is invertible or if the identity function is valid.
- Parameters:
key (str) – The name of the item.
value (torch.Tensor) – The value of the item.
- Returns:
The (potentially) back-mapped name and the (potentially) denormalized value.
- Return type:
(key, value)
- static save_copy(obj)¶
Make a deep copy of an object to avoid modifying the original object.
- Parameters:
obj (T) – Any object that should be copied.
- Returns:
A deep copy of the input object.
- Return type:
T
- class noether.data.ComposePreProcess(normalization_key, preprocessors)¶
Compose multiple transforms and support inversion by reversing the sequence and inverting each transform.
Example:
from noether.data.preprocessors.compose import ComposePreProcess

normalizer = ComposePreProcess(
    normalization_key="image",
    preprocessors=[
        MyPreProcessor1(),
        MyPreProcessor2(),
    ],
)
processed_data = normalizer(input_data)
original_data = normalizer.inverse(processed_data)
- Parameters:
normalization_key (str) – key to identify on which getitem_ in the dataset/tensor the preprocessor is applied.
preprocessors (list[noether.data.preprocessors.base.PreProcessor]) – list of PreProcessor instances to compose.
- Raises:
TypeError – If preprocessors is not a list or if any item in the list is not an instance of PreProcessor.
ValueError – If the preprocessors list is empty.
- normalization_key¶
- transforms¶
- inverse(x)¶
Applies the inverse transformations in reverse order.

- Parameters:
x (Any) – The input to be denormalized.
- Return type:
Any
- class noether.data.PreProcessor(normalization_key)¶
Base class for all data preprocessors. Example:
class MyPreProcessor(PreProcessor):
    def __init__(self, normalization_key: str = "image"):
        super().__init__(normalization_key=normalization_key)

    def __call__(self, x):
        # Example processing: normalize to [0, 1]
        return x / 255.0

    def denormalize(self, x):
        # Example denormalization: scale back to [0, 255]
        return x * 255.0
- Parameters:
normalization_key (str) – key to identify on which getitem_ in the dataset/tensor the preprocessor is applied.
- Raises:
TypeError – If normalization_key is not a string.
- normalization_key¶
- abstractmethod denormalize(x)¶
Denormalizes the input data. This method should be overridden by subclasses if denormalization is supported. If denormalization is not supported, it should raise NotImplementedError; alternatively, a subclass may implement the identity function.
- Parameters:
x (torch.Tensor) – The input tensor to denormalize.
- Return type:
torch.Tensor | numpy.typing.NDArray
- noether.data.ScalarOrSequence¶
- noether.data.to_tensor(data)¶
Helper function to convert input data to a PyTorch tensor if it is not already one.
- Parameters:
data (noether.data.preprocessors.types.ScalarOrSequence) – The input data to convert. Can be a sequence of floats, a torch.Tensor, or None.
- Returns:
The input data as a torch.Tensor if it was a sequence, the original tensor if it was already a torch.Tensor, or None if the input was None.
- Raises:
TypeError – If the input data is of an unsupported type.
- class noether.data.InterleavedSampler(train_sampler, config, train_collator=None, callback_samplers=None)¶
Sampler to allow efficient dataloading by using a single large dataset containing train/test/… datasets all at once. The sampler will sample from different regions in the dataset according to its specification. For example, consider a training dataset of length 100 and a test dataset of length 10. If the sampler is configured with a RandomSampler of the training dataset indices as main_sampler, it will repeatedly iterate over the training dataset. If the test dataset is configured with a sequential sampler that should be invoked after every epoch, the sampler will first return indices for the 100 training samples (randomly sampled) and then indices for the 10 test samples (in sequential order).
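The ordering described above (100 shuffled train indices, then 10 sequential test indices, per epoch) can be sketched without the noether API. interleaved_indices is a hypothetical helper used only to illustrate the index stream; the test region is assumed to sit directly after the train region in the combined dataset:

```python
import random

def interleaved_indices(num_train, num_test, epochs, seed=0):
    # Sketch of the interleaved index stream: after each full (shuffled)
    # pass over the train indices, yield the test indices sequentially,
    # offset past the train region of the combined dataset.
    rng = random.Random(seed)
    order = []
    for _ in range(epochs):
        train_indices = list(range(num_train))
        rng.shuffle(train_indices)  # random training order
        order.extend(train_indices)
        order.extend(range(num_train, num_train + num_test))  # sequential test pass
    return order


indices = interleaved_indices(num_train=100, num_test=10, epochs=2)
```

Each epoch contributes 110 indices: a shuffled train pass followed by a sequential test pass, matching the behavior described for the real sampler.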
- Parameters:
train_sampler (noether.core.utils.common.SizedIterable) – Sampler that is invoked by default (e.g., randomly sample from the trainset)
config (InterleavedSamplerConfig) – Configuration for the InterleavedSampler.
train_collator (collections.abc.Callable | None) – Collator used to collate samples from indices sampled from the train sampler.
callback_samplers (list[SamplerIntervalConfig] | None) – Configurations when the train_sampler should be paused and indices from other samplers (e.g., from a testset) should be returned. Also configures the interval and optionally a different batch_size to use for the interleaved batches.
- config¶
- main_sampler¶
- extra_samplers = []¶
- index_offsets = []¶
- dataset¶
- collator¶
- batch_sampler¶
- batch_size¶
- static calculate_start(config, sampler_len)¶
- Parameters:
config (InterleavedSamplerConfig)
sampler_len (int)
- get_data_loader(num_workers=0, pin_memory=False)¶
Creates the DataLoader that uses the InterleavedSampler with the accordingly configured dataset.
- Parameters:
num_workers (int) – Number of worker processes for the DataLoader. Default: 0.
pin_memory (bool) – Whether the DataLoader should use pinned memory. Default: False.
- Returns:
DataLoader that uses the InterleavedSampler with the accordingly configured dataset.
- Return type:
torch.utils.data.DataLoader
- class noether.data.InterleavedSamplerConfig(/, **data)¶
Bases: pydantic.BaseModel

- Parameters:
data (Any)
- max_epochs: int | None = None¶
How many epochs to sample at most from the main_sampler. Whatever limit is reached first (epochs/updates/samples) will stop the sampling.
- max_updates: int | None = None¶
How many updates to sample at most from the main_sampler. Whatever limit is reached first (epochs/updates/samples) will stop the sampling.
- max_samples: int | None = None¶
How many samples to sample at most from the main_sampler. Whatever limit is reached first (epochs/updates/samples) will stop the sampling.
- start_epoch: int | None = None¶
At which epoch to start (used for resuming training). Mutually exclusive with start_update and start_sample.
- start_update: int | None = None¶
At which update to start (used for resuming training). Mutually exclusive with start_epoch and start_sample.
- start_sample: int | None = None¶
At which sample to start (used for resuming training). Mutually exclusive with start_epoch and start_update.
- evaluation: bool = False¶
If True, the sampler is used for evaluation and will only iterate over the interleaved samplers once without iterating over the main sampler.
- classmethod check_positive_values(v)¶
Ensures that all integer-based frequency and batch size fields are positive.
- validate_stop()¶
Ensures that at least one stopping limit ('max_*') is specified.
- Return type:
- validate_start()¶
Ensures that at most one start ('start_*') is specified.
- Return type:
- class noether.data.SamplerIntervalConfig¶
Configuration dataclass for setting up the dataloading pipeline, which is structured to load data from a “main” dataset (i.e., the dataset used for training), which is interleaved by iterations over other datasets (e.g., a test dataset to calculate a metric in a callback) in regular intervals.
- Parameters:
sampler (SizedIterable) – Any sampler that would be used in torch.utils.data.DataLoader(sampler=…). Examples: RandomSampler for a training dataset or SequentialSampler for evaluation.
every_n_epochs (int | None) – Epoch-based interval. Invokes the callback after every n epochs. Mutually exclusive with other intervals.
every_n_updates (int | None) – Update-based interval. Invokes the callback after every n updates. Mutually exclusive with other intervals.
every_n_samples (int | None) – Sample-based interval. Invokes the callback after every n samples. Mutually exclusive with other intervals.
pipeline (Optional[callable]) – Any function that would be used in torch.utils.data.DataLoader(collate_fn=…).
batch_size (int | None) – Batch size to use for this callback. Default: None (which will use the same batch_size as used for the “main” sampler, i.e., the one used for training).
- pipeline: collections.abc.Callable | None¶
- validate_frequency()¶
Ensures that exactly one frequency (‘every_n_*’) is specified and that ‘batch_size’ is present if ‘every_n_samples’ is used.
- Return type: