Spark SQL - DataFrame Examples in Scala

By Prasad Bonam | Last updated: 2020-10-25 07:05:04



·        override def printSchema(): Unit

Prints the schema to the console in a nice tree format.

·        def select(col: String, cols: String*): DataFrame

Selects a set of columns. This is a variant of select that can only select existing columns using column names (i.e. cannot construct expressions).

// The following two are equivalent:
df.select("colA", "colB")
df.select($"colA", $"colB")
 

·        def filter(condition: Column): DataFrame

Filters rows using the given condition.

·        def filter(conditionExpr: String): DataFrame

Filters rows using the given SQL expression. 

peopleDf.filter("age > 15")

·        def where(condition: Column): DataFrame

Filters rows using the given condition. This is an alias for filter.

 

// The following are equivalent:
peopleDf.filter($"age" > 15)
peopleDf.where($"age" > 15)
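The three filtering forms above can be compared side by side. The following is a minimal sketch, assuming a local Spark installation with the SQLContext API used later in this article; the object name FilterVariants and the sample rows are made up for illustration. Note that the $"age" syntax only compiles after importing sqlContext.implicits._.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object FilterVariants {
  // Hypothetical sample rows, made up for this illustration.
  def people(sqlContext: SQLContext): DataFrame =
    sqlContext.createDataFrame(Seq(("Ann", 12), ("Raj", 20), ("Lee", 31))).toDF("name", "age")

  // Row counts from the SQL-string filter, the Column filter, and where.
  def counts(sqlContext: SQLContext): (Long, Long, Long) = {
    import sqlContext.implicits._ // enables the $"colName" syntax
    val peopleDf = people(sqlContext)
    (peopleDf.filter("age > 15").count(),
     peopleDf.filter($"age" > 15).count(),
     peopleDf.where($"age" > 15).count())
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("FilterVariants").setMaster("local"))
    // All three counts are equal because the three calls express the same predicate.
    try println(counts(new SQLContext(sc))) finally sc.stop()
  }
}
```

With this sample data, two rows have an age above 15, so each of the three counts is 2.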

·        def groupBy(cols: Column*): GroupedData

Groups the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.

 

// Compute the average for all numeric columns grouped by department.
df.groupBy($"department").avg()
 
// Compute the max age and average salary, grouped by department and gender.
df.groupBy($"department", $"gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))
 

·        def groupBy(col1: String, cols: String*): GroupedData

Groups the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.

This is a variant of groupBy that can only group by existing columns using column names (i.e. cannot construct expressions).

 

// Compute the average for all numeric columns grouped by department.
df.groupBy("department").avg()
 
// Compute the max age and average salary, grouped by department and gender.
df.groupBy("department", "gender").agg(Map(
  "salary" -> "avg",
  "age" -> "max"
))
 

·        def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame

(Scala-specific) Aggregates on the entire DataFrame without groups.

 

// df.agg(...) is a shorthand for df.groupBy().agg(...)
df.agg("age" -> "max", "salary" -> "avg")
df.groupBy().agg("age" -> "max", "salary" -> "avg")
 

·        def agg(exprs: Map[String, String]): DataFrame

(Scala-specific) Aggregates on the entire DataFrame without groups.

 

// df.agg(...) is a shorthand for df.groupBy().agg(...)
df.agg(Map("age" -> "max", "salary" -> "avg"))
df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
 

·        def limit(n: Int): DataFrame

Returns a new DataFrame by taking the first n rows. The difference between this function and head is that head returns an array while limit returns a new DataFrame.
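The difference between limit and head described above can be sketched as follows. This is an illustrative example only, assuming a local Spark setup; the object name LimitVsHead and the sample data are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

object LimitVsHead {
  // Hypothetical sample data, not taken from the article.
  def sample(sqlContext: SQLContext): DataFrame =
    sqlContext.createDataFrame(Seq(("a", 1), ("b", 2), ("c", 3))).toDF("key", "value")

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LimitVsHead").setMaster("local"))
    try {
      val df = sample(new SQLContext(sc))

      // limit returns a DataFrame, so further DataFrame operations can be chained.
      val firstTwo: DataFrame = df.limit(2)
      firstTwo.select("key").show()

      // head returns a local array of rows on the driver.
      val rows: Array[Row] = df.head(2)
      rows.foreach(println)
    } finally sc.stop()
  }
}
```

Because limit stays within the DataFrame API, it is usually the better fit when more transformations follow; head is convenient when a few rows are needed as plain Scala values.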

·        def unionAll(other: DataFrame): DataFrame

Returns a new DataFrame containing union of rows in this frame and another frame. This is equivalent to UNION ALL in SQL.
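As with UNION ALL in SQL, duplicate rows are kept. A small sketch, assuming a local Spark setup; the object name UnionAllExample and the two input frames are hypothetical. In Spark 2.0 and later the same operation is also available under the name union.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object UnionAllExample {
  // Returns (rows after unionAll, rows after also removing duplicates).
  def counts(sqlContext: SQLContext): (Long, Long) = {
    // Hypothetical inputs; ("Bob", 31) appears in both frames on purpose.
    val left  = sqlContext.createDataFrame(Seq(("Bob", 31), ("Ram", 30))).toDF("name", "age")
    val right = sqlContext.createDataFrame(Seq(("Bob", 31), ("Bonam", 30))).toDF("name", "age")

    val all = left.unionAll(right) // keeps the duplicate row
    (all.count(), all.dropDuplicates().count())
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("UnionAllExample").setMaster("local"))
    try println(counts(new SQLContext(sc))) finally sc.stop()
  }
}
```

Here unionAll yields 4 rows, and dropping duplicates afterwards leaves 3, which corresponds to a plain UNION in SQL.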

·        def drop(colName: String): DataFrame

Returns a new DataFrame with a column dropped. This is a no-op if the schema doesn't contain the column name.
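The no-op behaviour can be seen by inspecting the column names before and after the call. The sketch below assumes a local Spark setup; the object name DropColumn, the sample row, and the column name "bonus" are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object DropColumn {
  // Hypothetical single-row frame with three columns.
  def sample(sqlContext: SQLContext): DataFrame =
    sqlContext.createDataFrame(Seq(("Bob", 31, 1000L))).toDF("name", "age", "salary")

  // Column names that remain after dropping colName.
  def columnsAfterDrop(sqlContext: SQLContext, colName: String): Seq[String] =
    sample(sqlContext).drop(colName).columns.toSeq

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DropColumn").setMaster("local"))
    try {
      val sqlContext = new SQLContext(sc)
      println(columnsAfterDrop(sqlContext, "salary")) // existing column is removed
      println(columnsAfterDrop(sqlContext, "bonus"))  // unknown column: schema is unchanged
    } finally sc.stop()
  }
}
```

Since drop does not fail on an unknown name, a misspelled column name goes unnoticed; checking columns afterwards is one way to confirm the result.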

·        def show(): Unit

Displays the top 20 rows of the DataFrame in tabular form. Strings longer than 20 characters are truncated, and all cells are right-aligned.
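Besides the no-argument form, show has overloads that take a row count and a truncation flag. The sketch below assumes Spark 1.5 or later, where show(numRows, truncate) is available; the object name ShowVariants and the sample rows are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object ShowVariants {
  // Hypothetical rows; the first job title is longer than 20 characters on purpose.
  def sample(sqlContext: SQLContext): DataFrame =
    sqlContext.createDataFrame(Seq(
      ("Jorge", "Senior Software Developer, Platform"),
      ("Bob", "Developer"),
      ("Ram", "DataScience"))).toDF("name", "job")

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ShowVariants").setMaster("local"))
    try {
      val df = sample(new SQLContext(sc))
      df.show()         // default: up to 20 rows, long strings truncated
      df.show(2)        // only the first 2 rows
      df.show(2, false) // first 2 rows, with long strings printed in full
    } finally sc.stop()
  }
}
```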

1.  Program Setup instructions:

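Create a Maven project and make sure the following prerequisites are in place (the library dependencies go in pom.xml):

·        JDK 1.7 or higher

·        Scala 2.10.3

·        spark-core_2.10

·        spark-sql_2.10 (provides SQLContext and DataFrame)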

·        scala-library

JSON File: employeeData.json

{"name":"Jorge","age":39, "job":"Sr.Developer","salary":1000}
{"name":"Bob","age":31, "job":"Developer","salary":1000}
{"name":"swetha","age":30, "job":"Tester","salary":2000}
{"name":"Ram","age":30, "job":"DataScience","salary":6000}
{"name":"Prasad","age":32, "job":"Developer","salary":1500}
{"name":"Bonam","age":30, "job":"Developer","salary":2000}
{"name":"Bob","age":33, "job":"Developer","salary":1000}

2.  Example:

The following example illustrates Spark SQL DataFrame operations in Scala.

Save the file as SparkSqlDataFrame.scala

package com.runnerdev.rdd

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions.col

object SparkSqlDataFrame extends App {

  val conf = new SparkConf().setAppName("SparkSqlDataFrame").setMaster("local")

  // Create a Scala Spark Context.
  val sc = new SparkContext(conf)

  // SQLContext is the entry point for DataFrame and SQL operations.
  val sqlContext = new SQLContext(sc)

  // Each line of the JSON file becomes one row of the DataFrame.
  val empDF = sqlContext.read.json("/resources/employeeData.json")

  println("Displays the content of the DataFrame to stdout: ")
  empDF.show()

  println("Print the schema in a tree format: ")
  empDF.printSchema()

  println("Select only the `name` column : ")
  empDF.select("name").show()

  println("Select name with increment the salary by 100: ")
  empDF.select(col("name"), col("salary") + 100).show()

  println("Select people older than 32")
  empDF.filter(col("age") > 32).show()
  /* Equivalent, after `import sqlContext.implicits._`: empDF.filter($"age" > 32).show() */

  println("Count employees by salary :")
  empDF.groupBy("salary").count().show()

  println("select with out using sql context: ")
  // show() is required to print the rows; select alone only builds a new DataFrame.
  empDF.select(col("name"), col("age")).show()

  println("Register the DataFrame as a SQL temporary view :")
  empDF.registerTempTable("employee")

  println("sqlDF : ")
  val sqlDF = sqlContext.sql("SELECT * FROM employee")
  sqlDF.show()

  println("sqlDF.groupBy : ")
  sqlDF.groupBy(col("name")).agg(Map(
    "salary" -> "avg",
    "age" -> "max")).show()

  println("sqlDF.agg : ")
  sqlDF.agg("age" -> "max", "salary" -> "avg").show()

  // Release the resources held by the Spark context.
  sc.stop()
}

Compile and run the above example as follows:

mvn clean install

Then run SparkSqlDataFrame as a Scala application.


Output:

Displays the content of the DataFrame to stdout:
+---+------------+------+------+
|age|         job|  name|salary|
+---+------------+------+------+
| 39|Sr.Developer| Jorge|  1000|
| 31|   Developer|   Bob|  1000|
| 30|      Tester|swetha|  2000|
| 30| DataScience|   Ram|  6000|
| 32|   Developer|Prasad|  1500|
| 30|   Developer| Bonam|  2000|
| 33|   Developer|   Bob|  1000|
+---+------------+------+------+

 

Print the schema in a tree format:
root
 |-- age: long (nullable = true)
 |-- job: string (nullable = true)
 |-- name: string (nullable = true)
 |-- salary: long (nullable = true)

 

Select only the `name` column :
+------+
|  name|
+------+
| Jorge|
|   Bob|
|swetha|
|   Ram|
|Prasad|
| Bonam|
|   Bob|
+------+

 

Select name with increment the salary by 100:
+------+--------------+
|  name|(salary + 100)|
+------+--------------+
| Jorge|          1100|
|   Bob|          1100|
|swetha|          2100|
|   Ram|          6100|
|Prasad|          1600|
| Bonam|          2100|
|   Bob|          1100|
+------+--------------+

 

Select people older than 32
+---+------------+-----+------+
|age|         job| name|salary|
+---+------------+-----+------+
| 39|Sr.Developer|Jorge|  1000|
| 33|   Developer|  Bob|  1000|
+---+------------+-----+------+

 

 

 

Count employees by salary :
+------+-----+
|salary|count|
+------+-----+
|  1500|    1|
|  1000|    3|
|  2000|    2|
|  6000|    1|
+------+-----+

 

select with out using sql context:
+------+---+
|  name|age|
+------+---+
| Jorge| 39|
|   Bob| 31|
|swetha| 30|
|   Ram| 30|
|Prasad| 32|
| Bonam| 30|
|   Bob| 33|
+------+---+

Register the DataFrame as a SQL temporary view :
sqlDF :
+---+------------+------+------+
|age|         job|  name|salary|
+---+------------+------+------+
| 39|Sr.Developer| Jorge|  1000|
| 31|   Developer|   Bob|  1000|
| 30|      Tester|swetha|  2000|
| 30| DataScience|   Ram|  6000|
| 32|   Developer|Prasad|  1500|
| 30|   Developer| Bonam|  2000|
| 33|   Developer|   Bob|  1000|
+---+------------+------+------+

sqlDF.groupBy :
+------+-----------+--------+
|  name|avg(salary)|max(age)|
+------+-----------+--------+
|swetha|     2000.0|      30|
|   Ram|     6000.0|      30|
| Jorge|     1000.0|      39|
|   Bob|     1000.0|      33|
| Bonam|     2000.0|      30|
|Prasad|     1500.0|      32|
+------+-----------+--------+

 

sqlDF.agg :
+--------+------------------+
|max(age)|       avg(salary)|
+--------+------------------+
|      39|2071.4285714285716|
+--------+------------------+
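The aggregate values in the last table can be checked by hand against the seven records in employeeData.json. The following plain Scala sketch (no Spark required; the object name AggCheck is made up for illustration) recomputes them from the (age, salary) pairs in the file.

```scala
object AggCheck {
  // (age, salary) pairs copied from employeeData.json, in file order.
  val employees: Seq[(Int, Int)] = Seq(
    (39, 1000), (31, 1000), (30, 2000), (30, 6000),
    (32, 1500), (30, 2000), (33, 1000))

  // Same aggregates as sqlDF.agg("age" -> "max", "salary" -> "avg").
  val maxAge: Int = employees.map(_._1).max
  val totalSalary: Int = employees.map(_._2).sum
  val avgSalary: Double = totalSalary.toDouble / employees.size

  def main(args: Array[String]): Unit =
    println(s"max(age) = $maxAge, sum(salary) = $totalSalary, avg(salary) = $avgSalary")
}
```

The salaries sum to 14500 across 7 employees, so the average is 14500 / 7, roughly 2071.43, and the highest age is 39, in line with the sqlDF.agg output.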

