An example of Hadoop data analysis using Apache Spark, R, and sparklyr in local mode. This blog will take you step by step, with explanations, through how to perform the analysis.
To get hands-on experience along with this blog, readers can go through our previous blog on how to connect Spark and R. A few basics of sparklyr will also be helpful.
We take a dataset with three different species of flower and collect data on the length and width of their petals and sepals.
Just for info, if you are wondering what sepals and petals are, refer to the image below.
By looking directly at the data in a table, it is difficult to draw a conclusion about any particular species.
Below is a sample of the actual dataset.
| Sepal.Length | Sepal.Width | Petal.Length | Petal.Width | Species |
|---|---|---|---|---|
| 5.1 | 3.5 | 1.4 | 0.2 | setosa |
| 4.9 | 3.0 | 1.4 | 0.2 | setosa |
| 4.7 | 3.2 | 1.3 | 0.2 | setosa |
| 7.0 | 3.2 | 4.7 | 1.4 | versicolor |
| 6.4 | 3.2 | 4.5 | 1.5 | versicolor |
| 6.9 | 3.1 | 4.9 | 1.5 | versicolor |
| 6.3 | 3.3 | 6.0 | 2.5 | virginica |
| 5.8 | 2.7 | 5.1 | 1.9 | virginica |
| 7.1 | 3.0 | 5.9 | 2.1 | virginica |
Fields in the dataset
Sepal.Length: length of the sepal for a particular species
Sepal.Width: width of the sepal for a particular species
Petal.Length: length of the petal for a particular species
Petal.Width: width of the petal for a particular species
Species: the three species of flower we are considering here
The libraries listed below will be used to complete this Hadoop data analysis and visualization.
library(sparklyr)
library(dplyr)
library(ggplot2)
library(tidyr)
set.seed(100)
Also, beginners may find some of the functions used below difficult to understand, so a little explanation with syntax is added along the way for their convenience:
sdf_partition(); sdf_register(); sdf_predict()
First, we connect to Spark in local mode using the command below.
sc <- spark_connect(master = "local")
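If Spark is not already installed on your machine, sparklyr can download a local copy for you before you connect. A minimal sketch of this one-time setup (the version number shown is only an example; pick one supported by your sparklyr release):
spark_install(version = "2.3.0")   # downloads a local Spark distribution; the version is an example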
Copy data to Spark memory
Here we use the built-in iris dataset.
import_iris <- copy_to(sc, iris, "spark_iris", overwrite = TRUE)
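As an optional sanity check, we can confirm that the table is now registered with the Spark connection:
src_tbls(sc)        # should list "spark_iris"
head(import_iris)   # peek at the first few rows without collecting the whole table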
We partition the data frame inside Spark. sdf_partition() is the function used below to do so.
sdf_partition(x, ..., weights = NULL, seed = sample(.Machine$integer.max, 1))
| Argument | Description |
|---|---|
| x | An object coercible to a Spark DataFrame. |
| ... | Named parameters mapping table names to weights. The weights will be normalized such that their sum is 1. |
| weights | An alternate mechanism for supplying weights. When specified, this takes precedence over the ... arguments. |
| seed | The random seed to use for randomly partitioning the dataset. Set this if you want your partitioning to be reproducible on repeated runs. |
The function returns an R list of Spark tables.
The sampling weights define the probability that a particular observation will be assigned to a particular partition, not the resulting size of the partition. This implies that partitioning a DataFrame with, for example, sdf_partition(x, training = 0.5, test = 0.5) is not guaranteed to produce training and test partitions of equal size.
partition_iris <- sdf_partition(import_iris, training = 0.5, testing = 0.5)
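Because the weights are probabilities rather than exact proportions, it is worth checking how many rows actually landed in each partition; sdf_nrow() counts rows on the Spark side:
sdf_nrow(partition_iris$training)   # roughly half of the 150 rows
sdf_nrow(partition_iris$testing)    # the remaining rows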
Create Hive metadata for each partition
Here we register each Spark DataFrame (giving it a table name in the Spark SQL context), which returns a Spark table.
sdf_register(x, name = NULL)
| Argument | Description |
|---|---|
| x | A Spark DataFrame. |
| name | A name to assign this table. |
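For our two partitions, the registration that produces the table names used in the rest of this blog would look like the minimal sketch below:
sdf_register(partition_iris$training, "spark_iris_training")   # training partition, used to fit the model
sdf_register(partition_iris$testing, "spark_iris_test")        # testing partition, used for prediction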
Transforming Spark DataFrames
The family of functions prefixed with sdf_ generally access the Scala Spark DataFrame API directly, as opposed to the dplyr interface which uses Spark SQL. These functions will ‘force’ any pending SQL in a dplyr pipeline, such that the resulting tbl_spark object returned will no longer have the attached ‘lazy’ SQL operations. Note that the underlying Spark DataFrame does execute its operations lazily, so that even though the pending set of operations (currently) are not exposed at the R level, these operations will only be executed when you explicitly collect() the table.
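As a small illustration of this behaviour (a hedged sketch; the registered table name "spark_iris_long_petals" is hypothetical):
lazy_tbl <- tbl(sc, "spark_iris") %>% filter(Petal_Length > 2)   # only builds SQL; nothing executes yet
lazy_tbl %>% sdf_register("spark_iris_long_petals")              # an sdf_* call forces the pending SQL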
Now we select the specific table columns on which we want to perform the analysis, i.e. the species name and the length and width of the petal.
tidy_iris <- tbl(sc, "spark_iris_training") %>%
  select(Species, Petal_Length, Petal_Width)
Spark ML Decision Tree Model
model_iris <- tidy_iris %>%
  ml_decision_tree(response = "Species", features = c("Petal_Length", "Petal_Width"))
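Although not required for the rest of the pipeline, printing the fitted model gives a quick summary of the learned decision tree:
print(model_iris)   # prints a summary of the fitted decision tree model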
Create a reference to the Spark table holding the test partition
test_iris <- tbl(sc, "spark_iris_test")
Bring data back into R memory for plotting
Given an ml_model fit and a new dataset, sdf_predict() produces a new Spark DataFrame with the predicted values encoded in a "prediction" column. Using collect(), we then bring all the data in pred_iris back into R.
sdf_predict(object, newdata, ...)
| Argument | Description |
|---|---|
| object, newdata | An object coercible to a Spark DataFrame. |
| ... | Optional arguments; currently unused. |
pred_iris <- sdf_predict(model_iris, test_iris) %>% collect()
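Since pred_iris is now an ordinary R data frame, we can take a quick look at it before plotting (an optional sanity check):
glimpse(pred_iris)                # column types, including the numeric prediction column
pred_iris %>% count(prediction)   # how many test rows fell into each predicted class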
We use the ggplot2 library to plot a graph for visualization.
pred_iris %>%
  inner_join(data.frame(prediction = 0:2,
                        lab = model_iris$model.parameters$labels),
             by = "prediction") %>%
  ggplot(aes(Petal_Length, Petal_Width, col = lab)) +
  geom_point()
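If you want a more presentable graph, ggplot2 can relabel the most recent plot; a hedged sketch using last_plot() (the title and label text here are just examples):
last_plot() +
  labs(title = "Predicted species by petal dimensions",
       x = "Petal length", y = "Petal width", col = "Species")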
Below is the graphical representation produced by the code above. The graph compares petal length against petal width.
Using this graphical representation, we get a clear picture of how petal dimensions differ across the species of flower.
We hope you now understand the functions covered in this blog and can use them for a great analysis. Enroll in our Big Data and Hadoop Training and kickstart a successful career as a big data developer.