GPCP

GPCP

Hint

This recipe is representative of the "Open with Xarray, write to Zarr" recipe style.

import apache_beam as beam
import pandas as pd
import zarr

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import (
    ConsolidateDimensionCoordinates,
    ConsolidateMetadata,
    OpenURLWithFSSpec,
    OpenWithXarray,
    StoreToZarr,
)

# One "YYYYMMDD" string per calendar day from 1996-10-01 through 1999-02-01
# (both endpoints included). Formatting is done on the whole DatetimeIndex at
# once and then converted to a plain Python list of str.
dates = (
    pd.date_range("1996-10-01", "1999-02-01", freq="D")
    .strftime("%Y%m%d")
    .tolist()
)


def make_url(time):
    """Return the URL of the GPCP daily NetCDF file for one date key.

    ``time`` is interpolated verbatim into the file name, so it is expected to
    already be formatted the way the remote files are named (the ``dates``
    keys in this module are ``YYYYMMDD`` strings).
    """
    filename = f"v01r03_daily_d{time}.nc"
    return f"https://storage.googleapis.com/pforge-test-data/gpcp/{filename}"


# Concatenation dimension "time": one key per entry in `dates`, with each
# file declared to hold exactly one item along that dimension.
concat_dim = ConcatDim(name="time", keys=dates, nitems_per_file=1)
# The file pattern pairs the URL-building function with the concat dimension.
pattern = FilePattern(make_url, concat_dim)


def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore:
    """Sanity-check the written Zarr store and pass it through unchanged.

    Opens ``store`` with xarray's zarr engine and asserts on the dataset's
    ``title`` and on the length of the first chunk along ``time``.

    Args:
        store: The Zarr store to open and check.

    Returns:
        The same ``store`` object that was passed in.

    Raises:
        AssertionError: If the title or the first ``time`` chunk length does
            not match the expected value.
    """
    # This fails integration test if not imported here
    # TODO: see if --setup-file option for runner fixes this
    import xarray as xr

    # NOTE: the spelling "Climatatology" is deliberate here; it is the value
    # the check compares against, so do not "correct" it without confirming
    # the dataset's actual title attribute.
    expected_title = (
        "Global Precipitation Climatatology Project (GPCP) "
        "Climate Data Record (CDR), Daily V1.3"
    )

    dataset = xr.open_dataset(store, engine="zarr", chunks={})
    assert dataset.title == expected_title

    # Making sure that the native chunking is different from the dynamic chunking
    first_time_chunk = dataset.chunks["time"][0]
    assert first_time_chunk == 1

    return store


# Beam pipeline for this recipe. The transforms are chained with `|` and are
# applied in the order written; the final step carries the label "Test dataset".
recipe = (
    # Source: the items generated by the file pattern defined above.
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    # File type is taken from the pattern; {"decode_coords": "all"} is handed
    # over via `xarray_open_kwargs` (presumably forwarded to xarray's open call).
    | OpenWithXarray(file_type=pattern.file_type, xarray_open_kwargs={"decode_coords": "all"})
    # Target store is named "gpcp.zarr"; the dimensions to combine along are
    # read from the pattern rather than hard-coded here.
    | StoreToZarr(
        store_name="gpcp.zarr",
        combine_dims=pattern.combine_dim_keys,
    )
    | ConsolidateDimensionCoordinates()
    | ConsolidateMetadata()
    # Run the `test_ds` assertions on each element emitted by the previous
    # stage (presumably the written Zarr store, per `test_ds`'s signature).
    | "Test dataset" >> beam.Map(test_ds)
)