NetCDF Zarr Sequential Recipe: CMIP6

This tutorial describes how to create a suitable recipe for many of the CMIP6 datasets. The source data is a sequence of NetCDF files accessed from the ‘s3://esgf-world’ bucket. The target is a Zarr store.

Background

  • The s3://esgf-world bucket has about 250,000 datasets stored in about 950,000 netcdf files (for an average of about four netcdf files per dataset). This is a small subset of the WCRP-CMIP6 collection available at the Federated ESGF-COG nodes such as https://esgf-node.llnl.gov/search/cmip6, but it is faster and easier to work with.

  • Each CMIP6 dataset can be identified by a 6-tuple consisting of:

      (model,experiment,ensemble_member,mip_table,variable,grid_label)
    

and so a convenient name for a particular dataset is a string of these values joined with a ‘.’ separator:

  dataset = model.experiment.ensemble_member.mip_table.variable.grid_label
  • There can be multiple versions of a dataset, designated by a string beginning with ‘v’ followed by an 8-digit date, loosely associated with its creation time

import pandas as pd
import xarray as xr
import s3fs

Step 1: Get to know your source data

The CMIP6 collection is very heterogeneous, so getting to know the source data is rather complicated. We first need to identify a dataset and learn how to list the set of netcdf files which are associated with it. Fortunately, you can explore the data here: https://esgf-world.s3.amazonaws.com/index.html#CMIP6/ or download a CSV file listing all of the netcdf files, one per line.

Here we will read the CSV file into a pandas dataframe so we can search, sort and subset the available datasets and their netcdf files.

netcdf_cat = 's3://cmip6-nc/esgf-world.csv.gz'
df_s3 = pd.read_csv(netcdf_cat, dtype='unicode')
df_s3.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 990611 entries, 0 to 990610
Data columns (total 13 columns):
 #   Column           Non-Null Count   Dtype 
---  ------           --------------   ----- 
 0   project          990611 non-null  object
 1   institute        990611 non-null  object
 2   model            990611 non-null  object
 3   experiment_id    990611 non-null  object
 4   frequency        499599 non-null  object
 5   modeling_realm   499599 non-null  object
 6   mip_table        990611 non-null  object
 7   ensemble_member  990611 non-null  object
 8   grid_label       990611 non-null  object
 9   variable         990611 non-null  object
 10  temporal subset  961917 non-null  object
 11  version          990611 non-null  object
 12  path             990611 non-null  object
dtypes: object(13)
memory usage: 98.3+ MB
# So there are 990,611 entries, one for each netcdf file. We can see the first five here:
# The 'path' column is the most important - you may need to scroll the window to see it!

df_s3.head()
project institute model experiment_id frequency modeling_realm mip_table ensemble_member grid_label variable temporal subset version path
0 CMIP6 AS-RCEC TaiESM1 histSST-piNTCF NaN NaN AERmon r1i1p1f1 gn ps 185001-201412 v20200318 s3://esgf-world/CMIP6/AerChemMIP/AS-RCEC/TaiES...
1 CMIP6 AS-RCEC TaiESM1 histSST-piNTCF NaN NaN CFmon r1i1p1f1 gn ta 185001-201412 v20200318 s3://esgf-world/CMIP6/AerChemMIP/AS-RCEC/TaiES...
2 CMIP6 AS-RCEC TaiESM1 histSST-piNTCF NaN NaN LImon r1i1p1f1 gn snc 185002-201412 v20200318 s3://esgf-world/CMIP6/AerChemMIP/AS-RCEC/TaiES...
3 CMIP6 AS-RCEC TaiESM1 histSST-piNTCF NaN NaN LImon r1i1p1f1 gn snd 185002-201412 v20200318 s3://esgf-world/CMIP6/AerChemMIP/AS-RCEC/TaiES...
4 CMIP6 AS-RCEC TaiESM1 histSST-piNTCF NaN NaN LImon r1i1p1f1 gn snw 185002-201412 v20200318 s3://esgf-world/CMIP6/AerChemMIP/AS-RCEC/TaiES...
# We will add a new column which is our short name for the datasets (may take a moment for all 990611 rows)
df_s3['dataset'] = df_s3.apply(lambda row: '.'.join(row.path.split('/')[6:12]),axis=1)
# the number of unique dataset names can be found using the 'nunique' method
df_s3.dataset.nunique()
242648
# The value in the `path` column of the first row is:
df_s3.path.values[0]
's3://esgf-world/CMIP6/AerChemMIP/AS-RCEC/TaiESM1/histSST-piNTCF/r1i1p1f1/AERmon/ps/gn/v20200318/ps_AERmon_TaiESM1_histSST-piNTCF_r1i1p1f1_gn_185001-201412.nc'
# which has the short name:
df_s3.dataset.values[0]
'TaiESM1.histSST-piNTCF.r1i1p1f1.AERmon.ps.gn'
# some datasets have multiple versions: (will just check one in each 500 of them ...)
for dataset in df_s3.dataset.unique()[::500]:
    df_dataset = df_s3[df_s3.dataset==dataset]
    if df_dataset.version.nunique() > 1:
        print(dataset,df_dataset.version.unique())
MRI-ESM2-0.piClim-SO2.r1i1p1f1.Amon.rlds.gn ['v20190912' 'v20200114']
CanESM5.historical.r1i1p1f1.Omon.hfds.gn ['v20190306' 'v20190429']
EC-Earth3-Veg.amip.r1i1p1f1.Lmon.mrso.gr ['v20190605' 'v20200225']
CESM2.abrupt-4xCO2.r1i1p1f1.Emon.loadso4.gn ['v20190425' 'v20190828' 'v20190927']
SAM0-UNICON.piControl.r1i1p1f1.fx.areacella.gn ['v20190323' 'v20190910']
CanESM5.hist-stratO3.r6i1p1f1.Amon.va.gn ['v20190306' 'v20190429']
CanESM5.ssp245-aer.r7i1p1f1.Omon.fgo2.gn ['v20190306' 'v20190429']
CanESM5.ssp245-nat.r8i1p1f1.Amon.va.gn ['v20190306' 'v20190429']
CanESM5.ssp245-stratO3.r8i1p1f1.Omon.uo.gn ['v20190306' 'v20190429']
IPSL-CM6A-LR.piClim-spAer-anthro.r1i1p1f1.Amon.rsdt.gr ['v20190118' 'v20190507']
CESM2.piClim-aer.r1i1p1f1.AERmon.od550aer.gn ['v20190815' 'v20191205']
CanESM5.ssp126.r2i1p1f1.Amon.psl.gn ['v20190306' 'v20190429']
CanESM5.ssp370.r5i1p1f1.Omon.thetao.gn ['v20190306' 'v20190429']
CanESM5.ssp585.r6i1p1f1.Amon.vas.gn ['v20190306' 'v20190429']
IPSL-CM6A-LR.ssp585.r1i1p1f1.Eday.loadbc.gr ['v20190119' 'v20190903']
# So pick a dataset, any dataset, and try it!  N.B. some datasets are VERY large - especially those at daily, 6-hourly, etc. frequencies
#dataset = df_s3.dataset[10450]
# or:
dataset = 'GFDL-CM4.historical.r1i1p1f1.Amon.tas.gr1'
df_dataset = df_s3[df_s3.dataset==dataset]
df_dataset
project institute model experiment_id frequency modeling_realm mip_table ensemble_member grid_label variable temporal subset version path dataset
491687 CMIP6 NOAA-GFDL GFDL-CM4 historical mon atmos Amon r1i1p1f1 gr1 tas 185001-194912 v20180701 s3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/... GFDL-CM4.historical.r1i1p1f1.Amon.tas.gr1
491688 CMIP6 NOAA-GFDL GFDL-CM4 historical mon atmos Amon r1i1p1f1 gr1 tas 195001-201412 v20180701 s3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/... GFDL-CM4.historical.r1i1p1f1.Amon.tas.gr1

So is this what we expect?

  • this dataset is split over 3 netcdf files - see any trouble here?

  • let’s do a quick sanity check (make sure one and only one variable is specified) and keep only the files from the latest version

dvars = df_dataset.variable.unique()
assert len(dvars) > 0, 'no netcdf files found for this dataset'
assert len(dvars) == 1, f"trouble with this dataset, too many variables found: {dvars}"
    
var = dvars[0]
print('The variable is:',var)

# make sure we are looking at the last available version:
last_version = sorted(df_dataset.version.unique())[-1]
dze = df_dataset[df_dataset.version == last_version].reset_index(drop=True)

input_urls = sorted(dze.path.unique())
input_urls
The variable is: tas
['s3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701/tas_Amon_GFDL-CM4_historical_r1i1p1f1_gr1_185001-194912.nc',
 's3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701/tas_Amon_GFDL-CM4_historical_r1i1p1f1_gr1_195001-201412.nc']

There are only two files - one netcdf file was from an older version!

  • We want to look at the first netcdf file to make sure we know what to expect

  • To use xarray.open_dataset, we need to turn the input_url (starting with ‘s3://’) into an appropriate file-like object.

# Connect to AWS S3 storage
fs_s3 = s3fs.S3FileSystem(anon=True)

file_url = fs_s3.open(input_urls[0], mode='rb')
ds = xr.open_dataset(file_url)
print(ds)
<xarray.Dataset>
Dimensions:    (bnds: 2, lat: 180, lon: 288, time: 1200)
Coordinates:
  * bnds       (bnds) float64 1.0 2.0
    height     float64 ...
  * lat        (lat) float64 -89.5 -88.5 -87.5 -86.5 ... 86.5 87.5 88.5 89.5
  * lon        (lon) float64 0.625 1.875 3.125 4.375 ... 355.6 356.9 358.1 359.4
  * time       (time) object 1850-01-16 12:00:00 ... 1949-12-16 12:00:00
Data variables:
    lat_bnds   (lat, bnds) float64 ...
    lon_bnds   (lon, bnds) float64 ...
    tas        (time, lat, lon) float32 ...
    time_bnds  (time, bnds) object ...
Attributes: (12/46)
    external_variables:     areacella
    history:                File was processed by fremetar (GFDL analog of CM...
    table_id:               Amon
    activity_id:            CMIP
    branch_method:          standard
    branch_time_in_child:   [0.]
    ...                     ...
    variable_id:            tas
    variant_info:           N/A
    references:             see further_info_url attribute
    variant_label:          r1i1p1f1
    branch_time_in_parent:  [36500.]
    parent_time_units:      days since 0001-1-1

Step 2: Deciding how to chunk the dataset

  • For parallel I/O and subsetting the dataset in time, we will chunk the data in the time dimension

  • In order to figure out the number of time slices in each chunk, we do a small calculation on the first netcdf file

  • Here we set the desired chunk size to 50 MB, but anything between 50 and 100 MB usually works well

ntime = len(ds.time)       # the number of time slices
chunksize_optimal = 50e6  # desired chunk size in bytes
ncfile_size = ds.nbytes    # the netcdf file size
chunksize = max(int(ntime* chunksize_optimal/ ncfile_size),1)

target_chunks = ds.dims.mapping
target_chunks['time'] = chunksize

target_chunks   # a dictionary giving the chunk sizes in each dimension
SortedKeysDict({'bnds': 2, 'lat': 180, 'lon': 288, 'time': 241})
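
As a quick sanity check (a small sketch, not part of the recipe), we can estimate the chunk size in bytes implied by this choice and confirm it lands close to the 50 MB target:

# Sketch: estimate the chunk size implied by `target_chunks`
bytes_per_slice = ncfile_size / ntime   # average size of one time slice in the first file
print(f'approx. chunk size: {chunksize * bytes_per_slice / 1e6:.1f} MB')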

Step 3: Define a pre-processing function

  • This is an optional step which we want to apply to each chunk

  • Here we change some data variables into coordinate variables, but you can define your own pre-processing step here

# the netcdf lists some of the coordinate variables as data variables. This is a fix which we want to apply to each chunk.
def set_bnds_as_coords(ds):
    new_coords_vars = [var for var in ds.data_vars if 'bnds' in var or 'bounds' in var]
    ds = ds.set_coords(new_coords_vars)
    return ds
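
As a quick check (a sketch, not part of the recipe itself), we can apply this function to the dataset we opened in Step 1 and confirm that the bounds variables move from data variables to coordinates:

# Sketch: verify the fix on the first netcdf file opened above
ds_fixed = set_bnds_as_coords(ds)
print('data variables before:', sorted(ds.data_vars))        # lat_bnds, lon_bnds, tas, time_bnds
print('data variables after: ', sorted(ds_fixed.data_vars))  # only 'tas' should remain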

Step 4: Create a recipe

  • A FilePattern is the starting place for all recipes. These Python objects are the “raw ingredients” upon which the recipe will act. They describe how the individual source files are organized logically as part of a larger dataset. To create a file pattern, the first step is to define a function which takes any variable components of the source file path as inputs, and returns full file path strings.

  • Revisiting our input urls, we see that the only variable components of these paths are the 13-character numerical strings which immediately precede the .nc file extension:

for url in input_urls:
    print(f'''{url}
    ''')
s3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701/tas_Amon_GFDL-CM4_historical_r1i1p1f1_gr1_185001-194912.nc
    
s3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701/tas_Amon_GFDL-CM4_historical_r1i1p1f1_gr1_195001-201412.nc
    

What do these strings refer to?

  • If it is not immediately apparent, comparison with our dataset coordinates makes it clear that these numerical strings are time ranges; for example, the string '185001-194912' from the first url represents the range from Jan 1850 through Dec 1949:

print(ds.coords)
Coordinates:
  * bnds     (bnds) float64 1.0 2.0
    height   float64 ...
  * lat      (lat) float64 -89.5 -88.5 -87.5 -86.5 -85.5 ... 86.5 87.5 88.5 89.5
  * lon      (lon) float64 0.625 1.875 3.125 4.375 ... 355.6 356.9 358.1 359.4
  * time     (time) object 1850-01-16 12:00:00 ... 1949-12-16 12:00:00

Let’s define a function that takes these strings as input

  • … and returns full file paths!

def make_full_path(time):
    '''
    Parameters
    ----------
    time : str
    
        A 13-character string, comprised of two 6-character dates delimited by a dash. 
        The first four characters of each date are the year, and the final two are the month.
        
        e.g. The time range from Jan 1850 through Dec 1949 is expressed as '185001-194912'.
            
    '''
    base_url = 's3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701/'
    return base_url + f'tas_Amon_GFDL-CM4_historical_r1i1p1f1_gr1_{time}.nc'
# And let's be sure to test our function before moving on.

test_url = make_full_path('185001-194912')
print(test_url)

# If our function works, inputting '185001-194912' should have returned a url identical to
# the first of the two urls in the list named `input_urls` defined in cell 10, above:

test_url == input_urls[0]
s3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701/tas_Amon_GFDL-CM4_historical_r1i1p1f1_gr1_185001-194912.nc
True

Combining dimensions

  • Before we initialize our file pattern, we need to define how we want files to be combined in our eventual zarr store

  • We have two options:

    1. Concatenating dimensions with a ConcatDim instance

    2. Merging dimensions with a MergeDim instance (a sketch of this option appears after the ConcatDim example below)

  • Our current dataset requires only concatenation, which we can achieve by instantiating ConcatDim with our variable name ("time") as a positional argument, followed by a keys kwarg, which is a list containing all of the ways in which this variable appears in our set of source file paths.

Note: This example reads from only two source files, so we can simply copy-and-paste their respective time-range strings into a list. If the number of source files were much larger, we might consider creating this keys list programmatically, as sketched below.
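
For example, here is a small sketch (relying on the CMIP6 file-naming convention visible in the urls above, where the time range is the last underscore-separated field before the .nc extension) showing how the keys could be derived from input_urls; the recipe below simply uses the hard-coded list:

# Sketch: derive the time-range keys from the file names in `input_urls`
keys = sorted(url.rsplit('_', 1)[-1].replace('.nc', '') for url in input_urls)
print(keys)   # ['185001-194912', '195001-201412']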

from pangeo_forge_recipes.patterns import ConcatDim
time_concat_dim = ConcatDim("time", keys=['185001-194912', '195001-201412'])
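
For reference, the MergeDim option mentioned above looks very similar. The sketch below is not used in this recipe (our two source files differ only in their time range); it just illustrates how, hypothetically, several variables (say tas and pr) could be merged into one store, in which case the file path function would also need a variable argument:

# Sketch only - not used in this recipe
from pangeo_forge_recipes.patterns import MergeDim
variable_merge_dim = MergeDim("variable", keys=["tas", "pr"])   # hypothetical keys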

Instantiating the file pattern

  • Now that we have both a file path function and our “combine dimensions” object, we can move on to instantiating the file pattern, passing these two objects as arguments.

  • Note that we will use fsspec.open under the hood for most file opening, so if there are any special keyword arguments we want to pass to this function, now is the time to do it.

  • Here we specify fsspec_open_kwargs={'anon':True} as a keyword argument in the FilePattern, because we want to access the source files anonymously.

from pangeo_forge_recipes.patterns import FilePattern
pattern = FilePattern(make_full_path, time_concat_dim, fsspec_open_kwargs={'anon':True})
pattern
<FilePattern {'time': 2}>

Inspecting the instantiated pattern, we see that it has indexed our two files chronologically according to the concatenation keys we provided, and assigned the correct url to each file using the file path function:

for index, fname in pattern.items():
    print(index, fname)
time-0 s3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701/tas_Amon_GFDL-CM4_historical_r1i1p1f1_gr1_185001-194912.nc
time-1 s3://esgf-world/CMIP6/CMIP/NOAA-GFDL/GFDL-CM4/historical/r1i1p1f1/Amon/tas/gr1/v20180701/tas_Amon_GFDL-CM4_historical_r1i1p1f1_gr1_195001-201412.nc

Time to make the recipe!

  • In its most basic form, XarrayZarrRecipe can be instantiated using a file pattern as the only argument. Here we’ll use some of the optional arguments to specify a few additional preferences:

from pangeo_forge_recipes.recipes.xarray_zarr import XarrayZarrRecipe

recipe = XarrayZarrRecipe(
    pattern, 
    target_chunks=target_chunks,
    process_chunk=set_bnds_as_coords,
    xarray_concat_kwargs={'join':'exact'},
)

Step 5: Create storage targets

  • Here we are caching the netcdf files locally

  • We also need a temporary metadata cache because we don’t know in advance how many time slices are in each netcdf file

import tempfile
from fsspec.implementations.local import LocalFileSystem
from pangeo_forge_recipes.storage import FSSpecTarget, CacheFSSpecTarget, MetadataTarget

fs_local = LocalFileSystem()

target_dir = tempfile.TemporaryDirectory()
target = FSSpecTarget(fs_local, target_dir.name)

cache_dir = tempfile.TemporaryDirectory()
cache_target = CacheFSSpecTarget(fs_local, cache_dir.name)

meta_dir = tempfile.TemporaryDirectory()
meta_store = MetadataTarget(fs_local, meta_dir.name)

recipe.target = target
recipe.input_cache = cache_target
recipe.metadata_cache = meta_store

cache_target.root_path, target.root_path, meta_store.root_path
('/var/folders/mz/gxy_z7dx1k153xf0c3fks9_40000gp/T/tmppas0xjj8',
 '/var/folders/mz/gxy_z7dx1k153xf0c3fks9_40000gp/T/tmpsykkms9h',
 '/var/folders/mz/gxy_z7dx1k153xf0c3fks9_40000gp/T/tmpx3_wx22z')

Step 6: Execute the recipe

import zarr

for input_key in recipe.iter_inputs():
    recipe.cache_input(input_key)

# use recipe to create the zarr store:
recipe.prepare_target() 

# is it there?
zgroup = zarr.open(target_dir.name)
print(zgroup.tree())

for chunk in recipe.iter_chunks():
    recipe.store_chunk(chunk)
recipe.finalize_target()
/
 ├── bnds (2,) float64
 ├── height () float64
 ├── lat (180,) float64
 ├── lat_bnds (180, 2) float64
 ├── lon (288,) float64
 ├── lon_bnds (288, 2) float64
 ├── tas (1980, 180, 288) float32
 ├── time (1980,) int64
 └── time_bnds (1980, 2) int64

Step 7: Check the resulting Zarr store

# Check to see if it worked:
ds = xr.open_zarr(target_dir.name)
print(ds)
<xarray.Dataset>
Dimensions:    (bnds: 2, lat: 180, lon: 288, time: 1980)
Coordinates:
  * bnds       (bnds) float64 1.0 2.0
    height     float64 ...
  * lat        (lat) float64 -89.5 -88.5 -87.5 -86.5 ... 86.5 87.5 88.5 89.5
    lat_bnds   (lat, bnds) float64 dask.array<chunksize=(180, 2), meta=np.ndarray>
  * lon        (lon) float64 0.625 1.875 3.125 4.375 ... 355.6 356.9 358.1 359.4
    lon_bnds   (lon, bnds) float64 dask.array<chunksize=(288, 2), meta=np.ndarray>
  * time       (time) object 1850-01-16 12:00:00 ... 2014-12-16 12:00:00
    time_bnds  (time, bnds) object dask.array<chunksize=(241, 2), meta=np.ndarray>
Data variables:
    tas        (time, lat, lon) float32 dask.array<chunksize=(241, 180, 288), meta=np.ndarray>
Attributes: (12/46)
    Conventions:            CF-1.7 CMIP-6.0 UGRID-1.0
    activity_id:            CMIP
    branch_method:          standard
    branch_time_in_child:   0.0
    branch_time_in_parent:  36500.0
    comment:                <null ref>
    ...                     ...
    table_id:               Amon
    title:                  NOAA GFDL GFDL-CM4 model output prepared for CMIP...
    tracking_id:            hdl:21.14100/e4193a02-6405-49b6-8ad3-65def741a4dd
    variable_id:            tas
    variant_info:           N/A
    variant_label:          r1i1p1f1
ds[var][-1].plot()
<matplotlib.collections.QuadMesh at 0x131231d90>
(Figure: map of the final time slice of tas.)

Postscript

  • If you find a CMIP6 dataset for which this recipe does not work, please report it at issue#105 so we can refine the recipe, if possible.

# Troubles found:

dataset = 'IPSL-CM6A-LR.abrupt-4xCO2.r1i1p1f1.Lmon.cLeaf.gr'  # needs decode_coords=False in xr.open_dataset, but even with xarray_open_kwargs={'decode_coords': False} it still throws an error when caching the input