Avoid groupByKey when grouping multiple items by key
As already shown in [21], let's suppose we've got an RDD of items like:
```
(3922774869,10,1)
(3922774869,11,1)
(3922774869,12,2)
(3922774869,13,2)
(1779744180,10,1)
(1779744180,11,1)
(3922774869,14,3)
(3922774869,15,2)
(1779744180,16,1)
(3922774869,12,1)
(3922774869,13,1)
(1779744180,14,1)
(1779744180,15,1)
(1779744180,16,1)
(3922774869,14,2)
(3922774869,15,1)
(1779744180,16,1)
(1779744180,17,1)
(3922774869,16,4)
...
```
which represents (id, age, count). We want to group those lines to generate a dataset in which each line represents the age distribution of each id, like this ((id, age) is unique):
```
(1779744180, (10,1), (11,1), (12,2), (13,2) ...)
(3922774869, (10,1), (11,1), (12,3), (13,4) ...)
```
which is (id, (age, count), (age, count) ...)
The easiest way is to first reduce by both fields and then use groupByKey:
```scala
rdd
  .map { case (id, age, count) => ((id, age), count) }.reduceByKey(_ + _)
  .map { case ((id, age), count) => (id, (age, count)) }.groupByKey()
```
This returns an RDD[(Long, Iterable[(Int, Int)])]; for the input above it would contain these two records:
```
(1779744180,CompactBuffer((16,3), (15,1), (14,1), (11,1), (10,1), (17,1)))
(3922774869,CompactBuffer((11,1), (12,3), (16,4), (13,3), (15,3), (10,1), (14,5)))
```
But if you have a very large dataset, you should not use groupByKey, in order to reduce shuffling.
Instead, you can use aggregateByKey:
```scala
import scala.collection.mutable

val rddById = rdd.map { case (id, age, count) => ((id, age), count) }.reduceByKey(_ + _)
val initialSet = mutable.HashSet.empty[(Int, Int)]
val addToSet = (s: mutable.HashSet[(Int, Int)], v: (Int, Int)) => s += v
val mergePartitionSets = (p1: mutable.HashSet[(Int, Int)], p2: mutable.HashSet[(Int, Int)]) => p1 ++= p2
val uniqueByKey = rddById.aggregateByKey(initialSet)(addToSet, mergePartitionSets)
```
This will result in:
```
uniqueByKey: org.apache.spark.rdd.RDD[(AnyVal, scala.collection.mutable.HashSet[(Int, Int)])]
```
And you will be able to print the values as:
```scala
scala> uniqueByKey.foreach(println)
(1779744180,Set((15,1), (16,3)))
(1779744180,Set((14,1), (11,1), (10,1), (17,1)))
(3922774869,Set((12,3), (11,1), (10,1), (14,5), (16,4), (15,3), (13,3)))
```
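If you want the values in the (id, (age, count), (age, count) ...) shape shown earlier, you can sort each set by age. This is a minimal sketch on top of the uniqueByKey RDD from above; the name sortedValues is just illustrative:

```scala
// Sort each id's (age, count) pairs by age to match the target layout.
// `sortedValues` is an illustrative name, not part of the original example.
val sortedValues = uniqueByKey.mapValues(_.toList.sortBy(_._1))

sortedValues.foreach(println)
// e.g. (1779744180,List((10,1), (11,1), (14,1), (15,1), (16,3), (17,1)))
```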
Shuffling can be a great bottleneck. Having many big HashSets (depending on your dataset) could also be a problem. However, you're more likely to have a large amount of RAM than a fast network, so doing more of the work in memory and shuffling less results in faster reads/writes across distributed machines.
Here are more functions to prefer over groupByKey:
  • combineByKey can be used when you are combining elements but your return type differs from your input value type. You can see an example here, and a short sketch follows this list.
  • foldByKey merges the values for each key using an associative function and a neutral "zero value".
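As a rough sketch of combineByKey on the same data, here is one way to build a HashSet per id directly from the reduced rddById above; the combiner type (a mutable.HashSet) differs from the value type (a tuple), and the names byId and combined are just illustrative:

```scala
import scala.collection.mutable

// Re-key the reduced pairs as (id, (age, count)), then combine per id.
val byId = rddById.map { case ((id, age), count) => (id, (age, count)) }

val combined = byId.combineByKey(
  (v: (Int, Int)) => mutable.HashSet(v),                                  // createCombiner: first value on a partition
  (s: mutable.HashSet[(Int, Int)], v: (Int, Int)) => s += v,              // mergeValue: add a value to a partition-local set
  (s1: mutable.HashSet[(Int, Int)], s2: mutable.HashSet[(Int, Int)]) => s1 ++= s2  // mergeCombiners: merge sets across partitions
)
```

And foldByKey could replace the reduceByKey(_ + _) step above, using 0 as the neutral "zero value"; summed is again just an illustrative name:

```scala
// foldByKey with a neutral zero value gives the same per-(id, age) sums
// as the reduceByKey(_ + _) step shown earlier.
val summed = rdd
  .map { case (id, age, count) => ((id, age), count) }
  .foldByKey(0)(_ + _)
```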