Apache Spark - Best Practices and Tuning
  1. RDD

Don’t collect large RDDs


When a collect operation is issued on an RDD, the entire dataset is copied to the driver, i.e. the master node. If the dataset is too large to fit in the driver's memory, a memory exception will be thrown; take or takeSample can be used instead to retrieve only a capped number of elements.
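For example, a minimal sketch (the RDD name and the sizes are illustrative, not from the original text):

val bigRDD = sc.parallelize(1 to 1000000)      // stand-in for a large dataset
// bigRDD.collect()                            // risky: copies every element to the driver
val firstHundred = bigRDD.take(100)            // at most 100 elements reach the driver
val sample = bigRDD.takeSample(withReplacement = false, num = 100, seed = 42L)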

Another approach is to process the RDD one partition at a time. First, get the array of partitions:

val parallel = sc.parallelize(1 to 9)   // small example RDD
val parts = parallel.partitions         // array of Partition objects

and then build a smaller RDD that filters out everything but a single partition. Collect the data from that smaller RDD and iterate over the values of one partition at a time:

for (p <- parts) {
  val idx = p.index
  val partRDD = parallel.mapPartitionsWithIndex(
    (index: Int, it: Iterator[Int]) => if (index == idx) it else Iterator.empty,
    preservesPartitioning = true)
  val data = partRDD.collect()
  // data now holds all values of a single partition as an array.
  // Do whatever you need with it: iterate, save to a file, etc.
}

// You can also use the foreachPartition operation
parallel.foreachPartition(partition => {
  val values = partition.toArray   // materialize this partition's values on the executor
  // Your code
})

Of course, this works only if the partitions are small enough. If they are not, you can always increase the number of partitions with rdd.coalesce(numParts, true); passing true enables a shuffle, which allows coalesce to grow the partition count.
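A minimal sketch, reusing the parallel RDD from above with an arbitrary target partition count:

val repartitioned = parallel.coalesce(100, true)   // shuffle = true lets the partition count grow
println(repartitioned.partitions.length)           // 100 partitions, each holding fewer elements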

[8]