Spark - Different ways to create RDD in Scala

Category : Apache Spark | Sub Category : Apache Spark Programs | By Prasad Bonam | Last updated: 2020-10-25



 In Apache Spark, you can create Resilient Distributed Datasets (RDDs) in Scala using various methods and data sources. RDDs are the fundamental data structure in Spark and can be created from different types of data. Here are different ways to create RDDs in Scala:


To create an RDD from a SparkSession in Apache Spark, you can obtain the SparkContext associated with the SparkSession. Here is how to do it, with an example:

scala
import org.apache.spark.sql.SparkSession

// Create a SparkSession
val spark: SparkSession = SparkSession.builder()
  .appName("MySparkApp")
  .master("local[*]") // Replace with your cluster manager or "local" for local mode
  .getOrCreate()

// Get the SparkContext from the SparkSession
val sc = spark.sparkContext

// Create an RDD from a collection (example)
val data = Seq(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)

// Perform operations on the RDD (example: square each element)
val squaredRDD = rdd.map(x => x * x)

// Show the result
squaredRDD.foreach(println)

// Stop the SparkSession when you are done
spark.stop()

In this example:

  1. We create a SparkSession as usual.
  2. We get the SparkContext object from the SparkSession using spark.sparkContext.
  3. We create an RDD from a collection (in this case, a sequence of integers) using sc.parallelize(data).
  4. We perform an operation on the RDD, which squares each element using the map transformation.
  5. We use foreach(println) to print the squared values.

Remember to replace "local[*]" with your specific cluster manager configuration when running on a cluster. The SparkSession is used for DataFrame operations, while the SparkContext is used for RDD operations.

  1. Parallelize a Collection: You can create an RDD from an existing Scala collection (e.g., Array, List) using the parallelize method of the SparkContext object. Here is an example:

    scala
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf().setAppName("RDDCreation").setMaster("local")
    val sc = new SparkContext(conf)

    val data = Array(1, 2, 3, 4, 5)
    val rdd = sc.parallelize(data)
  2. From External Data Sources:

    • Text Files: You can create an RDD from one or more text files using the textFile method of SparkContext. Each line in the file becomes an element in the RDD.

      scala
      val rdd = sc.textFile("path/to/text/file")
    • HDFS Files: You can create RDDs from files stored in the Hadoop Distributed File System (HDFS) using the hadoopFile or newAPIHadoopFile methods (see the sketch after this list).

    • Sequence Files: You can create RDDs from Hadoop Sequence Files using the sequenceFile method (also covered in the sketch after this list).

    • Hive Tables: RDDs can be created from Hive tables using Spark's integration with Hive.

  3. Transforming Existing RDDs: You can create new RDDs by applying transformations to existing RDDs. For example, using map, filter, flatMap, and other transformation operations.

    scala
    val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
    val rdd2 = rdd1.map(_ * 2) // Creates a new RDD by applying a transformation to rdd1
  4. External Data Sources (e.g., Cassandra, HBase): Spark provides connectors to various external data sources, and you can create RDDs directly from these sources using the appropriate libraries.

    scala
    import com.datastax.spark.connector._ // the Spark-Cassandra connector adds cassandraTable to SparkContext
    val rdd = sc.cassandraTable("keyspace", "table")
  5. Using SparkSession (for DataFrames and Datasets): While RDDs are a lower-level API in Spark, you can also create DataFrames or Datasets using SparkSession and then convert them into RDDs if needed.

    scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("RDDCreation")
      .master("local")
      .getOrCreate()

    val df = spark.read.csv("path/to/csv/file")
    val rdd = df.rdd // Convert DataFrame to RDD
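
For the HDFS and Sequence File sources from item 2, here is a minimal sketch (assuming the sc SparkContext from the examples above; the HDFS paths and the Text/IntWritable key and value types are placeholders for your own data):

scala
import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Read a Hadoop SequenceFile of (Text, IntWritable) pairs and convert the
// Hadoop Writable types to plain Scala types.
val seqRdd = sc
  .sequenceFile("hdfs://namenode:8020/data/counts.seq", classOf[Text], classOf[IntWritable])
  .map { case (k, v) => (k.toString, v.get) }

// Read a file through the "new" Hadoop API (org.apache.hadoop.mapreduce) input
// format; keys are byte offsets into the file, values are the lines.
val linesRdd = sc
  .newAPIHadoopFile[LongWritable, Text, TextInputFormat]("hdfs://namenode:8020/data/input.txt")
  .map { case (_, line) => line.toString }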

The following examples demonstrate these different ways to create RDDs in Scala with Apache Spark, along with sample output for each method.

Before you start, make sure you have Apache Spark installed and configured. You can use the following Scala code snippets in a Spark-enabled Scala environment.

  1. Parallelize a Collection:

    scala
    import org.apache.spark.{SparkConf, SparkContext}

    // Create a SparkConf and SparkContext
    val conf = new SparkConf().setAppName("RDDCreation").setMaster("local")
    val sc = new SparkContext(conf)

    // Create an RDD by parallelizing a Scala collection
    val data = Array(1, 2, 3, 4, 5)
    val rdd = sc.parallelize(data)

    // Print the RDD contents
    rdd.collect().foreach(println)

    Output:

    1
    2
    3
    4
    5
  2. From External Text Files:

    scala
    import org.apache.spark.{SparkConf, SparkContext}

    // Create a SparkConf and SparkContext
    val conf = new SparkConf().setAppName("RDDCreation").setMaster("local")
    val sc = new SparkContext(conf)

    // Create an RDD from a text file
    val rdd = sc.textFile("path/to/text/file")

    // Print the RDD contents
    rdd.collect().foreach(println)

    Output:

    Line 1
    Line 2
    Line 3
    ...
  3. Transforming Existing RDDs:

    scala
    import org.apache.spark.{SparkConf, SparkContext}

    // Create a SparkConf and SparkContext
    val conf = new SparkConf().setAppName("RDDCreation").setMaster("local")
    val sc = new SparkContext(conf)

    // Create an initial RDD
    val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))

    // Create a new RDD by applying a transformation
    val rdd2 = rdd1.map(_ * 2)

    // Print the RDD contents
    rdd2.collect().foreach(println)

    Output:

    2
    4
    6
    8
    10
  4. From External Data Sources (e.g., Cassandra):

    scala
    import org.apache.spark.{SparkConf, SparkContext}
    import com.datastax.spark.connector._

    // Create a SparkConf and SparkContext
    val conf = new SparkConf().setAppName("RDDCreation").setMaster("local")
    val sc = new SparkContext(conf)

    // Create an RDD from a Cassandra table
    val rdd = sc.cassandraTable("keyspace", "table")

    // Print the RDD contents
    rdd.collect().foreach(println)

    Output:

    CassandraRow{...}
    CassandraRow{...}
    CassandraRow{...}
    ...
  5. Using SparkSession (for DataFrames and Datasets):

    scala
    import org.apache.spark.sql.SparkSession

    // Create a SparkSession
    val spark = SparkSession.builder()
      .appName("RDDCreation")
      .master("local")
      .getOrCreate()

    // Create a DataFrame
    val df = spark.read.csv("path/to/csv/file")

    // Convert DataFrame to RDD
    val rdd = df.rdd

    // Print the RDD contents
    rdd.collect().foreach(println)

    Output (assuming the CSV file contains two columns):

    [value1, value2]
    [value3, value4]
    ...

These examples demonstrate different ways to create RDDs in Scala with Apache Spark, and they show the sample output for each method. Keep in mind that the actual output may vary depending on the data you use and your Spark configuration.

These are some of the common ways to create RDDs in Scala when working with Apache Spark. The choice of method depends on your specific data and use case, as well as your familiarity with the Spark API.

·        org.apache.spark.rdd.RDD

A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as map, filter, and persist. In addition, org.apache.spark.rdd.PairRDDFunctions contains operations available only on RDDs of key-value pairs, such as groupByKey and join; org.apache.spark.rdd.DoubleRDDFunctions contains operations available only on RDDs of Doubles; and org.apache.spark.rdd.SequenceFileRDDFunctions contains operations available on RDDs that can be saved as SequenceFiles. All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit conversions.
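
As a quick illustration of these implicit conversions, an RDD of key-value pairs automatically gains the PairRDDFunctions operations; a minimal sketch, assuming the sc SparkContext from the examples above and made-up data:

scala
// An RDD[(String, Int)] picks up PairRDDFunctions through an implicit
// conversion, so key-value operations such as reduceByKey become available.
// (On Spark versions before 1.3 you may also need: import org.apache.spark.SparkContext._)
val sales = sc.parallelize(Seq(("books", 10), ("toys", 5), ("books", 7)))
val totals = sales.reduceByKey(_ + _) // RDD[(String, Int)]
totals.collect().foreach(println)     // e.g. (toys,5) and (books,17)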

Internally, each RDD is characterized by five main properties:

§  A list of partitions

§  A function for computing each split

§  A list of dependencies on other RDDs

§  Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

§  Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
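
A small sketch of how these properties surface in the RDD API (assuming the sc SparkContext from the examples above; the exact output depends on your configuration):

scala
val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")), numSlices = 3)

println(pairs.partitions.length) // list of partitions: 3 here
println(pairs.dependencies)      // dependencies on parent RDDs: empty for a parallelized collection
println(pairs.partitioner)       // Partitioner: None until e.g. partitionBy or reduceByKey sets one
println(pairs.preferredLocations(pairs.partitions(0))) // preferred locations: empty for in-memory data, block locations for HDFS files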

 

·        override def apply[A](xs: A*): List[A]

Creates a collection with the specified elements.
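
For example, List(1, 2, 3) is shorthand for calling this apply method on the List companion object:

scala
val xs: List[Int] = List(1, 2, 3) // equivalent to List.apply(1, 2, 3)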

·        def parallelize[T](seq: Seq[T], numSlices: Int)(implicit evidence$1: ClassTag[T]): RDD[T]

Distribute a local Scala collection to form an RDD.

Note

  Avoid using parallelize(Seq()) to create an empty RDD. Consider emptyRDD for an RDD with no partitions, or parallelize(Seq[T]()) for an RDD of T with empty partitions.

  parallelize acts lazily. If seq is a mutable collection and is altered after the call to parallelize and before the first action on the RDD, the resultant RDD will reflect the modified collection. Pass a copy of the argument to avoid this.
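
A minimal sketch of these points, assuming the sc SparkContext from the examples above:

scala
// Explicitly request 4 partitions for the parallelized collection.
val nums = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8), numSlices = 4)
println(nums.partitions.length) // 4

// Preferred ways to create an empty RDD.
val empty1 = sc.emptyRDD[Int]           // RDD with no partitions
val empty2 = sc.parallelize(Seq[Int]()) // RDD of Int with empty partitions

// Because parallelize is lazy, pass a copy of a mutable collection if it
// might be modified before the first action runs.
import scala.collection.mutable.ArrayBuffer
val buffer = ArrayBuffer(1, 2, 3)
val safeRdd = sc.parallelize(buffer.toList)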

·        def countByValue()(implicit ord: Ordering[String]): Map[String, Long]

Return the count of each unique value in this RDD as a local map of (value, count) pairs.

Note that this method should only be used if the resulting map is expected to be small, as the whole thing is loaded into the driver's memory. To handle very large results, consider using rdd.map(x => (x, 1L)).reduceByKey(_ + _), which returns an RDD[(T, Long)] instead of a map.
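
For example, with an RDD[String] such as the wordRdd built in the program later in this article, the two approaches look like this (a minimal sketch):

scala
// Small result: collecting into a Map in driver memory is fine.
val counts: scala.collection.Map[String, Long] = wordRdd.countByValue()

// Large result: keep the counts distributed as an RDD[(String, Long)].
val countsRdd = wordRdd.map(x => (x, 1L)).reduceByKey(_ + _)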

 

1.  Program Setup instructions:

Create a Maven project and make sure the following dependencies are present in pom.xml:

·        JDK 1.7 or higher

·        Scala 2.10.3

·        spark-core_2.10

·        scala-library

2.  Example:

The following example illustrates different ways to create an RDD in Scala with Spark.

Save the file as CreateSparkRDD.scala

 CreateSparkRDD.scala 

package com.runnerdev.rdd

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object CreateSparkRDD extends App {

  /* Create SparkConf */
  val conf = new SparkConf().setAppName("sparkRDD").setMaster("local[*]")
  val sc = new SparkContext(conf)

  /* Create a local List of words
   * val inputRDD: List[String]
   */
  val inputRDD = List("Java", "Scala", "spark", "Scala", "hadoop", "spark", "hive", "Apache pig", "cassandra", "hadoop")

  /* Create an RDD using parallelize
   * val wordRdd: RDD[String]
   */
  val wordRdd = sc.parallelize(inputRDD)
  println("Count: " + wordRdd.count())

  /* val wordCountByValue: Map[String, Long] */
  val wordCountByValue = wordRdd.countByValue()
  // Map(Scala -> 2, cassandra -> 1, ...)
  println("CountByValue:" + wordCountByValue)

  for ((word, count) <- wordCountByValue) {
    println(word + " : " + count)
  }
}


Compile and run the above example as follows:

mvn clean install

Then run it as a Scala application.

Output:

Count: 10

CountByValue:Map(Scala -> 2, cassandra -> 1, hadoop -> 2, spark -> 2, Apache pig -> 1, hive -> 1, Java -> 1)

Scala : 2

cassandra : 1

hadoop : 2

spark : 2

Apache pig : 1

hive : 1

Java : 1
