Grid Search

Use Dask for grid search of a parameter space

Hyperparameter tuning is a crucial, and often painful, part of building machine learning models. Squeezing out each bit of performance from your model may mean a difference of millions of dollars in ad revenue, or of life and death for patients in healthcare models. Even if your model takes only one minute to train, you can end up waiting hours for a grid search to complete: a 10×10 grid with 3-fold cross-validation, for example, is 300 fits, or five hours when run one after another. Each wait for a search to finish breaks your iteration cycle and increases the time it takes to produce value with your model. When using pandas, NumPy, and scikit-learn for model training, you can often speed up the grid search with the help of Dask.

There are several different scenarios that can arise when doing hyperparameter searching:

  1. The training data is small, and the parameter space is small - You do not need Dask; use scikit-learn on its own.
  2. The training data is small, and the parameter space is large - Train the model with pandas/NumPy/scikit-learn, and use joblib and Dask for distributed parameter testing.
  3. The training data is large - Use the Dask-ML package, with classes that look and feel like scikit-learn, to spread operations across your Dask cluster.
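
As a rough sketch, the three scenarios above can be written as a small decision helper. The function name, its arguments, and the threshold of 50 candidates are hypothetical and chosen only for illustration; where the cut-offs actually lie depends on your model and hardware.

```python
def pick_search_strategy(
    data_fits_in_memory: bool,
    n_candidates: int,
    large_grid: int = 50,  # hypothetical cut-off for a "large" parameter space
) -> str:
    """Map the three scenarios to a tool choice (illustrative thresholds only)."""
    if not data_fits_in_memory:
        # Scenario 3: large training data, regardless of grid size
        return "dask-ml"
    if n_candidates >= large_grid:
        # Scenario 2: small data, large parameter space
        return "scikit-learn + joblib Dask backend"
    # Scenario 1: small data, small parameter space
    return "scikit-learn"


print(pick_search_strategy(data_fits_in_memory=True, n_candidates=4))
print(pick_search_strategy(data_fits_in_memory=True, n_candidates=400))
print(pick_search_strategy(data_fits_in_memory=False, n_candidates=4))
```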

We’ll illustrate the two Dask methods of scaling grid search using the famous NYC Taxi Dataset. This dataset contains information on taxi trips in New York City.

First, start the Dask cluster associated with your Saturn Cloud resource.

from dask_saturn import SaturnCluster
from dask.distributed import Client

client = Client(SaturnCluster())

After running the above command, it’s recommended that you check on the Saturn Cloud resource page that the Dask cluster is fully online before continuing. Alternatively, you can call client.wait_for_workers(3) to pause notebook execution until all three workers are ready.

Joblib for small data and large parameter spaces

In this case, the training data and pipeline code remain in pandas/NumPy/scikit-learn. Scikit-learn has algorithms that support parallel execution via the n_jobs parameter, and GridSearchCV is one of them. With n_jobs=-1, it parallelizes across all cores on a single machine using the Joblib library. Dask provides a Joblib backend that hooks into these scikit-learn algorithms to parallelize work across a Dask cluster instead. This enables us to pull in Dask just for the grid search.

import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV

The data is loaded into a pandas DataFrame from S3:

taxi = pd.read_csv(
    "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2020-05.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
)

The next chunk of code defines the features and cleans the data:

raw_features = [
    "tpep_pickup_datetime",
    "passenger_count",
    "tip_amount",
    "fare_amount",
]
features = [
    "pickup_weekday",
    "pickup_weekofyear",
    "pickup_hour",
    "pickup_week_hour",
    "pickup_minute",
    "passenger_count",
]
label = "tip_fraction"


def prep_df(taxi_df):
    """
    Generate features from a raw taxi dataframe.
    """
    df = taxi_df[taxi_df.fare_amount > 0][raw_features].copy()  # avoid divide-by-zero
    df[label] = df.tip_amount / df.fare_amount

    df["pickup_weekday"] = df.tpep_pickup_datetime.dt.isocalendar().day
    df["pickup_weekofyear"] = df.tpep_pickup_datetime.dt.isocalendar().week
    df["pickup_hour"] = df.tpep_pickup_datetime.dt.hour
    df["pickup_week_hour"] = (df.pickup_weekday * 24) + df.pickup_hour
    df["pickup_minute"] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [label]].astype(float).fillna(-1)

    return df


taxi_feat = prep_df(taxi)
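
To make the derived columns concrete, here is a worked example for a single row using only the standard library. The pickup time, tip, and fare are made-up values, not taken from the dataset. Python's datetime.isocalendar() follows the same ISO 8601 convention as the pandas accessor used in prep_df.

```python
from datetime import datetime

# Hypothetical trip: Monday 4 May 2020, 08:30, $2 tip on a $10 fare
pickup = datetime(2020, 5, 4, 8, 30)
tip_amount, fare_amount = 2.0, 10.0

iso = pickup.isocalendar()  # (ISO year, ISO week number, ISO weekday)

pickup_weekday = iso[2]  # ISO weekdays run from 1 (Monday) to 7 (Sunday)
pickup_weekofyear = iso[1]
pickup_hour = pickup.hour
pickup_week_hour = pickup_weekday * 24 + pickup_hour
pickup_minute = pickup.minute
tip_fraction = tip_amount / fare_amount

print(pickup_weekday, pickup_weekofyear, pickup_hour, pickup_week_hour, pickup_minute)
# prints: 1 19 8 32 30
print(tip_fraction)
# prints: 0.2
```

Because ISO weekdays start at 1 rather than 0, pickup_week_hour ranges from 24 to 191 rather than from 0 to 167. The offset is constant, so it does not matter once the column passes through StandardScaler.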

Next, define the pipeline and the parameter grid to search over:

pipeline = Pipeline(
    steps=[
        ("scale", StandardScaler()),
        # `normalize` was removed from ElasticNet in scikit-learn 1.2; False was its default
        ("clf", ElasticNet(max_iter=100, l1_ratio=0)),
    ]
)

params = {
    "clf__l1_ratio": np.arange(0, 1.1, 0.1),
    "clf__alpha": [0, 0.5, 1, 2],
}

grid_search = GridSearchCV(
    pipeline,
    params,
    cv=3,
    n_jobs=-1,
    verbose=1,
    scoring="neg_mean_squared_error",
)
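
It helps to know how many model fits a grid implies before launching it. The sketch below counts them for the grid above with plain Python; the l1_ratio list is written out by hand as a stand-in for np.arange(0, 1.1, 0.1), which yields the same 11 values up to floating-point rounding.

```python
from itertools import product

l1_ratios = [i / 10 for i in range(11)]  # 0.0, 0.1, ..., 1.0
alphas = [0, 0.5, 1, 2]
cv_folds = 3

# Every (l1_ratio, alpha) pair is one candidate; each candidate is fit once per fold
candidates = list(product(l1_ratios, alphas))
n_fits = len(candidates) * cv_folds

print(len(candidates), n_fits)
# prints: 44 132
```

Each of these 132 fits is independent of the others, which is what makes the search easy to distribute. scikit-learn's GridSearchCV also refits the best candidate on the full data once at the end, since refit defaults to True.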

To execute the grid search on Dask, we run it inside a context manager that selects the Dask Joblib backend. Apart from that, we call grid_search.fit() the same way as when using scikit-learn in a non-distributed environment. When you run this cell, watch the Dask Dashboard to see the progress.

import joblib

with joblib.parallel_backend("dask"):
    _ = grid_search.fit(
        taxi_feat[features],
        taxi_feat[label],
    )
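
The context manager works because Joblib lets the calling code choose the backend for any Parallel call made inside the block. Here is a minimal local sketch of that mechanism. It uses Joblib's built-in "threading" backend in place of "dask" so that it runs without a cluster, and square is a hypothetical stand-in for a single model fit.

```python
from joblib import Parallel, delayed, parallel_backend


def square(x):
    """Toy task standing in for one model fit."""
    return x * x


# Any Parallel call inside the block uses the backend chosen here,
# just as GridSearchCV's internal Parallel call does with "dask".
with parallel_backend("threading", n_jobs=2):
    results = Parallel()(delayed(square)(i) for i in range(5))

print(results)
# prints: [0, 1, 4, 9, 16]
```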

Note that using the Dask Joblib backend requires sending the DataFrame through the scheduler to all the workers, so make sure your scheduler has enough RAM to hold your dataset.

Dask-ML for large data and/or large parameter spaces

This version accelerates the grid search by using Dask DataFrames and Dask-ML’s GridSearchCV class. Dask-ML is a package in the Dask ecosystem that has its own parallel implementations of machine learning algorithms, written in a familiar scikit-learn-like API. This includes GridSearchCV and other hyperparameter search options. To use it, we load our data into a Dask DataFrame and use Dask-ML’s preprocessing and model selection classes.

We begin by importing the required libraries. When setting up the training grid, notice that we are still using scikit-learn’s ElasticNet class, but StandardScaler and GridSearchCV now come from dask_ml. This uses Dask for the pre-processing and grid search work, and scikit-learn for the model fitting. As a result, within a given Dask worker, the processed training dataset is pulled down to a pandas DataFrame. In most cases, this is probably okay because the data will be small after processing. If the data is still too large, you can use one of Dask-ML’s estimators, such as LinearRegression.

import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet
from dask_ml.preprocessing import StandardScaler
from dask_ml.model_selection import GridSearchCV

This time the data is read into a Dask DataFrame instead of a pandas one:

import dask.dataframe as dd

taxi_dd = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2020-05.csv",
    parse_dates=["tpep_pickup_datetime", "tpep_dropoff_datetime"],
    storage_options={"anon": True},
    assume_missing=True,
)

The data is cleaned the same way as before, only now it is a Dask DataFrame being cleaned:

def prep_df(taxi_df):
    """
    Generate features from a raw taxi dataframe.
    """
    df = taxi_df[taxi_df.fare_amount > 0][raw_features].copy()  # avoid divide-by-zero
    df[label] = df.tip_amount / df.fare_amount

    df["pickup_weekday"] = df.tpep_pickup_datetime.dt.isocalendar().day
    df["pickup_weekofyear"] = df.tpep_pickup_datetime.dt.isocalendar().week
    df["pickup_hour"] = df.tpep_pickup_datetime.dt.hour
    df["pickup_week_hour"] = (df.pickup_weekday * 24) + df.pickup_hour
    df["pickup_minute"] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [label]].astype(float).fillna(-1)

    return df


taxi_feat_dd = prep_df(taxi_dd)

Again, the pipeline is created the same way as before, this time with the Dask-ML classes imported above:

pipeline = Pipeline(
    steps=[
        ("scale", StandardScaler()),
        # `normalize` was removed from ElasticNet in scikit-learn 1.2; False was its default
        ("clf", ElasticNet(max_iter=100, l1_ratio=0)),
    ]
)

params = {
    "clf__l1_ratio": np.arange(0, 1.1, 0.1),
    "clf__alpha": [0, 0.5, 1, 2],
}

grid_search = GridSearchCV(
    pipeline,
    params,
    cv=3,
    scoring="neg_mean_squared_error",
)

Now we can run the grid search using the grid_search object defined above. It works the same way as scikit-learn’s GridSearchCV class.

_ = grid_search.fit(
    taxi_feat_dd[features],
    taxi_feat_dd[label],
)
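
Once fit returns, the search object holds the results. The sketch below shows how to read them, using scikit-learn’s GridSearchCV on a small synthetic dataset so that it runs without a cluster. The data and the demo_ names are made up for illustration. Dask-ML’s GridSearchCV is designed to mirror this interface, so the same attributes should be available on the fitted grid_search object.

```python
import numpy as np
from sklearn.linear_model import ElasticNet
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic regression data (hypothetical), standing in for the taxi features
rng = np.random.default_rng(0)
X = rng.normal(size=(200, 3))
y = X @ np.array([1.0, -2.0, 0.5]) + rng.normal(scale=0.1, size=200)

demo_pipeline = Pipeline(
    steps=[("scale", StandardScaler()), ("clf", ElasticNet())]
)
demo_params = {"clf__alpha": [0.01, 0.1, 1.0], "clf__l1_ratio": [0.2, 0.8]}

demo_search = GridSearchCV(
    demo_pipeline, demo_params, cv=3, scoring="neg_mean_squared_error"
)
demo_search.fit(X, y)

# Winning parameter combination
print(demo_search.best_params_)
# Mean cross-validated score of the winner (negated MSE, so closer to 0 is better)
print(demo_search.best_score_)
# cv_results_ holds one entry per candidate: 3 alphas x 2 l1_ratios = 6
print(len(demo_search.cv_results_["params"]))
```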

These are a couple of ways to speed up grid search for machine learning with Dask. Check out our other examples for more ways to use Python on Saturn Cloud!
