NARR: Subsetting and OPeNDAP#

About the Dataset#

This tutorial uses data from NOAA’s North American Regional Reanalysis (NARR)

The North American Regional Reanalysis (NARR) is a model produced by the National Centers for Environmental Prediction (NCEP) that generates reanalyzed data for temperature, wind, moisture, soil, and dozens of other parameters. The NARR model assimilates a large amount of observational data from a variety of sources to produce a long-term picture of weather over North America.

For this recipe, we will access the data via OPeNDAP, a widely-used API for remote access of environmental data over HTTP. A key point is that, since we use using OPeNDAP, there are no input files to download / cache. We open the data directly from the remote server.

The data we will use are catalogged here (3D data on pressure levels): https://psl.noaa.gov/thredds/catalog/Datasets/NARR/pressure/catalog.html

Let’s peek at one file. Xarray should automatically do the right thing with the OPeNDAP url. But just to be safe, we can pass the option, engine='netcdf4', which is needed to open OPeNDAP links correctly. (We will need this again later when writing our recipe.)

import xarray as xr
url = "https://psl.noaa.gov/thredds/dodsC/Datasets/NARR/pressure/air.197901.nc"
ds = xr.open_dataset(url, engine='netcdf4', decode_cf="all")
ds

This is just one file. But it’s a very big file (several GB)! We will want to break it up by specifying target_chunks when we write to Zarr.

ds.air._ChunkSizes

This tells us that we can subset in the time or level dimensions, but problably should avoid subsetting in the x and y dimensions.

Also note the presence of the Lambert_Conformal data variable. This should be a coordinate. So we will need to write a custom transform to make that change.

Define File Pattern#

We are now ready to define the FilePattern for the recipe. There is one file per month. So we start with a function like this:

def format_function(time):
    return f"https://psl.noaa.gov/thredds/dodsC/Datasets/NARR/pressure/air.{time}.nc"

To keep things short and simple for this tutorial, we will just use one file, and subset it into many chunks. But we could easily add more months to build up the entire dataset. Since each file is monthly, and the number of days per months varies, we cannot set nitems_per_input in the ConcatDim.

Note

It’s important that we specify file_type="opendap" when creating a FilePattern with OPeNDAP URLs. OPeNDAP is actually an API, so there are no files to download.

from pangeo_forge_recipes.patterns import FilePattern, ConcatDim, MergeDim
time_dim = ConcatDim("time", ["197901"])
pattern = FilePattern(format_function, time_dim, file_type="opendap")
pattern

Define the Pipeline#

import apache_beam as beam
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
from pangeo_forge_recipes.transforms import Indexed, T

class SetProjectionAsCoord(beam.PTransform):
    """A preprocessing function which will assign the `Lambert_Conformal` variable as a coordinate variable."""

    @staticmethod
    def _set_projection_as_coord(item: Indexed[T]) -> Indexed[T]:
        index, ds = item
        ds = ds.set_coords(["Lambert_Conformal"])
        return index, ds

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(self._set_projection_as_coord)

We now define a target location for our recipe. Here we just use a temporary directory.

import os
from tempfile import TemporaryDirectory
td = TemporaryDirectory()
target_root = td.name
store_name = "output.zarr"
target_store = os.path.join(target_root, store_name)
target_store

Now we put together the necessary PTransforms. In this pipeline we’re adding in the argument, target_chunks, which is a dictionary describing how we want the output dataset to be chunked. In this example, we are specifying single time chunks ({"time": 1}).

transforms = (
    beam.Create(pattern.items())
    | OpenWithXarray(file_type=pattern.file_type)
    | SetProjectionAsCoord()
    | StoreToZarr(
        store_name=store_name,
        target_root=target_root,
        combine_dims=pattern.combine_dim_keys,
        target_chunks={"time": 1}
    )
)
transforms
with beam.Pipeline() as p:
    p | transforms

Check The Outputs#

ds_target =  xr.open_dataset(target_store, engine="zarr", chunks={})
ds_target
ds_target.air.isel(level=0).mean("time").plot()