Spark SQL - DataSet Examples in Scala

Category : Apache Spark | Sub Category : Apache Spark Programs | By Prasad Bonam Last updated: 2020-10-25 07:20:24 Viewed : 542


Spark SQL  - DataSet Examples in Scala



org.apache.spark.sql.Dataset

:: Experimental :: A Dataset is a strongly typed collection of objects that can be transformed in parallel using functional or relational operations.

A Dataset differs from an RDD in the following ways:

§  Internally, a Dataset is represented by a Catalyst logical plan and the data is stored in the encoded form. This representation allows for additional logical operations and enables many operations (sorting, shuffling, etc.) to be performed without deserializing to an object.

§  The creation of a Dataset requires the presence of an explicit Encoder that can be used to serialize the object into a binary format. Encoders are also capable of mapping the schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime reflection based serialization. Operations that change the type of object stored in the dataset also need an encoder for the new type.

A Dataset can be thought of as a specialized DataFrame, where the elements map to a specific JVM object type, instead of to a generic Row container. A DataFrame can be transformed into specific Dataset by calling df.as[ElementType]. Similarly you can transform a strongly-typed Dataset to a generic DataFrame by calling ds.toDF().

COMPATIBILITY NOTE: Long term we plan to make DataFrame extend Dataset[Row]. However, making this change to the class hierarchy would break the function signatures for the existing functional operations (map, flatMap, etc). As such, this class should be considered a preview of the final API. Changes will be made to the interface after Spark 1.6.

def collect(): Array[Int]

Returns an array that contains all the elements in this Dataset.

Running collect requires moving all the data into the application`s driver process, and doing so on a very large Dataset can crash the driver process with OutOfMemoryError.

For Java API, use collectAsList.

 

def toDS(): Dataset[T]

 

org.apache.spark.sql.DatasetHolder

A container for a Dataset, used for implicit conversions.

To use this, import implicit conversions in SQL: 

import sqlContext.implicits._

def parallelize[T](seq: Seq[T], numSlices: Int)(implicit evidence$1: ClassTag[T]): RDD[T]

Distribute a local Scala collection to form an RDD.

·        Spark version before  2.x

    toDS is available with sqlContext.implicits._

    val sqlContext = new SQLContext(sc);
    import sqlContext.implicits._
    val people = peopleRDD.toDS()

·        Spark version  after 2.x  

val spark: SparkSession = SparkSession.builder

  .config(conf)

  .getOrCreate;

import spark.implicits._

val people = peopleRDD.toDS()

1. Program Setup instructions:

Create a maven project and make sure present below jars in pom.xml

·        JDK 1.7 or higher

·        Scala 2.10.3

·        spark-core_2.10

·        scala-library

JSON File: employeeData.json

{"name":"Jorge","age":39, "job":"Sr.Developer","salary":1000}

{"name":"Bob","age":31, "job":"Developer","salary":1000}

{"name":"swetha","age":30, "job":"Tester","salary":2000}

{"name":"Ram","age":30, "job":"DataScience","salary":6000}

{"name":"Prasad","age":32, "job":"Developer","salary":1500}

{"name":"Bonam","age":30, "job":"Developer","salary":2000}

{"name":"Bob","age":33, "job":"Developer","salary":1000}

2. Example:

Following example illustrates about  Spark SQL  - DataSet Examples in Scala
Save the file as −  SparkDataset.scala  

sqlContext is not created by spark-shell, you can create it by yourself from sc, followed by import implicits. Then it should work. You can also replace sqlContext by hiveContext

SparkDataset.scala
 
package com.runnerdev.rdd

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

 

import org.apache.spark.sql.Dataset

import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SQLContext

 

case class Employee(name: String, age: Long, job: String, salary: Double)

case class EmployeeOne(name: String, age: Long)

 

object SparkDataset extends App {

 

  val conf = new SparkConf().setAppName("SparkDataset").setMaster("local")

  // Create a Scala Spark Context.

  val sc = new SparkContext(conf)

 

  // Create a Scala SQLContext .

  val sqlContext = new SQLContext(sc);

 

  import sqlContext.implicits._

 

  val employeeRDD: RDD[EmployeeOne] = sc.parallelize(Seq(EmployeeOne("Bob", 27)))

  println("employee dataset : ")

  val employee = employeeRDD.toDS.show()

 

  // Encoders are created for case classes

  val empClassDS = Seq(Employee("Ram", 32, "Devleoper", 1000.00)).toDS()

  println("empClassDS   : ");

  empClassDS.show()

 

  // Encoders for most common types are automatically provided by importing spark.implicits._

 

  /*val primitiveDS: Dataset[Int]*/

  val primitiveDS = Seq(10, 20, 30).toDS()

  /*var prmDS: Array[Int] */

  var prmDS = primitiveDS.map(_ + 1).collect()

  println("primitiveDS :")

  prmDS.foreach(println)

 

  // DataFrames can be converted to a Dataset by providing a class.

  val empJsonPath = "/resources/employeeData.json"

  val employeeDS = sqlContext.read.json(empJsonPath).as[Employee]

  println("employee DataSet : ");

  employeeDS.show()

 

} 

Compile and run the above example as follows

mvn clean install

run as a scala application

Output:  

employee dataset :

 

+----+---+

|name|age|

+----+---+

| Bob| 27|

    +----+---+

 

 

empClassDS   :

+----+---+---------+------+

|name|age|      job|salary|

+----+---+---------+------+

| Ram| 32|Devleoper|1000.0|

      +----+---+---------+------+

 

primitiveDS :

11

21

      31

 

employee DataSet :

+------+---+------------+------+

|  name|age|         job|salary|

+------+---+------------+------+

| Jorge| 39|Sr.Developer|1000.0|

|   Bob| 31|   Developer|1000.0|

|swetha| 30|      Tester|2000.0|

|   Ram| 30| DataScience|6000.0|

|Prasad| 32|   Developer|1500.0|

| Bonam| 30|   Developer|2000.0|

|   Bob| 33|   Developer|1000.0|

      +------+---+------------+------+

Search
Related Articles

Leave a Comment: