{ "cells": [ { "cell_type": "markdown", "id": "recovered-organizer", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "## Reading data from a stream" ] }, { "cell_type": "code", "execution_count": 1, "id": "sticky-refrigerator", "metadata": {}, "outputs": [], "source": [ "import earthkit.data as ekd" ] }, { "attachments": {}, "cell_type": "raw", "id": "2da4f1e8-e4ac-489f-9695-72683c779496", "metadata": { "editable": true, "raw_mimetype": "text/restructuredtext", "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "earthkit-data can load GRIB data from a :ref:`stream ` source, which can be an FDB stream, a standard Python IO stream or any object implementing the necessary stream methods. \n", "\n", "For simplicity, in this notebook we will use a **file stream** to demonstrate the usage of streams. First, we ensure the example file containing 6 messages is available." ] }, { "cell_type": "code", "execution_count": 2, "id": "939a3b56-a434-457c-966f-f5b8bb6d74dc", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [], "source": [ "ekd.download_example_file(\"test6.grib\")" ] }, { "cell_type": "markdown", "id": "prescribed-giant", "metadata": {}, "source": [ "### Getting single items from the stream" ] }, { "cell_type": "markdown", "id": "affiliated-joint", "metadata": {}, "source": [ "We create a stream from a file containing 6 GRIB fields by simply calling *open()*. It returns an io.BufferedReader object (a file stream)." ] }, { "cell_type": "code", "execution_count": 3, "id": "verbal-damage", "metadata": {}, "outputs": [], "source": [ "stream = open(\"test6.grib\", \"rb\")" ] }, { "cell_type": "raw", "id": "86a95676-ed76-4d6f-a9bd-8ef8c2c9607c", "metadata": { "editable": true, "raw_mimetype": "text/restructuredtext", "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "We load it into earthkit-data." 
] }, { "cell_type": "code", "execution_count": 4, "id": "durable-helicopter", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [], "source": [ "ds = ekd.from_source(\"stream\", stream)" ] }, { "cell_type": "raw", "id": "694df8fb-23a8-46dd-87c4-f9234e14d854", "metadata": { "editable": true, "raw_mimetype": "text/restructuredtext", "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "Using this object we can iterate through the stream. As the iteration progresses, :py:class:`~data.readers.grib.codes.GribField` objects are created and then deleted when they go out of scope. As a result, only one GRIB message is kept in memory at a time." ] }, { "cell_type": "code", "execution_count": 5, "id": "animated-prayer", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "GribField(t,1000,20180801,1200,0,0)\n", "GribField(u,1000,20180801,1200,0,0)\n", "GribField(v,1000,20180801,1200,0,0)\n", "GribField(t,850,20180801,1200,0,0)\n", "GribField(u,850,20180801,1200,0,0)\n", "GribField(v,850,20180801,1200,0,0)\n" ] } ], "source": [ "for f in ds:\n", "    # f is a GribField object; it is deleted when it goes out of scope\n", "    print(f)" ] }, { "cell_type": "markdown", "id": "brilliant-struggle", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "Having finished the iteration there is no data available in *ds*. 
We can close the stream:" ] }, { "cell_type": "code", "execution_count": 6, "id": "unique-metadata", "metadata": {}, "outputs": [], "source": [ "stream.close()" ] }, { "cell_type": "markdown", "id": "judicial-backing", "metadata": {}, "source": [ "### Using batched" ] }, { "cell_type": "raw", "id": "fb4528e4-f649-4a5b-92b4-e92e9391851b", "metadata": { "editable": true, "raw_mimetype": "text/restructuredtext", "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "When we use the :py:meth:`batched ` method we can iterate through the stream in batches of fixed size. In this example we create a stream and read 2 fields from it at a time." ] }, { "cell_type": "code", "execution_count": 7, "id": "placed-blues", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "len=2 [('t', 1000), ('u', 1000)]\n", "len=2 [('v', 1000), ('t', 850)]\n", "len=2 [('u', 850), ('v', 850)]\n" ] } ], "source": [ "stream = open(\"test6.grib\", \"rb\")\n", "ds = ekd.from_source(\"stream\", stream)\n", "\n", "# f is a fieldlist\n", "for f in ds.batched(2):\n", "    print(f\"len={len(f)} {f.metadata(('param', 'level'))}\")" ] }, { "cell_type": "markdown", "id": "unavailable-actress", "metadata": {}, "source": [ "Having finished the iteration there is no data available in *ds*. We can close the stream:" ] }, { "cell_type": "code", "execution_count": 8, "id": "jewish-season", "metadata": {}, "outputs": [], "source": [ "stream.close()" ] }, { "cell_type": "raw", "id": "ed0b0e9c-1016-474b-8ea0-c65d864d2427", "metadata": { "editable": true, "raw_mimetype": "text/restructuredtext", "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "It is possible to use a batch size that is not a factor of the total number of fields in the stream. In this case the last batch will simply contain fewer fields than the specified batch size." 
] }, { "cell_type": "code", "execution_count": 9, "id": "bf94a190-ec0e-4172-8e75-6518e48f50a4", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "len=4 [('t', 1000), ('u', 1000), ('v', 1000), ('t', 850)]\n", "len=2 [('u', 850), ('v', 850)]\n" ] } ], "source": [ "stream = open(\"test6.grib\", \"rb\")\n", "ds = ekd.from_source(\"stream\", stream)\n", "\n", "# f is a fieldlist\n", "for f in ds.batched(4):\n", "    print(f\"len={len(f)} {f.metadata(('param', 'level'))}\")\n", "\n", "# close the stream once all the batches have been consumed\n", "stream.close()" ] }, { "cell_type": "markdown", "id": "7122a9a4-9ca0-4d75-9194-144074c6dcad", "metadata": {}, "source": [ "### Using group_by" ] }, { "cell_type": "raw", "id": "d970d832-7203-498f-81d6-99434ce42b88", "metadata": { "editable": true, "raw_mimetype": "text/restructuredtext", "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "When we use the :py:meth:`group_by ` method we can iterate through the stream in groups defined by metadata keys. Each iteration step results in a :py:class:`FieldList ` object, which is built by consuming GRIB messages from the stream until the values of the metadata keys change. The generated :py:class:`FieldList ` keeps its GRIB messages in memory and is deleted when it goes out of scope." 
] }, { "cell_type": "code", "execution_count": 10, "id": "8e1be478-6eb6-4732-bb96-9d6fa942c20d", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "len=3 [('t', 1000), ('u', 1000), ('v', 1000)]\n", "len=3 [('t', 850), ('u', 850), ('v', 850)]\n" ] } ], "source": [ "stream = open(\"test6.grib\", \"rb\")\n", "ds = ekd.from_source(\"stream\", stream)\n", "\n", "# f is a fieldlist\n", "for f in ds.group_by(\"level\"):\n", " print(f\"len={len(f)} {f.metadata(('param', 'level'))}\")" ] }, { "cell_type": "markdown", "id": "89d33d4c-00d2-4f0b-996e-aaf5a5c9e161", "metadata": {}, "source": [ "Having finished the iteration there is no data available in *ds*. We can close the stream:" ] }, { "cell_type": "code", "execution_count": 11, "id": "2ac79f14-cb43-40c0-8a56-9bc16943d7e7", "metadata": {}, "outputs": [], "source": [ "stream.close()" ] }, { "cell_type": "markdown", "id": "permanent-uncertainty", "metadata": {}, "source": [ "### Storing each GRIB message in memory" ] }, { "cell_type": "raw", "id": "0b9b01c1-b528-42a2-9f1b-ccae28eb65b5", "metadata": { "editable": true, "raw_mimetype": "text/restructuredtext", "slideshow": { "slide_type": "" }, "tags": [] }, "source": [ "We can load the whole stream into memory by using ``read_all=True`` in :ref:`from_source() `. The resulting object will be a :py:class:`FieldList` storing all the GRIB messages in memory." 
] }, { "cell_type": "code", "execution_count": 12, "id": "simple-london", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [], "source": [ "stream = open(\"test6.grib\", \"rb\")\n", "ds = ekd.from_source(\"stream\", stream, read_all=True)" ] }, { "cell_type": "code", "execution_count": 13, "id": "meaning-oxide", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "6" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "len(ds)" ] }, { "cell_type": "code", "execution_count": 14, "id": "copyrighted-walnut", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [ { "data": { "text/html": [ "
<div>\n", "<table border=\"1\" class=\"dataframe\">\n", "  <thead>\n",
"    <tr><th></th><th>centre</th><th>shortName</th><th>typeOfLevel</th><th>level</th><th>dataDate</th><th>dataTime</th><th>stepRange</th><th>dataType</th><th>number</th><th>gridType</th></tr>\n",
"  </thead>\n", "  <tbody>\n",
"    <tr><th>0</th><td>ecmf</td><td>t</td><td>isobaricInhPa</td><td>1000</td><td>20180801</td><td>1200</td><td>0</td><td>an</td><td>0</td><td>regular_ll</td></tr>\n",
"    <tr><th>1</th><td>ecmf</td><td>u</td><td>isobaricInhPa</td><td>1000</td><td>20180801</td><td>1200</td><td>0</td><td>an</td><td>0</td><td>regular_ll</td></tr>\n",
"    <tr><th>2</th><td>ecmf</td><td>v</td><td>isobaricInhPa</td><td>1000</td><td>20180801</td><td>1200</td><td>0</td><td>an</td><td>0</td><td>regular_ll</td></tr>\n",
"    <tr><th>3</th><td>ecmf</td><td>t</td><td>isobaricInhPa</td><td>850</td><td>20180801</td><td>1200</td><td>0</td><td>an</td><td>0</td><td>regular_ll</td></tr>\n",
"    <tr><th>4</th><td>ecmf</td><td>u</td><td>isobaricInhPa</td><td>850</td><td>20180801</td><td>1200</td><td>0</td><td>an</td><td>0</td><td>regular_ll</td></tr>\n",
"    <tr><th>5</th><td>ecmf</td><td>v</td><td>isobaricInhPa</td><td>850</td><td>20180801</td><td>1200</td><td>0</td><td>an</td><td>0</td><td>regular_ll</td></tr>\n",
"  </tbody>\n", "</table>\n", "</div>
" ], "text/plain": [ " centre shortName typeOfLevel level dataDate dataTime stepRange \\\n", "0 ecmf t isobaricInhPa 1000 20180801 1200 0 \n", "1 ecmf u isobaricInhPa 1000 20180801 1200 0 \n", "2 ecmf v isobaricInhPa 1000 20180801 1200 0 \n", "3 ecmf t isobaricInhPa 850 20180801 1200 0 \n", "4 ecmf u isobaricInhPa 850 20180801 1200 0 \n", "5 ecmf v isobaricInhPa 850 20180801 1200 0 \n", "\n", " dataType number gridType \n", "0 an 0 regular_ll \n", "1 an 0 regular_ll \n", "2 an 0 regular_ll \n", "3 an 0 regular_ll \n", "4 an 0 regular_ll \n", "5 an 0 regular_ll " ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "ds.ls()" ] }, { "cell_type": "code", "execution_count": 15, "id": "static-reasoning", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
<div>\n", "<table border=\"1\" class=\"dataframe\">\n", "  <thead>\n",
"    <tr><th></th><th>centre</th><th>shortName</th><th>typeOfLevel</th><th>level</th><th>dataDate</th><th>dataTime</th><th>stepRange</th><th>dataType</th><th>number</th><th>gridType</th></tr>\n",
"  </thead>\n", "  <tbody>\n",
"    <tr><th>0</th><td>ecmf</td><td>t</td><td>isobaricInhPa</td><td>1000</td><td>20180801</td><td>1200</td><td>0</td><td>an</td><td>0</td><td>regular_ll</td></tr>\n",
"    <tr><th>1</th><td>ecmf</td><td>t</td><td>isobaricInhPa</td><td>850</td><td>20180801</td><td>1200</td><td>0</td><td>an</td><td>0</td><td>regular_ll</td></tr>\n",
"  </tbody>\n", "</table>\n", "</div>
" ], "text/plain": [ " centre shortName typeOfLevel level dataDate dataTime stepRange \\\n", "0 ecmf t isobaricInhPa 1000 20180801 1200 0 \n", "1 ecmf t isobaricInhPa 850 20180801 1200 0 \n", "\n", " dataType number gridType \n", "0 an 0 regular_ll \n", "1 an 0 regular_ll " ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "a = ds.sel(param=\"t\")\n", "a.ls()" ] }, { "cell_type": "code", "execution_count": 16, "id": "spanish-wagon", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
<pre>&lt;xarray.Dataset&gt; Size: 2kB\n",
       "Dimensions:    (levelist: 2, latitude: 7, longitude: 12)\n",
       "Coordinates:\n",
       "  * levelist   (levelist) int64 16B 850 1000\n",
       "  * latitude   (latitude) float64 56B 90.0 60.0 30.0 0.0 -30.0 -60.0 -90.0\n",
       "  * longitude  (longitude) float64 96B 0.0 30.0 60.0 90.0 ... 270.0 300.0 330.0\n",
       "Data variables:\n",
       "    t          (levelist, latitude, longitude) float64 1kB ...\n",
       "Attributes: (12/13)\n",
       "    param:        t\n",
       "    paramId:      130\n",
       "    class:        od\n",
       "    stream:       oper\n",
       "    levtype:      pl\n",
       "    type:         an\n",
       "    ...           ...\n",
       "    date:         20180801\n",
       "    time:         1200\n",
       "    domain:       g\n",
       "    number:       0\n",
       "    Conventions:  CF-1.8\n",
       "    institution:  ECMWF</pre>
" ], "text/plain": [ "<xarray.Dataset> Size: 2kB\n", "Dimensions: (levelist: 2, latitude: 7, longitude: 12)\n", "Coordinates:\n", " * levelist (levelist) int64 16B 850 1000\n", " * latitude (latitude) float64 56B 90.0 60.0 30.0 0.0 -30.0 -60.0 -90.0\n", " * longitude (longitude) float64 96B 0.0 30.0 60.0 90.0 ... 270.0 300.0 330.0\n", "Data variables:\n", " t (levelist, latitude, longitude) float64 1kB ...\n", "Attributes: (12/13)\n", " param: t\n", " paramId: 130\n", " class: od\n", " stream: oper\n", " levtype: pl\n", " type: an\n", " ... ...\n", " date: 20180801\n", " time: 1200\n", " domain: g\n", " number: 0\n", " Conventions: CF-1.8\n", " institution: ECMWF" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "a = a.to_xarray()\n", "a" ] }, { "cell_type": "markdown", "id": "contained-jackson", "metadata": {}, "source": [ "We close the stream:" ] }, { "cell_type": "code", "execution_count": 17, "id": "through-mistress", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [] }, "outputs": [], "source": [ "stream.close()" ] } ], "metadata": { "kernelspec": { "display_name": "dev_ecc", "language": "python", "name": "dev_ecc" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.13" } }, "nbformat": 4, "nbformat_minor": 5 }