Reading data from a stream
[1]:
import earthkit.data as ekd
earthkit-data can load GRIB data from a stream source, which can be an FDB stream, a standard Python IO stream or any object implementing the necessary stream methods.
For simplicity, in this notebook we will use a file stream to demonstrate the usage of streams. First, we ensure the example file containing 6 messages is available.
[2]:
ekd.download_example_file("test6.grib")
Getting single items from the stream
We create a stream from a file containing 6 GRIB fields by simply calling open(). It returns an io.BufferedReader object (a file stream). We load it into earthkit-data.
[3]:
stream = open("test6.grib", "rb")
ds_in = ekd.from_source("stream", stream)
ds_in
[3]:
| types | fieldlist |
To access the stream data we need to convert the data object into a stream fieldlist.
[4]:
ds = ds_in.to_fieldlist()
Using this object we can iterate through the stream. As the iteration progresses, Field objects are created and then deleted when they go out of scope. As a result, only one GRIB message is kept in memory at a time.
[5]:
for f in ds:
    # f is a Field object. It gets deleted when going out of scope
    print(f)
Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Having finished the iteration there is no data available in ds. We can close the stream:
[6]:
stream.close()
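The one-pass behaviour described above is not specific to GRIB: any Python file-like stream can only be consumed once. The following minimal sketch uses only the standard library and a hypothetical fixed-size record format (`read_records` and `RECORD_SIZE` are illustrative names, not part of earthkit-data) to show that a second pass over a consumed stream yields nothing, and that a `with` block closes the stream automatically.

```python
import io

RECORD_SIZE = 4  # hypothetical fixed record length, for illustration only


def read_records(stream):
    """Yield one record at a time, so only one record is held in memory."""
    while True:
        chunk = stream.read(RECORD_SIZE)
        if not chunk:
            break
        yield chunk


# Using the stream as a context manager closes it automatically on exit
with io.BytesIO(b"aaaabbbbcccc") as stream:
    first_pass = list(read_records(stream))
    second_pass = list(read_records(stream))  # the stream is already consumed

print(first_pass)
print(second_pass)
print(stream.closed)
```

Since `open()` also returns a context manager, the file stream used in this notebook could presumably be handled the same way, with the iteration placed inside the `with` block instead of calling `stream.close()` explicitly.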
Using batched
When we use the batched() method we can iterate through the stream in batches of fixed size. In this example we create a stream and read 2 fields from it at a time.
[7]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream).to_fieldlist()
# f is a fieldlist
for f in ds.batched(2):
    print(f"len={len(f)} {f.get(['parameter.variable', 'vertical.level'])}")
len=2 [['t', 1000], ['u', 1000]]
len=2 [['v', 1000], ['t', 850]]
len=2 [['u', 850], ['v', 850]]
Having finished the iteration there is no data available in ds. We can close the stream:
[8]:
stream.close()
It is possible to use a batch size that is not a divisor of the total number of fields in the stream. In this case the last batch will simply contain fewer fields than the specified batch size.
[9]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream).to_fieldlist()
# f is a fieldlist
for f in ds.batched(4):
    print(f"len={len(f)} {f.get(['parameter.variable', 'vertical.level'])}")
len=4 [['t', 1000], ['u', 1000], ['v', 1000], ['t', 850]]
len=2 [['u', 850], ['v', 850]]
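The batching rule shown above can be illustrated in plain Python. This is only a sketch of the behaviour on a list of tuples, using `itertools.islice`; the local `batched` helper is a stand-in written for this example and is not how earthkit-data implements its method.

```python
from itertools import islice


def batched(iterable, n):
    """Yield lists of up to n items; the last list may be shorter."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, n))
        if not batch:
            break
        yield batch


# Stand-in for the six fields in the example file: (variable, level)
fields = [("t", 1000), ("u", 1000), ("v", 1000), ("t", 850), ("u", 850), ("v", 850)]

sizes = [len(b) for b in batched(fields, 4)]
print(sizes)
```

With six items and a batch size of 4, the first batch is full and the second holds the two remaining items, matching the output of the cell above.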
Using group_by
When we use the group_by() method we can iterate through the stream in groups defined by metadata keys. Each iteration step results in a SimpleFieldList object, which is built by consuming GRIB messages from the stream until the values of the metadata keys change. The generated fieldlist keeps its GRIB messages in memory and gets deleted when it goes out of scope.
[10]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream).to_fieldlist()
# f is a fieldlist
for f in ds.group_by("level"):
    print(f"len={len(f)} {f.get(['parameter.variable', 'vertical.level'])}")
len=6 [['t', 1000], ['u', 1000], ['v', 1000], ['t', 850], ['u', 850], ['v', 850]]
Having finished the iteration there is no data available in ds. We can close the stream:
[11]:
stream.close()
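The grouping rule described in this section (keep consuming items until the key value changes) is the same rule that `itertools.groupby` applies to consecutive items. The sketch below demonstrates that rule on plain tuples standing in for the fields; it does not call earthkit-data and makes no claim about how group_by() is implemented.

```python
from itertools import groupby

# Stand-in for the six fields in the example file: (variable, level)
fields = [("t", 1000), ("u", 1000), ("v", 1000), ("t", 850), ("u", 850), ("v", 850)]

# groupby() starts a new group each time the key value changes
groups = [
    (level, [name for name, _ in items])
    for level, items in groupby(fields, key=lambda f: f[1])
]
print(groups)
```

Because groups are formed from consecutive items only, a key value that reappears later in the input starts a new, separate group rather than being merged into the earlier one.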
Storing each GRIB message in memory
We can load the whole stream into memory by using read_all=True in to_fieldlist(). The resulting object will be a SimpleFieldList storing all the GRIB messages in memory.
[12]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream).to_fieldlist(read_all=True)
[13]:
len(ds)
[13]:
6
[14]:
ds.ls()
[14]:
| | parameter.variable | time.valid_datetime | time.base_datetime | time.step | vertical.level | vertical.level_type | ensemble.member | geography.grid_type |
|---|---|---|---|---|---|---|---|---|
| 0 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 1 | u | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 2 | v | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 3 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
| 4 | u | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
| 5 | v | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
[15]:
a = ds.sel({"parameter.variable": "t"})
a.ls()
[15]:
| | parameter.variable | time.valid_datetime | time.base_datetime | time.step | vertical.level | vertical.level_type | ensemble.member | geography.grid_type |
|---|---|---|---|---|---|---|---|---|
| 0 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 1 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
[16]:
a = a.to_xarray()
a
[16]:
<xarray.Dataset> Size: 2kB
Dimensions: (level: 2, latitude: 7, longitude: 12)
Coordinates:
* level (level) int64 16B 850 1000
* latitude (latitude) float64 56B 90.0 60.0 30.0 0.0 -30.0 -60.0 -90.0
* longitude (longitude) float64 96B 0.0 30.0 60.0 90.0 ... 270.0 300.0 330.0
Data variables:
t (level, latitude, longitude) float64 1kB ...
Attributes:
Conventions: CF-1.8
institution: ECMWF
We close the stream:
[17]:
stream.close()