top of page
Search
Writer's pictureJoseph

Distributed-Dask with PBS


Dask is a popular Python library designed for scalable computing with dynamic task scheduling. A key strength of Dask lies in its seamless compatibility with high-performance computing (HPC) workload managers such as PBS and Slurm. In this video by Matthew Rocklin, you'll gain insights into effectively harnessing the power of Dask in conjunction with PBS for your computational needs.



One main problem is that some HPC centres use custom PBS, which incorporates substantial modifications in job requesting procedures. Additionally, some HPC centres don't recommend conda or miniconda environments. This blog post aims to explain the process of employing Dask on a customised PBS environment while working within a Python virtual environment.


First, we create a Python virtual environment. The Python available in my machine is as follows

module avail python
-------------------------- /apps/Modules/modulefiles ---------------------------
python2-as-python  python3-as-python  python3/3.9.2   python3/3.11.0
python2/2.7.16     python3/3.7.4      python3/3.10.0
python2/2.7.17     python3/3.8.5      python3/3.10.4

I load the module python3/3.9.2

module load python3/3.9.2 

Then create a virtual environment named dask-python3.9-venv

python3 -m venv dask-python3.9-venv

We can then activate the environment and install required Python libraries

source dask-python3.9-venv/bin/activate
python3 -m pip install "dask[complete]"
python3 -m pip install ipython
python3 -m pip install dask-jobqueue
deactivate

With the essential Python environment in place, we are now ready to proceed with the implementation of the corresponding Python program.


Import all the required libraries:

import dask.distributed as dd
from dask.distributed import Client, LocalCluster, progress
from dask_jobqueue import PBSCluster
from distributed.utils import tmpfile
import os

We instruct Dask to utilize the Python installation within the Python virtual environment that we have set up

os.environ['DASK_PYTHON'] = 'daskpython3.9venv/bin/activate/bin/python3'

job_script_prologue gives commands to add to script before launching worker. This segment handles all necessary module imports and the activation of the Python virtual environment.

setup_commands = ["module load python3/3.9.2", "source dask-python3.9-venv/bin/activate"]

Failing to activate the Python virtual environment can result in discrepancies between the library versions of Dask workers, the Dask scheduler, and the Dask client. Such discrepancies are likely to lead to a host of unintended issues


In a custom PBS setup, various flags can carry distinct implications. For example, within my HPC cluster, the flag -P designates the project or account under use, -q is employed to indicate the job queue, and -l ncpus is utilized to stipulate the desired count of CPU cores. It's worth noting that these flags can differ across diverse customized PBS schedulers. All these options, along with their respective values, are encapsulated within an additional list termed job_extra_directives.

extra = ['-q normal',
         '-P abc123', 
         '-l ncpus=48', 
         '-l mem=192GB']

Now we can create the Dask cluster

cluster = PBSCluster(walltime="00:50:00", 
                     cores=48, 
                     memory="192GB", 
                     shebang='#!/usr/bin/env bash',
                     job_extra_directives=extra, 
                     local_directory='$TMPDIR', 
                     job_directives_skip=["select"], 
                     interface="ib0",
                     job_script_prologue=setup_commands,
                     python=os.environ["DASK_PYTHON"])

Here job_directives_skip gives the list qsub options we will be skipping. interface gives the network interface I will be using, in this case I am using infiniband network.

We can check the PBS script generated to launch the workers of this Dask cluster


The list job_directives_skip outlines the qsub options that will be omitted. The interface parameter specifies the network interface to be utilized, with the current configuration employing the InfiniBand network. We can noew examine the PBS script that's generated for this cluster

print(cluster.job_script())

#!/usr/bin/env bash

#PBS -N dask-worker
#PBS -l walltime=00:50:00
#PBS -q normal
#PBS -P vp91
#PBS -l ncpus=48
#PBS -l mem=192GB
module load python3/3.9.2
source dask-python3.9-venv/bin/activate/bin/activate
dask-python3.9-venv/bin/activate/bin/python3 -m distributed.cli.dask_worker tcp://10.6.24.29:41347 --nthreads 6 --nworkers 8 --memory-limit 22.35GiB --name dummy-name --nanny --death-timeout 60 --local-directory $TMPDIR --interface ib0

We can then launch the cluster workers using

cluster.scale(jobs=2)

where jobs gives the number nodes requested for this Dask cluster.


Now we can check if the nodes are allocated as expected, by checking the status of PBS job in another terminal.

Job id                 Name             User              Time Use S Queue
---------------------  ---------------- ----------------  -------- - -----
93910490.cluster-pbs      sys-dashboard-s* username            00:06:02 R normal-exec     
93912994.cluster-pbs      dask-worker      username            00:00:00 R normal-exec     
93912995.cluster-pbs      dask-worker      username            00:00:13 R normal-exec

In this case I can see that PBS is running a two nodes cluster. Now we can launch the Dask. clients for the Dask cluster.

client = Client(cluster)

This blog post shows how we can use PBS and a python virtual environment to launch a distributed Dask cluster.







42 views0 comments

Recent Posts

See All

Comments


bottom of page