Configuration Reference

Gateway Server

class dask_gateway_server.app.DaskGateway(**kwargs: Any)

A gateway for managing dask clusters across multiple users

address c.DaskGateway.address = Unicode('')

The address the private API server should listen at.

Should be of the form {hostname}:{port}

Where:

  • hostname sets the hostname to listen at. Set to "" or "0.0.0.0" to listen on all interfaces.

  • port sets the port to listen at.

Defaults to 127.0.0.1:0.
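For example, to serve the private API on all interfaces on a fixed port, you might add the following to your dask_gateway_config.py (the port number here is illustrative):

```python
# dask_gateway_config.py
# Listen on all interfaces on port 8000 (port chosen for illustration).
c.DaskGateway.address = ":8000"
```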

authenticator_class c.DaskGateway.authenticator_class = Type('dask_gateway_server.auth.SimpleAuthenticator')

The gateway authenticator class to use

backend_class c.DaskGateway.backend_class = Type('dask_gateway_server.backends.Backend')

The gateway backend class to use

config_file c.DaskGateway.config_file = Unicode('dask_gateway_config.py')

The config file to load

log_datefmt c.DaskGateway.log_datefmt = Unicode('%Y-%m-%d %H:%M:%S')

The date format used by logging formatters for %(asctime)s

log_format c.DaskGateway.log_format = Unicode('%(log_color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d %(name)s]%(reset)s %(message)s')

The logging format template

log_level c.DaskGateway.log_level = Enum('INFO')

Set the log level by value or name.

show_config c.DaskGateway.show_config = Bool(False)

Instead of starting the Application, dump configuration to stdout

show_config_json c.DaskGateway.show_config_json = Bool(False)

Instead of starting the Application, dump configuration to stdout (as JSON)
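These flags are typically exercised from the command line when launching the server. A typical invocation looks like the following (the config path is illustrative, and flag spellings follow the usual traitlets conventions):

```shell
# Start the gateway with an explicit config file
dask-gateway-server --config /path/to/dask_gateway_config.py

# Dump the merged configuration instead of starting the server
dask-gateway-server --show-config
```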

Authentication

KerberosAuthenticator

class dask_gateway_server.auth.KerberosAuthenticator(**kwargs: Any)

An authenticator using kerberos

cache_max_age c.KerberosAuthenticator.cache_max_age = Int(300)

The maximum time in seconds to cache authentication information.

Helps reduce load on the backing authentication service by caching responses between requests. After this time the user will need to be reauthenticated before making additional requests (note this is usually transparent to the user).

cookie_name c.KerberosAuthenticator.cookie_name = Unicode('')

The cookie name to use for caching authentication information.

keytab c.KerberosAuthenticator.keytab = Unicode('dask_gateway.keytab')

The path to the keytab file

service_name c.KerberosAuthenticator.service_name = Unicode('HTTP')

The service’s kerberos principal name.

This is almost always “HTTP” (the default)
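A minimal Kerberos setup might look like the following in dask_gateway_config.py (the keytab path is illustrative):

```python
c.DaskGateway.authenticator_class = (
    "dask_gateway_server.auth.KerberosAuthenticator"
)
c.KerberosAuthenticator.keytab = "/etc/dask-gateway/dask_gateway.keytab"
# service_name rarely needs changing: "HTTP" matches a principal of the
# form HTTP/<hostname>@<REALM> in the keytab.
```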

JupyterHubAuthenticator

class dask_gateway_server.auth.JupyterHubAuthenticator(**kwargs: Any)

An authenticator that uses JupyterHub to perform authentication

cache_max_age c.JupyterHubAuthenticator.cache_max_age = Int(300)

The maximum time in seconds to cache authentication information.

Helps reduce load on the backing authentication service by caching responses between requests. After this time the user will need to be reauthenticated before making additional requests (note this is usually transparent to the user).

cookie_name c.JupyterHubAuthenticator.cookie_name = Unicode('')

The cookie name to use for caching authentication information.

jupyterhub_api_token c.JupyterHubAuthenticator.jupyterhub_api_token = Unicode('')

Dask Gateway’s JupyterHub API Token, used for authenticating the gateway’s API requests to JupyterHub.

By default this is determined from the JUPYTERHUB_API_TOKEN environment variable.

jupyterhub_api_url c.JupyterHubAuthenticator.jupyterhub_api_url = Unicode('')

The API URL for the JupyterHub server.

By default this is determined from the JUPYTERHUB_API_URL environment variable.

tls_ca c.JupyterHubAuthenticator.tls_ca = Unicode('')

Path to TLS CA file for verifying API requests to JupyterHub.

When setting this, you should also set tls_key and tls_cert.

tls_cert c.JupyterHubAuthenticator.tls_cert = Unicode('')

Path to TLS certificate file for making API requests to JupyterHub.

When setting this, you should also set tls_key.

tls_key c.JupyterHubAuthenticator.tls_key = Unicode('')

Path to TLS key file for making API requests to JupyterHub.

When setting this, you should also set tls_cert.
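Putting the options above together, a sketch of a JupyterHub-backed configuration (all URLs and paths are illustrative):

```python
c.DaskGateway.authenticator_class = (
    "dask_gateway_server.auth.JupyterHubAuthenticator"
)
# These are usually provided via the JUPYTERHUB_API_URL and
# JUPYTERHUB_API_TOKEN environment variables rather than set explicitly.
c.JupyterHubAuthenticator.jupyterhub_api_url = "https://hub.example.com/hub/api"
# If JupyterHub's API is served over internal TLS, set the CA bundle and
# client cert/key together:
c.JupyterHubAuthenticator.tls_ca = "/etc/jupyterhub/tls/ca.pem"
c.JupyterHubAuthenticator.tls_cert = "/etc/jupyterhub/tls/client.crt"
c.JupyterHubAuthenticator.tls_key = "/etc/jupyterhub/tls/client.key"
```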

SimpleAuthenticator

class dask_gateway_server.auth.SimpleAuthenticator(**kwargs: Any)

A simple authenticator that uses Basic Auth.

This is highly insecure; use only for testing!

cache_max_age c.SimpleAuthenticator.cache_max_age = Int(300)

The maximum time in seconds to cache authentication information.

Helps reduce load on the backing authentication service by caching responses between requests. After this time the user will need to be reauthenticated before making additional requests (note this is usually transparent to the user).

cookie_name c.SimpleAuthenticator.cookie_name = Unicode('')

The cookie name to use for caching authentication information.

password c.SimpleAuthenticator.password = Unicode(None)

If set, a global password that all users must provide.

If unset (default), the password field is completely ignored.
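For a throwaway test deployment, the shared password can be set directly (again: testing only):

```python
c.DaskGateway.authenticator_class = (
    "dask_gateway_server.auth.SimpleAuthenticator"
)
# Every user authenticates with this one password; usernames are
# otherwise taken at face value.
c.SimpleAuthenticator.password = "not-a-real-secret"
```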

Cluster Backends

Base Class

ClusterConfig

class dask_gateway_server.backends.base.ClusterConfig(**kwargs: Any)

Base class for holding individual Dask cluster configurations

adaptive_period c.ClusterConfig.adaptive_period = Float(3)

Time (in seconds) between adaptive scaling checks.

A smaller period will decrease scale up/down latency when responding to cluster load changes, but may also result in higher load on the gateway server.

cluster_max_cores c.ClusterConfig.cluster_max_cores = Float(None)

The maximum number of cores available to this cluster.

Set to None for no cores limit (default).

cluster_max_memory c.ClusterConfig.cluster_max_memory = MemoryLimit(None)

The maximum amount of memory (in bytes) available to this cluster. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

Set to None for no memory limit (default).

cluster_max_workers c.ClusterConfig.cluster_max_workers = Int(0)

The maximum number of workers available to this cluster.

Note that this will be combined with cluster_max_cores and cluster_max_memory at runtime to determine the actual maximum number of workers available to this cluster.
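The way these three ceilings combine can be sketched as taking the most restrictive of the configured limits. This is an illustrative approximation, not the gateway's actual implementation (which also accounts for scheduler resources):

```python
def effective_max_workers(max_workers, max_cores, max_memory,
                          worker_cores, worker_memory):
    """Most-restrictive combination of the cluster_max_* limits.

    0 (for max_workers) or None (for max_cores/max_memory) means
    "no limit" for that dimension, mirroring the documented defaults.
    """
    limits = []
    if max_workers:                      # cluster_max_workers (0 = unlimited)
        limits.append(max_workers)
    if max_cores is not None:            # cluster_max_cores (None = unlimited)
        limits.append(int(max_cores // worker_cores))
    if max_memory is not None:           # cluster_max_memory (None = unlimited)
        limits.append(max_memory // worker_memory)
    return min(limits) if limits else None


# 10 workers allowed outright, but 16 cores / 2 per worker caps it at 8.
print(effective_max_workers(10, 16.0, 64 * 2**30, 2, 4 * 2**30))  # → 8
```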

environment c.ClusterConfig.environment = Dict()

Environment variables to set for both the worker and scheduler processes.

idle_timeout c.ClusterConfig.idle_timeout = Float(0)

Time (in seconds) before an idle cluster is automatically shut down.

Set to 0 (default) for no idle timeout.

scheduler_cmd c.ClusterConfig.scheduler_cmd = Command()

Shell command to start a dask scheduler.

scheduler_cores c.ClusterConfig.scheduler_cores = Int(1)

Number of cpu-cores available for a dask scheduler.

scheduler_memory c.ClusterConfig.scheduler_memory = MemoryLimit('2 G')

Number of bytes available for a dask scheduler. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_cmd c.ClusterConfig.worker_cmd = Command()

Shell command to start a dask worker.

worker_cores c.ClusterConfig.worker_cores = Int(1)

Number of cpu-cores available for a dask worker.

worker_memory c.ClusterConfig.worker_memory = MemoryLimit('2 G')

Number of bytes available for a dask worker. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_threads c.ClusterConfig.worker_threads = Int(0)

Number of threads available for a dask worker.

Defaults to worker_cores.
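These resource knobs are normally set together in the config file; a sketch (values are illustrative):

```python
c.ClusterConfig.worker_cores = 2
c.ClusterConfig.worker_memory = "4 G"       # 4 Gibibytes
c.ClusterConfig.worker_threads = 2          # defaults to worker_cores anyway
c.ClusterConfig.scheduler_cores = 1
c.ClusterConfig.scheduler_memory = "2 G"
c.ClusterConfig.idle_timeout = 3600         # shut down idle clusters after 1 h
```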

Backend

class dask_gateway_server.backends.base.Backend(**kwargs: Any)

Base class for defining dask-gateway backends.

Subclasses should implement the following methods:

  • setup

  • cleanup

  • start_cluster

  • stop_cluster

  • on_cluster_heartbeat

api_url c.Backend.api_url = Unicode('')

The address that internal components (e.g. dask clusters) will use when contacting the gateway.

cluster_config_class c.Backend.cluster_config_class = Type('dask_gateway_server.backends.base.ClusterConfig')

The cluster config class to use

cluster_options c.Backend.cluster_options = Union()

User options for configuring an individual cluster.

Allows users to specify configuration overrides when creating a new cluster. See the documentation for more information:

Exposing Cluster Options.
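A common pattern is to expose a few typed fields and map them onto cluster config overrides via a handler. The sketch below assumes the Options helpers from dask_gateway_server.options; see "Exposing Cluster Options" for the authoritative API:

```python
from dask_gateway_server.options import Options, Integer, Float

def options_handler(options):
    # Map user-facing option names onto ClusterConfig traits.
    return {
        "worker_cores": options.worker_cores,
        "worker_memory": int(options.worker_memory * 2**30),  # GiB -> bytes
    }

c.Backend.cluster_options = Options(
    Integer("worker_cores", default=1, min=1, max=4, label="Worker Cores"),
    Float("worker_memory", default=2, min=1, max=8,
          label="Worker Memory (GiB)"),
    handler=options_handler,
)
```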

Local Processes

LocalClusterConfig

class dask_gateway_server.backends.local.LocalClusterConfig(**kwargs: Any)

Dask cluster configuration options when running as local processes

adaptive_period c.LocalClusterConfig.adaptive_period = Float(3)

Time (in seconds) between adaptive scaling checks.

A smaller period will decrease scale up/down latency when responding to cluster load changes, but may also result in higher load on the gateway server.

cluster_max_cores c.LocalClusterConfig.cluster_max_cores = Float(None)

The maximum number of cores available to this cluster.

Set to None for no cores limit (default).

cluster_max_memory c.LocalClusterConfig.cluster_max_memory = MemoryLimit(None)

The maximum amount of memory (in bytes) available to this cluster. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

Set to None for no memory limit (default).

cluster_max_workers c.LocalClusterConfig.cluster_max_workers = Int(0)

The maximum number of workers available to this cluster.

Note that this will be combined with cluster_max_cores and cluster_max_memory at runtime to determine the actual maximum number of workers available to this cluster.

environment c.LocalClusterConfig.environment = Dict()

Environment variables to set for both the worker and scheduler processes.

idle_timeout c.LocalClusterConfig.idle_timeout = Float(0)

Time (in seconds) before an idle cluster is automatically shut down.

Set to 0 (default) for no idle timeout.

scheduler_cmd c.LocalClusterConfig.scheduler_cmd = Command()

Shell command to start a dask scheduler.

scheduler_cores c.LocalClusterConfig.scheduler_cores = Int(1)

Number of cpu-cores available for a dask scheduler.

scheduler_memory c.LocalClusterConfig.scheduler_memory = MemoryLimit('2 G')

Number of bytes available for a dask scheduler. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_cmd c.LocalClusterConfig.worker_cmd = Command()

Shell command to start a dask worker.

worker_cores c.LocalClusterConfig.worker_cores = Int(1)

Number of cpu-cores available for a dask worker.

worker_memory c.LocalClusterConfig.worker_memory = MemoryLimit('2 G')

Number of bytes available for a dask worker. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_threads c.LocalClusterConfig.worker_threads = Int(0)

Number of threads available for a dask worker.

Defaults to worker_cores.

LocalBackend

class dask_gateway_server.backends.local.LocalBackend(**kwargs: Any)

A cluster backend that launches local processes.

Requires super-user permissions in order to run processes as the requesting user.

api_url c.LocalBackend.api_url = Unicode('')

The address that internal components (e.g. dask clusters) will use when contacting the gateway.

Defaults to {proxy_address}/{prefix}/api, set manually if a different address should be used.

backoff_base_delay c.LocalBackend.backoff_base_delay = Float(0.1)

Base delay (in seconds) for backoff when retrying after failures.

If an operation fails, it is retried after a backoff computed as:

` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `

backoff_max_delay c.LocalBackend.backoff_max_delay = Float(300)

Max delay (in seconds) for backoff policy when retrying after failures.
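The retry policy above is plain capped exponential backoff. A small sketch of the delay schedule it produces with the default values:

```python
def backoff_delay(num_failures, base_delay=0.1, max_delay=300.0):
    # min(backoff_max_delay, backoff_base_delay * 2 ** num_failures)
    return min(max_delay, base_delay * 2 ** num_failures)

# Delays grow 0.1 s, 0.2 s, 0.4 s, ... until capped at 300 s.
schedule = [backoff_delay(n) for n in range(13)]
print(schedule[0], schedule[3], schedule[12])  # → 0.1 0.8 300.0
```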

check_timeouts_period c.LocalBackend.check_timeouts_period = Float(0.0)

Time (in seconds) between timeout checks.

This shouldn’t be too small (to keep the overhead low), but should be smaller than cluster_heartbeat_timeout, cluster_start_timeout, and worker_start_timeout.

cluster_config_class c.LocalBackend.cluster_config_class = Type('dask_gateway_server.backends.local.LocalClusterConfig')

The cluster config class to use

cluster_heartbeat_period c.LocalBackend.cluster_heartbeat_period = Int(15)

Time (in seconds) between cluster heartbeats to the gateway.

A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

cluster_heartbeat_timeout c.LocalBackend.cluster_heartbeat_timeout = Float(0.0)

Timeout (in seconds) before killing a dask cluster that’s failed to heartbeat.

This should be greater than cluster_heartbeat_period. Defaults to 2 * cluster_heartbeat_period.

cluster_options c.LocalBackend.cluster_options = Union()

User options for configuring an individual cluster.

Allows users to specify configuration overrides when creating a new cluster. See the documentation for more information:

Exposing Cluster Options.

cluster_start_timeout c.LocalBackend.cluster_start_timeout = Float(60)

Timeout (in seconds) before giving up on a starting dask cluster.

cluster_status_period c.LocalBackend.cluster_status_period = Float(30)

Time (in seconds) between cluster status checks.

A smaller period will detect failed clusters sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

clusters_directory c.LocalBackend.clusters_directory = Unicode('')

The base directory for cluster working directories.

A subdirectory will be created for each new cluster which will serve as the working directory for that cluster. On cluster shutdown the subdirectory will be removed.

If not specified, a temporary directory will be used for each cluster.

db_cleanup_period c.LocalBackend.db_cleanup_period = Float(600)

Time (in seconds) between database cleanup tasks.

This sets how frequently old records are removed from the database. This shouldn’t be too small (to keep the overhead low), but should be smaller than db_cluster_max_age (probably by an order of magnitude).

db_cluster_max_age c.LocalBackend.db_cluster_max_age = Float(86400)

Max time (in seconds) to keep around records of completed clusters.

Every db_cleanup_period, completed clusters older than db_cluster_max_age are removed from the database.

db_debug c.LocalBackend.db_debug = Bool(False)

If True, all database operations will be logged

db_encrypt_keys c.LocalBackend.db_encrypt_keys = List()

A list of keys to use to encrypt private data in the database. Can also be set by the environment variable DASK_GATEWAY_ENCRYPT_KEYS, where the value is a ; delimited string of encryption keys.

Each key should be a base64-encoded 32 byte value, and should be cryptographically random. Lacking other options, openssl can be used to generate a single key via:

$ openssl rand -base64 32

A single key is sufficient; multiple keys can be used to support key rotation.
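An equivalent of the openssl command in pure Python, useful when generating keys from deployment tooling:

```python
import base64
import secrets

# 32 cryptographically random bytes, base64-encoded -- the same shape of
# key that `openssl rand -base64 32` produces.
key = base64.b64encode(secrets.token_bytes(32)).decode("ascii")
print(len(base64.b64decode(key)))  # → 32
```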

db_url c.LocalBackend.db_url = Unicode('sqlite:///:memory:')

The URL for the database. Default is in-memory only.

If not in-memory, db_encrypt_keys must also be set.

inherited_environment c.LocalBackend.inherited_environment = List()

Whitelist of environment variables for the scheduler and worker processes to inherit from the Dask-Gateway process.

parallelism c.LocalBackend.parallelism = Int(20)

Number of handlers to use for starting/stopping clusters.

sigint_timeout c.LocalBackend.sigint_timeout = Int(10)

Seconds to wait for process to stop after SIGINT.

If the process has not stopped after this time, a SIGTERM is sent.

sigkill_timeout c.LocalBackend.sigkill_timeout = Int(5)

Seconds to wait for process to stop after SIGKILL.

If the process has not stopped after this time, a warning is logged and the process is deemed a zombie process.

sigterm_timeout c.LocalBackend.sigterm_timeout = Int(5)

Seconds to wait for process to stop after SIGTERM.

If the process has not stopped after this time, a SIGKILL is sent.
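Taken together, the three timeouts define an escalation ladder: SIGINT, wait, SIGTERM, wait, SIGKILL, wait, then log a zombie warning. With the defaults, the worst-case wait before a process is deemed a zombie works out to:

```python
# Default values of sigint_timeout, sigterm_timeout, sigkill_timeout,
# in the order the signals are sent.
ESCALATION = [("SIGINT", 10), ("SIGTERM", 5), ("SIGKILL", 5)]

worst_case = sum(timeout for _signal, timeout in ESCALATION)
print(worst_case)  # → 20 (seconds before the process is deemed a zombie)
```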

stop_clusters_on_shutdown c.LocalBackend.stop_clusters_on_shutdown = Bool(True)

Whether to stop active clusters on gateway shutdown.

If true, all active clusters will be stopped before shutting down the gateway. Set to False to leave active clusters running.

worker_start_failure_limit c.LocalBackend.worker_start_failure_limit = Int(3)

A limit on the number of failed attempts to start a worker before the cluster is marked as failed.

Every worker that fails to start (timeouts exempt) increments a counter. The counter is reset if a worker successfully starts. If the counter ever exceeds this limit, the cluster is marked as failed and is shut down.

worker_start_timeout c.LocalBackend.worker_start_timeout = Float(60)

Timeout (in seconds) before giving up on a starting dask worker.

worker_status_period c.LocalBackend.worker_status_period = Float(30)

Time (in seconds) between worker status checks.

A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

UnsafeLocalBackend

class dask_gateway_server.backends.local.UnsafeLocalBackend(**kwargs: Any)

A version of LocalBackend that doesn’t set permissions.

FOR TESTING ONLY! This provides no user separations - clusters run with the same level of permission as the gateway.

api_url c.UnsafeLocalBackend.api_url = Unicode('')

The address that internal components (e.g. dask clusters) will use when contacting the gateway.

Defaults to {proxy_address}/{prefix}/api, set manually if a different address should be used.

backoff_base_delay c.UnsafeLocalBackend.backoff_base_delay = Float(0.1)

Base delay (in seconds) for backoff when retrying after failures.

If an operation fails, it is retried after a backoff computed as:

` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `

backoff_max_delay c.UnsafeLocalBackend.backoff_max_delay = Float(300)

Max delay (in seconds) for backoff policy when retrying after failures.

check_timeouts_period c.UnsafeLocalBackend.check_timeouts_period = Float(0.0)

Time (in seconds) between timeout checks.

This shouldn’t be too small (to keep the overhead low), but should be smaller than cluster_heartbeat_timeout, cluster_start_timeout, and worker_start_timeout.

cluster_config_class c.UnsafeLocalBackend.cluster_config_class = Type('dask_gateway_server.backends.local.LocalClusterConfig')

The cluster config class to use

cluster_heartbeat_period c.UnsafeLocalBackend.cluster_heartbeat_period = Int(15)

Time (in seconds) between cluster heartbeats to the gateway.

A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

cluster_heartbeat_timeout c.UnsafeLocalBackend.cluster_heartbeat_timeout = Float(0.0)

Timeout (in seconds) before killing a dask cluster that’s failed to heartbeat.

This should be greater than cluster_heartbeat_period. Defaults to 2 * cluster_heartbeat_period.

cluster_options c.UnsafeLocalBackend.cluster_options = Union()

User options for configuring an individual cluster.

Allows users to specify configuration overrides when creating a new cluster. See the documentation for more information:

Exposing Cluster Options.

cluster_start_timeout c.UnsafeLocalBackend.cluster_start_timeout = Float(60)

Timeout (in seconds) before giving up on a starting dask cluster.

cluster_status_period c.UnsafeLocalBackend.cluster_status_period = Float(30)

Time (in seconds) between cluster status checks.

A smaller period will detect failed clusters sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

clusters_directory c.UnsafeLocalBackend.clusters_directory = Unicode('')

The base directory for cluster working directories.

A subdirectory will be created for each new cluster which will serve as the working directory for that cluster. On cluster shutdown the subdirectory will be removed.

If not specified, a temporary directory will be used for each cluster.

db_cleanup_period c.UnsafeLocalBackend.db_cleanup_period = Float(600)

Time (in seconds) between database cleanup tasks.

This sets how frequently old records are removed from the database. This shouldn’t be too small (to keep the overhead low), but should be smaller than db_cluster_max_age (probably by an order of magnitude).

db_cluster_max_age c.UnsafeLocalBackend.db_cluster_max_age = Float(86400)

Max time (in seconds) to keep around records of completed clusters.

Every db_cleanup_period, completed clusters older than db_cluster_max_age are removed from the database.

db_debug c.UnsafeLocalBackend.db_debug = Bool(False)

If True, all database operations will be logged

db_encrypt_keys c.UnsafeLocalBackend.db_encrypt_keys = List()

A list of keys to use to encrypt private data in the database. Can also be set by the environment variable DASK_GATEWAY_ENCRYPT_KEYS, where the value is a ; delimited string of encryption keys.

Each key should be a base64-encoded 32 byte value, and should be cryptographically random. Lacking other options, openssl can be used to generate a single key via:

$ openssl rand -base64 32

A single key is sufficient; multiple keys can be used to support key rotation.

db_url c.UnsafeLocalBackend.db_url = Unicode('sqlite:///:memory:')

The URL for the database. Default is in-memory only.

If not in-memory, db_encrypt_keys must also be set.

inherited_environment c.UnsafeLocalBackend.inherited_environment = List()

Whitelist of environment variables for the scheduler and worker processes to inherit from the Dask-Gateway process.

parallelism c.UnsafeLocalBackend.parallelism = Int(20)

Number of handlers to use for starting/stopping clusters.

sigint_timeout c.UnsafeLocalBackend.sigint_timeout = Int(10)

Seconds to wait for process to stop after SIGINT.

If the process has not stopped after this time, a SIGTERM is sent.

sigkill_timeout c.UnsafeLocalBackend.sigkill_timeout = Int(5)

Seconds to wait for process to stop after SIGKILL.

If the process has not stopped after this time, a warning is logged and the process is deemed a zombie process.

sigterm_timeout c.UnsafeLocalBackend.sigterm_timeout = Int(5)

Seconds to wait for process to stop after SIGTERM.

If the process has not stopped after this time, a SIGKILL is sent.

stop_clusters_on_shutdown c.UnsafeLocalBackend.stop_clusters_on_shutdown = Bool(True)

Whether to stop active clusters on gateway shutdown.

If true, all active clusters will be stopped before shutting down the gateway. Set to False to leave active clusters running.

worker_start_failure_limit c.UnsafeLocalBackend.worker_start_failure_limit = Int(3)

A limit on the number of failed attempts to start a worker before the cluster is marked as failed.

Every worker that fails to start (timeouts exempt) increments a counter. The counter is reset if a worker successfully starts. If the counter ever exceeds this limit, the cluster is marked as failed and is shut down.

worker_start_timeout c.UnsafeLocalBackend.worker_start_timeout = Float(60)

Timeout (in seconds) before giving up on a starting dask worker.

worker_status_period c.UnsafeLocalBackend.worker_status_period = Float(30)

Time (in seconds) between worker status checks.

A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

YARN

YarnClusterConfig

class dask_gateway_server.backends.yarn.YarnClusterConfig(**kwargs: Any)

Dask cluster configuration options when running on Hadoop/YARN

adaptive_period c.YarnClusterConfig.adaptive_period = Float(3)

Time (in seconds) between adaptive scaling checks.

A smaller period will decrease scale up/down latency when responding to cluster load changes, but may also result in higher load on the gateway server.

cluster_max_cores c.YarnClusterConfig.cluster_max_cores = Float(None)

The maximum number of cores available to this cluster.

Set to None for no cores limit (default).

cluster_max_memory c.YarnClusterConfig.cluster_max_memory = MemoryLimit(None)

The maximum amount of memory (in bytes) available to this cluster. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

Set to None for no memory limit (default).

cluster_max_workers c.YarnClusterConfig.cluster_max_workers = Int(0)

The maximum number of workers available to this cluster.

Note that this will be combined with cluster_max_cores and cluster_max_memory at runtime to determine the actual maximum number of workers available to this cluster.

environment c.YarnClusterConfig.environment = Dict()

Environment variables to set for both the worker and scheduler processes.

idle_timeout c.YarnClusterConfig.idle_timeout = Float(0)

Time (in seconds) before an idle cluster is automatically shut down.

Set to 0 (default) for no idle timeout.

localize_files c.YarnClusterConfig.localize_files = Dict()

Extra files to distribute to both the worker and scheduler containers.

This is a mapping from local-name to resource. Resource paths can be local, or in HDFS (prefix with hdfs://... if so). If an archive (.tar.gz or .zip), the resource will be unarchived as directory local-name. For finer control, resources can also be specified as skein.File objects, or their dict equivalents.

This can be used to distribute conda/virtual environments by configuring the following:

c.YarnClusterConfig.localize_files = {
    'environment': {
        'source': 'hdfs:///path/to/archived/environment.tar.gz',
        'visibility': 'public'
    }
}
c.YarnClusterConfig.scheduler_setup = 'source environment/bin/activate'
c.YarnClusterConfig.worker_setup = 'source environment/bin/activate'

These archives are usually created using either conda-pack or venv-pack. For more information on distributing files, see https://jcristharif.com/skein/distributing-files.html.

queue c.YarnClusterConfig.queue = Unicode('default')

The YARN queue to submit applications under

scheduler_cmd c.YarnClusterConfig.scheduler_cmd = Command()

Shell command to start a dask scheduler.

scheduler_cores c.YarnClusterConfig.scheduler_cores = Int(1)

Number of cpu-cores available for a dask scheduler.

scheduler_memory c.YarnClusterConfig.scheduler_memory = MemoryLimit('2 G')

Number of bytes available for a dask scheduler. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

scheduler_setup c.YarnClusterConfig.scheduler_setup = Unicode('')

Script to run before dask scheduler starts.

worker_cmd c.YarnClusterConfig.worker_cmd = Command()

Shell command to start a dask worker.

worker_cores c.YarnClusterConfig.worker_cores = Int(1)

Number of cpu-cores available for a dask worker.

worker_memory c.YarnClusterConfig.worker_memory = MemoryLimit('2 G')

Number of bytes available for a dask worker. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_setup c.YarnClusterConfig.worker_setup = Unicode('')

Script to run before dask worker starts.

worker_threads c.YarnClusterConfig.worker_threads = Int(0)

Number of threads available for a dask worker.

Defaults to worker_cores.

YarnBackend

class dask_gateway_server.backends.yarn.YarnBackend(**kwargs: Any)

A cluster backend for managing dask clusters on Hadoop/YARN.

api_url c.YarnBackend.api_url = Unicode('')

The address that internal components (e.g. dask clusters) will use when contacting the gateway.

Defaults to {proxy_address}/{prefix}/api, set manually if a different address should be used.

app_client_cache_max_size c.YarnBackend.app_client_cache_max_size = Int(10)

The max size of the cache for application clients.

A larger cache will result in improved performance, but will also use more resources.

backoff_base_delay c.YarnBackend.backoff_base_delay = Float(0.1)

Base delay (in seconds) for backoff when retrying after failures.

If an operation fails, it is retried after a backoff computed as:

` min(backoff_max_delay, backoff_base_delay * 2 ** num_failures) `

backoff_max_delay c.YarnBackend.backoff_max_delay = Float(300)

Max delay (in seconds) for backoff policy when retrying after failures.

check_timeouts_period c.YarnBackend.check_timeouts_period = Float(0.0)

Time (in seconds) between timeout checks.

This shouldn’t be too small (to keep the overhead low), but should be smaller than cluster_heartbeat_timeout, cluster_start_timeout, and worker_start_timeout.

cluster_config_class c.YarnBackend.cluster_config_class = Type('dask_gateway_server.backends.yarn.YarnClusterConfig')

The cluster config class to use

cluster_heartbeat_period c.YarnBackend.cluster_heartbeat_period = Int(15)

Time (in seconds) between cluster heartbeats to the gateway.

A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

cluster_heartbeat_timeout c.YarnBackend.cluster_heartbeat_timeout = Float(0.0)

Timeout (in seconds) before killing a dask cluster that’s failed to heartbeat.

This should be greater than cluster_heartbeat_period. Defaults to 2 * cluster_heartbeat_period.

cluster_options c.YarnBackend.cluster_options = Union()

User options for configuring an individual cluster.

Allows users to specify configuration overrides when creating a new cluster. See the documentation for more information:

Exposing Cluster Options.

cluster_start_timeout c.YarnBackend.cluster_start_timeout = Float(60)

Timeout (in seconds) before giving up on a starting dask cluster.

cluster_status_period c.YarnBackend.cluster_status_period = Float(30)

Time (in seconds) between cluster status checks.

A smaller period will detect failed clusters sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

db_cleanup_period c.YarnBackend.db_cleanup_period = Float(600)

Time (in seconds) between database cleanup tasks.

This sets how frequently old records are removed from the database. This shouldn’t be too small (to keep the overhead low), but should be smaller than db_cluster_max_age (probably by an order of magnitude).

db_cluster_max_age c.YarnBackend.db_cluster_max_age = Float(86400)

Max time (in seconds) to keep around records of completed clusters.

Every db_cleanup_period, completed clusters older than db_cluster_max_age are removed from the database.

db_debug c.YarnBackend.db_debug = Bool(False)

If True, all database operations will be logged

db_encrypt_keys c.YarnBackend.db_encrypt_keys = List()

A list of keys to use to encrypt private data in the database. Can also be set by the environment variable DASK_GATEWAY_ENCRYPT_KEYS, where the value is a ; delimited string of encryption keys.

Each key should be a base64-encoded 32 byte value, and should be cryptographically random. Lacking other options, openssl can be used to generate a single key via:

$ openssl rand -base64 32

A single key is sufficient; multiple keys can be used to support key rotation.

db_url c.YarnBackend.db_url = Unicode('sqlite:///:memory:')

The URL for the database. Default is in-memory only.

If not in-memory, db_encrypt_keys must also be set.

keytab c.YarnBackend.keytab = Unicode(None)

Path to kerberos keytab for Dask Gateway user

parallelism c.YarnBackend.parallelism = Int(20)

Number of handlers to use for starting/stopping clusters.

principal c.YarnBackend.principal = Unicode(None)

Kerberos principal for Dask Gateway user
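On a kerberized Hadoop cluster the gateway needs its own credentials to submit YARN applications; a sketch (principal, path, and queue name are illustrative):

```python
c.YarnBackend.keytab = "/etc/dask-gateway/dask.keytab"
c.YarnBackend.principal = "dask"
c.YarnClusterConfig.queue = "dask"   # submit under a dedicated YARN queue
```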

stop_clusters_on_shutdown c.YarnBackend.stop_clusters_on_shutdown = Bool(True)

Whether to stop active clusters on gateway shutdown.

If true, all active clusters will be stopped before shutting down the gateway. Set to False to leave active clusters running.

worker_start_failure_limit c.YarnBackend.worker_start_failure_limit = Int(3)

A limit on the number of failed attempts to start a worker before the cluster is marked as failed.

Every worker that fails to start (timeouts exempt) increments a counter. The counter is reset if a worker successfully starts. If the counter ever exceeds this limit, the cluster is marked as failed and is shut down.

worker_start_timeout c.YarnBackend.worker_start_timeout = Float(60)

Timeout (in seconds) before giving up on a starting dask worker.

worker_status_period c.YarnBackend.worker_status_period = Float(30)

Time (in seconds) between worker status checks.

A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

Kubernetes

KubeClusterConfig

class dask_gateway_server.backends.kubernetes.KubeClusterConfig(**kwargs: Any)

Configuration for a single Dask cluster running on kubernetes

adaptive_period c.KubeClusterConfig.adaptive_period = Float(3)

Time (in seconds) between adaptive scaling checks.

A smaller period will decrease scale up/down latency when responding to cluster load changes, but may also result in higher load on the gateway server.

cluster_max_cores c.KubeClusterConfig.cluster_max_cores = Float(None)

The maximum number of cores available to this cluster.

Set to None for no cores limit (default).

cluster_max_memory c.KubeClusterConfig.cluster_max_memory = MemoryLimit(None)

The maximum amount of memory (in bytes) available to this cluster. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

Set to None for no memory limit (default).

cluster_max_workers c.KubeClusterConfig.cluster_max_workers = Int(0)

The maximum number of workers available to this cluster.

Note that this will be combined with cluster_max_cores and cluster_max_memory at runtime to determine the actual maximum number of workers available to this cluster.
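As an illustration of how these limits might interact (a sketch of the behavior described above, not the actual dask-gateway implementation), each configured limit yields an upper bound on the worker count, and the smallest bound wins:

```python
import math

def effective_max_workers(max_workers, max_cores, max_memory,
                          worker_cores, worker_memory):
    """Illustrative only: derive a worker cap from the three cluster limits.

    None (or 0 for max_workers) means "no limit" for that field.
    """
    bounds = []
    if max_workers:
        bounds.append(max_workers)
    if max_cores is not None:
        bounds.append(math.floor(max_cores / worker_cores))
    if max_memory is not None:
        bounds.append(math.floor(max_memory / worker_memory))
    return min(bounds) if bounds else None

# 10-worker cap, 16 cores and 32 GiB total, workers using 2 cores / 4 GiB each:
print(effective_max_workers(10, 16.0, 32 * 2**30, 2, 4 * 2**30))  # -> 8
```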

environment c.KubeClusterConfig.environment = Dict()

Environment variables to set for both the worker and scheduler processes.

idle_timeout c.KubeClusterConfig.idle_timeout = Float(0)

Time (in seconds) before an idle cluster is automatically shut down.

Set to 0 (default) for no idle timeout.

image c.KubeClusterConfig.image = Unicode('daskgateway/dask-gateway:latest')

Docker image to use for running users' containers.

image_pull_policy c.KubeClusterConfig.image_pull_policy = Unicode(None)

The image pull policy of the docker image specified in image

image_pull_secrets c.KubeClusterConfig.image_pull_secrets = List()

Image pull secrets to access private registries.

Should be a list of dictionaries with a single key called name to match the k8s native syntax.

namespace c.KubeClusterConfig.namespace = Unicode('default')

Kubernetes namespace to launch pods in.

If running inside a kubernetes cluster with service accounts enabled, this defaults to the current namespace; otherwise it defaults to default.

scheduler_cmd c.KubeClusterConfig.scheduler_cmd = Command()

Shell command to start a dask scheduler.

scheduler_cores c.KubeClusterConfig.scheduler_cores = Float(1)

Number of cpu-cores available for a dask scheduler.

scheduler_cores_limit c.KubeClusterConfig.scheduler_cores_limit = Float(0.0)

Maximum number of cpu-cores available for a dask scheduler.

Defaults to scheduler_cores.

scheduler_extra_container_config c.KubeClusterConfig.scheduler_extra_container_config = Dict()

Any extra configuration for the scheduler container.

This dict will be deep merged with the scheduler container (a V1Container object) before submission. Keys should match those in the kubernetes spec, and should be camelCase.

See worker_extra_container_config for more information.

scheduler_extra_pod_annotations c.KubeClusterConfig.scheduler_extra_pod_annotations = Dict()

Any extra annotations to be applied to a user’s scheduler pods.

These annotations can be set using cluster options (see Exposing Cluster Options) to allow for injecting user-specific information, e.g. adding an annotation based on a user's group or username.

This dict will be merged with common_annotations before being applied to user pods.
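For example, a static annotation could be applied to all scheduler pods like so (the annotation key and value are illustrative):

```python
c.KubeClusterConfig.scheduler_extra_pod_annotations = {
    "example.com/owner": "data-science",  # illustrative key/value
}
```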

scheduler_extra_pod_config c.KubeClusterConfig.scheduler_extra_pod_config = Dict()

Any extra configuration for the scheduler pods.

This dict will be deep merged with the scheduler pod spec (a V1PodSpec object) before submission. Keys should match those in the kubernetes spec, and should be camelCase.

See worker_extra_pod_config for more information.

scheduler_extra_pod_labels c.KubeClusterConfig.scheduler_extra_pod_labels = Dict()

Any extra labels to be applied to a user’s scheduler pods.

These labels can be set using cluster options (see Exposing Cluster Options) to allow for injecting user-specific information, e.g. adding a label based on a user's group or username.

This dict will be merged with common_labels before being applied to user pods.

scheduler_memory c.KubeClusterConfig.scheduler_memory = MemoryLimit('2 G')

Number of bytes available for a dask scheduler. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

scheduler_memory_limit c.KubeClusterConfig.scheduler_memory_limit = MemoryLimit(0)

Maximum number of bytes available for a dask scheduler. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

Defaults to scheduler_memory.

worker_cmd c.KubeClusterConfig.worker_cmd = Command()

Shell command to start a dask worker.

worker_cores c.KubeClusterConfig.worker_cores = Float(1)

Number of cpu-cores available for a dask worker.

worker_cores_limit c.KubeClusterConfig.worker_cores_limit = Float(0.0)

Maximum number of cpu-cores available for a dask worker.

Defaults to worker_cores.

worker_extra_container_config c.KubeClusterConfig.worker_extra_container_config = Dict()

Any extra configuration for the worker container.

This dict will be deep merged with the worker container (a V1Container object) before submission. Keys should match those in the kubernetes spec, and should be camelCase.

For example, here we add environment variables from a secret to the worker container:

c.KubeClusterConfig.worker_extra_container_config = {
    "envFrom": [
        {"secretRef": {"name": "my-env-secret"}}
    ]
}
worker_extra_pod_annotations c.KubeClusterConfig.worker_extra_pod_annotations = Dict()

Any extra annotations to be applied to a user’s worker pods.

These annotations can be set using cluster options (see Exposing Cluster Options) to allow for injecting user-specific information, e.g. adding an annotation based on a user's group or username.

This dict will be merged with common_annotations before being applied to user pods.

worker_extra_pod_config c.KubeClusterConfig.worker_extra_pod_config = Dict()

Any extra configuration for the worker pods.

This dict will be deep merged with the worker pod spec (a V1PodSpec object) before submission. Keys should match those in the kubernetes spec, and should be camelCase.

For example, here we add a toleration to worker pods:

c.KubeClusterConfig.worker_extra_pod_config = {
    "tolerations": [
        {
            "key": "key",
            "operator": "Equal",
            "value": "value",
            "effect": "NoSchedule",
        }
    ]
}
worker_extra_pod_labels c.KubeClusterConfig.worker_extra_pod_labels = Dict()

Any extra labels to be applied to a user’s worker pods.

These labels can be set using cluster options (see Exposing Cluster Options) to allow for injecting user-specific information, e.g. adding a label based on a user's group or username.

This dict will be merged with common_labels before being applied to user pods.

worker_memory c.KubeClusterConfig.worker_memory = MemoryLimit('2 G')

Number of bytes available for a dask worker. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_memory_limit c.KubeClusterConfig.worker_memory_limit = MemoryLimit(0)

Maximum number of bytes available for a dask worker. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

Defaults to worker_memory.

worker_threads c.KubeClusterConfig.worker_threads = Int(0)

Number of threads available for a dask worker.

Defaults to worker_cores.

KubeBackend

class dask_gateway_server.backends.kubernetes.KubeBackend(**kwargs: Any)

A dask-gateway backend for running on Kubernetes

api_url c.KubeBackend.api_url = Unicode('')

The address that internal components (e.g. dask clusters) will use when contacting the gateway.

cluster_config_class c.KubeBackend.cluster_config_class = Type('dask_gateway_server.backends.kubernetes.backend.KubeClusterConfig')

The cluster config class to use

cluster_options c.KubeBackend.cluster_options = Union()

User options for configuring an individual cluster.

Allows users to specify configuration overrides when creating a new cluster. See the documentation for more information:

Exposing Cluster Options.
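As a sketch of the pattern described there (the field names, ranges, and GiB-to-bytes conversion are illustrative choices, not requirements):

```python
from dask_gateway_server.options import Options, Integer, Float

def options_handler(options):
    # Map user-facing option values onto cluster config fields.
    return {
        "worker_cores": options.worker_cores,
        "worker_memory": int(options.worker_memory * 2**30),  # GiB -> bytes
    }

c.KubeBackend.cluster_options = Options(
    Integer("worker_cores", default=1, min=1, max=4, label="Worker Cores"),
    Float("worker_memory", default=2, min=1, max=8, label="Worker Memory (GiB)"),
    handler=options_handler,
)
```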

common_annotations c.KubeBackend.common_annotations = Dict()

Kubernetes annotations to apply to all objects created by the gateway

common_labels c.KubeBackend.common_labels = Dict()

Kubernetes labels to apply to all objects created by the gateway

crd_version c.KubeBackend.crd_version = Unicode('v1alpha1')

The version for the DaskCluster CRD

gateway_instance c.KubeBackend.gateway_instance = Unicode('')

A unique ID for this instance of dask-gateway.

The controller must also be configured with the same ID.

label_selector c.KubeBackend.label_selector = Unicode('')

The label selector to use when watching objects managed by the gateway.

KubeController

class dask_gateway_server.backends.kubernetes.controller.KubeController(**kwargs: Any)

Kubernetes controller for dask-gateway

address c.KubeController.address = Unicode(':8000')

The address the server should listen at

api_url c.KubeController.api_url = Unicode('')

The address that internal components (e.g. dask clusters) will use when contacting the gateway.

backoff_base_delay c.KubeController.backoff_base_delay = Float(0.1)

Base delay (in seconds) for backoff when retrying after failures.

If an operation fails, it is retried after a backoff computed as:

min(backoff_max_delay, backoff_base_delay * 2 ** num_failures)
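The formula above can be sketched directly in Python with the default values:

```python
def backoff_delay(num_failures, base_delay=0.1, max_delay=300.0):
    # Exponential backoff: the delay doubles with each failure,
    # capped at max_delay.
    return min(max_delay, base_delay * 2 ** num_failures)

print([backoff_delay(n) for n in range(5)])  # -> [0.1, 0.2, 0.4, 0.8, 1.6]
```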

backoff_max_delay c.KubeController.backoff_max_delay = Float(300)

Max delay (in seconds) for backoff policy when retrying after failures.

common_annotations c.KubeController.common_annotations = Dict()

Kubernetes annotations to apply to all objects created by the gateway

common_labels c.KubeController.common_labels = Dict()

Kubernetes labels to apply to all objects created by the gateway

completed_cluster_cleanup_period c.KubeController.completed_cluster_cleanup_period = Float(600)

Time (in seconds) between cleanup tasks.

This sets how frequently old cluster records are deleted from kubernetes. This shouldn’t be too small (to keep the overhead low), but should be smaller than completed_cluster_max_age (probably by an order of magnitude).

completed_cluster_max_age c.KubeController.completed_cluster_max_age = Float(86400)

Max time (in seconds) to keep around records of completed clusters.

Every completed_cluster_cleanup_period, completed clusters older than completed_cluster_max_age are deleted from kubernetes.

config_file c.KubeController.config_file = Unicode('dask_gateway_config.py')

The config file to load

crd_version c.KubeController.crd_version = Unicode('v1alpha1')

The version for the DaskCluster CRD

gateway_instance c.KubeController.gateway_instance = Unicode('')

A unique ID for this instance of dask-gateway.

The controller must also be configured with the same ID.

k8s_api_rate_limit c.KubeController.k8s_api_rate_limit = Int(50)

Limit on the average number of k8s api calls per second.

k8s_api_rate_limit_burst c.KubeController.k8s_api_rate_limit_burst = Int(100)

Limit on the maximum number of k8s api calls per second.

label_selector c.KubeController.label_selector = Unicode('')

The label selector to use when watching objects managed by the gateway.

log_datefmt c.KubeController.log_datefmt = Unicode('%Y-%m-%d %H:%M:%S')

The date format used by logging formatters for %(asctime)s

log_format c.KubeController.log_format = Unicode('%(log_color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d %(name)s]%(reset)s %(message)s')

The Logging format template

log_level c.KubeController.log_level = Enum('INFO')

Set the log level by value or name.

parallelism c.KubeController.parallelism = Int(20)

Number of handlers to use for reconciling k8s objects.

proxy_prefix c.KubeController.proxy_prefix = Unicode('')

The path prefix the HTTP/HTTPS proxy should serve under.

This prefix will be prepended to all routes registered with the proxy.

proxy_tcp_entrypoint c.KubeController.proxy_tcp_entrypoint = Unicode('tcp')

The traefik entrypoint name to use when creating ingressroutetcps

proxy_web_entrypoint c.KubeController.proxy_web_entrypoint = Unicode('web')

The traefik entrypoint name to use when creating ingressroutes

proxy_web_middlewares c.KubeController.proxy_web_middlewares = List()

A list of middlewares to apply to web routes added to the proxy.

show_config c.KubeController.show_config = Bool(False)

Instead of starting the Application, dump configuration to stdout

show_config_json c.KubeController.show_config_json = Bool(False)

Instead of starting the Application, dump configuration to stdout (as JSON)

Job Queues

PBSClusterConfig

class dask_gateway_server.backends.jobqueue.pbs.PBSClusterConfig(**kwargs: Any)

Dask cluster configuration options when running on PBS

account c.PBSClusterConfig.account = Unicode('')

Accounting string associated with each job.

adaptive_period c.PBSClusterConfig.adaptive_period = Float(3)

Time (in seconds) between adaptive scaling checks.

A smaller period will decrease scale up/down latency when responding to cluster load changes, but may also result in higher load on the gateway server.

cluster_max_cores c.PBSClusterConfig.cluster_max_cores = Float(None)

The maximum number of cores available to this cluster.

Set to None for no cores limit (default).

cluster_max_memory c.PBSClusterConfig.cluster_max_memory = MemoryLimit(None)

The maximum amount of memory (in bytes) available to this cluster. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

Set to None for no memory limit (default).

cluster_max_workers c.PBSClusterConfig.cluster_max_workers = Int(0)

The maximum number of workers available to this cluster.

Note that this will be combined with cluster_max_cores and cluster_max_memory at runtime to determine the actual maximum number of workers available to this cluster.

environment c.PBSClusterConfig.environment = Dict()

Environment variables to set for both the worker and scheduler processes.

idle_timeout c.PBSClusterConfig.idle_timeout = Float(0)

Time (in seconds) before an idle cluster is automatically shut down.

Set to 0 (default) for no idle timeout.

project c.PBSClusterConfig.project = Unicode('')

Project associated with each job.

queue c.PBSClusterConfig.queue = Unicode('')

The queue to submit jobs to.

scheduler_cmd c.PBSClusterConfig.scheduler_cmd = Command()

Shell command to start a dask scheduler.

scheduler_cores c.PBSClusterConfig.scheduler_cores = Int(1)

Number of cpu-cores available for a dask scheduler.

scheduler_memory c.PBSClusterConfig.scheduler_memory = MemoryLimit('2 G')

Number of bytes available for a dask scheduler. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

scheduler_resource_list c.PBSClusterConfig.scheduler_resource_list = Unicode('select=1:ncpus={cores}:mem={memory}')

The resource list to use for the scheduler.

This is a template, and receives the following fields:

  • cores

  • memory
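The resource list is a Python-style format string; at submission time the scheduler's resources are substituted for the template fields. Rendered here with illustrative values (the exact memory formatting dask-gateway uses is not specified in this document):

```python
template = "select=1:ncpus={cores}:mem={memory}"
print(template.format(cores=1, memory="2048MB"))  # -> select=1:ncpus=1:mem=2048MB
```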

scheduler_setup c.PBSClusterConfig.scheduler_setup = Unicode('')

Script to run before dask scheduler starts.

staging_directory c.PBSClusterConfig.staging_directory = Unicode('{home}/.dask-gateway/')

The staging directory for storing files before the job starts.

A subdirectory will be created for each new cluster which will store temporary files for that cluster. On cluster shutdown the subdirectory will be removed.

This field can be a template, which receives the following fields:

  • home (the user’s home directory)

  • username (the user’s name)
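For example, a hypothetical override placing staging files on a shared scratch filesystem, using the template fields above:

```python
# The path is illustrative; {username} is expanded per-user at runtime.
c.PBSClusterConfig.staging_directory = "/scratch/{username}/.dask-gateway/"
```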

use_stagein c.PBSClusterConfig.use_stagein = Bool(True)

If true, the staging directory created above will be copied into the job working directories at runtime using the -Wstagein directive.

If the staging directory is on a networked filesystem, you can set this to False and rely on the networked filesystem for access.

worker_cmd c.PBSClusterConfig.worker_cmd = Command()

Shell command to start a dask worker.

worker_cores c.PBSClusterConfig.worker_cores = Int(1)

Number of cpu-cores available for a dask worker.

worker_memory c.PBSClusterConfig.worker_memory = MemoryLimit('2 G')

Number of bytes available for a dask worker. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_resource_list c.PBSClusterConfig.worker_resource_list = Unicode('select=1:ncpus={cores}:mem={memory}')

The resource list to use for the workers.

This is a template, and receives the following fields:

  • cores

  • memory

worker_setup c.PBSClusterConfig.worker_setup = Unicode('')

Script to run before dask worker starts.

worker_threads c.PBSClusterConfig.worker_threads = Int(0)

Number of threads available for a dask worker.

Defaults to worker_cores.

PBSBackend

class dask_gateway_server.backends.jobqueue.pbs.PBSBackend(**kwargs: Any)

A backend for deploying Dask on a PBS cluster.

api_url c.PBSBackend.api_url = Unicode('')

The address that internal components (e.g. dask clusters) will use when contacting the gateway.

Defaults to {proxy_address}/{prefix}/api; set this manually if a different address should be used.

backoff_base_delay c.PBSBackend.backoff_base_delay = Float(0.1)

Base delay (in seconds) for backoff when retrying after failures.

If an operation fails, it is retried after a backoff computed as:

min(backoff_max_delay, backoff_base_delay * 2 ** num_failures)

backoff_max_delay c.PBSBackend.backoff_max_delay = Float(300)

Max delay (in seconds) for backoff policy when retrying after failures.

cancel_command c.PBSBackend.cancel_command = Unicode('')

The path to the job cancel command

check_timeouts_period c.PBSBackend.check_timeouts_period = Float(0.0)

Time (in seconds) between timeout checks.

This shouldn’t be too small (to keep the overhead low), but should be smaller than cluster_heartbeat_timeout, cluster_start_timeout, and worker_start_timeout.

cluster_config_class c.PBSBackend.cluster_config_class = Type('dask_gateway_server.backends.jobqueue.pbs.PBSClusterConfig')

The cluster config class to use

cluster_heartbeat_period c.PBSBackend.cluster_heartbeat_period = Int(15)

Time (in seconds) between cluster heartbeats to the gateway.

A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

cluster_heartbeat_timeout c.PBSBackend.cluster_heartbeat_timeout = Float(0.0)

Timeout (in seconds) before killing a dask cluster that’s failed to heartbeat.

This should be greater than cluster_heartbeat_period. Defaults to 2 * cluster_heartbeat_period.

cluster_options c.PBSBackend.cluster_options = Union()

User options for configuring an individual cluster.

Allows users to specify configuration overrides when creating a new cluster. See the documentation for more information:

Exposing Cluster Options.

cluster_start_timeout c.PBSBackend.cluster_start_timeout = Float(60)

Timeout (in seconds) before giving up on a starting dask cluster.

cluster_status_period c.PBSBackend.cluster_status_period = Float(30)

Time (in seconds) between cluster status checks.

A smaller period will detect failed clusters sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

dask_gateway_jobqueue_launcher c.PBSBackend.dask_gateway_jobqueue_launcher = Unicode('')

The path to the dask-gateway-jobqueue-launcher executable

db_cleanup_period c.PBSBackend.db_cleanup_period = Float(600)

Time (in seconds) between database cleanup tasks.

This sets how frequently old records are removed from the database. This shouldn’t be too small (to keep the overhead low), but should be smaller than db_record_max_age (probably by an order of magnitude).

db_cluster_max_age c.PBSBackend.db_cluster_max_age = Float(86400)

Max time (in seconds) to keep around records of completed clusters.

Every db_cleanup_period, completed clusters older than db_cluster_max_age are removed from the database.

db_debug c.PBSBackend.db_debug = Bool(False)

If True, all database operations will be logged

db_encrypt_keys c.PBSBackend.db_encrypt_keys = List()

A list of keys to use to encrypt private data in the database. Can also be set by the environment variable DASK_GATEWAY_ENCRYPT_KEYS, where the value is a ; delimited string of encryption keys.

Each key should be a base64-encoded 32 byte value, and should be cryptographically random. Lacking other options, openssl can be used to generate a single key via:

$ openssl rand -base64 32

A single key is valid; multiple keys can be used to support key rotation.

db_url c.PBSBackend.db_url = Unicode('sqlite:///:memory:')

The URL for the database. Default is in-memory only.

If not in-memory, db_encrypt_keys must also be set.

gateway_hostname c.PBSBackend.gateway_hostname = Unicode('')

The hostname of the node running the gateway. Used for referencing the local host in PBS directives.

parallelism c.PBSBackend.parallelism = Int(20)

Number of handlers to use for starting/stopping clusters.

status_command c.PBSBackend.status_command = Unicode('')

The path to the job status command

stop_clusters_on_shutdown c.PBSBackend.stop_clusters_on_shutdown = Bool(True)

Whether to stop active clusters on gateway shutdown.

If True, all active clusters will be stopped before shutting down the gateway. Set to False to leave active clusters running.

submit_command c.PBSBackend.submit_command = Unicode('')

The path to the job submit command

worker_start_failure_limit c.PBSBackend.worker_start_failure_limit = Int(3)

A limit on the number of failed attempts to start a worker before the cluster is marked as failed.

Every worker that fails to start (timeouts exempt) increments a counter. The counter is reset if a worker successfully starts. If the counter ever exceeds this limit, the cluster is marked as failed and is shut down.

worker_start_timeout c.PBSBackend.worker_start_timeout = Float(60)

Timeout (in seconds) before giving up on a starting dask worker.

worker_status_period c.PBSBackend.worker_status_period = Float(30)

Time (in seconds) between worker status checks.

A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

SlurmClusterConfig

class dask_gateway_server.backends.jobqueue.slurm.SlurmClusterConfig(**kwargs: Any)

Dask cluster configuration options when running on SLURM

account c.SlurmClusterConfig.account = Unicode('')

Account string associated with each job.

adaptive_period c.SlurmClusterConfig.adaptive_period = Float(3)

Time (in seconds) between adaptive scaling checks.

A smaller period will decrease scale up/down latency when responding to cluster load changes, but may also result in higher load on the gateway server.

cluster_max_cores c.SlurmClusterConfig.cluster_max_cores = Float(None)

The maximum number of cores available to this cluster.

Set to None for no cores limit (default).

cluster_max_memory c.SlurmClusterConfig.cluster_max_memory = MemoryLimit(None)

The maximum amount of memory (in bytes) available to this cluster. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

Set to None for no memory limit (default).

cluster_max_workers c.SlurmClusterConfig.cluster_max_workers = Int(0)

The maximum number of workers available to this cluster.

Note that this will be combined with cluster_max_cores and cluster_max_memory at runtime to determine the actual maximum number of workers available to this cluster.

environment c.SlurmClusterConfig.environment = Dict()

Environment variables to set for both the worker and scheduler processes.

idle_timeout c.SlurmClusterConfig.idle_timeout = Float(0)

Time (in seconds) before an idle cluster is automatically shut down.

Set to 0 (default) for no idle timeout.

partition c.SlurmClusterConfig.partition = Unicode('')

The partition to submit jobs to.

qos c.SlurmClusterConfig.qos = Unicode('')

QOS string associated with each job.

scheduler_cmd c.SlurmClusterConfig.scheduler_cmd = Command()

Shell command to start a dask scheduler.

scheduler_cores c.SlurmClusterConfig.scheduler_cores = Int(1)

Number of cpu-cores available for a dask scheduler.

scheduler_memory c.SlurmClusterConfig.scheduler_memory = MemoryLimit('2 G')

Number of bytes available for a dask scheduler. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

scheduler_setup c.SlurmClusterConfig.scheduler_setup = Unicode('')

Script to run before dask scheduler starts.

staging_directory c.SlurmClusterConfig.staging_directory = Unicode('{home}/.dask-gateway/')

The staging directory for storing files before the job starts.

A subdirectory will be created for each new cluster which will store temporary files for that cluster. On cluster shutdown the subdirectory will be removed.

This field can be a template, which receives the following fields:

  • home (the user’s home directory)

  • username (the user’s name)

worker_cmd c.SlurmClusterConfig.worker_cmd = Command()

Shell command to start a dask worker.

worker_cores c.SlurmClusterConfig.worker_cores = Int(1)

Number of cpu-cores available for a dask worker.

worker_memory c.SlurmClusterConfig.worker_memory = MemoryLimit('2 G')

Number of bytes available for a dask worker. Allows the following suffixes:

  • K -> Kibibytes

  • M -> Mebibytes

  • G -> Gibibytes

  • T -> Tebibytes

worker_setup c.SlurmClusterConfig.worker_setup = Unicode('')

Script to run before dask worker starts.

worker_threads c.SlurmClusterConfig.worker_threads = Int(0)

Number of threads available for a dask worker.

Defaults to worker_cores.

SlurmBackend

class dask_gateway_server.backends.jobqueue.slurm.SlurmBackend(**kwargs: Any)

A backend for deploying Dask on a Slurm cluster.

api_url c.SlurmBackend.api_url = Unicode('')

The address that internal components (e.g. dask clusters) will use when contacting the gateway.

Defaults to {proxy_address}/{prefix}/api; set this manually if a different address should be used.

backoff_base_delay c.SlurmBackend.backoff_base_delay = Float(0.1)

Base delay (in seconds) for backoff when retrying after failures.

If an operation fails, it is retried after a backoff computed as:

min(backoff_max_delay, backoff_base_delay * 2 ** num_failures)

backoff_max_delay c.SlurmBackend.backoff_max_delay = Float(300)

Max delay (in seconds) for backoff policy when retrying after failures.

cancel_command c.SlurmBackend.cancel_command = Unicode('')

The path to the job cancel command

check_timeouts_period c.SlurmBackend.check_timeouts_period = Float(0.0)

Time (in seconds) between timeout checks.

This shouldn’t be too small (to keep the overhead low), but should be smaller than cluster_heartbeat_timeout, cluster_start_timeout, and worker_start_timeout.

cluster_config_class c.SlurmBackend.cluster_config_class = Type('dask_gateway_server.backends.jobqueue.slurm.SlurmClusterConfig')

The cluster config class to use

cluster_heartbeat_period c.SlurmBackend.cluster_heartbeat_period = Int(15)

Time (in seconds) between cluster heartbeats to the gateway.

A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

cluster_heartbeat_timeout c.SlurmBackend.cluster_heartbeat_timeout = Float(0.0)

Timeout (in seconds) before killing a dask cluster that’s failed to heartbeat.

This should be greater than cluster_heartbeat_period. Defaults to 2 * cluster_heartbeat_period.

cluster_options c.SlurmBackend.cluster_options = Union()

User options for configuring an individual cluster.

Allows users to specify configuration overrides when creating a new cluster. See the documentation for more information:

Exposing Cluster Options.

cluster_start_timeout c.SlurmBackend.cluster_start_timeout = Float(60)

Timeout (in seconds) before giving up on a starting dask cluster.

cluster_status_period c.SlurmBackend.cluster_status_period = Float(30)

Time (in seconds) between cluster status checks.

A smaller period will detect failed clusters sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

dask_gateway_jobqueue_launcher c.SlurmBackend.dask_gateway_jobqueue_launcher = Unicode('')

The path to the dask-gateway-jobqueue-launcher executable

db_cleanup_period c.SlurmBackend.db_cleanup_period = Float(600)

Time (in seconds) between database cleanup tasks.

This sets how frequently old records are removed from the database. This shouldn’t be too small (to keep the overhead low), but should be smaller than db_record_max_age (probably by an order of magnitude).

db_cluster_max_age c.SlurmBackend.db_cluster_max_age = Float(86400)

Max time (in seconds) to keep around records of completed clusters.

Every db_cleanup_period, completed clusters older than db_cluster_max_age are removed from the database.

db_debug c.SlurmBackend.db_debug = Bool(False)

If True, all database operations will be logged

db_encrypt_keys c.SlurmBackend.db_encrypt_keys = List()

A list of keys to use to encrypt private data in the database. Can also be set by the environment variable DASK_GATEWAY_ENCRYPT_KEYS, where the value is a ; delimited string of encryption keys.

Each key should be a base64-encoded 32 byte value, and should be cryptographically random. Lacking other options, openssl can be used to generate a single key via:

$ openssl rand -base64 32

A single key is valid; multiple keys can be used to support key rotation.

db_url c.SlurmBackend.db_url = Unicode('sqlite:///:memory:')

The URL for the database. Default is in-memory only.

If not in-memory, db_encrypt_keys must also be set.

parallelism c.SlurmBackend.parallelism = Int(20)

Number of handlers to use for starting/stopping clusters.

status_command c.SlurmBackend.status_command = Unicode('')

The path to the job status command

stop_clusters_on_shutdown c.SlurmBackend.stop_clusters_on_shutdown = Bool(True)

Whether to stop active clusters on gateway shutdown.

If True, all active clusters will be stopped before shutting down the gateway. Set to False to leave active clusters running.

submit_command c.SlurmBackend.submit_command = Unicode('')

The path to the job submit command

worker_start_failure_limit c.SlurmBackend.worker_start_failure_limit = Int(3)

A limit on the number of failed attempts to start a worker before the cluster is marked as failed.

Every worker that fails to start (timeouts exempt) increments a counter. The counter is reset if a worker successfully starts. If the counter ever exceeds this limit, the cluster is marked as failed and is shut down.

worker_start_timeout c.SlurmBackend.worker_start_timeout = Float(60)

Timeout (in seconds) before giving up on a starting dask worker.

worker_status_period c.SlurmBackend.worker_status_period = Float(30)

Time (in seconds) between worker status checks.

A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.

Proxy

Proxy

class dask_gateway_server.proxy.Proxy(**kwargs: Any)

The dask-gateway-server proxy

address c.Proxy.address = Unicode(':8000')

The address the HTTP/HTTPS proxy should listen at.

Should be of the form {hostname}:{port}.

Where:

  • hostname sets the hostname to listen at. Set to "" or "0.0.0.0" to listen on all interfaces.

  • port sets the port to listen at.

api_token c.Proxy.api_token = Unicode('')

The proxy API token

A 32-byte hex-encoded random string, commonly created with the openssl CLI:

$ openssl rand -hex 32

Loaded from the DASK_GATEWAY_PROXY_TOKEN env variable by default.
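The token can also be generated in Python with the standard library's secrets module, equivalent to the openssl command above:

```python
import secrets

# A 32-byte random token, hex-encoded to 64 characters
# (equivalent to `openssl rand -hex 32`).
token = secrets.token_hex(32)
print(token)
```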

externally_managed c.Proxy.externally_managed = Bool(False)

Whether the proxy process is externally managed.

If False (default), the proxy process will be started and stopped by the gateway process. Set to True if the proxy will be started via some external manager (e.g. supervisord).

gateway_url c.Proxy.gateway_url = Unicode('')

The base URL the proxy should use for connecting to the gateway server

log_level c.Proxy.log_level = CaselessStrEnum('warn')

The proxy log-level.

max_events c.Proxy.max_events = Int(100)

The maximum number of events (proxy changes) to retain.

A proxy server that lags by more than this number will have to do a full refresh.

prefix c.Proxy.prefix = Unicode('')

The path prefix the HTTP/HTTPS proxy should serve under.

This prefix will be prepended to all routes registered with the proxy.

proxy_status_period c.Proxy.proxy_status_period = Float(30)

Time (in seconds) between proxy status checks.

Only applies when externally_managed is False.

tcp_address c.Proxy.tcp_address = Unicode('')

The address the TCP (scheduler) proxy should listen at.

Should be of the form {hostname}:{port}

Where:

  • hostname sets the hostname to listen at. Set to "" or "0.0.0.0" to listen on all interfaces.

  • port sets the port to listen at.

If not specified, will default to address.

tls_cert c.Proxy.tls_cert = Unicode('')

Path to TLS certificate file for the public url of the HTTP proxy.

When setting this, you should also set tls_key.

tls_key c.Proxy.tls_key = Unicode('')

Path to TLS key file for the public url of the HTTP proxy.

When setting this, you should also set tls_cert.
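Since the two options work as a pair, a config file will typically set them together. A minimal config-file sketch (the paths below are placeholders, not defaults):

```python
# dask_gateway_config.py -- sketch; the file paths are placeholders
c.Proxy.tls_cert = "/etc/dask-gateway/tls.crt"
c.Proxy.tls_key = "/etc/dask-gateway/tls.key"
```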

Cluster Manager Options

class dask_gateway_server.options.Options(*fields, handler=None)

A declarative specification of exposed cluster options.

Parameters
  • *fields (Field) – Zero or more configurable fields.

  • handler (callable, optional) – A callable with the signature handler(options) or handler(options, user), where options is the validated dict of user options, and user is a User model for that user. Should return a dict of configuration overrides to forward to the cluster manager. If not provided, the default will return the options unchanged.

Example

Here we expose options for users to configure c.Backend.worker_cores and c.Backend.worker_memory. We set bounds on each resource to prevent users from requesting overly large workers. The handler converts the user-specified memory from GiB to bytes (as expected by c.Backend.worker_memory).

from dask_gateway_server.options import Options, Integer, Float

def options_handler(options):
    return {
        "worker_cores": options.worker_cores,
        "worker_memory": int(options.worker_memory * 2 ** 30)
    }

c.Backend.cluster_options = Options(
    Integer("worker_cores", default=1, min=1, max=4, label="Worker Cores"),
    Float("worker_memory", default=1, min=1, max=8, label="Worker Memory (GiB)"),
    handler=options_handler,
)

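When the handler accepts a second argument it also receives the authenticated user, allowing per-user overrides. A sketch building on the example above; the "power-users" group name and the 16 GiB cap are hypothetical, not part of dask-gateway:

```python
def options_handler(options, user):
    # Hypothetical policy: members of the "power-users" group may request
    # up to 16 GiB per worker; everyone else is capped at 8 GiB.
    max_memory = 16 if "power-users" in user.groups else 8
    return {
        "worker_cores": options.worker_cores,
        "worker_memory": int(min(options.worker_memory, max_memory) * 2**30),
    }
```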
class dask_gateway_server.options.Integer(field, default=0, min=None, max=None, label=None, target=None)

An integer field, with optional bounds.

Parameters
  • field (str) – The field name to use. Must be a valid Python variable name. This will be the keyword users use to set this field programmatically (e.g. "worker_cores").

  • default (int, optional) – The default value. Default is 0.

  • min (int, optional) – The minimum valid value (inclusive). Unbounded if not set.

  • max (int, optional) – The maximum valid value (inclusive). Unbounded if not set.

  • label (str, optional) – A human readable label that will be used in GUI representations (e.g. "Worker Cores"). If not provided, field will be used.

  • target (str, optional) – The target parameter to set in the processed options dict. Must be a valid Python variable name. If not provided, field will be used.

class dask_gateway_server.options.Float(field, default=0, min=None, max=None, label=None, target=None)

A float field, with optional bounds.

Parameters
  • field (str) – The field name to use. Must be a valid Python variable name. This will be the keyword users use to set this field programmatically (e.g. "worker_cores").

  • default (float, optional) – The default value. Default is 0.

  • min (float, optional) – The minimum valid value (inclusive). Unbounded if not set.

  • max (float, optional) – The maximum valid value (inclusive). Unbounded if not set.

  • label (str, optional) – A human readable label that will be used in GUI representations (e.g. "Worker Cores"). If not provided, field will be used.

  • target (str, optional) – The target parameter to set in the processed options dict. Must be a valid Python variable name. If not provided, field will be used.

class dask_gateway_server.options.String(field, default='', label=None, target=None)

A string field.

Parameters
  • field (str) – The field name to use. Must be a valid Python variable name. This will be the keyword users use to set this field programmatically (e.g. "worker_cores").

  • default (str, optional) – The default value. Default is the empty string ("").

  • label (str, optional) – A human readable label that will be used in GUI representations (e.g. "Worker Cores"). If not provided, field will be used.

  • target (str, optional) – The target parameter to set in the processed options dict. Must be a valid Python variable name. If not provided, field will be used.

class dask_gateway_server.options.Bool(field, default=False, label=None, target=None)

A boolean field.

Parameters
  • field (str) – The field name to use. Must be a valid Python variable name. This will be the keyword users use to set this field programmatically (e.g. "worker_cores").

  • default (bool, optional) – The default value. Default is False.

  • label (str, optional) – A human readable label that will be used in GUI representations (e.g. "Worker Cores"). If not provided, field will be used.

  • target (str, optional) – The target parameter to set in the processed options dict. Must be a valid Python variable name. If not provided, field will be used.

class dask_gateway_server.options.Select(field, options, default=None, label=None, target=None)

A select field, allowing users to select between a few choices.

Parameters
  • field (str) – The field name to use. Must be a valid Python variable name. This will be the keyword users use to set this field programmatically (e.g. "worker_cores").

  • options (list) – A list of valid options. Elements may be a tuple of (key, value), or just key (in which case the value is the same as the key). Values may be any Python object; keys must be strings.

  • default (str, optional) – The key for the default option. Defaults to the first listed option.

  • label (str, optional) – A human readable label that will be used in GUI representations (e.g. "Worker Cores"). If not provided, field will be used.

  • target (str, optional) – The target parameter to set in the processed options dict. Must be a valid Python variable name. If not provided, field will be used.

class dask_gateway_server.options.Mapping(field, default=None, label=None, target=None)

A mapping field.

Parameters
  • field (str) – The field name to use. Must be a valid Python variable name. This will be the keyword users use to set this field programmatically (e.g. "worker_cores").

  • default (dict, optional) – The default value. Default is an empty dict ({}).

  • label (str, optional) – A human readable label that will be used in GUI representations (e.g. "Worker Cores"). If not provided, field will be used.

  • target (str, optional) – The target parameter to set in the processed options dict. Must be a valid Python variable name. If not provided, field will be used.
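As with the earlier example, Select and Mapping fields are wired in via cluster_options. The sketch below assumes a backend whose ClusterConfig accepts image and environment settings; the image names are placeholders:

```python
from dask_gateway_server.options import Options, Select, Mapping

c.Backend.cluster_options = Options(
    # Users choose by key; the corresponding value is what gets
    # forwarded to the cluster config as "image".
    Select(
        "image",
        [("standard", "daskdev/dask:latest"),
         ("custom", "example.com/dask:custom")],
        default="standard",
        label="Worker Image",
    ),
    # A free-form dict, forwarded unchanged as "environment".
    Mapping("environment", default={}, label="Environment Variables"),
)
```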

Models

User

class dask_gateway_server.models.User(name, groups=(), admin=False)

A user record.

Parameters
  • name (str) – The username

  • groups (sequence, optional) – A set of groups the user belongs to. Default is no groups.

  • admin (bool, optional) – Whether the user is an admin user. Default is False.

Cluster

class dask_gateway_server.models.Cluster(name, username, token, options, config, status, scheduler_address='', dashboard_address='', api_address='', tls_cert=b'', tls_key=b'', start_time=None, stop_time=None)

A cluster record.

Parameters
  • name (str) – The cluster name. A unique string that can be used to identify the cluster in the gateway.

  • username (str) – The username that started this cluster.

  • token (str) – The API token associated with this cluster. Used to authenticate the cluster with the gateway.

  • options (dict) – The normalized set of configuration options provided when starting this cluster. These values are user-facing, and don’t necessarily correspond with the ClusterConfig options on the backend.

  • config (dict) – The serialized version of ClusterConfig for this cluster.

  • status (ClusterStatus) – The status of the cluster.

  • scheduler_address (str) – The scheduler address. The empty string if the cluster is not running.

  • dashboard_address (str) – The dashboard address. The empty string if the cluster is not running, or no dashboard is running on the cluster.

  • api_address (str) – The cluster’s api address. The empty string if the cluster is not running.

  • tls_cert (bytes) – The TLS cert credentials associated with the cluster.

  • tls_key (bytes) – The TLS key credentials associated with the cluster.

  • start_time (int or None) – Start time in ms since the epoch, or None if not started.

  • stop_time (int or None) – Stop time in ms since the epoch, or None if not stopped.