Joining a large and a medium-sized Dataset
If the smaller DataFrame does not fit fully into memory but its key set does, we can exploit this. Since an inner join discards every row of the larger DataFrame that has no matching partner in the medium-sized DataFrame, we can use the medium DataFrame's key set to filter those rows out before the shuffle. If a significant number of rows is discarded this way, the subsequent shuffle has to transfer much less data.
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDF and .as[Int]; pre-imported in spark-shell

val mediumDf = Seq((0, "zero"), (4, "one")).toDF("id", "value")
val largeDf = Seq((0, "zero"), (2, "two"), (3, "three"), (4, "four"), (5, "five")).toDF("id", "value")

mediumDf.show()
largeDf.show()

/*
+---+-----+
| id|value|
+---+-----+
|  0| zero|
|  4|  one|
+---+-----+

+---+-----+
| id|value|
+---+-----+
|  0| zero|
|  2|  two|
|  3|three|
|  4| four|
|  5| five|
+---+-----+
*/

val keys = mediumDf.select("id").as[Int].collect().toSeq
print(keys)
/*
keys: Seq[Int] = WrappedArray(0, 4)
*/

val reducedDataFrame = largeDf.filter(col("id").isin(keys:_*))
reducedDataFrame.show()
/*
+---+-----+
| id|value|
+---+-----+
|  0| zero|
|  4| four|
+---+-----+
*/

val result = reducedDataFrame.join(mediumDf, Seq("id"))
result.explain()
result.show()

/*
== Physical Plan ==
*(1) Project [id#246, value#247, value#238]
+- *(1) BroadcastHashJoin [id#246], [id#237], Inner, BuildRight
   :- LocalTableScan [id#246, value#247]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#234]
      +- LocalTableScan [id#237, value#238]

+---+-----+-----+
| id|value|value|
+---+-----+-----+
|  0| zero| zero|
|  4| four|  one|
+---+-----+-----+
*/
It is important to note that the efficiency gain here depends on the filter operation actually reducing the size of the larger DataFrame. If few rows are dropped (e.g., because the medium-sized DataFrame is some kind of large dimension table that almost every row matches), there is nothing to be gained from this strategy.
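Stripped of Spark, the mechanics reduce to three steps: collect the key set of the medium side, filter the large side against it, then join only the survivors. A minimal plain-Python sketch of the same idea, using hypothetical toy data and no Spark at all:

```python
# Plain-Python sketch of the keyset-filter technique (toy data, no Spark).
medium = {0: "zero", 4: "one"}  # medium side: too big to broadcast, but its keys fit
large = [(0, "zero"), (2, "two"), (3, "three"), (4, "four"), (5, "five")]

# Step 1: collect the medium side's key set (assumed to fit in memory).
keys = set(medium)

# Step 2: discard large rows with no join partner *before* any shuffle.
reduced = [(k, v) for k, v in large if k in keys]

# Step 3: the join now only has to process the surviving rows.
result = [(k, v, medium[k]) for k, v in reduced]

print(reduced)  # [(0, 'zero'), (4, 'four')]
print(result)   # [(0, 'zero', 'zero'), (4, 'four', 'one')]
```

The payoff mirrors the Spark example above: rows 2, 3, and 5 never reach the join, which is exactly the data the shuffle no longer needs to move.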