Retrieving data from S3 buckets

The s3 data source provides access to Amazon S3 buckets.

In this example we will read GRIB data from a publicly available Amazon S3 bucket on the European Weather Cloud (EWC).

Getting a whole object

Disk based access

By default, the data is downloaded and stored in the cache. Since we know that the bucket is public, we pass anon=True to from_source() to bypass S3 authentication.

[1]:
import earthkit.data as ekd

req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
     "bucket": "earthkit-test-data-public",
     "objects": "test6.grib",
   }

ds = ekd.from_source("s3", req, anon=True)
ds.ls()
[1]:
centre shortName typeOfLevel level dataDate dataTime stepRange dataType number gridType
0 ecmf t isobaricInhPa 1000 20180801 1200 0 an 0 regular_ll
1 ecmf u isobaricInhPa 1000 20180801 1200 0 an 0 regular_ll
2 ecmf v isobaricInhPa 1000 20180801 1200 0 an 0 regular_ll
3 ecmf t isobaricInhPa 850 20180801 1200 0 an 0 regular_ll
4 ecmf u isobaricInhPa 850 20180801 1200 0 an 0 regular_ll
5 ecmf v isobaricInhPa 850 20180801 1200 0 an 0 regular_ll

Reading as a stream

We can read GRIB data from an S3 bucket as a stream without writing anything to disk. This can be activated by calling from_source() with stream=True. By default we get a stream iterator, which we can consume field by field.

[2]:
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
     "bucket": "earthkit-test-data-public",
     "objects": "test6.grib",
   }

ds = ekd.from_source("s3", req, stream=True, anon=True)

for f in ds:
    # f is a GribField object. It gets deleted when going out of scope
    print(f)

GribField(t,1000,20180801,1200,0,0)
GribField(u,1000,20180801,1200,0,0)
GribField(v,1000,20180801,1200,0,0)
GribField(t,850,20180801,1200,0,0)
GribField(u,850,20180801,1200,0,0)
GribField(v,850,20180801,1200,0,0)

With the batched method we can iterate through the stream in batches of fixed size. For example, the following code reads the GRIB data in batches of 2 messages.

[3]:
ds = ekd.from_source("s3", req, stream=True, anon=True)

for f in ds.batched(2):
    # f is a fieldlist
    print(f"len={len(f)}")
    for g in f:
        print(f" {g}")
len=2
 GribField(t,1000,20180801,1200,0,0)
 GribField(u,1000,20180801,1200,0,0)
len=2
 GribField(v,1000,20180801,1200,0,0)
 GribField(t,850,20180801,1200,0,0)
len=2
 GribField(u,850,20180801,1200,0,0)
 GribField(v,850,20180801,1200,0,0)
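The batching semantics can be illustrated without earthkit or network access. The following is a minimal sketch using a plain Python iterator; the batched function and the field labels here are stand-ins for illustration and are not earthkit's implementation. In this sketch the final batch may be shorter than the requested size when the stream length is not a multiple of it.

```python
from itertools import islice


def batched(iterable, n):
    """Yield lists of up to n items from iterable (plain-Python illustration)."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, n))
        if not batch:
            return
        yield batch


# Stand-in labels for the six fields in test6.grib
fields = ["t1000", "u1000", "v1000", "t850", "u850", "v850"]

for group in batched(fields, 2):
    print(f"len={len(group)} {group}")
```

As with the stream above, each item is consumed exactly once, so a batch is only available while the loop is processing it.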

With the read_all=True option we will load the whole object into memory.

[4]:
ds = ekd.from_source("s3", req, stream=True, read_all=True, anon=True)
ds.ls()
[4]:
centre shortName typeOfLevel level dataDate dataTime stepRange dataType number gridType
0 ecmf t isobaricInhPa 1000 20180801 1200 0 an 0 regular_ll
1 ecmf u isobaricInhPa 1000 20180801 1200 0 an 0 regular_ll
2 ecmf v isobaricInhPa 1000 20180801 1200 0 an 0 regular_ll
3 ecmf t isobaricInhPa 850 20180801 1200 0 an 0 regular_ll
4 ecmf u isobaricInhPa 850 20180801 1200 0 an 0 regular_ll
5 ecmf v isobaricInhPa 850 20180801 1200 0 an 0 regular_ll

Getting multiple objects

[5]:
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
     "bucket": "earthkit-test-data-public",
     "objects": ["test6.grib", "tuv_pl.grib"],
   }

ds = ekd.from_source("s3", req, anon=True)
len(ds)
[5]:
24
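Since every request in this notebook uses the same endpoint and bucket, the request dictionary can be built with a small helper. This is only a convenience sketch: make_request is a hypothetical name introduced here and is not part of earthkit.

```python
ENDPOINT = "object-store.os-api.cci1.ecmwf.int"
BUCKET = "earthkit-test-data-public"


def make_request(objects, endpoint=ENDPOINT, bucket=BUCKET):
    """Build an s3 request dict (hypothetical helper, not part of earthkit)."""
    return {"endpoint": endpoint, "bucket": bucket, "objects": objects}


# A single object name or a list of names can be passed
req = make_request(["test6.grib", "tuv_pl.grib"])
print(req)

# The result is used exactly like the hand-written dictionaries:
# ds = ekd.from_source("s3", req, anon=True)  # needs earthkit-data and network access
```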

Getting parts of an object

We can specify the parts (byte ranges) we want to read, each given as an (offset, length) pair in bytes. This works in both stream and non-stream mode.

[6]:
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
     "bucket": "earthkit-test-data-public",
     "objects": { "object": "test6.grib", "parts": (240, 480)},
   }

ds = ekd.from_source("s3", req, anon=True)
ds.ls()
[6]:
centre shortName typeOfLevel level dataDate dataTime stepRange dataType number gridType
0 ecmf u isobaricInhPa 1000 20180801 1200 0 an 0 regular_ll
1 ecmf v isobaricInhPa 1000 20180801 1200 0 an 0 regular_ll

[7]:
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
     "bucket": "earthkit-test-data-public",
     "objects": { "object": "test6.grib", "parts": [(0, 240), (480, 240)]},
   }

ds = ekd.from_source("s3", req, anon=True)
ds.ls()
[7]:
centre shortName typeOfLevel level dataDate dataTime stepRange dataType number gridType
0 ecmf t isobaricInhPa 1000 20180801 1200 0 an 0 regular_ll
1 ecmf v isobaricInhPa 1000 20180801 1200 0 an 0 regular_ll
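The parts used above are consistent with (offset, length) pairs in which every message in test6.grib occupies 240 bytes. Under that assumption, which is inferred from the examples and would not hold for files with variable message sizes, the parts for a set of message indices can be computed as in this sketch. The helper name parts_for_messages is hypothetical.

```python
MESSAGE_SIZE = 240  # assumed size in bytes of each GRIB message in test6.grib


def parts_for_messages(indices, message_size=MESSAGE_SIZE):
    """Return (offset, length) pairs for the given message indices.

    Adjacent messages are merged into a single part.
    """
    parts = []
    for i in sorted(set(indices)):
        offset = i * message_size
        if parts and parts[-1][0] + parts[-1][1] == offset:
            # This message starts where the previous part ends: extend it
            parts[-1] = (parts[-1][0], parts[-1][1] + message_size)
        else:
            parts.append((offset, message_size))
    return parts


print(parts_for_messages([1, 2]))  # second and third message
print(parts_for_messages([0, 2]))  # first and third message
```

The two calls reproduce the parts used in the requests above: a single merged range for the adjacent messages and two separate ranges for the non-adjacent ones.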

Getting parts of multiple objects

[8]:
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
     "bucket": "earthkit-test-data-public",
     "objects": [{"object": "test6.grib", "parts": (0,240)},
                 {"object": "tuv_pl.grib", "parts": (2400, 240)}],
   }

ds = ekd.from_source("s3", req, anon=True)
ds.ls()
[8]:
centre shortName typeOfLevel level dataDate dataTime stepRange dataType number gridType
0 ecmf t isobaricInhPa 1000 20180801 1200 0 an 0 regular_ll
1 ecmf u isobaricInhPa 500 20180801 1200 0 an 0 regular_ll

Using parts with a stream

The parts (byte ranges) still work when used with streams.

[9]:
req = {"endpoint": "object-store.os-api.cci1.ecmwf.int",
     "bucket": "earthkit-test-data-public",
     "objects": { "object": "test6.grib", "parts": (240, 480)},
   }


ds = ekd.from_source("s3", req, stream=True, anon=True)

for f in ds:
    # f is a GribField object. It gets deleted when going out of scope
    print(f)
GribField(u,1000,20180801,1200,0,0)
GribField(v,1000,20180801,1200,0,0)