Don’t collect large RDDs

When a collect operation is issued on an RDD, the entire dataset is copied to the driver, i.e. the process that runs the application's main program. An out-of-memory error will be thrown if the dataset is too large to fit in the driver's memory; take or takeSample can be used to retrieve only a capped number of elements instead.
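As a minimal sketch of the capped alternatives, the snippet below pulls ten elements with take and ten with takeSample instead of collecting a million. The application name, the local[2] master, the dataset size and the seed are illustrative assumptions, not part of the original text; in spark-shell you would reuse the existing sc and skip the setup and the final stop.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumed local setup for illustration only; spark-shell already provides `sc`.
val conf = new SparkConf().setAppName("take-example").setMaster("local[2]")
val sc = SparkContext.getOrCreate(conf)

val rdd = sc.parallelize(1 to 1000000)

// take(n) returns the first n elements as a local array on the driver.
val firstTen = rdd.take(10)

// takeSample returns a random sample of fixed size as a local array.
// The seed (an arbitrary choice here) makes the sample reproducible.
val sample = rdd.takeSample(withReplacement = false, num = 10, seed = 42L)

println(firstTen.mkString(", "))
println(sample.mkString(", "))

sc.stop()
```

Both calls still materialize their result on the driver, so the requested size should itself stay small.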

Another approach, shown in [8], is to get the array of partitions, each of which carries its index:

val parallel = sc.parallelize(1 to 9)
val parts = parallel.partitions

Then, for each partition, create a smaller RDD that filters out everything but that partition. Collect the data from the smaller RDD and iterate over the values of the single partition:

for (p <- parts) {
  val idx = p.index
  // Keep only the elements of partition idx; every other partition yields an empty iterator.
  val partRDD = parallel.mapPartitionsWithIndex(
    (index: Int, it: Iterator[Int]) => if (index == idx) it else Iterator.empty,
    preservesPartitioning = true)
  val data = partRDD.collect()
  // data now holds all values of a single partition as an array.
  // Do whatever you need with it: iterate, save to a file, etc.
}

// You can also use the foreachPartition operation, which runs on the executors
// and therefore does not bring the data to the driver.
parallel.foreachPartition { partition =>
  // Your code
}

Of course, this works only if each partition is small enough to fit in the driver's memory. If the partitions are too large, you can always increase their number with rdd.coalesce(numParts, true), which is equivalent to rdd.repartition(numParts).
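The effect of the shuffle flag can be sketched as follows. The local[2] master, the application name and the partition counts (2 and 6) are assumptions chosen for illustration; in spark-shell you would reuse the existing sc and skip the setup and the final stop.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumed local setup for illustration only; spark-shell already provides `sc`.
val conf = new SparkConf().setAppName("coalesce-example").setMaster("local[2]")
val sc = SparkContext.getOrCreate(conf)

// Start with 2 partitions holding the values 1 to 9.
val rdd = sc.parallelize(1 to 9, numSlices = 2)
val before = rdd.getNumPartitions

// shuffle = true is what allows coalesce to increase the partition count;
// without it, coalesce can only reduce the number of partitions.
val widened = rdd.coalesce(6, shuffle = true)
val after = widened.getNumPartitions

// The data itself is unchanged, only redistributed.
val total = widened.count()

println(s"partitions: $before -> $after, elements: $total")

sc.stop()
```

More partitions mean each per-partition collect moves less data to the driver at once, at the cost of one shuffle.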
