Reading data from a stream

[1]:
import earthkit.data as ekd

earthkit-data can load GRIB data from a stream source, which can be an FDB stream, a standard Python IO stream or any object implementing the necessary stream methods.

For simplicity, in this notebook we will use a file stream to demonstrate the usage of streams. First, we ensure the example file containing 6 messages is available.

[2]:
ekd.download_example_file("test6.grib")

Getting single items from the stream

We create a stream from a file containing 6 GRIB fields by simply calling open(). It returns an io.BufferedReader object (a file stream). We load it into earthkit-data.

[3]:
stream = open("test6.grib", "rb")
ds_in = ekd.from_source("stream", stream)
ds_in
[3]:
Stream of fields

typesfieldlist

To access the stream data we need to convert the data object into a stream fieldlist.

[4]:
ds = ds_in.to_fieldlist()

Using this object we can iterate through the stream. As we progressing with the iteration Field objects are created then get deleted when going out of scope. As a result, only one GRIB message is kept in memory at a time.

[5]:
for f in ds:
    # f is Field object. It gets deleted when going out of scope
    print(f)
Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)

Having finished the iteration there is no data available in ds. We can close the stream:

[6]:
stream.close()

Using batched

When we use the batched() method we can iterate through the stream in batches of fixed size. In this example we create a stream and read 2 fields from it at a time.

[7]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream).to_fieldlist()

# f is a fieldlist
for f in ds.batched(2):
    print(f"len={len(f)} {f.get(['parameter.variable', 'vertical.level'])}")
len=2 [['t', 1000], ['u', 1000]]
len=2 [['v', 1000], ['t', 850]]
len=2 [['u', 850], ['v', 850]]

Having finished the iteration there is no data available in ds. We can close the stream:

[8]:
stream.close()

It is possible to use a batch size that is not a factor of the total number fields in the stream. In this case the last batch will simply contain less fields than the specified batch size.

[9]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream).to_fieldlist()

# f is a fieldlist
for f in ds.batched(4):
    print(f"len={len(f)} {f.get(['parameter.variable', 'vertical.level'])}")
len=4 [['t', 1000], ['u', 1000], ['v', 1000], ['t', 850]]
len=2 [['u', 850], ['v', 850]]

Using group_by

When we use the group_by>() method we can iterate throught the stream in groups defined by metadata keys. Each iteration step results in a SimpleFieldList object, which is built by consuming GRIB messages from the stream until the values of the metadata keys change. The generated FieldList keeps GRIB messages in memory then gets deleted when going out of scope.

[10]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream).to_fieldlist()

# f is a fieldlist
for f in ds.group_by("level"):
    print(f"len={len(f)} {f.get(['parameter.variable', 'vertical.level'])}")
len=6 [['t', 1000], ['u', 1000], ['v', 1000], ['t', 850], ['u', 850], ['v', 850]]

Having finished the iteration there is no data available in ds. We can close the stream:

[11]:
stream.close()

Storing each GRIB message in memory

We can load the whole stream into memory by using read_all=True in to_fieldlist(). The resulting object will be a SimpleFieldList storing all the GRIB messages in memory.

[12]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream).to_fieldlist(read_all=True)
[13]:
len(ds)
[13]:
6
[14]:
ds.ls()
[14]:
parameter.variable time.valid_datetime time.base_datetime time.step vertical.level vertical.level_type ensemble.member geography.grid_type
0 t 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
1 u 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
2 v 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
3 t 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 850 pressure 0 regular_ll
4 u 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 850 pressure 0 regular_ll
5 v 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 850 pressure 0 regular_ll
[15]:
a = ds.sel({"parameter.variable": "t"})
a.ls()
[15]:
parameter.variable time.valid_datetime time.base_datetime time.step vertical.level vertical.level_type ensemble.member geography.grid_type
0 t 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
1 t 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 850 pressure 0 regular_ll
[16]:
a = a.to_xarray()
a
[16]:
<xarray.Dataset> Size: 2kB
Dimensions:    (level: 2, latitude: 7, longitude: 12)
Coordinates:
  * level      (level) int64 16B 850 1000
  * latitude   (latitude) float64 56B 90.0 60.0 30.0 0.0 -30.0 -60.0 -90.0
  * longitude  (longitude) float64 96B 0.0 30.0 60.0 90.0 ... 270.0 300.0 330.0
Data variables:
    t          (level, latitude, longitude) float64 1kB ...
Attributes:
    Conventions:  CF-1.8
    institution:  ECMWF

We close the stream:

[17]:
stream.close()