CrocoLake - Temperature map in the North West Atlantic#
This tutorial shows how to read and manipulate CrocoLake data. CrocoLake is stored in parquet format across multiple files for faster reading operations: we will load into memory only what we need by applying some filters, and we will create a map showing the temperature measurements in the North West Atlantic.
CrocoLake contains QCed data from different datasets (Argo, GLODAP, Spray Gliders as of today).
You can find more examples on the crocolake-python repository.
“Installing” CrocoLake#
CrocoLake is not a package but a dataset, so it does not require any installation per se. To access the data with Python, though, you’ll need the pandas, pyarrow, and dask packages. If you installed CrocoCamp in a previous tutorial, you’re already set and you can use the same conda environment you used before (instructions were also provided here). If you still have issues, also check the crocolake-python repository, or contact enrico.milanese@whoi.edu.
Note on parquet files#
There are several ways to load parquet files in a dataframe in Python, and this notebook shows two of them:
pandas + pyarrow: this approach uses the pyarrow package to load the data into a pandas dataframe;
dask (+ pandas + pyarrow): this approach uses the dask package to load the data into a dask dataframe; it uses pandas and pyarrow under the hood, and a dask dataframe is (almost) identical to a pandas dataframe.
We will first use pyarrow to load the dataset, so that we can provide a target schema (containing the Argo variable names) and load the data consistently across sources, independently of which variables each source actually contains. We will finally convert the dataset to a pandas dataframe for manipulation and plots.
Other ways to read parquet files are by using pandas (make sure you have pyarrow, fastparquet, or some other suitable engine installed) or dask. Generally speaking, you’ll want to use dask only when you need a large amount of data at the same time, so that you can benefit from its parallelization; you should avoid dask whenever the data fits in your RAM.
When reading parquet files with pyarrow (or pandas), you can either specify the file name(s) or the directory containing all the parquet files. In the latter case, if you apply any filter, pandas and pyarrow will sort through all the files in the folder, reading into memory only the subsets that satisfy your filter.
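As a minimal sketch of the directory case (the path and filter values here are illustrative, and a reasonably recent pandas with the pyarrow engine is assumed), pandas can read a whole directory of parquet files and push the filter down so that only the matching subsets are read from disk:
import pandas as pd
# Read only the rows of a parquet dataset (a directory of parquet files) that match the filter;
# the path and the latitude bounds below are placeholders, not the values used later in this tutorial.
df_small = pd.read_parquet(
    "/path/to/CrocoLakePHY/",                   # directory containing the parquet files
    engine="pyarrow",
    columns=["LATITUDE", "LONGITUDE", "TEMP"],  # load only these columns
    filters=[("LATITUDE", ">", 30), ("LATITUDE", "<", 45)],
)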
Getting started#
We then import the necessary modules and set up the path to the dataset (update the crocolake_path variable below if you stored CrocoLake at a different location).
import datetime
import glob
from pprint import pprint
import pandas as pd
import pyarrow.parquet as pq
# Path to CrocoLake PHY
crocolake_path = '/glade/campaign/cgd/oce/projects/CROCODILE/workshops/2025/CrocoCamp/CrocoLakePHY/'
# Setting up parquet schema
crocolake_schema = pq.read_schema(crocolake_path+"_common_metadata")
# Geographical boundaries
lat0 = 30
lat1 = 45
lon0 = -85
lon1 = -60
pyarrow + pandas approach#
We now create a ParquetDataset object that will allow us to read the data. Specifying a schema (which we read into crocolake_schema in the previous cell) will make later operations faster.
We can get a peek at what variables are available in the dataset by looking at its schema attribute. Note that we have not loaded any data into memory yet (except for the schema).
dataset = pq.ParquetDataset(
crocolake_path,
schema=crocolake_schema
)
schema = dataset.schema
pprint(sorted(schema.names))
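If you want to check how a specific variable is stored (still without reading any data), you can look up its field in the schema. A small sketch, assuming the variable names below appear in the printed list:
# Inspect the storage type of a few variables directly from the schema
for name in ["JULD", "LATITUDE", "TEMP"]:
    print(name, "->", schema.field(name).type)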
We now want to filter the dataset to load into memory only the data from the NWA (we set the values for lat0, lat1, lon0, lon1 in the first cell) and recorded between 2010 and 2020.
The geographical coordinates are stored in the variables ‘LATITUDE’ and ‘LONGITUDE’. We then generate the filter, whose syntax is [[(column, op, val), …],…], where column is the variable name and val is the value for the operator op, which accepts [==, =, >, >=, <, <=, !=, in, not in].
date0 = datetime.datetime(2010, 1, 1, 0, 0, 0)
date1 = datetime.datetime(2020, 1, 1, 0, 0, 0)
filter_coords_time = [
("LATITUDE",">",lat0), ("LATITUDE","<",lat1),
("LONGITUDE",">",lon0), ("LONGITUDE","<",lon1),
("JULD",">",date0), ("JULD","<",date1)
]
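Note that a flat list of conditions like the one above is treated as a single group of AND conditions; the full [[(column, op, val), …],…] form lets you combine several such groups with OR. As a hypothetical example (not used in this tutorial), a filter selecting two separate latitude bands over the same period would look like:
# Hypothetical filter: (30 < LATITUDE < 35) OR (40 < LATITUDE < 45), both between date0 and date1
filter_two_bands = [
    [("LATITUDE", ">", 30), ("LATITUDE", "<", 35), ("JULD", ">", date0), ("JULD", "<", date1)],
    [("LATITUDE", ">", 40), ("LATITUDE", "<", 45), ("JULD", ">", date0), ("JULD", "<", date1)],
]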
To get a pandas dataframe, we re-generate a ParquetDataset object, this time adding the filters to it, and then we read it into a pandas dataframe with the dataset’s read().to_pandas() methods.
NB: The following operation fetches a large amount of data (~1.6 GB), so you can narrow the time range in the filter above if you cannot use this much memory.
%%time
ds = pq.ParquetDataset(crocolake_path, schema=crocolake_schema, filters=filter_coords_time)
df = ds.read().to_pandas()
df
You can explore the dataframe just by calling it (df) as we did above. If you want a list of the variables that are stored, you can use sorted(df.columns.to_list()). The following cell shows that we loaded roughly 2 GB of data.
memory_usage = df.memory_usage(deep=True).sum()/(1024**2)
print(f"DataFrame size: {memory_usage:.2f} MB")
If we know that we only need a subset of variables, we can specify them in the dataset’s read() method.
For example, let’s say that we are interested in the temperature measurements recorded between 2010 and 2020 in the NWA. To exclude invalid and missing observations, we filter the temperature (TEMP) to be within a (large) range of valid values. Besides the temperature, we also want to keep the database name, the pressure, and the spatial and time coordinates.
%%time
cols = ["DB_NAME","LATITUDE","LONGITUDE","PRES","JULD","TEMP"]
filter_coords_time_temp = [
("LATITUDE",">",lat0), ("LATITUDE","<",lat1),
("LONGITUDE",">",lon0), ("LONGITUDE","<",lon1),
("JULD",">",date0), ("JULD","<",date1),
("TEMP",">=",-1e30),("TEMP","<=",+1e30)
]
ds = pq.ParquetDataset(crocolake_path, schema=crocolake_schema, filters=filter_coords_time_temp)
df = ds.read(columns=cols).to_pandas()
df
This operation was much faster (3-4x on my machine) and loaded a smaller dataframe (~800 MB):
memory_usage = df.memory_usage(deep=True).sum()/(1024**2)
print(f"DataFrame size: {memory_usage:.2f} MB")
Map#
We can now produce a scatter plot as we’d normally do with pandas. For example, here is the average temperature at each location in the dataframe that we loaded:
import cartopy.crs as ccrs
import matplotlib.pyplot as plt
from matplotlib import colormaps
ref_var = "TEMP"
# Group by 'LATITUDE' and 'LONGITUDE' and database
df_for_map = df.groupby(["LATITUDE", "LONGITUDE", "DB_NAME"]).agg({
    ref_var: 'mean'  # take the mean value at that coordinate
}).reset_index()
# Plotting using Cartopy
plt.figure(figsize=(16, 10))
ax = plt.axes(projection=ccrs.PlateCarree())
ax.coastlines()
cbar_min = df_for_map[ref_var].quantile(q=0.05)
cbar_max = df_for_map[ref_var].quantile(q=0.95)
# Group by DB_NAME and plot each group
df_for_map["DB_NAME"] = pd.Categorical(df_for_map["DB_NAME"], categories=["ARGO","GLODAP","SprayGliders"], ordered=True)
cmaps = iter( ["Greens","Oranges","Blues"] )
for db_name, group in df_for_map.groupby("DB_NAME", observed=False):
    plt.scatter(
        group['LONGITUDE'],
        group['LATITUDE'],
        s=10,
        c=group[ref_var],
        vmin=cbar_min,
        vmax=cbar_max,
        cmap=next(cmaps),
        transform=ccrs.PlateCarree()
    )
    plt.colorbar(shrink=0.5, label='Average ' + ref_var + " (" + db_name + ")", pad=0.02)
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('North-West Atlantic average temperature 2010-2019 (Celsius)')
plt.grid(True)
plt.xlim([lon0, lon1])
plt.ylim([lat0, lat1])
plt.show()
dask approach#
dask can be faster at reading parquet databases thanks to its parallel and lazy evaluation of operations.
When using dask, very few things change compared to using pandas and pyarrow: dask dataframes are almost identical to pandas dataframes, and indeed for our needs we will use the same syntax! The main difference is that dask does not evaluate the instructions immediately: it creates so-called delayed objects, through which it internally builds a graph of instructions optimized for the sequence of operations requested. To trigger the actual computation we then need to call the dask dataframe’s compute() method. This allows dask to handle larger-than-memory datasets, by reading into memory only the portions needed to perform the optimized set of instructions.
We start by importing a few extra modules.
import dask
import dask.dataframe as dd
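Before querying CrocoLake, here is a minimal sketch of dask’s lazy evaluation on a small toy dataframe (not CrocoLake data), just to show that no work happens until compute() is called:
# Toy example: build a dask dataframe from a small pandas dataframe
toy_pdf = pd.DataFrame({"TEMP": [10.0, 12.5, 11.2, 9.8]})
toy_ddf = dd.from_pandas(toy_pdf, npartitions=2)
lazy_mean = toy_ddf["TEMP"].mean()  # a lazy expression, not a number
print(lazy_mean)                    # prints a dask object describing the pending computation
print(lazy_mean.compute())          # the mean is only evaluated here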
We can use the same filters defined earlier, and we use read_parquet() to filter the database and prescribe what we will need.
Note that:
unlike the pyarrow + pandas approach, here we provide both filters and columns in the same method;
we provide the schema crocolake_schema that we read during the setup of the exercise;
dask uses pyarrow under the hood (the engine argument; other options are available).
%%time
date0 = datetime.datetime(2010, 1, 1, 0, 0, 0)
date1 = datetime.datetime(2020, 1, 1, 0, 0, 0)
cols = ["DB_NAME","LATITUDE","LONGITUDE","PRES","JULD","TEMP"]
filter_coords_time_temp = [
("LATITUDE",">",lat0), ("LATITUDE","<",lat1),
("LONGITUDE",">",lon0), ("LONGITUDE","<",lon1),
("JULD",">",date0), ("JULD","<",date1),
("TEMP",">=",-1e30),("TEMP","<=",+1e30)
]
ddf = dd.read_parquet(
crocolake_path,
engine="pyarrow",
schema=crocolake_schema,
columns= cols,
filters=filter_coords_time_temp
)
It appears dask is much faster! Yet, it actually creates a delayed object, i.e. ddf contains the instructions that dask will later use to load the dataframe into memory. This allows us to use ddf to schedule all the operations that we’d normally perform on a dataframe. Eventually we will call ddf.compute() to actually evaluate the instructions.
If we look at ddf, the dataframe will in fact appear empty:
ddf
We can explore its first entries with head(), which loads into memory only the first 5 entries:
(Note: head() can return an empty dataframe even when ddf is not empty. This happens because head() looks for the first 5 rows in the first partition, which sometimes happens to be empty. Repartitioning the dask dataframe will get rid of empty partitions and solve this.)
#ddf=ddf.repartition(partition_size="300MB")
ddf.head()
To load the whole dataframe, we call compute() (this returns a pandas dataframe).
%%time
ddf_loaded = ddf.compute()
ddf_loaded
It took just a few hundred milliseconds! So yes, dask can be much faster than just using pyarrow and pandas.
If we look into ddf_loaded now, it will show the populated pandas dataframe.
(Note: if you compare ddf_loaded with the df loaded with pyarrow, you’ll see that the rows are not in the same order, yet under the dataframe you can see that the total number of rows is the same. Also the index seems smaller, but dask holds the dataframe in multiple partitions, and the index is reset at every partition.)
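If you want to convince yourself that the two loads contain the same rows, one way (a sketch; the comparison may still print False if dask produced slightly different column dtypes) is to sort both dataframes by the same columns and reset their indices before comparing:
# Sort both dataframes consistently and compare them row by row
df_sorted = df.sort_values(["JULD", "LATITUDE", "LONGITUDE", "PRES"]).reset_index(drop=True)
ddf_sorted = ddf_loaded.sort_values(["JULD", "LATITUDE", "LONGITUDE", "PRES"]).reset_index(drop=True)
print(df_sorted.equals(ddf_sorted))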
dask’s main advantage is not just in loading the data faster, but in performing operations on larger-than-memory data. For example, it can compute the mean value of TEMP across the whole CrocoLake PHY dataset without loading all the data in memory (and pretty quickly, too):
# note that we are not passing any filter conditions or selection of columns to load
ddf_all = dd.read_parquet(
crocolake_path,
engine="pyarrow",
schema=crocolake_schema,
)
temp_mean = ddf_all['TEMP'].mean() # temp_mean is a delayed object: it knows what operations to perform to compute the mean,
# but doesn't perform them until we call compute() in the next cell
%%time
temp_mean.compute()
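If you need several statistics, you can hand them to dask.compute() together, so that dask evaluates them in a single pass over the data. A small sketch using the delayed dataframe above:
# Compute several lazy aggregations at once; dask shares the underlying reads between them
mean_val, std_val, n_obs = dask.compute(
    ddf_all["TEMP"].mean(),
    ddf_all["TEMP"].std(),
    ddf_all["TEMP"].count(),
)
print(mean_val, std_val, n_obs)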
Map#
When producing the map with our dask approach, we create the grouped dataframe from the delayed dataframe ddf and call compute() on it as late as possible.
In this example, the compute that we are triggering includes four operations:
reading the filtered dataset;
grouping the dataframe by geographical coordinates and database;
averaging the temperature over pressure and time;
resetting the index.
dask internally builds a graph of all of these operations, so that it knows what subset of data is needed, and it optimizes the number and sequence of instructions before executing them.
import cartopy.crs as ccrs
import matplotlib.pyplot as plt
from matplotlib import colormaps
ref_var = "TEMP"
# Group by 'LATITUDE' and 'LONGITUDE' and database
ddf_for_map = ddf.groupby(["LATITUDE", "LONGITUDE", "DB_NAME"]).agg({
    ref_var: 'mean'  # take the mean value at that coordinate
}).reset_index()
# Plotting using Cartopy
plt.figure(figsize=(16, 10))
ax = plt.axes(projection=ccrs.PlateCarree())
ax.coastlines()
cbar_min = ddf_for_map[ref_var].quantile(q=0.05).compute()
cbar_max = ddf_for_map[ref_var].quantile(q=0.95).compute()
# Group by DB_NAME and plot each group: compute() triggers the lazy read/groupby/mean/reset_index chain
df_for_map = ddf_for_map.compute()
df_for_map["DB_NAME"] = pd.Categorical(df_for_map["DB_NAME"], categories=["ARGO","GLODAP","SprayGliders"], ordered=True)
cmaps = iter( ["Greens","Oranges","Blues"] )
for db_name, group in df_for_map.groupby("DB_NAME", observed=False):
    plt.scatter(
        group['LONGITUDE'],
        group['LATITUDE'],
        s=10,
        c=group[ref_var],
        vmin=cbar_min,
        vmax=cbar_max,
        cmap=next(cmaps),
        transform=ccrs.PlateCarree()
    )
    plt.colorbar(shrink=0.5, label='Average ' + ref_var + " (" + db_name + ")", pad=0.02)
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('North-West Atlantic average temperature 2010-2019 (Celsius)')
plt.grid(True)
plt.xlim([lon0, lon1])
plt.ylim([lat0, lat1])
plt.show()
Suggested exercises#
Try and access some other data, for example:
filtering by different time periods;
mapping a different parameter;
restricting the data of interest further by requiring the pressure PRES to be below 100 dbar (see the sketch after this list);
performing the reads and manipulations that you need for your own tasks.
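As a starting point for the pressure exercise, here is a minimal sketch reusing the names defined in the pyarrow + pandas section (the lower bound on PRES is just a guess to exclude invalid negative values):
# Same filter as before, with an extra condition on pressure
filter_coords_time_temp_pres = filter_coords_time_temp + [("PRES", ">=", 0), ("PRES", "<", 100)]
ds_shallow = pq.ParquetDataset(crocolake_path, schema=crocolake_schema, filters=filter_coords_time_temp_pres)
df_shallow = ds_shallow.read(columns=cols).to_pandas()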
If you encounter any issues, please reach out!