Usage

Here we walk through a typical user workflow. This assumes you’ve already installed the dask-gateway client library (see Installation) and have a dask-gateway-server running somewhere. For a completely local setup of both client and server (for demos, testing, etc…) see Install Locally (Quickstart) instead.

Connect to a dask-gateway server

Users interact with a dask-gateway-server via the dask-gateway client library. Typically a session starts by creating a Gateway client. This takes a few parameters:

  • address: The full address of the dask-gateway server.

  • proxy_address: The full address of the dask-gateway scheduler proxy. If not provided, this defaults to address.

  • auth: The authentication method to use.

The values for each of these are specific to your deployment - consult your administrator for more information about your specific deployment.

from dask_gateway import Gateway

# -- Here we provide a few examples of creating a `Gateway` object --

# Gateway server running at http://mygateway.com with kerberos authentication
gateway = Gateway(
    address="http://mygateway.com",
    auth="kerberos"
)

# Gateway server at http://146.148.58.187, proxy at
# tls://35.202.68.87:8786, with JupyterHub authentication
gateway = Gateway(
    "http://146.148.58.187",
    proxy_address="tls://35.202.68.87:8786",
    auth="jupyterhub",
)

Typically these parameters are configured once in Configuration, at which point a Gateway object can be created with no parameters.

from dask_gateway import Gateway

# Use values stored in your local configuration (recommended)
gateway = Gateway()

To check that everything is set up properly, query the gateway to see any existing clusters. If this call completes, you should have a properly configured gateway client.

>>> gateway.list_clusters()
[]
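
This check can also be wrapped in a small helper that reports failures instead of raising. The sketch below is illustrative only: check_gateway is a hypothetical name, not part of dask-gateway, and it assumes nothing beyond the list_clusters() method shown above.

```python
def check_gateway(gateway):
    """Return (ok, detail) describing whether the gateway answered.

    `gateway` is any object with a `list_clusters()` method, such as the
    `Gateway` created above. This helper is a hypothetical convenience,
    not part of the dask-gateway API.
    """
    try:
        clusters = gateway.list_clusters()
    except Exception as exc:
        # Authentication, network, and configuration errors all land here.
        return False, f"{type(exc).__name__}: {exc}"
    return True, f"{len(clusters)} existing cluster(s)"
```

Calling check_gateway(gateway) on a working setup with no clusters would return (True, "0 existing cluster(s)").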

Create a new cluster

To create a new cluster, you can use the Gateway.new_cluster() method. This will create a new cluster with no workers.

>>> cluster = gateway.new_cluster()
>>> cluster
GatewayCluster<6c14f41343ea462599f126818a14ebd2>

Alternatively, you can skip creating a Gateway object, and use the GatewayCluster constructor directly.

# Create a new cluster using the GatewayCluster constructor directly
>>> from dask_gateway import GatewayCluster
>>> cluster = GatewayCluster()

The choice between methods is largely a matter of preference. If you need to interact with the gateway server only to create a new cluster, then using GatewayCluster may be simpler. If you need to perform other operations (e.g. querying running clusters), then you may wish to use a Gateway instead.

Configure a cluster

Some dask-gateway-server deployments allow users to configure their clusters upon launching. Typical options may include specifying worker memory/cores or which docker image to use. To see which options (if any) your server supports you can use the Gateway.cluster_options() method.

>>> options = gateway.cluster_options()
>>> options
Options<worker_cores=1, worker_memory=1.0, environment='basic'>

This returns an Options object, which describes the options available. The options object is a MutableMapping that also supports attribute access of fields.

# Both attribute and key access works
>>> options.worker_cores
1
>>> options["worker_cores"]
1

# Can change values using attribute or key access
>>> options.worker_cores = 2
>>> options.worker_cores
2

Note that validation of the fields is done both client-side and server-side. For example, if a limit has been set on a numeric field (e.g. max worker_cores), then an informative error will be raised if that limit is exceeded.

>>> options.worker_cores = 10
    Traceback (most recent call last):
            ...
    ValueError: worker_cores must be <= 4, got 10
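
To make the behaviour above concrete, here is a toy stand-in for such an object: a MutableMapping with attribute access that rejects values above a limit at assignment time. It is a sketch under stated assumptions, not dask-gateway's Options class; the BoundedOptions name and its limits argument are invented for illustration.

```python
from collections.abc import MutableMapping


class BoundedOptions(MutableMapping):
    """Toy options object: key and attribute access, with an upper
    bound checked on assignment.

    Illustrative only; this is not dask-gateway's `Options` class.
    """

    def __init__(self, values, limits):
        # Bypass our own __setattr__ while creating internal state.
        object.__setattr__(self, "_values", dict(values))
        object.__setattr__(self, "_limits", dict(limits))

    def __getitem__(self, key):
        return self._values[key]

    def __setitem__(self, key, value):
        if key not in self._values:
            raise KeyError(key)
        limit = self._limits.get(key)
        if limit is not None and value > limit:
            raise ValueError(f"{key} must be <= {limit}, got {value}")
        self._values[key] = value

    def __delitem__(self, key):
        raise TypeError("fields cannot be removed")

    def __iter__(self):
        return iter(self._values)

    def __len__(self):
        return len(self._values)

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails.
        if name.startswith("_"):
            raise AttributeError(name)
        try:
            return self._values[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        # Route attribute assignment through the validated key path.
        self[name] = value


options = BoundedOptions({"worker_cores": 1}, limits={"worker_cores": 4})
options.worker_cores = 2  # accepted
try:
    options.worker_cores = 10  # rejected locally, before any request is made
except ValueError as exc:
    message = str(exc)
```

Because attribute assignment is routed through the same code path as key assignment, both styles get identical validation.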

If you’re working interactively in a Jupyter Notebook or JupyterLab and have ipywidgets installed, you can also use the provided widget for configuring your cluster.

Cluster options widget

Once the Options object has the desired values set, you can pass it to Gateway.new_cluster() or GatewayCluster to use these values when creating a new cluster.

# Using Gateway.new_cluster
>>> cluster = gateway.new_cluster(options)

# Or using the GatewayCluster constructor
>>> cluster = GatewayCluster(cluster_options=options)

Alternatively, if you know the configurable options available on your dask-gateway-server deployment, you can pass in your values directly as keyword arguments to either method:

# Using Gateway.new_cluster
>>> cluster = gateway.new_cluster(worker_cores=2, environment="tensorflow")

# Or using the GatewayCluster constructor
>>> cluster = GatewayCluster(worker_cores=2, environment="tensorflow")
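
If the same script runs against several deployments that expose different options, one possible approach is to set only the fields a given server offers. The helper below is a hypothetical sketch (set_known_options is not part of dask-gateway); it relies only on the mapping behaviour of the Options object described earlier.

```python
def set_known_options(options, **desired):
    """Copy `desired` values onto `options`, skipping unknown fields.

    `options` is any mutable mapping, such as the object returned by
    `gateway.cluster_options()`. Returns the names that were skipped.
    Hypothetical helper, not part of the dask-gateway API.
    """
    skipped = []
    for name, value in desired.items():
        if name in options:
            # Validation errors raised by the options object propagate.
            options[name] = value
        else:
            skipped.append(name)
    return skipped
```

The updated options object can then be passed to gateway.new_cluster(options) as shown earlier.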

Scale up a cluster

To scale a cluster to one or more workers, you can use the GatewayCluster.scale() method. Here we scale our cluster up to two workers.

>>> cluster.scale(2)

If you’re working interactively in a Jupyter Notebook or JupyterLab and have ipywidgets installed, you can also use the provided widget to change the cluster size, instead of calling GatewayCluster.scale() programmatically.

Manual scaling widget

Enable adaptive scaling

Alternatively, you can use adaptive scaling to allow your cluster to scale up/down based on load. This can be useful in reducing resource usage only to what’s currently required for your workload. To enable adaptive scaling you can use the GatewayCluster.adapt() method.

# Adaptively scale between 2 and 10 workers
>>> cluster.adapt(minimum=2, maximum=10)

As with manual scaling above, if you’re working in a notebook environment you can also use the provided widget to enable adaptive scaling, instead of calling GatewayCluster.adapt() programmatically.

Adaptive scaling widget

If you wish to disable adaptive scaling later you can pass in active=False:

# Disable adaptive scaling
>>> cluster.adapt(active=False)
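
To build intuition for the minimum and maximum bounds, the toy function below clamps a load-based estimate into that range. It is not the policy dask actually uses to decide how many workers to request; target_workers and its tasks_per_worker parameter are made up for illustration.

```python
import math


def target_workers(pending_tasks, tasks_per_worker, minimum, maximum):
    """Toy model: workers wanted for the load, clamped to the bounds.

    Not dask's real adaptive policy; `tasks_per_worker` is an invented
    knob used only to turn load into a worker count.
    """
    wanted = math.ceil(pending_tasks / tasks_per_worker)
    return max(minimum, min(maximum, wanted))
```

With minimum=2 and maximum=10, an idle cluster in this model still keeps two workers, and a very heavy load never requests more than ten.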

Connect to the cluster

To connect to the cluster so you can start doing work, you can use the GatewayCluster.get_client() method. This returns a dask.distributed.Client object.

>>> client = cluster.get_client()
>>> client
<Client: scheduler='tls://198.51.100.1:65252' processes=2 cores=2>

Run computations on the cluster

At this point you should be able to use normal dask methods to do work. For example, here we take the mean of a random array.

>>> import dask.array as da
>>> a = da.random.normal(size=(1000, 1000), chunks=(500, 500))
>>> a.mean().compute()
0.0022336223893512945

Shut down the cluster

When you’re done using the cluster, you can shut it down using the GatewayCluster.shutdown() method. This cleanly closes all dask workers, as well as the scheduler.

>>> cluster.shutdown()

Note that when a GatewayCluster object is used as a context manager, shutdown will be called automatically on context exit:

with gateway.new_cluster() as cluster:
    client = cluster.get_client()
    # ...
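
The context-manager pattern can be packaged into a function that runs a single job on a short-lived cluster. This is a minimal sketch: run_on_temporary_cluster and its work callback are hypothetical names, and only new_cluster(), scale() and get_client() from this page are used.

```python
def run_on_temporary_cluster(gateway, n_workers, work):
    """Run `work(client)` on a short-lived cluster and return its result.

    Hypothetical helper, not part of the dask-gateway API. The `with`
    block shuts the cluster down on exit, even if `work` raises.
    """
    with gateway.new_cluster() as cluster:
        cluster.scale(n_workers)
        client = cluster.get_client()
        return work(client)
```

For example, work could be a function that submits tasks through the client and returns the gathered result.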

Alternatively, lingering cluster objects will be automatically shut down when your client process closes. It’s good practice to explicitly clean things up, but it’s not strictly necessary. If you want clusters to persist longer than the lifetime of your client process, set shutdown_on_close=False when calling Gateway.new_cluster().

Connect to an existing cluster

Alternatively, you can leave a cluster running and reconnect to it later. To do this, set shutdown_on_close=False when calling Gateway.new_cluster() - this allows Dask clusters to persist longer than the lifetime of your client process.

# Create a new cluster which will persist longer than the lifetime of the
# client process
cluster = gateway.new_cluster(shutdown_on_close=False)

To connect to a running cluster, you need the cluster’s name (a unique identifier). If you don’t already know it, you can see all running clusters using the Gateway.list_clusters() method.

>>> clusters = gateway.list_clusters()
>>> clusters
[ClusterReport<name=ce498e95403741118a8f418ee242e646, status=RUNNING>]

You can then connect to an existing cluster using the Gateway.connect() method.

>>> cluster = gateway.connect(clusters[0].name)
>>> cluster
GatewayCluster<ce498e95403741118a8f418ee242e646>
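
The two steps above can be combined into a "reconnect if possible, otherwise create" helper. The sketch below is one possible design, not part of dask-gateway: reconnect_or_create is a hypothetical name, and it simply picks the first cluster reported, which may not be the one you want if several are running.

```python
def reconnect_or_create(gateway):
    """Connect to the first running cluster, or start a persistent one.

    Hypothetical helper, not part of the dask-gateway API. Uses only
    `list_clusters()`, `connect()` and `new_cluster()` as shown above.
    """
    reports = gateway.list_clusters()
    if reports:
        return gateway.connect(reports[0].name)
    # No cluster to reuse: create one that outlives this process.
    return gateway.new_cluster(shutdown_on_close=False)
```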