In this post, we discuss how Broadcast variables and Accumulators work in Spark.
Apache Spark is a widely used open-source cluster computing framework with built-in support for in-memory computation, streaming APIs, machine learning libraries, and graph processing algorithms. In addition, Spark provides two types of shared variables that are made available across cluster nodes to support read and write operations such as lookups, joins, counts, and summations.
The two shared variables in Spark are listed below:
- Broadcast Variables
- Accumulators
Broadcast Variables:
Developers often need to keep a small dataset in memory on the Spark worker nodes that hold a much larger dataset. Doing so reduces I/O and communication cost, which speeds up queries that perform lookup or join operations.
Broadcast variables serve this purpose: they are read-only variables cached in memory on each worker node, rather than shipped as a copy of the data with every task.
Broadcast variables are mostly used when the tasks across multiple stages require the same data or when caching the data in the deserialized form is required.
Broadcast variables are created using a variable v by calling SparkContext.broadcast(v).
The Broadcast variable is a wrapper around v, and its value can be accessed by calling the value method.
By contrast, the common data that Spark broadcasts automatically to the tasks within a stage is cached in serialized form and deserialized before each task runs.
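The calls described above can be sketched as a small standalone program. This is a minimal sketch, assuming Spark 2.x or later running in local mode; the object name, the factors map, and the sample pairs are made up for illustration. In spark-shell, sc already exists, so the SparkContext setup can be skipped.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastBasics {
  // Multiplies each value by a factor looked up in a broadcast map.
  // Keys missing from the map fall back to a factor of 1.
  def scaled(sc: SparkContext): Array[Int] = {
    val factors = Map("a" -> 10, "b" -> 100)   // small, read-only data (illustrative)
    val bFactors = sc.broadcast(factors)        // wrapper around `factors`
    val result = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
      .map { case (key, v) => v * bFactors.value.getOrElse(key, 1) } // read via .value
      .collect()
    bFactors.unpersist() // drop the cached copies on the executors
    result
  }

  def main(args: Array[String]): Unit = {
    // local[*] is an assumption for running on a single machine
    val conf = new SparkConf().setAppName("broadcast-basics").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(scaled(sc).mkString(", "))
    finally sc.stop()
  }
}
```

Tasks only ever read bFactors.value; modifying the original map after broadcasting it would not be reflected reliably on the workers, which is why the variable is treated as read-only.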
Example:
In the following example, we broadcast a variable to the worker nodes and perform a lookup operation.
Before moving forward, let us understand what a lookup does.
A lookup finds a value in one table by key and attaches it to the matching record in another table.
Here, the objective is to tag each word in an input file with a frequency label (frequent, moderate, or rare) taken from a lookup table, and then count the occurrences of each tagged word.
vi /home/acadgild/spark/wordcount_input_file
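// Lookup table mapping each word to a frequency label
val lookup = Map("This" -> "frequent", "is" -> "frequent", "my" -> "moderate", "file" -> "rare")
// Broadcast the table so that every worker node holds one read-only copy
val broadcastLookup = sc.broadcast(lookup)
// Pair a word with its label, or "NA" when the word is not in the table
def lookupWord(word: String): (String, String) = (word, broadcastLookup.value.getOrElse(word, "NA"))
val myRDD = sc.textFile("/home/acadgild/spark/wordcount_input_file")
// val rdd1 = sc.parallelize(Seq("this is is a", "is a gossip", "this is introvert", "is is a"))
// Split non-empty lines into words, tag each word, and count each (word, label) pair
myRDD.flatMap(line => if (line != "") line.split(" ") else Array[String]()).map(lookupWord).countByValue()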
In the above example, we broadcast the lookup variable across the Spark cluster with sc.broadcast(lookup). We then mapped each word in the input file to its lookup value (frequent, moderate, rare, or NA) and counted the occurrences of each pair.
As the example shows, Broadcast variables let us distribute a read-only variable across the Spark cluster.
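The same idea extends to the join case mentioned earlier: when one side of a join is small, it can be broadcast and looked up inside a map-side operation instead of shuffling both datasets. The following is a minimal sketch, assuming Spark 2.x or later in local mode; the country table, the orders data, and all names are hypothetical stand-ins for a small and a large dataset.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastJoinSketch {
  // Joins (countryCode, amount) rows with a small code -> name table.
  // Each task reads the table from the broadcast variable, so the join
  // itself needs no shuffle.
  def joinWithSmallTable(sc: SparkContext): Array[(String, Int)] = {
    val countryNames = Map("IN" -> "India", "US" -> "United States") // small table (illustrative)
    val bNames = sc.broadcast(countryNames)
    // Stand-in for the large dataset
    val orders = sc.parallelize(Seq(("IN", 5), ("US", 7), ("IN", 3), ("FR", 1)))
    orders.flatMap { case (code, amount) =>
      // Inner-join semantics: rows without a match in the small table are dropped
      bNames.value.get(code).map(name => (name, amount)).toList
    }.collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("broadcast-join").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try joinWithSmallTable(sc).foreach(println)
    finally sc.stop()
  }
}
```

This approach only makes sense while the small table fits comfortably in the memory of every executor; otherwise a regular join is the safer choice.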
Accumulators:
Similar to MapReduce counters (to know more about MapReduce counters, please visit our blog https://acadgild.com/blog/counters-in-mapreduce), Spark provides accumulators, shared variables that are used to implement counters and sums.
Accumulators can only be added to, and only through an operation that is both associative and commutative, so that partial results from different tasks can be combined in any order.
The accumulators can be created with or without a name. If the accumulators are created with a name, they can be viewed in Spark’s UI which will be useful to understand the progress of running stages.
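An accumulator is created from an initial value v by calling SparkContext.accumulator(v). Note that this is the Spark 1.x API; it has been deprecated since Spark 2.0 in favor of SparkContext.longAccumulator and SparkContext.doubleAccumulator.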
Syntax:
val acc = sc.accumulator(v)
Here, v is usually set to zero when performing a count or summation.
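Counts and sums are not the only operations that fit an accumulator; anything that can be merged in any order works. As an illustration, here is a minimal sketch of a custom accumulator that tracks a maximum, assuming Spark 2.0 or later, where custom accumulators extend AccumulatorV2 and are registered with sc.register. The class and object names are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2

// Tracks the largest value seen. max is associative and commutative, so
// per-task partial results can be merged on the driver in any order.
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var current: Long = Long.MinValue // "zero" value: nothing seen yet

  override def isZero: Boolean = current == Long.MinValue
  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc.current = current
    acc
  }
  override def reset(): Unit = { current = Long.MinValue }
  override def add(v: Long): Unit = { current = math.max(current, v) }
  override def merge(other: AccumulatorV2[Long, Long]): Unit = {
    current = math.max(current, other.value)
  }
  override def value: Long = current
}

object MaxAccumulatorSketch {
  // Returns the maximum of `values`, or Long.MinValue for an empty input.
  def maxOf(sc: SparkContext, values: Seq[Long]): Long = {
    val acc = new MaxAccumulator
    sc.register(acc, "Max Value") // the name is shown in Spark's UI
    sc.parallelize(values).foreach(v => acc.add(v))
    acc.value
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("max-accumulator").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try println(maxOf(sc, Seq(3L, 9L, 4L)))
    finally sc.stop()
  }
}
```

An operation such as subtraction would not be a valid choice here, because the merged result would depend on the order in which task results arrive.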
Example: Count blank lines using an accumulator:
In the following example, we use an accumulator to count the blank lines in an input file.
Consider a scenario where blank lines in a file are treated as bad records. To find the total number of blank lines, we can use the code given below.
The input file used in this example contains 2 blank lines.
Code:
// Create a named accumulator starting at 0; the name appears in Spark's UI
val blank_line_accumulator = sc.accumulator(0, "Blank Lines")
blank_line_accumulator.value
res1: Int = 0
// foreach is an action and returns Unit, so its result is not assigned to a val
sc.textFile("file:///home/cloudera/Desktop/linecount").foreach { x => if (x.length() == 0) blank_line_accumulator += 1 }
println("The total Blank Lines in the dataset linecount file is: " + blank_line_accumulator.value)
From the above example, we can find out the number of empty or blank lines for a given dataset.
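On Spark 2.0 and later, where sc.accumulator is deprecated, the same count can be sketched with sc.longAccumulator. This is a minimal sketch under a few assumptions: it runs in local mode, and it uses an in-memory sample with 2 blank lines in place of the linecount file, whose contents are not shown here. The object and method names are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BlankLineCount {
  // Counts empty lines using a named long accumulator.
  def countBlankLines(sc: SparkContext, lines: Seq[String]): Long = {
    val blankLines = sc.longAccumulator("Blank Lines") // starts at zero
    // The update happens inside an action (foreach), not a transformation
    sc.parallelize(lines).foreach { line =>
      if (line.isEmpty) blankLines.add(1L)
    }
    blankLines.value // read the total back on the driver
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("blank-line-count").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // Sample data standing in for the input file
      val sample = Seq("first line", "", "second line", "", "third line")
      println("The total blank lines in the sample is: " + countBlankLines(sc, sample))
    } finally sc.stop()
  }
}
```

To run it against a real file, sc.parallelize(lines) can be swapped for sc.textFile with the file path, as in the original snippet.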
We hope this post has been helpful in understanding the working of Broadcast variables and Accumulators in Spark.