Reading data from a stream
[1]:
import earthkit.data as ekd
earthkit-data can load GRIB data from a stream source, which can be an FDB stream, a standard Python IO stream or any object implementing the necessary stream methods.
For simplicity, in this notebook we will use a file stream to demonstrate the usage of streams. First, we ensure the example file containing 6 messages is available.
[2]:
ekd.download_example_file("test6.grib")
Getting single items from the stream
We create a stream from a file containing 6 GRIB fields by simply calling open(). It returns an io.BufferedReader object (a file stream).
[3]:
stream = open("test6.grib", "rb")
We load it into earthkit-data.
[4]:
ds = ekd.from_source("stream", stream)
Using this object we can iterate through the stream. As we progressing with the iteration GribField objects are created then get deleted when going out of scope. As a result, only one GRIB message is kept in memory at a time.
[5]:
for f in ds:
# f is GribField object. It gets deleted when going out of scope
print(f)
GribField(t,1000,20180801,1200,0,0)
GribField(u,1000,20180801,1200,0,0)
GribField(v,1000,20180801,1200,0,0)
GribField(t,850,20180801,1200,0,0)
GribField(u,850,20180801,1200,0,0)
GribField(v,850,20180801,1200,0,0)
Having finished the iteration there is no data available in ds. We can close the stream:
[6]:
stream.close()
Using batched
When we use the batched method we can iterate through the stream in batches of fixed size. In this example we create a stream and read 2 fields from it at a time.
[7]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream)
# f is a fieldlist
for f in ds.batched(2):
print(f"len={len(f)} {f.metadata(('param', 'level'))}")
len=2 [('t', 1000), ('u', 1000)]
len=2 [('v', 1000), ('t', 850)]
len=2 [('u', 850), ('v', 850)]
Having finished the iteration there is no data available in ds. We can close the stream:
[8]:
stream.close()
It is possible to use a batch size that is not a factor of the total number fields in the stream. In this case the last batch will simply contain less fields than the specified batch size.
[9]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream)
# f is a fieldlist
for f in ds.batched(4):
print(f"len={len(f)} {f.metadata(('param', 'level'))}")
len=4 [('t', 1000), ('u', 1000), ('v', 1000), ('t', 850)]
len=2 [('u', 850), ('v', 850)]
Using group_by
When we use the group_by method we can iterate throught the stream in groups defined by metadata keys. Each iteration step results in a FieldList object, which is built by consuming GRIB messages from the stream until the values of the metadata keys change. The generated FieldList keeps GRIB messages in memory then gets deleted when going out of scope.
[10]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream)
# f is a fieldlist
for f in ds.group_by("level"):
print(f"len={len(f)} {f.metadata(('param', 'level'))}")
len=3 [('t', 1000), ('u', 1000), ('v', 1000)]
len=3 [('t', 850), ('u', 850), ('v', 850)]
Having finished the iteration there is no data available in ds. We can close the stream:
[11]:
stream.close()
Storing each GRIB message in memory
We can load the whole stream into memory by using read_all=True in from_source(). The resulting object will be a FieldList storing all the GRIB messages in memory.
[12]:
stream = open("test6.grib", "rb")
ds = ekd.from_source("stream", stream, read_all=True)
[13]:
len(ds)
[13]:
6
[14]:
ds.ls()
[14]:
| centre | shortName | typeOfLevel | level | dataDate | dataTime | stepRange | dataType | number | gridType | |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | ecmf | t | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 1 | ecmf | u | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 2 | ecmf | v | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 3 | ecmf | t | isobaricInhPa | 850 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 4 | ecmf | u | isobaricInhPa | 850 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 5 | ecmf | v | isobaricInhPa | 850 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
[15]:
a = ds.sel(param="t")
a.ls()
[15]:
| centre | shortName | typeOfLevel | level | dataDate | dataTime | stepRange | dataType | number | gridType | |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | ecmf | t | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 1 | ecmf | t | isobaricInhPa | 850 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
[16]:
a = a.to_xarray()
a
[16]:
<xarray.Dataset> Size: 2kB
Dimensions: (levelist: 2, latitude: 7, longitude: 12)
Coordinates:
* levelist (levelist) int64 16B 850 1000
* latitude (latitude) float64 56B 90.0 60.0 30.0 0.0 -30.0 -60.0 -90.0
* longitude (longitude) float64 96B 0.0 30.0 60.0 90.0 ... 270.0 300.0 330.0
Data variables:
t (levelist, latitude, longitude) float64 1kB ...
Attributes: (12/13)
param: t
paramId: 130
class: od
stream: oper
levtype: pl
type: an
... ...
date: 20180801
time: 1200
domain: g
number: 0
Conventions: CF-1.8
institution: ECMWFWe close the stream:
[17]:
stream.close()