
Building Spark Application in Java Using Maven

Here we will build and run a Spark application in Java using Maven. We will also discuss the Java APIs used in the word count program.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
public class WordCount {
  private static final FlatMapFunction<String, String> WORDS_EXTRACTOR =
      new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String s) throws Exception {
          return Arrays.asList(s.split(" "));
        }
      };
  private static final PairFunction<String, String, Integer> WORDS_MAPPER =
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
          return new Tuple2<String, Integer>(s, 1);
        }
      };
  private static final Function2<Integer, Integer, Integer> WORDS_REDUCER =
      new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer a, Integer b) throws Exception {
          return a + b;
        }
      };
  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Please provide the input file path and the output directory path as arguments");
      System.exit(1);
    }
    SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("local");
    JavaSparkContext context = new JavaSparkContext(conf);
    JavaRDD<String> file = context.textFile(args[0]);
    JavaRDD<String> words = file.flatMap(WORDS_EXTRACTOR);
    JavaPairRDD<String, Integer> pairs = words.mapToPair(WORDS_MAPPER);
    JavaPairRDD<String, Integer> counter = pairs.reduceByKey(WORDS_REDUCER);
    counter.saveAsTextFile(args[1]);
  }
}

Code Explanation


A Spark application can be built in many languages; Spark provides APIs in Python, Scala, and Java. To use a particular language, we import that language's API. Since we are working in Java, we have imported the Java APIs for Spark.

Now let us see what we need in order to write our Spark application:
1. Load a text file
2. Split the text file into words
3. Turn each split word into a key-value pair
4. Aggregate the values of each key
We will cover loading the text file later; it is done through the context.
To split the words and store them as an array, we need a function that takes one input (say, a line) and returns zero or more outputs (the words in that line). The Spark Java API has such a function type, called FlatMapFunction.
FlatMapFunction<T, R> has two type parameters: T stands for the single input and R for the zero or more outputs. Its call signature is Iterable<R> call(T).
import org.apache.spark.api.java.function.FlatMapFunction;
The above import statement brings in FlatMapFunction from the Spark Java API.

private static final FlatMapFunction<String, String> WORDS_EXTRACTOR =
      new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String s) throws Exception {
          return Arrays.asList(s.split(" "));
        }
      };
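The core logic of WORDS_EXTRACTOR, splitting a line on spaces, can be tried without Spark on the classpath. This is a plain-Java sketch of the same call body (class and method names here are made up for the demo):

```java
import java.util.Arrays;
import java.util.List;

public class SplitDemo {
    // Same logic as WORDS_EXTRACTOR: one input line, zero or more output words.
    static List<String> extract(String line) {
        return Arrays.asList(line.split(" "));
    }

    public static void main(String[] args) {
        List<String> words = extract("to be or not to be");
        assert words.size() == 6;
        assert words.get(0).equals("to");
    }
}
```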

To create key-value pairs, we implement the PairFunction interface and override its call method:

private static final PairFunction<String, String, Integer> WORDS_MAPPER =
      new PairFunction<String, String, Integer>() {

We name this function WORDS_MAPPER.

PairFunction<T, K, V> has three type parameters: T is the input type, K the key type, and V the value type. To build the key-value pair itself, we use the Tuple2 class from Scala (on which Spark is built):

public Tuple2<String, Integer> call(String s) throws Exception {
          return new Tuple2<String, Integer>(s, 1);

Tuple2<K, V> holds a key and a value for us. For each input string s, call returns new Tuple2<String, Integer>(s, 1), making the word the key and 1 the value.
With this we have achieved one more goal: creating key-value pairs.
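As a plain-Java sketch of the same pairing logic (SimpleEntry stands in here for scala.Tuple2, which needs Spark's jars on the classpath; the class name is made up for the demo):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class PairDemo {
    // Same logic as WORDS_MAPPER: each word becomes a (word, 1) pair.
    static Map.Entry<String, Integer> toPair(String word) {
        return new SimpleEntry<>(word, 1);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> p = toPair("spark");
        assert p.getKey().equals("spark");
        assert p.getValue() == 1;
    }
}
```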
Now we need to aggregate the obtained key-value pairs, which in Hadoop would be done in the reducer. So we write a function that aggregates the values of a particular key.
For aggregation-style operations the Spark Java API provides Function2<T1, T2, R>, which takes two inputs and returns one output. Implementing Function2 looks like this:


private static final Function2<Integer, Integer, Integer> WORDS_REDUCER =
      new Function2<Integer, Integer, Integer>() {

We name this function WORDS_REDUCER.
The two inputs are combined in the call method:

public Integer call(Integer a, Integer b) throws Exception {
return a + b;

The call method adds the two inputs and returns the sum as one output.
With that, the last goal, aggregating the values, is also complete.
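The reducer is just addition of two partial counts, which a plain-Java sketch can exercise without Spark (names below are made up for the demo):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceDemo {
    // Same logic as WORDS_REDUCER: combine two partial counts into one.
    static final BinaryOperator<Integer> SUM = (a, b) -> a + b;

    // Folding a list of counts with SUM, the way Spark folds the values of one key.
    static int total(List<Integer> counts) {
        return counts.stream().reduce(0, SUM);
    }

    public static void main(String[] args) {
        assert SUM.apply(2, 3) == 5;
        assert total(Arrays.asList(1, 1, 1)) == 3;
    }
}
```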
To make the program complete we need a driver class, just as in Hadoop.

public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Please provide the input file path and the output directory path as arguments");
      System.exit(1);
    }

In the main method, we define the Spark configuration.
We first set a condition on the arguments: if the input and output paths are not supplied, we print an error message and exit.
Let us see the configuration part for Spark
SparkConf conf = new SparkConf().setAppName("org.sparkexample.WordCount").setMaster("local");
As in Hadoop, we need to set the configuration, here using a SparkConf object. setAppName() gives the application a name that identifies it in the cluster.
The most important thing to specify is the master for Spark, i.e. the mode in which it should run, via setMaster(). Since we are running in local mode, we pass "local".

JavaSparkContext context = new JavaSparkContext(conf);

A SparkContext also has to be created, as in Hadoop, so that Spark can read the configuration we have given.
The context class differs in the Java API: it is JavaSparkContext, constructed as shown above.

JavaRDD<String> file = context.textFile(args[0]);

This achieves our first goal: loading a file. We load the input file on which we will perform the word count.
This is somewhat different from Hadoop, where we would give the input file as TextInputFormat; in Spark, everything is performed on RDDs.
Creating an RDD
So we create an RDD using the Java API. The syntax above breaks down as follows:
JavaRDD<...> — the element type of the RDD goes inside the angle brackets; in our case we create an RDD of String.
file — the variable that holds the RDD.
context — the file is loaded through the JavaSparkContext.
textFile — the method that loads a text file, producing one element per line.
With this we have an initial RDD that holds the data. Every operation on an RDD transforms it into another RDD, so everything is performed on RDDs.
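Spark's textFile cannot run without a SparkContext, but the one-element-per-line view it produces can be illustrated with plain Java (the class, file name, and contents below are made up for the demo):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class TextFileDemo {
    // textFile(path) yields one RDD element per line; readAllLines gives
    // the same line-per-element view of a text file.
    static List<String> loadLines(Path path) {
        try {
            return Files.readAllLines(path);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Helper that writes a throwaway sample file for the demo.
    static Path writeSample(String content) {
        try {
            Path tmp = Files.createTempFile("demo", ".txt");
            Files.write(tmp, content.getBytes());
            return tmp;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Path tmp = writeSample("first line\nsecond line\n");
        List<String> lines = loadLines(tmp);
        assert lines.size() == 2;
        assert lines.get(0).equals("first line");
    }
}
```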

JavaRDD<String> words = file.flatMap(WORDS_EXTRACTOR);

flatMap runs once for every input line. In Hadoop, to use the map method we have to inherit from the default Mapper class and override it, but in Spark the logic is simply passed in as a function. (Note that the FlatMapFunction interface is distinct from the flatMap method it is passed to.) Here flatMap applies WORDS_EXTRACTOR to every line of the file, splitting it into words, and the result is saved as an RDD in the variable words.

Observe that we initially created an RDD that loads a text file; that RDD of lines has now been transformed into an RDD of words.
In other words, we have successfully performed a transformation on an RDD.

JavaPairRDD<String, Integer> pairs = words.mapToPair(WORDS_MAPPER);

To create key-value-pair RDDs in a Java Spark application, we use JavaPairRDD<K, V>, giving the key and value types in the angle brackets. We name this RDD pairs and build it from the previously extracted words.
So we apply a transformation to the words RDD: words.mapToPair(WORDS_MAPPER).
mapToPair runs for every word in the words RDD and performs the operation specified in WORDS_MAPPER. This results in an RDD of key-value pairs.
JavaPairRDD<String, Integer> counter = pairs.reduceByKey(WORDS_REDUCER);
Now the key-value pairs are ready, and we need to aggregate the values for each key. For this, Spark has reduceByKey, which performs the role of the reducer in Hadoop.
We want an RDD of key-value pairs in which the values of each key have been aggregated. The key-value pairs are in the variable pairs, so we transform that RDD into another one:
pairs.reduceByKey(WORDS_REDUCER)
reduceByKey runs over the pairs RDD, groups the pairs by key, and whenever two values share the same key, combines them with the logic written in WORDS_REDUCER (here, addition). The result is a new RDD of (word, count) pairs.
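What reduceByKey does conceptually can be simulated with a plain-Java map over a small word list (the class name is made up for the demo; real Spark also distributes this work across partitions):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReduceByKeyDemo {
    // Conceptual reduceByKey: group the (word, 1) pairs by key and fold
    // the values of each key with the reducer logic (here, addition).
    static Map<String, Integer> countWords(List<String> words) {
        Map<String, Integer> counts = new HashMap<>();
        for (String w : words) {
            counts.merge(w, 1, (a, b) -> a + b); // WORDS_REDUCER logic
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> c =
            countWords(Arrays.asList("to", "be", "or", "not", "to", "be"));
        assert c.get("to") == 2;
        assert c.get("or") == 1;
    }
}
```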
Our Spark application, the word count program written with Spark's Java API, is done. Now we can deploy it on a Spark cluster.
Follow the steps below:
Copy the program into Eclipse by creating a Java project; after creating the project, create a class named WordCount and paste in the whole program.
After pasting, there will be many compile errors. To fix them:
Right click on the project —> Build Path —> Configure Build Path
Add the Spark assembly jar: open the Spark folder —> lib —> spark-assembly-1.5.1 jar
After adding the jar, all the errors will be cleared.

Now we want to build a jar of the Spark application to run on the cluster, so we need to install Maven and build the jar with it.
To install Maven, open a terminal and type
yum install maven
It will take some time to download and install. After the installation completes, check whether Maven is installed with the command
mvn
and check the installed version with
mvn -version

With this, Maven is successfully installed on our system.
Now we have to create a Maven package for our project.
Steps to follow to create a Maven project
Right click on the project —>Configure —> Convert to Maven project


After clicking this, our project will be converted into Maven Project.
A pom.xml file will be created in our project
Open the pom.xml file and go to the Dependencies tab. Click Add to add the dependencies for our project.

Give the dependency as follows
GroupId —> org.apache.spark
Artifact Id —> spark-core_2.10
Version —> 1.5.1 (use the version matching your installed Spark; 1.5.1 here)
Then click ok
It will download some dependencies from the Internet and it will be configured to our spark application automatically.
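The fields entered in the Dependencies tab correspond to an entry like this in pom.xml (the version shown is an assumption; it should match the installed Spark, 1.5.1 in this walkthrough):

```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.10</artifactId>
  <version>1.5.1</version>
</dependency>
```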
After the whole process, we can check whether the dependencies were downloaded or not, by navigating into the Maven Dependencies in our project.

All the dependencies required for building the jar are now in place, so the next step is to create the jar itself.
To create the jar, follow the steps below:
Open Terminal —>Move to the place where our project is present
We can check the path of our project by
Right Click on the project —> Properties
After clicking on Properties, a dialog opens showing the project's location. Copy that path.

In the terminal type
cd <path>
We will be navigated to the folder where our project is present
Now to make the package, type the command
mvn package

It will take some time to build the package. After the build completes, we can find the built jar inside the target folder.

Spark_Java_WordCount-0.0.1-SNAPSHOT.jar will be created; that is our jar file.
The remaining work is to deploy the built jar of our Spark application on the cluster.
To deploy the Spark application on the Spark cluster, we need to specify a few things.
Open the Spark installation folder and type
./bin/spark-submit --class org.sparkexample.WordCount --master local[2] /<path to maven project>/target/spark-examples-1.0-SNAPSHOT.jar /<path to a demo test file> /<path to output directory>
Each element of the command is explained below:
./bin — move into the bin folder
spark-submit — in client mode, the driver is launched directly within the spark-submit process, which acts as a client to the cluster; the input and output of the application are attached to the console
--class — the main class to run
org.sparkexample.WordCount — the name of the main class (org.sparkexample is the package containing WordCount)
--master local[2] — run Spark locally with 2 cores
/<path to maven project> — the path where the jar file was created
/<path to a demo test file> — the path of the input file
/<path to output directory> — the path where the output should be written
In our case the command looks like this:
./bin/spark-submit --class WordCount --master local[2] /home/kiran/workspace/Spark_Java_WordCount/target/Spark_Java_WordCount-0.0.1-SNAPSHOT.jar /home/kiran/Desktop/input /home/kiran/Desktop/word_out
Note: after --class I have specified just WordCount (my main class name) because I created the class in the default package (I did not create any package), so I did not type org.*.
If the class is created inside a package, the package must be specified as well.
After executing this command, the Spark application will start running.


Now open the path that was given as the output path.

We will see a part file where our output is stored.
We hope this blog helped you understand how to build and run a Spark application in Java using Maven. Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.