Reading data from a stream

[1]:
import earthkit.data as ekd

earthkit-data can load GRIB data from a stream source, which can be an FDB stream, a standard Python IO stream or any object implementing the necessary stream methods.
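As an illustration of "any object implementing the necessary stream methods", here is a hypothetical adapter, `ChunkStream`, that exposes a binary `read(size)` method over an iterable of byte chunks, the way a network download might deliver them. The assumption that `read()` is the method earthkit-data needs is ours, modelled on Python file objects, and is not taken from the earthkit-data documentation. The sketch uses only the standard library.

```python
from typing import Iterable, Iterator, Optional


class ChunkStream:
    """Hypothetical file-like adapter: exposes read(size) over byte chunks.

    Assumption: a binary read(size) method, as on Python file objects,
    is what a stream consumer needs. Not taken from the earthkit-data docs.
    """

    def __init__(self, chunks: Iterable[bytes]) -> None:
        self._chunks: Iterator[bytes] = iter(chunks)
        self._buffer = bytearray()

    def read(self, size: Optional[int] = -1) -> bytes:
        """Return up to `size` bytes (all remaining bytes if size is None or negative)."""
        if size is None:
            size = -1
        # Pull chunks until enough bytes are buffered or the source runs out.
        while size < 0 or len(self._buffer) < size:
            try:
                self._buffer.extend(next(self._chunks))
            except StopIteration:
                break
        if size < 0:
            size = len(self._buffer)
        data = bytes(self._buffer[:size])
        del self._buffer[:size]
        return data


# Usage: data arrives in uneven pieces but is read back in the requested sizes.
s = ChunkStream([b"GR", b"IB", b"-payload-", b"7777"])
print(s.read(4))  # b'GRIB'
print(s.read())   # b'-payload-7777'
print(s.read(1))  # b''
```

If that assumption holds, such an object could be passed in the same way as a file, e.g. `ekd.from_source("stream", ChunkStream(chunks))`.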

For simplicity, in this notebook we will use a file stream to demonstrate the usage of streams. First, we make sure that the example file, which contains 6 GRIB messages, is available.

[2]:
ekd.download_example_file("test6.grib")

Getting single items from the stream

We create a stream from a file containing 6 GRIB fields by calling open() in binary mode ("rb"), which returns an io.BufferedReader object (a file stream).

[3]:
stream = open("test6.grib", "rb")

We load it into earthkit-data.

[4]:
ds = ekd.from_source("stream", stream)

Using this object we can iterate through the stream. As the iteration progresses, GribField objects are created and then deleted when they go out of scope. As a result, only one GRIB message is kept in memory at a time.

[5]:
for f in ds:
    # f is a GribField object; it is deleted when it goes out of scope
    print(f)
GribField(t,1000,20180801,1200,0,0)
GribField(u,1000,20180801,1200,0,0)
GribField(v,1000,20180801,1200,0,0)
GribField(t,850,20180801,1200,0,0)
GribField(u,850,20180801,1200,0,0)
GribField(v,850,20180801,1200,0,0)
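To see why streaming keeps only one message in memory, here is a simplified, standard-library-only sketch of reading GRIB messages one at a time from a binary stream. It is not how earthkit-data does it (earthkit-data decodes GRIB through ecCodes), it handles GRIB edition 2 only, and it assumes that messages follow each other with no padding and that `read(n)` returns `n` bytes unless the stream ends, as buffered file objects do. Each GRIB2 message starts with a 16-byte indicator section holding the bytes `GRIB`, the edition number and the total message length, and ends with `7777`. `make_fake_grib2` is a hypothetical helper that builds minimal placeholder messages for the demonstration.

```python
import io
import struct
from typing import BinaryIO, Iterator


def iter_grib2_messages(stream: BinaryIO) -> Iterator[bytes]:
    """Yield raw GRIB edition 2 messages one at a time from a binary stream.

    Simplified sketch: assumes contiguous messages (no padding between
    them) and does not handle GRIB edition 1.
    """
    while True:
        # Section 0 is 16 bytes: b"GRIB", 2 reserved bytes, discipline,
        # edition number, then the total message length as an 8-byte integer.
        header = stream.read(16)
        if not header:
            return  # clean end of stream
        if len(header) < 16 or header[:4] != b"GRIB" or header[7] != 2:
            raise ValueError("expected a GRIB edition 2 message")
        (total_length,) = struct.unpack(">Q", header[8:16])
        if total_length < 20:
            raise ValueError("implausible GRIB message length")
        body = stream.read(total_length - 16)
        if len(body) != total_length - 16 or not body.endswith(b"7777"):
            raise ValueError("truncated or malformed GRIB message")
        # The generator holds only the current message; earlier ones can be
        # freed as soon as the caller stops referencing them.
        yield header + body


def make_fake_grib2(payload: bytes) -> bytes:
    """Hypothetical helper: a minimal placeholder GRIB2 message (not decodable data)."""
    total = 16 + len(payload) + 4
    return b"GRIB" + b"\x00\x00" + b"\x00" + b"\x02" + struct.pack(">Q", total) + payload + b"7777"


demo_stream = io.BytesIO(make_fake_grib2(b"t1000") + make_fake_grib2(b"u1000"))
for i, message in enumerate(iter_grib2_messages(demo_stream)):
    print(i, len(message))
# 0 25
# 1 25
```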

Once the iteration has finished, no data is left in ds. We can now close the stream:

[6]:
stream.close()
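Both behaviours seen above, a stream that is empty after one pass and the need to close it explicitly, are ordinary Python file-stream behaviour. The minimal sketch below shows both using only the standard library and a temporary placeholder file (not test6.grib). It uses a `with` statement, which closes the stream automatically; with earthkit-data the iteration over ds would presumably have to happen inside the `with` block, since the stream must stay open while it is being read.

```python
import io
import os
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "example.bin")
    # Placeholder bytes only; this is not a valid GRIB file.
    with open(path, "wb") as out:
        out.write(b"GRIB-placeholder-7777")

    with open(path, "rb") as demo_stream:
        # Binary read mode gives a buffered file stream.
        is_buffered = isinstance(demo_stream, io.BufferedReader)
        first_pass = demo_stream.read()   # consumes all remaining bytes
        second_pass = demo_stream.read()  # nothing left, so b"" is returned
    # Leaving the with block closed the stream, even if an error had been raised.
    closed = demo_stream.closed

print(is_buffered, len(first_pass), second_pass, closed)  # True 21 b'' True
```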

Using batched

With the batched() method we can iterate through the stream in batches of a fixed size. In this example we create a stream and read 2 fields from it at a time.

[7]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream)

# each f is a FieldList holding one batch
for f in ds.batched(2):
    print(f"len={len(f)} {f.metadata(('param', 'level'))}")
len=2 [('t', 1000), ('u', 1000)]
len=2 [('v', 1000), ('t', 850)]
len=2 [('u', 850), ('v', 850)]

Once the iteration has finished, no data is left in ds. We can now close the stream:

[8]:
stream.close()
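Conceptually, batched() chunks a lazy sequence of fields. The sketch below shows the same idea on a plain Python iterator, with `(param, level)` tuples standing in for fields; it is an illustration of the technique, not earthkit-data's implementation. Only one batch is materialised at a time, and the input iterator is consumed as the batches are produced. (Python 3.12 and later also provide `itertools.batched`, which yields tuples instead of lists.)

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], n: int) -> Iterator[List[T]]:
    """Yield consecutive lists of up to n items, consuming the input lazily."""
    if n < 1:
        raise ValueError("n must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, n))  # take at most n items
        if not batch:
            return
        yield batch


fields = [("t", 1000), ("u", 1000), ("v", 1000), ("t", 850), ("u", 850), ("v", 850)]
for batch in batched(iter(fields), 2):
    print(f"len={len(batch)} {batch}")
# len=2 [('t', 1000), ('u', 1000)]
# len=2 [('v', 1000), ('t', 850)]
# len=2 [('u', 850), ('v', 850)]
```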

It is possible to use a batch size that is not a factor of the total number of fields in the stream. In this case the last batch simply contains fewer fields than the specified batch size.

[9]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream)

# each f is a FieldList holding one batch
for f in ds.batched(4):
    print(f"len={len(f)} {f.metadata(('param', 'level'))}")

stream.close()
len=4 [('t', 1000), ('u', 1000), ('v', 1000), ('t', 850)]
len=2 [('u', 850), ('v', 850)]
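The batch lengths follow from integer division: with N fields and batch size k there are N // k full batches, plus one shorter batch of N % k fields when the remainder is non-zero. A small hypothetical helper reproduces the batch lengths printed above:

```python
from typing import List


def batch_sizes(total: int, batch_size: int) -> List[int]:
    """Lengths of the batches produced when splitting `total` items into groups of `batch_size`."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    full, remainder = divmod(total, batch_size)
    # Full batches first, then one partial batch if anything is left over.
    return [batch_size] * full + ([remainder] if remainder else [])


print(batch_sizes(6, 2))  # [2, 2, 2]
print(batch_sizes(6, 4))  # [4, 2]
```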

Using group_by

With the group_by() method we can iterate through the stream in groups defined by metadata keys. Each iteration step yields a FieldList, which is built by consuming GRIB messages from the stream until the values of the given metadata keys change. Each FieldList keeps its GRIB messages in memory and is deleted when it goes out of scope.

[10]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream)

# each f is a FieldList holding one group of fields
for f in ds.group_by("level"):
    print(f"len={len(f)} {f.metadata(('param', 'level'))}")
len=3 [('t', 1000), ('u', 1000), ('v', 1000)]
len=3 [('t', 850), ('u', 850), ('v', 850)]

Once the iteration has finished, no data is left in ds. We can now close the stream:

[11]:
stream.close()
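The behaviour described for group_by (collect messages until the key values change) matches the semantics of Python's `itertools.groupby`, which groups consecutive items with equal keys. The sketch below applies it to plain dictionaries standing in for GRIB fields; `group_consecutive` is a hypothetical helper, not earthkit-data code. One consequence, at least in this sketch: if the stream is not ordered by the grouping key, the same key value can appear in more than one group.

```python
from itertools import groupby
from typing import Dict, Iterable, Iterator, List, Tuple

Field = Dict[str, object]


def group_consecutive(fields: Iterable[Field], *keys: str) -> Iterator[Tuple[tuple, List[Field]]]:
    """Yield (key values, fields) for each run of consecutive fields sharing the same key values."""

    def key_values(field: Field) -> tuple:
        return tuple(field[k] for k in keys)

    for values, run in groupby(fields, key=key_values):
        # Materialise the current run; groupby reads at most one field
        # ahead in order to detect where the run ends.
        yield values, list(run)


fields = [{"param": p, "level": lev} for lev in (1000, 850) for p in ("t", "u", "v")]

for values, group in group_consecutive(iter(fields), "level"):
    print(values, [(f["param"], f["level"]) for f in group])
# (1000,) [('t', 1000), ('u', 1000), ('v', 1000)]
# (850,) [('t', 850), ('u', 850), ('v', 850)]
```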

Storing each GRIB message in memory

We can load the whole stream into memory by passing read_all=True to from_source(). The result is a FieldList that stores all the GRIB messages in memory.

[12]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream, read_all=True)
[13]:
len(ds)
[13]:
6
[14]:
ds.ls()
[14]:
   centre  shortName    typeOfLevel  level  dataDate  dataTime  stepRange  dataType  number    gridType
0    ecmf          t  isobaricInhPa   1000  20180801      1200          0        an       0  regular_ll
1    ecmf          u  isobaricInhPa   1000  20180801      1200          0        an       0  regular_ll
2    ecmf          v  isobaricInhPa   1000  20180801      1200          0        an       0  regular_ll
3    ecmf          t  isobaricInhPa    850  20180801      1200          0        an       0  regular_ll
4    ecmf          u  isobaricInhPa    850  20180801      1200          0        an       0  regular_ll
5    ecmf          v  isobaricInhPa    850  20180801      1200          0        an       0  regular_ll
[15]:
a = ds.sel(param="t")
a.ls()
[15]:
   centre  shortName    typeOfLevel  level  dataDate  dataTime  stepRange  dataType  number    gridType
0    ecmf          t  isobaricInhPa   1000  20180801      1200          0        an       0  regular_ll
1    ecmf          t  isobaricInhPa    850  20180801      1200          0        an       0  regular_ll
[16]:
a = a.to_xarray()
a
[16]:
<xarray.Dataset> Size: 2kB
Dimensions:    (levelist: 2, latitude: 7, longitude: 12)
Coordinates:
  * levelist   (levelist) int64 16B 850 1000
  * latitude   (latitude) float64 56B 90.0 60.0 30.0 0.0 -30.0 -60.0 -90.0
  * longitude  (longitude) float64 96B 0.0 30.0 60.0 90.0 ... 270.0 300.0 330.0
Data variables:
    t          (levelist, latitude, longitude) float64 1kB ...
Attributes: (12/13)
    param:        t
    paramId:      130
    class:        od
    stream:       oper
    levtype:      pl
    type:         an
    ...           ...
    date:         20180801
    time:         1200
    domain:       g
    number:       0
    Conventions:  CF-1.8
    institution:  ECMWF

We close the stream:

[17]:
stream.close()
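The trade-off behind read_all=True can be sketched in plain Python: a generator, like a stream, can be iterated only once and has no length, while a list holding every item supports len(), repeated iteration and filtering, at the cost of keeping everything in memory. In the sketch, `stream_fields` is a hypothetical stand-in for a GRIB stream, and the list comprehension plays the role that `sel(param="t")` plays above. It is an analogy, not earthkit-data's implementation.

```python
from typing import Dict, Iterator, List

Field = Dict[str, object]


def stream_fields() -> Iterator[Field]:
    """Hypothetical one-pass source standing in for a GRIB stream."""
    for level in (1000, 850):
        for param in ("t", "u", "v"):
            yield {"param": param, "level": level}


# Streaming: a generator has no len() and is exhausted after one pass.
gen = stream_fields()
print(sum(1 for _ in gen))  # 6
print(sum(1 for _ in gen))  # 0

# Reading everything: all fields are held in memory at once.
fields: List[Field] = list(stream_fields())
print(len(fields))  # 6

# Selection works on the in-memory list, analogous to ds.sel(param="t").
t_fields = [f for f in fields if f["param"] == "t"]
print([f["level"] for f in t_fields])  # [1000, 850]
```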