Apache Spark - Best Practices and Tuning

How to estimate the number of partitions, executor's and driver's params (YARN Cluster Mode)


Start from the resources YARN can allocate on each worker node, reserving about 2 GB of RAM for the operating system and Hadoop daemons:

yarn.nodemanager.resource.memory-mb = ((node's RAM in GB - 2 GB) * 1024) MB
Total number of node's cores = yarn.nodemanager.resource.cpu-vcores
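
As a concrete sketch of the same bookkeeping in Scala (the values are the node size used in the example later in this section; the variable names are mine, not the book's):

```scala
// Hypothetical node size: 52 GB of RAM and 16 vCPUs per node.
val nodeRamGb = 52
val nodeCores = 16

// Resources handed to YARN, reserving ~2 GB for the OS and Hadoop daemons.
val yarnNodeMemoryMb = (nodeRamGb - 2) * 1024   // yarn.nodemanager.resource.memory-mb
val yarnNodeVcores   = nodeCores                // yarn.nodemanager.resource.cpu-vcores
```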

- Executor's params (Worker Node):

  • Executor (VM) x Node = ((total number of node's cores) / 5) - 1

    • 5 is the upper bound for cores per executor, because more than 5 cores per executor can degrade HDFS I/O throughput.

    • If the total number of node's cores is less than or equal to 8, divide it by 2 instead.

    • If the total number of node's cores is 1, then Executor x Node is 1.

  • numExecutors (number of executors to launch for this session) = number of nodes * Executor (VM) x Node

    • The driver is included in this executor count.

  • executorCores (number of cores to use for each executor) = (total number of node's cores - 5) / Executor x Node

  • executorMemory (amount of memory to use per executor process) = (yarn.nodemanager.resource.memory-mb - 1024) / (Executor (VM) x Node + 1)
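
A minimal Scala sketch of these rules, continuing the values defined above (helper names such as executorsPerNode are mine, not the book's):

```scala
// Executors (VMs) per node: cap executors at 5 cores each and subtract one
// to leave headroom on the node.
val executorsPerNode =
  if (nodeCores == 1) 1
  else if (nodeCores <= 8) nodeCores / 2
  else (nodeCores / 5) - 1

// Hypothetical cluster: 3 worker nodes plus 1 Application Master node.
val numNodes      = 4
val numExecutors  = numNodes * executorsPerNode          // the driver is included in this count
val executorCores = (nodeCores - 5) / executorsPerNode
// Per-executor memory before the storage-fraction adjustment discussed below.
val rawExecutorMemoryMb = (yarnNodeMemoryMb - 1024) / (executorsPerNode + 1)
```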

For executorMemory we need a further step. If you review the BlockManager source code (./core/src/main/scala/org/apache/spark/storage/BlockManager.scala), you will note that the memory allocation is based on the formula:

Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction.

where memoryFraction = spark.storage.memoryFraction and safetyFraction = spark.storage.safetyFraction

The default values of spark.storage.memoryFraction and spark.storage.safetyFraction are 0.6 and 0.9 respectively, so the effective executorMemory is:

executorMemory = ((yarn.nodemanager.resource.memory-mb - 1024) / (Executor (VM) x Node + 1)) * memoryFraction * safetyFraction.
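
Continuing the sketch: spark.storage.memoryFraction and spark.storage.safetyFraction belong to Spark's legacy memory manager (since Spark 1.6 the unified memory manager uses spark.memory.fraction instead), but the sizing logic is analogous:

```scala
// Defaults of the legacy storage-memory settings.
val memoryFraction = 0.6   // spark.storage.memoryFraction
val safetyFraction = 0.9   // spark.storage.safetyFraction

// Memory effectively available to each executor for storage.
val executorMemoryMb = rawExecutorMemoryMb * memoryFraction * safetyFraction
// With the 16-core / 52 GB nodes above: 16725 * 0.6 * 0.9 ≈ 9031 MB
```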

- Driver's params (Application Master Node):

  • driverCores = executorCores

  • driverMemory = executorMemory

Example

I have 3 worker nodes and one Application Master node, each with 16 vCPUs and 52 GB of memory.

yarn.nodemanager.resource.memory-mb = (52 - 2) * 1024 = 51200 MB

yarn.scheduler.maximum-allocation-mb = 20830 MB (Must be greater than executorMemory)

- Executor's params (Worker Node):

  • Executor x Node = (16 / 5) - 1 = 2

  • numExecutors = 2 * 4 = 8

  • executorCores = (16 - 5) / 2 = 5

  • executorMemory = ((51200 - 1024) / 3) * 0.6 * 0.9 = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB

- Driver's params (Application Master Node):

  • driverCores = 5

  • driverMemory = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB
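
For illustration only (not from the book), this is how the estimated values map onto Spark's configuration keys. In YARN cluster mode they are normally passed on the spark-submit command line instead (--num-executors, --executor-cores, --executor-memory, --driver-cores, --driver-memory), since the driver's memory cannot be changed once its JVM has started:

```scala
import org.apache.spark.SparkConf

// Mapping of the estimated parameters to configuration keys, rounded down
// to whole gigabytes to stay under yarn.scheduler.maximum-allocation-mb.
val conf = new SparkConf()
  .setAppName("tuned-app")                  // hypothetical application name
  .set("spark.executor.instances", "8")     // numExecutors
  .set("spark.executor.cores", "5")         // executorCores
  .set("spark.executor.memory", "9g")       // executorMemory
  .set("spark.driver.cores", "5")           // driverCores
  .set("spark.driver.memory", "9g")         // driverMemory
```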

See in the diagram how params are estimated:

Now suppose I have to process a dataset with 10,000,000 rows and 100 double variables.

number of megabytes = M = 10,000,000 * 100 * 8 / 1024^2 ≈ 7,629 megabytes

Partitions = 7,629 / 64 ≈ 119 (assuming a target partition size of 64 MB)
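
A small Scala sketch of this estimate (the 64 MB target partition size and the helper names are assumptions for illustration):

```scala
// Estimate the in-memory size of the dataset and a matching partition count.
val rows          = 10000000L   // 10 million rows
val doubleColumns = 100         // 100 double variables
val bytesPerValue = 8           // size of a double

val sizeMb = rows * doubleColumns * bytesPerValue / (1024.0 * 1024.0)   // ≈ 7629 MB

val targetPartitionMb = 64      // assumed target size per partition
val numPartitions = math.round(sizeMb / targetPartitionMb).toInt        // ≈ 119

// The estimate can then be applied when (re)partitioning the data, e.g.
// df.repartition(numPartitions)
```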

As in the previous example, I have 3 worker nodes and one Application Master node, each with 16 vCPUs and 52 GB of memory.

yarn.nodemanager.resource.memory-mb = (52 - 2) * 1024 = 51200 MB

yarn.scheduler.maximum-allocation-mb = 20830 MB (Must be greater than executorMemory)

- Executor's params (Worker Node):

  • Executor x Node = (16 / 5) - 1 = 2

  • numExecutors = 2 * 4 = 8

  • executorCores = (16 - 5) / 2 = 5

  • executorMemory = ((51200 - 1024) / 3) * 0.6 * 0.9 = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB

- Driver's params (Application Master Node):

  • driverCores = 5

  • driverMemory = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB
