# Create Rolling Averages with Dask

Calculate rolling averages on time series data that is distributed across a Dask cluster

“I need to calculate a rolling average of a numerical column in time series data. In pandas, I can do this with `rolling(x).mean()` on sorted values, but what do I do in Dask, where my data is distributed?”

• Sort by index within AND across partitions
• Know when to compute (convert to a pandas DataFrame) or persist (materialize results in cluster memory)
• Run calculations that need to cross partition boundaries correctly

This example walks through these points and demonstrates how it’s done. We’ll use New York City taxi trip data and compute the 30-day rolling average of base fare prices.

## Single Node

On a single machine, with the data in memory and a sorted datetime index, this works much like pandas (`timeseries` here is assumed to be a Dask DataFrame with a datetime index; `dask.datasets.timeseries()` provides a synthetic one):

```python
import dask

# synthetic demo data with a datetime index; keep only the numeric columns
timeseries = dask.datasets.timeseries()[["x", "y"]]
timeseries.rolling('1D').mean().compute()
```

## Cluster

```python
from dask.distributed import Client
from dask_saturn import SaturnCluster

# SaturnCluster is specific to Saturn Cloud; any dask.distributed
# cluster can be substituted here
cluster = SaturnCluster(
    scheduler_size='medium',
    worker_size='xlarge',
    n_workers=3,
)
client = Client(cluster)
client
```
```python
import s3fs

s3 = s3fs.S3FileSystem(anon=True)
files_2019 = 's3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv'
s3.glob(files_2019)
```
```python
import dask.dataframe as dd

taxi = dd.read_csv(
    files_2019,
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
    assume_missing=True,
)
```
```python
from dask.distributed import wait

# set_index sorts within and across partitions, giving known divisions
taxi = taxi.set_index("tpep_pickup_datetime")
taxi = taxi["2019-01-01": "2020-01-01"]
taxi = taxi.persist()
_ = wait(taxi)
```
```python
# 30-day rolling average of the base fare
rolling_fares = taxi.fare_amount.rolling('30D').mean()
rolling_fares_df = rolling_fares.to_frame(name="fare_amount_rolled")
type(rolling_fares_df)
```
```python
# join the rolled column back onto the original frame
taxi_new = taxi.join(rolling_fares_df, how='outer')
```