Retrieving data from S3 buckets
The s3 data source provides access to Amazon S3 buckets.
In this example we will read GRIB data from a publicly available Amazon S3 bucket on the European Weather Cloud (EWC).
Getting a whole object
Disk based access
By default, the data is downloaded and stored in the cache. Because the bucket is public, we pass anon=True to from_source() to skip S3 authentication.
[1]:
import earthkit.data as ekd
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": "test6.grib",
}
ds = ekd.from_source("s3", req, anon=True)
ds.ls()
[1]:
| | centre | shortName | typeOfLevel | level | dataDate | dataTime | stepRange | dataType | number | gridType |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | ecmf | t | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 1 | ecmf | u | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 2 | ecmf | v | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 3 | ecmf | t | isobaricInhPa | 850 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 4 | ecmf | u | isobaricInhPa | 850 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 5 | ecmf | v | isobaricInhPa | 850 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
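The request dictionary above is repeated in the examples that follow, so a small helper can cut down on the repetition. The following is a minimal sketch: make_request is a hypothetical helper name, not part of earthkit-data, and its defaults are simply the endpoint and bucket used in this notebook.

```python
# Hypothetical helper (not part of earthkit-data): builds the request
# dictionary used with the "s3" source in this notebook.
ENDPOINT = "object-store.os-api.cci1.ecmwf.int"
BUCKET = "earthkit-test-data-public"


def make_request(objects, endpoint=ENDPOINT, bucket=BUCKET):
    """Return a request dict with the three keys used in this notebook."""
    return {"endpoint": endpoint, "bucket": bucket, "objects": objects}


req = make_request("test6.grib")
print(req["objects"])
```

The result can be passed to from_source() exactly as before, for example ekd.from_source("s3", make_request("test6.grib"), anon=True).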
Reading as a stream
We can read GRIB data from an S3 bucket as a stream without writing anything to disk. To enable this, call from_source() with stream=True. By default we get a stream iterator, which we can consume field by field.
[2]:
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": "test6.grib",
}
ds = ekd.from_source("s3", req, stream=True, anon=True)
for f in ds:
    # f is a GribField object. It is deleted when it goes out of scope
    print(f)
GribField(t,1000,20180801,1200,0,0)
GribField(u,1000,20180801,1200,0,0)
GribField(v,1000,20180801,1200,0,0)
GribField(t,850,20180801,1200,0,0)
GribField(u,850,20180801,1200,0,0)
GribField(v,850,20180801,1200,0,0)
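Because a stream yields one field at a time, simple summaries can be computed without keeping the fields in memory. The sketch below counts fields per metadata value. count_by_key is a hypothetical helper, and it assumes only that each field exposes metadata(key), as earthkit-data GRIB fields do. FakeField is an illustrative stand-in so that the snippet runs without network access.

```python
from collections import Counter


def count_by_key(fields, key="shortName"):
    """Count fields per metadata value while consuming an iterable once."""
    counts = Counter()
    for f in fields:
        counts[f.metadata(key)] += 1
    return counts


class FakeField:
    """Illustrative stand-in for a GRIB field (not an earthkit-data class)."""

    def __init__(self, **meta):
        self._meta = meta

    def metadata(self, key):
        return self._meta[key]


demo = [FakeField(shortName=n) for n in ("t", "u", "v", "t", "u", "v")]
print(count_by_key(demo))
```

With the real stream, the same function would be called as count_by_key(ekd.from_source("s3", req, stream=True, anon=True)).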
With the batched() method we can iterate through the stream in batches of a fixed size. For example, the following code reads the GRIB data in batches of 2 messages.
[3]:
ds = ekd.from_source("s3", req, stream=True, anon=True)
for f in ds.batched(2):
    # f is a fieldlist
    print(f"len={len(f)}")
    for g in f:
        print(f" {g}")
len=2
GribField(t,1000,20180801,1200,0,0)
GribField(u,1000,20180801,1200,0,0)
len=2
GribField(v,1000,20180801,1200,0,0)
GribField(t,850,20180801,1200,0,0)
len=2
GribField(u,850,20180801,1200,0,0)
GribField(v,850,20180801,1200,0,0)
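To make the batching behaviour concrete, here is a plain-Python sketch of fixed-size batching over any iterator. It illustrates the idea only and is not earthkit-data's implementation. iter_batches is a hypothetical name, and in this sketch the final batch is shorter when the number of items is not a multiple of the batch size.

```python
from itertools import islice


def iter_batches(iterable, n):
    """Yield lists of up to n items until the iterable is exhausted."""
    if n < 1:
        raise ValueError("n must be at least 1")
    it = iter(iterable)
    while True:
        batch = list(islice(it, n))
        if not batch:
            return
        yield batch


for batch in iter_batches(["t", "u", "v", "t", "u", "v"], 2):
    print(len(batch), batch)
```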
With the read_all=True option we will load the whole object into memory.
[4]:
ds = ekd.from_source("s3", req, stream=True, read_all=True, anon=True)
ds.ls()
[4]:
| | centre | shortName | typeOfLevel | level | dataDate | dataTime | stepRange | dataType | number | gridType |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | ecmf | t | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 1 | ecmf | u | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 2 | ecmf | v | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 3 | ecmf | t | isobaricInhPa | 850 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 4 | ecmf | u | isobaricInhPa | 850 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 5 | ecmf | v | isobaricInhPa | 850 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
Getting multiple objects
[5]:
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": ["test6.grib", "tuv_pl.grib"],
}
ds = ekd.from_source("s3", req, anon=True)
len(ds)
[5]:
24
Getting parts of an object
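We can specify the parts (byte ranges) of an object that we want to read. In the examples below, each part is given as an (offset, length) pair in bytes. This works in both stream and non-stream mode.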
[6]:
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": { "object": "test6.grib", "parts": (240, 480)},
}
ds = ekd.from_source("s3", req, anon=True)
ds.ls()
[6]:
| | centre | shortName | typeOfLevel | level | dataDate | dataTime | stepRange | dataType | number | gridType |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | ecmf | u | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 1 | ecmf | v | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
[7]:
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": { "object": "test6.grib", "parts": [(0, 240), (480, 240)]},
}
ds = ekd.from_source("s3", req, anon=True)
ds.ls()
[7]:
| | centre | shortName | typeOfLevel | level | dataDate | dataTime | stepRange | dataType | number | gridType |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | ecmf | t | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 1 | ecmf | v | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
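The byte ranges above suggest that each message in test6.grib occupies 240 bytes, so the parts for given message positions can be computed rather than typed by hand. Here is a minimal sketch under that assumption. parts_for_messages is a hypothetical helper, and a fixed message length does not hold for GRIB files in general.

```python
def parts_for_messages(indices, msg_len=240):
    """Return (offset, length) parts for zero-based message indices.

    Assumes every message is msg_len bytes long, which is only inferred
    from the examples in this notebook.
    """
    return [(i * msg_len, msg_len) for i in indices]


# Messages 0 and 2, matching the second example above
print(parts_for_messages([0, 2]))  # prints [(0, 240), (480, 240)]
```

The returned list can be used as the "parts" value of an object entry in the request.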
Getting parts of multiple objects
[8]:
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": [{"object": "test6.grib", "parts": (0,240)},
{"object": "tuv_pl.grib", "parts": (2400, 240)}],
}
ds = ekd.from_source("s3", req, anon=True)
ds.ls()
[8]:
| | centre | shortName | typeOfLevel | level | dataDate | dataTime | stepRange | dataType | number | gridType |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | ecmf | t | isobaricInhPa | 1000 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
| 1 | ecmf | u | isobaricInhPa | 500 | 20180801 | 1200 | 0 | an | 0 | regular_ll |
Using parts with a stream
The parts (byte ranges) still work when used with streams.
[9]:
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": { "object": "test6.grib", "parts": (240, 480)},
}
ds = ekd.from_source("s3", req, stream=True, anon=True)
for f in ds:
    # f is a GribField object. It is deleted when it goes out of scope
    print(f)
GribField(u,1000,20180801,1200,0,0)
GribField(v,1000,20180801,1200,0,0)