In this post, Spark SQL Use Case: 911 Emergency Helpline Number Data Analysis, we will be performing analysis on the data provided by callers who called the emergency helpline number in North America.
In many countries, the public telephone network has a single emergency telephone number (sometimes known as the universal emergency telephone number or the emergency services number) that allows a caller to contact local emergency services for assistance. This emergency number differs from country to country and typically consists of a three-digit number that can be easily remembered and dialed quickly. Some countries have a different emergency number for each of the different emergency services.
Here we have two datasets; one contains the data that callers gave when they called the emergency helpline number.
You can download the 911 data from this link.
The other dataset contains details about zip codes. You can download the dataset from this link.
The dataset description for the 911 data is as follows:
- lat: String variable, Latitude
- lng: String variable, Longitude
- desc: String variable, Description of the emergency call
- zip: String variable, Zip code
- title: String variable, Title
- timeStamp: String variable, YYYY-MM-DD HH:MM:SS
- twp: String variable, Township
- addr: String variable, Address
- e: String variable, Dummy variable (always 1)
The dataset description for the zip code file is as follows:
- zip: String variable, Zip code
- city: String variable, City
- state: String variable, State
- latitude: String variable, Latitude
- longitude: String variable, Longitude
- timezone: String variable, Time zone
- dst: String variable, Daylight saving time flag
First, let's parse both datasets, then join them and extract the columns required for our analysis.
Here is the Scala code for parsing the 911 data into a DataFrame.
// Load the 911 data and remove the header row
val data = sc.textFile("/home/kiran/Downloads/911.csv")
val header = data.first()
val data1 = data.filter(row => row != header)

// Schema of the 911 dataset
case class emergency(lat: String, lng: String, desc: String, zip: String, title: String, timeStamp: String, twp: String, addr: String, e: String)

// Brings toDF into scope (imported automatically in recent spark-shell versions)
import sqlContext.implicits._

// Parse each row into the case class, keeping only the call category from the title
val emergency_data = data1.map(x => x.split(","))
  .filter(x => x.length >= 9)
  .map(x => emergency(x(0), x(1), x(2), x(3), x(4).substring(0, x(4).indexOf(":")), x(5), x(6), x(7), x(8)))
  .toDF()

emergency_data.registerTempTable("emergency_911")
- The first three statements load the file and remove the header row from the dataset.
- Next, we declare a case class with the schema of the 911 dataset.
- We then parse each row into the case class. For the 5th column (title), we keep only the text before the character ':', because we need only the category of the call; a minimal sketch of this extraction follows this list.
- Finally, we register the DataFrame as a temporary table named emergency_911.
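To see what the title extraction does, here is a minimal sketch using a hypothetical title value:

// Hypothetical title value, for illustration only
val sampleTitle = "EMS: BACK PAINS/INJURY"
// Keep only the category before the ':'
val category = sampleTitle.substring(0, sampleTitle.indexOf(":"))
println(category) // prints EMS

Note that indexOf returns -1 when a title contains no ':', which would make substring throw; this code assumes every title has a colon-separated category prefix.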
Now, let's follow the same procedure for the zip code dataset.
// Load the zip code data and remove the header row
val data2 = sc.textFile("/home/kiran/Downloads/zipcode/zipcode.csv")
val header1 = data2.first()
val data3 = data2.filter(row => row != header1)

// Schema of the zip code dataset
case class zipcode(zip: String, city: String, state: String, latitude: String, longitude: String, timezone: String, dst: String)

// Parse each row into the case class, stripping the double quotes from the quoted columns
val zipcodes = data3.map(x => x.split(","))
  .map(x => zipcode(x(0).replace("\"", ""), x(1).replace("\"", ""), x(2).replace("\"", ""), x(3), x(4), x(5), x(6)))
  .toDF()

zipcodes.registerTempTable("zipcode_table")
- The first three statements load the file and remove the header row from the dataset.
- Next, we declare a case class with the schema of the zip code dataset.
- We then parse each row into the case class, removing the double quotes from the columns required for our analysis (see the sketch after this list).
- Finally, we register the DataFrame as a temporary table named zipcode_table.
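Here is a minimal sketch of the quote stripping, using a hypothetical record in the quoted CSV style this parser expects:

// Hypothetical record, for illustration only
val sampleRow = "\"19401\",\"Norristown\",\"PA\",40.12,-75.34,-5,1"
val cols = sampleRow.split(",")
val zip = cols(0).replace("\"", "") // "19401", quotes removed
println(zip)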
Now, we are ready to join the two datasets, keeping only the columns required for our analysis:
val build1 = sqlContext.sql("select e.title, z.city, z.state from emergency_911 e join zipcode_table z on e.zip = z.zip")
Here, we join the two datasets using the zip code as the key, so that we can find the city and the state each call came from.
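If you prefer the DataFrame API to a SQL string, the same join can be sketched as follows, using the emergency_data and zipcodes DataFrames built above:

// Join on zip code and select the title, city, and state columns
val build1_df = emergency_data
  .join(zipcodes, emergency_data("zip") === zipcodes("zip"))
  .select(emergency_data("title"), zipcodes("city"), zipcodes("state"))

Both forms produce the same result; the SQL version is used in the rest of this post.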
Now, let’s evaluate the below problem statements:
Problem Statement 1:
What kind of problems are prevalent, and in which state?
Here is the code to solve the above problem statement.
// Count calls per (problem type, state) pair and print them in descending order of count
val ps = build1.map(x => x(0) + " -->" + x(2))
ps.map(x => (x, 1))
  .reduceByKey(_ + _)
  .map(item => item.swap)
  .sortByKey(false)
  .foreach(println)
Here is the output when we break down the problems state-wise.
(13012,Fire -->PA)
(44326,EMS -->PA)
(29297,Traffic -->PA)
(1,Traffic -->AL)
(1,EMS -->TX)
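As an aside, the same state-wise counts can also be expressed as a single Spark SQL query over the temporary tables registered earlier; here is a sketch:

// Group the joined data by problem type and state, counting calls per group
val stateCounts = sqlContext.sql(
  """select e.title, z.state, count(*) as calls
    |from emergency_911 e join zipcode_table z on e.zip = z.zip
    |group by e.title, z.state
    |order by calls desc""".stripMargin)
stateCounts.show()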
Problem Statement 2:
What kind of problems are prevalent, and in which city?
Here is the code to solve the above problem statement.
// Count calls per (problem type, city) pair and print them in descending order of count
val ps = build1.map(x => x(0) + " -->" + x(1))
ps.map(x => (x, 1))
  .reduceByKey(_ + _)
  .map(item => item.swap)
  .sortByKey(false)
  .foreach(println)
Here is a sample of the output when we break down the problems city-wise.
(7409,EMS -->Norristown)
(4237,EMS -->Pottstown)
(2665,EMS -->Lansdale)
(1822,Traffic -->Pottstown)
(1677,EMS -->King Of Prussia)
(1505,Fire -->Norristown)
(1466,Traffic -->Lansdale)
(1397,EMS -->Ambler)
(1451,EMS -->Royersford)
(1262,Traffic -->North Wales)
(1354,EMS -->Glenside)
(1214,Traffic -->Ambler)
(1257,EMS -->Jenkintown)
(1257,EMS -->Willow Grove)
(1253,EMS -->Harleysville)
(1205,EMS -->North Wales)
(1071,EMS -->Hatboro)
(1128,EMS -->Collegeville)
(1051,Traffic -->Willow Grove)
(1042,Fire -->Pottstown)
(1015,Traffic -->King Of Prussia)
(999,Traffic -->Royersford)
(982,EMS -->Huntingdon Valley)
(942,EMS -->Blue Bell)
(917,EMS -->Conshohocken)
(911,Traffic -->Huntingdon Valley)
You can refer to the above results to see how many calls of each problem type came from each city.
We hope this post has been helpful in understanding how to perform analysis using Spark SQL. In case of any queries, feel free to comment below and we will get back to you at the earliest.