Apache Spark - Best Practices and Tuning
Use the Best Data Format

Apache Spark supports several data formats, including CSV, JSON, ORC, and Parquet, but just because Spark supports a given storage system or format doesn't mean you'll get the same performance with all of them. Parquet is a columnar storage format designed to read only the columns that a query actually uses, skipping over the ones that are not requested. This reduces file sizes dramatically and makes Spark SQL queries more efficient. The picture of a Parquet file in [24] illustrates what can be achieved with Column Pruning (projection pushdown) and Predicate Pushdown.
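
Converting an existing dataset to Parquet is usually just a read followed by a Parquet write. The snippet below is a minimal sketch, assuming the raw data is available as CSV under a hypothetical path such as /data/vehicles_csv; the worked example that follows uses JSON records instead.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("vehicles-to-parquet").getOrCreate()

// Read a row-oriented source (CSV) and let Spark infer the column types.
val csvDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/data/vehicles_csv")          // hypothetical input path

// Rewriting the same data as Parquet stores it column by column with compression,
// which is what enables column pruning and predicate pushdown at read time.
csvDf.write.mode("overwrite").parquet("/data/vehicles_parquet")   // hypothetical output path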

As an example, imagine a vehicle dataset containing records like the following:

{
  "id": "AA-BB-00-77",
  "type": "truck",
  "origin": "ATL",
  "destination": "LGA",
  "depdelay": 0.0,
  "arrdelay": 0.0,
  "distance": 762.0
}
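
Records like these can be loaded into a DataFrame before being persisted as Parquet. The following is a minimal sketch, assuming the raw data is newline-delimited JSON under a hypothetical path such as /data/vehicles_json; it produces the df used in the write example below.

// `spark` is the active SparkSession (for example the one built above,
// or the one provided by spark-shell).
val df = spark.read.json("/data/vehicles_json")   // hypothetical source path

df.printSchema()   // id, type, origin, destination, depdelay, arrdelay, distance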

Here is the code to persist a vehicles DataFrame as a table consisting of Parquet files partitioned by the destination column:

df.write.format("parquet")
  .partitionBy("destination")
  .option("path", "/data/vehicles")
  .saveAsTable("vehicles")

Below is the resulting directory structure as shown by a Hadoop list files command:

hadoop fs -ls /data/vehicles

  /data/vehicles/destination=ATL
  /data/vehicles/destination=BOS
  /data/vehicles/destination=CLT
  /data/vehicles/destination=DEN
  /data/vehicles/destination=DFW
  /data/vehicles/destination=EWR
  /data/vehicles/destination=IAH
  /data/vehicles/destination=LAX
  /data/vehicles/destination=LGA
  /data/vehicles/destination=MIA
  /data/vehicles/destination=ORD
  /data/vehicles/destination=SEA
  /data/vehicles/destination=SFO
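
When the table is read back, Spark reconstructs the destination column from the destination=<value> directory names, so a filter on it can prune whole directories before any file is opened. A minimal sketch, reading the layout written above either by path or through the table registered by saveAsTable:

// Read the partitioned Parquet files back by path...
val vehiclesByPath = spark.read.parquet("/data/vehicles")
vehiclesByPath.printSchema()   // includes destination, recovered from the directory names

// ...or through the metastore table created by saveAsTable.
val vehiclesByTable = spark.table("vehicles")

Partitioning pays off when the partition column has a modest number of distinct values, as destination does here; partitioning by a high-cardinality column such as id would create a very large number of small files.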

Given a vehicles table containing the information above, column pruning means that if the table has 7 columns but the query references only 2 of them, the other 5 are not read from disk at all. Predicate pushdown is a complementary optimization that limits which values are scanned rather than which columns: if you apply a filter on the destination column to return only records with the value BOS, predicate pushdown makes Parquet read only the blocks that may contain BOS values. Combined with the partitioning above, this improves performance by allowing Spark to read only a subset of the directories and files. For example, the following query reads only the files in the destination=BOS partition directory in order to compute the average arrival delay for vehicles destined for Boston:

df.filter("destination = 'BOS' and arrdelay > 1")
  .groupBy("destination").avg("arrdelay")
  .sort(desc("avg(arrdelay)")).show()

+-----------+------------------+
|destination|     avg(arrdelay)|
+-----------+------------------+
|    EWR    |54.352020860495436|
|    MIA    | 48.95263157894737|
|    SFO    |47.189473684210526|
|    ORD    | 46.47721518987342|
|    DFW    |44.473118279569896|
|    CLT    |37.097744360902254|
|    LAX    |36.398936170212764|
|    LGA    | 34.59444444444444|
|    BOS    |33.633187772925766|
|    IAH    | 32.10775862068966|
|    SEA    |30.532345013477087|
|    ATL    | 29.29113924050633|
+-----------+------------------+

You can see the physical plan for a DataFrame by calling the explain method, as follows:

df.filter("destination = 'BOS' and arrdelay > 1")
  .groupBy("destination")
  .avg("arrdelay")
  .sort(desc("avg(arrdelay)"))
  .explain

== Physical Plan ==
TakeOrderedAndProject(limit=1001, orderBy=[avg(arrdelay)#304 DESC NULLS LAST], 
output=[destination#157, avg(arrdelay)#304])
+- *(2) HashAggregate(keys=[destination#157],
       functions=[avg(arrdelay#152)],
       output=[destination#157, avg(arrdelay)#304])
+- Exchange hashpartitioning(destination#157, 200)
+- *(1) HashAggregate(keys=[destination#157],
              functions=[partial_avg(arrdelay#152)],  
              output=[destination#157, sum#321, count#322L])
+- *(1) Project[arrdelay#152, destination#157]
+- *(1)Filter (isnotnull(arrdelay#152) && (arrdelay#152 > 1.0))
+- *(1) FileScan parquet default.vehicles[arrdelay#152,destination#157] 
Batched: true, Format: Parquet, 
Location: PrunedInMemoryFileIndex[dbfs:/data/vehicles/destination=BOS], 
PartitionCount: 1, 
PartitionFilters: [isnotnull(destination#157), (destination#157 = BOS)], 
PushedFilters: [IsNotNull(arrdelay), GreaterThan(arrdelay,1.0)],
ReadSchema: struct<destination:string,arrdelay>

Here in PartitionFilters we can see the partition filter pushdown: the destination=BOS filter is pushed down into the Parquet file scan, so only the matching partition directory is listed and read. This minimizes the files and data scanned and reduces the amount of data passed back to the Spark engine for the average aggregation on the arrival delay.
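
The distinction is visible in the plan itself: filters on the partition column show up under PartitionFilters (directory pruning), filters on ordinary columns show up under PushedFilters (row-group skipping inside the Parquet files), and only the referenced columns appear in ReadSchema. The following is a small sketch for inspecting this on the same table; the distance filter is just an illustrative addition.

spark.table("vehicles")
  .filter("destination = 'BOS'")   // partition column  -> PartitionFilters
  .filter("distance > 500")        // ordinary column   -> PushedFilters
  .select("id", "distance")        // columns that end up in ReadSchema
  .explain()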