Distributed-Dask with PBS
Dask is a popular Python library designed for scalable computing with dynamic task scheduling. A key strength of Dask lies in its seamless compatibility with high-performance computing (HPC) workload managers such as PBS and Slurm. In this video by Matthew Rocklin, you'll gain insights into effectively harnessing the power of Dask in conjunction with PBS for your computational needs.
One main problem is that some HPC centres run a customised PBS that changes substantially how jobs are requested. Additionally, some HPC centres don't recommend conda or miniconda environments. This blog post explains how to use Dask on a customised PBS installation while working within a Python virtual environment.
First, we create a Python virtual environment. The Python versions available on my machine are as follows:
module avail python
-------------------------- /apps/Modules/modulefiles ---------------------------
python2-as-python  python3-as-python  python3/3.9.2   python3/3.11.0
python2/2.7.16     python3/3.7.4      python3/3.10.0
python2/2.7.17     python3/3.8.5      python3/3.10.4
I load the module python3/3.9.2
module load python3/3.9.2
Then create a virtual environment named dask-python3.9-venv
python3 -m venv dask-python3.9-venv
We can then activate the environment and install the required Python libraries:
source dask-python3.9-venv/bin/activate
python3 -m pip install "dask[complete]"
python3 -m pip install ipython
python3 -m pip install dask-jobqueue
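Before going further, it can help to confirm that the interpreter in use really is the one from the virtual environment. The following is a minimal sketch using only the standard library; the helper name in_virtual_environment is my own and is not part of Dask.

```python
import sys


def in_virtual_environment() -> bool:
    """Return True when the running interpreter belongs to a venv."""
    # Inside a venv, sys.prefix points at the environment, while
    # sys.base_prefix still points at the Python it was created from.
    return sys.prefix != sys.base_prefix


if __name__ == "__main__":
    print("interpreter:", sys.executable)
    print("inside a virtual environment:", in_virtual_environment())
```

If this reports False after activation, the pip installs above most likely went into a different Python than intended.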
With the Python environment in place, we are ready to write the corresponding Python program.
Import all the required libraries:
import dask.distributed as dd
from dask.distributed import Client, LocalCluster, progress
from dask_jobqueue import PBSCluster
from distributed.utils import tmpfile
import os
We instruct Dask to use the Python interpreter inside the virtual environment that we have set up:
os.environ['DASK_PYTHON'] = 'dask-python3.9-venv/bin/python3'
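A relative path here is resolved against whatever directory the worker job starts in, which may not be the directory the environment was created in. As a hedged alternative, the sketch below builds an absolute path to the environment's interpreter. venv_python is a hypothetical helper of mine, and the bin/python3 layout assumes a Linux virtual environment.

```python
import os
from pathlib import Path


def venv_python(venv_dir: str) -> str:
    """Return the absolute path of the python3 executable inside a venv."""
    # Assumption: POSIX venv layout, i.e. <venv>/bin/python3.
    # Only the directory is resolved; bin/python3 is appended afterwards so
    # the venv's python3 symlink is not followed back to the base install.
    return str(Path(venv_dir).expanduser().resolve() / "bin" / "python3")


if __name__ == "__main__":
    os.environ["DASK_PYTHON"] = venv_python("dask-python3.9-venv")
    print(os.environ["DASK_PYTHON"])
```

Run this from the directory that contains dask-python3.9-venv so that the resolved path points at the right environment.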
job_script_prologue lists the commands to add to the job script before the worker is launched. Here it loads the required modules and activates the Python virtual environment.
setup_commands = ["module load python3/3.9.2", "source dask-python3.9-venv/bin/activate"]
Failing to activate the Python virtual environment can result in discrepancies between the library versions of the Dask workers, the Dask scheduler, and the Dask client. Such discrepancies are likely to lead to a host of unintended issues.
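One way to spot such discrepancies early is to print the installed versions of the relevant packages and compare the output from the client side with the output from the worker environment. A minimal sketch with the standard library follows; installed_versions is an illustrative helper of mine, and the package list is my assumption about what matters here.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Map each package name to its installed version, or None if absent."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    wanted = ["dask", "distributed", "dask-jobqueue"]
    for name, ver in installed_versions(wanted).items():
        print(f"{name}: {ver or 'not installed'}")
```

Once a client is connected, Client.get_versions(check=True) from distributed performs a comparable check across the scheduler, the workers, and the client.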
In a custom PBS setup, flags can carry different meanings. For example, on my HPC cluster the flag -P designates the project or account in use, -q indicates the job queue, and -l ncpus stipulates the desired number of CPU cores. These flags can differ between customised PBS schedulers. All these options, along with their values, go into a list that is later passed as job_extra_directives.
extra = ['-q normal', '-P abc123', '-l ncpus=48', '-l mem=192GB']
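To make the effect of this list concrete: each entry ends up as a #PBS line near the top of the generated job script. The sketch below imitates that step with plain string handling. render_pbs_directives is a hypothetical helper, not the code that dask-jobqueue actually runs.

```python
def render_pbs_directives(directives):
    """Prefix each qsub option with '#PBS' as it would appear in a job script."""
    return [f"#PBS {directive}" for directive in directives]


if __name__ == "__main__":
    extra = ["-q normal", "-P abc123", "-l ncpus=48", "-l mem=192GB"]
    print("\n".join(render_pbs_directives(extra)))
```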
Now we can create the Dask cluster
cluster = PBSCluster(
    walltime="00:50:00",
    cores=48,
    memory="192GB",
    shebang='#!/usr/bin/env bash',
    job_extra_directives=extra,
    local_directory='$TMPDIR',
    job_directives_skip=["select"],
    interface="ib0",
    job_script_prologue=setup_commands,
    python=os.environ["DASK_PYTHON"],
)
Here, job_directives_skip lists the qsub options that will be omitted from the generated script, and interface specifies the network interface to use; in this case I am using the InfiniBand network.
We can now examine the PBS script generated to launch the workers of this Dask cluster:
print(cluster.job_script())
#!/usr/bin/env bash
#PBS -N dask-worker
#PBS -l walltime=00:50:00
#PBS -q normal
#PBS -P abc123
#PBS -l ncpus=48
#PBS -l mem=192GB
module load python3/3.9.2
source dask-python3.9-venv/bin/activate
dask-python3.9-venv/bin/python3 -m distributed.cli.dask_worker tcp://10.6.24.29:41347 --nthreads 6 --nworkers 8 --memory-limit 22.35GiB --name dummy-name --nanny --death-timeout 60 --local-directory $TMPDIR --interface ib0
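The --nthreads, --nworkers and --memory-limit values in this script follow from the cores and memory arguments: the 48 cores and 192GB of one job are divided among 8 worker processes. The arithmetic can be sketched as below. split_resources is an illustrative helper of mine, and the count of 8 processes is read off the script rather than computed.

```python
def split_resources(cores, memory_gb, n_workers):
    """Return (threads per worker, memory per worker in GiB)."""
    threads = cores // n_workers
    # "GB" is decimal (10**9 bytes); the limit is reported in GiB (2**30 bytes).
    memory_gib = memory_gb * 10**9 / n_workers / 2**30
    return threads, round(memory_gib, 2)


if __name__ == "__main__":
    # 48 cores and 192GB shared by 8 workers: 6 threads and 22.35 GiB each.
    print(split_resources(cores=48, memory_gb=192, n_workers=8))
```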
We can then launch the cluster workers using
cluster.scale(jobs=2)
where jobs gives the number of nodes requested for this Dask cluster.
Now we can check whether the nodes are allocated as expected by checking the status of the PBS jobs in another terminal, for example with qstat.
Job id                Name             User             Time Use S Queue
--------------------- ---------------- ---------------- -------- - -----
93910490.cluster-pbs  sys-dashboard-s* username         00:06:02 R normal-exec
93912994.cluster-pbs  dask-worker      username         00:00:00 R normal-exec
93912995.cluster-pbs  dask-worker      username         00:00:13 R normal-exec
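The same check can be scripted. The sketch below counts the running dask-worker jobs in a listing like the one above. count_running_workers is a hypothetical helper, and it assumes the six-column layout shown here, which may differ on other PBS installations.

```python
def count_running_workers(qstat_output, job_name="dask-worker"):
    """Count jobs named job_name that are in the running (R) state."""
    running = 0
    for line in qstat_output.splitlines():
        fields = line.split()
        # Expected columns: job id, name, user, time used, state, queue.
        if len(fields) == 6 and fields[1] == job_name and fields[4] == "R":
            running += 1
    return running


if __name__ == "__main__":
    sample = """\
93910490.cluster-pbs sys-dashboard-s* username 00:06:02 R normal-exec
93912994.cluster-pbs dask-worker username 00:00:00 R normal-exec
93912995.cluster-pbs dask-worker username 00:00:13 R normal-exec
"""
    print(count_running_workers(sample))
```

Note that long job names are truncated in this listing (as with sys-dashboard-s*), so the match on the name only works for names that fit in the column.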
In this case, I can see that PBS is running a two-node cluster. Now we can launch the Dask client for the Dask cluster:
client = Client(cluster)
This blog post shows how we can use PBS and a Python virtual environment to launch a distributed Dask cluster.