Retrieving data from S3 buckets

The s3 data source provides access to Amazon S3 buckets.

In this example we will read GRIB data from a publicly available Amazon S3 bucket on the European Weather Cloud (EWC).

Getting a whole object

Disk based access

By default, the data is downloaded and stored in the cache. Because the bucket is public, we pass anon=True to from_source() to skip S3 authentication.

[1]:
import earthkit.data as ekd

req = {
    "endpoint": "object-store.os-api.cci1.ecmwf.int",
    "bucket": "earthkit-test-data-public",
    "objects": "test6.grib",
}

ds = ekd.from_source("s3", req, anon=True).to_fieldlist()
ds.ls()
[1]:
parameter.variable time.valid_datetime time.base_datetime time.step vertical.level vertical.level_type ensemble.member geography.grid_type
0 t 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
1 u 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
2 v 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
3 t 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 850 pressure 0 regular_ll
4 u 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 850 pressure 0 regular_ll
5 v 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 850 pressure 0 regular_ll

Reading as a stream

We can read GRIB data from an S3 bucket as a stream without writing anything to disk. To enable this, call from_source() with stream=True. By default we get a stream iterator, which we can consume field by field.

[2]:
req = {
    "endpoint": "object-store.os-api.cci1.ecmwf.int",
    "bucket": "earthkit-test-data-public",
    "objects": "test6.grib",
}

ds = ekd.from_source("s3", req, stream=True, anon=True).to_fieldlist()

for f in ds:
    # f is a Field object. It gets deleted when it goes out of scope
    print(f)
Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)

When we use the batched() method we can iterate through the stream in batches of a fixed size. For example, the following code reads the GRIB data in batches of 2 messages.

[3]:
ds = ekd.from_source("s3", req, stream=True, anon=True).to_fieldlist()

for f in ds.batched(2):
    # f is a fieldlist
    print(f"len={len(f)}")
    for g in f:
        print(f" {g}")
len=2
 Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
 Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
len=2
 Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
 Field(t, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
len=2
 Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
 Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 850, pressure, 0, regular_ll)
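
The grouping shown above can be pictured with a plain Python iterator. The sketch below is not earthkit-data's implementation: batched_sketch and the string stand-ins for fields are hypothetical and only illustrate how a stream of six items is consumed in batches of two.

```python
from itertools import islice


def batched_sketch(iterable, n):
    """Yield lists of up to n consecutive items (hypothetical helper, not earthkit API)."""
    if n < 1:
        raise ValueError("n must be at least 1")
    it = iter(iterable)
    while True:
        # Pull at most n items from the underlying iterator.
        batch = list(islice(it, n))
        if not batch:
            return
        yield batch


# Stand-ins for the six fields of test6.grib, in the order they are streamed.
fields = ["t1000", "u1000", "v1000", "t850", "u850", "v850"]
batches = list(batched_sketch(fields, 2))
for b in batches:
    print(f"len={len(b)}", b)
```

In this sketch, a batch size that does not divide the number of items evenly leaves a shorter final batch.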

Passing read_all=True to to_fieldlist() loads the whole object into memory.

[4]:
ds = ekd.from_source("s3", req, stream=True, anon=True).to_fieldlist(read_all=True)
ds.ls()
[4]:
parameter.variable time.valid_datetime time.base_datetime time.step vertical.level vertical.level_type ensemble.member geography.grid_type
0 t 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
1 u 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
2 v 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
3 t 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 850 pressure 0 regular_ll
4 u 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 850 pressure 0 regular_ll
5 v 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 850 pressure 0 regular_ll
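
The difference between iterating over a stream and loading it with read_all=True resembles the difference between a Python generator and a list. The sketch below uses a hypothetical fake_stream generator in place of a real S3 stream and assumes that a stream can be consumed only once; it does not call earthkit-data.

```python
def fake_stream():
    """Hypothetical stand-in for a stream that yields fields one at a time."""
    for name in ["t", "u", "v"]:
        yield name


# Iterating: items are produced on demand, and afterwards the stream is exhausted.
stream = fake_stream()
first_pass = [f for f in stream]
second_pass = [f for f in stream]
print(first_pass, second_pass)

# Loading everything: the items stay in memory and can be indexed or revisited.
in_memory = list(fake_stream())
print(len(in_memory), in_memory[1], in_memory[-1])
```

The trade-off is memory: holding everything at once is convenient for listing and indexing, while field-by-field iteration keeps only one item alive at a time.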

Getting multiple objects

[5]:
req = {
    "endpoint": "object-store.os-api.cci1.ecmwf.int",
    "bucket": "earthkit-test-data-public",
    "objects": ["test6.grib", "tuv_pl.grib"],
}

ds = ekd.from_source("s3", req, anon=True).to_fieldlist()
len(ds)
[5]:
24

Getting parts of an object

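We can specify the parts (byte ranges) of an object that we want to read. In the examples below each part is given as an (offset, length) pair in bytes, and "parts" can be either a single pair or a list of pairs. This works in both stream and non-stream mode.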

[6]:
req = {
    "endpoint": "object-store.os-api.cci1.ecmwf.int",
    "bucket": "earthkit-test-data-public",
    "objects": {"object": "test6.grib", "parts": (240, 480)},
}

ds = ekd.from_source("s3", req, anon=True).to_fieldlist()
ds.ls()
[6]:
parameter.variable time.valid_datetime time.base_datetime time.step vertical.level vertical.level_type ensemble.member geography.grid_type
0 u 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
1 v 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
[7]:
req = {
    "endpoint": "object-store.os-api.cci1.ecmwf.int",
    "bucket": "earthkit-test-data-public",
    "objects": {"object": "test6.grib", "parts": [(0, 240), (480, 240)]},
}

ds = ekd.from_source("s3", req, anon=True).to_fieldlist()
ds.ls()
[7]:
parameter.variable time.valid_datetime time.base_datetime time.step vertical.level vertical.level_type ensemble.member geography.grid_type
0 t 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
1 v 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
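
To make the byte ranges used above concrete, the sketch below converts parts into HTTP Range header values, the standard way to ask an object store for a range of bytes. It is an illustration only: range_headers is a hypothetical helper, the (offset, length) reading of each part is inferred from the outputs above, and nothing here shows how earthkit-data performs its requests internally.

```python
def range_headers(parts):
    """Turn (offset, length) parts into HTTP Range header values, one per part.

    Hypothetical helper for illustration; not part of earthkit-data.
    """
    # Accept a single (offset, length) pair as well as a list of pairs.
    if len(parts) == 2 and all(isinstance(p, int) for p in parts):
        parts = [parts]

    headers = []
    for offset, length in parts:
        if offset < 0 or length <= 0:
            raise ValueError(f"invalid part: {(offset, length)}")
        # HTTP byte ranges are inclusive at both ends.
        headers.append(f"bytes={offset}-{offset + length - 1}")
    return headers


print(range_headers((240, 480)))              # ['bytes=240-719']
print(range_headers([(0, 240), (480, 240)]))  # ['bytes=0-239', 'bytes=480-719']
```

One header is produced per part because each range is treated here as a separate request.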

Getting parts of multiple objects

[8]:
req = {
    "endpoint": "object-store.os-api.cci1.ecmwf.int",
    "bucket": "earthkit-test-data-public",
    "objects": [{"object": "test6.grib", "parts": (0, 240)}, {"object": "tuv_pl.grib", "parts": (2400, 240)}],
}

ds = ekd.from_source("s3", req, anon=True).to_fieldlist()
ds.ls()
[8]:
parameter.variable time.valid_datetime time.base_datetime time.step vertical.level vertical.level_type ensemble.member geography.grid_type
0 t 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 1000 pressure 0 regular_ll
1 u 2018-08-01 12:00:00 2018-08-01 12:00:00 0 days 500 pressure 0 regular_ll
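
The offsets used in the requests above are all multiples of 240 bytes, which suggests that every message in these test files is 240 bytes long. Under that assumption (it does not hold for GRIB files in general), parts can be derived from message indices. The helper parts_for_messages below is hypothetical and not part of earthkit-data.

```python
def parts_for_messages(indices, message_size=240):
    """Return (offset, length) parts covering the given message indices.

    Hypothetical helper: assumes all messages have the same size in bytes.
    Adjacent messages are merged into a single part.
    """
    parts = []
    for i in sorted(set(indices)):
        if i < 0:
            raise ValueError("message indices must be non-negative")
        offset = i * message_size
        if parts and parts[-1][0] + parts[-1][1] == offset:
            # This message directly follows the previous part: extend it.
            prev_offset, prev_length = parts[-1]
            parts[-1] = (prev_offset, prev_length + message_size)
        else:
            parts.append((offset, message_size))
    return parts


print(parts_for_messages([1, 2]))  # [(240, 480)]
print(parts_for_messages([0, 2]))  # [(0, 240), (480, 240)]
print(parts_for_messages([10]))    # [(2400, 240)]
```

For files with variable message sizes, the offsets and lengths would have to come from an index of the file instead.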

Using parts with a stream

The parts (byte ranges) still work when used with streams.

[9]:
req = {
    "endpoint": "object-store.os-api.cci1.ecmwf.int",
    "bucket": "earthkit-test-data-public",
    "objects": {"object": "test6.grib", "parts": (240, 480)},
}


ds = ekd.from_source("s3", req, stream=True, anon=True).to_fieldlist()

for f in ds:
    # f is a Field object. It gets deleted when it goes out of scope
    print(f)
Field(u, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)
Field(v, 2018-08-01 12:00:00, 2018-08-01 12:00:00, 0:00:00, 1000, pressure, 0, regular_ll)