Free Shipping

Secure Payment

easy returns

24/7 support

  • Home
  • Blog
  • Social Media Analysis Using Apache Flink

Social Media Analysis Using Apache Flink

 July 19  | 0 Comments

In this post, we will be looking at a case study to calculate the average number of friends based on their age, on a social media website using Apache Flink in Scala.

In our previous post, we had a brief introduction to Flink. Hence, we request you to go through that first, before going through this post.

Let’s begin by considering a sample of four records.

Column 1: User ID

Column 2: User Name

Column 3: Age of the User

Column 4: Number of Friends with that User

You can download the input file from here.

We can enter into Scala Flink shell by just typing start-scala-shell.sh local (as we are running it in local, otherwise you need to provide the host name and the port number). You can refer to the below image to get an idea of how the Scala Flink shell looks like.

To set the environment in the Flink Scala shell for the first time, the below line of code is used.

val env = ExecutionEnvironment.getExecutionEnvironment

Here getExecutionEnvironment will automatically choose the environment where we are executing the program.

Now, we need to load the dataset in Flink as shown below.

val lines: DataSet[String] = env.readTextFile("/home/kiran/Desktop/social_friends.csv")

In Flink, all the operations are performed on DataSet[T].

At this point, it is essential to know what DataSet[T] is.

What is DataSet[T]?

A DataSet represents a collection of elements of the same type. A DataSet can be transformed into another DataSet by applying any one of the transformations shown below:

  • map(org.apache.flink.api.common.functions.MapFunction),
  • reduce(org.apache.flink.api.common.functions.ReduceFunction),
  • join(DataSet), or
  • coGroup(DataSet).

The readTextFile(“path of the file�?) as shown in the above line of code, will load the text file containing the social media data.

Now, let’s perform map operation on the above data to split the data.

val Average = lines.map(line => line.split(","))

This line will split the comma-separated data and store it in an array. Now, from this dataset, we will perform another map operation to take out the columns, which we want for performing an average operation.

According to the dataset description, we need to take out 3rd column, which gives the age of the person and 4th column which gives the number of friends.

To perform the average operation, we will add one more column, which contains that is useful to calculate the average after performing sum operation.

val Average = lines.map(line => line.split(",")).map(word => (word(2).toString,word(3).toInt,1))

You can see the output of this line of code in the below screen shot.

 

Next, let’s perform groupby operation on this data to group the persons of same age using the below line of code.

val Average = lines.map(line => line.split(",")).map(word => (word(2).toString,word(3).toInt,1)).groupBy("0")

This will perform groupby operation on the above-extracted columns. Now, we need to take out the average number of friends present in each age group.

In Flink, there is no direct operator for performing an average operation. Therefore, we need to write one custom function. We will do this using reduce, as shown below.

val Average = lines.map(line => line.split(",")).map(word => (word(2).toString,word(3).toInt,1)).groupBy("0").reduce { (left, right) => (left._1, left._2 + right._2, left._3 + right._3) }

Here, in the reduce function, we have taken two objects named left and right, each containing three tuples. Here, the first one is the key, the second one is the number of friends, and the third one is the number of occurrences. The output of this line is as shown in the below screen shot.

 

Next, we need to calculate only the tuple._2/tuple._3 to take out the average number of friends for each age group. For that, we will use map function as shown below.

val Average = lines.map(line => line.split(",")).map(word => (word(2).toString,word(3).toInt,1)).groupBy("0").reduce { (left, right) => (left._1, left._2 + right._2, left._3 + right._3) }.map(tuple => (tuple._1, tuple._2/tuple._3)

Now, if we print this, we will get the average number of friends that are present in each group. The same can be seen in the below screen shot.

 

In the above screen shot, you can see that we have successfully obtained the average number of friends in each group.

We hope this post has been helpful and given you the needed information for performing social media analysis using Apache Flink in Scala. In case of any queries, feel free to comment below and we will get back to you at the earliest.

Keep visiting our site www.acadgild.com for more updates on Big Data and other technologies.

 

>