noether.data.zarr_store

Chunked, sharded, pre-shuffled Zarr store for CFD datasets.

This package provides a blob-storage-friendly alternative to the per-field .pt layout: each sample is a Zarr group holding one array per field, organised by domain (float32 positions and float16 physical fields by default), chunked along the point axis and packed into shards. Because points are pre-shuffled at write time, every chunk is already a random subset of the sample, so the dataloader can subsample by reading a few random chunks (byte-range reads) instead of loading the whole sample.
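The write-time pre-shuffle can be illustrated with a minimal NumPy sketch. It assumes one integer seed per sample and uses np.random.default_rng for the permutation; how the package actually derives per-sample permutations from shuffle_seed is not documented here, and shuffle_and_chunk is a hypothetical helper. The essential property is that a single permutation is shared by every field of a domain, so chunk c holds the same points in every array.

```python
import numpy as np


def shuffle_and_chunk(fields: dict, seed: int, chunk_points: int):
    """Hypothetical helper: shuffle one domain's fields with a shared
    permutation, then split the point axis into fixed-size chunks."""
    n_points = next(iter(fields.values())).shape[0]
    # Assumption: one seeded permutation per sample/domain.
    perm = np.random.default_rng(seed).permutation(n_points)
    n_chunks = -(-n_points // chunk_points)  # ceil(n_points / chunk_points)
    chunks = {}
    for name, arr in fields.items():
        shuffled = arr[perm]  # the same permutation for every field keeps rows aligned
        chunks[name] = [
            shuffled[c * chunk_points:(c + 1) * chunk_points] for c in range(n_chunks)
        ]
    return perm, chunks


positions = np.arange(30, dtype=np.float32).reshape(10, 3)
pressure = positions[:, 0] * 2.0  # a point-aligned scalar field
perm, chunks = shuffle_and_chunk(
    {"volume_position": positions, "volume_pressure": pressure}, seed=0, chunk_points=4
)
print([len(c) for c in chunks["volume_position"]])  # [4, 4, 2]
```

Since each chunk is a uniformly random subset of the sample, reading any k chunks yields a random subsample of roughly k × chunk_points points.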

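Attributes

CANONICAL_TO_SPEC

Mapping canonical field key -> FieldSpec.

FIELD_SPECS

List of FieldSpec entries, one per CFD field the store can hold.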

Classes

FieldSpec

Static description of one CFD field.

ArrayLayout

Layout of a single per-field Zarr array.

DomainLayout

Per-field arrays of one domain.

DomainSample

Per-sample chunk grid for one domain.

SampleEntry

Manifest entry for a single sample.

StoreManifest

Top-level manifest for a converted Zarr store.

ZarrChunkReader

Reads chunk-subsampled (or full) per-field tensors from a converted store.

ZarrStoreWriter

Convert CFD samples into the chunked/sharded Zarr format and track the manifest.

Functions

build_domain_layouts(filemap[, coords_dtype, ...])

Build a DomainLayout (one array per field) for every domain present in filemap.

filename_to_canonical(filemap)

Return the reverse map stored_filename -> canonical_field for the fields present in filemap.

present_specs(filemap)

Return the FIELD_SPECS whose filemap_attr is set on filemap.

calculate_store_statistics(store_root, *[, fields, ...])

Stream all samples of a Zarr store and accumulate per-field running statistics.

is_remote(store_root)

Return True if store_root is an fsspec URL backed by a non-local filesystem.

make_store(path, *[, read_only])

Build a Zarr store for path.

Package Contents

noether.data.zarr_store.CANONICAL_TO_SPEC: dict[str, FieldSpec]
noether.data.zarr_store.FIELD_SPECS: list[FieldSpec] = []
class noether.data.zarr_store.FieldSpec

Static description of one CFD field.

filemap_attr: str

Attribute name on FileMap.

domain: str

Domain the field belongs to, e.g. "volume" or "surface".
name: str

Short name within the domain; combined with domain as f"{domain}_{name}" to form the canonical key.

dim: int

Channel width (1 for scalars, 3 for vectors).

kind: str

Field role: "coord" for the domain's point positions, "value" for a physical field.

property canonical: str

Canonical field key, e.g. "volume_velocity".

Return type:

str
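To show how the canonical key is derived, the sketch below mirrors the documented FieldSpec attributes with a plain frozen dataclass. FieldSpecSketch and the three spec entries are hypothetical, and building a lookup keyed by canonical is an assumption about how a CANONICAL_TO_SPEC-style mapping relates to a spec list.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FieldSpecSketch:
    """Hypothetical stand-in mirroring the documented FieldSpec attributes."""

    filemap_attr: str  # attribute name on FileMap
    domain: str        # e.g. "volume" or "surface"
    name: str          # short name within the domain
    dim: int           # 1 for scalars, 3 for vectors
    kind: str          # "coord" or "value"

    @property
    def canonical(self) -> str:
        # The canonical key joins domain and short name.
        return f"{self.domain}_{self.name}"


# Hypothetical entries; the real FIELD_SPECS contents are not listed in this reference.
SPECS = [
    FieldSpecSketch("volume_position", "volume", "position", 3, "coord"),
    FieldSpecSketch("volume_velocity", "volume", "velocity", 3, "value"),
    FieldSpecSketch("surface_pressure", "surface", "pressure", 1, "value"),
]
# Assumed relationship between a spec list and a canonical-key lookup.
canonical_to_spec = {spec.canonical: spec for spec in SPECS}
print(canonical_to_spec["volume_velocity"].dim)  # 3
```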

noether.data.zarr_store.build_domain_layouts(filemap, coords_dtype='float32', values_dtype='float16', field_dtypes=None)

Build a DomainLayout (one array per field) for every domain present in filemap.

Parameters:
  • filemap (noether.data.schemas.FileMap) – Field-to-filename mapping for the dataset being converted.

  • coords_dtype (str) – Dtype string for position arrays.

  • values_dtype (str) – Dtype string for physical field arrays.

  • field_dtypes (dict[str, str] | None) – Per-field dtype overrides keyed by canonical name, e.g. {"volume_vorticity": "float32"} for fields whose magnitudes exceed the range of values_dtype (the largest finite float16 value is 65504).

Returns:

Mapping domain -> DomainLayout.

Raises:

ValueError – If a domain has no (or more than one) coordinate field.

Return type:

dict[str, noether.data.zarr_store.manifest.DomainLayout]
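The grouping and validation steps described above can be sketched as follows, with plain dicts standing in for DomainLayout and ArrayLayout. Spec and sketch_domain_layouts are hypothetical names, and applying a field_dtypes override to coordinate fields as well as value fields is an assumption of this sketch.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Spec:
    """Hypothetical minimal stand-in for FieldSpec."""

    domain: str
    name: str
    dim: int
    kind: str  # "coord" or "value"

    @property
    def canonical(self) -> str:
        return f"{self.domain}_{self.name}"


def sketch_domain_layouts(specs, coords_dtype="float32", values_dtype="float16", field_dtypes=None):
    """Group specs by domain; plain dicts stand in for DomainLayout/ArrayLayout."""
    field_dtypes = field_dtypes or {}
    by_domain = defaultdict(list)
    for spec in specs:
        by_domain[spec.domain].append(spec)

    layouts = {}
    for domain, domain_specs in by_domain.items():
        coords = [s for s in domain_specs if s.kind == "coord"]
        if len(coords) != 1:  # mirrors the documented ValueError
            raise ValueError(
                f"domain {domain!r} needs exactly one coordinate field, found {len(coords)}"
            )
        arrays = {}
        for s in domain_specs:
            default = coords_dtype if s.kind == "coord" else values_dtype
            arrays[s.canonical] = {
                "array_name": f"{s.domain}/{s.name}",  # one Zarr array per field
                "field": s.canonical,
                "dtype": field_dtypes.get(s.canonical, default),  # per-field override wins
                "dim": s.dim,
            }
        layouts[domain] = {"position": coords[0].canonical, "arrays": arrays}
    return layouts


specs = [
    Spec("volume", "position", 3, "coord"),
    Spec("volume", "vorticity", 3, "value"),
    Spec("volume", "pressure", 1, "value"),
]
layouts = sketch_domain_layouts(specs, field_dtypes={"volume_vorticity": "float32"})
```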

noether.data.zarr_store.filename_to_canonical(filemap)

Return the reverse map stored_filename -> canonical_field for the fields present in filemap.

Lets a Zarr-backed dataset translate the .pt filename referenced by an AeroDataset.getitem_* method back to the canonical field key used in the Zarr store.

Parameters:

filemap (noether.data.schemas.FileMap)

Return type:

dict[str, str]

noether.data.zarr_store.present_specs(filemap)

Return the FIELD_SPECS whose filemap_attr is set on filemap.

Parameters:

filemap (noether.data.schemas.FileMap)

Return type:

list[FieldSpec]
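The two filemap helpers can be sketched together: present_specs filters the spec list, and filename_to_canonical inverts the filemap over the surviving specs. In this sketch SimpleNamespace stands in for FileMap, the attribute names and filenames are illustrative, and treating "set" as "not None" is an assumption.

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass(frozen=True)
class Spec:
    """Hypothetical minimal stand-in for FieldSpec."""

    filemap_attr: str
    domain: str
    name: str

    @property
    def canonical(self) -> str:
        return f"{self.domain}_{self.name}"


SPECS = [
    Spec("volume_position", "volume", "position"),
    Spec("volume_velocity", "volume", "velocity"),
    Spec("surface_pressure", "surface", "pressure"),
]


def present_specs_sketch(filemap, specs=SPECS):
    # Assumption: a field is "set" when its FileMap attribute is not None.
    return [s for s in specs if getattr(filemap, s.filemap_attr, None) is not None]


def filename_to_canonical_sketch(filemap, specs=SPECS):
    # Invert the mapping: stored .pt filename -> canonical Zarr field key.
    return {
        getattr(filemap, s.filemap_attr): s.canonical
        for s in present_specs_sketch(filemap, specs)
    }


# SimpleNamespace stands in for FileMap; filenames are illustrative.
filemap = SimpleNamespace(
    volume_position="volume_points.pt", volume_velocity="volume_U.pt", surface_pressure=None
)
print(filename_to_canonical_sketch(filemap))
# {'volume_points.pt': 'volume_position', 'volume_U.pt': 'volume_velocity'}
```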

class noether.data.zarr_store.ArrayLayout(/, **data)

Bases: pydantic.BaseModel

Layout of a single per-field Zarr array.

Every field is its own array (<domain>/<name>), so fields can be read independently. The channel axis is never chunked, and by default each array is packed into a single whole-array shard (see the shard_points argument of ZarrStoreWriter), so the per-sample object count stays at one object per field while chunks remain individually range-readable.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:

data (Any)

array_name: str

Zarr path of the array within the per-sample group, e.g. "volume/velocity".

field: str

Canonical field name served by this array, e.g. "volume_velocity".

dtype: str

On-disk dtype of the array, e.g. "float32" or "float16".

dim: int

Channel width (1 for scalars, 3 for vectors).

class noether.data.zarr_store.DomainLayout(/, **data)

Bases: pydantic.BaseModel

Per-field arrays of one domain.

All arrays of a domain share the point axis, the shuffle permutation and the chunk grid, so chunk c addresses the same physical points in every field.

position: str

Canonical name of the domain’s coordinate field (e.g. "volume_position").

arrays: dict[str, ArrayLayout]

Mapping canonical_field -> array layout (includes the position array).

class noether.data.zarr_store.DomainSample(/, **data)

Bases: pydantic.BaseModel

Per-sample chunk grid for one domain.

n_points: int

Number of points (rows) for this sample/domain.

chunk_points: int

Chunk size along the point axis.

shard_points: int

Shard size along the point axis, always a whole number of chunks. It equals the full array (n_chunks * chunk_points) unless the writer’s shard_points cap split the arrays into multiple shards.

n_chunks: int

Number of chunks along the point axis (ceil(n_points / chunk_points)).
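The relationship between the DomainSample fields is plain integer arithmetic. The sketch below computes n_chunks and shard_points from n_points, chunk_points and an optional writer cap, following the rounding rule documented for ZarrStoreWriter (round down to whole chunks, minimum one chunk). chunk_grid and chunk_rows are hypothetical helpers, and clamping an oversized cap to the whole array is an assumption.

```python
from typing import Optional, Tuple


def chunk_grid(n_points: int, chunk_points: int, shard_cap: Optional[int] = None) -> Tuple[int, int]:
    """Return (n_chunks, shard_points) for one sample/domain."""
    n_chunks = -(-n_points // chunk_points)  # ceil(n_points / chunk_points)
    whole = n_chunks * chunk_points  # a whole-array shard spans every chunk
    if shard_cap is None:
        return n_chunks, whole
    # Round the cap down to whole chunks, but keep at least one chunk.
    capped = max(chunk_points, (shard_cap // chunk_points) * chunk_points)
    # Assumption: a cap larger than the array still yields one whole-array shard.
    return n_chunks, min(capped, whole)


def chunk_rows(c: int, n_points: int, chunk_points: int) -> Tuple[int, int]:
    """Half-open row range [start, stop) of chunk c; the last chunk may be partial."""
    start = c * chunk_points
    return start, min(start + chunk_points, n_points)


print(chunk_grid(10_000, 4096))  # (3, 12288)
print(chunk_rows(2, 10_000, 4096))  # (8192, 10000)
print(chunk_grid(50_000_000, 4096, shard_cap=11_000_000))  # (12208, 10997760)
```

With the default chunk_points=4096, the 11_000_000 cap used as an example for ZarrStoreWriter rounds down to 10_997_760 points (2685 chunks) per shard.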

class noether.data.zarr_store.SampleEntry(/, **data)

Bases: pydantic.BaseModel

Manifest entry for a single sample.

relpath: str

Path of the sample’s Zarr group relative to the store root.

domains: dict[str, DomainSample]

Per-domain chunk grids.

class noether.data.zarr_store.StoreManifest(/, **data)

Bases: pydantic.BaseModel

Top-level manifest for a converted Zarr store.

dataset_name: str
format_version: int = 1
shuffle_seed: int

Base seed used to derive the per-sample point shuffle permutations.

coords_dtype: str = 'float32'
values_dtype: str = 'float16'
compressor: str = 'blosc-zstd'
domains: dict[str, DomainLayout]

Global column layout, shared by every sample.

samples: dict[str, SampleEntry] = None

Per-sample chunk grids keyed by sample id.

MANIFEST_NAME: ClassVar[str] = 'manifest.json'
save(store_root)

Write the manifest to <store_root>/manifest.json (local path or fsspec URL).

Parameters:

store_root (str | pathlib.Path)

Return type:

str

classmethod load(store_root)

Load the manifest from <store_root>/manifest.json (local path or fsspec URL).

Parameters:

store_root (str | pathlib.Path)

Return type:

StoreManifest

class noether.data.zarr_store.ZarrChunkReader(store_root, manifest=None, store_factory=_default_store_factory, read_concurrency=1)

Reads chunk-subsampled (or full) per-field tensors from a converted store.

Parameters:
  • store_root (str | pathlib.Path) – Root of the converted Zarr store — a local path or an fsspec URL (s3://, gs://, memory://, …) for object storage.

  • manifest (noether.data.zarr_store.manifest.StoreManifest | None) – Pre-loaded manifest; loaded from store_root if omitted.

  • store_factory (collections.abc.Callable[[str], zarr.abc.store.Store]) – Builds a Zarr store from a sample-group path/URL. Defaults to a read-only LocalStore/FsspecStore by path; override to wrap the store (e.g. for byte-accounting in benchmarks).

  • read_concurrency (int) – Max threads used to fetch a sample’s chunks in parallel. 1 (default) reads serially — best for fast local stores. Use a value around the number of chunks per sample to hide latency on S3.

store_root = ''
manifest
read_concurrency
read_sample(sample_id, num_points=None, generator=None, fields=None)

Read (optionally chunk-subsampled) fields for one sample.

With read_concurrency == 1 each array is read with a single orthogonal selection (lowest overhead, best on local stores). With read_concurrency > 1 all chunk reads of the sample are issued concurrently in a thread pool (best latency hiding on S3). Both paths return identical results.

Parameters:
  • sample_id (str) – Sample id present in the manifest.

  • num_points (dict[str, int | None] | None) – Per-domain target counts, e.g. {"surface": 3586, "volume": 4096}. A None value (or missing domain) reads the full domain.

  • generator (torch.Generator | None) – Torch RNG for chunk selection. Pass a seeded generator for deterministic evaluation; None draws a fresh subset each call.

  • fields (set[str] | None) – Optional subset of canonical fields to read. Because every field is its own array, unrequested fields cost no I/O. None reads all fields.

Returns:

Mapping canonical_field -> tensor, with scalar fields shaped (T,) and vector fields shaped (T, dim), where T is the number of points read for the field's domain. All tensors are float32 and point-aligned within a domain.

Return type:

dict[str, torch.Tensor]

read_coords(sample_id, domain, num_points, generator=None)

Read an independent chunk-subsample of a domain’s point positions only.

This is a separate random draw from read_sample() that reads only the domain’s position array. It takes further draws from the same generator, so its chunk selection is uncorrelated with the field draw. It backs the AB-UPT geometry_position input: a random draw of surface points distinct from the surface anchor points.

Parameters:
  • sample_id (str) – Sample id present in the manifest.

  • domain (str) – Domain whose positions to read (e.g. "surface").

  • num_points (int | None) – Number of points to sample (None reads all positions).

  • generator (torch.Generator | None) – Torch RNG for chunk selection.

Returns:

(num_points, position_dim) float32 positions tensor.

Return type:

torch.Tensor
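The read paths above can be approximated with a NumPy sketch under stated assumptions: chunks are picked without replacement until they cover the requested count and the result is truncated, NumPy's Generator replaces torch.Generator, and in-memory arrays replace Zarr arrays. pick_chunks, read_serial and read_threaded are hypothetical names, not the reader's internals. The two read functions return identical results because they gather the same rows in the same order, and a second pick_chunks call on the same generator plays the role of read_coords.

```python
import math
from concurrent.futures import ThreadPoolExecutor

import numpy as np


def pick_chunks(n_points, chunk_points, num_points, rng):
    """Pick distinct random chunks until they cover num_points (None reads all)."""
    n_chunks = math.ceil(n_points / chunk_points)
    if num_points is None or num_points >= n_points:
        return list(range(n_chunks))
    picked, covered = [], 0
    for c in rng.permutation(n_chunks):
        c = int(c)
        picked.append(c)
        covered += min(chunk_points, n_points - c * chunk_points)  # last chunk may be short
        if covered >= num_points:
            break
    return picked


def chunk_slice(c, chunk_points, n_points):
    """Rows covered by chunk c along the point axis."""
    return slice(c * chunk_points, min((c + 1) * chunk_points, n_points))


def read_serial(arrays, chunks, chunk_points, num_points=None):
    """One gather per array, analogous to a single orthogonal selection."""
    n_points = len(next(iter(arrays.values())))
    rows = np.concatenate([np.arange(n_points)[chunk_slice(c, chunk_points, n_points)] for c in chunks])
    rows = rows[:num_points]  # truncate to the requested count (None keeps all)
    return {name: arr[rows] for name, arr in arrays.items()}


def read_threaded(arrays, chunks, chunk_points, num_points=None, workers=4):
    """Fetch every (array, chunk) pair concurrently, then reassemble in order."""
    n_points = len(next(iter(arrays.values())))

    def fetch(job):
        name, c = job
        return arrays[name][chunk_slice(c, chunk_points, n_points)]

    jobs = [(name, c) for name in arrays for c in chunks]
    with ThreadPoolExecutor(max_workers=workers) as pool:
        parts = list(pool.map(fetch, jobs))  # map preserves job order
    k = len(chunks)
    return {
        name: np.concatenate(parts[i * k:(i + 1) * k])[:num_points]
        for i, name in enumerate(arrays)
    }


rng = np.random.default_rng(0)  # stands in for a seeded torch.Generator
n, cp = 10_000, 4096
positions = np.arange(n * 3, dtype=np.float32).reshape(n, 3)
arrays = {"surface_position": positions, "surface_pressure": positions[:, 0].copy()}

field_chunks = pick_chunks(n, cp, 8000, rng)  # like read_sample's draw
coord_chunks = pick_chunks(n, cp, 3000, rng)  # a later draw from the same rng, like read_coords
serial = read_serial(arrays, field_chunks, cp, 8000)
threaded = read_threaded(arrays, field_chunks, cp, 8000)
```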

noether.data.zarr_store.calculate_store_statistics(store_root, *, fields=None, exclude_fields=None, sample_ids=None, limit=None, max_workers=1, read_concurrency=1, progress=False)

Stream all samples of a Zarr store and accumulate per-field running statistics.

Parameters:
  • store_root (str | pathlib.Path) – Store root (local path or fsspec URL).

  • fields (set[str] | None) – Restrict to these canonical field names (default: every stored field).

  • exclude_fields (set[str] | None) – Field names to skip.

  • sample_ids (list[str] | None) – Restrict to these manifest sample ids (default: all samples).

  • limit (int | None) – Process at most this many samples (after sample_ids filtering).

  • max_workers (int) – Samples read concurrently (threads); accumulation stays single-threaded.

  • read_concurrency (int) – Per-sample chunk-read threads (see ZarrChunkReader).

  • progress (bool) – Show a tqdm progress bar.

Returns:

Mapping from canonical field name to its RunningStats (per-component mean/std/min/max and logscale moments, accumulated in float64).

Raises:

ValueError – If a requested field is not present in the store, or no samples remain to process. Sample ids missing from the store are skipped with a warning (split files may list samples that were skipped at conversion).

Return type:

dict[str, noether.data.stats.RunningStats]
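Streaming statistics of this kind are usually accumulated by merging per-batch moments. The sketch below is a hypothetical accumulator (RunningStatsSketch, not noether.data.stats.RunningStats) that keeps per-component count, mean, sum of squared deviations, min and max in float64 and merges each batch with the parallel-variance update of Chan et al. It omits the logscale moments, and reporting the population standard deviation (ddof=0) is an assumption.

```python
import numpy as np


class RunningStatsSketch:
    """Hypothetical streaming per-component mean/std/min/max in float64."""

    def __init__(self, dim: int):
        self.count = 0
        self.mean = np.zeros(dim, dtype=np.float64)
        self.m2 = np.zeros(dim, dtype=np.float64)  # sum of squared deviations from the mean
        self.min = np.full(dim, np.inf)
        self.max = np.full(dim, -np.inf)

    def update(self, batch) -> None:
        x = np.asarray(batch, dtype=np.float64)
        if x.shape[0] == 0:
            return
        x = x.reshape(x.shape[0], -1)  # (T,) -> (T, 1); (T, dim) unchanged
        n_b = x.shape[0]
        mean_b = x.mean(axis=0)
        m2_b = ((x - mean_b) ** 2).sum(axis=0)
        total = self.count + n_b
        delta = mean_b - self.mean
        # Chan et al. merge of (count, mean, m2) for two disjoint sets.
        self.mean = self.mean + delta * (n_b / total)
        self.m2 = self.m2 + m2_b + delta**2 * (self.count * n_b / total)
        self.count = total
        self.min = np.minimum(self.min, x.min(axis=0))
        self.max = np.maximum(self.max, x.max(axis=0))

    @property
    def std(self):
        # Population std (ddof=0); the real class may differ.
        return np.sqrt(self.m2 / self.count)


data = np.random.default_rng(0).normal(size=(1000, 3)).astype(np.float32)
stats = RunningStatsSketch(3)
for block in np.array_split(data, 7):  # e.g. one block per sample
    stats.update(block)
```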

noether.data.zarr_store.is_remote(store_root)

Return True if store_root is an fsspec URL backed by a non-local filesystem.

Parameters:

store_root (str | pathlib.Path)

Return type:

bool

noether.data.zarr_store.make_store(path, *, read_only=False)

Build a Zarr store for path.

Uses the Rust-backed ObjectStore for s3:// URLs when the optional obstore package is installed, FsspecStore for any other URL (or for S3 without obstore), and LocalStore for local paths.

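Parameters:
  • path – Local path or URL (s3://, gs://, memory://, …) of the store.

  • read_only (bool) – Open the store read-only.

Return type: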

zarr.abc.store.Store
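The store-selection rule above can be written as a small dispatch function. This sketch only returns the name of the store class it would pick, so it runs without zarr installed; choose_store_kind is a hypothetical helper, and treating any string containing "://" as a URL is an assumption about how make_store distinguishes URLs from local paths. importlib.util.find_spec checks whether obstore is importable without importing it.

```python
import importlib.util
from pathlib import Path
from typing import Optional, Union


def choose_store_kind(path: Union[str, Path], obstore_available: Optional[bool] = None) -> str:
    """Return the store class make_store is documented to use for path."""
    s = str(path)
    if "://" not in s:  # assumption: no scheme separator means a local path
        return "LocalStore"
    if obstore_available is None:
        obstore_available = importlib.util.find_spec("obstore") is not None
    protocol = s.split("://", 1)[0].lower()
    if protocol == "s3" and obstore_available:
        return "ObjectStore"  # Rust-backed store via the optional obstore package
    return "FsspecStore"  # any other URL, or S3 without obstore


print(choose_store_kind("/data/cfd_store"))  # LocalStore
print(choose_store_kind("gs://bucket/store", obstore_available=True))  # FsspecStore
```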

class noether.data.zarr_store.ZarrStoreWriter(store_root, filemap, dataset_name, shuffle_seed=0, chunk_points=4096, shard_points=None, coords_dtype='float32', values_dtype='float16', field_dtypes=None, compression_level=5)

Convert CFD samples into the chunked/sharded Zarr format and track the manifest.

Parameters:
  • store_root (str | pathlib.Path) – Output location for the Zarr store. A local path or an fsspec URL (s3://, gs://, memory://, …) for object storage.

  • filemap (noether.data.schemas.FileMap) – Field-to-filename mapping describing which fields exist.

  • dataset_name (str) – Human-readable dataset name recorded in the manifest.

  • shuffle_seed (int) – Base seed for the per-sample point shuffle.

  • chunk_points (int) – Chunk size along the point axis. Pick close to the training subsample size to minimise read amplification.

  • shard_points (int | None) – Cap on the shard size along the point axis (rounded down to a whole number of chunks, minimum one chunk). None (default) packs each array into a single whole-array shard. Set this when per-field arrays grow large: uncompressed shard bytes ≈ shard_points × dim × dtype_size, so e.g. a ~128 MiB cap on a float32×3 position array corresponds to shard_points 11_000_000 (11_000_000 × 3 × 4 B = 132 MB ≈ 126 MiB). Smaller shards bound the writer’s per-shard RAM and the blast radius of a corrupt object, at the cost of more objects per array.

  • coords_dtype (str) – Dtype for the positions array (keep float32).

  • values_dtype (str) – Dtype for the physical field arrays (float16 halves the bytes relative to float32).

  • field_dtypes (dict[str, str] | None) – Per-field dtype overrides keyed by canonical name, e.g. {"volume_vorticity": "float32"} for fields whose values exceed the values_dtype range (the largest finite float16 value is 65504); overflowing casts are rejected at write time rather than silently stored as inf.

  • compression_level (int) – blosc/zstd compression level.
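Rejecting overflowing casts, as described for field_dtypes, can be done by casting first and then looking for values that were finite in the source but became inf. The sketch below is a hypothetical helper (cast_checked), not the writer's code; it lets values that were already inf pass through unchanged, which is an assumption about the intended behaviour.

```python
import numpy as np


def cast_checked(field, values, dtype):
    """Cast to the on-disk dtype, rejecting finite values that overflow to inf."""
    src = np.asarray(values)
    with np.errstate(over="ignore"):  # overflow is detected explicitly below
        out = src.astype(dtype)
    overflow = np.isinf(out) & np.isfinite(src)
    if overflow.any():
        worst = float(np.abs(src[overflow]).max())
        raise ValueError(
            f"{field}: |value| up to {worst:.3g} overflows {np.dtype(dtype)}; "
            f"add a field_dtypes override such as {{{field!r}: 'float32'}}"
        )
    return out


pressure = cast_checked("volume_pressure", np.array([1.0, -65504.0], dtype=np.float32), "float16")
try:
    cast_checked("volume_vorticity", np.array([1e5], dtype=np.float32), "float16")
except ValueError as err:
    print(err)
```

Checking after the cast, rather than comparing against np.finfo(dtype).max beforehand, rejects exactly the values that would be stored as inf, since values slightly above 65504 still round to the largest finite float16.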

store_root = ''
filemap
chunk_points = 4096
shard_points = None
coords_dtype = 'float32'
values_dtype = 'float16'
field_dtypes = None
compression_level = 5
layouts
manifest
write_group(sample_id, field_arrays)

Write one sample’s Zarr group and return its manifest entry without modifying the manifest.

Each sample is written to its own store, so this method is safe to call concurrently from multiple threads; the caller must record the returned entry in the manifest.

Parameters:
  • sample_id (str) – Stable id used for the relative path and shuffle seed (e.g. "param1/<hash>").

  • field_arrays (dict[str, numpy.ndarray]) – Mapping canonical_field -> numpy array. Positions must be (N, 3); scalar fields may be (N,) or (N, 1).

Returns:

The SampleEntry describing the written group.

Raises:

ValueError – If a domain’s fields disagree on point count.

Return type:

noether.data.zarr_store.manifest.SampleEntry
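The input contract of write_group (positions (N, 3), scalars (N,) or (N, 1), matching point counts per domain) can be checked up front. normalize_field_arrays is a hypothetical helper; normalizing scalars to (N, 1) so every array carries a channel axis, identifying positions by a "_position" suffix, and deriving the domain from the text before the first underscore are all assumptions of this sketch.

```python
import numpy as np


def normalize_field_arrays(field_arrays):
    """Hypothetical pre-write check mirroring the documented write_group contract."""
    out, counts = {}, {}
    for field, arr in field_arrays.items():
        arr = np.asarray(arr)
        if arr.ndim == 1:
            arr = arr[:, None]  # scalar (N,) -> (N, 1), so every array has a channel axis
        if arr.ndim != 2:
            raise ValueError(f"{field}: expected (N,) or (N, dim), got {arr.shape}")
        # Assumption: positions use the "<domain>_position" key and domains contain no "_".
        if field.endswith("_position") and arr.shape[1] != 3:
            raise ValueError(f"{field}: positions must be (N, 3), got {arr.shape}")
        domain = field.split("_", 1)[0]
        counts.setdefault(domain, {})[field] = arr.shape[0]
        out[field] = arr
    for domain, per_field in counts.items():
        if len(set(per_field.values())) > 1:  # mirrors the documented ValueError
            raise ValueError(f"domain {domain!r}: fields disagree on point count: {per_field}")
    return out


checked = normalize_field_arrays(
    {
        "volume_position": np.zeros((5, 3)),
        "volume_pressure": np.zeros(5),
        "surface_position": np.zeros((2, 3)),
    }
)
```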

write_sample(sample_id, field_arrays)

Write one sample and record it in the manifest (sequential convenience).

Parameters:
  • sample_id (str) – Same as for write_group().

  • field_arrays (dict[str, numpy.ndarray]) – Same as for write_group().

Return type:

None

to_init_kwargs()

Constructor kwargs to rebuild an identical writer (e.g. in a worker process).

Return type:

dict[str, object]

save_manifest()

Persist the manifest to the store root (local path or fsspec URL).

Return type:

str