Joining a large and a medium-sized Dataset
If the smaller DataFrame does not fit fully into memory, but its key set does, we can exploit this. Because a join discards all rows of the larger DataFrame that have no matching partner in the medium-sized DataFrame, we can use the medium-sized DataFrame's key set to drop those rows before the shuffle. If a significant number of rows is discarded this way, the resulting shuffle needs to transfer much less data.
import org.apache.spark.sql.functions._

val mediumDf = Seq((0, "zero"), (4, "one")).toDF("id", "value")
val largeDf = Seq((0, "zero"), (2, "two"), (3, "three"), (4, "four"), (5, "five")).toDF("id", "value")

mediumDf.show()
largeDf.show()

/*
+---+-----+
| id|value|
+---+-----+
|  0| zero|
|  4|  one|
+---+-----+
+---+-----+
| id|value|
+---+-----+
|  0| zero|
|  2|  two|
|  3|three|
|  4| four|
|  5| five|
+---+-----+
*/

val keys = mediumDf.select("id").as[Int].collect().toSeq
print(keys)
/*
keys: Seq[Int] = WrappedArray(0, 4)
*/

val reducedDataFrame = largeDf.filter(col("id").isin(keys:_*))
reducedDataFrame.show()
/*
+---+-----+
| id|value|
+---+-----+
|  0| zero|
|  4| four|
+---+-----+
*/

val result = reducedDataFrame.join(mediumDf, Seq("id"))
result.explain()
result.show()

/*
== Physical Plan ==
*(1) Project [id#246, value#247, value#238]
+- *(1) BroadcastHashJoin [id#246], [id#237], Inner, BuildRight
   :- LocalTableScan [id#246, value#247]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#234]
      +- LocalTableScan [id#237, value#238]
+---+-----+-----+
| id|value|value|
+---+-----+-----+
|  0| zero| zero|
|  4| four|  one|
+---+-----+-----+
*/
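The same steps can be packaged into a small helper so the pattern is easy to reuse. The following is a minimal sketch, not part of the Spark API: joinWithKeyPrefilter is a hypothetical function name, and it assumes the join key has the same column name on both sides and that the distinct key set fits into driver memory.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Hypothetical helper: pre-filter the large DataFrame with the key set
// of the medium-sized one before joining.
def joinWithKeyPrefilter(
    largeDf: DataFrame,
    mediumDf: DataFrame,
    keyColumn: String): DataFrame = {
  // Collect only the key column to the driver; this assumes the key set
  // fits into driver memory even though the full medium DataFrame may not.
  val keys = mediumDf.select(keyColumn).distinct().collect().map(_.get(0))
  // Drop large-side rows that cannot find a join partner anyway.
  val reduced = largeDf.filter(col(keyColumn).isin(keys: _*))
  reduced.join(mediumDf, Seq(keyColumn))
}

With the example DataFrames above, joinWithKeyPrefilter(largeDf, mediumDf, "id") produces the same two-row result as the manual version.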
It is important to note that the efficiency gain here depends on the filter operation, actually reducing the size of the larger DataFrame. If there are not a lot of entries lost here (e.g., because the medium size DataFrame is some king of large dimension table), there is nothing to be gained with this strategy.
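A quick way to judge this in practice is to compare the size of the large DataFrame before and after the key filter. The snippet below is a rough sketch using the variables from the example above; the counts trigger extra Spark jobs, so this is only meant for tuning, not for a production code path.

// Sanity check (a sketch, not a fixed rule): compare row counts before
// and after the key filter to see whether the pre-filter pays off.
val totalRows = largeDf.count()
val keptRows  = reducedDataFrame.count()
val keptFraction = keptRows.toDouble / totalRows
println(f"kept $keptRows of $totalRows rows (${keptFraction * 100}%.1f %%)")
// If keptFraction is close to 1.0, the filter removes almost nothing and
// the extra pass over the data is not worth it.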