Spark SQL Use Case – Machine and Sensor Data Analysis

In this post, we will discuss machine and sensor data analysis using Spark SQL. Here, we have temperatures collected every minute from 20 of the top buildings across the world. From this analysis, we can determine which country's building shows the largest number of temperature variations.

For this data analysis, you can download the necessary dataset from this link.

In the above link, there are two datasets: building.csv contains the details of the top 20 buildings across the world, and HVAC.csv contains the target temperature and the actual temperature along with the building ID.

HVAC (heating, ventilating/ventilation, and air conditioning) is the technology of indoor and vehicular environmental comfort. Its goal is to provide thermal comfort and acceptable indoor air quality. Through the HVAC sensors, we will get the temperature of the buildings.

Here are the columns that are present in the datasets:

Building.csv – BuildingID, BuildingMgr, BuildingAge, HVACproduct, Country

HVAC.csv – Date, Time, TargetTemp, ActualTemp, System, SystemAge, BuildingID

Now, let’s perform an analysis on the HVAC dataset to find the temperature changes in the buildings. We are performing this analysis using Spark SQL. The following is the code for this analysis.

val data = sc.textFile("/home/kiran/Documents/SensorFiles/HVAC.csv")
val header = data.first()
val data1 = data.filter(row => row != header)
case class hvac_cls(Date: String, Time: String, TargetTemp: Int, ActualTemp: Int, System: Int, SystemAge: Int, BuildingId: Int)
val hvac = => x.split(",")).map(x => hvac_cls(x(0), x(1), x(2).toInt, x(3).toInt, x(4).toInt, x(5).toInt, x(6).toInt)).toDF
val hvac1 = sqlContext.sql("select *, IF((targettemp - actualtemp) > 5, '1', IF((targettemp - actualtemp) < -5, '1', '0')) AS tempchange from HVAC")
  • The first three lines of code will remove the header from the CSV file.

  • In the 4th line, we are writing a case class holding the schema of the dataset.

  • In the 5th line, we split each row of the dataset on the comma delimiter ',', map the columns to our case class, and finally convert it into a DataFrame.

  • In the 6th line, we are creating a table HVAC for our dataframe.

  • In the 7th line, we run a SQL query on the table that creates a new column, tempchange, which is set to 1 if the difference between the target temperature and the actual temperature is greater than +5 or less than -5.

  • In the 8th line, we are registering that table as HVAC1.
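The flag logic behind the tempchange column can be sketched in plain Scala, without Spark. This is a minimal illustration of the nested IF expression above; the sample temperature pairs are made up for demonstration, not taken from the dataset.

```scala
// Spark-free sketch of the tempchange flag computed by the SQL IF expression:
// 1 when the gap between target and actual temperature exceeds 5 degrees
// in either direction, 0 otherwise.
def tempChange(targetTemp: Int, actualTemp: Int): Int = {
  val diff = targetTemp - actualTemp
  if (diff > 5 || diff < -5) 1 else 0
}

// Illustrative readings: a 7-degree shortfall and a 6-degree overshoot
// both flag a change; a 3-degree gap does not.
val flags = Seq((67, 60), (60, 66), (65, 62)).map { case (t, a) => tempChange(t, a) }
// => Seq(1, 1, 0)
```

Note that the nested IF in the SQL query and a single condition with OR are equivalent here; the sketch uses the OR form for brevity.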

Now, let’s create a table for the building dataset.


val data2 = sc.textFile("/home/kiran/Documents/SensorFiles/building.csv")
val header1 = data2.first()
val data3 = data2.filter(row => row != header1)
case class building(buildid: Int, buildmgr: String, buildAge: Int, hvacproduct: String, Country: String)
val build = => x.split(",")).map(x => building(x(0).toInt, x(1), x(2).toInt, x(3), x(4))).toDF
  • The first 3 lines of code will remove the header from the dataset.

  • In the 4th line, we have defined a case class holding the schema of the building dataset.

  • In the 5th line, we are mapping the dataset to the case class which we have built.

  • In the 6th line, we are registering the build dataframe as a table building.
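The split-and-map step above can be illustrated in plain Scala without Spark. The row below is a made-up sample in the building.csv column layout, not a real record from the dataset.

```scala
// Spark-free sketch of parsing one comma-separated building.csv row
// into the case class, mirroring the split(",") + map step above.
case class Building(buildId: Int, buildMgr: String, buildAge: Int,
                    hvacProduct: String, country: String)

def parseBuilding(line: String): Building = {
  val x = line.split(",")
  Building(x(0).toInt, x(1), x(2).toInt, x(3), x(4))
}

// Illustrative row: id, manager, age, HVAC product, country.
val b = parseBuilding("1,M1,25,AC1000,USA")
```

In the real pipeline this mapping runs once per RDD row before the `.toDF` conversion.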

Now, we will join the two tables, building and hvac1, as shown below.

val build1 = sqlContext.sql("select h.*,, b.hvacproduct from building b join hvac1 h on buildid = buildingid")
val test = => (new Integer(x(7).toString), x(8).toString))
val test1 = test.filter(x => x._1 == 1)
val test2 = => (x._2, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(false).collect
  • In the 1st line, we are joining the two datasets using the buildingId.

  • In the 2nd line, we take the tempchange column and the country column, which are required to find the areas with the most temperature changes.

  • In the 3rd line, we filter the rows that have a change in temperature, identified by the value 1.

  • In the 4th line, we map each country to the value 1 so we can count how many times the temperature in its buildings has changed. We apply a reduceByKey operation to sum these counts per country, then swap each (country, count) pair, sort in descending order of count, and print the result.
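The count-and-sort pipeline described above can be sketched in plain Scala collections, without Spark, using `groupBy` in place of `reduceByKey`. The country list below is a small illustrative input, not the real dataset.

```scala
// Spark-free sketch of the map -> reduceByKey -> swap -> sortByKey(false)
// pipeline: count temperature-change events per country, then sort the
// (count, country) pairs in descending order of count.
val events = Seq("Finland", "France", "Finland", "Finland", "France", "China")

val counts = events
  .map(c => (c, 1))                                       // (country, 1) pairs
  .groupBy(_._1)                                          // group by country
  .toSeq
  .map { case (country, pairs) => (pairs.size, country) } // swap to (count, country)
  .sortBy(-_._1)                                          // descending by count
// => Seq((3, "Finland"), (2, "France"), (1, "China"))
```

On an RDD, `reduceByKey(_ + _)` performs the same per-key summation in a distributed fashion.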

Below are the results based on our analysis.

(473,Finland), (251,France), (248,Hong Kong), (243,Indonesia), (243,Turkey), (241,China), (237,South Africa), (236,Egypt), (233,Saudi Arabia), (232,Israel), (232,Canada), (230,Argentina), (230,Singapore), (228,Mexico), (226,Brazil), (225,Australia), (213,USA), (199,Belgium), (196,Germany)

From the above output, we can say that the temperature in Finland changes most frequently, followed by France and Hong Kong.

If there is a continuous stream of data collected from the sensors, we can automate this analysis using Spark Streaming to track temperature changes in real time, so that we can take timely measures to reduce them.

We hope this post has been helpful in understanding how to perform analysis using Spark SQL. In the case of any queries, feel free to comment below and we will get back to you at the earliest.

Keep visiting our website, Acadgild, for more updates on Apache Spark and other technologies.




  1. Dear Kiran,
    The following query is not working in Spark; I changed it a bit and it works:
    val hvac1 = sqlContext.sql("select *, IF((targettemp - actualtemp) > 5, '1', IF((targettemp - actualtemp) 5 OR (targetTemp - actualTemp) < -5, '1', '0') AS tempchange from HVAC")
    Correct me if I am wrong.

    1. Hi Shafi,
      The syntax for the IF statement is IF(test condition, true value, false value). As per the requirement of the problem statement in the blog, if the temperature change is > 5, we set the new column to 1, i.e., the true value of the first IF statement; for the false value, we have given another IF statement: if the temperature change is < -5, set the new column to 1, i.e., the true value of the second IF statement, with 0 as its false value. In the query you have written, the false value of the first IF statement leads to an OR condition checking two other conditions; that is the only change. Please check your query once: in the second IF condition, you have written (targettemp - actualtemp) 5, which won't compare anything, as there is no comparison operator in between.

  3. Dear Kiran,
    The reduceByKey function is not showing up for me on the test1 RDD. I understand I’ve made some mistake, but I am not sure where it is.

  4. By using Spark 2.0, we can get the output in a few statements:
    val building ="csv").option("header", true).option("delimiter", ",").load("**********/Desktop/building.csv")
    val hvac ="csv").option("header", true).option("delimiter", ",").load("**********/Desktop/HVAC.csv")
    val hvac1 = hvac.withColumn("tempchange", when(($"TargetTemp" - $"ActualTemp") > 5, 1).when(($"TargetTemp" - $"ActualTemp") < -5, 1).otherwise(0)).withColumn("diff", $"TargetTemp" - $"ActualTemp")
    val joined = building.join(hvac1, building("BuildingID") === hvac1("BuildingID"))
    val fin = joined.filter($"tempchange" === 1).groupBy("Country").count.sort(desc("count"))
