Noether Development CLI

To make development of the Noether Framework easier, we provide the noether-development CLI, which lets you develop and test individual components of the framework in isolation. With it, you do not have to set up a full training loop with all the components of the framework just to test a single component, e.g., a dataset, a model, or a callback.

Note, however, that even though modules can be developed in isolation, some of them still depend on each other in the larger framework.

The dependency graph of the main Noether modules looks like this:

  • Dataset (1): The dataset is responsible for loading the data per sample from disk and is at the root of the dependency graph.

    • Pipeline (2): The multi-stage pipeline (i.e., collator) collates the data loaded by the dataset into a batch and applies additional transformations to the data (if applicable). The pipeline depends on the dataset and cannot be configured without one.

      • Model (3): Performs the forward pass on the collated batch.

        • Trainer (4.1): Manages the training loop, mainly defining the loss computation.

        • Callbacks (4.2): For now, only PeriodicDataIteratorCallbacks are supported.

This implies that a model (3) cannot be developed without a dataset (1) and a pipeline (2), and a trainer (4.1) or callbacks (4.2) cannot be developed without a model (3), and so on. In practice, it would be possible to develop each component fully in isolation by using dummy data. However, we decided to enforce correct dependencies between the modules so that development stays close to the real use case.
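The build order implied by this graph can be sketched with a small topological sort. The dictionary and function below are illustrative only, not part of the framework:

```python
# Hypothetical sketch of the dependency graph above: a component can only
# be developed once everything it depends on is available.
DEPENDS_ON = {
    "dataset": [],
    "pipeline": ["dataset"],
    "model": ["pipeline"],
    "trainer": ["model"],
    "callbacks": ["model"],
}

def build_order(graph):
    """Topologically sort the graph so that dependencies come first."""
    order = []

    def visit(node):
        if node in order:
            return
        for dep in graph[node]:
            visit(dep)
        order.append(node)

    for node in graph:
        visit(node)
    return order

print(build_order(DEPENDS_ON))  # → ['dataset', 'pipeline', 'model', 'trainer', 'callbacks']
```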

Building a Dataset

We start with building a simple custom dataset that generates random dummy data. This dataset is also presented in development/datasets.py.

import torch

from noether.core.schemas.dataset import StandardDatasetConfig
from noether.data import Dataset, with_normalizers


class DevelopmentDatasetConfig(StandardDatasetConfig):
    root: str | None = None
    x_dim: int
    y_dim: int
    z_dim: int
    sample_size: int
    num_samples: int


class DevelopmentDataset(Dataset):
    def __init__(self, dataset_config: DevelopmentDatasetConfig):
        super().__init__(dataset_config=dataset_config)
        self.dataset_config = dataset_config

    def __len__(self):
        return self.dataset_config.num_samples

    @with_normalizers
    def getitem_x(self, idx: int):
        return torch.randn((self.dataset_config.sample_size, self.dataset_config.x_dim))

    @with_normalizers
    def getitem_y(self, idx: int):
        return torch.randn((self.dataset_config.sample_size, self.dataset_config.y_dim))

    @with_normalizers
    def getitem_z(self, idx: int):
        return torch.randn((self.dataset_config.sample_size, self.dataset_config.z_dim))

Now, if we create a file development_config.yaml with the following content, we can test the dataset by running the noether-development CLI. The batch size can be any value; for testing purposes we set it to 8 (note that this is not the effective batch size used by the trainer).

config_schema_kind: development.development_schema.DevelopmentSchema
batch_size: 8
datasets:
    development_dataset:
        kind: development.datasets.DevelopmentDataset
        x_dim: 3
        y_dim: 2
        z_dim: 1
        sample_size: 1024
        num_samples: 1000
        split: train

With the following command, the noether-development CLI loads the dataset and prints the contents of a random batch.

uv run noether-development --hp configs/development_config.yaml

The output should look something like this:

============================================================
RAW BATCH SAMPLE CONTENTS (first random sample from the batch)
============================================================
Batch type: <class 'dict'>
Batch keys (4): ['index', 'x', 'y', 'z']

[index]
    type: int
    value:  455

[x]
    type: Tensor
    shape:  (1024, 3)
    dtype:  torch.float32
    device: cpu
    first 10 values: [-1.154662013053894, 0.4905768632888794, ....

You can now test the behavior of the dataset and its normalizers, either via the CLI output or by setting breakpoints and running the CLI in debug mode. If you want to add normalizers to the dataset, you can do so by adding the following to the dataset config in development_config.yaml:

development_dataset:
    ...
    dataset_normalizers:
        x:
            - kind: noether.data.preprocessors.normalizers.MeanStdNormalization
              mean: [0, 0, 0]
              std: [1, 1, 1]
        y:
            - kind: noether.data.preprocessors.normalizers.MeanStdNormalization
              mean: [1, 1]
              std: [1, 1]
        z:
            - kind: noether.data.preprocessors.normalizers.MeanStdNormalization
              mean: [2]
              std: [1]

Running the CLI again will now print out the normalized batch content, centered around the mean values defined in the normalizers (i.e., 0, 1, 2).
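Conceptually, a mean/std normalizer applies `(value - mean) / std` per feature. A minimal, framework-independent sketch of this idea (the real MeanStdNormalization operates on tensors and is configured as above):

```python
def mean_std_normalize(values, mean, std):
    """Per-feature standardization: (value - mean) / std."""
    return [(v - m) / s for v, m, s in zip(values, mean, std)]

# Using the y normalizer from the config above: mean [1, 1], std [1, 1].
print(mean_std_normalize([3.0, 1.0], mean=[1, 1], std=[1, 1]))  # → [2.0, 0.0]
```

With std of 1, this simply shifts each feature so that values equal to the configured mean map to zero, which is why the printed batch contents appear centered around the mean values.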

Building a Pipeline

Given that we have a dataset that loads and normalizes tensor data, we can continue to build the pipeline that collates the samples into batches.

from pydantic import Field

from noether.core.schemas.dataset import PipelineConfig
from noether.data.pipeline import MultiStagePipeline
from noether.data.pipeline.collators import (
    DefaultCollator,
)
from noether.data.pipeline.sample_processors import (
    ConcatTensorSampleProcessor,
    PointSamplingSampleProcessor,
)


class DevelopmentPipelineConfig(PipelineConfig):
    num_points: int = Field(default=56, description="Number of points to sample from the point cloud.")


class DevelopmentPipeline(MultiStagePipeline):
    def __init__(self, config: DevelopmentPipelineConfig):
        self.config = config

        super().__init__(
            sample_processors=[
                PointSamplingSampleProcessor(items=["x", "y", "z"], num_points=self.config.num_points),
                ConcatTensorSampleProcessor(items=["x", "z"], target_key="x_z", dim=1),
            ],
            batch_processors=[],
            collators=[DefaultCollator(items=["x_z", "y"])],
        )

This simple multi-stage pipeline subsamples the x, y, and z tensors to a fixed number of points and concatenates the x and z tensors into a new tensor called x_z, which is then collated together with the y tensor into a batch. To add the pipeline to the development config, we can add the following to the dataset config in development_config.yaml:

development_dataset:
    ...
    pipeline:
        kind: development.pipeline.DevelopmentPipeline
        num_points: 64

When we run the CLI again, it prints the collated batch contents, which should now contain the collated tensors x_z and y.

============================================================
COLLATED BATCH CONTENTS
============================================================
Batch type: <class 'dict'>
Batch keys (2): ['x_z', 'y']

[x_z]
    type: Tensor
    shape:  (8, 64, 4)
    dtype:  torch.float32
    device: cpu
    first 10 values: [-0.24723079800605774, -0.06603412330150604, -0.47881123423576355, 0.3252887725830078, ...

[y]
    type: Tensor
    shape:  (8, 64, 2)
    dtype:  torch.float32
    device: cpu
    first 10 values: [-1.2091279029846191, -1.2091279029846191, -0.3098295032978058, -0.3098295032978058, ...

We can see that the collated batch contains the x_z tensor (whose feature dimension is x_dim + z_dim) and the y tensor, each with 8 samples of 64 points: the outputs of the sample processors and the collator, respectively.
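To make the effect of the two sample processors concrete, here is a framework-independent sketch of what they do to a single sample: subsample all items to a fixed number of points, then concatenate the x and z features per point. The function names below are illustrative, not the real API:

```python
import random

def sample_points(sample, items, num_points):
    """Pick the same random point indices for every listed item."""
    idx = random.sample(range(len(sample[items[0]])), num_points)
    return {k: ([sample[k][i] for i in idx] if k in items else sample[k]) for k in sample}

def concat_features(sample, items, target_key):
    """Concatenate the per-point feature lists of the given items."""
    sample[target_key] = [sum((sample[k][i] for k in items), []) for i in range(len(sample[items[0]]))]
    return sample

# One sample with 10 points: x has 3 features, y has 2, z has 1.
sample = {"x": [[1.0, 2.0, 3.0]] * 10, "y": [[0.0, 0.0]] * 10, "z": [[9.0]] * 10}
sample = sample_points(sample, ["x", "y", "z"], num_points=4)
sample = concat_features(sample, ["x", "z"], "x_z")
print(len(sample["x_z"]), len(sample["x_z"][0]))  # → 4 4  (num_points, x_dim + z_dim)
```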

Building a Model

Next we can build a simple model that takes the collated batch as input and performs a forward pass on the x_z tensor to predict the y tensor.

import torch

from noether.core.models import Model
from noether.core.schemas.models import ModelBaseConfig


class DevelopmentModelConfig(ModelBaseConfig):
    kind: str = "development.DevelopmentModel"
    name: str = "development_model"
    input_dim: int
    hidden_dim: int = 256
    output_dim: int


class DevelopmentModel(Model):
    def __init__(self, model_config: DevelopmentModelConfig, **kwargs):
        super().__init__(model_config=model_config, **kwargs)

        self.layer1 = torch.nn.Linear(self.model_config.input_dim, self.model_config.hidden_dim)
        self.layer2 = torch.nn.Linear(self.model_config.hidden_dim, self.model_config.output_dim)

    def forward(self, x_z: torch.Tensor) -> dict[str, torch.Tensor]:
        x = torch.relu(self.layer1(x_z))
        x = self.layer2(x)

        return {"output": x}

To match the model, we add the following config to development_config.yaml:

...
model: # can be commented out to test behavior when no model is defined, skip model instantiation and forward pass
    kind: development.model.DevelopmentModel
    input_dim: 4
    output_dim: ${datasets.development_dataset.y_dim}
    forward_properties: ["x_z"]

If we now run the CLI again, we can see the output of the forward pass of the model on the collated batch content.

============================================================
MODEL OUTPUT CONTENTS
============================================================
Batch type: <class 'dict'>
Batch keys (1): ['output']

[output]
    type: Tensor
    shape:  (8, 64, 2)
    dtype:  torch.float32
    device: cpu
    first 10 values: [-0.19360095262527466, 0.00014679878950119019, -0.21560004353523254, ...
============================================================

We get an output tensor with the same number of samples and points as the input x_z tensor, but with a different number of features (namely output_dim). Note that during development it is not necessary to configure an optimizer with the model (an optimizer is only needed for running a training loop).
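As a sanity check on the shapes, a tiny framework-independent sketch of how the two linear layers transform the feature dimension, assuming x_z has x_dim + z_dim = 4 features, hidden_dim = 256, and output_dim = y_dim = 2:

```python
# Only the feature (last) dimension changes through a linear layer; the
# batch and point dimensions pass through unchanged.
def linear_out_shape(in_shape, out_features):
    *leading, _ = in_shape
    return (*leading, out_features)

batch_shape = (8, 64, 4)                       # (batch, points, x_dim + z_dim)
hidden = linear_out_shape(batch_shape, 256)    # after layer1
output = linear_out_shape(hidden, 2)           # after layer2 (output_dim = y_dim)

print(hidden, output)  # → (8, 64, 256) (8, 64, 2)
```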

Building a Trainer and Callbacks

Finally, we can work on the last two modules that depend on the dataset/pipeline/model, which are the trainer and the callbacks. We first start with the trainer, where the user has to define the loss computation:

from typing import Any

import torch.nn.functional as F

from noether.core.schemas.trainers import BaseTrainerConfig
from noether.training.trainers import BaseTrainer


class DevelopmentTrainerConfig(BaseTrainerConfig):
    pass


class DevelopmentTrainer(BaseTrainer):
    def __init__(self, trainer_config: DevelopmentTrainerConfig, **kwargs):
        super().__init__(
            config=trainer_config,
            **kwargs,
        )

        self.config = trainer_config

    def loss_compute(self, forward_output: dict[str, Any], targets: dict[str, Any]) -> dict[str, Any]:
        loss = F.mse_loss(forward_output["output"], targets["y"])
        return {"loss": loss}

Then add the following to the development_config.yaml to configure the trainer:

...
trainer: # can be commented out to test behavior when no trainer is defined, skip training loop
    kind: development.trainer.DevelopmentTrainer
    effective_batch_size: ${development_batch_size} # has to be set, but not used during development
    max_epochs: 10 # has to be set, but not used during development
    callbacks: null
    forward_properties: ${model.forward_properties}
    target_properties: ["y"]

The development pipeline calls the trainer.train_step method, which computes the loss between the forward output of the model and the target y tensor from the dataset. Adding the trainer to the config produces output like this:

Backward pass completed successfully.

All model parameters received gradients successfully.

============================================================
TRAINER OUTPUT
============================================================
total_loss: 1.061652

losses_to_log (1):
    [loss]: 1.061652

additional_outputs: None
============================================================

The development pipeline performs a backward pass on the loss computed by the trainer, and checks that all model parameters receive gradients successfully. Additionally, the outputs of loss_compute are printed as the trainer output.
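The loss_compute above reduces to a mean squared error over all elements. A framework-independent sketch of what F.mse_loss computes with its default "mean" reduction:

```python
def mse(pred, target):
    """Mean of squared element-wise differences (F.mse_loss's default 'mean' reduction)."""
    assert len(pred) == len(target)
    return sum((p - t) ** 2 for p, t in zip(pred, target)) / len(pred)

print(mse([0.0, 1.0, 2.0], [0.0, 0.0, 0.0]))  # (0 + 1 + 4) / 3 ≈ 1.667
```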

Finally, we can add a callback to the trainer (since the callbacks are configured as part of the trainer) to test the behavior of the callbacks in the training loop. For now, it is only possible to add PeriodicDataIteratorCallbacks to the trainer during development.

from typing import Literal

import torch
import torch.nn.functional as F

from noether.core.callbacks.periodic import PeriodicDataIteratorCallback
from noether.core.schemas.callbacks import PeriodicDataIteratorCallbackConfig


class DevelopmentCallbackConfig(PeriodicDataIteratorCallbackConfig):
    name: Literal["DevelopmentCallback"] = "DevelopmentCallback"
    forward_properties: list[str] = []


class DevelopmentCallback(PeriodicDataIteratorCallback):
    def __init__(self, callback_config: DevelopmentCallbackConfig, **kwargs):
        super().__init__(callback_config=callback_config, **kwargs)
        self.config = callback_config

    def process_data(self, batch: dict[str, torch.Tensor], **kwargs) -> dict[str, float]:
        loss = F.mse_loss(
            self.model(**{prop: batch[prop] for prop in self.config.forward_properties})["output"], batch["y"]
        )

        return {"mse_loss": loss.item()}

This simple callback computes the mean squared error loss between the forward output of the model and the target y tensor from the dataset, and returns it as an additional output. To use the callback, we can add the following to the trainer config in development_config.yaml:

...
trainer:
    ...
    callbacks:
        - kind: development.callbacks.DevelopmentCallback
          batch_size: 1
          every_n_epochs: 1
          dataset_key: development_dataset
          name: DevelopmentCallback
          forward_properties: ${model.forward_properties}

Only the process_data method is called in the development pipeline. The output of the callback is printed in the CLI, which should look something like this:

============================================================
CALLBACK OUTPUT CONTENTS
============================================================
Batch type: <class 'dict'>
Batch keys (1): ['mse_loss']

[mse_loss]
    type: float
    value:  1.2776843309402466
============================================================

Running a full training with the development setup

Now that we have built all the required components for a full training loop, we can put them to use with noether-train.

We need to add an optimizer config to the model configuration, for example:

...
model:
    ...
    optimizer_config:
        kind: torch.optim.AdamW
        lr: 1.0e-3
        weight_decay: 0.05
        clip_grad_norm: 1.0
        schedule_config:
            kind: noether.core.schedules.LinearWarmupCosineDecaySchedule
            warmup_percent: 0.05
            end_value: 1.0e-6
            max_value: ${model.optimizer_config.lr}

and at the top of the config we need to add:

output_path: <path_to_output_dir>

That’s it. Now it should be possible to run a full training with the noether-train CLI using the config and modules just developed.

uv run noether-train --hp configs/development_config.yaml +seed=1 +devices=\"0\" +accelerator=cpu
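As an aside, the LinearWarmupCosineDecaySchedule configured above combines a linear warmup with a cosine decay. The exact implementation lives in noether.core.schedules; the following is only an illustrative, framework-independent sketch of the general shape such a schedule traces (the function name and its details are assumptions):

```python
import math

def warmup_cosine_lr(step, total_steps, max_value, end_value, warmup_percent):
    """Illustrative schedule: linear warmup to max_value, then cosine decay to end_value."""
    warmup_steps = int(total_steps * warmup_percent)
    if step < warmup_steps:
        # Linear ramp from ~0 to max_value over the warmup fraction.
        return max_value * (step + 1) / warmup_steps
    # Cosine decay from max_value down to end_value over the remaining steps.
    progress = (step - warmup_steps) / max(1, total_steps - warmup_steps)
    return end_value + 0.5 * (max_value - end_value) * (1 + math.cos(math.pi * progress))

# Same numbers as the config above: lr 1.0e-3, end_value 1.0e-6, warmup_percent 0.05.
lrs = [warmup_cosine_lr(s, 1000, 1.0e-3, 1.0e-6, 0.05) for s in range(1000)]
print(max(lrs))  # peaks at max_value right after warmup
```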