Clusters will not be fully utilized unless the level of parallelism for each operation is high enough. Spark automatically sets the number of partitions of an input file according to its size, and does the same for distributed shuffles. By default, Spark creates one partition for each HDFS block of the file (64 MB by default).
You can also pass the desired number of partitions as a second argument when creating an RDD.
For example, creating an RDD from a text file:
val rdd = sc.textFile("file.txt", 5)
The statement above creates an RDD from the text file with 5 partitions. Now suppose we have a cluster with 4 cores and each partition needs 5 minutes to process: 4 partitions are processed in parallel, and the fifth is processed afterwards, whenever a core becomes free. The job therefore finishes in 10 minutes, and resources sit idle while the single remaining partition is processed.
To avoid this, create the RDD with a number of partitions equal to the number of cores in the cluster, so that all partitions are processed in parallel and resources are used evenly.
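The timing argument above is simple wave arithmetic; a minimal sketch with the hypothetical numbers from the example (not a Spark API call):

```python
import math

def job_time(partitions: int, cores: int, minutes_per_partition: float) -> float:
    """Tasks run in waves of `cores` at a time; total time is
    number of waves * time per partition."""
    waves = math.ceil(partitions / cores)
    return waves * minutes_per_partition

# 5 partitions on 4 cores: two waves -> 10 minutes, 3 cores idle in wave 2
print(job_time(5, 4, 5))  # 10
# 4 partitions on 4 cores: one wave -> 5 minutes, all cores busy
print(job_time(4, 4, 5))  # 5
```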
As a rule of thumb, tasks should take at least 100 ms to execute; you can ensure that this is the case by monitoring the task execution latency from the Spark shell. If your tasks take considerably longer than that, keep increasing the level of parallelism by a factor of, say, 1.5, until performance stops improving.
After a shuffle, a DataFrame has a number of partitions equal to the spark.sql.shuffle.partitions parameter, whose default value is 200.
An approximate calculation of the size of a dataset is
number of megabytes = M = (N*V*W) / 1024^2
where
N = number of records
V = number of variables
W = average width in bytes of a variable
In approximating W, remember:
Type of variable | Width |
Integers, −127 <= x <= 100 | 1 |
Integers, −32,767 <= x <= 32,740 | 2 |
Integers, −2,147,483,647 <= x <= 2,147,483,620 | 4 |
Floats, single precision | 4 |
Floats, double precision | 8 |
Strings | maximum length |
Say that you have a 20,000-observation dataset. That dataset contains

Variables | Bytes |
1 string identifier of length 20 | 20 |
10 small integers (1 byte each) | 10 |
4 standard integers (2 bytes each) | 8 |
5 floating-point numbers (4 bytes each) | 20 |
20 variables total | 58 |
Thus the average width of a variable is
W = 58/20 = 2.9 bytes
The size of your dataset is
M = 20000*20*2.9/1024^2 = 1.11 megabytes
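The arithmetic above can be sketched directly (the variable layout mirrors the table; names are illustrative):

```python
# Widths from the table above: 1 string of 20 bytes, 10 one-byte integers,
# 4 two-byte integers, 5 four-byte floats -> 20 variables, 58 bytes in total.
widths = [20] + [1] * 10 + [2] * 4 + [4] * 5

N = 20_000                # number of records
V = len(widths)           # number of variables (20)
W = sum(widths) / V       # average width: 58 / 20 = 2.9 bytes

M = N * V * W / 1024**2   # dataset size in (binary) megabytes
print(V, W, round(M, 2))  # 20 2.9 1.11
```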
This result slightly understates the size of the dataset because we have not included any variable labels, value labels, or notes that you might add to the data. That does not amount to much. For instance, imagine that you added variable labels to all 20 variables and that the average length of the text of the labels was 22 characters.
That would amount to a total of 20*22 = 440 bytes, or 440/1024^2 = .00042 megabytes.
Explanation of the formula
M = 20000*20*2.9/1024^2 = 1.11 megabytes
N*V*W is, of course, the total size of the data. The 1024^2 in the denominator rescales the result to megabytes.
Yes, the result is divided by 1024^2 even though 1000^2 = a million. Computer memory comes in binary increments. Although we think of k as standing for kilo, in the computer business k is really a "binary" thousand, 2^10 = 1,024. A megabyte is a binary million, a binary k squared:
1 MB = 1024 KB = 1024*1024 = 1,048,576 bytes
With cheap memory, we sometimes talk about a gigabyte. Here is how a binary gig works:
1 GB = 1024 MB = 1024^3 = 1,073,741,824 bytes
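The binary increments are easy to verify:

```python
KB = 1024       # a "binary" thousand: 2**10
MB = 1024 * KB  # a binary million:   2**20
GB = 1024 * MB  # a binary billion:   2**30

print(MB)  # 1048576
print(GB)  # 1073741824
```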
yarn.nodemanager.resource.memory-mb = ((node's RAM in GB - 2 GB) * 1024) MB, where the 2 GB is left for the operating system and Hadoop daemons
yarn.nodemanager.resource.cpu-vcores = total number of the node's cores
- Executor's params (Worker Node):
Executor (VM) x Node = ((total number of the node's cores) / 5) - 1
5 is the upper bound for cores per executor, because more than 5 cores per executor can degrade HDFS I/O throughput.
If the total number of the node's cores is less than or equal to 8, divide it by 2 instead.
If the total number of the node's cores is equal to 1, Executor x Node is equal to 1.
numExecutors (number of executors to launch for this session) = number of nodes * Executor x Node
The driver is included in this executor count.
executorCores (number of cores to use for each executor) = (total number of the node's cores - 5) / Executor x Node
executorMemory (amount of memory to use per executor process) = (yarn.nodemanager.resource.memory-mb - 1024) / (Executor x Node + 1)
For executorMemory we have to reason a bit further. If you review the BlockManager source code (./core/src/main/scala/org/apache/spark/storage/BlockManager.scala), you will note that the memory allocation is based on the algorithm:
Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction.
where memoryFraction = spark.storage.memoryFraction and safetyFraction = spark.storage.safetyFraction
The default values of spark.storage.memoryFraction and spark.storage.safetyFraction are 0.6 and 0.9 respectively, so the real executorMemory is:
executorMemory = ((yarn.nodemanager.resource.memory-mb - 1024) / (Executor (VM) x Node + 1)) * memoryFraction * safetyFraction.
- Driver's params (Application Master Node):
driverCores = executorCores
driverMemory = executorMemory
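A minimal sketch of the sizing rules above as plain arithmetic (not a Spark or YARN API; the function and parameter names are my own):

```python
def yarn_executor_params(num_nodes, cores_per_node, ram_gb_per_node,
                         memory_fraction=0.6, safety_fraction=0.9):
    """Apply the rules above: at most ~5 cores per executor, one executor
    slot reserved per node, 2 GB RAM reserved for the OS, 1 GB for overhead."""
    if cores_per_node == 1:
        executors_per_node = 1
    elif cores_per_node <= 8:
        executors_per_node = cores_per_node // 2
    else:
        executors_per_node = cores_per_node // 5 - 1

    nm_memory_mb = (ram_gb_per_node - 2) * 1024      # yarn.nodemanager.resource.memory-mb
    num_executors = num_nodes * executors_per_node   # the driver is included in this count
    executor_cores = (cores_per_node - 5) // executors_per_node
    executor_memory = (nm_memory_mb - 1024) / (executors_per_node + 1)
    usable_memory = executor_memory * memory_fraction * safety_fraction

    return num_executors, executor_cores, round(executor_memory, 2), round(usable_memory, 2)

# 3 worker nodes + 1 Application Master node, 16 vCPUs and 52 GB each
print(yarn_executor_params(4, 16, 52))  # (8, 5, 16725.33, 9031.68)
```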
Example
I have 3 worker nodes and one Application Master node, each with 16 vCPUs and 52 GB of memory.
yarn.nodemanager.resource.memory-mb = (52 - 2) * 1024 = 51200 MB
yarn.scheduler.maximum-allocation-mb = 20830 MB (Must be greater than executorMemory)
- Executor's params (Worker Node):
Executor x Node = ((16) / 5) - 1 = 2
numExecutors = 2 * 4 = 8
executorCores = (16 - 5) / 2 = 5
executorMemory = ((51200 - 1024) / 3) * 0.6 * 0.9 = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB
- Driver's params (Application Master Node):
driverCores = 5
driverMemory = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB
See in the diagram how the params are estimated.
Example
I have to process a dataset that has 10,000,000 rows and 100 double variables.
number of megabytes = M = 10,000,000*100*8/1024^2 = 7,629.4 megabytes
Partitions = 7,629.4/64 ≈ 120
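Cross-checking the size and partition count with the 8-byte width for doubles from the table above (plain arithmetic, one partition per 64 MB HDFS block):

```python
import math

N = 10_000_000  # rows
V = 100         # double variables
W = 8           # bytes per double, from the width table

M = N * V * W / 1024**2         # size in binary megabytes
partitions = math.ceil(M / 64)  # one partition per 64 MB HDFS block
print(round(M, 1), partitions)  # 7629.4 120
```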
As in the previous example, I have 3 worker nodes and one Application Master node, each with 16 vCPUs and 52 GB of memory.
yarn.nodemanager.resource.memory-mb = (52 - 2) * 1024 = 51200 MB
yarn.scheduler.maximum-allocation-mb = 20830 MB (Must be greater than executorMemory)
- Executor's params (Worker Node):
Executor x Node = ((16) / 5) - 1 = 2
numExecutors = 2 * 4 = 8
executorCores = (16 - 5) / 2 = 5
executorMemory = ((51200 - 1024) / 3) * 0.6 * 0.9 = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB
- Driver's params (Application Master Node):
driverCores = 5
driverMemory = 16725.33 MB * 0.6 * 0.9 = 9031.68 MB