Configuration Reference¶
Gateway Server¶
- class dask_gateway_server.app.DaskGateway(**kwargs: Any)¶
A gateway for managing dask clusters across multiple users
- address c.DaskGateway.address = Unicode('')¶
The address the private api server should listen at.
Should be of the form {hostname}:{port}, where:
- hostname sets the hostname to listen at. Set to "" or "0.0.0.0" to listen on all interfaces.
- port sets the port to listen at.
Defaults to 127.0.0.1:0.
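For example, a minimal dask_gateway_config.py entry binding the private API server to all interfaces on a fixed port (the port number here is illustrative):

    c.DaskGateway.address = "0.0.0.0:8001"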
- authenticator_class c.DaskGateway.authenticator_class = Type('dask_gateway_server.auth.SimpleAuthenticator')¶
The gateway authenticator class to use
- backend_class c.DaskGateway.backend_class = Type('dask_gateway_server.backends.Backend')¶
The gateway backend class to use
- config_file c.DaskGateway.config_file = Unicode('dask_gateway_config.py')¶
The config file to load
- log_datefmt c.DaskGateway.log_datefmt = Unicode('%Y-%m-%d %H:%M:%S')¶
The date format used by logging formatters for %(asctime)s
- log_format c.DaskGateway.log_format = Unicode('%(log_color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d %(name)s]%(reset)s %(message)s')¶
The Logging format template
- log_level c.DaskGateway.log_level = Enum('INFO')¶
Set the log level by value or name.
- show_config c.DaskGateway.show_config = Bool(False)¶
Instead of starting the Application, dump configuration to stdout
- show_config_json c.DaskGateway.show_config_json = Bool(False)¶
Instead of starting the Application, dump configuration to stdout (as JSON)
Authentication¶
KerberosAuthenticator¶
- class dask_gateway_server.auth.KerberosAuthenticator(**kwargs: Any)¶
An authenticator using kerberos
- cache_max_age c.KerberosAuthenticator.cache_max_age = Int(300)¶
The maximum time in seconds to cache authentication information.
Helps reduce load on the backing authentication service by caching responses between requests. After this time the user will need to be reauthenticated before making additional requests (note this is usually transparent to the user).
- cookie_name c.KerberosAuthenticator.cookie_name = Unicode('')¶
The cookie name to use for caching authentication information.
- keytab c.KerberosAuthenticator.keytab = Unicode('dask_gateway.keytab')¶
The path to the keytab file
- service_name c.KerberosAuthenticator.service_name = Unicode('HTTP')¶
The service’s kerberos principal name.
This is almost always “HTTP” (the default)
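A minimal sketch of enabling Kerberos authentication in dask_gateway_config.py; the keytab path is an assumption and should point wherever your keytab is deployed:

    c.DaskGateway.authenticator_class = "dask_gateway_server.auth.KerberosAuthenticator"
    c.KerberosAuthenticator.keytab = "/etc/dask-gateway/dask_gateway.keytab"  # assumed path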
JupyterHubAuthenticator¶
- class dask_gateway_server.auth.JupyterHubAuthenticator(**kwargs: Any)¶
An authenticator that uses JupyterHub to perform authentication
- cache_max_age c.JupyterHubAuthenticator.cache_max_age = Int(300)¶
The maximum time in seconds to cache authentication information.
Helps reduce load on the backing authentication service by caching responses between requests. After this time the user will need to be reauthenticated before making additional requests (note this is usually transparent to the user).
- cookie_name c.JupyterHubAuthenticator.cookie_name = Unicode('')¶
The cookie name to use for caching authentication information.
- jupyterhub_api_token c.JupyterHubAuthenticator.jupyterhub_api_token = Unicode('')¶
Dask Gateway’s JupyterHub API Token, used for authenticating the gateway’s API requests to JupyterHub.
By default this is determined from the JUPYTERHUB_API_TOKEN environment variable.
- jupyterhub_api_url c.JupyterHubAuthenticator.jupyterhub_api_url = Unicode('')¶
The API URL for the JupyterHub server.
By default this is determined from the JUPYTERHUB_API_URL environment variable.
- tls_ca c.JupyterHubAuthenticator.tls_ca = Unicode('')¶
Path to TLS CA file for verifying API requests to JupyterHub.
When setting this, you should also set tls_key and tls_cert.
- tls_cert c.JupyterHubAuthenticator.tls_cert = Unicode('')¶
Path to TLS certificate file for making API requests to JupyterHub.
When setting this, you should also set tls_key.
- tls_key c.JupyterHubAuthenticator.tls_key = Unicode('')¶
Path to TLS key file for making API requests to JupyterHub.
When setting this, you should also set tls_cert.
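A sketch of wiring the gateway to a JupyterHub instance; these values are normally picked up from the JUPYTERHUB_API_TOKEN and JUPYTERHUB_API_URL environment variables, and the URL below is a placeholder:

    c.DaskGateway.authenticator_class = "dask_gateway_server.auth.JupyterHubAuthenticator"
    c.JupyterHubAuthenticator.jupyterhub_api_url = "https://hub.example.com/hub/api"  # placeholder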
SimpleAuthenticator¶
- class dask_gateway_server.auth.SimpleAuthenticator(**kwargs: Any)¶
A simple authenticator that uses Basic Auth.
This is highly insecure, use only for testing!!!
- cache_max_age c.SimpleAuthenticator.cache_max_age = Int(300)¶
The maximum time in seconds to cache authentication information.
Helps reduce load on the backing authentication service by caching responses between requests. After this time the user will need to be reauthenticated before making additional requests (note this is usually transparent to the user).
- cookie_name c.SimpleAuthenticator.cookie_name = Unicode('')¶
The cookie name to use for caching authentication information.
- password c.SimpleAuthenticator.password = Unicode(None)¶
If set, a global password that all users must provide.
If unset (default), the password field is completely ignored.
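For testing, a shared password can be required of all users; a sketch (placeholder value):

    c.DaskGateway.authenticator_class = "dask_gateway_server.auth.SimpleAuthenticator"
    c.SimpleAuthenticator.password = "a-throwaway-testing-password"  # placeholder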
Cluster Backends¶
Base Class¶
ClusterConfig¶
- class dask_gateway_server.backends.base.ClusterConfig(**kwargs: Any)¶
Base class for holding individual Dask cluster configurations
- adaptive_period c.ClusterConfig.adaptive_period = Float(3)¶
Time (in seconds) between adaptive scaling checks.
A smaller period will decrease scale up/down latency when responding to cluster load changes, but may also result in higher load on the gateway server.
- cluster_max_cores c.ClusterConfig.cluster_max_cores = Float(None)¶
The maximum number of cores available to this cluster.
Set to None for no cores limit (default).
- cluster_max_memory c.ClusterConfig.cluster_max_memory = MemoryLimit(None)¶
The maximum amount of memory (in bytes) available to this cluster. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
Set to None for no memory limit (default).
- cluster_max_workers c.ClusterConfig.cluster_max_workers = Int(0)¶
The maximum number of workers available to this cluster.
Note that this will be combined with cluster_max_cores and cluster_max_memory at runtime to determine the actual maximum number of workers available to this cluster.
- environment c.ClusterConfig.environment = Dict()¶
Environment variables to set for both the worker and scheduler processes.
- idle_timeout c.ClusterConfig.idle_timeout = Float(0)¶
Time (in seconds) before an idle cluster is automatically shutdown.
Set to 0 (default) for no idle timeout.
- scheduler_cmd c.ClusterConfig.scheduler_cmd = Command()¶
Shell command to start a dask scheduler.
- scheduler_cores c.ClusterConfig.scheduler_cores = Int(1)¶
Number of cpu-cores available for a dask scheduler.
- scheduler_memory c.ClusterConfig.scheduler_memory = MemoryLimit('2 G')¶
Number of bytes available for a dask scheduler. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_cmd c.ClusterConfig.worker_cmd = Command()¶
Shell command to start a dask worker.
- worker_cores c.ClusterConfig.worker_cores = Int(1)¶
Number of cpu-cores available for a dask worker.
- worker_memory c.ClusterConfig.worker_memory = MemoryLimit('2 G')¶
Number of bytes available for a dask worker. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_threads c.ClusterConfig.worker_threads = Int(0)¶
Number of threads available for a dask worker.
Defaults to worker_cores.
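Putting several of the options above together, a sketch of a per-cluster resource profile (values are illustrative, not recommendations):

    c.ClusterConfig.worker_cores = 2
    c.ClusterConfig.worker_memory = "4 G"
    c.ClusterConfig.cluster_max_workers = 10
    c.ClusterConfig.idle_timeout = 3600  # shut down clusters idle for an hour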
Backend¶
- class dask_gateway_server.backends.base.Backend(**kwargs: Any)¶
Base class for defining dask-gateway backends.
Subclasses should implement the following methods (a skeleton is sketched below):
- setup
- cleanup
- start_cluster
- stop_cluster
- on_cluster_heartbeat
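A minimal skeleton of a custom backend; the method signatures here are indicative only, consult dask_gateway_server.backends.base.Backend for the authoritative ones:

    from dask_gateway_server.backends.base import Backend

    class MyBackend(Backend):
        async def setup(self, app):
            # acquire resources, start clients, etc.
            await super().setup(app)

        async def cleanup(self):
            # release resources acquired in setup
            await super().cleanup()

        async def start_cluster(self, user, cluster_options):
            # launch a scheduler for this user, yielding state as it starts
            ...

        async def stop_cluster(self, cluster_name, failed=False):
            # tear down the named cluster
            ...

        async def on_cluster_heartbeat(self, cluster_name, msg):
            # record the cluster state reported in a heartbeat message
            ...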
- api_url c.Backend.api_url = Unicode('')¶
The address that internal components (e.g. dask clusters) will use when contacting the gateway.
- cluster_config_class c.Backend.cluster_config_class = Type('dask_gateway_server.backends.base.ClusterConfig')¶
The cluster config class to use
- cluster_options c.Backend.cluster_options = Union()¶
User options for configuring an individual cluster.
Allows users to specify configuration overrides when creating a new cluster. See Exposing Cluster Options in the documentation for more information.
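A sketch of exposing user-facing options via dask_gateway_server.options; the option names and bounds are illustrative:

    from dask_gateway_server.options import Options, Integer, Float

    def options_handler(options):
        # map user-facing option values onto ClusterConfig overrides
        return {
            "worker_cores": options.worker_cores,
            "worker_memory": int(options.worker_memory * 2**30),  # GiB -> bytes
        }

    c.Backend.cluster_options = Options(
        Integer("worker_cores", default=1, min=1, max=4, label="Worker Cores"),
        Float("worker_memory", default=1, min=1, max=8, label="Worker Memory (GiB)"),
        handler=options_handler,
    )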
Local Processes¶
LocalClusterConfig¶
- class dask_gateway_server.backends.local.LocalClusterConfig(**kwargs: Any)¶
Dask cluster configuration options when running as local processes
- adaptive_period c.LocalClusterConfig.adaptive_period = Float(3)¶
Time (in seconds) between adaptive scaling checks.
A smaller period will decrease scale up/down latency when responding to cluster load changes, but may also result in higher load on the gateway server.
- cluster_max_cores c.LocalClusterConfig.cluster_max_cores = Float(None)¶
The maximum number of cores available to this cluster.
Set to None for no cores limit (default).
- cluster_max_memory c.LocalClusterConfig.cluster_max_memory = MemoryLimit(None)¶
The maximum amount of memory (in bytes) available to this cluster. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
Set to None for no memory limit (default).
- cluster_max_workers c.LocalClusterConfig.cluster_max_workers = Int(0)¶
The maximum number of workers available to this cluster.
Note that this will be combined with cluster_max_cores and cluster_max_memory at runtime to determine the actual maximum number of workers available to this cluster.
- environment c.LocalClusterConfig.environment = Dict()¶
Environment variables to set for both the worker and scheduler processes.
- idle_timeout c.LocalClusterConfig.idle_timeout = Float(0)¶
Time (in seconds) before an idle cluster is automatically shutdown.
Set to 0 (default) for no idle timeout.
- scheduler_cmd c.LocalClusterConfig.scheduler_cmd = Command()¶
Shell command to start a dask scheduler.
- scheduler_cores c.LocalClusterConfig.scheduler_cores = Int(1)¶
Number of cpu-cores available for a dask scheduler.
- scheduler_memory c.LocalClusterConfig.scheduler_memory = MemoryLimit('2 G')¶
Number of bytes available for a dask scheduler. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_cmd c.LocalClusterConfig.worker_cmd = Command()¶
Shell command to start a dask worker.
- worker_cores c.LocalClusterConfig.worker_cores = Int(1)¶
Number of cpu-cores available for a dask worker.
- worker_memory c.LocalClusterConfig.worker_memory = MemoryLimit('2 G')¶
Number of bytes available for a dask worker. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_threads c.LocalClusterConfig.worker_threads = Int(0)¶
Number of threads available for a dask worker.
Defaults to worker_cores.
LocalBackend¶
- class dask_gateway_server.backends.local.LocalBackend(**kwargs: Any)¶
A cluster backend that launches local processes.
Requires super-user permissions in order to run processes for the requesting username.
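To select this backend, point the gateway at it in dask_gateway_config.py:

    c.DaskGateway.backend_class = "dask_gateway_server.backends.local.LocalBackend"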
- api_url c.LocalBackend.api_url = Unicode('')¶
The address that internal components (e.g. dask clusters) will use when contacting the gateway.
Defaults to {proxy_address}/{prefix}/api, set manually if a different address should be used.
- backoff_base_delay c.LocalBackend.backoff_base_delay = Float(0.1)¶
Base delay (in seconds) for backoff when retrying after failures.
If an operation fails, it is retried after a backoff computed as:
    min(backoff_max_delay, backoff_base_delay * 2 ** num_failures)
- backoff_max_delay c.LocalBackend.backoff_max_delay = Float(300)¶
Max delay (in seconds) for backoff policy when retrying after failures.
- check_timeouts_period c.LocalBackend.check_timeouts_period = Float(0.0)¶
Time (in seconds) between timeout checks.
This shouldn’t be too small (to keep the overhead low), but should be smaller than cluster_heartbeat_timeout, cluster_start_timeout, and worker_start_timeout.
- cluster_config_class c.LocalBackend.cluster_config_class = Type('dask_gateway_server.backends.local.LocalClusterConfig')¶
The cluster config class to use
- cluster_heartbeat_period c.LocalBackend.cluster_heartbeat_period = Int(15)¶
Time (in seconds) between cluster heartbeats to the gateway.
A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
- cluster_heartbeat_timeout c.LocalBackend.cluster_heartbeat_timeout = Float(0.0)¶
Timeout (in seconds) before killing a dask cluster that’s failed to heartbeat.
This should be greater than cluster_heartbeat_period. Defaults to 2 * cluster_heartbeat_period.
- cluster_options c.LocalBackend.cluster_options = Union()¶
User options for configuring an individual cluster.
Allows users to specify configuration overrides when creating a new cluster. See Exposing Cluster Options in the documentation for more information.
- cluster_start_timeout c.LocalBackend.cluster_start_timeout = Float(60)¶
Timeout (in seconds) before giving up on a starting dask cluster.
- cluster_status_period c.LocalBackend.cluster_status_period = Float(30)¶
Time (in seconds) between cluster status checks.
A smaller period will detect failed clusters sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
- clusters_directory c.LocalBackend.clusters_directory = Unicode('')¶
The base directory for cluster working directories.
A subdirectory will be created for each new cluster which will serve as the working directory for that cluster. On cluster shutdown the subdirectory will be removed.
If not specified, a temporary directory will be used for each cluster.
- db_cleanup_period c.LocalBackend.db_cleanup_period = Float(600)¶
Time (in seconds) between database cleanup tasks.
This sets how frequently old records are removed from the database. This shouldn’t be too small (to keep the overhead low), but should be smaller than db_record_max_age (probably by an order of magnitude).
- db_cluster_max_age c.LocalBackend.db_cluster_max_age = Float(86400)¶
Max time (in seconds) to keep around records of completed clusters.
Every db_cleanup_period, completed clusters older than db_cluster_max_age are removed from the database.
- db_debug c.LocalBackend.db_debug = Bool(False)¶
If True, all database operations will be logged
- db_encrypt_keys c.LocalBackend.db_encrypt_keys = List()¶
A list of keys to use to encrypt private data in the database. Can also be set by the environment variable DASK_GATEWAY_ENCRYPT_KEYS, where the value is a ;-delimited string of encryption keys.
Each key should be a base64-encoded 32 byte value, and should be cryptographically random. Lacking other options, openssl can be used to generate a single key via:
    $ openssl rand -base64 32
A single key is valid; multiple keys can be used to support key rotation.
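Equivalently, a key can be generated with the Python standard library; a sketch (the sqlite path is illustrative, and in practice the key should be generated once and persisted, since regenerating it makes previously encrypted records unreadable):

    import base64, os

    key = base64.b64encode(os.urandom(32)).decode()  # base64-encoded 32 random bytes
    c.LocalBackend.db_encrypt_keys = [key]
    c.LocalBackend.db_url = "sqlite:///dask_gateway.sqlite"  # illustrative path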
- db_url c.LocalBackend.db_url = Unicode('sqlite:///:memory:')¶
The URL for the database. Default is in-memory only.
If not in-memory, db_encrypt_keys must also be set.
- inherited_environment c.LocalBackend.inherited_environment = List()¶
Whitelist of environment variables for the scheduler and worker processes to inherit from the Dask-Gateway process.
- parallelism c.LocalBackend.parallelism = Int(20)¶
Number of handlers to use for starting/stopping clusters.
- sigint_timeout c.LocalBackend.sigint_timeout = Int(10)¶
Seconds to wait for process to stop after SIGINT.
If the process has not stopped after this time, a SIGTERM is sent.
- sigkill_timeout c.LocalBackend.sigkill_timeout = Int(5)¶
Seconds to wait for process to stop after SIGKILL.
If the process has not stopped after this time, a warning is logged and the process is deemed a zombie process.
- sigterm_timeout c.LocalBackend.sigterm_timeout = Int(5)¶
Seconds to wait for process to stop after SIGTERM.
If the process has not stopped after this time, a SIGKILL is sent.
- stop_clusters_on_shutdown c.LocalBackend.stop_clusters_on_shutdown = Bool(True)¶
Whether to stop active clusters on gateway shutdown.
If true, all active clusters will be stopped before shutting down the gateway. Set to False to leave active clusters running.
- worker_start_failure_limit c.LocalBackend.worker_start_failure_limit = Int(3)¶
A limit on the number of failed attempts to start a worker before the cluster is marked as failed.
Every worker that fails to start (timeouts exempt) increments a counter. The counter is reset if a worker successfully starts. If the counter ever exceeds this limit, the cluster is marked as failed and is shutdown.
- worker_start_timeout c.LocalBackend.worker_start_timeout = Float(60)¶
Timeout (in seconds) before giving up on a starting dask worker.
- worker_status_period c.LocalBackend.worker_status_period = Float(30)¶
Time (in seconds) between worker status checks.
A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
UnsafeLocalBackend¶
- class dask_gateway_server.backends.local.UnsafeLocalBackend(**kwargs: Any)¶
A version of LocalBackend that doesn’t set permissions.
FOR TESTING ONLY! This provides no user separations - clusters run with the same level of permission as the gateway.
- api_url c.UnsafeLocalBackend.api_url = Unicode('')¶
The address that internal components (e.g. dask clusters) will use when contacting the gateway.
Defaults to {proxy_address}/{prefix}/api, set manually if a different address should be used.
- backoff_base_delay c.UnsafeLocalBackend.backoff_base_delay = Float(0.1)¶
Base delay (in seconds) for backoff when retrying after failures.
If an operation fails, it is retried after a backoff computed as:
    min(backoff_max_delay, backoff_base_delay * 2 ** num_failures)
- backoff_max_delay c.UnsafeLocalBackend.backoff_max_delay = Float(300)¶
Max delay (in seconds) for backoff policy when retrying after failures.
- check_timeouts_period c.UnsafeLocalBackend.check_timeouts_period = Float(0.0)¶
Time (in seconds) between timeout checks.
This shouldn’t be too small (to keep the overhead low), but should be smaller than cluster_heartbeat_timeout, cluster_start_timeout, and worker_start_timeout.
- cluster_config_class c.UnsafeLocalBackend.cluster_config_class = Type('dask_gateway_server.backends.local.LocalClusterConfig')¶
The cluster config class to use
- cluster_heartbeat_period c.UnsafeLocalBackend.cluster_heartbeat_period = Int(15)¶
Time (in seconds) between cluster heartbeats to the gateway.
A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
- cluster_heartbeat_timeout c.UnsafeLocalBackend.cluster_heartbeat_timeout = Float(0.0)¶
Timeout (in seconds) before killing a dask cluster that’s failed to heartbeat.
This should be greater than cluster_heartbeat_period. Defaults to 2 * cluster_heartbeat_period.
- cluster_options c.UnsafeLocalBackend.cluster_options = Union()¶
User options for configuring an individual cluster.
Allows users to specify configuration overrides when creating a new cluster. See Exposing Cluster Options in the documentation for more information.
- cluster_start_timeout c.UnsafeLocalBackend.cluster_start_timeout = Float(60)¶
Timeout (in seconds) before giving up on a starting dask cluster.
- cluster_status_period c.UnsafeLocalBackend.cluster_status_period = Float(30)¶
Time (in seconds) between cluster status checks.
A smaller period will detect failed clusters sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
- clusters_directory c.UnsafeLocalBackend.clusters_directory = Unicode('')¶
The base directory for cluster working directories.
A subdirectory will be created for each new cluster which will serve as the working directory for that cluster. On cluster shutdown the subdirectory will be removed.
If not specified, a temporary directory will be used for each cluster.
- db_cleanup_period c.UnsafeLocalBackend.db_cleanup_period = Float(600)¶
Time (in seconds) between database cleanup tasks.
This sets how frequently old records are removed from the database. This shouldn’t be too small (to keep the overhead low), but should be smaller than db_record_max_age (probably by an order of magnitude).
- db_cluster_max_age c.UnsafeLocalBackend.db_cluster_max_age = Float(86400)¶
Max time (in seconds) to keep around records of completed clusters.
Every db_cleanup_period, completed clusters older than db_cluster_max_age are removed from the database.
- db_debug c.UnsafeLocalBackend.db_debug = Bool(False)¶
If True, all database operations will be logged
- db_encrypt_keys c.UnsafeLocalBackend.db_encrypt_keys = List()¶
A list of keys to use to encrypt private data in the database. Can also be set by the environment variable DASK_GATEWAY_ENCRYPT_KEYS, where the value is a ;-delimited string of encryption keys.
Each key should be a base64-encoded 32 byte value, and should be cryptographically random. Lacking other options, openssl can be used to generate a single key via:
    $ openssl rand -base64 32
A single key is valid; multiple keys can be used to support key rotation.
- db_url c.UnsafeLocalBackend.db_url = Unicode('sqlite:///:memory:')¶
The URL for the database. Default is in-memory only.
If not in-memory, db_encrypt_keys must also be set.
- inherited_environment c.UnsafeLocalBackend.inherited_environment = List()¶
Whitelist of environment variables for the scheduler and worker processes to inherit from the Dask-Gateway process.
- parallelism c.UnsafeLocalBackend.parallelism = Int(20)¶
Number of handlers to use for starting/stopping clusters.
- sigint_timeout c.UnsafeLocalBackend.sigint_timeout = Int(10)¶
Seconds to wait for process to stop after SIGINT.
If the process has not stopped after this time, a SIGTERM is sent.
- sigkill_timeout c.UnsafeLocalBackend.sigkill_timeout = Int(5)¶
Seconds to wait for process to stop after SIGKILL.
If the process has not stopped after this time, a warning is logged and the process is deemed a zombie process.
- sigterm_timeout c.UnsafeLocalBackend.sigterm_timeout = Int(5)¶
Seconds to wait for process to stop after SIGTERM.
If the process has not stopped after this time, a SIGKILL is sent.
- stop_clusters_on_shutdown c.UnsafeLocalBackend.stop_clusters_on_shutdown = Bool(True)¶
Whether to stop active clusters on gateway shutdown.
If true, all active clusters will be stopped before shutting down the gateway. Set to False to leave active clusters running.
- worker_start_failure_limit c.UnsafeLocalBackend.worker_start_failure_limit = Int(3)¶
A limit on the number of failed attempts to start a worker before the cluster is marked as failed.
Every worker that fails to start (timeouts exempt) increments a counter. The counter is reset if a worker successfully starts. If the counter ever exceeds this limit, the cluster is marked as failed and is shutdown.
- worker_start_timeout c.UnsafeLocalBackend.worker_start_timeout = Float(60)¶
Timeout (in seconds) before giving up on a starting dask worker.
- worker_status_period c.UnsafeLocalBackend.worker_status_period = Float(30)¶
Time (in seconds) between worker status checks.
A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
YARN¶
YarnClusterConfig¶
- class dask_gateway_server.backends.yarn.YarnClusterConfig(**kwargs: Any)¶
Dask cluster configuration options when running on Hadoop/YARN
- adaptive_period c.YarnClusterConfig.adaptive_period = Float(3)¶
Time (in seconds) between adaptive scaling checks.
A smaller period will decrease scale up/down latency when responding to cluster load changes, but may also result in higher load on the gateway server.
- cluster_max_cores c.YarnClusterConfig.cluster_max_cores = Float(None)¶
The maximum number of cores available to this cluster.
Set to None for no cores limit (default).
- cluster_max_memory c.YarnClusterConfig.cluster_max_memory = MemoryLimit(None)¶
The maximum amount of memory (in bytes) available to this cluster. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
Set to None for no memory limit (default).
- cluster_max_workers c.YarnClusterConfig.cluster_max_workers = Int(0)¶
The maximum number of workers available to this cluster.
Note that this will be combined with cluster_max_cores and cluster_max_memory at runtime to determine the actual maximum number of workers available to this cluster.
- environment c.YarnClusterConfig.environment = Dict()¶
Environment variables to set for both the worker and scheduler processes.
- idle_timeout c.YarnClusterConfig.idle_timeout = Float(0)¶
Time (in seconds) before an idle cluster is automatically shutdown.
Set to 0 (default) for no idle timeout.
- localize_files c.YarnClusterConfig.localize_files = Dict()¶
Extra files to distribute to both the worker and scheduler containers.
This is a mapping from local-name to resource. Resource paths can be local, or in HDFS (prefix with hdfs://... if so). If an archive (.tar.gz or .zip), the resource will be unarchived as directory local-name. For finer control, resources can also be specified as skein.File objects, or their dict equivalents.
This can be used to distribute conda/virtual environments by configuring the following:
    c.YarnClusterConfig.localize_files = {
        'environment': {
            'source': 'hdfs:///path/to/archived/environment.tar.gz',
            'visibility': 'public'
        }
    }
    c.YarnClusterConfig.scheduler_setup = 'source environment/bin/activate'
    c.YarnClusterConfig.worker_setup = 'source environment/bin/activate'
These archives are usually created using either conda-pack or venv-pack. For more information on distributing files, see https://jcristharif.com/skein/distributing-files.html.
- queue c.YarnClusterConfig.queue = Unicode('default')¶
The YARN queue to submit applications under
- scheduler_cmd c.YarnClusterConfig.scheduler_cmd = Command()¶
Shell command to start a dask scheduler.
- scheduler_cores c.YarnClusterConfig.scheduler_cores = Int(1)¶
Number of cpu-cores available for a dask scheduler.
- scheduler_memory c.YarnClusterConfig.scheduler_memory = MemoryLimit('2 G')¶
Number of bytes available for a dask scheduler. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- scheduler_setup c.YarnClusterConfig.scheduler_setup = Unicode('')¶
Script to run before dask scheduler starts.
- worker_cmd c.YarnClusterConfig.worker_cmd = Command()¶
Shell command to start a dask worker.
- worker_cores c.YarnClusterConfig.worker_cores = Int(1)¶
Number of cpu-cores available for a dask worker.
- worker_memory c.YarnClusterConfig.worker_memory = MemoryLimit('2 G')¶
Number of bytes available for a dask worker. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_setup c.YarnClusterConfig.worker_setup = Unicode('')¶
Script to run before dask worker starts.
- worker_threads c.YarnClusterConfig.worker_threads = Int(0)¶
Number of threads available for a dask worker.
Defaults to worker_cores.
YarnBackend¶
- class dask_gateway_server.backends.yarn.YarnBackend(**kwargs: Any)¶
A cluster backend for managing dask clusters on Hadoop/YARN.
- api_url c.YarnBackend.api_url = Unicode('')¶
The address that internal components (e.g. dask clusters) will use when contacting the gateway.
Defaults to {proxy_address}/{prefix}/api, set manually if a different address should be used.
- app_client_cache_max_size c.YarnBackend.app_client_cache_max_size = Int(10)¶
The max size of the cache for application clients.
A larger cache will result in improved performance, but will also use more resources.
- backoff_base_delay c.YarnBackend.backoff_base_delay = Float(0.1)¶
Base delay (in seconds) for backoff when retrying after failures.
If an operation fails, it is retried after a backoff computed as:
    min(backoff_max_delay, backoff_base_delay * 2 ** num_failures)
- backoff_max_delay c.YarnBackend.backoff_max_delay = Float(300)¶
Max delay (in seconds) for backoff policy when retrying after failures.
- check_timeouts_period c.YarnBackend.check_timeouts_period = Float(0.0)¶
Time (in seconds) between timeout checks.
This shouldn’t be too small (to keep the overhead low), but should be smaller than cluster_heartbeat_timeout, cluster_start_timeout, and worker_start_timeout.
- cluster_config_class c.YarnBackend.cluster_config_class = Type('dask_gateway_server.backends.yarn.YarnClusterConfig')¶
The cluster config class to use
- cluster_heartbeat_period c.YarnBackend.cluster_heartbeat_period = Int(15)¶
Time (in seconds) between cluster heartbeats to the gateway.
A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
- cluster_heartbeat_timeout c.YarnBackend.cluster_heartbeat_timeout = Float(0.0)¶
Timeout (in seconds) before killing a dask cluster that’s failed to heartbeat.
This should be greater than cluster_heartbeat_period. Defaults to 2 * cluster_heartbeat_period.
- cluster_options c.YarnBackend.cluster_options = Union()¶
User options for configuring an individual cluster.
Allows users to specify configuration overrides when creating a new cluster. See Exposing Cluster Options in the documentation for more information.
- cluster_start_timeout c.YarnBackend.cluster_start_timeout = Float(60)¶
Timeout (in seconds) before giving up on a starting dask cluster.
- cluster_status_period c.YarnBackend.cluster_status_period = Float(30)¶
Time (in seconds) between cluster status checks.
A smaller period will detect failed clusters sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
- db_cleanup_period c.YarnBackend.db_cleanup_period = Float(600)¶
Time (in seconds) between database cleanup tasks.
This sets how frequently old records are removed from the database. This shouldn’t be too small (to keep the overhead low), but should be smaller than db_record_max_age (probably by an order of magnitude).
- db_cluster_max_age c.YarnBackend.db_cluster_max_age = Float(86400)¶
Max time (in seconds) to keep around records of completed clusters.
Every db_cleanup_period, completed clusters older than db_cluster_max_age are removed from the database.
- db_debug c.YarnBackend.db_debug = Bool(False)¶
If True, all database operations will be logged
- db_encrypt_keys c.YarnBackend.db_encrypt_keys = List()¶
A list of keys to use to encrypt private data in the database. Can also be set by the environment variable DASK_GATEWAY_ENCRYPT_KEYS, where the value is a ;-delimited string of encryption keys.
Each key should be a base64-encoded 32 byte value, and should be cryptographically random. Lacking other options, openssl can be used to generate a single key via:
    $ openssl rand -base64 32
A single key is valid; multiple keys can be used to support key rotation.
- db_url c.YarnBackend.db_url = Unicode('sqlite:///:memory:')¶
The URL for the database. Default is in-memory only.
If not in-memory, db_encrypt_keys must also be set.
- keytab c.YarnBackend.keytab = Unicode(None)¶
Path to kerberos keytab for Dask Gateway user
- parallelism c.YarnBackend.parallelism = Int(20)¶
Number of handlers to use for starting/stopping clusters.
- principal c.YarnBackend.principal = Unicode(None)¶
Kerberos principal for Dask Gateway user
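On a kerberized Hadoop cluster, a sketch of the security settings (the path and principal are placeholders):

    c.YarnBackend.keytab = "/etc/dask-gateway/dask.keytab"  # placeholder path
    c.YarnBackend.principal = "dask"                        # placeholder principal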
- stop_clusters_on_shutdown c.YarnBackend.stop_clusters_on_shutdown = Bool(True)¶
Whether to stop active clusters on gateway shutdown.
If true, all active clusters will be stopped before shutting down the gateway. Set to False to leave active clusters running.
- worker_start_failure_limit c.YarnBackend.worker_start_failure_limit = Int(3)¶
A limit on the number of failed attempts to start a worker before the cluster is marked as failed.
Every worker that fails to start (timeouts exempt) increments a counter. The counter is reset if a worker successfully starts. If the counter ever exceeds this limit, the cluster is marked as failed and is shutdown.
- worker_start_timeout c.YarnBackend.worker_start_timeout = Float(60)¶
Timeout (in seconds) before giving up on a starting dask worker.
- worker_status_period c.YarnBackend.worker_status_period = Float(30)¶
Time (in seconds) between worker status checks.
A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
Kubernetes¶
KubeClusterConfig¶
- class dask_gateway_server.backends.kubernetes.KubeClusterConfig(**kwargs: Any)¶
Configuration for a single Dask cluster running on kubernetes
- adaptive_period c.KubeClusterConfig.adaptive_period = Float(3)¶
Time (in seconds) between adaptive scaling checks.
A smaller period will decrease scale up/down latency when responding to cluster load changes, but may also result in higher load on the gateway server.
- cluster_max_cores c.KubeClusterConfig.cluster_max_cores = Float(None)¶
The maximum number of cores available to this cluster.
Set to None for no cores limit (default).
- cluster_max_memory c.KubeClusterConfig.cluster_max_memory = MemoryLimit(None)¶
The maximum amount of memory (in bytes) available to this cluster. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
Set to None for no memory limit (default).
- cluster_max_workers c.KubeClusterConfig.cluster_max_workers = Int(0)¶
The maximum number of workers available to this cluster.
Note that this will be combined with cluster_max_cores and cluster_max_memory at runtime to determine the actual maximum number of workers available to this cluster.
- environment c.KubeClusterConfig.environment = Dict()¶
Environment variables to set for both the worker and scheduler processes.
- idle_timeout c.KubeClusterConfig.idle_timeout = Float(0)¶
Time (in seconds) before an idle cluster is automatically shutdown.
Set to 0 (default) for no idle timeout.
- image c.KubeClusterConfig.image = Unicode('daskgateway/dask-gateway:latest')¶
Docker image to use for running user’s containers.
- image_pull_policy c.KubeClusterConfig.image_pull_policy = Unicode(None)¶
The image pull policy of the docker image specified in image.
- image_pull_secrets c.KubeClusterConfig.image_pull_secrets = List()¶
Image pull secrets to access private registries.
Should be a list of dictionaries with a single key called name to match the k8s native syntax.
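A sketch of pulling cluster images from a private registry (the secret name is a placeholder):

    c.KubeClusterConfig.image = "daskgateway/dask-gateway:latest"
    c.KubeClusterConfig.image_pull_policy = "IfNotPresent"
    c.KubeClusterConfig.image_pull_secrets = [{"name": "my-registry-secret"}]  # placeholder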
- namespace c.KubeClusterConfig.namespace = Unicode('default')¶
Kubernetes namespace to launch pods in.
If running inside a kubernetes cluster with service accounts enabled, defaults to the current namespace. If not, defaults to "default".
- scheduler_cmd c.KubeClusterConfig.scheduler_cmd = Command()¶
Shell command to start a dask scheduler.
- scheduler_cores c.KubeClusterConfig.scheduler_cores = Float(1)¶
Number of cpu-cores available for a dask scheduler.
- scheduler_cores_limit c.KubeClusterConfig.scheduler_cores_limit = Float(0.0)¶
Maximum number of cpu-cores available for a dask scheduler.
Defaults to scheduler_cores.
- scheduler_extra_container_config c.KubeClusterConfig.scheduler_extra_container_config = Dict()¶
Any extra configuration for the scheduler container.
This dict will be deep merged with the scheduler container (a V1Container object) before submission. Keys should match those in the kubernetes spec, and should be camelCase.
See worker_extra_container_config for more information.
- scheduler_extra_pod_annotations c.KubeClusterConfig.scheduler_extra_pod_annotations = Dict()¶
Any extra annotations to be applied to a user’s scheduler pods.
These annotations can be set using cluster options (see Exposing Cluster Options) to allow for injecting user-specific information, e.g. adding an annotation based on a user’s group or username.
This dict will be merged with common_annotations before being applied to user pods.
- scheduler_extra_pod_config c.KubeClusterConfig.scheduler_extra_pod_config = Dict()¶
Any extra configuration for the scheduler pods.
This dict will be deep merged with the scheduler pod spec (a V1PodSpec object) before submission. Keys should match those in the kubernetes spec, and should be camelCase.
See worker_extra_pod_config for more information.
- scheduler_extra_pod_labels c.KubeClusterConfig.scheduler_extra_pod_labels = Dict()¶
Any extra labels to be applied to a user’s scheduler pods.
These labels can be set using cluster options (see Exposing Cluster Options) to allow for injecting user-specific information, e.g. adding a label based on a user’s group or username.
This dict will be merged with common_labels before being applied to user pods.
- scheduler_memory c.KubeClusterConfig.scheduler_memory = MemoryLimit('2 G')¶
Number of bytes available for a dask scheduler. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- scheduler_memory_limit c.KubeClusterConfig.scheduler_memory_limit = MemoryLimit(0)¶
Maximum number of bytes available for a dask scheduler. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
Defaults to scheduler_memory.
- worker_cmd c.KubeClusterConfig.worker_cmd = Command()¶
Shell command to start a dask worker.
- worker_cores c.KubeClusterConfig.worker_cores = Float(1)¶
Number of cpu-cores available for a dask worker.
- worker_cores_limit c.KubeClusterConfig.worker_cores_limit = Float(0.0)¶
Maximum number of cpu-cores available for a dask worker.
Defaults to worker_cores.
- worker_extra_container_config c.KubeClusterConfig.worker_extra_container_config = Dict()¶
Any extra configuration for the worker container.
This dict will be deep merged with the worker container (a V1Container object) before submission. Keys should match those in the kubernetes spec, and should be camelCase.
For example, here we add environment variables from a secret to the worker container:
    c.KubeClusterConfig.worker_extra_container_config = {
        "envFrom": [
            {"secretRef": {"name": "my-env-secret"}}
        ]
    }
- worker_extra_pod_annotations c.KubeClusterConfig.worker_extra_pod_annotations = Dict()¶
Any extra annotations to be applied to a user’s worker pods.
These annotations can be set using cluster options (see Exposing Cluster Options) to allow for injecting user-specific information, e.g. adding an annotation based on a user’s group or username.
This dict will be merged with common_annotations before being applied to user pods.
- worker_extra_pod_config c.KubeClusterConfig.worker_extra_pod_config = Dict()¶
Any extra configuration for the worker pods.
This dict will be deep merged with the worker pod spec (a V1PodSpec object) before submission. Keys should match those in the kubernetes spec, and should be camelCase.
For example, here we add a toleration to worker pods:
    c.KubeClusterConfig.worker_extra_pod_config = {
        "tolerations": [
            {
                "key": "key",
                "operator": "Equal",
                "value": "value",
                "effect": "NoSchedule",
            }
        ]
    }
- worker_extra_pod_labels c.KubeClusterConfig.worker_extra_pod_labels = Dict()¶
Any extra labels to be applied to a user’s worker pods.
These labels can be set using cluster options (see Exposing Cluster Options) to allow for injecting user-specific information, e.g. adding a label based on a user’s group or username.
This dict will be merged with common_labels before being applied to user pods.
- worker_memory c.KubeClusterConfig.worker_memory = MemoryLimit('2 G')¶
Number of bytes available for a dask worker. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_memory_limit c.KubeClusterConfig.worker_memory_limit = MemoryLimit(0)¶
Maximum number of bytes available for a dask worker. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
Defaults to worker_memory.
- worker_threads c.KubeClusterConfig.worker_threads = Int(0)¶
Number of threads available for a dask worker.
Defaults to worker_cores.
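Because each limit option defaults to its corresponding request, requests and limits only diverge when both are set explicitly; a sketch (values illustrative):

    c.KubeClusterConfig.worker_cores = 1             # request
    c.KubeClusterConfig.worker_cores_limit = 2       # limit
    c.KubeClusterConfig.worker_memory = "4 G"        # request
    c.KubeClusterConfig.worker_memory_limit = "6 G"  # limit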
KubeBackend¶
- class dask_gateway_server.backends.kubernetes.KubeBackend(**kwargs: Any)¶
A dask-gateway backend for running on Kubernetes
- api_url c.KubeBackend.api_url = Unicode('')¶
The address that internal components (e.g. dask clusters) will use when contacting the gateway.
- cluster_config_class c.KubeBackend.cluster_config_class = Type('dask_gateway_server.backends.kubernetes.backend.KubeClusterConfig')¶
The cluster config class to use
- cluster_options c.KubeBackend.cluster_options = Union()¶
User options for configuring an individual cluster.
Allows users to specify configuration overrides when creating a new cluster. See Exposing Cluster Options in the documentation for more information.
- common_annotations c.KubeBackend.common_annotations = Dict()¶
Kubernetes annotations to apply to all objects created by the gateway
- common_labels c.KubeBackend.common_labels = Dict()¶
Kubernetes labels to apply to all objects created by the gateway
- crd_version c.KubeBackend.crd_version = Unicode('v1alpha1')¶
The version for the DaskCluster CRD
- gateway_instance c.KubeBackend.gateway_instance = Unicode('')¶
A unique ID for this instance of dask-gateway.
The controller must also be configured with the same ID.
- label_selector c.KubeBackend.label_selector = Unicode('')¶
The label selector to use when watching objects managed by the gateway.
KubeController¶
- class dask_gateway_server.backends.kubernetes.controller.KubeController(**kwargs: Any)¶
Kubernetes controller for dask-gateway
- address c.KubeController.address = Unicode(':8000')¶
The address the server should listen at
- api_url c.KubeController.api_url = Unicode('')¶
The address that internal components (e.g. dask clusters) will use when contacting the gateway.
- backoff_base_delay c.KubeController.backoff_base_delay = Float(0.1)¶
Base delay (in seconds) for backoff when retrying after failures.
If an operation fails, it is retried after a backoff computed as:
    min(backoff_max_delay, backoff_base_delay * 2 ** num_failures)
- backoff_max_delay c.KubeController.backoff_max_delay = Float(300)¶
Max delay (in seconds) for backoff policy when retrying after failures.
- common_annotations c.KubeController.common_annotations = Dict()¶
Kubernetes annotations to apply to all objects created by the gateway
- common_labels c.KubeController.common_labels = Dict()¶
Kubernetes labels to apply to all objects created by the gateway
- completed_cluster_cleanup_period c.KubeController.completed_cluster_cleanup_period = Float(600)¶
Time (in seconds) between cleanup tasks.
This sets how frequently old cluster records are deleted from kubernetes. This shouldn’t be too small (to keep the overhead low), but should be smaller than completed_cluster_max_age (probably by an order of magnitude).
- completed_cluster_max_age c.KubeController.completed_cluster_max_age = Float(86400)¶
Max time (in seconds) to keep around records of completed clusters.
Every completed_cluster_cleanup_period, completed clusters older than completed_cluster_max_age are deleted from kubernetes.
- config_file c.KubeController.config_file = Unicode('dask_gateway_config.py')¶
The config file to load
- crd_version c.KubeController.crd_version = Unicode('v1alpha1')¶
The version for the DaskCluster CRD
- gateway_instance c.KubeController.gateway_instance = Unicode('')¶
A unique ID for this instance of dask-gateway.
The controller must also be configured with the same ID.
- k8s_api_rate_limit c.KubeController.k8s_api_rate_limit = Int(50)¶
Limit on the average number of k8s api calls per second.
- k8s_api_rate_limit_burst c.KubeController.k8s_api_rate_limit_burst = Int(100)¶
Limit on the maximum number of k8s api calls per second.
- label_selector c.KubeController.label_selector = Unicode('')¶
The label selector to use when watching objects managed by the gateway.
- log_datefmt c.KubeController.log_datefmt = Unicode('%Y-%m-%d %H:%M:%S')¶
The date format used by logging formatters for %(asctime)s
- log_format c.KubeController.log_format = Unicode('%(log_color)s[%(levelname)1.1s %(asctime)s.%(msecs).03d %(name)s]%(reset)s %(message)s')¶
The Logging format template
- log_level c.KubeController.log_level = Enum('INFO')¶
Set the log level by value or name.
- parallelism c.KubeController.parallelism = Int(20)¶
Number of handlers to use for reconciling k8s objects.
- proxy_prefix c.KubeController.proxy_prefix = Unicode('')¶
The path prefix the HTTP/HTTPS proxy should serve under.
This prefix will be prepended to all routes registered with the proxy.
- proxy_tcp_entrypoint c.KubeController.proxy_tcp_entrypoint = Unicode('tcp')¶
The traefik entrypoint name to use when creating ingressroutetcps
- proxy_web_entrypoint c.KubeController.proxy_web_entrypoint = Unicode('web')¶
The traefik entrypoint name to use when creating ingressroutes
- proxy_web_middlewares c.KubeController.proxy_web_middlewares = List()¶
A list of middlewares to apply to web routes added to the proxy.
- show_config c.KubeController.show_config = Bool(False)¶
Instead of starting the Application, dump configuration to stdout
- show_config_json c.KubeController.show_config_json = Bool(False)¶
Instead of starting the Application, dump configuration to stdout (as JSON)
Job Queues¶
PBSClusterConfig¶
- class dask_gateway_server.backends.jobqueue.pbs.PBSClusterConfig(**kwargs: Any)¶
Dask cluster configuration options when running on PBS
- account c.PBSClusterConfig.account = Unicode('')¶
Accounting string associated with each job.
- adaptive_period c.PBSClusterConfig.adaptive_period = Float(3)¶
Time (in seconds) between adaptive scaling checks.
A smaller period will decrease scale up/down latency when responding to cluster load changes, but may also result in higher load on the gateway server.
- cluster_max_cores c.PBSClusterConfig.cluster_max_cores = Float(None)¶
The maximum number of cores available to this cluster.
Set to None for no cores limit (default).
- cluster_max_memory c.PBSClusterConfig.cluster_max_memory = MemoryLimit(None)¶
The maximum amount of memory (in bytes) available to this cluster. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
Set to None for no memory limit (default).
- cluster_max_workers c.PBSClusterConfig.cluster_max_workers = Int(0)¶
The maximum number of workers available to this cluster.
Note that this will be combined with cluster_max_cores and cluster_max_memory at runtime to determine the actual maximum number of workers available to this cluster.
- environment c.PBSClusterConfig.environment = Dict()¶
Environment variables to set for both the worker and scheduler processes.
- idle_timeout c.PBSClusterConfig.idle_timeout = Float(0)¶
Time (in seconds) before an idle cluster is automatically shutdown.
Set to 0 (default) for no idle timeout.
- project c.PBSClusterConfig.project = Unicode('')¶
Project associated with each job.
- queue c.PBSClusterConfig.queue = Unicode('')¶
The queue to submit jobs to.
- scheduler_cmd c.PBSClusterConfig.scheduler_cmd = Command()¶
Shell command to start a dask scheduler.
- scheduler_cores c.PBSClusterConfig.scheduler_cores = Int(1)¶
Number of cpu-cores available for a dask scheduler.
- scheduler_memory c.PBSClusterConfig.scheduler_memory = MemoryLimit('2 G')¶
Number of bytes available for a dask scheduler. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- scheduler_resource_list c.PBSClusterConfig.scheduler_resource_list = Unicode('select=1:ncpus={cores}:mem={memory}')¶
The resource list to use for the scheduler.
This is a template, and receives the following fields: cores and memory.
- scheduler_setup c.PBSClusterConfig.scheduler_setup = Unicode('')¶
Script to run before dask scheduler starts.
- staging_directory c.PBSClusterConfig.staging_directory = Unicode('{home}/.dask-gateway/')¶
The staging directory for storing files before the job starts.
A subdirectory will be created for each new cluster which will store temporary files for that cluster. On cluster shutdown the subdirectory will be removed.
This field can be a template, which receives the following fields: home (the user’s home directory) and username (the user’s name).
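For example, to stage files on a per-user scratch filesystem (the path is illustrative):

    c.PBSClusterConfig.staging_directory = "/scratch/{username}/.dask-gateway/"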
- use_stagein c.PBSClusterConfig.use_stagein = Bool(True)¶
If true, the staging directory created above will be copied into the job working directories at runtime using the -Wstagein directive.
If the staging directory is on a networked filesystem, you can set this to False and rely on the networked filesystem for access.
- worker_cmd c.PBSClusterConfig.worker_cmd = Command()¶
Shell command to start a dask worker.
- worker_cores c.PBSClusterConfig.worker_cores = Int(1)¶
Number of cpu-cores available for a dask worker.
- worker_memory c.PBSClusterConfig.worker_memory = MemoryLimit('2 G')¶
Number of bytes available for a dask worker. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_resource_list c.PBSClusterConfig.worker_resource_list = Unicode('select=1:ncpus={cores}:mem={memory}')¶
The resource list to use for the workers.
This is a template, and receives the following fields: cores and memory.
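A sketch combining the queue and resource-list options; {cores} and {memory} are filled in by the gateway at submission time, and the queue name is a placeholder:

    c.PBSClusterConfig.queue = "batch"  # placeholder queue name
    c.PBSClusterConfig.worker_resource_list = "select=1:ncpus={cores}:mem={memory}"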
- worker_setup c.PBSClusterConfig.worker_setup = Unicode('')¶
Script to run before dask worker starts.
- worker_threads c.PBSClusterConfig.worker_threads = Int(0)¶
Number of threads available for a dask worker.
Defaults to
worker_cores
.
PBSBackend¶
- class dask_gateway_server.backends.jobqueue.pbs.PBSBackend(**kwargs: Any)¶
A backend for deploying Dask on a PBS cluster.
- api_url c.PBSBackend.api_url = Unicode('')¶
The address that internal components (e.g. dask clusters) will use when contacting the gateway.
Defaults to {proxy_address}/{prefix}/api, set manually if a different address should be used.
- backoff_base_delay c.PBSBackend.backoff_base_delay = Float(0.1)¶
Base delay (in seconds) for backoff when retrying after failures.
If an operation fails, it is retried after a backoff computed as:
    min(backoff_max_delay, backoff_base_delay * 2 ** num_failures)
- backoff_max_delay c.PBSBackend.backoff_max_delay = Float(300)¶
Max delay (in seconds) for backoff policy when retrying after failures.
- cancel_command c.PBSBackend.cancel_command = Unicode('')¶
The path to the job cancel command
- check_timeouts_period c.PBSBackend.check_timeouts_period = Float(0.0)¶
Time (in seconds) between timeout checks.
This shouldn’t be too small (to keep the overhead low), but should be smaller than cluster_heartbeat_timeout, cluster_start_timeout, and worker_start_timeout.
- cluster_config_class c.PBSBackend.cluster_config_class = Type('dask_gateway_server.backends.jobqueue.pbs.PBSClusterConfig')¶
The cluster config class to use
- cluster_heartbeat_period c.PBSBackend.cluster_heartbeat_period = Int(15)¶
Time (in seconds) between cluster heartbeats to the gateway.
A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
- cluster_heartbeat_timeout c.PBSBackend.cluster_heartbeat_timeout = Float(0.0)¶
Timeout (in seconds) before killing a dask cluster that’s failed to heartbeat.
This should be greater than cluster_heartbeat_period. Defaults to 2 * cluster_heartbeat_period.
- cluster_options c.PBSBackend.cluster_options = Union()¶
User options for configuring an individual cluster.
Allows users to specify configuration overrides when creating a new cluster. See Exposing Cluster Options in the documentation for more information.
- cluster_start_timeout c.PBSBackend.cluster_start_timeout = Float(60)¶
Timeout (in seconds) before giving up on a starting dask cluster.
- cluster_status_period c.PBSBackend.cluster_status_period = Float(30)¶
Time (in seconds) between cluster status checks.
A smaller period will detect failed clusters sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
- dask_gateway_jobqueue_launcher c.PBSBackend.dask_gateway_jobqueue_launcher = Unicode('')¶
The path to the dask-gateway-jobqueue-launcher executable
- db_cleanup_period c.PBSBackend.db_cleanup_period = Float(600)¶
Time (in seconds) between database cleanup tasks.
This sets how frequently old records are removed from the database. This shouldn’t be too small (to keep the overhead low), but should be smaller than db_record_max_age (probably by an order of magnitude).
- db_cluster_max_age c.PBSBackend.db_cluster_max_age = Float(86400)¶
Max time (in seconds) to keep around records of completed clusters.
Every db_cleanup_period, completed clusters older than db_cluster_max_age are removed from the database.
- db_debug c.PBSBackend.db_debug = Bool(False)¶
If True, all database operations will be logged
- db_encrypt_keys c.PBSBackend.db_encrypt_keys = List()¶
A list of keys to use to encrypt private data in the database. Can also be set by the environment variable DASK_GATEWAY_ENCRYPT_KEYS, where the value is a ;-delimited string of encryption keys.
Each key should be a base64-encoded 32 byte value, and should be cryptographically random. Lacking other options, openssl can be used to generate a single key via:
    $ openssl rand -base64 32
A single key is valid; multiple keys can be used to support key rotation.
- db_url c.PBSBackend.db_url = Unicode('sqlite:///:memory:')¶
The URL for the database. Default is in-memory only.
If not in-memory, db_encrypt_keys must also be set.
- gateway_hostname c.PBSBackend.gateway_hostname = Unicode('')¶
The hostname of the node running the gateway. Used for referencing the local host in PBS directives.
- parallelism c.PBSBackend.parallelism = Int(20)¶
Number of handlers to use for starting/stopping clusters.
- status_command c.PBSBackend.status_command = Unicode('')¶
The path to the job status command
- stop_clusters_on_shutdown c.PBSBackend.stop_clusters_on_shutdown = Bool(True)¶
Whether to stop active clusters on gateway shutdown.
If true, all active clusters will be stopped before shutting down the gateway. Set to False to leave active clusters running.
- submit_command c.PBSBackend.submit_command = Unicode('')¶
The path to the job submit command
- worker_start_failure_limit c.PBSBackend.worker_start_failure_limit = Int(3)¶
A limit on the number of failed attempts to start a worker before the cluster is marked as failed.
Every worker that fails to start (timeouts exempt) increments a counter. The counter is reset if a worker successfully starts. If the counter ever exceeds this limit, the cluster is marked as failed and is shutdown.
- worker_start_timeout c.PBSBackend.worker_start_timeout = Float(60)¶
Timeout (in seconds) before giving up on a starting dask worker.
- worker_status_period c.PBSBackend.worker_status_period = Float(30)¶
Time (in seconds) between worker status checks.
A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
SlurmClusterConfig¶
- class dask_gateway_server.backends.jobqueue.slurm.SlurmClusterConfig(**kwargs: Any)¶
Dask cluster configuration options when running on SLURM
- account c.SlurmClusterConfig.account = Unicode('')¶
Account string associated with each job.
- adaptive_period c.SlurmClusterConfig.adaptive_period = Float(3)¶
Time (in seconds) between adaptive scaling checks.
A smaller period will decrease scale up/down latency when responding to cluster load changes, but may also result in higher load on the gateway server.
- cluster_max_cores c.SlurmClusterConfig.cluster_max_cores = Float(None)¶
The maximum number of cores available to this cluster.
Set to None for no cores limit (default).
- cluster_max_memory c.SlurmClusterConfig.cluster_max_memory = MemoryLimit(None)¶
The maximum amount of memory (in bytes) available to this cluster. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
Set to None for no memory limit (default).
- cluster_max_workers c.SlurmClusterConfig.cluster_max_workers = Int(0)¶
The maximum number of workers available to this cluster.
Note that this will be combined with cluster_max_cores and cluster_max_memory at runtime to determine the actual maximum number of workers available to this cluster.
- environment c.SlurmClusterConfig.environment = Dict()¶
Environment variables to set for both the worker and scheduler processes.
- idle_timeout c.SlurmClusterConfig.idle_timeout = Float(0)¶
Time (in seconds) before an idle cluster is automatically shutdown.
Set to 0 (default) for no idle timeout.
- partition c.SlurmClusterConfig.partition = Unicode('')¶
The partition to submit jobs to.
- qos c.SlurmClusterConfig.qos = Unicode('')¶
QOS string associated with each job.
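A sketch of routing cluster jobs in Slurm (the partition, account, and QOS values are placeholders):

    c.SlurmClusterConfig.partition = "compute"   # placeholder
    c.SlurmClusterConfig.account = "my-account"  # placeholder
    c.SlurmClusterConfig.qos = "normal"          # placeholder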
- scheduler_cmd c.SlurmClusterConfig.scheduler_cmd = Command()¶
Shell command to start a dask scheduler.
- scheduler_cores c.SlurmClusterConfig.scheduler_cores = Int(1)¶
Number of cpu-cores available for a dask scheduler.
- scheduler_memory c.SlurmClusterConfig.scheduler_memory = MemoryLimit('2 G')¶
Number of bytes available for a dask scheduler. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- scheduler_setup c.SlurmClusterConfig.scheduler_setup = Unicode('')¶
Script to run before dask scheduler starts.
- staging_directory c.SlurmClusterConfig.staging_directory = Unicode('{home}/.dask-gateway/')¶
The staging directory for storing files before the job starts.
A subdirectory will be created for each new cluster which will store temporary files for that cluster. On cluster shutdown the subdirectory will be removed.
This field can be a template, which receives the following fields:
home (the user’s home directory)
username (the user’s name)
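For example, to stage files on a shared scratch filesystem rather than under each user’s home directory (the /scratch layout is an assumption about the site):
c.SlurmClusterConfig.staging_directory = "/scratch/{username}/.dask-gateway/"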
- worker_cmd c.SlurmClusterConfig.worker_cmd = Command()¶
Shell command to start a dask worker.
- worker_cores c.SlurmClusterConfig.worker_cores = Int(1)¶
Number of cpu-cores available for a dask worker.
- worker_memory c.SlurmClusterConfig.worker_memory = MemoryLimit('2 G')¶
Number of bytes available for a dask worker. Allows the following suffixes:
K -> Kibibytes
M -> Mebibytes
G -> Gibibytes
T -> Tebibytes
- worker_setup c.SlurmClusterConfig.worker_setup = Unicode('')¶
Script to run before dask worker starts.
- worker_threads c.SlurmClusterConfig.worker_threads = Int(0)¶
Number of threads available for a dask worker.
Defaults to worker_cores.
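A sketch of sizing workers, assuming a site conda installation at /opt/conda with an environment named "dask" (both are hypothetical):
c.SlurmClusterConfig.worker_setup = "source /opt/conda/etc/profile.d/conda.sh && conda activate dask"
c.SlurmClusterConfig.worker_cores = 4
c.SlurmClusterConfig.worker_memory = "16 G"
c.SlurmClusterConfig.worker_threads = 4   # 0 (the default) would also yield worker_cores threads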
SlurmBackend¶
- class dask_gateway_server.backends.jobqueue.slurm.SlurmBackend(**kwargs: Any)¶
A backend for deploying Dask on a Slurm cluster.
- api_url c.SlurmBackend.api_url = Unicode('')¶
The address that internal components (e.g. dask clusters) will use when contacting the gateway.
Defaults to {proxy_address}/{prefix}/api; set manually if a different address should be used.
- backoff_base_delay c.SlurmBackend.backoff_base_delay = Float(0.1)¶
Base delay (in seconds) for backoff when retrying after failures.
If an operation fails, it is retried after a backoff computed as:
min(backoff_max_delay, backoff_base_delay * 2 ** num_failures)
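In Python terms, the delay behaves like this illustrative helper (not part of the dask-gateway API):
def retry_delay(num_failures, base_delay=0.1, max_delay=300.0):
    # Exponential backoff: 0.2 s after one failure, 0.4 s after two, ... capped at max_delay
    return min(max_delay, base_delay * 2 ** num_failures)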
- backoff_max_delay c.SlurmBackend.backoff_max_delay = Float(300)¶
Max delay (in seconds) for backoff policy when retrying after failures.
- cancel_command c.SlurmBackend.cancel_command = Unicode('')¶
The path to the job cancel command
- check_timeouts_period c.SlurmBackend.check_timeouts_period = Float(0.0)¶
Time (in seconds) between timeout checks.
This shouldn’t be too small (to keep the overhead low), but should be smaller than cluster_heartbeat_timeout, cluster_start_timeout, and worker_start_timeout.
- cluster_config_class c.SlurmBackend.cluster_config_class = Type('dask_gateway_server.backends.jobqueue.slurm.SlurmClusterConfig')¶
The cluster config class to use
- cluster_heartbeat_period c.SlurmBackend.cluster_heartbeat_period = Int(15)¶
Time (in seconds) between cluster heartbeats to the gateway.
A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
- cluster_heartbeat_timeout c.SlurmBackend.cluster_heartbeat_timeout = Float(0.0)¶
Timeout (in seconds) before killing a dask cluster that’s failed to heartbeat.
This should be greater than cluster_heartbeat_period. Defaults to 2 * cluster_heartbeat_period.
- cluster_options c.SlurmBackend.cluster_options = Union()¶
User options for configuring an individual cluster.
Allows users to specify configuration overrides when creating a new cluster. See the Cluster Manager Options section below for more information.
- cluster_start_timeout c.SlurmBackend.cluster_start_timeout = Float(60)¶
Timeout (in seconds) before giving up on a starting dask cluster.
- cluster_status_period c.SlurmBackend.cluster_status_period = Float(30)¶
Time (in seconds) between cluster status checks.
A smaller period will detect failed clusters sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
- dask_gateway_jobqueue_launcher c.SlurmBackend.dask_gateway_jobqueue_launcher = Unicode('')¶
The path to the dask-gateway-jobqueue-launcher executable
- db_cleanup_period c.SlurmBackend.db_cleanup_period = Float(600)¶
Time (in seconds) between database cleanup tasks.
This sets how frequently old records are removed from the database. This shouldn’t be too small (to keep the overhead low), but should be smaller than db_cluster_max_age (probably by an order of magnitude).
- db_cluster_max_age c.SlurmBackend.db_cluster_max_age = Float(86400)¶
Max time (in seconds) to keep around records of completed clusters.
Every db_cleanup_period, completed clusters older than db_cluster_max_age are removed from the database.
- db_debug c.SlurmBackend.db_debug = Bool(False)¶
If True, all database operations will be logged
- db_encrypt_keys c.SlurmBackend.db_encrypt_keys = List()¶
A list of keys to use to encrypt private data in the database. Can also be set by the environment variable DASK_GATEWAY_ENCRYPT_KEYS, where the value is a ;-delimited string of encryption keys.
Each key should be a base64-encoded 32 byte value, and should be cryptographically random. Lacking other options, openssl can be used to generate a single key via:
$ openssl rand -base64 32
A single key is valid; multiple keys can be used to support key rotation.
- db_url c.SlurmBackend.db_url = Unicode('sqlite:///:memory:')¶
The URL for the database. Default is in-memory only.
If not in-memory, db_encrypt_keys must also be set.
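A sketch of a persistent database setup; the sqlite path is arbitrary and the key is a placeholder for a value generated with openssl rand -base64 32:
c.SlurmBackend.db_url = "sqlite:///dask_gateway.db"
c.SlurmBackend.db_encrypt_keys = ["<base64-encoded 32-byte key>"]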
- parallelism c.SlurmBackend.parallelism = Int(20)¶
Number of handlers to use for starting/stopping clusters.
- status_command c.SlurmBackend.status_command = Unicode('')¶
The path to the job status command
- stop_clusters_on_shutdown c.SlurmBackend.stop_clusters_on_shutdown = Bool(True)¶
Whether to stop active clusters on gateway shutdown.
If true, all active clusters will be stopped before shutting down the gateway. Set to False to leave active clusters running.
- submit_command c.SlurmBackend.submit_command = Unicode('')¶
The path to the job submit command
- worker_start_failure_limit c.SlurmBackend.worker_start_failure_limit = Int(3)¶
A limit on the number of failed attempts to start a worker before the cluster is marked as failed.
Every worker that fails to start (timeouts exempt) increments a counter. The counter is reset if a worker successfully starts. If the counter ever exceeds this limit, the cluster is marked as failed and is shutdown.
- worker_start_timeout c.SlurmBackend.worker_start_timeout = Float(60)¶
Timeout (in seconds) before giving up on a starting dask worker.
- worker_status_period c.SlurmBackend.worker_status_period = Float(30)¶
Time (in seconds) between worker status checks.
A smaller period will detect failed workers sooner, but will use more resources. A larger period will provide slower feedback in the presence of failures.
Proxy¶
Proxy¶
- class dask_gateway_server.proxy.Proxy(**kwargs: Any)¶
The dask-gateway-server proxy
- address c.Proxy.address = Unicode(':8000')¶
The address the HTTP/HTTPS proxy should listen at.
Should be of the form {hostname}:{port}
Where:
- hostname sets the hostname to listen at. Set to "" or "0.0.0.0" to listen on all interfaces.
- port sets the port to listen at.
- api_token c.Proxy.api_token = Unicode('')¶
The Proxy api token
A 32 byte hex-encoded random string. Commonly created with the openssl CLI:
$ openssl rand -hex 32
Loaded from the DASK_GATEWAY_PROXY_TOKEN env variable by default.
- externally_managed c.Proxy.externally_managed = Bool(False)¶
Whether the proxy process is externally managed.
If False (default), the proxy process will be started and stopped by the gateway process. Set to True if the proxy will be started via some external manager (e.g. supervisord).
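A sketch of handing proxy management to an external supervisor; the token value is a placeholder and would more commonly be supplied via the DASK_GATEWAY_PROXY_TOKEN environment variable:
c.Proxy.externally_managed = True
c.Proxy.api_token = "<32-byte hex string from openssl rand -hex 32>"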
- gateway_url c.Proxy.gateway_url = Unicode('')¶
The base URL the proxy should use for connecting to the gateway server
- log_level c.Proxy.log_level = CaselessStrEnum('warn')¶
The proxy log-level.
- max_events c.Proxy.max_events = Int(100)¶
The maximum number of events (proxy changes) to retain.
A proxy server that lags by more than this number will have to do a full refresh.
- prefix c.Proxy.prefix = Unicode('')¶
The path prefix the HTTP/HTTPS proxy should serve under.
This prefix will be prepended to all routes registered with the proxy.
- proxy_status_period c.Proxy.proxy_status_period = Float(30)¶
Time (in seconds) between proxy status checks.
Only applies when externally_managed is False.
- tcp_address c.Proxy.tcp_address = Unicode('')¶
The address the TCP (scheduler) proxy should listen at.
Should be of the form {hostname}:{port}
Where:
- hostname sets the hostname to listen at. Set to "" or "0.0.0.0" to listen on all interfaces.
- port sets the port to listen at.
If not specified, will default to address.
- tls_cert c.Proxy.tls_cert = Unicode('')¶
Path to TLS certificate file for the public url of the HTTP proxy.
When setting this, you should also set tls_key.
- tls_key c.Proxy.tls_key = Unicode('')¶
Path to TLS key file for the public url of the HTTP proxy.
When setting this, you should also set tls_cert.
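Serving the public endpoint over HTTPS might look like the following sketch (the certificate paths are assumptions about where the files live):
c.Proxy.address = ":443"
c.Proxy.tls_cert = "/etc/dask-gateway/tls.crt"
c.Proxy.tls_key = "/etc/dask-gateway/tls.key"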
Cluster Manager Options¶
- class dask_gateway_server.options.Options(*fields, handler=None)¶
A declarative specification of exposed cluster options.
- Parameters
*fields (Field) – Zero or more configurable fields.
handler (callable, optional) – A callable with the signature handler(options) or handler(options, user), where options is the validated dict of user options, and user is a User model for that user. Should return a dict of configuration overrides to forward to the cluster manager. If not provided, the default will return the options unchanged.
Example
Here we expose options for users to configure c.Backend.worker_cores and c.Backend.worker_memory. We set bounds on each resource to prevent users from requesting too large of a worker. The handler is used to convert the user-specified memory from GiB to bytes (as expected by c.Backend.worker_memory).
from dask_gateway_server.options import Options, Integer, Float

def options_handler(options):
    return {
        "worker_cores": options.worker_cores,
        "worker_memory": int(options.worker_memory * 2 ** 30),
    }

c.Backend.cluster_options = Options(
    Integer("worker_cores", default=1, min=1, max=4, label="Worker Cores"),
    Float("worker_memory", default=1, min=1, max=8, label="Worker Memory (GiB)"),
    handler=options_handler,
)
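Because the handler may also accept the authenticated user (the handler(options, user) form above), per-user policies are possible. A sketch, where the 4-core cap for non-admin users is illustrative:
def options_handler(options, user):
    # Non-admin users are capped at 4 cores regardless of what they request
    cores = options.worker_cores if user.admin else min(options.worker_cores, 4)
    return {
        "worker_cores": cores,
        "worker_memory": int(options.worker_memory * 2 ** 30),
    }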
- class dask_gateway_server.options.Integer(field, default=0, min=None, max=None, label=None, target=None)¶
An integer field, with optional bounds.
- Parameters
field (str) – The field name to use. Must be a valid Python variable name. This will be the keyword users use to set this field programmatically (e.g. "worker_cores").
default (int, optional) – The default value. Default is 0.
min (int, optional) – The minimum valid value (inclusive). Unbounded if not set.
max (int, optional) – The maximum valid value (inclusive). Unbounded if not set.
label (str, optional) – A human readable label that will be used in GUI representations (e.g. "Worker Cores"). If not provided, field will be used.
target (str, optional) – The target parameter to set in the processed options dict. Must be a valid Python variable name. If not provided, field will be used.
- class dask_gateway_server.options.Float(field, default=0, min=None, max=None, label=None, target=None)¶
A float field, with optional bounds.
- Parameters
field (str) – The field name to use. Must be a valid Python variable name. This will be the keyword users use to set this field programmatically (e.g. "worker_cores").
default (float, optional) – The default value. Default is 0.
min (float, optional) – The minimum valid value (inclusive). Unbounded if not set.
max (float, optional) – The maximum valid value (inclusive). Unbounded if not set.
label (str, optional) – A human readable label that will be used in GUI representations (e.g. "Worker Cores"). If not provided, field will be used.
target (str, optional) – The target parameter to set in the processed options dict. Must be a valid Python variable name. If not provided, field will be used.
- class dask_gateway_server.options.String(field, default='', label=None, target=None)¶
A string field.
- Parameters
field (str) – The field name to use. Must be a valid Python variable name. This will be the keyword users use to set this field programmatically (e.g. "worker_cores").
default (str, optional) – The default value. Default is the empty string ("").
label (str, optional) – A human readable label that will be used in GUI representations (e.g. "Worker Cores"). If not provided, field will be used.
target (str, optional) – The target parameter to set in the processed options dict. Must be a valid Python variable name. If not provided, field will be used.
- class dask_gateway_server.options.Bool(field, default=False, label=None, target=None)¶
A boolean field.
- Parameters
field (str) – The field name to use. Must be a valid Python variable name. This will be the keyword users use to set this field programmatically (e.g. "worker_cores").
default (bool, optional) – The default value. Default is False.
label (str, optional) – A human readable label that will be used in GUI representations (e.g. "Worker Cores"). If not provided, field will be used.
target (str, optional) – The target parameter to set in the processed options dict. Must be a valid Python variable name. If not provided, field will be used.
- class dask_gateway_server.options.Select(field, options, default=None, label=None, target=None)¶
A select field, allowing users to select between a few choices.
- Parameters
field (str) – The field name to use. Must be a valid Python variable name. This will be the keyword users use to set this field programmatically (e.g. "worker_cores").
options (list) – A list of valid options. Elements may be a tuple of (key, value), or just key (in which case the value is the same as the key). Values may be any Python object; keys must be strings.
default (str, optional) – The key for the default option. Defaults to the first listed option.
label (str, optional) – A human readable label that will be used in GUI representations (e.g. "Worker Cores"). If not provided, field will be used.
target (str, optional) – The target parameter to set in the processed options dict. Must be a valid Python variable name. If not provided, field will be used.
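A sketch of a Select field mapping friendly names to site-specific values (the queue names are hypothetical):
from dask_gateway_server.options import Options, Select

c.Backend.cluster_options = Options(
    Select(
        "queue",
        [("short", "short-queue"), ("long", "long-queue")],  # (key, value) pairs
        default="short",
        label="Batch Queue",
    ),
)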
- class dask_gateway_server.options.Mapping(field, default=None, label=None, target=None)¶
A mapping field.
- Parameters
field (str) – The field name to use. Must be a valid Python variable name. This will be the keyword users use to set this field programmatically (e.g. "worker_cores").
default (dict, optional) – The default value. Default is an empty dict ({}).
label (str, optional) – A human readable label that will be used in GUI representations (e.g. "Worker Cores"). If not provided, field will be used.
target (str, optional) – The target parameter to set in the processed options dict. Must be a valid Python variable name. If not provided, field will be used.
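A sketch of a Mapping field that lets users inject environment variables into their clusters, targeting the environment option on the cluster config shown earlier:
from dask_gateway_server.options import Options, Mapping

c.Backend.cluster_options = Options(
    Mapping("environment", default={}, label="Environment Variables"),
)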
Models¶
User¶
- class dask_gateway_server.models.User(name, groups=(), admin=False)¶
A user record.
- Parameters
name (str) – The username
groups (sequence, optional) – A set of groups the user belongs to. Default is no groups.
admin (bool, optional) – Whether the user is an admin user. Default is False.
Cluster¶
- class dask_gateway_server.models.Cluster(name, username, token, options, config, status, scheduler_address='', dashboard_address='', api_address='', tls_cert=b'', tls_key=b'', start_time=None, stop_time=None)¶
A cluster record.
- Parameters
name (str) – The cluster name. A unique string that can be used to identify the cluster in the gateway.
username (str) – The username that started this cluster.
token (str) – The API token associated with this cluster. Used to authenticate the cluster with the gateway.
options (dict) – The normalized set of configuration options provided when starting this cluster. These values are user-facing, and don’t necessarily correspond with the ClusterConfig options on the backend.
config (dict) – The serialized version of ClusterConfig for this cluster.
status (ClusterStatus) – The status of the cluster.
scheduler_address (str) – The scheduler address. The empty string if the cluster is not running.
dashboard_address (str) – The dashboard address. The empty string if the cluster is not running, or no dashboard is running on the cluster.
api_address (str) – The cluster’s api address. The empty string if the cluster is not running.
tls_cert (bytes) – The TLS cert credentials associated with the cluster.
tls_key (bytes) – The TLS key credentials associated with the cluster.
start_time (int or None) – Start time in ms since the epoch, or None if not started.
stop_time (int or None) – Stop time in ms since the epoch, or None if not stopped.