This tutorial explains how to develop an application that detects room occupancy using Logistic Regression in Apache Spark MLlib.
Problem Statement
We want to detect whether a room is occupied based on collected sensor data such as Temperature, Humidity, Light and CO2. Since there is no fixed rule for deciding this, we will use a machine learning approach and train the system on a training data set.
We will use the Occupancy Detection Data Set from the UCI Machine Learning Repository to train our machine learning model. This dataset contains 3 files - datatraining.txt, datatest.txt and datatest2.txt.
In this tutorial, we will use only datatraining.txt. However, the header of this file is missing a column name: each data row starts with a row id, but the header has no "id" entry for it. Here are the original and corrected headers for your reference -
Original header, missing "id" -
"date","Temperature","Humidity","Light","CO2","HumidityRatio","Occupancy"
Corrected header -
"id","date","Temperature","Humidity","Light","CO2","HumidityRatio","Occupancy"
We will model this problem with a Logistic Regression classifier, which is introduced in the next section.
Introduction to Logistic Regression Classifier
Logistic Regression is a machine learning technique based on the logistic (sigmoid) function. Despite its name, it is used for classification rather than regression: it estimates the probability that an example belongs to a class.
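Concretely, the model computes a weighted sum z = w1*x1 + ... + wn*xn + b of the input features and passes it through the logistic function 1 / (1 + e^(-z)), which yields a value between 0 and 1 that is read as the probability of the positive class (here, Occupancy = 1). The sketch below shows this in plain Java. The class name LogisticFunctionDemo, the weight and the bias are made-up illustrations, not coefficients learned from the dataset; the 0.5 threshold mirrors the default threshold of Spark's LogisticRegression.

```java
/**
 * Minimal illustration of how a logistic regression model turns a weighted
 * sum of features into a class prediction. The weight and bias used in main
 * are hypothetical numbers, not values learned from the occupancy dataset.
 */
public class LogisticFunctionDemo {

    /** The logistic (sigmoid) function: maps any real number into (0, 1). */
    static double sigmoid(double z) {
        return 1.0 / (1.0 + Math.exp(-z));
    }

    /** Probability of the positive class: sigmoid(w . x + b). */
    static double probability(double[] weights, double bias, double[] features) {
        double z = bias;
        for (int i = 0; i < weights.length; i++) {
            z += weights[i] * features[i];
        }
        return sigmoid(z);
    }

    /** Predicts class 1 when the probability reaches the threshold, otherwise class 0. */
    static int predict(double[] weights, double bias, double[] features, double threshold) {
        return probability(weights, bias, features) >= threshold ? 1 : 0;
    }

    public static void main(String[] args) {
        double[] weights = { 0.01 }; // hypothetical weight for a single "Light" feature
        double bias = -2.0;          // hypothetical bias

        System.out.println(sigmoid(0.0));
        // z = -2.0, so the probability is well below 0.5
        System.out.println(predict(weights, bias, new double[] { 0.0 }, 0.5));
        // z is about 3.0, so the probability is well above 0.5
        System.out.println(predict(weights, bias, new double[] { 500.0 }, 0.5));
    }
}
```

Moving the threshold away from 0.5 trades false positives against false negatives; Spark exposes the same idea through LogisticRegression.setThreshold.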
Here are some of the advantages of the Logistic Regression classifier -
- It is often considered more robust than decision trees or discriminant analysis
- It can handle non-linear effects, in the sense that the predicted probability is a non-linear (logistic) function of the features
- It does not require the independent variables to be normally distributed or to have equal variance in each group
However, Logistic Regression typically needs more data points than some other classification algorithms to produce stable results, which makes it harder to use when only limited data is available.
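The coefficients are estimated from the training data by minimizing the log-loss, which is why the amount of data matters for their stability. Below is a minimal batch gradient-descent sketch of that idea in plain Java. It is an illustration under simplifying assumptions (no regularization, a fixed learning rate, toy data), not how Spark implements it: Spark MLlib's LogisticRegression relies on quasi-Newton optimization (L-BFGS) and supports regularization. The class TinyLogisticRegression is hypothetical.

```java
/**
 * Hypothetical, minimal batch gradient-descent trainer for binary logistic
 * regression. It only illustrates the core idea; Spark MLlib uses its own
 * optimizer and regularization options.
 */
public class TinyLogisticRegression {

    final double[] weights;
    double bias;

    TinyLogisticRegression(int numFeatures) {
        this.weights = new double[numFeatures]; // all weights start at 0
    }

    static double sigmoid(double z) {
        return 1.0 / (1.0 + Math.exp(-z));
    }

    /** Probability of class 1 for one feature vector. */
    double probability(double[] x) {
        double z = bias;
        for (int j = 0; j < weights.length; j++) {
            z += weights[j] * x[j];
        }
        return sigmoid(z);
    }

    /** Minimizes the average log-loss with plain batch gradient descent. */
    void fit(double[][] xs, int[] ys, double learningRate, int iterations) {
        int n = xs.length;
        for (int it = 0; it < iterations; it++) {
            double[] gradW = new double[weights.length];
            double gradB = 0.0;
            for (int i = 0; i < n; i++) {
                // For log-loss, the gradient with respect to z is (p - y)
                double error = probability(xs[i]) - ys[i];
                for (int j = 0; j < weights.length; j++) {
                    gradW[j] += error * xs[i][j];
                }
                gradB += error;
            }
            // Step against the averaged gradient
            for (int j = 0; j < weights.length; j++) {
                weights[j] -= learningRate * gradW[j] / n;
            }
            bias -= learningRate * gradB / n;
        }
    }

    /** Predicts class 1 when the probability is at least 0.5. */
    int predict(double[] x) {
        return probability(x) >= 0.5 ? 1 : 0;
    }

    public static void main(String[] args) {
        // Toy, made-up data: one feature, label 1 when the feature is positive
        double[][] xs = { { -3.0 }, { -2.0 }, { -1.0 }, { 1.0 }, { 2.0 }, { 3.0 } };
        int[] ys = { 0, 0, 0, 1, 1, 1 };

        TinyLogisticRegression model = new TinyLogisticRegression(1);
        model.fit(xs, ys, 0.5, 1000);

        System.out.println("weight = " + model.weights[0] + ", bias = " + model.bias);
        for (double[] x : xs) {
            System.out.println(x[0] + " -> " + model.predict(x));
        }
    }
}
```

With only a handful of points like this, the learned weight depends heavily on each point, which is the instability the paragraph above warns about when data is limited.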
Pre-requisites
We will use a Java Maven project to develop the room occupancy detection program. Here are the prerequisites for following the instructions effectively -
- Basic knowledge of Spark ML - if you are new to Spark ML, we recommend going through the tutorial Getting Started with Apache Spark MLlib first
- JDK 7 or later
- Eclipse IDE (or your favourite Java IDE) with Maven plugin
Room Occupancy Detection Program in Spark MLlib
The first step is to create a Java Maven project in Eclipse (or any other Java IDE) and add the following dependencies to pom.xml to include Spark SQL and MLlib -
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-mllib_2.11</artifactId>
    <version>2.0.0</version>
</dependency>
Next, copy datatraining.txt, with the "id" column name added to its header, to src/main/resources in your Maven project.
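If you would rather not edit the header by hand, it can also be patched programmatically. Here is a minimal sketch in plain Java (JDK 7 compatible, no Spark needed). The class name HeaderFixer is hypothetical, and the sketch assumes the header is the first line of the file and that the file is small enough to read into memory at once, which holds for datatraining.txt.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

/**
 * Hypothetical helper (not part of the original project) that prepends the
 * missing "id" column name to the header line of datatraining.txt.
 */
public class HeaderFixer {

    /** Returns the corrected header, or the input unchanged if "id" is already present. */
    static String fixHeader(String header) {
        return header.startsWith("\"id\"") ? header : "\"id\"," + header;
    }

    /** Rewrites the file in place with a corrected first line. */
    static void fixFile(Path file) throws IOException {
        List<String> lines = Files.readAllLines(file, StandardCharsets.UTF_8);
        if (!lines.isEmpty()) {
            lines.set(0, fixHeader(lines.get(0)));
            Files.write(file, lines, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        // Default path is an assumption: pass your own path as the first argument
        Path file = Paths.get(args.length > 0 ? args[0] : "src/main/resources/datatraining.txt");
        fixFile(file);
        System.out.println("Header now: " + Files.readAllLines(file, StandardCharsets.UTF_8).get(0));
    }
}
```

Run it once with the path to your copy of datatraining.txt; running it again leaves an already corrected header unchanged.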
Now it's time to develop a Java program that uses Spark SQL and Spark MLlib to detect room occupancy with Logistic Regression -
package com.aksain.sparkml.basics.logisticregression;

import java.io.IOException;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorIndexer;
import org.apache.spark.ml.feature.VectorIndexerModel;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Detects whether a room is occupied using a Logistic Regression classifier.
 *
 * @author amit-kumar
 */
public class LogisticRegressionRoomOccupancyDetector {

    public static void main(String[] args) throws IOException {
        // Create Spark Session to create connection to Spark
        final SparkSession sparkSession = SparkSession.builder()
                .appName("Spark Logistic Regression Classifier Demo")
                .master("local[5]").getOrCreate();

        // Get DataFrameReader using SparkSession and set header option to true
        // to specify that first row in file contains name of columns
        final DataFrameReader dataFrameReader = sparkSession.read().option("header", true);
        final Dataset<Row> trainingData = dataFrameReader.csv("src/main/resources/datatraining.txt");

        // Create view and execute query to convert types as, by default, all
        // columns have string types
        trainingData.createOrReplaceTempView("TRAINING_DATA");
        final Dataset<Row> typedTrainingData = sparkSession
                .sql("SELECT cast(Temperature as float) Temperature, cast(Humidity as float) Humidity, "
                        + "cast(Light as float) Light, cast(CO2 as float) CO2, "
                        + "cast(HumidityRatio as float) HumidityRatio, "
                        + "cast(Occupancy as int) Occupancy FROM TRAINING_DATA");

        // Combine multiple input columns to a Vector using Vector Assembler
        // utility
        final VectorAssembler vectorAssembler = new VectorAssembler()
                .setInputCols(new String[] { "Temperature", "Humidity", "Light", "CO2", "HumidityRatio" })
                .setOutputCol("features");
        final Dataset<Row> featuresData = vectorAssembler.transform(typedTrainingData);

        // Print Schema to see column names, types and other metadata
        featuresData.printSchema();

        // Split the data into training and test sets (30% held out for testing).
        // A fixed seed keeps the split repeatable on the same input, so a model
        // saved by an earlier run is evaluated on rows it was not trained on.
        final Dataset<Row>[] splits = featuresData.randomSplit(new double[] { 0.7, 0.3 }, 12345L);
        final Dataset<Row> trainingFeaturesData = splits[0];
        final Dataset<Row> testFeaturesData = splits[1];

        // Load a previously saved model, if there is one
        PipelineModel model = null;
        try {
            model = PipelineModel.load("src/main/resources/logisticregression");
        } catch (Exception exception) {
            // No saved model yet (or it could not be read): a new one is trained below
        }

        if (model == null) {
            // StringIndexer maps each distinct Occupancy value to a numeric index
            // (the most frequent value gets 0.0) and adds label metadata to the
            // indexedLabel column. Fit on whole dataset to include all labels in index.
            final StringIndexerModel labelIndexer = new StringIndexer().setInputCol("Occupancy")
                    .setOutputCol("indexedLabel").fit(featuresData);

            // Index features vector: features with at most maxCategories (default 20)
            // distinct values are marked as categorical, the others stay continuous
            final VectorIndexerModel featureIndexer = new VectorIndexer().setInputCol("features")
                    .setOutputCol("indexedFeatures").fit(featuresData);

            // Train a Logistic Regression model.
            final LogisticRegression logisticRegression = new LogisticRegression().setLabelCol("indexedLabel")
                    .setFeaturesCol("indexedFeatures");

            // Convert indexed labels back to original labels.
            final IndexToString labelConverter = new IndexToString().setInputCol("prediction")
                    .setOutputCol("predictedOccupancy").setLabels(labelIndexer.labels());

            // Chain indexers, logistic regression and label converter in a Pipeline.
            final Pipeline pipeline = new Pipeline()
                    .setStages(new PipelineStage[] { labelIndexer, featureIndexer, logisticRegression, labelConverter });

            // Train model. This also runs the indexers.
            model = pipeline.fit(trainingFeaturesData);
            model.save("src/main/resources/logisticregression");
        }

        // Make predictions.
        final Dataset<Row> predictions = model.transform(testFeaturesData);

        // Select example rows to display.
        System.out.println("Example records with Predicted Occupancy as 0:");
        predictions.select("predictedOccupancy", "Occupancy", "features")
                .where(predictions.col("predictedOccupancy").equalTo(0)).show(10);

        System.out.println("Example records with Predicted Occupancy as 1:");
        predictions.select("predictedOccupancy", "Occupancy", "features")
                .where(predictions.col("predictedOccupancy").equalTo(1)).show(10);

        System.out.println("Example records with In-correct predictions:");
        predictions.select("predictedOccupancy", "Occupancy", "features")
                .where(predictions.col("predictedOccupancy").notEqual(predictions.col("Occupancy"))).show(10);

        // Release Spark resources
        sparkSession.stop();
    }
}
Note: You can find the complete project code in my GitHub repository.
Here is the output you will get on your console after running the above program like any plain Java program. Because the data is split randomly into training and test sets, the exact rows you see may differ -
root
|-- Temperature: float (nullable = true)
|-- Humidity: float (nullable = true)
|-- Light: float (nullable = true)
|-- CO2: float (nullable = true)
|-- HumidityRatio: float (nullable = true)
|-- Occupancy: integer (nullable = true)
|-- features: vector (nullable = true)
Example records with Predicted Occupancy as 0:
+------------------+---------+--------------------+
|predictedOccupancy|Occupancy| features|
+------------------+---------+--------------------+
| 0| 0|[19.0,31.35666656...|
| 0| 0|[19.0,31.38999938...|
| 0| 0|[19.0,31.38999938...|
| 0| 0|[19.1000003814697...|
| 0| 0|[19.1000003814697...|
| 0| 0|[19.1000003814697...|
| 0| 0|[19.1000003814697...|
| 0| 0|[19.1000003814697...|
| 0| 0|[19.1000003814697...|
| 0| 0|[19.1000003814697...|
+------------------+---------+--------------------+
only showing top 10 rows
Example records with Predicted Occupancy as 1:
+------------------+---------+--------------------+
|predictedOccupancy|Occupancy| features|
+------------------+---------+--------------------+
| 1| 1|[19.5750007629394...|
| 1| 1|[19.6333332061767...|
| 1| 1|[19.7000007629394...|
| 1| 1|[19.7450008392334...|
| 1| 1|[19.7900009155273...|
| 1| 1|[19.8400001525878...|
| 1| 1|[19.9449996948242...|
| 1| 1|[20.0,29.44499969...|
| 1| 1|[20.0499992370605...|
| 1| 0|[20.1000003814697...|
+------------------+---------+--------------------+
only showing top 10 rows
Example records with In-correct predictions:
+------------------+---------+--------------------+
|predictedOccupancy|Occupancy| features|
+------------------+---------+--------------------+
| 0| 1|[19.5249996185302...|
| 1| 0|[20.1000003814697...|
| 1| 0|[20.1749992370605...|
| 1| 0|[20.1749992370605...|
| 1| 0|[20.1749992370605...|
| 1| 0|[20.3566665649414...|
| 1| 0|[20.7000007629394...|
| 1| 0|[20.7000007629394...|
| 1| 0|[20.7000007629394...|
| 1| 0|[20.7000007629394...|
+------------------+---------+--------------------+
only showing top 10 rows
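The tables above show individual mistakes but not how often they occur. A natural next step is to count correct and incorrect predictions over the whole test set. Spark ML provides evaluators for this in org.apache.spark.ml.evaluation (for example, MulticlassClassificationEvaluator with the metric name "accuracy", applied to the indexedLabel and prediction columns). To show what such a metric computes, here is a plain-Java sketch of a confusion matrix and accuracy for 0/1 labels. The class OccupancyMetrics and the sample values are hypothetical, and the sketch assumes both arrays contain only 0 and 1.

```java
/**
 * Hypothetical helper that summarizes binary predictions with a confusion
 * matrix and accuracy, given predicted and actual Occupancy values.
 * Assumes every value is either 0 (empty) or 1 (occupied).
 */
public class OccupancyMetrics {

    final long truePositives;
    final long trueNegatives;
    final long falsePositives;
    final long falseNegatives;

    OccupancyMetrics(int[] predicted, int[] actual) {
        if (predicted.length != actual.length) {
            throw new IllegalArgumentException("predicted and actual must have the same length");
        }
        long tp = 0, tn = 0, fp = 0, fn = 0;
        for (int i = 0; i < predicted.length; i++) {
            if (predicted[i] == 1 && actual[i] == 1) {
                tp++;
            } else if (predicted[i] == 0 && actual[i] == 0) {
                tn++;
            } else if (predicted[i] == 1) {
                fp++; // predicted occupied, actually empty
            } else {
                fn++; // predicted empty, actually occupied
            }
        }
        truePositives = tp;
        trueNegatives = tn;
        falsePositives = fp;
        falseNegatives = fn;
    }

    /** Fraction of predictions that match the actual label. */
    double accuracy() {
        long total = truePositives + trueNegatives + falsePositives + falseNegatives;
        return total == 0 ? 0.0 : (double) (truePositives + trueNegatives) / total;
    }

    public static void main(String[] args) {
        // Made-up example values, not the actual output of the program above
        int[] predicted = { 0, 0, 1, 1, 1, 0 };
        int[] actual = { 0, 0, 1, 0, 1, 1 };

        OccupancyMetrics metrics = new OccupancyMetrics(predicted, actual);
        System.out.println("TP=" + metrics.truePositives + " TN=" + metrics.trueNegatives
                + " FP=" + metrics.falsePositives + " FN=" + metrics.falseNegatives);
        System.out.println("Accuracy = " + metrics.accuracy());
    }
}
```

Looking at false positives and false negatives separately is useful here, since the sample output above suggests that most mistakes are rooms predicted as occupied when they were empty.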
Thank you for reading this tutorial. If you have any feedback, questions or concerns, please let us know in the comments and we will get back to you as soon as possible.