Experimental: A Dataset is a strongly typed collection of objects that can be transformed in parallel using functional or relational operations.

A Dataset differs from an RDD in the following ways:

· Internally, a Dataset is represented by a Catalyst logical plan, and the data is stored in encoded form. This representation allows for additional logical operations and enables many operations (sorting, shuffling, etc.) to be performed without deserializing to an object.
· The creation of a Dataset requires the presence of an explicit Encoder that can be used to serialize the object into a binary format. Encoders are also capable of mapping the schema of a given object to the Spark SQL type system. In contrast, RDDs rely on runtime-reflection-based serialization. Operations that change the type of object stored in the Dataset also need an encoder for the new type (see the sketch after this list).
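For illustration, here is a minimal sketch of supplying an Encoder explicitly rather than through implicits. It assumes a SQLContext named sqlContext and the Employee case class from the full example further down; Encoders.product derives an encoder from a case class's fields, and Encoders.STRING covers a primitive type:

import org.apache.spark.sql.Encoders

// Explicit encoder for a primitive type
val nameEncoder = Encoders.STRING
val names = sqlContext.createDataset(Seq("Ram", "Bob"))(nameEncoder)

// Explicit encoder derived from a case class (a product type)
val empEncoder = Encoders.product[Employee]
val emps = sqlContext.createDataset(Seq(Employee("Ram", 32, "Developer", 1000.0)))(empEncoder)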
A Dataset can be thought of as a specialized DataFrame, where the elements map to a specific JVM object type instead of to a generic Row container. A DataFrame can be transformed into a specific Dataset by calling df.as[ElementType]. Similarly, you can transform a strongly-typed Dataset into a generic DataFrame by calling ds.toDF().
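As a quick sketch of that round trip (again assuming the sqlContext, JSON file, and Employee case class from the full example below):

import org.apache.spark.sql.Dataset
import sqlContext.implicits._

val df = sqlContext.read.json("/resources/employeeData.json") // generic DataFrame of Rows
val ds: Dataset[Employee] = df.as[Employee]                   // typed view of the same data
val backToDf = ds.toDF()                                      // back to an untyped DataFrame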
COMPATIBILITY NOTE: Long term, the plan is to make DataFrame extend Dataset[Row]. However, making this change to the class hierarchy would break the function signatures for the existing functional operations (map, flatMap, etc.). As such, this class should be considered a preview of the final API; changes will be made to the interface after Spark 1.6.
collect returns an array that contains all the elements in this Dataset. Running collect requires moving all the data into the application's driver process, and doing so on a very large Dataset can crash the driver process with OutOfMemoryError. For the Java API, use collectAsList.
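A small sketch of the difference, assuming the typed employeeDS Dataset built later in this post; take(n) is a bounded alternative when you only need a few rows:

// collect() materializes every element in the driver's memory
val allEmployees: Array[Employee] = employeeDS.collect()

// take(n) brings back only the first n elements
val firstTwo: Array[Employee] = employeeDS.take(2)

// From Java, collectAsList() returns a java.util.List instead of a Scala Array
val asList: java.util.List[Employee] = employeeDS.collectAsList()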
A container for a Dataset is used for implicit conversions. To use it, import the implicit conversions in SQL: import sqlContext.implicits._. For example, distribute a local Scala collection to form an RDD; toDS is then available through sqlContext.implicits._:

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val peopleRDD = sc.parallelize(Seq("Ram", "Bob"))
val people = peopleRDD.toDS()
Prerequisites:
· JDK 1.7 or higher
· Scala 2.10.3
· spark-core_2.10
· scala-library
JSON File: employeeData.json
{"name":"Bob","age":31,"job":"Developer","salary":1000}
{"name":"swetha","age":30,"job":"Tester","salary":2000}
{"name":"Ram","age":30,"job":"DataScience","salary":6000}
{"name":"Prasad","age":32,"job":"Developer","salary":1500}
{"name":"Bonam","age":30,"job":"Developer","salary":2000}
{"name":"Bob","age":33,"job":"Developer","salary":1000}
package com.runnerdev.rdd

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

case class Employee(name: String, age: Long, job: String, salary: Double)
case class EmployeeOne(name: String, age: Long)

object SparkDataset extends App {
  val conf = new SparkConf().setAppName("SparkDataset").setMaster("local")
  // Create a Scala Spark Context.
  val sc = new SparkContext(conf)
  // Create a Scala SQLContext.
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._

  // Convert an RDD of case-class objects to a Dataset with toDS
  val employeeRDD: RDD[EmployeeOne] = sc.parallelize(Seq(EmployeeOne("Bob", 27)))
  println("employee dataset : ")
  employeeRDD.toDS().show()

  // Encoders are created for case classes
  val empClassDS = Seq(Employee("Ram", 32, "Developer", 1000.00)).toDS()
  println("empClassDS : ")
  empClassDS.show()

  // Encoders for most common types are provided automatically by importing sqlContext.implicits._
  val primitiveDS: Dataset[Int] = Seq(10, 20, 30).toDS()
  val prmDS: Array[Int] = primitiveDS.map(_ + 1).collect()
  println("primitiveDS :")
  prmDS.foreach(println)

  // DataFrames can be converted to a Dataset by providing a class.
  val empJsonPath = "/resources/employeeData.json"
  val employeeDS = sqlContext.read.json(empJsonPath).as[Employee]
  println("employee DataSet : ")
  employeeDS.show()
}
Compile and run the above example as follows:
mvn clean install
Then run it as a Scala application.
Output:
employee dataset :
+----+---+
|name|age|
+----+---+
| Bob| 27|
+----+---+

empClassDS :
+----+---+---------+------+
|name|age|      job|salary|
+----+---+---------+------+
| Ram| 32|Developer|1000.0|
+----+---+---------+------+
primitiveDS :
11
21
31
employee DataSet :
+------+---+-----------+------+
|  name|age|        job|salary|
+------+---+-----------+------+
|   Bob| 31|  Developer|1000.0|
|swetha| 30|     Tester|2000.0|
|   Ram| 30|DataScience|6000.0|
|Prasad| 32|  Developer|1500.0|
| Bonam| 30|  Developer|2000.0|
|   Bob| 33|  Developer|1000.0|
+------+---+-----------+------+
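Because employeeDS is typed, you can also follow up with compile-time-checked transformations. A minimal sketch (the field names below are checked against the Employee case class, not against string column names):

// Typed filter: the predicate works on Employee objects directly
val developers = employeeDS.filter(e => e.job == "Developer")
developers.show()

// Typed map: yields a Dataset[String] via the implicit string encoder
val names = employeeDS.map(_.name)
names.show()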