Spark RDD Operations in Scala


In this blog, we will be discussing the operations on Apache Spark RDDs using the Scala programming language. Before getting started, let us first understand what an RDD in Spark is.

RDD stands for Resilient Distributed Dataset.

  1. An RDD is simply a distributed collection of elements.
  2. Resilient Distributed Datasets (RDDs) are the fundamental data structure of Spark.
  3. An RDD is also an immutable distributed collection of objects.

For a detailed explanation of what an RDD is and how it works, I would suggest you go through this link.

Let’s now have a look at the ways to create a Spark RDD, with examples, in the section below:

Methods to Create RDD:

1. Referencing an external data set in an external storage system, such as a shared file system, HDFS, HBase, or any data source offering a Hadoop Input Format.

This method takes a URI for the file (either a local path on the machine or an hdfs://, s3n://, etc. URI) and reads it as a collection of lines. Here is an example invocation where we have taken a file test_file and created an RDD using SparkContext's textFile method.
Here we are creating a new RDD by loading a text file from HDFS. Please refer to the example below for the same.
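A minimal sketch of this step is given below; the HDFS URI and file name are illustrative placeholders, so substitute the path to your own file.

scala> val test_file = sc.textFile("hdfs://localhost:9000/user/spark/test_file")   // lazily points at the file; nothing is read yet
scala> test_file.first()                                                            // action that reads the file and returns its first line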

Apache Spark can also handle other data formats, such as the Parquet file format, which can be processed further using Scala or Python. You can follow the link mentioned below:

spark rdd saveAsParquetFile

2. By parallelizing a collection of objects (a list or a set) in the driver program.

Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program. The elements of the collection are copied to form a distributed data set that can be operated on in parallel.
Below is the sample demonstration of the above scenario.
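A minimal sketch of this scenario is shown below; the values in data are just sample numbers.

scala> val data = Array(1, 2, 3, 4, 5)          // a local collection in the driver program
scala> val distData = sc.parallelize(data)      // distributed across the cluster as an RDD
scala> distData.reduce(_ + _)                   // example action on the parallelized RDD (returns 15)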

Here we have created some data in the variable data and loaded it as an RDD using the parallelize method.

These are the two ways to create RDDs in Spark using Scala.

Now we will look into Spark RDD operations. Spark RDDs support two types of operations: transformations and actions.

Operations in RDD:

We will be using an Olympic data set for the operations in the following examples: tab-separated records containing fields such as the athlete's name, country, sport, and medal counts.

Transformations:

Any function that returns an RDD is a transformation. To elaborate, a transformation is a function that creates a new data set from an existing one by passing each element of the data set through a function, returning a new RDD that represents the results.

All transformations in Spark are lazy. They do not compute their results right away. Instead, they just remember the transformations applied to some base data set (e.g. a file). The transformations are only computed when an action requires a result that needs to be returned to the driver program.
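The sketch below illustrates this laziness; the HDFS path is an illustrative placeholder. Both the textFile and filter calls return immediately without reading any data, and only the count action triggers a Spark job.

scala> val test_file = sc.textFile("hdfs://localhost:9000/user/spark/olympic_data.txt")   // lazy: nothing is read yet
scala> val filter_test = test_file.filter(line => line.contains("India"))                 // lazy: no job runs
scala> filter_test.count()                                                                 // the action triggers the read and the filter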

In the example above, we have applied a transformation using the filter function; nothing is computed until the count action is called.
Now let us look at some commonly used transformations: map, flatMap, and filter.

Spark RDD map:
map applies a function to each element of the RDD and returns a new RDD containing the results, one output element for each input row.
Below is the sample demonstration of the above scenario.
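Here is a sketch of the two map steps. The HDFS path is the same illustrative placeholder used above, and the column indices (2 and 3) are assumptions about the file layout, so adjust them to your data.

scala> val test_file = sc.textFile("hdfs://localhost:9000/user/spark/olympic_data.txt")   // illustrative path
scala> val map_test = test_file.map(x => x.split("\t"))        // split every tab-separated record into an array of fields
scala> map_test.first()                                         // each element is now an Array[String] of fields
scala> val map_test1 = map_test.map(x => (x(2), x(3)))          // pair up column 2 and column 3 of each record
scala> map_test1.first()                                        // each element is now a two-field tuple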

You can see that in the example above we have created a new RDD using the sc.textFile method and have used the map method to transform it.

In the first map call, i.e., map_test, we split each record on the '\t' tab delimiter, since the data is tab-separated. You can see the result in the next step.

We then transform the RDD again with another map call, i.e., map_test1, where we pair up two of the columns (column 2 and column 3). You can see the result in the following step.

Spark RDD flatMap:

flatMap applies a function that returns an iterable for each input element and flattens the results, so the new RDD contains the contents of those iterators as individual elements.

Below is the sample demonstration of the above scenario.
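A sketch of the same idea, reusing the test_file RDD and the tab delimiter assumed in the map example:

scala> val map_test = test_file.map(x => x.split("\t"))       // each element is an Array of fields (an iterable)
scala> val flat_map = test_file.flatMap(x => x.split("\t"))   // the same fields, flattened into individual elements
scala> map_test.first()                                        // returns one Array of fields
scala> flat_map.first()                                        // returns a single field (a String)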

Previously, each element of map_test was itself an iterable (an array of fields). After performing flatMap on the data, the result is no longer a collection of iterables; the individual fields become the elements of the RDD.

Filter:

filter returns a new RDD containing only the elements that satisfy the filter condition. Below is the sample demonstration of the above scenario.
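A sketch continuing with the map_test RDD from the map example; the index of the country field (2 here) is an assumption about the layout.

scala> val filter_test = map_test.filter(x => x(2) == "India")   // keep only records whose country field is "India"
scala> filter_test.take(5)                                        // inspect a few matching records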

We can see that only the records for India are present in the output.

ReduceByKey:
reduceByKey operates on an RDD of key-value pairs and combines all the values for each unique key. Below is the sample demonstration of the above scenario.
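A sketch of this step, again continuing with map_test and assuming an illustrative column layout in which the country is field 2 and the total medal count is the last field (index 9):

scala> val map_test1 = map_test.map(x => (x(2), x(9).toInt))            // (country, total medals) pairs; indices are assumptions
scala> val medals_by_country = map_test1.reduceByKey((a, b) => a + b)   // add up the medal counts for every unique country
scala> medals_by_country.take(10)                                        // look at a few (country, total) results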

Here, we have taken a pair of the country and total-medals columns as the key and value, and we perform the reduceByKey operation on the RDD.

We get the final result as each country and the total number of medals won by that country across all the Olympic Games.

Actions:

Actions return the final results of RDD computations. An action triggers execution using the lineage graph: Spark loads the data into the original RDD, carries out all the intermediate transformations, and returns the final result to the driver program or writes it out to the file system.

Collect:
collect is used to return all the elements of the RDD to the driver program. Refer to the example below for the same.
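A sketch under the same assumptions, with the athlete's name assumed to be field 0 and the country field 2:

scala> val map_test1 = map_test.map(x => (x(0), x(2)))   // (athlete name, country) pairs; indices are assumptions
scala> map_test1.collect()                                // returns every element of the RDD to the driver

Note that collect brings the entire RDD into the driver's memory, so it should only be used on data small enough to fit there.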

In the example above, we map each record to a pair of the athlete's name and country and perform the collect action on the newly created RDD map_test1; the result is returned to the driver and displayed on the screen.

Count:
count is used to return the number of elements in the RDD. Below is the sample demonstration of the above scenario.
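A sketch, continuing with the map_test1 pair RDD from the previous example:

scala> map_test1.count()   // number of elements in the RDD; the post's data set has 8618 records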

In the example above, you can see that there are 8618 records in the RDD map_test1.

CountByValue:
countByValue is used to count the number of occurrences of each element in the RDD. Below is the sample demonstration of the above scenario.
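A sketch under the same assumptions, with the country and sport assumed to be fields 2 and 5 of each record:

scala> val map_test1 = map_test.map(x => (x(2), x(5)))   // (country, sport) pairs; indices are assumptions
scala> map_test1.countByValue()                           // a Map from each (country, sport) pair to its number of occurrences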

In this scenario, we have taken pairs of country and sport. Performing the countByValue action gives us the number of records for each country in a particular sport.

Reduce:
reduce is used to aggregate the elements of an RDD using a function that takes two elements and returns one. Below is the sample demonstration of the above scenario, where we load the total-medals column of the data set into the RDD map_test1 and perform a reduce operation on it. Finally, we get a total of 9529 medals awarded across all the Olympic Games in the data set.
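A sketch of this step, with the total-medals count assumed to be the last field (index 9) of each record:

scala> val map_test1 = map_test.map(x => x(9).toInt)   // the total-medals column as integers; the index is an assumption
scala> map_test1.reduce((a, b) => a + b)               // sums all the medal counts; 9529 for the post's data set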
Take:
take returns the first n elements of the RDD, where n is the number we explicitly specify. Below is the sample demonstration of the above scenario.
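A sketch, continuing with map_test1:

scala> map_test1.take(5)   // returns only the first 5 elements instead of the whole RDD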

In the example above, you can see that the collect operation returned all the elements of the RDD, whereas with take we can limit the number of elements returned by explicitly passing an integer value as an argument.

There are two more transformations in Spark that are used to control the number of partitions your RDD should have. They are:

  1. coalesce()
  2. repartition()

The difference between both of them is as follows:

  • Both are used to modify the number of partitions of an RDD, but coalesce avoids a full shuffle.
  • With coalesce, if you go from 1000 partitions to 100 partitions, there will not be a shuffle; instead, each of the 100 new partitions will claim 10 of the current partitions.
  • Repartition performs a coalesce with a shuffle. Repartition results in the specified number of partitions, with the data distributed using a hash partitioner.

Let’s take a situation like this: you have initially created an RDD with N partitions, and on that RDD you have applied a filter transformation. Spark applies transformations partition by partition, so even if the data inside a partition is completely filtered out, Spark will still keep the number of partitions the same as when the RDD was created. This is true for all narrow transformations (transformations where shuffling is not required), as shown below.

scala> val data = sc.parallelize(1 to 4)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
scala> data.getNumPartitions
res14: Int = 4
scala> val filtering = data.filter(x => (x%2)==0)
filtering: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at filter at <console>:26
scala> filtering.getNumPartitions
res15: Int = 4

In the above case, you can see that we have created an RDD containing 1 to 4 with 4 partitions, and even after applying the filter transformation the number of partitions stays the same, which means a few partitions now have empty contents. In such situations, you can use coalesce() to reduce the number of partitions, as shown below.

scala> val fil = filtering.coalesce(2)
fil: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[8] at coalesce at <console>:28
scala> fil.getNumPartitions
res16: Int = 2

If you want to increase the number of partitions, you can go for repartition as shown below.

scala> val data = sc.parallelize(1 to 4)
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:24
scala> data.getNumPartitions
res21: Int = 4
scala> val inc = data.zipWithIndex
inc: org.apache.spark.rdd.RDD[(Int, Long)] = ZippedWithIndexRDD[15] at zipWithIndex at <console>:26
scala> inc.getNumPartitions
res22: Int = 4
scala> val repar = inc.repartition(6)
repar: org.apache.spark.rdd.RDD[(Int, Long)] = MapPartitionsRDD[19] at repartition at <console>:28
scala> repar.getNumPartitions
res23: Int = 6

Spark also has a very important module named Spark SQL for working with structured data. Spark SQL allows you to create relational tables called DataFrames in Spark. Spark also allows you to convert an RDD to a DataFrame and run SQL queries against it. For details, kindly follow the link spark sql rdd.

Hope this blog helped you in understanding RDDs and the most commonly used RDD operations in Scala. You can also work with Spark RDDs in Python (pyspark rdd).

Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.
