Spark - RDD Transformations in Scala

Category: Apache Spark | Sub Category: Apache Spark Programs | By Prasad Bonam | Last updated: 2020-10-25 06:11:15



·        def repartition(numPartitions: Int)(implicit ord: Ordering[String]): RDD[String]

Return a new RDD that has exactly numPartitions partitions.

Can increase or decrease the level of parallelism in this RDD. Internally, this uses a shuffle to redistribute data.

If you are decreasing the number of partitions in this RDD, consider using coalesce, which can avoid performing a shuffle.
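A minimal sketch of the difference, assuming an existing SparkContext named sc:

// Assumes an existing SparkContext named sc.
val nums = sc.parallelize(1 to 100, 4)  // start with 4 partitions
val more = nums.repartition(8)          // full shuffle: 4 -> 8 partitions
val fewer = nums.coalesce(2)            // narrow dependency, no shuffle: 4 -> 2 partitions
println(more.getNumPartitions)          // 8
println(fewer.getNumPartitions)         // 2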

 

·        def flatMap[U](f: String => TraversableOnce[U])(implicit evidence$4: ClassTag[U]): RDD[U]

Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
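For example, splitting lines into words (a sketch, assuming an existing SparkContext sc):

// Each input line produces zero or more output elements.
val lines = sc.parallelize(Seq("a b", "c d e"))
val words = lines.flatMap(line => line.split(" "))
// words contains: a, b, c, d, e (five elements, not two)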

·        def filter(f: ((String, Int)) => Boolean): RDD[(String, Int)]

Return a new RDD containing only the elements that satisfy a predicate.
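For example, keeping only pairs whose key starts with "B" (a sketch, assuming an existing SparkContext sc):

val pairs = sc.parallelize(Seq(("Bob", 1), ("Ram", 1)))
val bOnly = pairs.filter(p => p._1.startsWith("B"))  // keeps ("Bob",1) only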

·        def reduceByKey(func: (Int, Int) => Int): RDD[(String, Int)]

Merge the values for each key using an associative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/parallelism level.
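For example, summing counts per key (a sketch, assuming an existing SparkContext sc):

val counts = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
val merged = counts.reduceByKey(_ + _)  // ("a",2), ("b",1)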

1.  Program Setup instructions:

Create a Maven project and make sure the following dependencies are present in pom.xml:

·        JDK 1.7 or higher

·        Scala 2.10.3

·        spark-core_2.10

·        scala-library

CSV file name: employeeData.csv (note that most rows are semicolon-delimited while the last two use commas; this matters below because flatMap splits on ",")

name;age;job

Jorge;30;Developer

Bob;32;Developer

Bob;32;Developer

swetha;39;Tester

Ram;34;DataScience

Ram;34;DataScience

Prasad,34,DataScience

Bonam,34,Developer

2.  Example:

The following example illustrates Spark RDD transformations and actions in Scala.

Save the file as Transformation.scala.

package com.runnerdev.rdd

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

import org.apache.spark.rdd.RDD

 

/**

 * Narrow transformations:

 * map(), mapPartition(), flatMap(), filter(), union()

 * Wide transformations:

 * groupByKey(), aggregateByKey(), aggregate(), join(), repartition()

 */

object Transformation extends App {

 

  //Create a SparkContext to initialize Spark

  val conf = new SparkConf().setAppName("transformation").setMaster("local")

  val sc = new SparkContext(conf)

 

  val rdd1: RDD[String] = sc.textFile("tmp/resources/employeeData.csv")

  println("  partition count:" + rdd1.getNumPartitions)

 

  // repartition shuffles the data into exactly 5 partitions
  val reparRdd: RDD[String] = rdd1.repartition(5)

  println("re-partition count:" + reparRdd.getNumPartitions)

  rdd1.collect().foreach(println)

 

  println("rdd flatMap transformation:");

  /* val rdd2: RDD[String]*/

  val rdd2 = rdd1.flatMap(f => f.split(","))

  rdd2.foreach(f => println(f))

 

  println("Create a Tuple by adding 1 to each word: ")

  val rdd3: RDD[(String, Int)] = rdd2.map(t => (t, 1))

  rdd3.foreach(println)

 

  println("Filter transformation: ")

  val rdd4 = rdd3.filter(f => f._1.startsWith("B"))

  rdd4.foreach(println)

 

  println("ReduceBy transformation: ")

  val rdd5 = rdd3.reduceByKey(_ + _)

 

  rdd5.foreach(println)

 

  println("Swap word,count and sortByKey transformation: ")

  val rdd6 = rdd5.map(a => (a._2, a._1)).sortByKey()

  println("Final Result")

 

  println("Action - foreach: ")

  rdd6.foreach(println)

 

  println("Action - count: ")

  println("Count : " + rdd6.count())

 

  println("Action - first: ")

  val firstRecord = rdd6.first()

  println("First Record : " + firstRecord._1 + "," + firstRecord._2)

 

  println("Action - max: ");

  val datMax = rdd6.max()

  println("Max Record : " + datMax._1 + "," + datMax._2)

 

  println("Action - reduce: ");

  val totalWordCount = rdd6.reduce((x, y) => (x._1 + y._1, x._2))

  println("dataReduce Record : " + totalWordCount._1)

  println("Action - take: ");

  val data3 = rdd6.take(3)

  data3.foreach(t => {

    println("data3 Key:" + t._1 + ", Value:" + t._2)

  })

 

  println("Action - collect : ")

  val data = rdd6.collect()

  data.foreach(c => {

    println("Key:" + c._1 + ", Value:" + c._2)

  })

 

  println("Action - saveAsTextFile : ")

  rdd5.saveAsTextFile("c:/tmp/wordCount")

}

 

Compile and run the above example as follows:

mvn clean install

Then run it as a Scala application.

Output:

 

Total input paths to process : 1

  partition count:1

re-partition count:5

name;age;job

Jorge;30;Developer

Bob;32;Developer

Bob;32;Developer

swetha;39;Tester

Ram;34;DataScience

Ram;34;DataScience

Prasad,34,DataScience

Bonam,34,Developer

 

rdd flatMap transformation:

name;age;job

Jorge;30;Developer

Bob;32;Developer

Bob;32;Developer

swetha;39;Tester

Ram;34;DataScience

Ram;34;DataScience

Prasad

34

DataScience

Bonam

34

Developer

Create a Tuple by adding 1 to each word:

(name;age;job,1)

(Jorge;30;Developer,1)

(Bob;32;Developer,1)

(Bob;32;Developer,1)

(swetha;39;Tester,1)

(Ram;34;DataScience,1)

(Ram;34;DataScience,1)

(Prasad,1)

(34,1)

(DataScience,1)

(Bonam,1)

(34,1)

(Developer,1)

 

Filter transformation:

(Bob;32;Developer,1)

(Bob;32;Developer,1)

(Bonam,1)

ReduceBy transformation:

(Bonam,1)

(Jorge;30;Developer,1)

(Prasad,1)

(Ram;34;DataScience,2)

(swetha;39;Tester,1)

(Bob;32;Developer,2)

(34,2)

(name;age;job,1)

(DataScience,1)

(Developer,1)

Swap word,count and sortByKey transformation:

Final Result

Action - foreach:

(1,Bonam)

(1,Jorge;30;Developer)

(1,Prasad)

(1,swetha;39;Tester)

(1,name;age;job)

(1,DataScience)

(1,Developer)

(2,Ram;34;DataScience)

(2,Bob;32;Developer)

(2,34)

Action - count:

Count : 10

Action - first:

First Record : 1,Bonam

Action - max:

Max Record : 2,Ram;34;DataScience

Action - reduce:

dataReduce Record : 13

Action - take:

data3 Key:1, Value:Bonam

data3 Key:1, Value:Jorge;30;Developer

data3 Key:1, Value:Prasad

Action - collect :

 

Key:1, Value:Bonam

Key:1, Value:Jorge;30;Developer

Key:1, Value:Prasad

Key:1, Value:swetha;39;Tester

Key:1, Value:name;age;job

Key:1, Value:DataScience

Key:1, Value:Developer

Key:2, Value:Ram;34;DataScience

Key:2, Value:Bob;32;Developer

Key:2, Value:34

Action - saveAsTextFile :
