This tutorial will get you started with RDDs (Resilient Distributed Datasets) in Apache Spark by covering RDD types and a few examples.
What are RDDs?
Resilient Distributed Datasets (RDDs) are a distributed memory abstraction for performing in-memory computations on large clusters in a fault-tolerant manner. They are a crucial part of Apache Spark.
RDDs are similar to distributed memory caches (such as key-value stores, databases, etc.), but they provide coarse-grained transformations rather than fine-grained updates to shared state. E.g. a distributed memory cache provides an interface to update a single record (or object), whereas RDDs provide interfaces such as map, union, sample, filter, join and persist that are applied to many data items (records or objects) at once.
RDDs, being a distributed abstraction, support parallel computation over data and hence are also referred to as read-only, partitioned collections of records.
RDDs can be created using basic operations on either data in stable storage or other RDDs, as shown in the diagram below. It is important to note that RDDs are just an abstraction and do not need to be materialised at all times. Instead, an RDD keeps metadata about how it was derived from other datasets, so that its partitions can be computed from data in stable storage (HDFS, file systems, databases, other RDDs, etc.). This makes RDDs resilient, as they can always be recreated in case of failures. The chain of datasets from which an RDD is derived is referred to as its lineage.
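As a minimal sketch of this laziness and lineage (the class name, sample data and input path below are just placeholders for illustration), the following Java snippet derives one RDD from another; nothing is materialised until an action runs, and toDebugString() prints the lineage Spark would use to recompute lost partitions:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class RDDLineageSketch {
    public static void main(String[] args) {
        // Run in embedded local mode, like the demos later in this tutorial
        final SparkConf sparkConf = new SparkConf().setAppName("RDD Lineage Sketch").setMaster("local");
        try (final JavaSparkContext jSC = new JavaSparkContext(sparkConf)) {
            // Base RDD from stable storage (placeholder path)
            final JavaRDD<String> lines = jSC.textFile("/tmp/input.txt");
            // Derived RDDs - only the derivation metadata is recorded here, nothing is computed yet
            final JavaRDD<String> nonEmptyLines = lines.filter(line -> !line.isEmpty());
            final JavaRDD<Integer> lineLengths = nonEmptyLines.map(String::length);
            // Print the lineage that would be used to recompute partitions after a failure
            System.out.println(lineLengths.toDebugString());
        }
    }
}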
Similarly, users can configure how an RDD is partitioned across machines based on a key in each record. This custom partitioning is quite useful when we want to combine the data for a key from two different datasets.
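Here is an illustrative sketch of that idea (the class name, datasets and the partition count of 4 are assumptions): both pair RDDs are hash-partitioned on the same key before being joined, so records with the same key land in the same partition:
import java.util.Arrays;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class KeyPartitioningSketch {
    public static void main(String[] args) {
        final SparkConf sparkConf = new SparkConf().setAppName("Key Partitioning Sketch").setMaster("local");
        try (final JavaSparkContext jSC = new JavaSparkContext(sparkConf)) {
            final JavaPairRDD<String, Integer> orders = jSC.parallelizePairs(Arrays.asList(
                    new Tuple2<>("user1", 10), new Tuple2<>("user2", 20)));
            final JavaPairRDD<String, String> profiles = jSC.parallelizePairs(Arrays.asList(
                    new Tuple2<>("user1", "Alice"), new Tuple2<>("user2", "Bob")));
            // Hash-partition both RDDs by key into the same number of partitions
            final JavaPairRDD<String, Integer> partitionedOrders = orders.partitionBy(new HashPartitioner(4));
            final JavaPairRDD<String, String> partitionedProfiles = profiles.partitionBy(new HashPartitioner(4));
            // Joining co-partitioned RDDs combines the data for each key without reshuffling both sides
            System.out.println(partitionedOrders.join(partitionedProfiles).collect());
        }
    }
}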
In Apache Spark, RDDs can be persisted in the following three ways (a short sketch follows this list) -
- In-memory storage as deserialized Java objects: This option provides the fastest performance because it allows the JVM to access each RDD element natively as an object.
- In-memory storage as serialized data: This option lets users choose a more memory-efficient representation than Java object graphs when memory is limited, at the cost of lower performance.
- On-disk storage: This is useful for RDDs that are too large to keep in RAM but too costly to recompute on each use. In this case, retrieving RDD elements from disk is faster than recomputing them.
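A minimal sketch of using these storage levels from Java (the class name and dataset are assumptions; persist() and unpersist() are the standard RDD methods):
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
public class PersistenceSketch {
    public static void main(String[] args) {
        final SparkConf sparkConf = new SparkConf().setAppName("Persistence Sketch").setMaster("local");
        try (final JavaSparkContext jSC = new JavaSparkContext(sparkConf)) {
            final JavaRDD<Integer> numbers = jSC.parallelize(Arrays.asList(1, 2, 3, 4, 5));
            // In-memory storage as deserialized Java objects (this is also what cache() uses)
            numbers.persist(StorageLevel.MEMORY_ONLY());
            System.out.println(numbers.count());
            numbers.unpersist();
            // In-memory storage as serialized data - more compact than Java object graphs, slower to read
            numbers.persist(StorageLevel.MEMORY_ONLY_SER());
            System.out.println(numbers.count());
            numbers.unpersist();
            // On-disk storage - for RDDs too large for RAM but too costly to recompute on each use
            numbers.persist(StorageLevel.DISK_ONLY());
            System.out.println(numbers.count());
        }
    }
}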
Characteristics of RDDs
There are many types of RDDs, and all of them share the following characteristics (a small sketch after this list shows how some of them can be inspected) -
- List of partitions: This refers to the set of partitions of an RDD. Each RDD has its own logic for creating the partitions. E.g. HadoopRDD may use the HDFS block size to calculate the number of partitions, whereas PairRDD decides the number of partitions based on the keys of the records.
- Function for computing each split: An RDD also needs a function that produces the data items for a given partition. This function is an integral part of an RDD, as executors use it to get the data items for their assigned partitions. It also comes in handy in case of worker node failures: there is no loss of data, because a partition can be recomputed by other worker nodes using this function.
- List of dependencies on other RDDs: An RDD may also depend on other RDDs. This characteristic refers to all the RDDs that this RDD depends on. This is important for Apache Spark, as it needs to make the data items from the dependent RDDs available. E.g. UnionRDD is responsible for combining two or more RDDs, and hence Apache Spark needs to make the data items from those RDDs available to the UnionRDD in order to create the new RDD.
- Partitioner for key-value RDDs: This is an optional characteristic and can only be specified for key-value based RDDs such as PairRDD or HadoopRDD. It enables us to decide the number of partitions based on the keys in the dataset.
- List of preferred locations to compute each split on: This is also an optional attribute and lets us specify which worker node a particular partition (split) should be processed on. This comes in handy for achieving data locality, e.g. a HadoopRDD can specify the worker machines that already hold the data for a partition.
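A small sketch (the class name and in-memory dataset are assumptions) that inspects a few of these characteristics from a running application:
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class RDDCharacteristicsSketch {
    public static void main(String[] args) {
        final SparkConf sparkConf = new SparkConf().setAppName("RDD Characteristics Sketch").setMaster("local");
        try (final JavaSparkContext jSC = new JavaSparkContext(sparkConf)) {
            // List of partitions - parallelize() here splits the data into 3 partitions
            final JavaRDD<Integer> numbers = jSC.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);
            System.out.println("Partitions: " + numbers.partitions().size());
            // List of dependencies - the mapped RDD depends on the parallelized RDD
            final JavaRDD<Integer> doubled = numbers.map(x -> x * 2);
            System.out.println("Dependencies: " + doubled.rdd().dependencies());
            // The compute function and preferred locations are used internally by executors;
            // toDebugString() summarises how this RDD would be recomputed
            System.out.println(doubled.toDebugString());
        }
    }
}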
Types of RDDs
Since Apache Spark is developed in Scala, RDDs are modeled as Scala types (classes). Apache Spark provides APIs for four main programming languages - Scala, Java, Python and R. Here is how the RDD types from these languages are linked together -
- RDD: This is the base type for all RDDs in Apache Spark and defines the characteristic methods - getPartitions, compute, getDependencies, partitioner and getPreferredLocations. This type can be used from any language but is tailored mostly for Scala usage. All other RDD types, except the Java ones, extend this RDD. Apart from the other-language RDD types, there are many other kinds of RDDs such as JdbcRDD, HadoopRDD, ShuffledRDD, CoalescedRDD and PipedRDD, to name a few. Most of these RDDs are used as-is in the other languages as well.
- JavaRDD: This is the main class for Java RDDs. Spark applications written in Java should deal with this class, as it provides many utilities (such as sample, union, filter, map, coalesce (reducing the number of partitions), etc.) to make Java programming easier. Unlike the others, JavaRDD does not extend RDD and is instead a wrapper around it: it delegates calls to the Scala RDD, wraps the result into a JavaRDD and finally returns it to the caller (see the short sketch after this list).
- PythonRDD: This RDD class extends the Scala RDD class and represents RDDs in Python. Python applications need to use this class in order to work with RDDs in Spark. Since Python is not a JVM-based language, it needs to be handled differently from Java and hence has its own RDD types instead of just reusing the Scala RDDs.
- RRDD: This class represents RDDs in R and extends the Scala RDD class. It also comes with several related types such as BaseRRDD, StringRRDD and PairwiseRRDD.
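For instance, the wrapper relationship mentioned for JavaRDD above can be seen directly in code, since the underlying Scala RDD is always reachable from Java (a tiny sketch; the class name and in-memory dataset are assumptions):
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
public class JavaToScalaRDDSketch {
    public static void main(String[] args) {
        final SparkConf sparkConf = new SparkConf().setAppName("Java To Scala RDD Sketch").setMaster("local");
        try (final JavaSparkContext jSC = new JavaSparkContext(sparkConf)) {
            final JavaRDD<String> words = jSC.parallelize(Arrays.asList("spark", "rdd"));
            // JavaRDD is a thin wrapper - rdd() exposes the Scala RDD that calls are delegated to
            final RDD<String> scalaRDD = words.rdd();
            System.out.println(scalaRDD.getClass().getName());
        }
    }
}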
RDDs in Java
We will now take a detailed look at how RDDs are structured in Java. The diagram below represents the hierarchy of RDDs in Java -
- JavaRDDLike: This is implemented as a trait in Scala. Traits in Scala allow a form of multiple inheritance, as a class can mix in multiple traits; they are comparable to Java 8 interfaces with default methods. This trait defines operations common to the various Java RDD implementations, such as map, flatMap, groupBy, pipe, zip, collect, reduce, etc.
- AbstractJavaRDDLike: This is an abstract class that extends the JavaRDDLike trait. It does not define any methods and was introduced to address a type erasure problem that arises when extending Scala traits. All other Java RDD classes extend this class directly or indirectly.
- JavaRDD: This represents a general RDD in Java and defines operations for manipulating RDDs. The class delegates all calls to the Scala RDD and wraps the returned RDD into a JavaRDD before returning it to the client.
- JavaDoubleRDD: This class represents an RDD whose elements are of type Double and defines operations for manipulating such RDDs. It delegates all calls to the Scala RDD and wraps the returned RDD instance into a JavaDoubleRDD instance before returning it to the client.
- JavaPairRDD: This class represents an RDD whose elements are key-value pairs and defines operations for manipulating such RDDs. It delegates all calls to the Scala RDD and wraps the returned RDD instance into a JavaPairRDD instance before returning it to the client.
- JavaHadoopRDD: This class extends JavaPairRDD and represents an RDD that provides the core functionality for reading data stored in Hadoop (e.g. files in HDFS, sources in HBase, or S3) using the older MapReduce API (`org.apache.hadoop.mapred`). It delegates all calls to the Scala HadoopRDD and wraps the returned HadoopRDD instance into a JavaHadoopRDD instance before returning it to the client.
- JavaNewHadoopRDD: This class extends JavaPairRDD and represents an RDD that provides the core functionality for reading data stored in Hadoop (e.g. files in HDFS, sources in HBase, or S3) using the new MapReduce API (`org.apache.hadoop.mapreduce`). It delegates all calls to the Scala NewHadoopRDD and wraps the returned NewHadoopRDD instance into a JavaNewHadoopRDD instance before returning it to the client.
It is very important to note that RDD classes should not be instantiated directly; instead, they should be created through the SparkContext (JavaSparkContext for Java) class, which links them to the Apache Spark cluster.
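Putting the last two points together, here is a hedged sketch of obtaining a Hadoop-backed pair RDD through JavaSparkContext rather than instantiating an RDD class directly (the class name is illustrative and the HDFS path is only a placeholder); newAPIHadoopFile() uses the new MapReduce API described above:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
public class HadoopRDDSketch {
    public static void main(String[] args) {
        final SparkConf sparkConf = new SparkConf().setAppName("Hadoop RDD Sketch").setMaster("local");
        try (final JavaSparkContext jSC = new JavaSparkContext(sparkConf)) {
            // Read text data via the new MapReduce API (org.apache.hadoop.mapreduce); the path is a placeholder
            final JavaPairRDD<LongWritable, Text> lines = jSC.newAPIHadoopFile(
                    "hdfs:///tmp/input.txt",
                    TextInputFormat.class, LongWritable.class, Text.class,
                    jSC.hadoopConfiguration());
            System.out.println(lines.count());
        }
    }
}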
Examples of RDD creation
Note: We will just look at a few use cases to demonstrate the creation and usage of RDDs, without getting into the details of setting up a development environment. If you are new to Apache Spark, you can find detailed instructions for developing Java applications in Spark here.
Java RDD Demo:
package com.aksain.spark.basics.rdds;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
* @author Amit Kumar
*
* Demonstrates the usage of JavaRDD with a use case involving following steps -
* - Filter out the numbers greater than 10
* - Transform the numbers by calculating their squares
* - Find out sum of all the transformed numbers
*
*/
public class JavaRDDDemo {
public static void main(String[] args) {
// Prepare the spark configuration by setting application name and master node "local" i.e. embedded mode
final SparkConf sparkConf = new SparkConf().setAppName("Java RDD Demo").setMaster("local");
// Create the Java Spark Context by passing spark config.
try(final JavaSparkContext jSC = new JavaSparkContext(sparkConf)) {
//Create Java RDD of type integer with list of integers
final JavaRDD<Integer> intRDD = jSC.parallelize(Arrays.asList(1, 2, 3, 4, 50, 61, 72, 8, 9, 19, 31, 42, 53, 6, 7, 23));
// Create a new Java RDD by removing numbers greater than 10 from integer RDD
final JavaRDD<Integer> filteredRDD = intRDD.filter((x) -> (x <= 10));
// Create a new transformed RDD by transforming the numbers to their squares
final JavaRDD<Integer> transformedRDD = filteredRDD.map((x) -> (x * x) );
// Calculate the sum of all transformed integers. Since reduce is an action, it will trigger actual execution
final int sumTransformed = transformedRDD.reduce( (x, y) -> (x + y) );
System.out.println(sumTransformed);
}
}
}
Java Double RDD Demo:
package com.aksain.spark.basics.rdds;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaSparkContext;
/**
* @author Amit Kumar
*
* Demonstrates the usage of JavaDoubleRDD with a use case involving following steps -
* - Filter out the numbers greater than 10
* - Transform the numbers by calculating their squares
* - Find out sum of all the transformed numbers
*
*/
public class JavaDoubleRDDDemo {
public static void main(String[] args) {
// Prepare the spark configuration by setting application name and master node "local" i.e. embedded mode
final SparkConf sparkConf = new SparkConf().setAppName("Java Double RDD Demo").setMaster("local");
// Create the Java Spark Context by passing spark config.
try(final JavaSparkContext jSC = new JavaSparkContext(sparkConf)) {
//Create Java Double RDD of type double with list of doubles
final JavaDoubleRDD doubleRDD = jSC.parallelizeDoubles(Arrays.asList(1.5, 2.2, 3.1, 4.2, 50.1, 61.3, 72.8, 8.2, 9.5, 19.6, 31.7, 42.8, 53.3, 6.6, 7.4, 23.1));
// Create a new Java Double RDD by removing doubles greater than 10 from double RDD
final JavaDoubleRDD filteredRDD = doubleRDD.filter((x) -> (x <= 10));
// Create a new transformed RDD by transforming the numbers to their squares
final JavaDoubleRDD transformedRDD = filteredRDD.mapToDouble((x) -> ( x * x) );
// Calculate the sum of all transformed doubles. Since reduce is an action, it will trigger actual execution
final double sumTransformed = transformedRDD.reduce( (x, y) -> (x + y) );
System.out.println(sumTransformed);
}
}
}
Java Pair RDD Demo:
package com.aksain.spark.basics.rdds;
import java.util.Arrays;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
/**
* @author Amit Kumar
*
* Demonstrates the usage of JavaPairRDD with a use case of counting the words in list of sentences.
*
*/
public class JavaPairRDDDemo {
public static void main(String[] args) {
// Prepare the spark configuration by setting application name and master node "local" i.e. embedded mode
final SparkConf sparkConf = new SparkConf().setAppName("Java Pair RDD Demo").setMaster("local");
// Create the Java Spark Context by passing spark config.
try(final JavaSparkContext jSC = new JavaSparkContext(sparkConf)) {
// Create Java RDD of type String with list of sentences
final JavaRDD<String> sentenceRDD = jSC.parallelize(Arrays.asList(
"Java RDD Like Demo",
"Abstract Java RDD Like Demo",
"Java RDD Demo",
"Java Double RDD Demo",
"Java Pair RDD Demo",
"Java Hadoop RDD Demo",
"java New Hadoop RDD Demo"
));
// Create a new Java RDD for words by splitting the sentences into words from sentence RDD
final JavaRDD<String> wordsRDD = sentenceRDD.flatMap( (x) -> Arrays.asList(x.split(" ")) );
// Convert Java RDD to Java Pair RDD with key as word and value as 1
final JavaPairRDD<String, Integer> individualWordsCountRDD = wordsRDD.mapToPair( (x) -> new Tuple2<String, Integer>(x, 1));
// Count all the occurrences of each word and get the results into a Map with the word as key and the number of occurrences as value.
final Map<String, Object> wordsCount = individualWordsCountRDD.countByKey();
System.out.println(wordsCount);
}
}
}
Thank you for reading through the tutorial. In case of any feedback/questions/concerns, you can share them with us through your comments and we will get back to you as soon as possible.