Joining a large and a medium-sized RDD

If the medium-sized RDD does not fit fully into memory but its key set does, it is possible to exploit this [23]. Since a join discards all elements of the larger RDD that have no matching key in the medium-sized RDD, we can use the medium RDD's key set to filter the large RDD before the shuffle. If a significant number of entries gets discarded this way, the resulting shuffle will need to transfer far less data.

// Collect the key set of the medium-sized RDD and broadcast it to all executors
val keys = sc.broadcast(mediumRDD.map(_._1).collect.toSet)
// Drop entries of the large RDD with no matching key before the shuffle
val reducedRDD = largeRDD.filter { case (key, _) => keys.value.contains(key) }
reducedRDD.join(mediumRDD)

It is important to note that the efficiency gain here depends on the filter operation actually shrinking the larger RDD. If only few entries are discarded (e.g., because the medium-sized RDD is some kind of large dimension table whose keys appear almost everywhere in the large RDD), there is nothing to be gained with this strategy.
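If in doubt, it can be worth estimating the reduction before paying for the extra pass over the data. A minimal sketch, reusing largeRDD and the broadcast keys from above; the 1% sample fraction and the 0.5 threshold are arbitrary assumptions, not fixed rules:

// Estimate the fraction of entries the filter would keep,
// using a small sample so the check stays cheap
val sample = largeRDD.sample(withReplacement = false, fraction = 0.01)
val keptRatio =
  sample.filter { case (key, _) => keys.value.contains(key) }.count.toDouble / sample.count
// Only pre-filter if a substantial share of entries would be discarded
val shouldPreFilter = keptRatio < 0.5

Two cheap actions on a sample reveal whether the pre-filter is likely to pay for itself before the full shuffle runs.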

You can find more details on efficient shuffles in this Databricks presentation.
