Retrieving data from S3 buckets¶
The s3 data source provides access to Amazon S3 buckets.
In this example we will read GRIB data from a publicly available Amazon S3 bucket on the European Weather Cloud (EWC).
Getting a whole object¶
Disk based access¶
By default, the data is downloaded and stored in the cache. Since the bucket is public, we pass anon=True to from_source() to bypass S3 authentication.
[1]:
import earthkit.data as ekd
req = {
"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": "test6.grib",
}
ds = ekd.from_source("s3", req, anon=True).to_fieldlist()
ds.ls()
[1]:
| | parameter.variable | time.valid_datetime | time.base_datetime | time.step | vertical.level | vertical.level_type | ensemble.member | geography.grid_type |
|---|---|---|---|---|---|---|---|---|
| 0 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 1 | u | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 2 | v | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 3 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
| 4 | u | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
| 5 | v | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
Reading as a stream¶
We can read GRIB data from an S3 bucket as a stream without writing anything to disk. This can be activated by calling from_source() with stream=True. By default we get a stream iterator, which we can consume field by field.
[2]:
req = {
"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": "test6.grib",
}
ds = ekd.from_source("s3", req, stream=True, anon=True).to_fieldlist()
for f in ds:
    # f is a GribField object. It gets deleted when going out of scope
    print(f)
Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
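To make "consume field by field" more concrete, here is a minimal sketch of how a byte stream can be split into GRIB messages without ever holding the whole object. It is not earthkit-data's implementation; `iter_grib_messages` and `fake_grib1` are hypothetical names used only for illustration. The sketch relies on the message length stored in the GRIB indicator section (octets 5-7 in edition 1, octets 9-16 in edition 2) and ignores special cases such as very large GRIB1 messages.

```python
import io
from typing import BinaryIO, Iterator


def iter_grib_messages(stream: BinaryIO) -> Iterator[bytes]:
    """Yield complete GRIB messages one at a time from a binary stream."""
    while True:
        header = stream.read(8)
        if len(header) < 8:
            return  # end of stream
        if header[:4] != b"GRIB":
            raise ValueError("stream is not positioned at a GRIB message")
        edition = header[7]
        if edition == 1:
            # GRIB1: total message length is stored in octets 5-7
            rest = b""
            total = int.from_bytes(header[4:7], "big")
        elif edition == 2:
            # GRIB2: total message length is stored in octets 9-16
            rest = stream.read(8)
            total = int.from_bytes(rest, "big")
        else:
            raise ValueError(f"unsupported GRIB edition: {edition}")
        # Read only the remainder of this one message
        body = stream.read(total - len(header) - len(rest))
        yield header + rest + body


def fake_grib1(size: int) -> bytes:
    """Build a dummy GRIB1-shaped message of the given size (illustration only)."""
    payload = bytes(size - 12)
    return b"GRIB" + size.to_bytes(3, "big") + b"\x01" + payload + b"7777"


# Three dummy 240-byte messages stand in for the S3 object
data = io.BytesIO(fake_grib1(240) * 3)
sizes = [len(message) for message in iter_grib_messages(data)]
print(sizes)
```

Because each message is yielded as soon as it has been read, memory use is bounded by the size of a single message rather than the size of the object.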
With the batched() method we can iterate through the stream in batches of a fixed size. For example, the following code reads the GRIB data in batches of 2 messages.
[3]:
ds = ekd.from_source("s3", req, stream=True, anon=True).to_fieldlist()
for f in ds.batched(2):
    # f is a fieldlist
    print(f"len={len(f)}")
    for g in f:
        print(f" {g}")
len=2
Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
len=2
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
len=2
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
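The batching behaviour seen above can be pictured with a small pure-Python sketch. This is only an illustration of the semantics, not how batched() is implemented in earthkit-data; `iter_batches` is a hypothetical name, and the strings stand in for fields. In this sketch a final batch may be shorter than the requested size, a case the output above does not exercise.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches(items: Iterable[T], n: int) -> Iterator[List[T]]:
    """Group items from any iterable into consecutive lists of at most n items."""
    if n < 1:
        raise ValueError("n must be at least 1")
    it = iter(items)
    while True:
        # Pull at most n items; only one batch is held in memory at a time
        batch = list(islice(it, n))
        if not batch:
            return
        yield batch


# Placeholder labels standing in for the six fields of test6.grib
fields = ["t1000", "u1000", "v1000", "t850", "u850", "v850"]
batches = list(iter_batches(fields, 2))
for batch in batches:
    print(f"len={len(batch)}", batch)
```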
With the read_all=True option, the whole object is loaded into memory.
[4]:
ds = ekd.from_source("s3", req, stream=True, anon=True).to_fieldlist(read_all=True)
ds.ls()
[4]:
| | parameter.variable | time.valid_datetime | time.base_datetime | time.step | vertical.level | vertical.level_type | ensemble.member | geography.grid_type |
|---|---|---|---|---|---|---|---|---|
| 0 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 1 | u | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 2 | v | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 3 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
| 4 | u | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
| 5 | v | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 850 | pressure | 0 | regular_ll |
Getting multiple objects¶
[5]:
req = {
"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": ["test6.grib", "tuv_pl.grib"],
}
ds = ekd.from_source("s3", req, anon=True).to_fieldlist()
len(ds)
[5]:
24
Getting parts of an object¶
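We can specify the parts (byte ranges) we want to read. Each part is an (offset, length) pair in bytes, and we can pass either a single part or a list of parts. This works in both stream and non-stream mode.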
[6]:
req = {
"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": {"object": "test6.grib", "parts": (240, 480)},
}
ds = ekd.from_source("s3", req, anon=True).to_fieldlist()
ds.ls()
[6]:
| | parameter.variable | time.valid_datetime | time.base_datetime | time.step | vertical.level | vertical.level_type | ensemble.member | geography.grid_type |
|---|---|---|---|---|---|---|---|---|
| 0 | u | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 1 | v | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
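In the request above, the part (240, 480) returned two fields, which is consistent with reading it as an offset of 240 bytes and a length of 480 bytes. As a rough sketch of what that means at the HTTP level, the hypothetical helper below converts such a pair into a standard `Range` header value, where both ends of the range are inclusive. Whether earthkit-data issues requests in exactly this form is an assumption, not something the notebook states.

```python
def range_header(offset: int, length: int) -> str:
    """Convert an (offset, length) part into an HTTP Range header value."""
    if offset < 0 or length <= 0:
        raise ValueError("offset must be >= 0 and length must be > 0")
    # HTTP byte ranges are inclusive on both ends, hence the "- 1"
    last = offset + length - 1
    return f"bytes={offset}-{last}"


print(range_header(240, 480))  # bytes=240-719
```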
[7]:
req = {
"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": {"object": "test6.grib", "parts": [(0, 240), (480, 240)]},
}
ds = ekd.from_source("s3", req, anon=True).to_fieldlist()
ds.ls()
[7]:
| | parameter.variable | time.valid_datetime | time.base_datetime | time.step | vertical.level | vertical.level_type | ensemble.member | geography.grid_type |
|---|---|---|---|---|---|---|---|---|
| 0 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 1 | v | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
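The parts used in the two requests above line up with 240-byte messages: (240, 480) covers the second and third message, while [(0, 240), (480, 240)] covers the first and third. Assuming every message in the object has the same known size (true for these examples, but not for GRIB data in general, where offsets usually come from an index), the parts can be derived from message positions. `parts_for_messages` is a hypothetical helper written for this illustration.

```python
from typing import Iterable, List, Tuple


def parts_for_messages(indices: Iterable[int], message_size: int) -> List[Tuple[int, int]]:
    """Build (offset, length) parts for zero-based message indices.

    Assumes all messages have the same size; adjacent messages are
    merged into a single part.
    """
    parts: List[Tuple[int, int]] = []
    for i in sorted(set(indices)):
        offset = i * message_size
        if parts and parts[-1][0] + parts[-1][1] == offset:
            # This message starts where the previous part ends: extend it
            prev_offset, prev_length = parts[-1]
            parts[-1] = (prev_offset, prev_length + message_size)
        else:
            parts.append((offset, message_size))
    return parts


print(parts_for_messages([1, 2], 240))  # [(240, 480)]
print(parts_for_messages([0, 2], 240))  # [(0, 240), (480, 240)]
```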
Getting parts of multiple objects¶
[8]:
req = {
"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": [{"object": "test6.grib", "parts": (0, 240)}, {"object": "tuv_pl.grib", "parts": (2400, 240)}],
}
ds = ekd.from_source("s3", req, anon=True).to_fieldlist()
ds.ls()
[8]:
| | parameter.variable | time.valid_datetime | time.base_datetime | time.step | vertical.level | vertical.level_type | ensemble.member | geography.grid_type |
|---|---|---|---|---|---|---|---|---|
| 0 | t | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 1000 | pressure | 0 | regular_ll |
| 1 | u | 2018-08-01 12:00:00 | 2018-08-01 12:00:00 | 0 days | 500 | pressure | 0 | regular_ll |
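Since every request in this notebook shares the same endpoint and bucket, the request dictionary can be assembled by a small helper. The sketch below is only a convenience for illustration; `make_request` is a hypothetical name and not part of earthkit-data. The resulting dictionary has the same shape as the one passed to from_source() above.

```python
from typing import Any, Dict

ENDPOINT = "object-store.os-api.cci1.ecmwf.int"
BUCKET = "earthkit-test-data-public"


def make_request(objects: Any) -> Dict[str, Any]:
    """Build a request dict for the public test bucket.

    `objects` may be a name, a list of names, a dict with "object" and
    "parts" keys, or a list of such dicts, as in the examples above.
    """
    return {"endpoint": ENDPOINT, "bucket": BUCKET, "objects": objects}


req = make_request(
    [
        {"object": "test6.grib", "parts": (0, 240)},
        {"object": "tuv_pl.grib", "parts": (2400, 240)},
    ]
)
print(req["objects"])
```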
Using parts with a stream¶
Parts (byte ranges) can also be used in stream mode.
[9]:
req = {
"endpoint": "object-store.os-api.cci1.ecmwf.int",
"bucket": "earthkit-test-data-public",
"objects": {"object": "test6.grib", "parts": (240, 480)},
}
ds = ekd.from_source("s3", req, stream=True, anon=True).to_fieldlist()
for f in ds:
    # f is a Field object. It gets deleted when going out of scope
    print(f)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)