Launching tasks from workers with Dask
Dask recently surprised me with its flexibility in a use case that goes beyond the basic usage detailed in a previous post. I’ll walk through my use case and the interesting problem it highlights, show a toy solution, and point to the relevant parts of the Dask documentation.
Problem
I need to perform hyperparameter optimization on deep neural networks in my job. Most networks assume certain values are given, but actually finding those values is difficult and requires “hyperparameter optimization.” A scikit-learn documentation page has more detail on this issue.
For my use case, model evaluations take about 15 minutes to complete and I have to tune 4 different values for 18 different models. Grid search and random search (two popular methods for hyperparameter optimization) are infeasible because they take too long for the resolution I desire^{1}.
I eventually gave up on tuning 4 parameters for each model, and decided to tune only 1 parameter per model. However, grid or random search would still take too long. These algorithms don’t adapt to previous evaluations and don’t use information they’ve already collected.
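To put rough numbers on "too long", here is a back-of-the-envelope sketch. The 15 minutes and 18 models come from the figures above; the grid resolution of 10 values per hyperparameter is purely an assumption for illustration, and the estimate assumes evaluations run one after another.

```python
MINUTES_PER_EVALUATION = 15   # from the post
N_MODELS = 18                 # from the post
VALUES_PER_PARAMETER = 10     # assumption, for illustration only


def grid_search_hours(n_parameters):
    """Serial wall-clock hours for a full grid over every model."""
    evaluations = N_MODELS * VALUES_PER_PARAMETER ** n_parameters
    return evaluations * MINUTES_PER_EVALUATION / 60


if __name__ == "__main__":
    print(grid_search_hours(4))  # 45000.0 hours
    print(grid_search_hours(1))  # 45.0 hours
```

Under that assumed resolution, even the single-parameter grid costs close to two days of serial compute, which is why parallel evaluation and an adaptive search both matter.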
This one hyperparameter is pretty simple, and doesn’t require a complex search. A simple recursive function would do and could be implemented quickly^{2}. Either way, my implementation highlighted an interesting software problem.
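The post doesn't show the actual search, so the following is only a hypothetical sketch of what a simple recursive search over one hyperparameter could look like. The names `evaluate` and `search`, the toy loss, and the zoom-in strategy are all assumptions, not my real implementation.

```python
def evaluate(value):
    # Hypothetical stand-in for a 15-minute model evaluation.
    return (value - 0.3) ** 2


def search(lo, hi, depth, points=5):
    """Recursively zoom in on the best of `points` evenly spaced candidates."""
    step = (hi - lo) / (points - 1)
    candidates = [lo + i * step for i in range(points)]
    best = min(candidates, key=evaluate)
    if depth == 0:
        return best
    # Recurse on the interval one step either side of the best candidate,
    # clipped to the current bounds.
    return search(max(lo, best - step), min(hi, best + step), depth - 1, points)


if __name__ == "__main__":
    print(search(0.0, 1.0, depth=6))
```

Unlike grid or random search, each level of the recursion uses the results of the previous level to decide where to evaluate next.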
Dask is the right tool for my setup, especially because one machine can only fit one model and I want to evaluate many models in parallel. I’ve illustrated my use case below with a Fibonacci function, the simplest example that shows how Dask can solve my problem.
Toy solution
Both my hyperparameter search and this Fibonacci function are recursive^{3}. My first attempt was:
    from distributed import Client

    def fib(n):
        if n <= 1:
            return n
        return fib(n - 1) + fib(n - 2)

    if __name__ == "__main__":
        client = Client()
        x = [6, 7, 8, 9]
        futures = client.map(fib, x)  # one task per element of x
        y = client.gather(futures)
This only distributes the evaluations of the first call to fib, not any recursive calls to fib. This means that fib(1) is evaluated by every node, and the scheduler never learns that fib(1) has already been evaluated.
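To get a feel for how much work is repeated, this small sketch counts how often fib(1) runs inside a single top-level call. It uses plain Python with no Dask involved; the `calls` counter and the `count_base_case_calls` helper are names introduced here for illustration.

```python
from collections import Counter

calls = Counter()


def fib(n):
    calls[n] += 1  # record every evaluation, including repeats
    if n <= 1:
        return n
    return fib(n - 1) + fib(n - 2)


def count_base_case_calls(n):
    """Return how many times fib(1) runs while computing fib(n) locally."""
    calls.clear()
    fib(n)
    return calls[1]


if __name__ == "__main__":
    for n in [6, 7, 8, 9]:
        print(n, count_base_case_calls(n))
```

For the inputs 6, 7, 8 and 9 this gives 8, 13, 21 and 34 evaluations of fib(1), none of which the scheduler can see or share between workers.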
But I want this recursive function to submit more jobs to the scheduler. That is, when each recursive function runs on a worker, I want that worker to submit more tasks to the scheduler.
Dask does support this, and it’s described in the documentation page “Launching Tasks from Tasks”. That page steps through the same example in more detail, and I encourage you to read it. Discovering this simplified the way I approached the problem: I didn’t need to submit every task from one central place.
The core of that page shows that I can turn my function into
    from distributed import get_client, secede, rejoin

    def fib(n):
        if n <= 1:
            return n
        client = get_client()
        futures = client.map(fib, [n - 1, n - 2])
        secede()  # leave the worker's thread pool while waiting
        a, b = client.gather(futures)
        rejoin()  # re-enter the thread pool before finishing
        return a + b
This functionality allows me to use my Amazon EC2 instances optimally. I can dedicate one instance to run the scheduler, and the rest of my spot instances can be workers. I can add workers freely, and removing workers will have some impact, but not a massive one.
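For completeness, here is a minimal end-to-end sketch of the same idea using `worker_client`, a context manager in `distributed` that secedes on entry and rejoins on exit. The `run` helper and the in-process cluster (`processes=False`) are assumptions made so the script can be tried on a single machine; they are not part of my EC2 setup.

```python
from distributed import Client, worker_client


def fib(n):
    """Fibonacci where each recursive call becomes its own Dask task."""
    if n <= 1:
        return n
    # worker_client() returns a client connected to this worker's
    # scheduler; it secedes from the worker's thread pool on entry
    # and rejoins on exit.
    with worker_client() as client:
        futures = client.map(fib, [n - 1, n - 2])
        a, b = client.gather(futures)
    return a + b


def run(values):
    # processes=False keeps the demo cluster inside this process
    # (an assumption made for a quick local run).
    with Client(processes=False) as client:
        return client.gather(client.map(fib, values))


if __name__ == "__main__":
    print(run([6, 7, 8, 9]))
```

Because every recursive call now goes through the scheduler, repeated submissions of the same call (for example fib(1)) can be recognized as the same task instead of being recomputed on every node.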

^{1} And especially with Amazon’s constraints on p2.xlarge machines.

^{2} If it weren’t so simple, or if I had more hyperparameters, I would have needed one of the many other hyperparameter optimization algorithms.

^{3} The Fibonacci function doesn’t have to be recursive: see my post “Applying eigenvalues to the Fibonacci problem”.