HDF Reference Recipe for CMIP6

This example illustrates how to create a pangeo_forge_recipes.recipes.HDFReferenceRecipe. This recipe does not copy the original source data. Instead, it generates metadata files that reference and index the original data, allowing it to be accessed more efficiently. For more background, see this blog post.
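To make this concrete, the generated reference file is a small JSON document in the fsspec ReferenceFileSystem format. The sketch below is purely illustrative (the keys, offsets, and lengths are invented); real reference files are produced by the recipe itself:

# Illustrative sketch of a "version 1" reference file (values invented).
# Keys are Zarr store paths; values are either inlined metadata strings or
# [url, offset, length] triples pointing into the original NetCDF files.
references = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',
        "thetao/.zarray": '{"shape": [3600, 35, 180, 360], "...": "..."}',
        "thetao/0.0.0.0": ["s3://esgf-world/.../file.nc", 30975, 1729],
    },
}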

As the input for this recipe, we will use some CMIP6 NetCDF4 files provided by ESGF and stored in Amazon S3 (CMIP6 AWS Open Data Page). Many CMIP6 simulations spread their output over many HDF5/NetCDF4 files in order to limit individual file size. This can be inconvenient for analysis. In this recipe, we will see how to virtually concatenate many HDF5 files into one big virtual Zarr dataset.

Define the FilePattern

Let’s pick a dataset: ocean potential temperature (thetao) output from the GFDL-CM4 model in the OMIP experiment.

import s3fs
fs = s3fs.S3FileSystem(anon=True)
base_path = 's3://esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/'
all_paths = fs.ls(base_path)
all_paths
['esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_170801-172712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_172801-174712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_174801-176712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_176801-178712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_178801-180712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_180801-182712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_182801-184712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_184801-186712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_186801-188712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_188801-190712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_190801-192712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_192801-194712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_194801-196712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_196801-198712.nc',
 'esgf-world/CMIP6/OMIP/NOAA-GFDL/GFDL-CM4/omip1/r1i1p1f1/Omon/thetao/gr/v20180701/thetao_Omon_GFDL-CM4_omip1_r1i1p1f1_gr_198801-200712.nc']

We see there are 15 individual NetCDF files. Let’s time how long it takes to open and display one of them using Xarray.

Note

The argument decode_coords='all' helps Xarray promote all of the _bnds variables to coordinates (rather than data variables).

import xarray as xr

%%time
ds_orig = xr.open_dataset(fs.open(all_paths[0]), engine='h5netcdf', chunks={}, decode_coords='all')
ds_orig
<timed exec>:1: UserWarning: Variable(s) referenced in cell_measures not in variables: ['areacello', 'volcello']
CPU times: user 2.02 s, sys: 440 ms, total: 2.46 s
Wall time: 33 s
<xarray.Dataset>
Dimensions:    (lat: 180, bnds: 2, lon: 360, time: 240, lev: 35)
Coordinates:
  * lat        (lat) float64 -89.5 -88.5 -87.5 -86.5 ... 86.5 87.5 88.5 89.5
    lat_bnds   (lat, bnds) float64 dask.array<chunksize=(180, 2), meta=np.ndarray>
  * lon        (lon) float64 0.5 1.5 2.5 3.5 4.5 ... 356.5 357.5 358.5 359.5
    lon_bnds   (lon, bnds) float64 dask.array<chunksize=(360, 2), meta=np.ndarray>
  * time       (time) object 1708-01-16 12:00:00 ... 1727-12-16 12:00:00
    time_bnds  (time, bnds) object dask.array<chunksize=(240, 2), meta=np.ndarray>
    lev_bnds   (lev, bnds) float64 dask.array<chunksize=(35, 2), meta=np.ndarray>
  * lev        (lev) float64 2.5 10.0 20.0 32.5 ... 5e+03 5.5e+03 6e+03 6.5e+03
Dimensions without coordinates: bnds
Data variables:
    thetao     (time, lev, lat, lon) float32 dask.array<chunksize=(240, 35, 180, 360), meta=np.ndarray>
Attributes: (12/44)
    title:                 NOAA GFDL GFDL-CM4 model output prepared for CMIP6...
    history:               File was processed by fremetar (GFDL analog of CMO...
    external_variables:    areacello volcello
    table_id:              Omon
    activity_id:           OMIP
    branch_method:         none provided
    ...                    ...
    sub_experiment_id:     none
    tracking_id:           hdl:21.14100/97e4edf3-22e7-4e5f-831a-f2a671b7094f
    variable_id:           thetao
    variant_info:          N/A
    references:            see further_info_url attribute
    variant_label:         r1i1p1f1

It took ~30 seconds to open this one dataset, so opening all 15 files serially would take 7-8 minutes. This would be annoyingly slow.
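For comparison, the naive approach would be to open every file with open_mfdataset and concatenate along time; this repeats the slow metadata scan for each file, which is exactly what we want to avoid. A sketch only, not run here:

# Naive alternative (slow): each of the 15 files incurs the ~30 s scan.
ds_naive = xr.open_mfdataset(
    [fs.open(path) for path in all_paths],
    engine='h5netcdf',
    combine='nested',
    concat_dim='time',
    decode_coords='all',
)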

As a first step in our recipe, we create a FilePattern to represent the input files. In this case, since we already have a list of inputs, we can just use the pattern_from_file_sequence convenience function.

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
pattern = pattern_from_file_sequence(['s3://' + path for path in all_paths], 'time')
pattern
<FilePattern {'time': 15}>
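We can sanity-check the pattern by iterating over it. Assuming this version of pangeo-forge-recipes provides the FilePattern.items() iterator, each item pairs an index along the concat dimension with its URL:

# Peek at the first (index, url) pair the pattern will feed to the recipe.
for index, url in pattern.items():
    print(index, url)
    break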

Define the Recipe

Once we have our FilePattern defined, defining our Recipe is straightforward. The only custom options we need are to specify that we’ll be accessing the source files anonymously and to use decode_coords='all' when opening them.

from pangeo_forge_recipes.recipes.reference_hdf_zarr import HDFReferenceRecipe

rec = HDFReferenceRecipe(
    pattern,
    xarray_open_kwargs={"decode_coords": "all"},
    netcdf_storage_options={"anon": True},
)
rec
HDFReferenceRecipe(file_pattern=<FilePattern {'time': 15}>, output_json_fname='reference.json', output_intake_yaml_fname='reference.yaml', target=None, metadata_cache=None, netcdf_storage_options={'anon': True}, inline_threshold=500, output_storage_options={}, template_count=20, xarray_open_kwargs={'decode_coords': 'all'}, xarray_concat_args={})

Assign Storage

If the recipe execution occurs in a Bakery, storage will be assigned automatically. For this example, we use temporary local files. This type of recipe needs both a target for the final outputs and a metadata_cache for intermediate metadata storage.

import tempfile
from fsspec.implementations.local import LocalFileSystem
from pangeo_forge_recipes.storage import FSSpecTarget, MetadataTarget

fs_local = LocalFileSystem()

cache_dir = tempfile.TemporaryDirectory()
metadata_cache = MetadataTarget(fs_local, cache_dir.name)

target_dir = tempfile.TemporaryDirectory()
target = FSSpecTarget(fs_local, target_dir.name)

rec.target = target
rec.metadata_cache = metadata_cache

Execute with Dask

This runs relatively slowly in serial on a small laptop, but it would scale out very well on the cloud.

from dask.diagnostics import ProgressBar
delayed = rec.to_dask()
with ProgressBar():
    delayed.compute()
[#####################################   ] | 94% Completed | 42min 19.8s
/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/fsspec_reference_maker/combine.py:290: UserWarning: Variable(s) referenced in cell_measures not in variables: ['areacello', 'volcello']
  dss = [xr.open_dataset(m, engine="zarr", chunks={},  **xr_kwargs_copy)
[########################################] | 100% Completed | 42min 22.9s
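Dask is just one executor. Assuming the to_function() compiler available in this version of pangeo-forge-recipes, a quick serial run looks like this sketch:

# Serial execution without Dask: compile the recipe and call it directly.
run_recipe = rec.to_function()
run_recipe()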

Examine the Result

Load with Intake

The easiest way to load the dataset created by fsspec_reference_maker is via intake. An intake catalog is automatically created in the target.

cat_url = target._full_path("reference.yaml")
cat_url
'/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpdcb5rw4l/reference.yaml'
import intake
cat = intake.open_catalog(cat_url)
cat
reference:
  args:
    path: /var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpdcb5rw4l/reference.yaml
  description: ''
  driver: intake.catalog.local.YAMLFileCatalog
  metadata: {}

To load the data lazily:

%time ds = cat.data.to_dask()
ds
CPU times: user 70 µs, sys: 1e+03 ns, total: 71 µs
Wall time: 76.1 µs
<xarray.Dataset>
Dimensions:    (lat: 180, bnds: 2, lev: 35, lon: 360, time: 3600)
Coordinates:
  * lat        (lat) float64 -89.5 -88.5 -87.5 -86.5 ... 86.5 87.5 88.5 89.5
  * lev        (lev) float64 2.5 10.0 20.0 32.5 ... 5e+03 5.5e+03 6e+03 6.5e+03
  * lon        (lon) float64 0.5 1.5 2.5 3.5 4.5 ... 356.5 357.5 358.5 359.5
  * time       (time) object 1708-01-17 03:30:00 ... 1720-07-17 08:30:00
    time_bnds  (time, bnds) object dask.array<chunksize=(1, 2), meta=np.ndarray>
Dimensions without coordinates: bnds
Data variables:
    lat_bnds   (lat, bnds) float64 dask.array<chunksize=(180, 2), meta=np.ndarray>
    lev_bnds   (lev, bnds) float64 dask.array<chunksize=(35, 2), meta=np.ndarray>
    lon_bnds   (lon, bnds) float64 dask.array<chunksize=(360, 2), meta=np.ndarray>
    thetao     (time, lev, lat, lon) float32 dask.array<chunksize=(1, 18, 90, 180), meta=np.ndarray>
Attributes: (12/45)
    Conventions:           CF-1.7 CMIP-6.0 UGRID-1.0
    _nc3_strict:           1
    activity_id:           OMIP
    branch_method:         none provided
    branch_time_in_child:  0.0
    comment:               Experiment name = OM4p25_IAF_BLING_csf_rerun\nFor ...
    ...                    ...
    table_id:              Omon
    title:                 NOAA GFDL GFDL-CM4 model output prepared for CMIP6...
    tracking_id:           hdl:21.14100/97e4edf3-22e7-4e5f-831a-f2a671b7094f
    variable_id:           thetao
    variant_info:          N/A
    variant_label:         r1i1p1f1

Note that it opened immediately! 🎉

Note

The Zarr chunks of the reference dataset correspond 1:1 to the HDF5 chunks in the original dataset. These chunks are often smaller than optimal for cloud-based analysis.
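We can see just how small they are by inspecting the Dask array backing thetao (a quick sketch):

import numpy as np

# Each Zarr chunk mirrors one HDF5 chunk in the source files (~1 MB here).
chunk_mb = np.prod(ds.thetao.data.chunksize) * ds.thetao.dtype.itemsize / 1e6
print(ds.thetao.data.chunksize, f'~{chunk_mb:.1f} MB per chunk')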

If we want to pass custom options to Xarray when loading the dataset, we do so as follows. In this example, we specify larger chunks and again use decode_coords='all' to promote the bounds variables to coordinates.

ds = cat.data(
    chunks={'time': 10, 'lev': -1, 'lat': -1, 'lon': -1},
    decode_coords='all'
).to_dask()
ds
<xarray.Dataset>
Dimensions:    (lat: 180, bnds: 2, lev: 35, lon: 360, time: 3600)
Coordinates:
  * lat        (lat) float64 -89.5 -88.5 -87.5 -86.5 ... 86.5 87.5 88.5 89.5
    lat_bnds   (lat, bnds) float64 dask.array<chunksize=(180, 2), meta=np.ndarray>
  * lev        (lev) float64 2.5 10.0 20.0 32.5 ... 5e+03 5.5e+03 6e+03 6.5e+03
    lev_bnds   (lev, bnds) float64 dask.array<chunksize=(35, 2), meta=np.ndarray>
  * lon        (lon) float64 0.5 1.5 2.5 3.5 4.5 ... 356.5 357.5 358.5 359.5
    lon_bnds   (lon, bnds) float64 dask.array<chunksize=(360, 2), meta=np.ndarray>
  * time       (time) object 1708-01-17 03:30:00 ... 1720-07-17 08:30:00
    time_bnds  (time, bnds) object dask.array<chunksize=(10, 2), meta=np.ndarray>
Dimensions without coordinates: bnds
Data variables:
    thetao     (time, lev, lat, lon) float32 dask.array<chunksize=(10, 35, 180, 360), meta=np.ndarray>
Attributes: (12/45)
    Conventions:           CF-1.7 CMIP-6.0 UGRID-1.0
    _nc3_strict:           1
    activity_id:           OMIP
    branch_method:         none provided
    branch_time_in_child:  0.0
    comment:               Experiment name = OM4p25_IAF_BLING_csf_rerun\nFor ...
    ...                    ...
    table_id:              Omon
    title:                 NOAA GFDL GFDL-CM4 model output prepared for CMIP6...
    tracking_id:           hdl:21.14100/97e4edf3-22e7-4e5f-831a-f2a671b7094f
    variable_id:           thetao
    variant_info:          N/A
    variant_label:         r1i1p1f1

Manual Loading

It is also possible to load the reference dataset directly with Xarray, bypassing intake.

ref_url = target._full_path("reference.json")
ref_url
'/var/folders/n8/63q49ms55wxcj_gfbtykwp5r0000gn/T/tmpdcb5rw4l/reference.json'
import fsspec
m = fsspec.get_mapper(
    "reference://",
    fo=ref_url,
    target_protocol="file",
    remote_protocol="s3",
    skip_instance_cache=True,
)
ds = xr.open_dataset(
    m,
    engine='zarr',
    backend_kwargs={'consolidated': False},
    chunks={},
    decode_coords="all"
)
ds
<xarray.Dataset>
Dimensions:    (lat: 180, bnds: 2, lev: 35, lon: 360, time: 3600)
Coordinates:
  * lat        (lat) float64 -89.5 -88.5 -87.5 -86.5 ... 86.5 87.5 88.5 89.5
    lat_bnds   (lat, bnds) float64 dask.array<chunksize=(180, 2), meta=np.ndarray>
  * lev        (lev) float64 2.5 10.0 20.0 32.5 ... 5e+03 5.5e+03 6e+03 6.5e+03
    lev_bnds   (lev, bnds) float64 dask.array<chunksize=(35, 2), meta=np.ndarray>
  * lon        (lon) float64 0.5 1.5 2.5 3.5 4.5 ... 356.5 357.5 358.5 359.5
    lon_bnds   (lon, bnds) float64 dask.array<chunksize=(360, 2), meta=np.ndarray>
  * time       (time) object 1708-01-17 03:30:00 ... 1720-07-17 08:30:00
    time_bnds  (time, bnds) object dask.array<chunksize=(1, 2), meta=np.ndarray>
Dimensions without coordinates: bnds
Data variables:
    thetao     (time, lev, lat, lon) float32 dask.array<chunksize=(1, 18, 90, 180), meta=np.ndarray>
Attributes: (12/45)
    Conventions:           CF-1.7 CMIP-6.0 UGRID-1.0
    _nc3_strict:           1
    activity_id:           OMIP
    branch_method:         none provided
    branch_time_in_child:  0.0
    comment:               Experiment name = OM4p25_IAF_BLING_csf_rerun\nFor ...
    ...                    ...
    table_id:              Omon
    title:                 NOAA GFDL GFDL-CM4 model output prepared for CMIP6...
    tracking_id:           hdl:21.14100/97e4edf3-22e7-4e5f-831a-f2a671b7094f
    variable_id:           thetao
    variant_info:          N/A
    variant_label:         r1i1p1f1
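A caveat worth noting: the lazy open above works even without AWS credentials because small chunks (such as the coordinates) are inlined into the JSON (see inline_threshold=500 in the recipe above), but reading thetao values pulls bytes from S3. If that fails with an authentication error, the fix is presumably to forward anonymous-access options to the remote filesystem:

# Variant forwarding anonymous S3 access to the data reads.
m = fsspec.get_mapper(
    'reference://',
    fo=ref_url,
    target_protocol='file',
    remote_protocol='s3',
    remote_options={'anon': True},  # passed through to s3fs
    skip_instance_cache=True,
)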

Problem with Time Encoding

Warning

There is currently a bug with time encoding in fsspec reference maker which causes the time coordinate in this dataset to be decoded incorrectly. Until this bug is fixed, we can manually override the time variable.

We know the data are monthly, starting in Jan. 1708.

import pandas as pd
import datetime as dt

ds = ds.assign_coords(
    time=pd.date_range(start='1708-01', freq='MS', periods=ds.dims['time']) + dt.timedelta(days=14)
)
ds.time
<xarray.DataArray 'time' (time: 3600)>
array(['1708-01-15T00:00:00.000000000', '1708-02-15T00:00:00.000000000',
       '1708-03-15T00:00:00.000000000', ..., '2007-10-15T00:00:00.000000000',
       '2007-11-15T00:00:00.000000000', '2007-12-15T00:00:00.000000000'],
      dtype='datetime64[ns]')
Coordinates:
  * time     (time) datetime64[ns] 1708-01-15 1708-02-15 ... 2007-12-15
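As a quick sanity check on the override (a sketch), confirm the new axis is monotonic and spans the expected 300 years:

# The corrected monthly axis should run 1708-01-15 through 2007-12-15.
time_index = ds.indexes['time']
assert time_index.is_monotonic_increasing
print(time_index[0], time_index[-1])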

Make a Map

Let’s verify that we can read and visualize the data. We’ll compare the first year to the last year.

ds_ann = ds.resample(time='A').mean()
sst_diff = ds_ann.thetao.isel(time=-1, lev=0) - ds_ann.thetao.isel(time=0, lev=0)
sst_diff.plot()
/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/dask/array/numpy_compat.py:39: RuntimeWarning: invalid value encountered in true_divide
  x = np.divide(x1, x2, out)
<matplotlib.collections.QuadMesh at 0x18aa40790>
[Figure: map of the sea surface temperature difference between the last and first years]