API Reference#

File Patterns#

class pangeo_forge_recipes.patterns.FilePattern(format_function, *combine_dims, fsspec_open_kwargs=None, query_string_secrets=None, file_type='netcdf4')#

Represents an n-dimensional matrix of individual files to be combined through a combination of merge and concat operations. Each operation generates a new dimension to the matrix.

Parameters:
  • format_function (Callable) – A function that takes one argument for each combine dimension and returns a string representing the file name / URL path. Each argument name should correspond to a name in the combine_dims list.

  • combine_dims (CombineDim) – A sequence of either concat or merge dimensions. The outer product of the keys is used to generate the full list of file paths.

  • fsspec_open_kwargs (Optional[Dict[str, Any]]) – A dictionary of kwargs to pass to fsspec.open to aid opening of source files. For example, {"block_size": 0} may be passed if an HTTP source file server does not permit range requests. Authentication for fsspec-compatible filesystems may be handled here as well. For HTTP username/password-based authentication, your specific fsspec_open_kwargs will depend on the configuration of the source file server, but are likely to conform to one of the following two formats: {"username": "<username>", "password": "<password>"} or {"auth": aiohttp.BasicAuth("<username>", "<password>")}.

  • query_string_secrets (Optional[Dict[str, str]]) – If provided, these key/value pairs are appended to the query string of each url at runtime. Query string parameters which are not secrets should instead be included directly in the URLs returned by the format_function.

  • file_type (str) – The file format of the source files for this pattern. Must be one of the options defined by pangeo_forge_recipes.patterns.FileType. Note: FileType.opendap cannot be used with caching.
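
For example, a minimal construction sketch (the server URL, keys, and variable names here are hypothetical):

```python
from pangeo_forge_recipes.patterns import ConcatDim, FilePattern, MergeDim

dates = ["2020-01-01", "2020-01-02", "2020-01-03"]
variables = ["temperature", "salinity"]

def make_url(time, variable):
    # Argument names must match the ConcatDim/MergeDim names below;
    # data.example.com stands in for a real source file server.
    return f"https://data.example.com/{variable}/{time}.nc"

pattern = FilePattern(
    make_url,
    ConcatDim("time", keys=dates),
    MergeDim("variable", keys=variables),
)
print(pattern.shape)  # (3, 2): 3 time steps x 2 variables
```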

__getitem__(indexer)#

Get a filename path for a particular key.

Return type:

str

__iter__()#

Iterate over all keys in the pattern.

Return type:

Iterator[Index]

property concat_dims: List[str]#

List of dims that are concat operations

property concat_sequence_lens: Dict[str, int | None]#

Dictionary mapping concat dims to sequence lengths. Only available if nitems_per_input is set on the dimension.

property dims: Dict[str, int]#

Dictionary representing the dimensions of the FilePattern. Keys are dimension names, values are the number of items along each dimension.

get_merkle_list()#

Compute the Merkle tree for the current FilePattern.

Return a list of hashes, of length len(filepattern)+1. The first item in the list is calculated by hashing attributes of the FilePattern instance. Each subsequent item is calculated by hashing the byte string produced by concatenating the next index:filepath pair yielded by items() with the previous hash in the list.

Return type:

List[bytes]

items()#

Iterate over key, filename pairs.
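
Continuing the FilePattern sketch above:

```python
for index, url in pattern.items():
    # index is an Index mapping each Dimension to a Position;
    # url is the string returned by the format function.
    print(index, url)
```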

property merge_dims: List[str]#

List of dims that are merge operations

property nitems_per_input: Dict[str, int | None]#

Dictionary mapping concat dims to number of items per file.

prune(nkeep=2)#

Create a smaller pattern from a full pattern. Keeps all MergeDims but only the first nkeep items from each ConcatDim.

Parameters:

nkeep (int) – The number of items to keep from each ConcatDim sequence.

Return type:

FilePattern
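
This is useful for testing a recipe end-to-end on a small subset of inputs. Continuing the sketch above:

```python
small_pattern = pattern.prune(nkeep=2)
print(small_pattern.shape)  # (2, 2): ConcatDim truncated, MergeDim kept whole
```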

sha256()#

Compute a sha256 hash for the instance.

property shape: Tuple[int, ...]#

Shape of the filename matrix.

start_processing_from(old_pattern_last_hash)#

Given the last hash of the Merkle tree of a previous pattern, determine which (if any) Index key of the current pattern to begin data processing from, in order to append to a dataset built using the previous pattern.

Parameters:

old_pattern_last_hash (bytes) – The last hash of the Merkle tree for the FilePattern instance which was used to build the existing dataset.

Return type:

Optional[Index]
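
A sketch of the append workflow these two methods support, assuming old_pattern was used to build the existing dataset and new_pattern extends it with additional keys:

```python
# Hash state of the pattern used for the original build.
old_hashes = old_pattern.get_merkle_list()

# If new_pattern is a pure extension of old_pattern, this returns the Index
# at which processing should resume; otherwise it may return None.
resume_index = new_pattern.start_processing_from(old_hashes[-1])
```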

Combine Dimensions#

class pangeo_forge_recipes.patterns.ConcatDim(name, keys, nitems_per_file=None)#

Represents a concatenation operation across a dimension of a FilePattern.

Parameters:
  • name (str) – The name of the dimension we are concatenating over. For files with labeled dimensions, this should match the dimension name within the file. The most common value is "time".

  • keys (Sequence[Any]) – The keys used to represent each individual item along this dimension. This will be used by a FilePattern object to evaluate the file name.

  • nitems_per_file (Optional[int]) – If every file contains the same known number of items along the concat dimension, this can be set to provide a fast path for recipes.
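
For example, for daily files that each contain 24 hourly time steps (values hypothetical):

```python
from pangeo_forge_recipes.patterns import ConcatDim

time_dim = ConcatDim("time", keys=["2020-01-01", "2020-01-02"], nitems_per_file=24)
```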

class pangeo_forge_recipes.patterns.MergeDim(name, keys)#

Represents a merge operation across a dimension of a FilePattern.

Parameters:
  • name (str) – The name of the dimension we are merging over. The actual value is not used by most recipes. The most common value is "variable".

  • keys (Sequence[Any]) – The keys used to represent each individual item along this dimension. This will be used by a FilePattern object to evaluate the file name.

Indexing#

class pangeo_forge_recipes.patterns.Index#

An Index is a special sort of dictionary which describes a position within a multidimensional set.

  • The key is a Dimension which tells us which dimension we are addressing.

  • The value is a Position which tells us where we are within that dimension.

This object is hashable and deterministically serializable.

class pangeo_forge_recipes.patterns.CombineOp(value)#

Used to uniquely identify different combine operations across Pangeo Forge Recipes.

Storage#

class pangeo_forge_recipes.storage.AbstractTarget#
abstract exists(path)#

Check whether the file exists.

Return type:

bool

open(path, **kwargs)#

Open file with a context manager.

abstract rm(path)#

Remove file.

Return type:

None

abstract size(path)#

Get file size

Return type:

int

class pangeo_forge_recipes.storage.CacheFSSpecTarget(fs, root_path='', fsspec_kwargs=<factory>)#

Alias for FlatFSSpecTarget

class pangeo_forge_recipes.storage.FSSpecTarget(fs, root_path='', fsspec_kwargs=<factory>)#

Representation of a storage target for Pangeo Forge.

Parameters:
  • fs (AbstractFileSystem) – The filesystem object we are writing to.

  • root_path (str) – The path under which the target data will be stored.

  • fsspec_kwargs (Dict[Any, Any]) – The fsspec kwargs that can be reused as target_options and remote_options for fsspec class instantiation
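
A minimal construction sketch using a local filesystem (cloud filesystems such as s3fs or gcsfs work the same way; the path is illustrative):

```python
import fsspec
from pangeo_forge_recipes.storage import FSSpecTarget

fs = fsspec.filesystem("file")  # local filesystem, for illustration
target = FSSpecTarget(fs=fs, root_path="/tmp/my-target")
store = target.get_mapper()  # mutable mapping suitable for Zarr
```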

exists(path)#

Check whether the file is in the cache.

Return type:

bool

get_fsspec_remote_protocol()#

Return the fsspec implementation-specific remote protocol.

get_mapper()#

Get a mutable mapping object suitable for storing Zarr data.

Return type:

FSMap

open(path, **kwargs)#

Open file with a context manager.

Return type:

Iterator[Union[OpenFile, AbstractBufferedFile, IOBase]]

open_file(path, **kwargs)#

Returns an fsspec open file

Return type:

Union[OpenFile, AbstractBufferedFile, IOBase]

rm(path, recursive=False)#

Remove file from the cache.

Return type:

None

size(path)#

Get file size

Return type:

int

class pangeo_forge_recipes.storage.FlatFSSpecTarget(fs, root_path='', fsspec_kwargs=<factory>)#

A target that sanitizes all the path names so that everything is stored in a single directory.

Designed to be used as a cache for inputs.
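
A sketch of constructing an input cache (the local path is illustrative; in production this is typically cloud object storage):

```python
import fsspec
from pangeo_forge_recipes.storage import CacheFSSpecTarget

cache = CacheFSSpecTarget(fs=fsspec.filesystem("file"), root_path="/tmp/pgf-cache")
```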

Processing Functions#

The Beam PTransform Style Guide recommends:

Expose large, non-trivial, reusable sequential bits of the transform’s code, which others might want to reuse in ways you haven’t anticipated, as a regular function or class library. The transform should simply wire this logic together.

These are those functions.

Standalone functions for opening sources as Dataset objects.

pangeo_forge_recipes.openers.open_url(url, cache=None, secrets=None, open_kwargs=None)#

Open a string-based URL with fsspec.

Parameters:
  • url (str) – The URL to be parsed by fsspec.

  • cache (Optional[CacheFSSpecTarget]) – If provided, data will be cached in the object before opening.

  • secrets (Optional[Dict]) – If provided, these secrets will be injected into the URL as a query string.

  • open_kwargs (Optional[Dict]) – Extra arguments passed to fsspec.open.

Return type:

Union[OpenFile, AbstractBufferedFile, IOBase]
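
A usage sketch (the URL is hypothetical; cache is the CacheFSSpecTarget from the storage example above and may be omitted):

```python
from pangeo_forge_recipes.openers import open_url

open_file = open_url(
    "https://data.example.com/temperature/2020-01-01.nc",
    cache=cache,
)
with open_file as f:
    header = f.read(4)
```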

pangeo_forge_recipes.openers.open_with_kerchunk(url_or_file_obj, file_type=FileType.unknown, inline_threshold=100, storage_options=None, remote_protocol=None, kerchunk_open_kwargs=None)#

Scan through item(s) with one of Kerchunk’s file readers (SingleHdf5ToZarr, scan_grib etc.) and create reference objects.

All file readers return dicts, with the exception of scan_grib, which returns a list of dicts. Therefore, to provide a consistent return type, this function always returns a list of dicts (placing dicts inside a single-element list as needed).

Parameters:
  • url_or_file_obj (Union[OpenFile, AbstractBufferedFile, IOBase, str, FSStore]) – The url or file object to be opened.

  • file_type (FileType) – The type of file to be opened; e.g. “netcdf4”, “netcdf3”, “grib”, etc.

  • inline_threshold (Optional[int]) – Passed to kerchunk opener.

  • storage_options (Optional[Dict]) – Storage options dict to pass to the kerchunk opener.

  • remote_protocol (Optional[str]) – If files are accessed over the network, provide the remote protocol over which they are accessed. e.g.: “s3”, “https”, etc.

  • kerchunk_open_kwargs (Optional[dict]) – Additional kwargs to pass to kerchunk opener. Any kwargs which are specific to a particular input file type should be passed here; e.g., {"filter": ...} for GRIB; {"max_chunk_size": ...} for NetCDF3, etc.

Return type:

list[dict]
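
A usage sketch (the URL is hypothetical):

```python
from pangeo_forge_recipes.openers import open_with_kerchunk
from pangeo_forge_recipes.patterns import FileType

refs = open_with_kerchunk(
    "https://data.example.com/temperature/2020-01-01.nc",
    file_type=FileType.netcdf4,
    remote_protocol="https",
)
print(len(refs))  # list of reference dicts; longer than 1 only for GRIB
```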

pangeo_forge_recipes.openers.open_with_xarray(url_or_file_obj, file_type=FileType.unknown, load=False, copy_to_local=False, xarray_open_kwargs=None)#

Open item with Xarray. Accepts either fsspec open-file-like objects or string URLs that can be passed directly to Xarray.

Parameters:
  • url_or_file_obj (Union[OpenFile, AbstractBufferedFile, IOBase, str, FSStore]) – The url or file object to be opened.

  • file_type (FileType) – Provide this if you know what type of file it is.

  • load (bool) – Whether to eagerly load the data into memory after opening.

  • copy_to_local – Whether to copy the file-like-object to a local path and pass the path to Xarray. Required for some file types (e.g. GRIB). Can only be used with file-like-objects, not URLs.

  • xarray_open_kwargs (Optional[dict]) – Extra arguments to pass to Xarray’s open function.

Return type:

Dataset
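
A usage sketch (the URL is hypothetical):

```python
from pangeo_forge_recipes.openers import open_with_xarray
from pangeo_forge_recipes.patterns import FileType

ds = open_with_xarray(
    "https://data.example.com/temperature/2020-01-01.nc",
    file_type=FileType.netcdf4,
    load=False,  # open lazily; set True to pull the data into memory
)
```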

class pangeo_forge_recipes.aggregation.XarrayCombineAccumulator(schema=<factory>, concat_dim=None)#

An object used to help combine Xarray schemas.

Parameters:
  • schema (XarraySchema) – A schema to initialize the accumulator with.

  • concat_dim (Optional[str]) – If set, this accumulator applies concat rules. Otherwise applies merge rules.

PTransforms#

The Beam PTransform Style Guide recommends:

Expose every major data-parallel task accomplished by your library as a composite PTransform. This allows the structure of the transform to evolve transparently to the code that uses it.

class pangeo_forge_recipes.transforms.CombineReferences(concat_dims, identical_dims, target_options=<factory>, remote_options=<factory>, remote_protocol=None, max_refs_per_merge=5, mzz_kwargs=<factory>)#

Combines Kerchunk references into a single reference dataset.

Parameters:
  • concat_dims (List[str]) – Dimensions along which to concatenate inputs.

  • identical_dims (List[str]) – Dimensions shared among all inputs.

  • target_options (Optional[Dict]) – Storage options for opening target files

  • remote_options (Optional[Dict]) – Storage options for opening remote files

  • remote_protocol (Optional[str]) – If files are accessed over the network, provide the remote protocol over which they are accessed. e.g.: “s3”, “gcp”, “https”, etc.

  • max_refs_per_merge (int) – Maximum number of references to combine in a single merge operation.

  • mzz_kwargs (dict) – Additional kwargs to pass to kerchunk.combine.MultiZarrToZarr.

bucket_by_position(indexed_references, global_position_min_max_count)#

Assigns a bucket based on the index position to order data during GroupByKey.

Parameters:
  • indexed_references (Tuple[Index, dict]) – A tuple containing the index and the reference dictionary. The index is used to determine the reference’s position within the global data order.

  • global_position_min_max_count (Tuple[int, int, int]) – A tuple containing the global minimum and maximum positions and the total count of references. These values are used to determine the range and distribution of buckets.

Return type:

Tuple[int, dict]

Returns:

A tuple where the first element is the bucket number (an integer) assigned to the reference, and the second element is the original reference dictionary.

global_combine_refs(refs)#

Performs a global combination of references to produce the final dataset.

Return type:

FSMap

handle_gribs(indexed_references)#

Handles the special case of GRIB format files by combining multiple references.

Return type:

Tuple[Index, dict]

to_mzz(references)#

Converts references into a MultiZarrToZarr object with configured parameters.

class pangeo_forge_recipes.transforms.ConsolidateDimensionCoordinates(*args, **kwargs)#
class pangeo_forge_recipes.transforms.ConsolidateMetadata#

Calls Zarr Python consolidate_metadata on an existing Zarr store (https://zarr.readthedocs.io/en/stable/_modules/zarr/convenience.html#consolidate_metadata)

class pangeo_forge_recipes.transforms.DatasetToSchema#
class pangeo_forge_recipes.transforms.DetermineSchema(combine_dims)#

Combine many Datasets into a single schema along multiple dimensions. This is a reduction that produces a singleton PCollection.

Parameters:

combine_dims (List[Dimension]) – The dimensions to combine

class pangeo_forge_recipes.transforms.IndexItems(schema)#

Augment dataset indexes with information about start and stop position.

class pangeo_forge_recipes.transforms.MapWithConcurrencyLimit(fn, args=<factory>, kwargs=<factory>, max_concurrency=None)#

A transform which maps calls to the provided function, optionally limiting the maximum number of concurrent calls. Useful for situations where the provided function requests data from an external service that does not support an unlimited number of concurrent requests.

Parameters:
  • fn (Callable) – Callable object passed to beam.Map (in the case of no concurrency limit) or beam.FlatMap (if max_concurrency is specified).

  • args (Optional[list]) – Positional arguments passed to all invocations of fn.

  • kwargs (Optional[dict]) – Keyword arguments passed to all invocations of fn.

  • max_concurrency (Optional[int]) – The maximum number of concurrent invocations of fn. If unspecified, no limit is imposed by this transform (therefore the concurrency limit will be set by the Beam Runner’s configuration).
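
A sketch, where fetch_metadata is a hypothetical function calling a rate-limited external service:

```python
import apache_beam as beam
from pangeo_forge_recipes.transforms import MapWithConcurrencyLimit

def fetch_metadata(url):
    # Placeholder for a request to a service with a strict request quota.
    return {"url": url, "ok": True}

with beam.Pipeline() as p:
    results = (
        p
        | beam.Create(["https://data.example.com/a.nc"])  # hypothetical URL
        | MapWithConcurrencyLimit(fetch_metadata, max_concurrency=5)
    )
```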

class pangeo_forge_recipes.transforms.OpenURLWithFSSpec(cache=None, secrets=None, open_kwargs=None, max_concurrency=None)#

Open indexed string-based URLs with fsspec.

Parameters:
  • cache (Optional[str | CacheFSSpecTarget]) – If provided, data will be cached at this url path before opening.

  • secrets (Optional[dict]) – If provided, these secrets will be injected into the URL as a query string.

  • open_kwargs (Optional[dict]) – Extra arguments passed to fsspec.open.

  • max_concurrency (Optional[int]) – Max concurrency for this transform.

class pangeo_forge_recipes.transforms.OpenWithKerchunk(file_type=FileType.unknown, inline_threshold=300, storage_options=<factory>, remote_protocol=None, kerchunk_open_kwargs=<factory>)#

Open indexed items with Kerchunk. Accepts either fsspec open-file-like objects or string URLs that can be passed directly to Kerchunk.

Parameters:
  • file_type (FileType) – The type of file to be opened; e.g. “netcdf4”, “netcdf3”, “grib”, etc.

  • inline_threshold (Optional[int]) – Passed to kerchunk opener.

  • storage_options (Optional[Dict]) – Storage options dict to pass to the kerchunk opener.

  • remote_protocol (Optional[str]) – If files are accessed over the network, provide the remote protocol over which they are accessed. e.g.: “s3”, “https”, etc.

  • kerchunk_open_kwargs (Optional[dict]) – Additional kwargs to pass to kerchunk opener. Any kwargs which are specific to a particular input file type should be passed here; e.g., {"filter": ...} for GRIB; {"max_chunk_size": ...} for NetCDF3, etc.

class pangeo_forge_recipes.transforms.OpenWithXarray(file_type=FileType.unknown, load=False, copy_to_local=False, xarray_open_kwargs=<factory>)#

Open indexed items with Xarray. Accepts either fsspec open-file-like objects or string URLs that can be passed directly to Xarray.

Parameters:
  • file_type (FileType) – Provide this if you know what type of file it is.

  • load (bool) – Whether to eagerly load the data into memory after opening.

  • copy_to_local (bool) – Whether to copy the file-like-object to a local path and pass the path to Xarray. Required for some file types (e.g. GRIB). Can only be used with file-like-objects, not URLs.

  • xarray_open_kwargs (Optional[dict]) – Extra arguments to pass to Xarray’s open function.

class pangeo_forge_recipes.transforms.PrepareZarrTarget(target, target_chunks=<factory>, attrs=<factory>, consolidated_metadata=True, encoding=<factory>)#

From a singleton PCollection containing a dataset schema, initialize a Zarr store with the correct variables, dimensions, attributes and chunking. Note that the dimension coordinates will be initialized with dummy values.

Parameters:
  • target (str | FSSpecTarget) – Where to store the target Zarr dataset.

  • target_chunks (Dict[str, int]) – Dictionary mapping dimension names to chunk sizes. If a dimension is not named, the chunks will be inferred from the schema. If chunking is present in the schema for a given dimension, the length of the first fragment will be used. Otherwise, the dimension will not be chunked.

  • attrs (Dict[str, str]) – Extra group-level attributes to inject into the dataset.

  • encoding (Optional[dict]) – Dictionary describing encoding for xarray.to_zarr()

  • consolidated_metadata (Optional[bool]) – Whether xarray.to_zarr() writes consolidated metadata. Defaults to False: StoreToZarr always writes unconsolidated metadata, leaving it up to the user to consolidate with ConsolidateMetadata(). This also prevents the broken/inconsistent state that could arise if metadata were consolidated here and then fell out of sync with the coordinates after ConsolidateDimensionCoordinates() is applied to the output of StoreToZarr().

class pangeo_forge_recipes.transforms.Rechunk(target_chunks, schema)#
class pangeo_forge_recipes.transforms.RequiredAtRuntimeDefault#

Sentinel class used as the default for transform attributes which are required to run a pipeline but may not be available (or preferable) to define during recipe development; for example, the target_root kwarg of a transform that writes data to a target location. By using this sentinel as the default value for such a kwarg, a recipe module can define all required arguments on the transform (and therefore be importable, satisfy type-checkers, be unit-testable, etc.) before it is deployed, with the understanding that the attribute using this sentinel as its default will be re-assigned to the desired value at deploy time.

class pangeo_forge_recipes.transforms.StoreDatasetFragments(target_store)#
class pangeo_forge_recipes.transforms.StoreToZarr(combine_dims, store_name, target_root=<factory>, target_chunks=<factory>, dynamic_chunking_fn=None, dynamic_chunking_fn_kwargs=<factory>, attrs=<factory>, encoding=<factory>)#

Store a PCollection of Xarray datasets to Zarr.

Parameters:
  • combine_dims (List[Dimension]) – The dimensions to combine

  • store_name (str) – Name for the Zarr store. It will be created with this name under target_root.

  • target_root (Union[str, FSSpecTarget, RequiredAtRuntimeDefault]) – Root path the Zarr store will be created inside; store_name will be appended to this prefix to create a full path.

  • target_chunks (Dict[str, int]) – Dictionary mapping dimension names to chunk sizes. If a dimension is not named, the chunks will be inferred from the data.

  • consolidate_dimension_coordinates – Whether to rewrite coordinate variables as a single chunk. We recommend consolidating coordinate variables to avoid many small read requests to get the coordinates in xarray. Defaults to True.

  • dynamic_chunking_fn (Optional[Callable[[Dataset], dict]]) – Optionally provide a function that takes an xarray.Dataset template dataset as its first argument and returns a dynamically generated chunking dict. If provided, target_chunks cannot also be passed. You can use this to determine chunking based on the full dataset (e.g. divide along a certain dimension based on a desired chunk size in memory). For more advanced chunking strategies, check out jbusecke/dynamic_chunks

  • dynamic_chunking_fn_kwargs (Optional[dict]) – Optional keyword arguments for dynamic_chunking_fn.

  • attrs (Dict[str, str]) – Extra group-level attributes to inject into the dataset.

  • encoding (Optional[dict]) – Dictionary encoding for xarray.to_zarr().
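
A sketch of a typical recipe wiring these transforms together (pattern as in the FilePattern example above; the store name, chunking, and output path are hypothetical):

```python
import apache_beam as beam
from pangeo_forge_recipes.transforms import (
    OpenURLWithFSSpec,
    OpenWithXarray,
    StoreToZarr,
)

with beam.Pipeline() as p:
    (
        p
        | beam.Create(pattern.items())
        | OpenURLWithFSSpec()
        | OpenWithXarray(file_type=pattern.file_type)
        | StoreToZarr(
            store_name="example.zarr",
            target_root="/tmp/output",
            combine_dims=pattern.combine_dim_keys,  # the pattern's Dimension objects
            target_chunks={"time": 1},
        )
    )
```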

class pangeo_forge_recipes.transforms.WriteCombinedReference(store_name, concat_dims, identical_dims, mzz_kwargs=<factory>, remote_options=<factory>, remote_protocol=None, target_root=<factory>, output_file_name='reference.json')#

Store a singleton PCollection consisting of a kerchunk.combine.MultiZarrToZarr object.

Parameters:
  • store_name (str) – Zarr store will be created with this name under target_root.

  • concat_dims (List[str]) – Dimensions along which to concatenate inputs.

  • identical_dims (List[str]) – Dimensions shared among all inputs.

  • mzz_kwargs (dict) – Additional kwargs to pass to kerchunk.combine.MultiZarrToZarr.

  • remote_options (Optional[Dict]) – options to pass to kerchunk.combine.MultiZarrToZarr to read reference inputs (can include credentials).

  • remote_protocol (Optional[str]) – If files are accessed over the network, provide the remote protocol over which they are accessed. e.g.: “s3”, “https”, etc.

  • target_root (Union[str, FSSpecTarget, RequiredAtRuntimeDefault]) – Output root path the store will be created inside; store_name will be appended to this prefix to create a full path.

  • output_file_name (str) – Name to give the output references file (.json or .parquet suffix).
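
A sketch of a reference recipe built from these transforms (pattern as above; the concat/identical dims and paths are hypothetical):

```python
import apache_beam as beam
from pangeo_forge_recipes.transforms import (
    OpenURLWithFSSpec,
    OpenWithKerchunk,
    WriteCombinedReference,
)

with beam.Pipeline() as p:
    (
        p
        | beam.Create(pattern.items())
        | OpenURLWithFSSpec()
        | OpenWithKerchunk(file_type=pattern.file_type, remote_protocol="https")
        | WriteCombinedReference(
            store_name="example-refs",
            concat_dims=["time"],
            identical_dims=["lat", "lon"],
            target_root="/tmp/output",
        )
    )
```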

class pangeo_forge_recipes.transforms.WriteReference(store_name, concat_dims, target_root=<factory>, output_file_name='reference.json', mzz_kwargs=<factory>)#

Store a singleton PCollection consisting of a kerchunk.combine.MultiZarrToZarr object.

Parameters:
  • store_name (str) – Zarr store will be created with this name under target_root.

  • concat_dims (List[str]) – Dimensions along which to concatenate inputs.

  • target_root (Union[str, FSSpecTarget, RequiredAtRuntimeDefault]) – Root path the Zarr store will be created inside; store_name will be appended to this prefix to create a full path.

  • output_file_name (str) – Name to give the output references file (.json or .parquet suffix).

  • mzz_kwargs (dict) – Additional kwargs to pass to kerchunk.combine.MultiZarrToZarr.

Combiners#

class pangeo_forge_recipes.combiners.CombineXarraySchemas(dimension)#

A beam CombineFn which we can use to combine multiple xarray schemas along a single dimension

Parameters:

dimension (Dimension) – The dimension along which to combine

pangeo_forge_recipes.combiners.MinMaxCountCombineFn#

alias of AnonymousCombineFn

pangeo_forge_recipes.combiners.build_reduce_fn(accumulate_op, merge_op, initializer)#

Factory to construct reducers without so much ceremony

Return type:

CombineFn
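
A sketch of using this factory to build a min/max/count reducer, assuming accumulate_op receives (accumulator, item) and merge_op receives two accumulators:

```python
from pangeo_forge_recipes.combiners import build_reduce_fn

MinMaxCount = build_reduce_fn(
    accumulate_op=lambda acc, x: (min(acc[0], x), max(acc[1], x), acc[2] + 1),
    merge_op=lambda a, b: (min(a[0], b[0]), max(a[1], b[1]), a[2] + b[2]),
    initializer=(float("inf"), float("-inf"), 0),
)
```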