This tutorial will get you started with Spark SQL by developing a Java program to perform SQL-like analysis on JSON data.
Abstract
Apache Spark is a large-scale data processing engine and comes with the following built-in libraries -
- Spark SQL (Structured Data Analysis)
- Spark Streaming (Real-time stream processing)
- Spark MLlib (Machine Learning)
- Spark GraphX (Graph Computations)
Introduction to Apache Spark SQL
Apache Spark SQL is a module/library for analysing structured data at large scale. Spark SQL can execute SQL queries on data in various formats (CSV, JSON, Parquet, etc.) loaded from various data sources such as text files, JDBC databases, and Hive, to name a few.
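For example, switching between data formats only changes the reader call. Here is a minimal sketch of loading data from a few different sources (the file paths and the local master setting are placeholders for illustration) -
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class DataSourcesSketch {
    public static void main(String[] args) {
        final SparkSession spark = SparkSession.builder().appName("Data Sources Sketch").master("local[*]").getOrCreate();

        // Placeholder paths - point these at your own files
        final Dataset<Row> fromJson = spark.read().json("src/main/resources/data.json");
        final Dataset<Row> fromCsv = spark.read().option("header", "true").csv("src/main/resources/data.csv");
        final Dataset<Row> fromParquet = spark.read().parquet("src/main/resources/data.parquet");

        // Each loader returns a DataFrame whose schema is inferred or read from the source
        fromJson.printSchema();
        fromCsv.printSchema();
        fromParquet.printSchema();
    }
}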
Apart from SQL queries, Spark SQL also comes with a DSL-like API (the Dataset API) that lets you build queries by chaining methods. Both SQL queries and the DSL API use the same execution engine, so the two styles are interchangeable without any side effects.
In contrast to Spark RDDs, Spark SQL has more information about the structure of the data (its schema). This schema information is used by the Spark core engine to optimise query execution. Hence, it is advisable to use the Spark SQL APIs in place of the Spark RDD API for analysing structured data.
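To see this in action, you can ask Spark to print the query plan with explain(). Below is a minimal sketch (it reuses the data.json file created later in this tutorial); the SQL and DSL versions of the same query should produce equivalent physical plans -
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ExplainPlanSketch {
    public static void main(String[] args) {
        final SparkSession spark = SparkSession.builder().appName("Explain Plan Sketch").master("local[*]").getOrCreate();
        final Dataset<Row> people = spark.read().json("src/main/resources/data.json");
        people.createOrReplaceTempView("people");

        // Both queries are compiled and optimised by the same engine,
        // so the printed physical plans should match
        spark.sql("SELECT name FROM people WHERE age = 30").explain();
        people.select("name").where("age = 30").explain();
    }
}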
Apache Spark SQL API Details
We need to understand the following components of the Spark SQL API before we start writing our demo program -
- SparkSession - SparkSession is the entry point to Spark in Spark SQL. It is essentially a wrapper around SparkContext and is used for loading data from data sources and creating distributed collections out of it.
- Dataset - A Dataset represents a distributed collection of data and is built on top of Spark RDDs. Similar to RDDs, it provides functional transformations such as map, flatMap, filter, etc. Datasets are not limited to structured data and can hold objects of any type (see the short sketch after this list).
- DataFrame - A DataFrame is simply a Dataset organised into named columns. It is a special type of Dataset and, in RDBMS terms, is similar to a database table. In the Java API, it is represented as Dataset<Row>, i.e. a Dataset of type Row, where a Row is simply an ordered collection of columns.
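As a quick illustration of working with a typed Dataset (as opposed to a DataFrame), here is a minimal sketch; the class name and sample values are made up for illustration -
import java.util.Arrays;

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class TypedDatasetSketch {
    public static void main(String[] args) {
        final SparkSession spark = SparkSession.builder().appName("Typed Dataset Sketch").master("local[*]").getOrCreate();

        // A Dataset can hold plain Java objects, not just rows of named columns
        final Dataset<String> names = spark.createDataset(Arrays.asList("Ghani", "Amit", "Sumit"), Encoders.STRING());

        // RDD-style functional transformations are available on Datasets
        final Dataset<Integer> lengths = names
                .filter((FilterFunction<String>) name -> name.length() > 4)
                .map((MapFunction<String, Integer>) String::length, Encoders.INT());

        lengths.show();
    }
}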
Spark SQL provides two ways to analyse structured data. Here is what querying data from a DataFrame looks like using each of these techniques -
SQL Queries
// Create a view on DataFrame
jsonDataFrame.createOrReplaceTempView("people");
// Execute the query on created view using SparkSession
sparkSession.sql("SELECT name FROM people WHERE age = 30").show();
DSL API
// DSL API with conditional expression
jsonDataFrame.select("name").where("age = 30").show();
// Pure DSL API
jsonDataFrame.select("name").where(jsonDataFrame.col("age").equalTo(30)).show();
Performing SQL like Analysis on JSON data
We will be using a Java Maven project to develop the sample program. Hence, in order to get the Spark SQL library imported into your project, add the following Maven dependency to your pom.xml -
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
The next step is to create a JSON file named data.json with the content below and put it in the src/main/resources folder of your Java Maven project -
{"name":"Ghani"}
{"name":"Amit", "age":30}
{"name":"Sumit", "age":24}
{"name":"John", "age":40}
Finally, here is the Java program demonstrating how to read data from the JSON file and query it using both a SQL query and the Dataset API -
package com.aksain.sparksql.basics;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Demonstrates reading of a JSON file using Spark SQL.
 *
 * @author Amit Kumar
 */
public class SparkSQLDemo {
    /**
     * @param args
     */
    public static void main(String[] args) {
        // Create Spark Session to create connection to Spark
        final SparkSession sparkSession = SparkSession.builder().appName("Spark SQL Demo").master("local[5]").getOrCreate();

        // Load JSON file data into DataFrame using SparkSession
        final Dataset<Row> jsonDataFrame = sparkSession.read().json("src/main/resources/data.json");

        // Print Schema to see column names, types and other metadata
        jsonDataFrame.printSchema();

        // Query name column from JSON where age column value is equal to 30
        // DSL API with conditional expression
        System.out.println("DSL API with Condition Expression:");
        jsonDataFrame.select("name").where("age = 30").show();

        // Pure DSL API
        System.out.println("Pure DSL API:");
        jsonDataFrame.select("name").where(jsonDataFrame.col("age").equalTo(30)).show();

        // Create a view on DataFrame and execute the query on created view using SparkSession
        System.out.println("SQL Query:");
        jsonDataFrame.createOrReplaceTempView("people");
        sparkSession.sql("SELECT name FROM people WHERE age = 30").show();
    }
}
You can execute the above Spark SQL program like any Java program with a main method. Here is the console output that you will get after executing the program -
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
DSL API with Condition Expression:
+----+
|name|
+----+
|Amit|
+----+
Pure DSL API:
+----+
|name|
+----+
|Amit|
+----+
SQL Query:
+----+
|name|
+----+
|Amit|
+----+
Thank you for reading through the tutorial. If you have any feedback, questions, or concerns, please share them in the comments and we will get back to you as soon as possible.