Apache Spark - Best Practices and Tuning
Use the right level of parallelism

Clusters will not be fully utilized unless the level of parallelism for each operation is high enough. Spark automatically sets the number of partitions of an input file according to its size, and does the same for distributed shuffles. When you use a DataFrame or a Dataset, Spark creates a number of shuffle partitions equal to the spark.sql.shuffle.partitions parameter, which defaults to 200. Most of the time this works well, but when the dataset starts to become massive you can do better by using the repartition function to balance the dataset across the workers.
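As a minimal sketch, the default number of shuffle partitions can be inspected and overridden through the SparkSession configuration; the application name and the value 400 below are only illustrative choices, not recommendations:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parallelism-tuning")   // arbitrary name for illustration
  .getOrCreate()

// Inspect the current number of shuffle partitions (200 by default)
println(spark.conf.get("spark.sql.shuffle.partitions"))

// Override it for this session; 400 is only an example value
spark.conf.set("spark.sql.shuffle.partitions", "400")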

The picture below, which I made inspired by [25], shows how to use the Spark UI to answer the following questions:

• Any outliers in task execution?
• Skew in data size or compute time?
• Too many or too few tasks (partitions)?
• Is the load balanced?

You can tune the number of partitions by asking those questions. The above example is a best-case scenario in which all the tasks are balanced and there is no skew in data size. In our specific use case, where we deal with billions of rows, we have found that partition counts in the range of 10k work most efficiently. Suppose we have to apply an aggregate function, such as a groupBy or a Window function, to a dataset containing a time series of billions of rows, where the column id is a unique identifier and the column timestamp is the time. To balance the data well, you can repartition the DataFrame/Dataset as follows:

import org.apache.spark.sql.functions.col

// Spread the data over 10,000 partitions, hashing on (id, timestamp)
val repartitionedDf = originalSizeDf
  .repartition(10000,
               Seq(col("id"), col("timestamp")): _*)

This repartition balances the dataset and the load on the workers, speeding up the pipeline. Another rule of thumb [26] is that tasks should take at most 100 ms to execute. You can check that this is the case by monitoring the task duration in the Spark UI. If your tasks take considerably longer than that, keep increasing the level of parallelism, by say a factor of 1.5, until performance stops improving.
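A minimal sketch of that iteration, assuming you re-run the job after each adjustment and compare task durations in the Spark UI; the starting DataFrame df and the 1.5 growth factor are illustrative:

// Bump the partition count by roughly 1.5x, then re-run the job and
// stop once the Spark UI shows no further improvement
val currentPartitions = df.rdd.getNumPartitions
val increasedPartitions = math.ceil(currentPartitions * 1.5).toInt

val moreParallelDf = df.repartition(increasedPartitions)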