This tutorial provides the steps to write a Java program that performs SQL-like analysis on a CSV file using Apache Spark SQL.
Abstract
Apache Spark SQL is a built-in library of Apache Spark for analysing structured data. It provides the capability to read and analyse data in various formats (such as JSON, CSV, and Parquet) from various data sources (such as text files, JDBC, and Hive).
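For instance, all of these formats are read through the same DataFrameReader entry point. The snippet below is a minimal sketch, assuming Spark 2.0 on the classpath; the class name and file paths are illustrative placeholders -

package com.aksain.sparksql.basics;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FormatReadingSketch {
    public static void main(String[] args) {
        final SparkSession sparkSession = SparkSession.builder().appName("Format Reading Sketch").master("local[2]")
                .getOrCreate();

        // The same read() entry point handles all supported formats; paths are placeholders
        final Dataset<Row> jsonData = sparkSession.read().json("/path/to/data.json");
        final Dataset<Row> csvData = sparkSession.read().option("header", "true").csv("/path/to/data.csv");
        final Dataset<Row> parquetData = sparkSession.read().parquet("/path/to/data.parquet");

        // Print the inferred schemas to confirm each source was read
        jsonData.printSchema();
        csvData.printSchema();
        parquetData.printSchema();
    }
}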
CSV is one of the most commonly used formats for exporting and importing data across data sources. In this tutorial, we will be analysing the data from a CSV file with the following content by executing SQL-like queries in Apache Spark SQL -
"id","date","Temperature","Humidity","Light","CO2","HumidityRatio","Occupancy"
"140","2015-02-02 14:19:00",23.7,26.272,585.2,749.2,0.00476416302416414,1
"141","2015-02-02 14:19:59",23.718,26.29,578.4,760.4,0.00477266099212519,1
"142","2015-02-02 14:21:00",23.73,26.23,572.666666666667,769.666666666667,0.00476515255246541,1
"143","2015-02-02 14:22:00",23.7225,26.125,493.75,774.75,0.00474377335599685,1
"144","2015-02-02 14:23:00",23.754,26.2,488.6,779,0.00476659399998615,1
"145","2015-02-02 14:23:59",23.76,26.26,568.666666666667,790,0.00477933243163454,1
"146","2015-02-02 14:25:00",23.73,26.29,536.333333333333,798,0.00477613633274892,1
"147","2015-02-02 14:25:59",23.754,26.29,509,797,0.00478309370839038,1
"148","2015-02-02 14:26:59",23.754,26.35,476,803.2,0.00479409399662041,1
"149","2015-02-02 14:28:00",23.736,26.39,510,809,0.00479618871038935,1
"150","2015-02-02 14:29:00",23.745,26.445,481.5,815.25,0.00480888622067716,1
"151","2015-02-02 14:30:00",23.7,26.56,481.8,824,0.0048167933677358,1
"152","2015-02-02 14:31:00",23.7,26.6,475.25,832,0.00482410383674874,1
"153","2015-02-02 14:31:59",23.7,26.7,469,845.333333333333,0.00484238075533563,1
"154","2015-02-02 14:32:59",23.7,26.774,464,852.4,0.00485590636128976,1
"155","2015-02-02 14:34:00",23.7,26.89,464,861,0.00487710983719076,1
"156","2015-02-02 14:35:00",23.7,26.9725,455,880,0.004892190768364,1
"157","2015-02-02 14:36:00",23.6,26.89,454,891,0.00484759441396992,1
"158","2015-02-02 14:37:00",23.64,26.976,458,897.6,0.0048750447811301,1
"159","2015-02-02 14:38:00",23.65,27.05,464,900.5,0.00489149158929623,1
"160","2015-02-02 14:38:59",23.64,27.1,473,908.8,0.00489763024345823,1
"161","2015-02-02 14:39:59",23.6,27.16,464,918,0.00489665184522539,1
"162","2015-02-02 14:41:00",23.6,27.236,498.4,925.2,0.00491046198855591,1
"163","2015-02-02 14:42:00",23.6,27.29,530.2,929.4,0.0049202748285747,1
"164","2015-02-02 14:43:00",23.6,27.33,533.6,936.4,0.00492754379701558,1
"165","2015-02-02 14:44:00",23.6,27.34,524.25,950,0.0049293610654639,1
"166","2015-02-02 14:44:59",23.625,27.3925,498.666666666667,961,0.00494640574744149,1
"167","2015-02-02 14:45:59",23.6,27.39,516.333333333333,963,0.00493844756573873,1
"168","2015-02-02 14:47:00",23.6,27.412,501.2,958.6,0.00494244570930318,1
"169","2015-02-02 14:48:00",23.6,27.5,522,965.333333333333,0.00495843879351724,1
Pre-requisites
If you are new to Apache Spark SQL, you are strongly recommended to go through the Getting Started with Apache Spark SQL tutorial first to be able to follow the steps in this tutorial effectively.
We will be creating a Java Maven project in Eclipse to analyse the CSV data in Spark SQL. Here are the pre-requisites for developing this program -
- JDK 7 or later
- Eclipse IDE with Maven Plugin
Analysing CSV Data in a Java Program using Spark SQL
The first step is to create a Maven project in Eclipse IDE and add the following dependency to pom.xml to include the Spark SQL library -
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
The next step is to copy the CSV file to the src/main/resources folder with the name data.csv (a quick way to verify the file location is shown after the list). This file contains the following data columns -
- id
- date
- Temperature
- Humidity
- Light
- CO2
- HumidityRatio
- Occupancy
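Before wiring up Spark, it can help to confirm the file landed where the program expects it. Here is a minimal, optional sketch (the class name DataFileCheck is ours; the relative path matches what the Spark program below uses) -

package com.aksain.sparksql.basics;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class DataFileCheck {
    public static void main(String[] args) throws Exception {
        // Same relative path that the Spark program below passes to the CSV reader
        final Path dataFile = Paths.get("src/main/resources/data.csv");
        System.out.println("File exists: " + Files.exists(dataFile));
        // Print the header row to verify that the expected columns are present
        System.out.println("Header: " + Files.readAllLines(dataFile, StandardCharsets.UTF_8).get(0));
    }
}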
We will be finding records that satisfy all of the following criteria -
- Temperature is greater than or equal to 23.6
- Humidity is greater than 27
- Light is greater than 500
- CO2 is between 920 and 950
Finally, here is the program that performs the above query on our CSV file data -
package com.aksain.sparksql.basics;

import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Demonstrates analysing CSV data by executing SQL-like queries in Apache Spark SQL.
 *
 * @author amit-kumar
 */
public class CSVFileAnalysisInSparkSQL {
    /**
     * @param args
     */
    public static void main(String[] args) {
        // Create SparkSession to act as the entry point to Spark
        final SparkSession sparkSession = SparkSession.builder().appName("Spark CSV Analysis Demo").master("local[5]")
                .getOrCreate();
        // Please note that you will need to add a config property "spark.sql.warehouse.dir" with the directory path
        // as its value in the SparkSession builder if your CSV file location differs from the application directory,
        // as shown below -
        // final SparkSession sparkSession = SparkSession.builder().appName("Spark CSV Analysis Demo").master("local[5]")
        //         .config("spark.sql.warehouse.dir", "/opt/data/").getOrCreate();

        // Get DataFrameReader using SparkSession
        final DataFrameReader dataFrameReader = sparkSession.read();
        // Set header option to true to specify that the first row in the file contains column names
        dataFrameReader.option("header", "true");
        final Dataset<Row> csvDataFrame = dataFrameReader.csv("src/main/resources/data.csv");

        // Print schema to see column names, types and other metadata
        csvDataFrame.printSchema();

        // Create a view and execute a query to convert column types as, by default, all columns are read as strings
        csvDataFrame.createOrReplaceTempView("ROOM_OCCUPANCY_RAW");
        final Dataset<Row> roomOccupancyData = sparkSession
                .sql("SELECT CAST(id as int) id, CAST(date as string) date, CAST(Temperature as float) Temperature, "
                        + "CAST(Humidity as float) Humidity, CAST(Light as float) Light, CAST(CO2 as float) CO2, "
                        + "CAST(HumidityRatio as float) HumidityRatio, CAST(Occupancy as int) Occupancy FROM ROOM_OCCUPANCY_RAW");

        // Print schema again to verify the converted column types
        roomOccupancyData.printSchema();

        // Create a view and execute the query to get the filtered data
        roomOccupancyData.createOrReplaceTempView("ROOM_OCCUPANCY");
        sparkSession.sql("SELECT * FROM ROOM_OCCUPANCY WHERE Temperature >= 23.6 AND Humidity > 27 AND Light > 500 "
                + "AND CO2 BETWEEN 920 AND 950").show();
    }
}
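As an aside, the same filtering can also be expressed without a SQL string by using Spark's untyped Dataset API. Below is a minimal sketch of this alternative (the class name is ours, and we use the inferSchema reader option to let Spark derive numeric column types, which avoids the explicit CASTs) -

package com.aksain.sparksql.basics;

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CSVFileAnalysisWithDatasetAPI {
    public static void main(String[] args) {
        final SparkSession sparkSession = SparkSession.builder().appName("Spark CSV Dataset API Demo")
                .master("local[5]").getOrCreate();

        // inferSchema lets Spark derive numeric column types while reading,
        // so the explicit CAST query from the main program is not needed
        final Dataset<Row> csvDataFrame = sparkSession.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("src/main/resources/data.csv");

        // Same criteria as the SQL WHERE clause above, built from Column expressions
        csvDataFrame.filter(
                col("Temperature").geq(23.6)
                        .and(col("Humidity").gt(27))
                        .and(col("Light").gt(500))
                        .and(col("CO2").between(920, 950)))
                .show();
    }
}

Either version produces the same filtered rows.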
You can execute the above Spark SQL program like any Java program with a main method in Eclipse IDE. Here is the console output that you will get after executing the program -
root
|-- id: string (nullable = true)
|-- date: string (nullable = true)
|-- Temperature: string (nullable = true)
|-- Humidity: string (nullable = true)
|-- Light: string (nullable = true)
|-- CO2: string (nullable = true)
|-- HumidityRatio: string (nullable = true)
|-- Occupancy: string (nullable = true)
root
|-- id: integer (nullable = true)
|-- date: string (nullable = true)
|-- Temperature: float (nullable = true)
|-- Humidity: float (nullable = true)
|-- Light: float (nullable = true)
|-- CO2: float (nullable = true)
|-- HumidityRatio: float (nullable = true)
|-- Occupancy: integer (nullable = true)
+---+-------------------+-----------+--------+------+-----+-------------+---------+
| id| date|Temperature|Humidity| Light| CO2|HumidityRatio|Occupancy|
+---+-------------------+-----------+--------+------+-----+-------------+---------+
|163|2015-02-02 14:42:00| 23.6| 27.29| 530.2|929.4| 0.004920275| 1|
|164|2015-02-02 14:43:00| 23.6| 27.33| 533.6|936.4| 0.004927544| 1|
|165|2015-02-02 14:44:00| 23.6| 27.34|524.25|950.0| 0.004929361| 1|
+---+-------------------+-----------+--------+------+-----+-------------+---------+
Thank you for reading through the tutorial. In case of any feedback, questions, or concerns, you can share them with us through your comments, and we will get back to you as soon as possible.