import org.apache.commons.lang.SystemUtils
import org.apache.spark.mllib.random.RandomRDDs._
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Benchmarks three ways of computing the Euclidean distance between two random
 * vectors stored as Spark RDDs:
 *
 *  - `map` followed by `reduce`
 *  - `map` followed by `treeReduce`
 *  - a single `treeAggregate`
 *
 * Each strategy is executed `runs` times. Every run prints the computed distance
 * and its elapsed wall-clock time in milliseconds; after all runs the average,
 * maximum and minimum time of each strategy are printed.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  // Number of timed repetitions per strategy.
  val runs = 20

  val config = new SparkConf().setAppName("TestStack")
  val sc: SparkContext = new SparkContext(config)

  try {
    // Generate a random double RDD that contains 1 million i.i.d. values drawn from the
    // standard normal distribution `N(0, 1)`, evenly distributed in 5 partitions.
    val input1 = normalRDD(sc, 1000000L, 5)
    // A second, independently generated RDD with the same size and partitioning,
    // so that the two can be zipped element by element.
    val input2 = normalRDD(sc, 1000000L, 5)
    // Cached because every benchmark run below re-reads the same pairs. Caching is
    // lazy, so the very first timed run also pays for materialising the data.
    val xy = input1.zip(input2).cache()

    // Runs `distance` `runs` times, printing the result and elapsed time of each
    // run, and returns the elapsed times in milliseconds.
    def benchmark(label: String)(distance: => Double): Array[Double] =
      Array.fill(runs) {
        val start = System.nanoTime()
        val result = distance
        // Stop the clock before printing so console I/O is not part of the measurement.
        val elapsedMs = (System.nanoTime() - start) / 1000000.0
        println(label + " - Euclidean Distance " + result)
        println(label + " - Elapsed time: " + elapsedMs + "ms")
        elapsedMs
      }

    // Prints average, maximum and minimum of the collected timings for one strategy.
    def printSummary(label: String, timesMs: Array[Double]): Unit = {
      val avg = timesMs.sum / timesMs.length
      println(s"$label - Avg:${avg}ms Max:${timesMs.max}ms Min:${timesMs.min}ms ")
    }

    val mapReduceTimeArr = benchmark("Map-Reduce") {
      math.sqrt(xy.map { case (v1, v2) => (v1 - v2) * (v1 - v2) }.reduce(_ + _))
    }

    val treeReduceTimeArr = benchmark("TreeReduce") {
      math.sqrt(xy.map { case (v1, v2) => (v1 - v2) * (v1 - v2) }.treeReduce(_ + _))
    }

    val treeAggregateTimeArr = benchmark("TreeAggregate") {
      math.sqrt(
        xy.treeAggregate(0.0)(
          // seqOp: fold one (x, y) pair into the partition-local sum of squared differences.
          (c: Double, v: (Double, Double)) => c + ((v._1 - v._2) * (v._1 - v._2)),
          // combOp: merge two partial sums.
          (c1: Double, c2: Double) => c1 + c2
        )
      )
    }

    printSummary("Map-Reduce", mapReduceTimeArr)
    printSummary("TreeReduce", treeReduceTimeArr)
    printSummary("TreeAggregate", treeAggregateTimeArr)
  } finally {
    // Always release the Spark resources, even if a job fails.
    sc.stop()
  }
}