GPU Accelerated Spark 3.0 for Data and ML Engineering

Kunal Kumar
Jan 25, 2021

If AI/ML were represented as neurons, data would be the signal that helps those neurons learn. Data and ML have a reciprocal relationship: the success of an ML model depends on the data it is trained on. Almost any action in this world can be captured as data of any volume and variety, and when that data is cleaned, processed and fed to models, they flourish. Marrying big data and machine learning creates new requirements for the infrastructure on which the data is processed.

Spark has been the de facto standard for big data processing, leveraging the CPU and memory of a cluster of machines, whereas GPUs have mostly been used for deep learning use cases. An architecture that splits the work this way requires CPUs and GPUs to communicate both within the same host and across the network, which is not efficient.

The figure above shows the transition from a traditional design to an optimal design. In a traditional design, both ETL and the training of machine learning models happen on the CPU; a typical example is data processing and ML training with Spark, Python, R, etc. on CPUs. In an accelerated design, ETL still happens on the CPU, but training may need to happen on the GPU, especially for deep learning use cases. In this case, GPUs across the network communicate through the CPU over the PCIe bus. Finally, in the optimal design, the end-to-end data and ML pipeline executes on the GPU. When large amounts of data need to be aggregated or shuffled across the network, GPUs communicate through GPUDirect RDMA, which is much faster. This is why we need Spark and other distributed programming paradigms to be able to leverage the massively parallel architecture of GPUs.

Spark on GPU:

Apache Spark is a lightning-fast unified analytics engine for big data and machine learning. Spark distributes processing across multiple worker nodes, where tasks run in parallel on the CPU cores; it achieves parallelism by running multiple tasks concurrently. A CPU consists of relatively few cores (some compute-intensive AWS instances have 124 vCPUs), whereas a GPU has a massively parallel architecture consisting of thousands of smaller, efficient cores.

Given the parallel nature of Spark processing, it is apparent that Spark can leverage GPUs to boost its performance.

NVIDIA has created the RAPIDS Accelerator for Apache Spark 3.0, which accelerates ETL pipelines by leveraging GPU infrastructure. The RAPIDS Accelerator combines the power of the RAPIDS cuDF library with the scale of the Spark distributed computing framework.

Achieving Parallelism with Spark on GPU:

Concurrent tasks:

Spark runs multiple tasks, depending on the number of partitions of the input data. When Spark runs on a GPU with RAPIDS, each task offloads its work to a CUDA stream on the GPU. If every task on a host used the single default stream, CUDA operations would be serialized; with the RAPIDS Accelerator, each task has its own stream, so CUDA operations from different tasks can run in parallel. RAPIDS also takes care of memory management for each CUDA stream.
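To make the serialization point concrete, here is a toy timing model (a sketch, not how CUDA or the plugin actually schedules work). It assumes each task submits a single kernel of known, hypothetical duration, that work on one stream always runs in order, and that kernels on different streams can fully overlap without contending for the GPU.

```python
"""Toy timing model: one shared CUDA stream vs one stream per Spark task.

Assumptions (hypothetical): each task issues one kernel of known duration,
work on the same stream runs in order, and work on different streams can
overlap completely.
"""


def makespan(kernel_ms, stream_of_task):
    """Return total wall time (ms) when kernels on the same stream serialize."""
    busy_until = {}  # stream id -> time at which its queued work finishes
    for task, duration in enumerate(kernel_ms):
        stream = stream_of_task(task)
        busy_until[stream] = busy_until.get(stream, 0) + duration
    return max(busy_until.values(), default=0)


kernels = [40, 25, 30, 35]  # hypothetical per-task kernel times in ms

shared = makespan(kernels, lambda task: 0)      # every task on the default stream
per_task = makespan(kernels, lambda task: task)  # each task gets its own stream

print(shared, per_task)  # 130 40
```

With a shared stream the wall time is the sum of the kernels; with one stream per task it shrinks to the longest kernel, which is the best case the per-task streams aim for.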

Spark on GPU also leverages the inherent parallelism in the data of each task. When data is passed to a GPU-enabled operator, it is converted into a columnar format based on the Apache Arrow data structure. Each task then schedules GPU threads over the rows of data within the columnar batch, so multiple rows are processed at the same time. GPU parallelism therefore depends more on the size of a task's data than on the number of tasks. Still, in some cases where a single task cannot utilize all of the GPU's processors, we can schedule more than one task on the GPU.
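The row-to-columnar step can be sketched in plain Python. This is only an illustration of the layout change: ordinary lists stand in for Arrow/cuDF column buffers, and the three-column input is hypothetical.

```python
"""Sketch: converting a batch of rows into a columnar batch.

Plain Python lists stand in for Arrow/cuDF column buffers; on a GPU, each
element of a column would be handled by its own thread.
"""

rows = [  # hypothetical row-oriented input: (user_id, site_id, amount)
    (1, "US", 10.0),
    (2, "IN", 25.5),
    (3, "US", 7.25),
]
column_names = ["user_id", "site_id", "amount"]


def rows_to_columnar(rows, names):
    """Transpose a list of row tuples into one contiguous list per column."""
    return {name: [row[i] for row in rows] for i, name in enumerate(names)}


batch = rows_to_columnar(rows, column_names)

# A column-wise operation reads one contiguous column, and every element is
# independent of the others, which is what lets a GPU process all rows at once.
doubled = [x * 2 for x in batch["amount"]]

print(batch["site_id"])  # ['US', 'IN', 'US']
print(doubled)           # [20.0, 51.0, 14.5]
```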

Executors and GPU mapping:

Currently, a 1:1 mapping between executors and GPUs is required. This can cause problems on instances that have many CPU cores per GPU. The recommendation is either to use instances with multiple GPUs and the same number of executors, so that the cores are distributed across executors, or to use small instances with a few cores and one GPU.

Since the RAPIDS Accelerator plugin only supports a one-to-one mapping between GPUs and executors, and in our case there is 1 GPU per machine, we have only one executor per node. Each executor can run multiple tasks at the same time, depending on the number of cores available.
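The mapping rule can be turned into a small helper that derives per-executor settings from a node's shape. This is a sketch under stated assumptions: homogeneous nodes, one executor per GPU, and cores split evenly across executors. The conf keys are the Spark 3.0 resource-scheduling keys discussed in this post plus the `spark.plugins` setting that loads the RAPIDS plugin; the helper names and node shapes are hypothetical, and other settings a real cluster needs (such as GPU discovery) are omitted.

```python
"""Sketch: derive executor settings for one-GPU-per-executor nodes.

Assumptions (hypothetical): homogeneous nodes, one executor per GPU, and
cores split evenly across executors. Other required settings (e.g. GPU
discovery for the cluster manager) are deliberately left out.
"""


def executor_confs(cores_per_node, gpus_per_node, task_gpu_amount):
    if gpus_per_node < 1:
        raise ValueError("the RAPIDS Accelerator needs one GPU per executor")
    executors_per_node = gpus_per_node  # 1:1 executor-to-GPU mapping
    cores_per_executor = cores_per_node // executors_per_node
    return {
        "spark.plugins": "com.nvidia.spark.SQLPlugin",
        "spark.executor.cores": str(cores_per_executor),
        "spark.executor.resource.gpu.amount": "1",
        "spark.task.resource.gpu.amount": str(task_gpu_amount),
    }


def as_submit_args(confs):
    """Render the settings as spark-submit --conf arguments."""
    args = []
    for key, value in confs.items():
        args += ["--conf", f"{key}={value}"]
    return args


# Hypothetical node with 8 cores and 1 GPU.
confs = executor_confs(cores_per_node=8, gpus_per_node=1, task_gpu_amount=0.1)
print(" ".join(as_submit_args(confs)))
```

For a hypothetical 32-core node with 4 GPUs, the same helper yields four executors of 8 cores each, which is the "multiple GPUs and the same number of executors" option described above.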

The setting spark.executor.resource.gpu.amount=1 ensures that only one GPU is assigned to each executor. The setting spark.task.resource.gpu.amount=0.1 means that the natural parallelism of tasks relative to the one GPU on an executor is 10 (1 / 0.1). Below is a snapshot from the Executors tab of the Spark UI; we have one GPU available per node.

However, each executor has only 8 cores, so cores are the limiting factor for parallelism, and we can expect 8 tasks to run in parallel on the executor, not the 9 shown in the Executors tab. That number seems to be incorrect and may be related to this JIRA issue: https://issues.apache.org/jira/browse/SPARK-30831

The snapshot below is from a run with spark.task.resource.gpu.amount=1, so in this case only one task runs on the GPU within an executor.
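The arithmetic behind both runs can be written down as a simplified model: an executor can only run as many tasks at once as its scarcest requested resource allows. The sketch below considers just CPU cores and GPU amount; the function name is hypothetical, and exact fractions are used because 0.1 has no exact binary floating-point representation.

```python
"""Sketch: how many tasks can run at once on one executor.

Simplified model: concurrency is capped by every resource a task requests;
only CPU cores and GPU amount are modeled here.
"""
from fractions import Fraction


def concurrent_tasks(executor_cores, task_cpus, executor_gpus, task_gpu_amount):
    cpu_slots = executor_cores // task_cpus
    # Exact division so that e.g. 1 / 0.1 is exactly 10 GPU slots.
    gpu_slots = int(Fraction(str(executor_gpus)) / Fraction(str(task_gpu_amount)))
    return min(cpu_slots, gpu_slots)


# 8 cores, 1 GPU, spark.task.resource.gpu.amount=0.1: cores are the limit.
print(concurrent_tasks(8, 1, 1, 0.1))  # 8
# Same executor with spark.task.resource.gpu.amount=1: the GPU is the limit.
print(concurrent_tasks(8, 1, 1, 1))    # 1
```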

We can also control the number of tasks within an executor that use the GPU simultaneously. For example, spark.rapids.sql.concurrentGpuTasks=2 ensures that only 2 tasks run concurrently on the GPU within an executor, which helps prevent out-of-memory exceptions. For instance, if an executor has 32 cores (i.e., a natural task parallelism of 32) and only 1 GPU, this setting prevents all 32 active tasks from piling onto the GPU at the same time and likely exhausting GPU memory.

We may wonder why a separate setting is needed rather than simply limiting executor parallelism by giving the executor fewer cores when using a GPU. The main reason is that not all of a Spark query runs directly on the GPU. When the built-in Spark shuffle is used, the data is compressed and transferred by the CPU, so the task is not using the GPU during that phase. Similarly, there may be parts of the query that the RAPIDS Accelerator cannot run on the GPU (as is the case in our test query). We therefore want the CPU-only parts to run with the same natural parallelism as the CPU query would; otherwise the GPU setup will always be slower over those periods, because the CPU query simply has more cores to throw at the same CPU-only code.
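The idea behind spark.rapids.sql.concurrentGpuTasks can be modeled with a counting semaphore: every task runs its CPU-only phase freely, but must acquire a permit before its GPU phase. This is a toy model of the concept using Python threads, not the plugin's code; the sleep durations are hypothetical stand-ins for real work.

```python
"""Toy model of spark.rapids.sql.concurrentGpuTasks.

Assumption: a counting semaphore gates the GPU portion of each task, so at
most N tasks hold GPU memory at once while the others keep doing CPU-only
work. This illustrates the idea only.
"""
import threading
import time


def run_tasks(num_tasks, concurrent_gpu_tasks):
    """Run num_tasks threads and return the peak number seen on the 'GPU'."""
    gpu_gate = threading.Semaphore(concurrent_gpu_tasks)
    lock = threading.Lock()
    on_gpu = 0
    peak_on_gpu = 0

    def task():
        nonlocal on_gpu, peak_on_gpu
        time.sleep(0.01)  # CPU-only phase (e.g. shuffle), not gated
        with gpu_gate:    # GPU phase: at most concurrent_gpu_tasks at once
            with lock:
                on_gpu += 1
                peak_on_gpu = max(peak_on_gpu, on_gpu)
            time.sleep(0.02)  # pretend GPU work
            with lock:
                on_gpu -= 1

    threads = [threading.Thread(target=task) for _ in range(num_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return peak_on_gpu


# 32 task threads (one per core), but never more than 2 on the GPU at once.
print(run_tasks(num_tasks=32, concurrent_gpu_tasks=2))
```

Because the gate only wraps the GPU phase, all 32 threads still overlap during their CPU-only phase, which is exactly why the setting is preferable to simply giving the executor fewer cores.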

Findings from running the Spark workload on CPU and GPU:

Case 1:

Cluster configuration and run time comparison:

Cluster configuration and performance comparison

We found that Spark 3.0 on GPU with the RAPIDS plugin took longer to process the workload, and its overall cost was also higher. Let's look at the possible reasons for this.

Before executing any query, Spark first creates a logical plan and then passes it to Catalyst to create a physical plan. The physical plan provides the fundamental information about a Spark query, so we can use it to compare the optimizations in the CPU and GPU versions of our workload.

Spark Physical Plan on GPU(Left) and CPU(Right)

In the GPU version of Spark 3.0 (left side of the diagram above), we can see that after reading from the Snowflake database, the underlying data structure is converted into columnar format. GPU-enabled operators need columnar data for fast processing, whereas in the CPU version the data remains in row format. The conversion adds processing time to the GPU version, but the expectation is that the gain from processing columnar data on the GPU far outweighs the cost of converting it from row to columnar format. At the same time, any operation that is not yet supported by a GPU-enabled operator is processed by the normal Spark CPU operator, which is an extra overhead we have to pay until the RAPIDS plugin, which enables Spark 3.0 to use GPUs, supports most operations on the GPU. This is exactly what we found in our experiment as well.

Operations not supported on GPU:

We can see a transformation from columnar to row format before the window analytical function is executed. Does that mean this operation is being processed on the CPU, and how would we know what is being processed on the CPU versus the GPU? To get insight into where the processing happens, we can set the Spark configuration spark.rapids.sql.explain to NOT_ON_GPU. For our execution, this is what we found in the logs:

!NOT_FOUND <RunningWindowFunctionExec> cannot run on GPU because no GPU enabled version of operator class com.databricks.sql.execution.window.RunningWindowFunctionExec could be found

!Exec <FilterExec> cannot run on GPU because not all expressions can be replaced

!NOT_FOUND <EphemeralSubstring> ephemeralsubstring(REGISTRATION_DTTM#1004, 1, 10) cannot run on GPU because no GPU enabled version of expression class com.databricks.sql.optimizer.EphemeralSubstring could be found

!NOT_FOUND <EphemeralSubstring> ephemeralsubstring(REGISTRATION_DTTM#1004, 1, 10) cannot run on GPU because no GPU enabled version of expression class com.databricks.sql.optimizer.EphemeralSubstring could be found

!NOT_FOUND <RowDataSourceScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.RowDataSourceScanExec could be found

!NOT_FOUND <RowDataSourceScanExec> cannot run on GPU because no GPU enabled version of operator class org.apache.spark.sql.execution.RowDataSourceScanExec could be found
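On a real job the NOT_ON_GPU output can be long, so a small script that tallies fallbacks per operator helps to see which ones dominate. This sketch assumes every fallback line has the shape seen in our logs ("!TAG <Name> ... cannot run on GPU because REASON"); that format is not a documented, stable interface, so treat the parser as a convenience only.

```python
"""Sketch: summarize spark.rapids.sql.explain=NOT_ON_GPU log output.

Assumption: fallback lines look like the ones in our logs,
"!<TAG> <Name> ... cannot run on GPU because <reason>".
"""
import re
from collections import Counter

LINE = re.compile(
    r"^!(?P<tag>\S+) <(?P<name>[^>]+)>.*?cannot run on GPU because (?P<reason>.+)$"
)


def summarize(log_lines):
    """Count fallbacks per (operator/expression name, reason)."""
    counts = Counter()
    for line in log_lines:
        match = LINE.match(line.strip())
        if match:
            counts[(match["name"], match["reason"])] += 1
    return counts


logs = [
    "!Exec <FilterExec> cannot run on GPU because not all expressions can be replaced",
    "!NOT_FOUND <RowDataSourceScanExec> cannot run on GPU because no GPU enabled version "
    "of operator class org.apache.spark.sql.execution.RowDataSourceScanExec could be found",
    "!NOT_FOUND <RowDataSourceScanExec> cannot run on GPU because no GPU enabled version "
    "of operator class org.apache.spark.sql.execution.RowDataSourceScanExec could be found",
]

for (name, reason), n in summarize(logs).most_common():
    print(f"{n}x {name}: {reason}")
```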

We ran our experiments in a Databricks environment. com.databricks.sql.execution.window.RunningWindowFunctionExec and the other com.databricks.sql.* classes are specific to Databricks, and the RAPIDS plugin does not have sufficient insight into their operations to replace them safely. That is why neither the window nor the filter operation could be placed on the GPU. The window operation is one of the expensive operations, and supporting it on the GPU could improve the overall run time.

Greedy instead of cost-based optimizer:

Right after the columnar-to-row transformation for the window analytical function, we can see another transformation from row to columnar format in the physical plan:

- GpuColumnarExchange gpuhashpartitioning(upmid#7678, 200), [id=#2404]
   +- GpuProject [upmid#7678, registration_siteid#7727, user_received_dttm#7726]
      +- GpuRowToColumnar TargetSize(2147483647)
         +- *(1) Filter ((ephemeralsubstring(REGISTRATION_DTTM#7728, 1, 10) >= 2021-01-01) AND (ephemeralsubstring(REGISTRATION_DTTM#7728, 1, 10) <= 2021-01-11))

We gain little from this row-to-columnar transformation. After the filter, GpuProject selects a few columns, and right after that the data is pulled off via a shuffle exchange. It would have been more efficient to leave the data in row format and shuffle it (as is normal for CPU processing), then convert it, if necessary, in the subsequent stage. Converting from row to columnar format pushes the data into GPU memory, and this unnecessary overhead of transforming rows to columns and back makes the GPU version much slower than its CPU version. Currently the plugin is “greedy” when it replaces operators in the physical plan: if it can replace an operator, it always does. A cost-based optimizer would make the plugin smarter about which portions of the plan to replace with GPU operations, so that it doesn't do silly things like moving the data to the GPU only for a no-op project operation.
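The difference between greedy and cost-based placement can be shown on a toy linear plan. In this sketch all costs are hypothetical units: each operator has a CPU cost and, if the plugin supports it, a GPU cost, and every switch between row (CPU) and columnar (GPU) format between adjacent operators adds a fixed transition cost. The cost-based planner is a small dynamic program over the operator chain; it illustrates the idea and is not the plugin's optimizer.

```python
"""Toy comparison of greedy vs cost-based GPU placement for a linear plan.

All costs are hypothetical. Switching between row (CPU) and columnar (GPU)
format between adjacent operators costs TRANSITION.
"""
CPU, GPU = "CPU", "GPU"
TRANSITION = 5.0  # hypothetical cost of one RowToColumnar / ColumnarToRow step
INF = float("inf")

# (name, cpu_cost, gpu_cost or None when the operator cannot run on the GPU)
plan = [
    ("Filter (Databricks-specific expression)", 4.0, None),
    ("Project", 1.0, 0.5),
    ("Exchange", 3.0, 2.0),
]


def total_cost(plan, placement):
    cost = 0.0
    for i, ((_, cpu, gpu), dev) in enumerate(zip(plan, placement)):
        cost += cpu if dev == CPU else gpu
        if i > 0 and placement[i - 1] != dev:
            cost += TRANSITION
    return cost


def greedy(plan):
    """Replace every operator that *can* run on the GPU."""
    return [GPU if gpu is not None else CPU for _, _, gpu in plan]


def cost_based(plan):
    """Pick the cheapest placement, counting transition costs."""
    # best[dev] = (cost so far, placement so far) for a prefix ending on dev
    best = {CPU: (0.0, []), GPU: (INF, [])}
    for i, (_, cpu, gpu) in enumerate(plan):
        step = {CPU: cpu, GPU: gpu if gpu is not None else INF}
        new_best = {}
        for dev in (CPU, GPU):
            candidates = []
            for prev, (prev_cost, prev_path) in best.items():
                switch = TRANSITION if i > 0 and prev != dev else 0.0
                candidates.append((prev_cost + switch + step[dev], prev_path + [dev]))
            new_best[dev] = min(candidates, key=lambda c: c[0])
        best = new_best
    return min(best.values(), key=lambda c: c[0])[1]


g, c = greedy(plan), cost_based(plan)
print(g, total_cost(plan, g))  # ['CPU', 'GPU', 'GPU'] 11.5
print(c, total_cost(plan, c))  # ['CPU', 'CPU', 'CPU'] 8.0
```

With these made-up numbers, the greedy placement pays for a row-to-columnar transition just to run a cheap project and exchange on the GPU, while the cost-based placement keeps the fragment on the CPU. For operators with large GPU savings (for example a heavy aggregation), the same planner would choose the GPU.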

Another observation from the physical plan is that the plugin replaced SortMergeJoin with ShuffledHashJoin on the GPU, because it is faster and the GPU has enough memory to process it. The CPU version of our Spark job could also be tweaked to perform this join as a ShuffledHashJoin.
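The two join strategies can be compared on a single shuffle partition in plain Python. The sketch assumes both inputs are already hash-partitioned on the join key (which is what the shuffle exchange provides) and uses hypothetical data. Sort-merge join sorts both sides and walks them together; shuffled hash join builds an in-memory hash table on one side and probes it with the other, avoiding the sorts. On the CPU, Spark 3.0 can be steered toward the latter with the SHUFFLE_HASH join hint.

```python
"""Sketch: sort-merge join vs shuffled hash join on one shuffle partition.

Both inputs are assumed to be already hash-partitioned on the join key.
Rows are (key, value) tuples; output rows are (key, left_value, right_value).
"""
from collections import defaultdict


def sort_merge_join(left, right):
    left, right = sorted(left), sorted(right)  # sort both sides by key
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Emit every pair within the run of equal keys on both sides.
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            while i < len(left) and left[i][0] == lk:
                out += [(lk, left[i][1], r[1]) for r in right[j:j_end]]
                i += 1
            j = j_end
    return out


def shuffled_hash_join(left, right):
    table = defaultdict(list)  # build side (right) must fit in memory
    for key, value in right:
        table[key].append(value)
    # Probe with the left side; no sorting needed.
    return [(key, lv, rv) for key, lv in left for rv in table.get(key, [])]


left = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]
right = [(2, "x"), (2, "y"), (3, "z"), (4, "w")]
smj = sort_merge_join(left, right)
shj = shuffled_hash_join(left, right)
print(sorted(smj) == sorted(shj), len(smj))  # True 5
```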

WholeStageCodegen in GPU:

Another difference we notice is that the GPU version has more operators, whereas in the CPU version operators are fused into fewer blocks. Each such block is a Whole-Stage Java Code Generation block (aka whole-stage codegen), a physical query optimization in Spark SQL that fuses multiple physical operators (a subtree of plans that support code generation) into a single Java function. Whole-stage codegen is designed to avoid the overhead of invoking virtual iterators row by row, millions or billions of times over the course of a query, by generating Java code on the fly. The RAPIDS plugin does not support codegen for most of its operations, because Spark on GPU processes data in columnar batches rather than in rows. For example, if the input to a Spark query has a million rows, the CPU version invokes an iterator a million times, but on the GPU with the plugin, each iterator invocation returns a batch of rows instead of a single row. As a result, the GPU version invokes the iterator far fewer times than its CPU counterpart. So although the RAPIDS Accelerator's operators are not fused into fewer codegen blocks, this is not considered a disadvantage for the GPU version.
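The difference in iterator traffic is easy to count with Python generators. This sketch only models the number of hand-offs between operators; the batch size of 65,536 rows is a hypothetical stand-in, not the plugin's actual batch sizing.

```python
"""Sketch: iterator hand-offs when pulling rows one at a time vs in batches.

Row-at-a-time mimics a non-codegen, row-based operator chain; batch-at-a-time
mimics a columnar operator that hands over many rows per call. The batch
size is hypothetical.
"""


def row_iterator(n_rows):
    for i in range(n_rows):
        yield i  # one hand-off per row


def batch_iterator(n_rows, batch_size):
    for start in range(0, n_rows, batch_size):
        yield range(start, min(start + batch_size, n_rows))  # one hand-off per batch


def count_pulls(iterator):
    """Number of items the consumer receives (one per successful next() call)."""
    pulls = 0
    for _ in iterator:
        pulls += 1
    return pulls


n = 1_000_000
print(count_pulls(row_iterator(n)))            # 1000000
print(count_pulls(batch_iterator(n, 65_536)))  # 16
```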

Case 2:

In this case, the data is read from a Hive table whose underlying data is stored in Parquet format in S3. I was limiting the data to a few rows, so on Databricks with normal Spark, the CPU plan runs one task and loads only a small batch of the input data, whereas the GPU plan runs the entire first stage, loading all of the data and generating ~2k tasks, which took forever to finish. Databricks may have optimizations for limit processing that cannot be applied after the plugin modifies the physical plan.
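Why a limit can touch such different amounts of data is easy to model. In this toy sketch the input is a list of partitions (think Parquet files), reading a partition is the expensive step, and the partition and row counts are hypothetical. A lazily pulled limit stops as soon as it has enough rows; an eager plan scans every partition first, which is what the GPU plan appeared to do here.

```python
"""Toy model of why a LIMIT can touch very different amounts of data.

Assumption: the input is a list of partitions and reading one is the
expensive step. Sizes below are hypothetical.
"""
from itertools import islice


def make_partitions(num_partitions, rows_per_partition, reads):
    """Return lazy partition readers; `reads` records which ones were opened."""
    def reader(p):
        reads.append(p)  # simulate opening and scanning one file
        return [(p, r) for r in range(rows_per_partition)]
    return [lambda p=p: reader(p) for p in range(num_partitions)]


def lazy_limit(partitions, limit):
    rows = (row for read in partitions for row in read())  # opens files on demand
    return list(islice(rows, limit))


def eager_limit(partitions, limit):
    all_rows = [row for read in partitions for row in read()]  # full scan first
    return all_rows[:limit]


lazy_reads, eager_reads = [], []
lazy_limit(make_partitions(2000, 100, lazy_reads), limit=10)
eager_limit(make_partitions(2000, 100, eager_reads), limit=10)
print(len(lazy_reads), len(eager_reads))  # 1 2000
```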

Case 3:

With the same configuration on Databricks, I ran the sample application on CSV data stored in DBFS. Here too, the GPU version of Spark could not beat its CPU version.

For all of the above cases, we did not see much gain from running Spark 3.0 on GPU, since Databricks applies optimizations to some functions that the RAPIDS plugin may not have insight into, so the plugin cannot improve them.

Conclusion:

Jobs that are mostly I/O-bound won’t see much benefit from GPUs unless the GPU has faster access to the data (for example, through GPUDirect Storage); CPUs will be faster in this case because they have direct access to the data. Memory-bound jobs can also benefit from GPUs, because GPU memory bandwidth is higher than CPU memory bandwidth. Finally, CPU-intensive queries are the ideal candidates for GPUs. Although we don’t see immediate adoption of Spark 3.0 on GPU on our enterprise platform, in the longer run data engineering and data science can benefit from it.
