Spark – SQL Schema in Scala

Category: Apache Spark | Sub Category: Apache Spark Programs | By Prasad Bonam | Last updated: 2020-10-25



 

1. Program setup instructions:

Create a Maven project and make sure the following are available via pom.xml:

·        JDK 1.7 or higher

·        Scala 2.10.3

·        scala-library

·        spark-core_2.10

·        spark-sql_2.10 (provides SQLContext and DataFrames)
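A minimal dependency section for the pom.xml might look like the sketch below. The version numbers are illustrative assumptions (a Spark 1.x release built for Scala 2.10); align them with your own cluster:

```xml
<!-- Illustrative versions only; match these to your Spark/Scala installation -->
<dependencies>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.6.3</version>
  </dependency>
</dependencies>
```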

     Text file: employeeData.txt (columns: name, age, job, salary; the salary column is needed because the programs below read four fields per line)

employeeData.txt

Jorge,30,Developer,1000.0
Bob,32,Developer,1000.0
Bob,32,Developer,2000.0
swetha,39,Tester,6000.0
Ram,34,DataScience,1500.0
Ram,34,DataScience,2000.0
Prasad,34,DataScience,1000.0
Bonam,34,Developer,1500.0

2. Example:

The following example illustrates Spark SQL schema inference (from a case class) in Scala.

Save the file as SparkSqlSchema.scala

SparkSqlSchema.scala

 package com.runnerdev.rdd

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

 

import org.apache.spark.sql.SQLContext

 

case class EmployeeData(name: String, age: Long, job: String, salary: Double)

object SparkSqlSchema extends App {

 

  val conf = new SparkConf().setAppName("SparkDataset").setMaster("local")

  // Create a Scala Spark Context.

  val sc = new SparkContext(conf)

 

  // Create a Scala SQLContext .

  val sqlContext = new SQLContext(sc)

  // For implicit conversions from RDDs to DataFrames

  import sqlContext.implicits._

 

  // Create an RDD of EmployeeData objects from a text file and convert it to a DataFrame

 

  val employeeDF = sc

    .textFile("resources/employeeData.txt")

    .map(_.split(","))

    .map(attributes => EmployeeData(attributes(0), attributes(1).trim.toInt, attributes(2), attributes(3).toDouble)).toDF()

  // Register the DataFrame as a temporary view

  employeeDF.registerTempTable("employee")

  println("registerTempTable (employee): ")

  employeeDF.show();

  // SQL statements can be run by using the sql methods provided by Spark

  val sqlQueryDF = sqlContext.sql("SELECT name, age,job,salary FROM employee WHERE age BETWEEN 34 AND 39")

  println("Select query :")

  sqlQueryDF.show()

 

  // The columns of a row in the result can be accessed by field index...
  sqlQueryDF.map(empData => "Name: " + empData(0) + ", age: " + empData(1)).collect().foreach(println)

  // ...or by field name
  sqlQueryDF.map(empData => "Name: " + empData.getAs[String]("name")).collect().foreach(println)

  // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
  sqlQueryDF.map(row => row.getValuesMap[Any](List("name", "age"))).collect().foreach(println)

} 
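The per-line parsing that feeds toDF() can be sketched in plain Scala, with no SparkContext needed. parseLine here is a hypothetical helper, not part of the program above; it simply mirrors the two map steps in the RDD pipeline:

```scala
// Same case class as in SparkSqlSchema.scala
case class EmployeeData(name: String, age: Long, job: String, salary: Double)

// Mirrors .map(_.split(",")).map(attributes => EmployeeData(...))
def parseLine(line: String): EmployeeData = {
  val attributes = line.split(",")
  EmployeeData(
    attributes(0),
    attributes(1).trim.toLong,
    attributes(2).trim,
    attributes(3).trim.toDouble)
}
```

For example, parseLine("Jorge,30,Developer,1000.0") yields EmployeeData("Jorge", 30, "Developer", 1000.0), which is exactly the shape Spark reflects on to infer the DataFrame schema.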


Compile and run the above example as follows:

mvn clean install

Then run it as a Scala application.

 

Output:

registerTempTable (employee):

+------+---+-----------+------+

|  name|age|        job|salary|

+------+---+-----------+------+

| Jorge| 30|  Developer|1000.0|

|   Bob| 32|  Developer|1000.0|

|   Bob| 32|  Developer|2000.0|

|swetha| 39|     Tester|6000.0|

|   Ram| 34|DataScience|1500.0|

|   Ram| 34|DataScience|2000.0|

|Prasad| 34|DataScience|1000.0|

| Bonam| 34|  Developer|1500.0|

+------+---+-----------+------+

 

 

Select query :

 

+------+---+-----------+------+

|  name|age|        job|salary|

+------+---+-----------+------+

|swetha| 39|     Tester|6000.0|

|   Ram| 34|DataScience|1500.0|

|   Ram| 34|DataScience|2000.0|

|Prasad| 34|DataScience|1000.0|

| Bonam| 34|  Developer|1500.0|

+------+---+-----------+------+

 

3. Example – Schema with StructType:

SparkSqlSchemaStruct.scala  

 package com.runnerdev.rdd

import org.apache.spark.SparkConf

import org.apache.spark.SparkContext

 

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row

 

object SparkSqlSchemaStruct extends App {

 

  val conf = new SparkConf().setAppName("SparkDataset").setMaster("local")

  // Create a Scala Spark Context.

  val sparkContext = new SparkContext(conf)

 

  // Create a Scala SQLContext .

  val sqlContext = new SQLContext(sparkContext)

  // For implicit conversions from RDDs to DataFrames

  import sqlContext.implicits._

 

  // Build a DataFrame from a text file by defining the schema explicitly

 

  // The schema is encoded in a string

  val schemaString = "name,age,job,salary"

 

  // Generate the schema based on the string of schema

  val fields = schemaString.split(",")

    .map(fieldName => StructField(fieldName, StringType, nullable = true))

  val schema = StructType(fields)

  // Create an RDD

  val employeeRDD = sparkContext.textFile("resources/employeeData.txt")

 

  // Convert records of the RDD (employees) to Rows

  val rowRDD = employeeRDD

    .map(_.split(","))

    .map(attributes => Row(attributes(0), attributes(1).trim, attributes(2).trim, attributes(3).trim))

 

  // Apply the schema to the RDD

  val employeeDF = sqlContext.createDataFrame(rowRDD, schema)

 

  // Creates a temporary view using the DataFrame

  employeeDF.registerTempTable("employee")

  employeeDF.show()

  // SQL can be run over a temporary view created using DataFrames

  val results = sqlContext.sql("SELECT name,salary FROM employee")

  results.show()

  // The results of SQL queries are DataFrames and support all the normal RDD operations

  // The columns of a row in the result can be accessed by field index or by field name

 

  results.map(attributes => "salary: " + attributes(1)).collect().foreach(println)

 

} 
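The schema-string mechanics can be illustrated without Spark: splitting the string gives the column names, and zipping a split data line against them mimics what createDataFrame does when it applies the StructType to each Row. toRow is a hypothetical helper used only for this sketch:

```scala
// Same schema string as in SparkSqlSchemaStruct.scala
val schemaString = "name,age,job,salary"
val fieldNames: List[String] = schemaString.split(",").map(_.trim).toList

// Pair each column name with the corresponding value from one data line,
// all kept as strings -- just like the StringType-only schema above.
def toRow(line: String): Map[String, String] =
  fieldNames.zip(line.split(",").map(_.trim)).toMap
```

For example, toRow("Jorge,30,Developer,1000") maps "salary" to "1000"; because every field in the generated schema is StringType, no numeric conversion happens, which is why the second program's output shows salaries without the decimal point.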


Compile and run the above example as follows:

mvn clean install

Then run it as a Scala application.

Output:

 

+------+---+-----------+------+

|  name|age|        job|salary|

+------+---+-----------+------+

| Jorge| 30|  Developer|  1000|

|   Bob| 32|  Developer|  1000|

|   Bob| 32|  Developer|  2000|

|swetha| 39|     Tester|  6000|

|   Ram| 34|DataScience|  1500|

|   Ram| 34|DataScience|  2000|

|Prasad| 34|DataScience|  1000|

| Bonam| 34|  Developer|  1500|

+------+---+-----------+------+

 

+------+------+

|  name|salary|

+------+------+

| Jorge|  1000|

|   Bob|  1000|

|   Bob|  2000|

|swetha|  6000|

|   Ram|  1500|

|   Ram|  2000|

|Prasad|  1000|

| Bonam|  1500|

+------+------+

