NARR: Subsetting and OPeNDAP
Contents
NARR: Subsetting and OPeNDAP#
About the Dataset#
This tutorial uses data from NOAA’s North American Regional Reanalysis (NARR)
The North American Regional Reanalysis (NARR) is a model produced by the National Centers for Environmental Prediction (NCEP) that generates reanalyzed data for temperature, wind, moisture, soil, and dozens of other parameters. The NARR model assimilates a large amount of observational data from a variety of sources to produce a long-term picture of weather over North America.
For this recipe, we will access the data via OPeNDAP, a widely-used API for remote access of environmental data over HTTP. A key point is that, since we use using OPeNDAP, there are no input files to download / cache. We open the data directly from the remote server.
The data we will use are catalogged here (3D data on pressure levels): https://psl.noaa.gov/thredds/catalog/Datasets/NARR/pressure/catalog.html
Let’s peek at one file. Xarray should automatically do the right thing with the OPeNDAP url. But just to be safe, we can pass the option, engine='netcdf4'
, which is needed to open OPeNDAP links correctly. (We will need this again later when writing our recipe.)
import xarray as xr
url = "https://psl.noaa.gov/thredds/dodsC/Datasets/NARR/pressure/air.197901.nc"
ds = xr.open_dataset(url, engine='netcdf4', decode_cf="all")
ds
This is just one file.
But it’s a very big file (several GB)!
We will want to break it up by specifying target_chunks
when we write to Zarr.
ds.air._ChunkSizes
This tells us that we can subset in the time
or level
dimensions, but problably should avoid subsetting in the x
and y
dimensions.
Also note the presence of the Lambert_Conformal
data variable. This should be a coordinate. So we will need to write a custom transform to make that change.
Define File Pattern#
We are now ready to define the FilePattern
for the recipe. There is one file per month. So we start with a function like this:
def format_function(time):
return f"https://psl.noaa.gov/thredds/dodsC/Datasets/NARR/pressure/air.{time}.nc"
To keep things short and simple for this tutorial, we will just use one file, and subset it into many chunks.
But we could easily add more months to build up the entire dataset.
Since each file is monthly, and the number of days per months varies, we cannot set nitems_per_input
in the ConcatDim
.
Note
It’s important that we specify file_type="opendap"
when creating a FilePattern with OPeNDAP URLs.
OPeNDAP is actually an API, so there are no files to download.
from pangeo_forge_recipes.patterns import FilePattern, ConcatDim, MergeDim
time_dim = ConcatDim("time", ["197901"])
pattern = FilePattern(format_function, time_dim, file_type="opendap")
pattern
Define the Pipeline#
import apache_beam as beam
from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr
from pangeo_forge_recipes.transforms import Indexed, T
class SetProjectionAsCoord(beam.PTransform):
"""A preprocessing function which will assign the `Lambert_Conformal` variable as a coordinate variable."""
@staticmethod
def _set_projection_as_coord(item: Indexed[T]) -> Indexed[T]:
index, ds = item
ds = ds.set_coords(["Lambert_Conformal"])
return index, ds
def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
return pcoll | beam.Map(self._set_projection_as_coord)
We now define a target location for our recipe. Here we just use a temporary directory.
import os
from tempfile import TemporaryDirectory
td = TemporaryDirectory()
target_root = td.name
store_name = "output.zarr"
target_store = os.path.join(target_root, store_name)
target_store
Now we put together the necessary PTransforms. In this pipeline we’re adding in the argument, target_chunks
, which is a dictionary describing how we want the output dataset to be chunked. In this example, we are specifying single time chunks ({"time": 1}
).
transforms = (
beam.Create(pattern.items())
| OpenWithXarray(file_type=pattern.file_type)
| SetProjectionAsCoord()
| StoreToZarr(
store_name=store_name,
target_root=target_root,
combine_dims=pattern.combine_dim_keys,
target_chunks={"time": 1}
)
)
transforms
with beam.Pipeline() as p:
p | transforms
Check The Outputs#
ds_target = xr.open_dataset(target_store, engine="zarr", chunks={})
ds_target
ds_target.air.isel(level=0).mean("time").plot()