API Reference#

File Patterns#

class pangeo_forge_recipes.patterns.FilePattern(format_function, *combine_dims, fsspec_open_kwargs=None, query_string_secrets=None, file_type='netcdf4')#

Represents an n-dimensional matrix of individual files to be combined through a combination of merge and concat operations. Each operation generates a new dimension to the matrix.

Parameters:
  • format_function (Callable) – A function that takes one keyword argument for each combine dimension and returns a string representing the filename / URL path. Each argument name should correspond to a name in the combine_dims list.

  • combine_dims (CombineDim) – A sequence of either concat or merge dimensions. The outer product of the keys is used to generate the full list of file paths.

  • fsspec_open_kwargs (Optional[Dict[str, Any]]) – A dictionary of kwargs to pass to fsspec.open to aid opening of source files. For example, {"block_size": 0} may be passed if an HTTP source file server does not permit range requests. Authentication for fsspec-compatible filesystems may be handled here as well. For HTTP username/password-based authentication, your specific fsspec_open_kwargs will depend on the configuration of the source file server, but are likely to conform to one of the following two formats: {"username": "<username>", "password": "<password>"} or {"auth": aiohttp.BasicAuth("<username>", "<password>")}.

  • query_string_secrets (Optional[Dict[str, str]]) – If provided, these key/value pairs are appended to the query string of each url at runtime. Query string parameters which are not secrets should instead be included directly in the URLs returned by the format_function.

  • file_type (str) – The file format of the source files for this pattern. Must be one of the options defined by pangeo_forge_recipes.patterns.FileType. Note: FileType.opendap cannot be used with caching.
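
For example, a minimal pattern over a single concat dimension (the URL scheme and nitems_per_file value below are hypothetical):

    from pangeo_forge_recipes.patterns import ConcatDim, FilePattern

    dates = ["2020-01-01", "2020-01-02", "2020-01-03"]

    # The argument name matches the ConcatDim name ("time").
    def make_url(time):
        return f"https://data.example.com/daily/{time}.nc"

    # Assumes each daily file holds 24 hourly steps along "time".
    time_dim = ConcatDim("time", keys=dates, nitems_per_file=24)
    pattern = FilePattern(make_url, time_dim)

    print(pattern.dims)   # {'time': 3}
    print(pattern.shape)  # (3,)
    for index, url in pattern.items():
        print(index, url)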

__getitem__(indexer)#

Get a filename path for a particular key.

Return type:

str

__iter__()#

Iterate over all keys in the pattern.

Return type:

Iterator[Index]

property concat_dims: List[str]#

List of dims that are concat operations

property concat_sequence_lens: Dict[str, int | None]#

Dictionary mapping concat dims to sequence lengths. Only available if nitems_per_input is set on the dimension.

property dims: Dict[str, int]#

Dictionary representing the dimensions of the FilePattern. Keys are dimension names, values are the number of items along each dimension.

get_merkle_list()#

Compute the merkle tree for the current FilePattern.

Return a list of hashes, of length len(filepattern)+1. The first item in the list is calculated by hashing attributes of the FilePattern instance. Each subsequent item is calculated by hashing the byte string produced by concatenating the next index:filepath pair yielded by items() with the previous hash in the list.

Return type:

List[bytes]

items()#

Iterate over key, filename pairs.

property merge_dims: List[str]#

List of dims that are merge operations

property nitems_per_input: Dict[str, int | None]#

Dictionary mapping concat dims to number of items per file.

prune(nkeep=2)#

Create a smaller pattern from a full pattern. Keeps all MergeDims but only the first nkeep items from each ConcatDim

Parameters:

nkeep (int) – The number of items to keep from each ConcatDim sequence.

Return type:

FilePattern
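
For example, reusing the pattern from the sketch above:

    # Keep only the first two items along each ConcatDim for a test run.
    pruned = pattern.prune(nkeep=2)
    assert pruned.dims["time"] == 2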

sha256()#

Compute a sha256 hash for the instance.

property shape: Tuple[int, ...]#

Shape of the filename matrix.

start_processing_from(old_pattern_last_hash)#

Given the last hash of the merkle tree of a previous pattern, determine which (if any) Index key of the current pattern to begin data processing from, in order to append to a dataset built using the previous pattern.

Parameters:

old_pattern_last_hash (bytes) – The last hash of the merkle tree for the FilePattern instance which was used to build the existing dataset.

Return type:

Optional[Index]
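
A hedged sketch of the intended append workflow; the names below are illustrative, and the interpretation of a None result as "patterns diverge" is an assumption:

    # `old_pattern` built the existing dataset; `new_pattern` extends it
    # with additional items along the concat dimension.
    old_hashes = old_pattern.get_merkle_list()
    start = new_pattern.start_processing_from(old_hashes[-1])
    if start is not None:
        ...  # begin processing new_pattern from `start` to append
    else:
        ...  # assumed: the patterns diverge and appending is not possible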

Combine Dimensions#

class pangeo_forge_recipes.patterns.ConcatDim(name, keys, nitems_per_file=None)#

Represents a concatenation operation across a dimension of a FilePattern.

Parameters:
  • name (str) – The name of the dimension we are concatenating over. For files with labeled dimensions, this should match the dimension name within the file. The most common value is "time".

  • keys (Sequence[Any]) – The keys used to represent each individual item along this dimension. This will be used by a FilePattern object to evaluate the file name.

  • nitems_per_file (Optional[int]) – If each file contains the exact same known number of items along the concat dimension, this can be set to provide a fast path for recipes.

class pangeo_forge_recipes.patterns.MergeDim(name, keys)#

Represents a merge operation across a dimension of a FilePattern.

Parameters:
  • name (str) – The name of the dimension we are merging over. The actual value is not used by most recipes. The most common value is "variable".

  • keys (Sequence[Any]) – The keys used to represent each individual item along this dimension. This will be used by a FilePattern object to evaluate the file name.
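
For example, combining both dimension types in one pattern (URL scheme hypothetical):

    from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, MergeDim

    # One keyword argument per combine dimension, matching dimension names.
    def make_url(time, variable):
        return f"https://data.example.com/{variable}/{time}.nc"

    pattern = FilePattern(
        make_url,
        ConcatDim("time", keys=["2020-01", "2020-02"]),
        MergeDim("variable", keys=["temperature", "salinity"]),
    )
    print(pattern.shape)  # (2, 2)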

Indexing#

class pangeo_forge_recipes.patterns.Index#

An Index is a special sort of dictionary which describes a position within a multidimensional set.

  • The key is a Dimension which tells us which dimension we are addressing.

  • The value is a Position which tells us where we are within that dimension.

This object is hashable and deterministically serializable.
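
For illustration, an Index addressing the first position along a time concat dimension; the Dimension and Position helpers are assumed importable from the same module:

    from pangeo_forge_recipes.patterns import CombineOp, Dimension, Index, Position

    idx = Index({Dimension("time", CombineOp.CONCAT): Position(0)})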

class pangeo_forge_recipes.patterns.CombineOp(value)#

Used to uniquely identify different combine operations across Pangeo Forge Recipes.

Storage#

class pangeo_forge_recipes.storage.AbstractTarget#
abstract exists(path)#

Check that the file exists.

Return type:

bool

open(path, **kwargs)#

Open file with a context manager.

abstract rm(path)#

Remove file.

Return type:

None

abstract size(path)#

Get file size

Return type:

int

class pangeo_forge_recipes.storage.CacheFSSpecTarget(fs, root_path='')#

Alias for FlatFSSpecTarget

class pangeo_forge_recipes.storage.FSSpecTarget(fs, root_path='')#

Representation of a storage target for Pangeo Forge.

Parameters:
  • fs (AbstractFileSystem) – The filesystem object we are writing to.

  • root_path (str) – The path under which the target data will be stored.
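
For example, a local filesystem target (the root path is illustrative; any fsspec-compatible filesystem works):

    import fsspec

    from pangeo_forge_recipes.storage import FSSpecTarget

    local_fs = fsspec.filesystem("file")
    target = FSSpecTarget(fs=local_fs, root_path="/tmp/my-target")
    store = target.get_mapper()  # mutable mapping suitable for Zarr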

exists(path)#

Check that the file is in the cache.

Return type:

bool

get_mapper()#

Get a mutable mapping object suitable for storing Zarr data.

Return type:

FSMap

open(path, **kwargs)#

Open file with a context manager.

Return type:

Iterator[Union[OpenFile, AbstractBufferedFile, IOBase]]

open_file(path, **kwargs)#

Returns an fsspec open file

Return type:

Union[OpenFile, AbstractBufferedFile, IOBase]

rm(path, recursive=False)#

Remove file from the cache.

Return type:

None

size(path)#

Get file size

Return type:

int

class pangeo_forge_recipes.storage.FlatFSSpecTarget(fs, root_path='')#

A target that sanitizes all the path names so that everything is stored in a single directory.

Designed to be used as a cache for inputs.

Processing Functions#

The Beam PTransform Style Guide recommends:

Expose large, non-trivial, reusable sequential bits of the transform’s code, which others might want to reuse in ways you haven’t anticipated, as a regular function or class library. The transform should simply wire this logic together.

These are those functions.

Standalone functions for opening sources as Dataset objects.

pangeo_forge_recipes.openers.open_url(url, cache=None, secrets=None, open_kwargs=None)#

Open a string-based URL with fsspec.

Parameters:
  • url (str) – The URL to be parsed by fsspec.

  • cache (Optional[CacheFSSpecTarget]) – If provided, data will be cached in the object before opening.

  • secrets (Optional[Dict]) – If provided, these secrets will be injected into the URL as a query string.

  • open_kwargs (Optional[Dict]) – Extra arguments passed to fsspec.open.

Return type:

Union[OpenFile, AbstractBufferedFile, IOBase]
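
A sketch of direct usage, assuming the returned object behaves as an fsspec OpenFile context manager (URL hypothetical):

    from pangeo_forge_recipes.openers import open_url

    # block_size=0 helps with servers that do not permit range requests
    open_file = open_url(
        "https://data.example.com/file.nc",
        open_kwargs={"block_size": 0},
    )
    with open_file as f:
        magic = f.read(4)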

pangeo_forge_recipes.openers.open_with_kerchunk(url_or_file_obj, file_type=FileType.unknown, inline_threshold=100, storage_options=None, remote_protocol=None, kerchunk_open_kwargs=None)#

Scan through item(s) with one of Kerchunk’s file readers (SingleHdf5ToZarr, scan_grib etc.) and create reference objects.

All file readers return dicts, with the exception of scan_grib, which returns a list of dicts. Therefore, to provide a consistent return type, this function always returns a list of dicts (placing dicts inside a single-element list as needed).

Parameters:
  • url_or_file_obj (Union[OpenFile, AbstractBufferedFile, IOBase, str, FSStore]) – The url or file object to be opened.

  • file_type (FileType) – The type of file to be opened; e.g. “netcdf4”, “netcdf3”, “grib”, etc.

  • inline_threshold (Optional[int]) – Passed to kerchunk opener.

  • storage_options (Optional[Dict]) – Storage options dict to pass to the kerchunk opener.

  • remote_protocol (Optional[str]) – If files are accessed over the network, provide the remote protocol over which they are accessed. e.g.: “s3”, “https”, etc.

  • kerchunk_open_kwargs (Optional[dict]) – Additional kwargs to pass to kerchunk opener. Any kwargs which are specific to a particular input file type should be passed here; e.g., {"filter": ...} for GRIB; {"max_chunk_size": ...} for NetCDF3, etc.

Return type:

list[dict]
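
For example (bucket and credentials hypothetical):

    from pangeo_forge_recipes.openers import open_with_kerchunk
    from pangeo_forge_recipes.patterns import FileType

    refs = open_with_kerchunk(
        "s3://some-bucket/file.nc",
        file_type=FileType.netcdf4,
        remote_protocol="s3",
        storage_options={"anon": True},
    )
    # Always a list of dicts, even for single-reference file types:
    reference = refs[0]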

pangeo_forge_recipes.openers.open_with_xarray(url_or_file_obj, file_type=FileType.unknown, load=False, copy_to_local=False, xarray_open_kwargs=None)#

Open item with Xarray. Accepts either fsspec open-file-like objects or string URLs that can be passed directly to Xarray.

Parameters:
  • url_or_file_obj (Union[OpenFile, AbstractBufferedFile, IOBase, str, FSStore]) – The url or file object to be opened.

  • file_type (FileType) – Provide this if you know what type of file it is.

  • load (bool) – Whether to eagerly load the data into memory after opening.

  • copy_to_local (bool) – Whether to copy the file-like object to a local path and pass the path to Xarray. Required for some file types (e.g. GRIB). Can only be used with file-like objects, not URLs.

  • xarray_open_kwargs (Optional[dict]) – Extra arguments to pass to Xarray’s open function.

Return type:

Dataset
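
For example, chaining with open_url (URL hypothetical):

    from pangeo_forge_recipes.openers import open_url, open_with_xarray
    from pangeo_forge_recipes.patterns import FileType

    open_file = open_url("https://data.example.com/2020-01.nc")
    ds = open_with_xarray(open_file, file_type=FileType.netcdf4, load=True)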

class pangeo_forge_recipes.aggregation.XarrayCombineAccumulator(schema=<factory>, concat_dim=None)#

An object used to help combine Xarray schemas.

Parameters:
  • schema (XarraySchema) – A schema to initialize the accumulator with.

  • concat_dim (Optional[str]) – If set, this accumulator applies concat rules. Otherwise applies merge rules.

PTransforms#

The Beam PTransform Style Guide recommends:

Expose every major data-parallel task accomplished by your library as a composite PTransform. This allows the structure of the transform to evolve transparently to the code that uses it.

class pangeo_forge_recipes.transforms.CombineReferences(concat_dims, identical_dims, mzz_kwargs=<factory>, precombine_inputs=False)#

Combines Kerchunk references into a single reference dataset.

Parameters:
  • concat_dims (List[str]) – Dimensions along which to concatenate inputs.

  • identical_dims (List[str]) – Dimensions shared among all inputs.

  • mzz_kwargs (dict) – Additional kwargs to pass to kerchunk.combine.MultiZarrToZarr.

  • precombine_inputs (bool) – If True, precombine each input with itself, using kerchunk.combine.MultiZarrToZarr, before adding it to the accumulator. Used for multi-message GRIB2 inputs, which produce > 1 reference when opened with kerchunk’s scan_grib function, and therefore need to be consolidated into a single reference before adding to the accumulator. Also used for inputs consisting of a single reference, for cases where the output dataset concatenates along a dimension that does not exist in the individual inputs. In this latter case, precombining adds the additional dimension to the input so that its dimensionality will match that of the accumulator.

class pangeo_forge_recipes.transforms.DatasetToSchema#
class pangeo_forge_recipes.transforms.DetermineSchema(combine_dims)#

Combine many Datasets into a single schema along multiple dimensions. This is a reduction that produces a singleton PCollection.

Parameters:

combine_dims (List[Dimension]) – The dimensions to combine

class pangeo_forge_recipes.transforms.IndexItems(schema)#

Augment dataset indexes with information about start and stop position.

class pangeo_forge_recipes.transforms.MapWithConcurrencyLimit(fn, args=<factory>, kwargs=<factory>, max_concurrency=None)#

A transform which maps calls to the provided function, optionally limiting the maximum number of concurrent calls. Useful for situations where the provided function requests data from an external service that does not support an unlimited number of concurrent requests.

Parameters:
  • fn (Callable) – Callable object passed to beam.Map (in the case of no concurrency limit) or beam.FlatMap (if max_concurrency is specified).

  • args (Optional[list]) – Positional arguments passed to all invocations of fn.

  • kwargs (Optional[dict]) – Keyword arguments passed to all invocations of fn.

  • max_concurrency (Optional[int]) – The maximum number of concurrent invocations of fn. If unspecified, no limit is imposed by this transform (therefore the concurrency limit will be set by the Beam Runner’s configuration).
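
A sketch mirroring how OpenURLWithFSSpec uses this transform internally; it assumes the input elements are (index, url) pairs as yielded by FilePattern.items():

    import apache_beam as beam

    from pangeo_forge_recipes.openers import open_url
    from pangeo_forge_recipes.transforms import MapWithConcurrencyLimit

    with beam.Pipeline() as p:
        opened = (
            p
            | beam.Create(pattern.items())  # `pattern` as in the FilePattern sketch
            | MapWithConcurrencyLimit(open_url, max_concurrency=5)
        )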

class pangeo_forge_recipes.transforms.OpenURLWithFSSpec(cache=None, secrets=None, open_kwargs=None, max_concurrency=None)#

Open indexed string-based URLs with fsspec.

Parameters:
  • cache (Optional[str | CacheFSSpecTarget]) – If provided, data will be cached at this url path before opening.

  • secrets (Optional[dict]) – If provided, these secrets will be injected into the URL as a query string.

  • open_kwargs (Optional[dict]) – Extra arguments passed to fsspec.open.

  • max_concurrency (Optional[int]) – Max concurrency for this transform.
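
Typical use at the start of a pipeline (cache path illustrative):

    import apache_beam as beam

    from pangeo_forge_recipes.transforms import OpenURLWithFSSpec

    with beam.Pipeline() as p:
        indexed_files = (
            p
            | beam.Create(pattern.items())
            | OpenURLWithFSSpec(
                cache="/tmp/cache",
                open_kwargs={"block_size": 0},
                max_concurrency=10,
            )
        )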

class pangeo_forge_recipes.transforms.OpenWithKerchunk(file_type=FileType.unknown, inline_threshold=300, storage_options=None, remote_protocol=None, kerchunk_open_kwargs=<factory>, drop_keys=True)#

Open indexed items with Kerchunk. Accepts either fsspec open-file-like objects or string URLs that can be passed directly to Kerchunk.

Parameters:
  • file_type (FileType) – The type of file to be opened; e.g. “netcdf4”, “netcdf3”, “grib”, etc.

  • inline_threshold (Optional[int]) – Passed to kerchunk opener.

  • storage_options (Optional[Dict]) – Storage options dict to pass to the kerchunk opener.

  • remote_protocol (Optional[str]) – If files are accessed over the network, provide the remote protocol over which they are accessed. e.g.: “s3”, “https”, etc.

  • kerchunk_open_kwargs (Optional[dict]) – Additional kwargs to pass to kerchunk opener. Any kwargs which are specific to a particular input file type should be passed here; e.g., {"filter": ...} for GRIB; {"max_chunk_size": ...} for NetCDF3, etc.

  • drop_keys (bool) – If True, remove Pangeo Forge’s FilePattern keys from the output PCollection before returning. This is the default behavior, which is used for cases where the output PCollection of references is passed to the CombineReferences transform for creation of a Kerchunk reference dataset as the target dataset of the pipeline. If this transform is used for other use cases (e.g., opening inputs for creation of another target dataset type), you may want to set this option to False to preserve the keys on the output PCollection.

class pangeo_forge_recipes.transforms.OpenWithXarray(file_type=FileType.unknown, load=False, copy_to_local=False, xarray_open_kwargs=<factory>)#

Open indexed items with Xarray. Accepts either fsspec open-file-like objects or string URLs that can be passed directly to Xarray.

Parameters:
  • file_type (FileType) – Provide this if you know what type of file it is.

  • load (bool) – Whether to eagerly load the data into memory after opening.

  • copy_to_local (bool) – Whether to copy the file-like object to a local path and pass the path to Xarray. Required for some file types (e.g. GRIB). Can only be used with file-like objects, not URLs.

  • xarray_open_kwargs (Optional[dict]) – Extra arguments to pass to Xarray’s open function.

class pangeo_forge_recipes.transforms.PrepareZarrTarget(target, target_chunks=<factory>, attrs=<factory>)#

From a singleton PCollection containing a dataset schema, initialize a Zarr store with the correct variables, dimensions, attributes and chunking. Note that the dimension coordinates will be initialized with dummy values.

Parameters:
  • target (str | FSSpecTarget) – Where to store the target Zarr dataset.

  • target_chunks (Dict[str, int]) – Dictionary mapping dimension names to chunk sizes. If a dimension is not named, the chunks will be inferred from the schema. If chunking is present in the schema for a given dimension, the length of the first fragment will be used. Otherwise, the dimension will not be chunked.

  • attrs (Dict[str, str]) – Extra group-level attributes to inject into the dataset.

class pangeo_forge_recipes.transforms.Rechunk(target_chunks, schema)#
class pangeo_forge_recipes.transforms.RequiredAtRuntimeDefault#

Sentinel class to use as default for transform attributes which are required to run a pipeline, but may not be available (or preferable) to define during recipe development; for example, the target_root kwarg of a transform that writes data to a target location. By using this sentinel as the default value for such a kwarg, a recipe module can define all required arguments on the transform (and therefore be importable, satisfy type-checkers, be unit-testable, etc.) before it is deployed, with the understanding that the attribute using this sentinel as default will be re-assigned to the desired value at deploy time.

class pangeo_forge_recipes.transforms.StoreDatasetFragments(target_store)#
class pangeo_forge_recipes.transforms.StoreToZarr(combine_dims, store_name, target_root=<factory>, target_chunks=<factory>, dynamic_chunking_fn=None, dynamic_chunking_fn_kwargs=<factory>, attrs=<factory>)#

Store a PCollection of Xarray datasets to Zarr.

Parameters:
  • combine_dims (List[Dimension]) – The dimensions to combine

  • store_name (str) – Name for the Zarr store. It will be created with this name under target_root.

  • target_root (Union[str, FSSpecTarget, RequiredAtRuntimeDefault]) – Root path the Zarr store will be created inside; store_name will be appended to this prefix to create a full path.

  • target_chunks (Dict[str, int]) – Dictionary mapping dimension names to chunk sizes. If a dimension is not named, the chunks will be inferred from the data.

  • dynamic_chunking_fn (Optional[Callable[[Dataset], dict]]) – Optionally provide a function that takes an xarray.Dataset template dataset as its first argument and returns a dynamically generated chunking dict. If provided, target_chunks cannot also be passed. You can use this to determine chunking based on the full dataset (e.g. divide along a certain dimension based on a desired chunk size in memory). For more advanced chunking strategies, check out jbusecke/dynamic_chunks

  • dynamic_chunking_fn_kwargs (Optional[dict]) – Optional keyword arguments for dynamic_chunking_fn.

  • attrs (Dict[str, str]) – Extra group-level attributes to inject into the dataset.
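
An end-to-end sketch, reusing `pattern` from the FilePattern examples (target location and chunking are illustrative; combine_dim_keys is assumed to be the pattern's list of combine Dimensions):

    import apache_beam as beam

    from pangeo_forge_recipes.transforms import (
        OpenURLWithFSSpec,
        OpenWithXarray,
        StoreToZarr,
    )

    with beam.Pipeline() as p:
        (
            p
            | beam.Create(pattern.items())
            | OpenURLWithFSSpec()
            | OpenWithXarray(file_type=pattern.file_type)
            | StoreToZarr(
                target_root="gs://my-bucket",
                store_name="dataset.zarr",
                combine_dims=pattern.combine_dim_keys,
                target_chunks={"time": 10},
            )
        )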

class pangeo_forge_recipes.transforms.WriteCombinedReference(store_name, concat_dims, identical_dims, mzz_kwargs=<factory>, precombine_inputs=False, target_root=<factory>, output_file_name='reference.json')#

Store a singleton PCollection consisting of a kerchunk.combine.MultiZarrToZarr object.

Parameters:
  • store_name (str) – Zarr store will be created with this name under target_root.

  • concat_dims (List[str]) – Dimensions along which to concatenate inputs.

  • identical_dims (List[str]) – Dimensions shared among all inputs.

  • mzz_kwargs (dict) – Additional kwargs to pass to kerchunk.combine.MultiZarrToZarr.

  • precombine_inputs (bool) – If True, precombine each input with itself, using kerchunk.combine.MultiZarrToZarr, before adding it to the accumulator. Used for multi-message GRIB2 inputs, which produce > 1 reference when opened with kerchunk’s scan_grib function, and therefore need to be consolidated into a single reference before adding to the accumulator. Also used for inputs consisting of a single reference, for cases where the output dataset concatenates along a dimension that does not exist in the individual inputs. In this latter case, precombining adds the additional dimension to the input so that its dimensionality will match that of the accumulator.

  • target_root (Union[str, FSSpecTarget, RequiredAtRuntimeDefault]) – Root path the Zarr store will be created inside; store_name will be appended to this prefix to create a full path.

  • output_file_name (str) – Name to give the output references file (.json or .parquet suffix).
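
A reference-recipe sketch (paths and dimension names are illustrative assumptions):

    import apache_beam as beam

    from pangeo_forge_recipes.transforms import OpenWithKerchunk, WriteCombinedReference

    with beam.Pipeline() as p:
        (
            p
            | beam.Create(pattern.items())
            | OpenWithKerchunk(file_type=pattern.file_type, remote_protocol="https")
            | WriteCombinedReference(
                target_root="output",
                store_name="reference",
                concat_dims=["time"],
                identical_dims=["lat", "lon"],
            )
        )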

Combiners#

class pangeo_forge_recipes.combiners.CombineMultiZarrToZarr(concat_dims, identical_dims, mzz_kwargs=<factory>, precombine_inputs=False)#

A beam CombineFn for combining Kerchunk MultiZarrToZarr objects.

Parameters:
  • concat_dims (List[str]) – Dimensions along which to concatenate inputs.

  • identical_dims (List[str]) – Dimensions shared among all inputs.

  • mzz_kwargs (dict) – Additional kwargs to pass to kerchunk.combine.MultiZarrToZarr.

  • precombine_inputs (bool) – If True, precombine each input with itself, using kerchunk.combine.MultiZarrToZarr, before adding it to the accumulator. Used for multi-message GRIB2 inputs, which produce > 1 reference when opened with kerchunk’s scan_grib function, and therefore need to be consolidated into a single reference before adding to the accumulator. Also used for inputs consisting of a single reference, for cases where the output dataset concatenates along a dimension that does not exist in the individual inputs. In this latter case, precombining adds the additional dimension to the input so that its dimensionality will match that of the accumulator.

class pangeo_forge_recipes.combiners.CombineXarraySchemas(dimension)#

A beam CombineFn for combining multiple xarray schemas along a single dimension.

Parameters:

dimension (Dimension) – The dimension along which to combine