In this post, we will look in detail at joins in Apache Spark, both on Core RDDs and on DataFrames. Apache Spark is evolving rapidly, and its core APIs have changed along with it. One of the most significant areas of change is the representation of datasets. Spark has offered the Core RDD API since version 1.0, and two newer abstractions have since been added: the DataFrame API, introduced in Spark 1.3, and the Dataset API, introduced in Spark 1.6.
Joining data is an important part of many of our pipeline projects. Joins in Apache Spark are common and powerful, but they require careful tuning: handled without care, a join can easily become a performance bottleneck.
Core Spark Joins
Here we will look at the various RDD joins. In general, a join in Apache Spark is expensive because it requires the keys from the different RDDs to be located on the same partition so that they can be combined locally. If the RDDs do not have a known partitioner, a shuffle occurs to bring matching keys onto the same partition. Once the RDDs are joined, we can apply further transformations and actions to the result.
In order to join the data, Spark needs it to be present on the same partition. The default join strategy in Spark Core is the shuffled hash join, which ensures that each partition contains the same keys on both sides by partitioning the second dataset with the same default partitioner as the first.
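The shuffle can be avoided if both RDDs are already partitioned the same way. As a hedged sketch (the example data and names below are made up for illustration, not taken from the post's files), pre-partitioning both sides with the same partitioner lets the join combine keys locally:

```scala
import org.apache.spark.HashPartitioner

// Hypothetical (key, value) pair RDDs for illustration.
val left  = sc.parallelize(Seq((1, "a"), (2, "b")))
val right = sc.parallelize(Seq((1, 10),  (3, 30)))

// Partition both sides with the same partitioner and cache them;
// the subsequent join can then match keys locally, without an extra shuffle.
val part   = new HashPartitioner(4)
val leftP  = left.partitionBy(part).cache()
val rightP = right.partitionBy(part).cache()

val joined = leftP.join(rightP) // co-partitioned join
```

Caching after partitionBy matters: without it, the partitioning work is redone every time the RDD is reused.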
Now, we will perform a join on Apache Spark RDDs.
- Description of customer_data
Customer ID: unique for every customer
Store ID: ID of the store where the purchase was made
Sale: amount of the sale
- Description of store_data
Store ID: ID of the store
Rating: rating of the store
Now we will join the two datasets.
*Note: A join on Spark RDDs expects pair RDDs, i.e., tuples of (key, value); the join happens on the key. We need to choose the key field carefully when joining datasets.
case class cust(cust_id: Int, store_id: Int, sale: Int) // case class matching the fields of the customer_data file, with explicit data types
case class store(store_id: Int, rating: Float) // case class matching the fields of the store_data file
val inp1 = sc.textFile("file:///home/acadgild/sparkjoins/customer_data").map(_.split(",")) // loading the customer data and splitting it on the delimiter ','
val inp2 = sc.textFile("file:///home/acadgild/sparkjoins/store_data").map(_.split(",")) // loading the store data and splitting it on the delimiter ','
val cust_record = inp1.map(x => (x(1).toInt, cust(x(0).toInt, x(1).toInt, x(2).toInt))) // (key, value) tuple: the key is the store_id (2nd column of customer_data); the value is a 'cust' instance built from the 3 columns
val store_record = inp2.map(x => (x(0).toInt, store(x(0).toInt, x(1).toFloat))) // (key, value) tuple: the key is the store_id (1st column of store_data); the value is a 'store' instance built from the 2 columns
Now based on common KEY, we will join the data sets.
val joined_rdd = cust_record.join(store_record) // performing the join operation.
If you look at the output carefully, you will find that one row (store_id: 444) from the customer_data file is missing, because that store_id is not present in the store_data file. With a plain inner join, any record whose key does not appear in both datasets is silently dropped.
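This data-loss behaviour is easy to see with a tiny hypothetical example (the keys and values below are made up, not taken from the post's files):

```scala
// customer records keyed by store_id; store 444 has no match on the other side
val custSample  = sc.parallelize(Seq((111, "cust-1"), (222, "cust-2"), (444, "cust-3")))
val storeSample = sc.parallelize(Seq((111, 4.5f), (222, 3.9f)))

custSample.join(storeSample).collect()
// only keys 111 and 222 survive; the (444, "cust-3") record is silently dropped
```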
When unmatched records must not be lost, it is safer to use an outer join, which guarantees that all records from one (or both) sides are retained.
Given below is the command to perform a leftOuterJoin.
val leftjoin_rdd = cust_record.leftOuterJoin(store_record)
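leftOuterJoin keeps every record from the left RDD and wraps the right-hand value in an Option, so unmatched keys come back as None rather than disappearing. A minimal sketch with hypothetical data:

```scala
val custSample  = sc.parallelize(Seq((111, "cust-1"), (444, "cust-3")))
val storeSample = sc.parallelize(Seq((111, 4.5f)))

val left = custSample.leftOuterJoin(storeSample)
// yields (111, ("cust-1", Some(4.5))) and (444, ("cust-3", None))

// Supply a default rating for stores missing from the right side:
val withDefaults = left.mapValues { case (c, rating) => (c, rating.getOrElse(0.0f)) }
```

rightOuterJoin and fullOuterJoin work symmetrically, wrapping the left side (or both sides) in Option.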
DataFrame is a tabular data abstraction introduced in Spark 1.3, designed to make processing large amounts of structured tabular data easier. A DataFrame is a collection of rows with a schema, suited to structured and semi-structured data. Because Spark manages the schema itself and only needs to pass the data between nodes, this is much more efficient than Java serialization.
*Note: Even though self-joins on Spark DataFrames are supported, it is good practice to alias the fields so that they can be referenced unambiguously.
case class cust(cust_id: Int, store_id: Int, sale: Int)
val inp1 = sc.textFile("file:///home/acadgild/sparkjoins/customer_data").map(_.split(",")).map(x => cust(x(0).toInt, x(1).toInt, x(2).toInt)).toDF() // creating a DataFrame using toDF()
inp1.show() // to inspect the inp1 DataFrame
inp1.printSchema() // prints the schema of the DataFrame
case class store(store_id: Int, rating: Float)
val inp2 = sc.textFile("file:///home/acadgild/sparkjoins/store_data").map(_.split(",")).map(x => store(x(0).toInt, x(1).toFloat)).toDF()
Naive inner join
We can perform a simple join using these two DataFrames, i.e., inp1 and inp2.
val naiveinnerjoin = inp1.join(inp2, inp1("store_id") === inp2("store_id"), "inner")
// joining inp1 and inp2 on the common column "store_id"
naiveinnerjoin.show()
However, when we reference "store_id" in any subsequent operation, Spark throws an error stating that the field is "ambiguous": the joined DataFrame contains two columns named store_id, one from each side of the join.
naiveinnerjoin.select("store_id").show() // will throw an error: the reference 'store_id' is ambiguous
To overcome this issue, we use an explicit inner join with aliases. Aliasing each DataFrame makes every reference to "store_id" unique, which lets us run any query against the result.
val joineddf = inp1.as("a").join(inp2.as("b"), $"a.store_id" === $"b.store_id")
joineddf.show()
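An alternative worth knowing (not shown in the original post) is Spark's USING-style join, which takes the join column as a Seq of names and keeps a single store_id column in the output, so the ambiguity never arises:

```scala
val usingJoin = inp1.join(inp2, Seq("store_id"), "inner")
usingJoin.select("store_id").show() // no ambiguity: the result has only one store_id column
```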
We hope this blog helped you in understanding joins in Apache Spark. Keep visiting our site www.acadgild.com for more updates on certification courses and other technologies.