Hive, like other SQL databases, allows users to join tables. However, joins can be computationally expensive, especially on big tables. Hive on top of Hadoop makes data processing so straightforward and scalable that we can easily forget to optimize our Hive queries. In this post, we will see some of the best practices for join optimization in Hive that speed up query execution.
Join optimization techniques in Hive include repartition joins, replicated (map-side) joins and semi joins. This post covers join table ordering, map-side joins and Sort-Merge-Bucket (SMB) map joins.
Before we proceed, let us create two Hive tables and load data into them so that the examples are easy to follow.
hive> create table emp(id INT, name STRING, salary INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
Loading the data to table 'emp':
hive> LOAD DATA LOCAL INPATH '/input' INTO TABLE emp;
Check whether the data is loaded into the table or not.
hive> SELECT * FROM emp;
hive> create table dept(id INT, dept_name STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
Loading the data to table 'dept':
hive> LOAD DATA LOCAL INPATH '/inp' INTO TABLE dept;
Check whether the data is loaded into the table or not.
hive> SELECT * FROM dept;
Join table ordering (Largest table last).
As with any type of tuning, it is important to understand the internal working of the system. When Hive executes a regular join, it needs to decide which table is streamed and which table is buffered in memory. Hive streams the last table in the JOIN statement through the reducers and buffers the others, so placing the larger of the two tables last reduces the memory needed for buffering.
Let's look at the example of our two tables. The 'emp' table consists of department id, employee name, and employee salary. For any organization, this list can keep growing over time. The 'dept' table, on the other hand, will be mostly static. Hence, when these two tables are joined, it is important that the larger table comes last in the query. Let us see the optimized Hive query:
SELECT emp.id,name,salary,dept_name FROM dept JOIN emp ON (dept.id = emp.id);
Alternatively, you can explicitly tell Hive which table it should stream by using the STREAMTABLE hint.
SELECT /*+ STREAMTABLE(emp) */ emp.id,name,salary,dept_name FROM dept JOIN emp ON (dept.id = emp.id);
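Before choosing the join order, it can help to confirm which table is actually larger. The following is a minimal sketch, assuming the 'emp' and 'dept' tables created earlier in this post. The exact fields printed may vary with the Hive version.

```sql
-- Gather basic statistics so that table sizes are recorded in the metastore.
ANALYZE TABLE emp COMPUTE STATISTICS;
ANALYZE TABLE dept COMPUTE STATISTICS;

-- The "Table Parameters" section of the output lists values such as
-- numRows and totalSize, which can be compared between the two tables.
DESCRIBE FORMATTED emp;
DESCRIBE FORMATTED dept;
```

The table with the larger totalSize is the one to place last in the JOIN, or to name in the STREAMTABLE hint.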
Map Side Join:
Also known as a replicated join, a map-side join is a special type of join where the smaller table is loaded into memory and the join is performed in the map phase of the MapReduce job. Since no reducer is involved in a map-side join, it is much faster than a regular join.
An important point to note is that one table must be small enough to fit into memory. It is recommended to configure Hive so that it automatically attempts to convert joins into map-side joins. Below is a Hive join operation which is not a map-side join.
In the image above, note the highlighted part. You can see that the "number of reducers" is 1: the join runs a reduce phase, which slows down the join operation.
Now, to perform a map-side join, set a few configuration properties either in hive-site.xml or directly from the Hive shell. Below are the properties which I have set from the Hive shell.
hive> set hive.auto.convert.join=true;
hive> set hive.auto.convert.join.noconditionaltask=true;
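Whether Hive considers a table "small enough" is controlled by size thresholds. The sketch below shows the two related properties with the values commonly documented as their defaults. Treat the numbers as assumptions, since defaults can differ between Hive versions and distributions, and tune them to the memory available to your tasks.

```sql
-- Used with hive.auto.convert.join=true: a table whose size in bytes is
-- below this threshold is treated as the small table (about 25 MB here).
set hive.mapjoin.smalltable.filesize=25000000;

-- Used with hive.auto.convert.join.noconditionaltask=true: the join is
-- converted directly to a map join when the combined size in bytes of the
-- small tables is below this threshold (about 10 MB here).
set hive.auto.convert.join.noconditionaltask.size=10000000;
```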
Once you are done with the configuration, execute the same join operation as we performed above.
Note the highlighted part again! You will find that no reduce phase is performed in this join operation. Hence, the map-side join is faster than a regular join.
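The conversion can also be checked without running the job by looking at the query plan. This is a minimal sketch, assuming the 'emp' and 'dept' tables from this post. The exact plan text depends on the Hive version and execution engine.

```sql
set hive.auto.convert.join=true;
set hive.auto.convert.join.noconditionaltask=true;

-- EXPLAIN prints the execution plan instead of executing the query.
EXPLAIN
SELECT emp.id, name, salary, dept_name
FROM dept JOIN emp ON (dept.id = emp.id);
```

On MapReduce, a plan that contains a "Map Join Operator" indicates that the join was converted to a map-side join, whereas a regular join typically shows a "Join Operator" under a "Reduce Operator Tree".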
Sort-Merge-Bucket (SMB) Map Join:
It is another Hive join optimization technique where all the tables need to be bucketed and sorted on the join column. In this case joins are very efficient because they require only a simple merge of the presorted buckets.
Let us create bucketed tables from our existing tables, i.e., emp and dept. Before creating the bucketed tables, you need to set the properties below. (In Hive 2.0 and later these two properties were removed, because bucketing and sorting are always enforced.)
hive> set hive.enforce.bucketing=true;
hive> set hive.enforce.sorting=true;
Now we need to create the bucketed table as shown below:
hive> create table buck_emp(id int, name string, salary int) CLUSTERED BY (id) SORTED BY (id) INTO 4 BUCKETS;
We need to use a regular INSERT statement to populate the bucketed table.
hive> INSERT OVERWRITE TABLE buck_emp SELECT * FROM emp;
Similarly, create another bucketed table from the 'dept' table and insert the data into it.
hive> create table buck_dept(id int, dept_name string) CLUSTERED BY (id) SORTED BY (id) INTO 4 BUCKETS;
hive> INSERT OVERWRITE TABLE buck_dept SELECT * FROM dept;
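Since an SMB join depends on both tables being bucketed and sorted on the join column, it is worth checking the table metadata before going further. A minimal sketch, assuming the 'buck_emp' and 'buck_dept' tables created in this section:

```sql
-- The "Storage Information" section of the output includes
-- "Num Buckets", "Bucket Columns" and "Sort Columns".
DESCRIBE FORMATTED buck_emp;
DESCRIBE FORMATTED buck_dept;
```

For the tables above, both should report 4 buckets, with id as the bucket column and the sort column.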
Now the stage is set to perform an SMB map join. Again, a few properties need to be changed before running it.
hive> set hive.enforce.sortmergebucketmapjoin=false;
hive> set hive.auto.convert.sortmerge.join=true;
hive> set hive.optimize.bucketmapjoin=true;
hive> set hive.optimize.bucketmapjoin.sortedmerge=true;
Also turn off automatic map-join conversion. If we do not do this, a map-side join will happen automatically.
hive> set hive.auto.convert.join=false;
Then run the join on the bucketed tables:
hive> SELECT u.name, u.salary FROM buck_dept d INNER JOIN buck_emp u ON d.id = u.id;
Check the highlighted part. You will find that 4 mapper tasks are running (as we had 4 buckets). This makes the join faster than a regular join.
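As with the map-side join, the plan can be inspected to confirm that the SMB join was chosen. This is a sketch under the same assumptions as the rest of this section (the 'buck_emp' and 'buck_dept' tables and the SMB properties listed above). The operator name is what MapReduce plans usually print and may differ on other engines or versions.

```sql
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
set hive.auto.convert.join=false;

-- Look for "Sorted Merge Bucket Map Join Operator" in the printed plan.
EXPLAIN
SELECT u.name, u.salary
FROM buck_dept d INNER JOIN buck_emp u ON d.id = u.id;
```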
Hope this post helped you in join optimization on Apache Hive. For more updates, keep visiting www.acadgild.com