Spark - RDD Actions in Scala

Category : Apache Spark | Sub Category : Apache Spark Programs | By Prasad Bonam Last updated: 2020-10-25 06:19:48 Viewed : 478



 

·        def aggregate[U](zeroValue: U)(seqOp: (U, Int) => U, combOp: (U, U) => U)(implicit evidence$33: ClassTag[U]): U

Aggregate the elements of each partition, and then the results for all the partitions, using the given combine functions and a neutral "zero value". This function can return a different result type, U, than the element type of this RDD, T. Thus, we need one operation for merging a T into a U and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are allowed to modify and return their first argument instead of creating a new U to avoid memory allocation. (In the signatures on this page, T appears as Int or (String, Int) because they are taken from the example RDDs used below.)
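To make the difference between U and T concrete, here is a minimal sketch that aggregates an RDD[Int] into a (sum, count) pair. The object name AggregateSketch, the helper sumAndCount and the local[2] master are illustrative assumptions, not part of the original program.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object AggregateSketch {
  // The result type U = (Int, Int) differs from the element type T = Int.
  def sumAndCount(rdd: RDD[Int]): (Int, Int) =
    rdd.aggregate((0, 0))(
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),           // seqOp: merge one element into an accumulator
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)     // combOp: merge two partition accumulators
    )

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("AggregateSketch").setMaster("local[2]"))
    try {
      val (sum, count) = sumAndCount(sc.parallelize(List(9, 1, 2, 4, 5, 2, 1)))
      println(s"sum = $sum, count = $count") // prints "sum = 24, count = 7"
    } finally sc.stop()
  }
}
```

From the pair, a mean can be derived on the driver as sum.toDouble / count without a second pass over the data.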

·         def treeAggregate[U](zeroValue: U)(seqOp: (U, Int) => U, combOp: (U, U) => U, depth: Int)(implicit evidence$74: ClassTag[U]): U

Aggregates the elements of this RDD in a multi-level tree pattern. 
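As a rough sketch of how the depth parameter is passed, the following computes the same sum as a plain aggregate but lets Spark combine partition results in intermediate levels. The object name TreeAggregateSketch, the helper treeSum and the choice of 8 partitions are assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object TreeAggregateSketch {
  // Sums the RDD; `depth` is the suggested depth of the aggregation tree (2 when omitted).
  def treeSum(rdd: RDD[Int], depth: Int): Int =
    rdd.treeAggregate(0)(
      (acc: Int, v: Int) => acc + v, // seqOp: fold one element into a partition accumulator
      (a: Int, b: Int) => a + b,     // combOp: merge two accumulators
      depth
    )

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TreeAggregateSketch").setMaster("local[2]"))
    try {
      val rdd = sc.parallelize(1 to 100, 8)
      println(treeSum(rdd, 2)) // prints 5050
      println(treeSum(rdd, 3)) // prints 5050: the depth changes how results are merged, not the result
    } finally sc.stop()
  }
}
```

The result matches aggregate for associative, commutative operations; the tree pattern is mainly of interest when there are many partitions and merging everything on the driver would be a bottleneck.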

·        def fold(zeroValue: (String, Int))(op: ((String, Int), (String, Int)) => (String, Int)): (String, Int)

Aggregate the elements of each partition, and then the results for all the partitions, using a given associative and commutative function and a neutral "zero value". The function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.

This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. This fold operation may be applied to partitions individually, and then fold those results into the final result, rather than apply the fold to each element sequentially in some defined ordering. For functions that are not commutative, the result may differ from that of a fold applied to a non-distributed collection.
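The following sketch illustrates why the zero value has to be neutral: Spark applies it once in every partition and once more when the partition results are merged, whereas a local Scala fold applies it only once. The object name FoldZeroSketch and the helper foldSum are hypothetical names chosen for this example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, for illustration only.
object FoldZeroSketch {
  // Folds 1, 2, 3, 4 with addition, starting from `zero`, over `numPartitions` partitions.
  def foldSum(sc: SparkContext, zero: Int, numPartitions: Int): Int =
    sc.parallelize(Seq(1, 2, 3, 4), numPartitions).fold(zero)(_ + _)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FoldZeroSketch").setMaster("local[2]"))
    try {
      println(foldSum(sc, 0, 2))              // prints 10: 0 is neutral for addition
      println(foldSum(sc, 1, 2))              // prints 13: 1 is added in each of the 2 partitions and once when merging
      println(Seq(1, 2, 3, 4).fold(1)(_ + _)) // prints 11: a local Scala fold applies the zero value once
    } finally sc.stop()
  }
}
```

With a non-neutral zero value the result depends on the number of partitions, which is rarely what is intended.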

·      def reduce(f: (Int, Int) => Int): Int

Reduces the elements of this RDD using the specified commutative and associative binary operator.

·         def treeReduce(f: (Int, Int) => Int, depth: Int): Int

Reduces the elements of this RDD in a multi-level tree pattern.
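A small sketch of why reduce and treeReduce expect a commutative and associative operator: subtraction is neither, so its result changes with the partitioning, while addition gives the same total either way. The object name ReduceSketch and its helpers are assumptions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object, for illustration only.
object ReduceSketch {
  // Reduces 1, 2, 3, 4 with subtraction, which is neither associative nor commutative.
  def subtractAll(sc: SparkContext, numPartitions: Int): Int =
    sc.parallelize(Seq(1, 2, 3, 4), numPartitions).reduce(_ - _)

  // Sums 1..100 with treeReduce, passing an explicit tree depth.
  def treeSum(sc: SparkContext, depth: Int): Int =
    sc.parallelize(1 to 100, 4).treeReduce(_ + _, depth)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ReduceSketch").setMaster("local[2]"))
    try {
      println(subtractAll(sc, 1)) // prints -8: ((1 - 2) - 3) - 4 within a single partition
      println(subtractAll(sc, 2)) // prints 0: (1 - 2) and (3 - 4) are reduced separately, then combined
      println(treeSum(sc, 3))     // prints 5050
    } finally sc.stop()
  }
}
```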

·       def countByValue()(implicit ord: Ordering[Int]): Map[Int, Long]

Return the count of each unique value in this RDD as a local map of (value, count) pairs.

Note that this method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory. To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which returns an RDD[(T, Long)] instead of a map.
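The two approaches in the note can be sketched side by side. The object name CountByValueSketch and the helper names are hypothetical; the sketch also assumes a Spark version in which pair-RDD operations such as reduceByKey are available without an extra import (older releases need import org.apache.spark.SparkContext._).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object CountByValueSketch {
  // Counts are collected into a local map on the driver: fine for few distinct values.
  def countsOnDriver(rdd: RDD[Int]): scala.collection.Map[Int, Long] =
    rdd.countByValue()

  // Counts stay distributed as an RDD of (value, count) pairs.
  def countsAsRdd(rdd: RDD[Int]): RDD[(Int, Long)] =
    rdd.map(x => (x, 1L)).reduceByKey(_ + _)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("CountByValueSketch").setMaster("local[2]"))
    try {
      val rdd = sc.parallelize(List(9, 1, 2, 4, 5, 2, 1))
      println(countsOnDriver(rdd))              // a local map such as Map(5 -> 1, 1 -> 2, ...)
      countsAsRdd(rdd).take(3).foreach(println) // only the pairs that are asked for reach the driver
    } finally sc.stop()
  }
}
```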

·     def first(): Int

Return the first element in this RDD.

·         def top(num: Int)(implicit ord: Ordering[Int]): Array[Int]

Returns the top k (largest) elements from this RDD as defined by the specified implicit Ordering[T] and maintains the ordering. This does the opposite of takeOrdered. For example:

 

sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
// returns Array(12)
 
sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
// returns Array(6, 5)

·        def mkString(sep: String): String

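mkString is not an RDD action but a method of Scala collections. It joins all elements of a collection into a single string using a separator string, and is used in the program below to print the arrays returned by top, take and takeOrdered.

Example:

List(1, 2, 3).mkString("|")
// returns "1|2|3"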

·       def min()(implicit ord: Ordering[Int]): Int

Returns the min of this RDD as defined by the implicit Ordering[T].

·      def max()(implicit ord: Ordering[Int]): Int

Returns the max of this RDD as defined by the implicit Ordering[T].
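Because min and max take the Ordering as an explicit second parameter list, a custom ordering can be supplied directly. The sketch below, with the hypothetical object MinMaxSketch and made-up sample data, compares the default tuple ordering with an ordering on the second element only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object MinMaxSketch {
  // Orders (name, count) pairs by count only, ignoring the name.
  val byCount: Ordering[(String, Int)] = Ordering.by[(String, Int), Int](_._2)

  def minByCount(rdd: RDD[(String, Int)]): (String, Int) = rdd.min()(byCount)
  def maxByCount(rdd: RDD[(String, Int)]): (String, Int) = rdd.max()(byCount)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MinMaxSketch").setMaster("local[2]"))
    try {
      val rdd = sc.parallelize(Seq(("Ant", 40), ("Bat", 10), ("Cat", 70)))
      println(rdd.min())       // prints (Ant,40): default ordering compares the name first
      println(minByCount(rdd)) // prints (Bat,10): smallest count
      println(maxByCount(rdd)) // prints (Cat,70): largest count
    } finally sc.stop()
  }
}
```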

·      def takeOrdered(num: Int)(implicit ord: Ordering[Int]): Array[Int]

Returns the first k (smallest) elements from this RDD as defined by the specified implicit Ordering[T] and maintains the ordering. This does the opposite of top. For example:

 

sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)
// returns Array(2)
 
sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)
// returns Array(2, 3)
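The "opposite of top" relationship can be checked with a reversed Ordering: takeOrdered with the reversed ordering should return the same elements as top with the default one. The object name TakeOrderedSketch and the helper largest are illustrative assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object TakeOrderedSketch {
  // The k largest elements, obtained by reversing the ordering passed to takeOrdered.
  def largest(rdd: RDD[Int], k: Int): Array[Int] =
    rdd.takeOrdered(k)(Ordering[Int].reverse)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("TakeOrderedSketch").setMaster("local[2]"))
    try {
      val rdd = sc.parallelize(Seq(10, 4, 2, 12, 3))
      println(largest(rdd, 2).mkString(","))     // prints 12,10
      println(rdd.top(2).mkString(","))          // prints 12,10
      println(rdd.takeOrdered(2).mkString(","))  // prints 2,3
    } finally sc.stop()
  }
}
```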

·        def countApproxDistinct(relativeSD: Double): Long

Return approximate number of distinct elements in the RDD.

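The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm". The relativeSD parameter is the relative accuracy: smaller values give a more accurate estimate but create counters that require more space.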
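As a rough sketch of how relativeSD is passed, the following estimates the number of distinct values in an RDD that contains many repeats. The object name ApproxDistinctSketch, the helper estimate and the sample data are assumptions; since the result is an estimate, no exact output is shown.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, for illustration only.
object ApproxDistinctSketch {
  // Estimates the number of distinct elements with the requested relative accuracy.
  def estimate(rdd: RDD[Int], relativeSD: Double): Long =
    rdd.countApproxDistinct(relativeSD)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ApproxDistinctSketch").setMaster("local[2]"))
    try {
      // 10,000 elements, but only 1,000 distinct values (0 to 999).
      val rdd = sc.parallelize((1 to 10000).map(_ % 1000), 4)
      println("relativeSD 0.05 : " + estimate(rdd, 0.05)) // an estimate close to 1000
      println("relativeSD 0.01 : " + estimate(rdd, 0.01)) // typically closer, at the cost of more memory
      println("exact           : " + rdd.distinct().count())
    } finally sc.stop()
  }
}
```

Comparing against distinct().count() is only practical on small data; on large data the approximate count is the reason to use this action at all.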

·         def count(): Long

Return the number of elements in the RDD. 

1.    Program Setup instructions:

Create a Maven project and make sure the following prerequisites and dependencies are in place (the libraries are declared in pom.xml):

·        JDK 1.7 or higher

·        Scala 2.10.3

·        spark-core_2.10

·        scala-library    

2.    Example:

The following example illustrates Spark RDD actions in Scala.

Save the file as SparkAction.scala:

 

package com.runnerdev.rdd

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object SparkAction extends App {

  val conf = new SparkConf().setAppName("SparkAction").setMaster("local")
  // Create a Scala Spark Context.
  val sc = new SparkContext(conf)

  val inputRDD = sc.parallelize(List(("Ant", 10), ("Bat", 30), ("Cat", 40), ("Dog", 30), ("Elephant", 60), ("Fox", 70)))
  val listRdd = sc.parallelize(List(9, 1, 2, 4, 5, 2, 1))

  // collect: bring all elements back to the driver
  println("Collect: ")
  val data: Array[Int] = listRdd.collect()
  data.foreach(println)

  // aggregate: sum of an RDD[Int]
  val param0 = (accu: Int, v: Int) => accu + v
  val param1 = (accu1: Int, accu2: Int) => accu1 + accu2
  println("aggregateOne : " + listRdd.aggregate(0)(param0, param1))

  // aggregate: sum of the second tuple element, so the result type differs from the element type
  val param3 = (accu: Int, v: (String, Int)) => accu + v._2
  val param4 = (accu1: Int, accu2: Int) => accu1 + accu2
  println("aggregateTwo : " + inputRDD.aggregate(0)(param3, param4))

  // treeAggregate: similar to aggregate, but combines results in a multi-level tree
  val param8 = (accu: Int, v: Int) => accu + v
  val param9 = (accu1: Int, accu2: Int) => accu1 + accu2
  println("treeAggregate : " + listRdd.treeAggregate(0)(param8, param9))

  // fold
  println("fold :  " + listRdd.fold(0) { (acc, v) =>
    val sum = acc + v
    sum
  })

  println("fold :  " + inputRDD.fold(("Total", 0)) { (acc: (String, Int), v: (String, Int)) =>
    val sum = acc._2 + v._2
    ("Total", sum)
  })

  // reduce
  println("reduce : " + listRdd.reduce(_ + _))
  println("reduce alternate : " + listRdd.reduce((x, y) => x + y))
  println("reduce : " + inputRDD.reduce((x, y) => ("Total", x._2 + y._2)))

  // treeReduce: similar to reduce, but combines results in a multi-level tree
  println("treeReduce : " + listRdd.treeReduce(_ + _))

  // count, countApprox, countApproxDistinct
  println("Count : " + listRdd.count)
  println("countApprox : " + listRdd.countApprox(1200))
  println("countApproxDistinct : " + listRdd.countApproxDistinct())
  println("countApproxDistinct : " + inputRDD.countApproxDistinct())

  // countByValue
  println("countByValue :  " + listRdd.countByValue())

  // first
  println("first :  " + listRdd.first())
  println("first :  " + inputRDD.first())

  // top
  println("top : " + listRdd.top(2).mkString(","))
  println("top : " + inputRDD.top(2).mkString(","))

  // min
  println("min :  " + listRdd.min())
  println("min :  " + inputRDD.min())

  // max
  println("max :  " + listRdd.max())
  println("max :  " + inputRDD.max())

  // take, takeOrdered
  println("take : " + listRdd.take(2).mkString(","))
  println("takeOrdered : " + listRdd.takeOrdered(2).mkString(","))

  // Release the Spark context once all actions have run.
  sc.stop()
}

 

 

Compile the above example as follows:

mvn clean install

Then run SparkAction as a Scala application.

 

 

Output:

Collect:
9
1
2
4
5
2
1
aggregateOne : 24
aggregateTwo : 240
treeAggregate : 24
fold :  24
fold :  (Total,240)
reduce : 24
reduce alternate : 24
reduce : (Total,240)
treeReduce : 24
Count : 7
countApprox : (final: [7.000, 7.000])
countApproxDistinct : 5
countApproxDistinct : 6
countByValue :  Map(5 -> 1, 1 -> 2, 9 -> 1, 2 -> 2, 4 -> 1)
first :  9
first :  (Ant,10)
top : 9,5
top : (Fox,70),(Elephant,60)
min :  1
min :  (Ant,10)
max :  9
max :  (Fox,70)
take : 9,1
takeOrdered : 1,1
