Reading files as a stream¶
earthkit-data can read GRIB data from a file as a stream. This can be activated with the stream=True kwarg when calling from_source().
First, we ensure the example data is available.
[1]:
import earthkit.data as ekd
ekd.download_example_file("test6.grib")
Getting single items from the stream¶
[2]:
ds_in = ekd.from_source("file", "test6.grib", stream=True)
ds_in
[2]:
| types | fieldlist |
To access the stream data we need to convert the data object into a stream fieldlist.
[3]:
ds = ds_in.to_fieldlist()
Using the resulting object we can iterate through the stream. As we progressing with the iteration Field objects are created then get deleted when going out of scope. As a result, only one GRIB message is kept in memory at a time.
[4]:
for f in ds:
# f is GribField object. It gets deleted when going out of scope
print(f)
Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Having finished the iteration there is no data available in ds.
[5]:
len([f in ds])
[5]:
1
Using batched¶
When we use the batched() method we can iterate through the stream in batches of fixed size. In this example we create a stream and read 2 fields from it at a time.
[6]:
ds = ekd.from_source("file", "test6.grib", stream=True).to_fieldlist()
# f is a fieldlist
for f in ds.batched(2):
print(f"len={len(f)} {f.get(['parameter.variable', 'vertical.level'])}")
len=2 [['t', 1000], ['u', 1000]]
len=2 [['v', 1000], ['t', 850]]
len=2 [['u', 850], ['v', 850]]
It is possible to use a batch size that is not a factor of the total number fields in the stream. In this case the last batch will simply contain less fields than the specified batch size.
[7]:
ds = ekd.from_source("file", "test6.grib", stream=True).to_fieldlist()
# f is a fieldlist
for f in ds.batched(4):
print(f"len={len(f)} {f.get(['parameter.variable', 'vertical.level'])}")
len=4 [['t', 1000], ['u', 1000], ['v', 1000], ['t', 850]]
len=2 [['u', 850], ['v', 850]]
Using group_by¶
When we use the group_by>() method we can iterate throught the stream in groups defined by metadata keys. Each iteration step results in a SimpleFieldList object, which is built by consuming GRIB messages from the stream until the values of the metadata keys change. The generated FieldList keeps GRIB messages in memory then gets deleted when going out of scope.
[8]:
ds = ekd.from_source("file", "test6.grib", stream=True).to_fieldlist()
# f is a fieldlist
for f in ds.group_by("level"):
print(f"len={len(f)} {f.get(['parameter.variable', 'vertical.level'])}")
len=6 [['t', 1000], ['u', 1000], ['v', 1000], ['t', 850], ['u', 850], ['v', 850]]
Storing each GRIB message in memory¶
We can load the whole stream into memory by using read_all=True in to_fieldlist(). The resulting object will be a SimpleFieldList storing all the GRIB messages in memory.
[9]:
ds = ekd.from_source("file", "test6.grib", stream=True).to_fieldlist(read_all=True)
[10]:
len(ds)
[10]:
6
[11]:
ds.ls()
[11]:
| parameter.variable | time.valid_datetime | time.base_datetime | time.step | vertical.level | vertical.level_type | ensemble.member | geography.grid_type | |
|---|---|---|---|---|---|---|---|---|
| 0 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 1 | u | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 2 | v | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 3 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
| 4 | u | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
| 5 | v | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
[12]:
a = ds.sel({"parameter.variable": "t"})
a.ls()
[12]:
| parameter.variable | time.valid_datetime | time.base_datetime | time.step | vertical.level | vertical.level_type | ensemble.member | geography.grid_type | |
|---|---|---|---|---|---|---|---|---|
| 0 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 1 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
[13]:
a = a.to_xarray()
a
[13]:
<xarray.Dataset> Size: 2kB
Dimensions: (level: 2, latitude: 7, longitude: 12)
Coordinates:
* level (level) int64 16B 850 1000
* latitude (latitude) float64 56B 90.0 60.0 30.0 0.0 -30.0 -60.0 -90.0
* longitude (longitude) float64 96B 0.0 30.0 60.0 90.0 ... 270.0 300.0 330.0
Data variables:
t (level, latitude, longitude) float64 1kB ...
Attributes:
Conventions: CF-1.8
institution: ECMWF