Big Data Hadoop & Spark

Spark SQL Use Case – 911 Emergency Helpline Number Data Analysis

In this post, Spark SQL Use Case – 911 Emergency Helpline Number Data Analysis, we will analyze the data provided by callers who dialed the emergency helpline number in North America.

In many countries, the public telephone network has a single emergency telephone number (sometimes known as the universal emergency telephone number or the emergency services number) that allows a caller to contact local emergency services for assistance. This emergency number differs from country to country and is typically a three-digit number that can be easily remembered and dialed quickly. Some countries have a different emergency number for each of the different emergency services.

Here we have two data sets; one is the data that the callers have given when they called the emergency helpline number.

You can download the 911 data from this link.

The other data set contains the details about each zip code. You can download the dataset from this link.

The data set description for the 911 data is as follows:

lat: String variable, Latitude
lng: String variable, Longitude
desc: String variable, Description of the Emergency Call
zip: String variable, Zip code
title: String variable, Title
timeStamp: String variable, YYYY-MM-DD HH:MM:SS
twp: String variable, Township
addr: String variable, Address
e: String variable, Dummy variable (always 1)

The data set description for the zip code file is as follows:

zip: String variable, Zip code

city: String variable, City

state: String variable, State

latitude: String variable, Latitude

longitude: String variable, Longitude

timezone: String variable, Time zone

dst: String variable, Daylight saving time indicator

First, let’s join both these datasets and extract the columns that are required for our analysis.

Here is the Scala code for parsing the data into a dataframe.

// Load the raw 911 CSV as an RDD of lines
val data = sc.textFile("/home/kiran/Downloads/911.csv")
// Drop the header row
val header = data.first()
val data1 = data.filter(row => row != header)
// Schema of the 911 dataset
case class emergency(lat: String, lng: String, desc: String, zip: String, title: String, timeStamp: String, twp: String, addr: String, e: String)
// Parse each line, keeping only the part of the title before ':' (the call category)
val emergency_data = data1.map(x => x.split(",")).filter(x => x.length >= 9)
  .map(x => emergency(x(0), x(1), x(2), x(3), x(4).substring(0, x(4).indexOf(":")), x(5), x(6), x(7), x(8))).toDF
emergency_data.registerTempTable("emergency_911")
  • The first three lines of code remove the header from the dataset.
  • In the 4th line, we declare a case class with the schema of the 911 dataset.
  • In the 5th line, we parse the data into the case class we declared. For the 5th column (title), we take only the string up to the character :, because we need only the category of the call to the emergency number.
  • In the 6th line, we register the data as a table named emergency_911.
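The substring/indexOf step can be tried on its own, without Spark. The sample title below is a hypothetical value in the shape this dataset uses:

```scala
// Plain Scala sketch of the title-parsing step above. A raw title such as
// "EMS: BACK PAINS/INJURY" (hypothetical sample) combines the category and
// the detailed reason; we keep only the text before the first ':'.
val rawTitle = "EMS: BACK PAINS/INJURY"
val category = rawTitle.substring(0, rawTitle.indexOf(":"))
println(category)  // prints EMS
```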

Now, let’s follow the same procedure for the zipcode dataset.

// Load the zipcode CSV and drop its header row
val data2 = sc.textFile("/home/kiran/Downloads/zipcode/zipcode.csv")
val header1 = data2.first()
val data3 = data2.filter(row => row != header1)
// Schema of the zipcode dataset
case class zipcode(zip: String, city: String, state: String, latitude: String, longitude: String, timezone: String, dst: String)
// Parse each line, stripping the double quotes around the zip, city, and state fields
val zipcodes = data3.map(x => x.split(",")).map(x => zipcode(x(0).replace("\"", ""), x(1).replace("\"", ""), x(2).replace("\"", ""), x(3), x(4), x(5), x(6))).toDF
zipcodes.registerTempTable("zipcode_table")
  • The first three lines of code remove the header from the dataset.
  • In the 4th line, we declare a case class with the schema of the zipcode dataset.
  • In the 5th line, we parse the dataset into the case class we declared and remove the double quotes from the columns required for our analysis.
  • In the 6th line, we register the data as a table named zipcode_table.
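The quote-stripping step is plain string manipulation, so it can also be checked without Spark. The sample field value below is hypothetical:

```scala
// Plain Scala sketch of the quote-stripping step above. Some fields in the
// zipcode CSV arrive wrapped in double quotes, so we drop the quotes before
// loading the values into the case class.
val rawField = "\"Portsmouth\""
val cleaned = rawField.replace("\"", "")
println(cleaned)  // prints Portsmouth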

Now, we are ready to join both datasets, taking only the columns required for our analysis.

val build1 = sqlContext.sql("select e.title, z.city,z.state from emergency_911 e join zipcode_table z on e.zip = z.zip")

Here, we join the datasets on the zip code as the key, so that we can get the city and the state each call came from.
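Conceptually, this join looks up each 911 record's zip code in the zipcode table and keeps the title plus the matching city and state. A minimal, Spark-free sketch of that logic, using hypothetical sample records:

```scala
// Model the two tables as small case classes; all values are hypothetical.
case class Call(title: String, zip: String)
case class Zip(zip: String, city: String, state: String)

val calls = Seq(Call("EMS", "19401"), Call("Fire", "19464"))
val zips  = Seq(Zip("19401", "Norristown", "PA"), Zip("19464", "Pottstown", "PA"))

// Build an index on the join key, then look up each call's zip code,
// dropping calls whose zip has no match (an inner join).
val zipIndex = zips.map(z => z.zip -> z).toMap
val joined = calls.flatMap(c => zipIndex.get(c.zip).map(z => (c.title, z.city, z.state)))
println(joined)  // prints List((EMS,Norristown,PA), (Fire,Pottstown,PA))
```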

Now, let’s evaluate the below problem statements:

Problem Statement 1:

What kind of problems are prevalent, and in which state?

Here is the code to solve the above problem statement.

val ps = build1.map(x => (x(0) + " -->" + x(2).toString))  // title --> state
val ps1 = ps.map(x => (x, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(false)
ps1.foreach(println)
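The map → reduceByKey → swap → sortByKey chain above is the classic count-and-rank pattern. Here is a plain Scala equivalent on a small hypothetical sample, with no Spark required:

```scala
// Count each "title -->state" string and rank by count, descending;
// the sample strings below are hypothetical.
val pairs = Seq("EMS -->PA", "Fire -->PA", "EMS -->PA", "Traffic -->PA")
val counts = pairs
  .groupBy(identity)                    // like map(x => (x, 1)).reduceByKey(_ + _)
  .toSeq
  .map { case (k, v) => (v.size, k) }   // swap so the count becomes the key
  .sortBy(-_._1)                        // like sortByKey(false): descending by count
println(counts.head)  // prints (2,EMS -->PA)
```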

Here is the output when we find the problems state-wise.

(13012,Fire -->PA)
(44326,EMS -->PA)
(29297,Traffic -->PA)
(1,Traffic -->AL)
(1,EMS -->TX)


Problem Statement 2:

What kind of problems are prevalent, and in which city?

Here is the code to solve the above problem statement.

val ps = build1.map(x => (x(0) + " -->" + x(1).toString))  // title --> city
val ps1 = ps.map(x => (x, 1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(false)
ps1.foreach(println)

Here is the sample output when we find the problems city-wise.

(7409,EMS -->Norristown)
(4237,EMS -->Pottstown)
(2665,EMS -->Lansdale)
(1822,Traffic -->Pottstown)
(1677,EMS -->King Of Prussia)
(1505,Fire -->Norristown)
(1466,Traffic -->Lansdale)
(1397,EMS -->Ambler)
(1451,EMS -->Royersford)
(1262,Traffic -->North Wales)
(1354,EMS -->Glenside)
(1214,Traffic -->Ambler)
(1257,EMS -->Jenkintown)
(1257,EMS -->Willow Grove)
(1253,EMS -->Harleysville)
(1205,EMS -->North Wales)
(1071,EMS -->Hatboro)
(1128,EMS -->Collegeville)
(1051,Traffic -->Willow Grove)
(1042,Fire -->Pottstown)
(1015,Traffic -->King Of Prussia)
(999,Traffic -->Royersford)
(982,EMS -->Huntingdon Valley)
(942,EMS -->Blue Bell)
(917,EMS -->Conshohocken)
(911,Traffic -->Huntingdon Valley)

You can refer to the above results to see how many problems of each type people have reported.

We hope this post has been helpful in understanding how to perform analysis using Spark SQL. In the case of any queries, feel free to comment below and we will get back to you at the earliest.

Keep visiting our website Acadgild for more updates on Spark and other technologies. Click here to learn Apache Spark.


4 Comments

  1. Dear Kiran
    Thank you very much for the great work
    I am new to Spark & learning it though Implementation cases
    in your code if i run :
    val build1 = sqlContext.sql("select e.title, z.city,z.state from emergency_911 e join zipcode_table z on e.zip = z.zip")
    :23: error: not found: value sqlContext
    val build1 = sqlContext.sql("select e.title, z.city,z.state from emergency_911 e join zipcode_table z on e.zip = z.zip")
    Is there a quick way to fix the error?

  2. You have to first import the SQLContext; only then will it work:
    import org.apache.spark.sql.SQLContext
    val sqlContext = new SQLContext(sc)
    val build1 = sqlContext.sql("select e.title, z.city,z.state from emergency_911 e join zipcode_table z on e.zip = z.zip")
    Do this and it will definitely work.

  3. Thanks. Should have mentioned this:
    scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
    And this is occurring now. What’s the fix?
    scala> val ps1 = ps.map(x=> (x,1)).reduceByKey(_+_).map(item => item.swap).sortByKey(false).foreach(println)
    :48: error: value reduceByKey is not a member of org.apache.spark.sql.Dataset[(String, Int)]
    val ps1 = ps.map(x=> (x,1)).reduceByKey(_+_).map(item => item.swap).sortByKey(false).foreach(println)

  4. 2 items were not listed in the original source:
    1). sqlConnect object
    2). when you ran into “value reduceByKey is not a member of org.apache.spark.sql.Dataset[(String, Int)]” You need to look into sql.DataSet type, it is associated with RDD. Use .rdd notation to get RDD type so reduceByKey becomes available.
    val ps1 = ps.map(x=> (x,1)).rdd.reduceByKey(_+_).map(item => item.swap).sortByKey(false).foreach(println)
