I often have highly optimized code that I want to run independently for different parameters. For example, I might want to see how reconstruction quality varies as I change two parameters. My code takes a moderate amount of time to run, maybe 1 minute per job. That isn’t huge, but if I want to average performance over 5 random runs for each of $20^2$ input combinations, a naïve for-loop means $5 \cdot 20^2$ minutes, or about 1.5 days. Using dask.distributed, I distribute these independent jobs across different machines and different cores for a significant speedup.

Testing these input combinations requires at least one embarrassingly parallel for-loop – each iteration runs independently of the others. The simplified example takes the form of

import random

import numpy as np

def test_model(x):
    # ... run the model with parameter x
    return random.choice([0, 1])

y = [test_model(x) for x in np.linspace(0, 1)]

dask.distributed is a tool to optimize these for-loops¹. It can distribute the individual iterations of such a for-loop onto different cores and different machines. This is perfect for me – as a grad student involved with the Wisconsin Institute for Discovery, I have a cluster of about 30 machines ready for my use.

I’ll first illustrate basic dask.distributed use, then explain how I personally set it up on the UW–Madison cluster, and finally cover some more advanced use with Jupyter notebooks.

dask.distributed example

Using dask.distributed is easy, and the dask.distributed documentation is helpful. The Client class provides Client.map and Client.submit for submitting jobs, and Client.gather for collecting the results of pending jobs.

In the use case above,

import random

import numpy as np
from distributed import Client

client = Client()  # starts a local scheduler and workers automatically

def test_model(x):
    # ... run the model with parameter x
    return random.choice([0, 1])

y = [client.submit(test_model, x) for x in np.linspace(0, 1)]  # list of futures
y = client.gather(y)  # collect the results

This setup is minimal – we only have to add 3 lines and change 1. For the speedup that dask.distributed gives access to, that’s remarkably simple.
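The same loop can also be written with Client.map, which submits the function over the whole iterable at once and returns a list of futures. A minimal sketch, reusing the client and test_model defined above:

# equivalent, using client.map instead of a list of client.submit calls
futures = client.map(test_model, np.linspace(0, 1))
y = client.gather(futures)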

Using Client() is easy – it starts a scheduler and workers on the local machine for you. This only works for one machine though; other tools exist to use many machines, as detailed on the Setup Network documentation page.
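Once a scheduler is already running elsewhere (as in the cluster setup below), connecting only requires passing its address to Client. The address here is just a placeholder:

from distributed import Client

# connect to an already-running dask-scheduler (address is an example)
client = Client('123.1.28.1:8786')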

That really covers all you need to know; the dask.distributed docs are decent and the above example is enough to get started. In what follows, I’ll explain my workflow: using dask.distributed on the UW–Madison optimization cluster with a Jupyter notebook.

Using dask.distributed with the UW cluster

After setting up a personal Python installation on the UW cluster, following dask.distributed’s Quickstart gets you 99% of the way to using dask.distributed on the UW Optimization Cluster. The dask.distributed documentation is rather complete – please, give it a look. Most of the content below can be found in Quickstart, Web interface and FAQ.

Setting up many workers across many machines is a little trickier because the cluster is not my personal machine and I (thankfully) don’t manage it. I’ll describe how I use dask.distributed and what workarounds I had to find to get it running on the UW–Madison cluster. Additionally, I’ll describe how I use this in conjunction with Jupyter notebooks.

I set up dask.distributed as shown below, using SSH port forwarding to view the web UI.

# visit localhost:8070/status to see dask's web UI
user@local$ ssh -L 8070:localhost:8787 user@cluster1
user@cluster1$ dask-scheduler
# `dask-scheduler` prints "INFO - Scheduler at 123.1.28.1:8786"
user@cluster2$ export OMP_NUM_THREADS=1; dask-worker 123.1.28.1:8786
user@cluster3$ export OMP_NUM_THREADS=1; dask-worker 123.1.28.1:8786

When I run dask-worker without setting OMP_NUM_THREADS, the worker throws an error and fails. Setting OMP_NUM_THREADS=1 resolves this issue; see the Stack Overflow question titled “Error with OMP_NUM_THREADS when using dask distributed” for more detail.

A nice tool for running the same commands on many machines is csshx for OS X’s Terminal.app (not iTerm) and cssh for Linux (cssh stands for “Cluster SSH”).

I use tmux to manage my dask-scheduler and dask-worker processes (a sketch of this setup follows the list below). This allows me to

  • log out without killing the processes I want running in the background,
  • view the output of both dask-scheduler and dask-worker even after logging out, and
  • always have a scheduler and workers available when I log in.
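For instance, a minimal sketch of this setup, assuming the same placeholder hostnames and scheduler address as above (the session names are my own choice):

# on the scheduler machine: keep dask-scheduler alive in a detached tmux session
user@cluster1$ tmux new-session -d -s scheduler 'dask-scheduler'
# on each worker machine: keep dask-worker alive in its own detached session
user@cluster2$ tmux new-session -d -s worker 'export OMP_NUM_THREADS=1; dask-worker 123.1.28.1:8786'
# later, `tmux attach -t scheduler` (or `-t worker`) shows the live output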

This is enough to use dask.distributed on this cluster. Now, I’ll touch on how I use it with Jupyter notebooks via port forwarding. This allows me to quickly visualize results on the cluster and provides a native editing environment.

Jupyter notebooks + dask.distributed

I also use dask.distributed with the Jupyter notebook, which provides a nice interface for viewing results and editing code. This means I don’t have to rsync my results to my local machine to visualize them. Additionally, editing code on this remote cluster feels like editing on my local machine.

user@local$ ssh -L 8080:localhost:8888 -L 8081:localhost:8787 user@cluster1
user@cluster1$ jupyter notebook
# on the local machine, localhost:8080 views the notebook running on the cluster

With the process above, I can quickly visualize results directly on the server. Even better, I can fully utilize the cluster and use as many machines as I wish.

With this, I can also view dask.distributed’s web UI at localhost:8081/status. This lets me see the progress of the jobs on the cluster – how far I’ve come and how close I am to finishing.

[Screenshots: the Jupyter notebook and dask.distributed’s web UI]

Actual use + visualization

Oftentimes, I’m measuring model performance for different input combinations. I typically average the results by calling test_model many times for each combination.

In the example below, I show a personal use case of dask.distributed, including the method of visualization (which relies on pandas and seaborn).

import numpy as np
import seaborn as sns
import pandas as pd
from distributed import Client, progress

def test_model(k, n, seed=42):
    np.random.seed(seed)
    # ... run the model and compute `error` for this (k, n) pair
    return {'sparsity': k, 'n_observations': n, 'success': 1 if error < 0.1 else 0}

client = Client('127.61.142.160:8786')
data = [client.submit(test_model, k, n, seed=int(repeat * n * k))
        for n in np.logspace(1, 3, num=60)
        for k in np.logspace(1, 1.7, num=40, dtype=int)
        for repeat in range(10)]
progress(data)  # show a notebook/console progress bar for the pending futures
data = client.gather(data)

df = pd.DataFrame(data)
# pivot_table averages `success` over the repeats for each (sparsity, n_observations) pair
show = df.pivot_table(index='sparsity', columns='n_observations', values='success')

sns.heatmap(show)
# small amount of matplotlib/seaborn code

This plot is $40\times 60$, with each cell averaged over 10 trials. In total, that makes $40 \cdot 60 \cdot 10 = 24\cdot 10^3$ jobs. This plot was generated on my local machine with 8 cores; at most, we can see a speedup of 8.

Further speedups

This approach only parallelizes different jobs, not tasks within a job. This means that if a core finishes quickly and another job isn’t available, that core sits idle and isn’t used.

For more details on this setup, see dask.distributed’s page on Related Work. Using any of these frameworks should allow for further speedups. I would recommend dask the most, as it has dask.array and dask.dataframe, parallel implementations of NumPy’s array and pandas’ DataFrame.
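As a rough sketch of what dask.array provides (the array shape and chunk size below are arbitrary choices of mine):

import dask.array as da

# a 10000x10000 array split into 1000x1000 chunks that can be processed in parallel
x = da.random.random((10000, 10000), chunks=(1000, 1000))
result = (x + x.T).mean(axis=0).compute()  # nothing runs until .compute()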

Additionally, dask has a delayed function decorator. This allows functions decorated with @delayed to run on all available cores of one machine. Of course, make sure you need to optimize before decorating a function.
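A minimal sketch of @delayed, using a toy function of my own:

from dask import delayed

@delayed
def slow_square(x):
    # stand-in for an expensive computation
    return x ** 2

lazy = [slow_square(i) for i in range(10)]  # builds a task graph, nothing runs yet
total = delayed(sum)(lazy).compute()        # executes the tasks in parallel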

Notes

  • I couldn’t include nested map functions or use dask.distributed’s joblib frontend inside a function submitted to dask.distributed, as detailed in dask.distributed#465 and joblib#389. Note that pd.pivot_table alleviates many of these concerns, as illustrated above.
  • The pseudo-random number generator produced the same random numbers for every job. To get around this and generate a different seed for every iteration, I passed i_repeat * some_model_param as the seed.
  1. Of course, before you optimize, be sure you need to optimize.