Introduction

Databricks is an analytics ecosystem now available on the major cloud providers: Google Cloud, AWS, and Azure. Databricks cluster computations use the Spark engine together with Python (PySpark), a combination highly popular for analytics. The Python interpreter mostly runs on the driver node to collect results, while the worker nodes execute JVM jar files. Recent developments support computations on a driver without any workers, but why?

Single-node Databricks clusters

The option of running single-node clusters was introduced in October 2020 and supports all Databricks runtimes. The Databricks Machine Learning Runtimes are well curated and run out of the box. You can choose GPU or standard CPU runtimes and deploy them on various VM sizes. The announcement by Databricks explains their motivation:

Standard Databricks Spark clusters consist of a driver node and one or more worker nodes. These clusters require a minimum of two nodes — a driver and a worker — in order to run Spark SQL queries, read from a Delta table, or perform other Spark operations. However, for many machine learning model training or lightweight data workloads, multi-node clusters are unnecessary.

Pandas DataFrames

Spark commands are run in local mode on the driver. Single-node clusters can still use Spark data sources and the related cloud authentication/authorization support, such as ADLS credential passthrough on Azure. Apart from data extraction and loading, however, Spark functionality is not that useful on single-node clusters. For training a machine learning model we often convert from Spark to Pandas DataFrames. Since Spark 2.3/2.4, conversions to and from Pandas have been sped up with Apache Arrow. Sometimes we even convert Parquet (data lake) files to Pandas immediately on Databricks, see below.

to_pandas.png
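As an illustration, a minimal sketch of such a conversion (the storage path, filter, and configuration key are placeholders of our own; `spark` is the session predefined in a Databricks notebook):

```python
# Enable Arrow-based conversion (the key is spelled spark.sql.execution.arrow.enabled on Spark 2.x).
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Read a Parquet dataset from the data lake with Spark, filter it there,
# and only then collect it into driver memory as a Pandas DataFrame.
sdf = (
    spark.read
    .parquet("abfss://container@storageaccount.dfs.core.windows.net/timeseries/")  # placeholder path
    .filter("series_id = 'store_42'")  # placeholder filter: keep one timeseries
)

pdf = sdf.toPandas()  # Pandas DataFrame on the driver, ready for model training
```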

An Azure use case

Let us talk about using single-node clusters in computation pipelines. We want to train multiple DeepAR models, each on a different timeseries in our dataset. This is embarrassingly parallel, as each model can be trained on a single CPU VM with one timeseries in memory. Each training job corresponds to running one Databricks notebook, which imports a specific timeseries and a Python package with unit-tested functionality. We have hundreds of timeseries and want to spin up tens of single-node clusters to perform our training jobs and register our models. We use Azure Data Factory to schedule our Databricks notebooks and launch clusters (see figure below).

architecture.png

Parallelism with Azure Data Factory

Data Factory pipelines can run Databricks notebooks in parallel and wait for them to complete before moving on to the next activity of the pipeline. The ForEach operator starts a notebook for each element in a sequence (for instance, a data lake Parquet path). It can run in sequence or in parallel with a batch size; see 'isSequential' and 'batchCount' in the Data Factory JSON export below. We launch 16 single-node clusters running the same Databricks notebook activity to process our timeseries. Data Factory manages these cluster resources through a Databricks linked service.

df_json.png
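For reference, the same construct could be expressed with the azure-mgmt-datafactory Python SDK, which we will come back to in a follow-up blog. The sketch below is ours: activity names, paths, and parameters are placeholders, and the exact model constructors can differ between SDK versions.

```python
from azure.mgmt.datafactory.models import (
    DatabricksNotebookActivity,
    Expression,
    ForEachActivity,
    LinkedServiceReference,
    ParameterSpecification,
    PipelineResource,
)

# One Databricks notebook activity, parameterized with the current ForEach element.
train_notebook = DatabricksNotebookActivity(
    name="TrainDeepAR",  # placeholder name
    notebook_path="/Repos/forecasting/train_deepar",  # placeholder notebook
    base_parameters={"timeseries_path": "@item()"},
    linked_service_name=LinkedServiceReference(
        type="LinkedServiceReference", reference_name="DatabricksInstancePoolLS"  # placeholder linked service
    ),
)

# ForEach over the list of data lake Parquet paths, 16 notebooks at a time
# (serialized to 'isSequential' and 'batchCount' in the pipeline JSON).
for_each = ForEachActivity(
    name="TrainPerTimeseries",
    items=Expression(value="@pipeline().parameters.timeseries_paths"),
    is_sequential=False,
    batch_count=16,
    activities=[train_notebook],
)

pipeline = PipelineResource(
    activities=[for_each],
    parameters={"timeseries_paths": ParameterSpecification(type="Array")},
)
```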

Databricks instance pool

To run a Databricks notebook from Data Factory we need to create a linked service in Data Factory. There are three ways to configure the linked service to select or create a Databricks cluster when a notebook activity is launched:

  1. An interactive cluster is a pre-existing, running cluster which we can select for our notebook activities.
  2. Job clusters are created for each notebook activity that is launched. The job cluster exists for the duration of the notebook activity. This can lead to long waiting times for cluster creation and termination (tens of minutes for both).
  3. Databricks instance pools provide a configurable number of ready VM instances waiting to be attached to clusters. The delay between idle and running is a few minutes. Supported since late 2019, instance pools allow a fast turnaround time between jobs.

We create an instance pool in Databricks with, for instance, 16 CPU-intensive VMs of our choice. When we create our Databricks linked service in Data Factory we select this instance pool and provide an access token for the Databricks workspace. When a notebook activity finishes, its cluster's VM instances are released back to the pool. The ForEach operator can parallelize up to 50 activities, which we limit depending on the size of our instance pool. The Data Factory pipeline will fail if our resource request is larger than the Databricks instance pool size: if we want to parallelize 20 notebook activities on single-node clusters we need a pool of 20 instances. In this case, Data Factory does not do smart resource management.
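Creating such a pool can also be scripted against the Databricks Instance Pools REST API. A rough sketch (workspace URL, token, node type, and sizing values are placeholders of our own):

```python
import requests

workspace_url = "https://adb-1234567890123456.7.azuredatabricks.net"  # placeholder workspace
token = "<databricks-personal-access-token>"                          # placeholder token

pool_config = {
    "instance_pool_name": "deepar-training-pool",
    "node_type_id": "Standard_F8s_v2",   # CPU-intensive VM size of our choice
    "min_idle_instances": 0,
    "max_capacity": 16,                  # matches the parallelism we request from Data Factory
    "idle_instance_autotermination_minutes": 30,
}

response = requests.post(
    f"{workspace_url}/api/2.0/instance-pools/create",
    headers={"Authorization": f"Bearer {token}"},
    json=pool_config,
)
response.raise_for_status()
print(response.json())  # contains the new instance_pool_id
```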

To be continued

Follow-up blogs will dive further into this use case, covering our CI/CD workflow for Databricks notebooks and our Data Factory pipelines built with the Python SDK. In the next section, we ask what use multi-node Databricks clusters have if we do not use Spark for model training.

Distributed Deep Learning

We have seen the value of single-node Databricks clusters for modern machine learning: we run Databricks notebooks on single-node clusters with adequate memory and GPU/CPU resources. Let us backtrack and consider distributed computing. Are multi-node Databricks clusters only useful for Spark computations? Or can we piggyback a different distributed computing framework on top of a multi-node Databricks cluster?

Databricks Horovod runner

Horovod (Uber) is a framework for distributed deep learning that uses MPI and NCCL and supports TensorFlow, Keras, PyTorch, and Apache MXNet. The Spark-Deep-Learning package by Databricks supports Horovod on Databricks clusters with the Machine Learning Runtime. It provides a HorovodRunner that runs a Python deep learning process on multiple workers as Spark tasks. Each Spark task runs within the existing SparkContext on a worker (Spark executor) and involves a Python runtime on the worker node, which communicates with the co-located JVM.
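A minimal sketch of this pattern with a toy PyTorch model (the number of workers, learning rate, and checkpoint path are placeholders of our own; real training code would of course load real data):

```python
import os

import torch
import torch.nn as nn
import horovod.torch as hvd
from sparkdl import HorovodRunner  # shipped with the Databricks ML Runtime


def train(num_epochs=2):
    # This function is sent to each worker and runs in the worker's Python process.
    hvd.init()
    torch.manual_seed(42 + hvd.rank())

    model = nn.Linear(10, 1)  # toy model for illustration
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())
    optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())

    # Make sure every worker starts from the same weights.
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)

    loss_fn = nn.MSELoss()
    for epoch in range(num_epochs):
        x, y = torch.randn(64, 10), torch.randn(64, 1)  # stand-in for real (sharded) data
        optimizer.zero_grad()
        loss = loss_fn(model(x), y)
        loss.backward()
        optimizer.step()  # gradients are averaged across workers via MPI/NCCL
        if hvd.rank() == 0:
            # Only one worker persists checkpoints, to a DBFS-backed path.
            os.makedirs("/dbfs/ml/checkpoints", exist_ok=True)
            torch.save(model.state_dict(), f"/dbfs/ml/checkpoints/epoch_{epoch}.pt")


# np = number of parallel Horovod processes, i.e. Spark workers to run the task on.
hr = HorovodRunner(np=4)
hr.run(train, num_epochs=2)
```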

Using distributed data

Typically, the distributed Horovod optimizer exchanges gradient descent computations between nodes with MPI. Data extraction and preprocessing are done within the Python interpreter on the workers. To persist training results, a checkpoint directory on DBFS is used by one of the workers to store model checkpoints, see the concise code below.

horovod.png

When writing Python code for the Horovod runner we cannot use Spark SQL functionality to query our data from our HDFS-like filesystem. Only the Spark driver node can coordinate this functionality and collect its results. However, there are options to read from HDFS-like data lake storage.

Petastorm (also open-sourced by Uber) fits our needs. This library enables the use of Parquet files stored on the data lake during single-node and distributed training of deep learning models. It can also use in-memory Spark DataFrames. Petastorm converts a materialized Spark DataFrame into a dataloader specific to PyTorch or TensorFlow. Preprocessing transformations can be added to the Petastorm converter as a TransformSpec, which acts on Pandas DataFrames.

We can use Petastorm for distributed deep learning if we configure the 'shard_count' in the Petastorm converter to the number of Horovod workers and let Horovod select the appropriate shard based on the rank of the worker. We are careful to partition the underlying Parquet file to the number of Horovod workers as well. This is great: distributed data on HDFS-like filesystems can feed distributed training of deep learning models.
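A sketch of how this can fit together (the path, column name, batch size, and worker count are placeholders of our own; `spark` is the Databricks notebook session):

```python
import horovod.torch as hvd
from petastorm import TransformSpec
from petastorm.spark import SparkDatasetConverter, make_spark_converter

# Driver side: set a cache directory for the materialized dataset, then build the converter.
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/tmp/petastorm_cache")

df = spark.read.parquet("abfss://container@storageaccount.dfs.core.windows.net/features/")  # placeholder
converter = make_spark_converter(df.repartition(16))  # roughly one partition per Horovod worker


def preprocess(pdf):
    # TransformSpec functions operate on Pandas DataFrame batches; 'feature' is a placeholder column.
    pdf["feature"] = pdf["feature"] / 255.0
    return pdf


def train():
    # Worker side: runs inside the HorovodRunner training function.
    hvd.init()
    with converter.make_torch_dataloader(
        transform_spec=TransformSpec(preprocess),
        batch_size=64,
        cur_shard=hvd.rank(),     # each worker reads only its own shard ...
        shard_count=hvd.size(),   # ... out of as many shards as there are Horovod workers
    ) as dataloader:
        for batch in dataloader:
            pass  # feed the batch to the training step of the model
```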

Future synergy through streaming

We have discussed the integration of Horovod (deep learning) with the Databricks Spark cluster and Petastorm for data loading. How could the Spark and Horovod integration be improved? We would want easier data transfer between the worker JVM and the Python interpreter on each worker node. Arrow is exactly that technology and is already supported. One solution direction could be Spark Structured Streaming.

Structured Streaming abstractions are based on a continuous DataFrame which grows as events come in from streaming sources (for instance, Kafka). Operations are performed in micro-batches or continuously, with three output modes (Append, Update, Complete). The PySpark API trails behind the Java/Scala API and does not support arbitrary operations, with or without state. Performance limitations are part of the reason why the PySpark Structured Streaming API does not support arbitrary operations and focuses on Spark SQL aggregations.
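To make that contrast concrete, here is a small example of what is supported today: a declarative Spark SQL aggregation on a stream (source path and schema are placeholders of our own; `spark` is the notebook's Spark session):

```python
from pyspark.sql import functions as F

# Treat newly arriving Parquet files in a data lake folder as a stream.
events = (
    spark.readStream
    .schema("series_id STRING, value DOUBLE, event_time TIMESTAMP")  # placeholder schema
    .parquet("abfss://container@storageaccount.dfs.core.windows.net/events/")  # placeholder path
)

# Supported today: Spark SQL aggregations on the continuously growing DataFrame.
counts = events.groupBy("series_id").agg(F.count("*").alias("n_events"))

query = (
    counts.writeStream
    .outputMode("update")   # one of Append / Update / Complete
    .format("console")
    .start()
)
```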

Wishlist: arbitrary PySpark streaming operations

During training, our DL model represents state that we wish to update with each training step. The Python function passed to the HorovodRunner is a stateful operation on batches of data, for instance training a DNN on each image in a data set. We could stream each image from HDFS to the Python process with Spark Structured Streaming, just in time for training. The non-functional code below gives an idea of how this could look.

pseudo_stream.png
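In the same spirit, a purely imaginary sketch of our own: the stateful `mapBatchesWithState` hook below does not exist in PySpark and only illustrates the wishlist.

```python
import horovod.torch as hvd  # the training itself would still be coordinated by Horovod

# Imaginary API: an arbitrary, stateful Python operation on each streaming micro-batch,
# executed on the workers. None of the stateful pieces below exist in PySpark today.

images = (
    spark.readStream
    .schema("path STRING, image BINARY, label INT")  # placeholder schema
    .parquet("abfss://container@storageaccount.dfs.core.windows.net/images/")  # placeholder path
)


def train_on_batch(pdf, state):
    # 'state' would hold the model and optimizer between micro-batches on a worker.
    model, optimizer = state.get() or initialize_model()  # hypothetical helpers
    loss = training_step(model, optimizer, pdf)           # hypothetical helper
    state.update((model, optimizer))
    return pdf.assign(loss=loss)


# Wishlist: not a real DataFrame method.
losses = images.mapBatchesWithState(train_on_batch)

query = losses.writeStream.outputMode("append").format("console").start()
```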

Wrap up

This blog revolved around non-Spark-centric uses of Databricks clusters. We moved from running deep learning training in parallel on single-node clusters to running distributed training on multi-node clusters. Our focus was to describe using distributed data for model training and to suggest some ways to close the gap between data-intensive and compute-intensive usages of Spark/Databricks clusters. In follow-up blogs we will discuss in more detail how we approach CI/CD with Databricks notebooks and Data Factory pipelines.