Recipe Execution#

There are many different types of Pangeo Forge recipes. However, all recipes are executed the same way! This is a key part of the Pangeo Forge design.

Once you have created a recipe object (see Recipes), the next step is to execute it. In the code below, we will assume that a recipe has already been initialized in the variable recipe.
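
For reference, such a recipe object might be created along the following lines. This is a minimal sketch: the file URLs and chunking parameters are placeholders, storage configuration is omitted, and XarrayZarrRecipe with pattern_from_file_sequence is just one possible combination of recipe class and file pattern helper.

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.recipes import XarrayZarrRecipe

# Hypothetical source files; replace with real URLs.
file_list = [
    "https://data.example.com/temperature_2019.nc",
    "https://data.example.com/temperature_2020.nc",
]

# Describe how the files concatenate along the "time" dimension.
pattern = pattern_from_file_sequence(file_list, concat_dim="time", nitems_per_file=365)

# The recipe object assumed in the examples below (storage configuration omitted).
recipe = XarrayZarrRecipe(pattern, inputs_per_chunk=1)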

Recipe Executors#

Note

API reference documentation for execution can be found in pangeo_forge_recipes.executors.

A recipe is an abstract description of a transformation pipeline. Recipes can be compiled to executable objects. We currently support four types of compilation.

Python Function#

To compile a recipe to a single Python function, use the .to_function() method. For example:

recipe_func = recipe.to_function()
recipe_func()  # actually execute the recipe

Note that the python function approach does not support parallel or distributed execution. It’s mostly just a convenience utility.

Dask Delayed#

You can compile your recipe to a Dask Delayed object using the .to_dask() method. For example:

delayed = recipe.to_dask()
delayed.compute()

The delayed object can be executed by any of Dask’s schedulers, including cloud and HPC distributed schedulers.
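
For example, to execute on a distributed cluster, you might connect a dask.distributed client before computing. This is a sketch; the scheduler address is a placeholder.

from dask.distributed import Client

# Connect to a running Dask cluster (address is a placeholder).
client = Client("tcp://scheduler-address:8786")

delayed = recipe.to_dask()
delayed.compute()  # runs on the cluster via the active client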

Prefect Flow#

You can compile your recipe to a Prefect Flow using the .to_prefect() method.

There are two modes of Prefect execution. In the default mode, every individual step in the recipe is explicitly represented as a distinct Prefect Task within a Flow.

Warning

For large recipes, this default can lead to Prefect Flows with >10000 Tasks. In our experience, Prefect can struggle with this volume.

flow = recipe.to_prefect()
flow.run()

By default the flow is run using Prefect’s LocalExecutor. See Prefect’s executor documentation for more.
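
For example, to run the flow’s tasks on a Dask cluster instead, you might pass a DaskExecutor. This sketch assumes Prefect 1.x, where DaskExecutor lives in prefect.executors; the scheduler address is a placeholder.

from prefect.executors import DaskExecutor

flow = recipe.to_prefect()
# Execute the flow's tasks on an existing Dask cluster.
flow.run(executor=DaskExecutor(address="tcp://scheduler-address:8786"))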

An alternative is to create a single Prefect Task for the entire recipe. This Task wraps a Dask Delayed graph, which can then be scheduled on a Dask cluster. To use this mode, pass the option wrap_dask=True:

flow = recipe.to_prefect(wrap_dask=True)
flow.run()

Beam PTransform#

You can compile your recipe to an Apache Beam PTransform, to be used within a Pipeline, using the .to_beam() method. For example:

import apache_beam as beam

with beam.Pipeline() as p:
    p | recipe.to_beam()

By default the pipeline runs using Beam’s DirectRunner. See Beam’s runner documentation for more.
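
For example, to choose a runner explicitly, you might pass pipeline options. This is a sketch; which runners are available depends on your Beam installation.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Explicitly select the DirectRunner; a distributed runner such as
# "FlinkRunner" or "DataflowRunner" could be substituted here.
options = PipelineOptions(runner="DirectRunner")

with beam.Pipeline(options=options) as p:
    p | recipe.to_beam()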

Execution context#

All Pangeo Forge recipes contain a .get_execution_context() method, which returns the following metadata:

{
    "pangeo-forge:version": "{pangeo_forge_recipes version installed at time of execution}",
    "pangeo-forge:recipe_hash": "{recipe hash as returned by `recipe.sha256()`}",
    "pangeo-forge:inputs_hash": "{file pattern hash as returned by `recipe.file_pattern.sha256()`}"
}

Each recipe class defines where to store this metadata:

  • XarrayZarrRecipe: Added to the Zarr group attributes, and therefore also available via xarray.Dataset.attrs when opening the Zarr store with xarray.

  • HDFReferenceRecipe: TODO

The execution context metadata persisted in the target dataset is used to track dataset provenance.
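
For example, after executing an XarrayZarrRecipe you might verify the provenance metadata like this. The target path is a placeholder.

import xarray as xr

# Open the Zarr store produced by the recipe (path is a placeholder).
ds = xr.open_zarr("s3://my-bucket/target.zarr")

# The execution context keys appear in the dataset attributes.
print(ds.attrs["pangeo-forge:version"])
print(ds.attrs["pangeo-forge:recipe_hash"])
print(ds.attrs["pangeo-forge:inputs_hash"])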