sparklyr: R Interface for Apache Spark

sparklyr is an R package that, once installed, lets R work with Apache Spark.
Advantages

  • Connect to Spark from R. The sparklyr package provides a complete dplyr backend.
  • Filter and aggregate Spark datasets, then bring them into R for analysis and visualization.
  • Use Spark’s distributed machine learning library from R.
  • Create extensions that call the full Spark API and provide interfaces to Spark packages.

The sparklyr package offers many options for analysis. This post gives a brief overview of the main features it provides.

Our previous posts explain how to integrate Spark and R using sparklyr.

Using dplyr

dplyr focuses on tools for working with data frames. It has three main goals:
• Identify the most important data manipulation verbs and make them easy to use from R.
• Provide very fast performance for in-memory data by writing key pieces in C++.
• Use the same interface to work with data regardless of where it’s stored, whether in a data frame, a data table, or a database.

With a Spark connection (sc) established, we can use all of the available dplyr verbs against the tables within the cluster. First, copy some datasets from R into the Spark cluster.
Note: You may need to install the nycflights13 and Lahman packages in order to execute this code.
install.packages(c("nycflights13", "Lahman"))
Remember to reconnect to Spark afterwards, because installing a new package may restart the R session.
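The examples in this post assume a connection object named sc. Here is a minimal sketch of how such a connection might be created, assuming Spark is installed locally (the master URL "local" is an assumption; the setup in the previous posts may differ):

```r
library(sparklyr)

# Assumption: a local Spark installation is available.
# If it is not, spark_install() downloads one (uncomment to use).
# spark_install()

# Connect to a local Spark instance; replace "local" with your cluster's master URL.
sc <- spark_connect(master = "local")

# Check that the connection is usable before running the rest of the code.
connection_is_open(sc)
```

After an R session restart, rerun these lines before any copy_to() call.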
Now we can copy the tables into Spark and start filtering the data.
library(dplyr)
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")
src_tbls(sc)


To start with, here is a simple filtering example:

  • It filters by departure delay and prints the first few records.

flights_tbl %>% filter(dep_delay == 2)

  • It summarises flight delays per aircraft (tailnum) in Spark and collects the result into R.

delay <- flights_tbl %>%
  group_by(tailnum) %>%
  summarise(count = n(), dist = mean(distance), delay = mean(arr_delay)) %>%
  filter(count > 20, dist < 2000, !is.na(delay)) %>%
  collect()

Then plot delay against distance:
library(ggplot2)
ggplot(delay, aes(dist, delay)) +
  geom_point(aes(size = count), alpha = 1/2) +
  geom_smooth() +
  scale_size_area(max_size = 2)

Note: ggplot2 reports that geom_smooth() is using method = 'gam'.


The blue line is the smoothed average delay plotted against distance. In this plot, the average delay is highest for flights of roughly 500–750 miles (the distance column in nycflights13 is recorded in miles, not kilometres).
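dplyr pipelines on a Spark table are translated to SQL and only executed when the result is requested. The sketch below shows one way to inspect the generated SQL with show_query() and to pull a small result into R with collect(). It assumes a local Spark connection, and the name short_delays is only illustrative.

```r
library(sparklyr)
library(dplyr)

# Assumption: a local Spark installation; replace "local" with your master URL.
sc <- spark_connect(master = "local")
flights_tbl <- copy_to(sc, nycflights13::flights, "flights", overwrite = TRUE)

# Build a lazy query; this pipeline has not been executed yet.
short_delays <- flights_tbl %>%
  filter(dep_delay == 2) %>%
  select(carrier, flight, dep_delay)

# Print the SQL that dplyr generated for Spark.
show_query(short_delays)

# collect() runs the query in Spark and returns a local tibble.
short_delays_local <- short_delays %>% head(5) %>% collect()
short_delays_local
```

Keeping the filtering and aggregation in Spark and collecting only the small result is what makes this pattern workable on data that does not fit in R's memory.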

Reading and Writing Data

You can read and write data in CSV, JSON, and Parquet formats. The data can be stored in HDFS, S3, or on the local filesystem of the cluster nodes. Use the read and write functions that match the format, such as spark_read_csv and spark_write_csv for CSV.
An example is listed below:
temp_csv <- tempfile(fileext = ".csv")
spark_write_csv(iris_tbl, temp_csv)
iris_csv_tbl <- spark_read_csv(sc, "iris_csv", temp_csv)
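The same round trip can be sketched for Parquet and JSON with the corresponding sparklyr functions. This assumes a local Spark connection, so that temporary paths on the local filesystem are visible to Spark; on a real cluster you would use an HDFS or S3 path instead. The table names iris_parquet and iris_json are illustrative.

```r
library(sparklyr)

# Assumption: local mode, so tempfile() paths are readable by Spark.
sc <- spark_connect(master = "local")
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)

# Parquet round trip: write the Spark table, then register it under a new name.
temp_parquet <- tempfile(fileext = ".parquet")
spark_write_parquet(iris_tbl, temp_parquet)
iris_parquet_tbl <- spark_read_parquet(sc, "iris_parquet", temp_parquet)

# JSON round trip.
temp_json <- tempfile(fileext = ".json")
spark_write_json(iris_tbl, temp_json)
iris_json_tbl <- spark_read_json(sc, "iris_json", temp_json)

# Both tables should have the same number of rows as iris.
sdf_nrow(iris_parquet_tbl)
sdf_nrow(iris_json_tbl)
```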
 

Window Functions

dplyr also supports window functions. For example, the following keeps, for each player, the seasons that rank in that player’s top two by hits (H):
batting_tbl %>%
  select(playerID, yearID, teamID, G, AB:H) %>%
  arrange(playerID, yearID, teamID) %>%
  group_by(playerID) %>%
  filter(min_rank(desc(H)) <= 2 & H > 0)
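To see what this filter keeps, here is a minimal sketch on a small local data frame. The toy_batting values are made up for illustration, and no Spark connection is needed because dplyr applies the same verbs to local data.

```r
library(dplyr)

# Hypothetical toy data: hits (H) per player and season.
toy_batting <- tibble(
  playerID = c("a", "a", "a", "b", "b"),
  yearID   = c(2001, 2002, 2003, 2001, 2002),
  H        = c(10, 30, 20, 0, 5)
)

# Rank seasons within each player by hits (highest first) and keep
# the top two, dropping seasons with no hits at all.
top_seasons <- toy_batting %>%
  group_by(playerID) %>%
  filter(min_rank(desc(H)) <= 2 & H > 0) %>%
  ungroup()

top_seasons
```

Player "a" keeps the 2002 and 2003 seasons, while player "b" keeps only 2002, because the 2001 season has zero hits.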

Using SQL

It’s also possible to execute SQL queries directly against tables inside a Spark cluster. The spark_connection object implements a DBI interface for Spark, so you can use dbGetQuery to execute SQL and return the result as an R data frame:
install.packages("DBI")
Remember to reconnect to Spark afterwards, because installing a new package may restart the R session. The table also has to be copied into Spark again after a restart:
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
library(DBI)

iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")
iris_preview
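Any SQL that Spark accepts can be sent the same way. As a small sketch, assuming a local Spark connection and the iris table registered under its default name, an aggregate query might look like this:

```r
library(sparklyr)
library(DBI)

# Assumption: a local Spark installation; replace "local" with your master URL.
sc <- spark_connect(master = "local")
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)

# Count rows per species inside Spark; only the small result reaches R.
species_counts <- dbGetQuery(
  sc,
  "SELECT Species, COUNT(*) AS n FROM iris GROUP BY Species ORDER BY Species"
)
species_counts
```

The result is an ordinary R data frame with one row per species.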

Machine Learning

You can run machine learning algorithms in a Spark cluster through the machine learning functions in sparklyr. These functions connect to a set of high-level APIs built on top of DataFrames that help you create and tune machine learning workflows.
Here is an example in which we use ml_linear_regression to fit a linear regression model. We’ll use the built-in mtcars dataset and see whether we can predict a car’s fuel consumption (mpg) from its weight (wt) and the number of cylinders the engine contains (cyl).
We assume in each case that the relationship between mpg and each of our features is linear.
Copy the mtcars data into Spark:
mtcars_tbl <- copy_to(sc, mtcars)

Transform the data set and then partition it into ‘training’ and ‘test’ sets; the seed makes the split reproducible.
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.5, test = 0.5, seed = 1099)

Fit a linear model to the training dataset:
fit <- partitions$training %>% ml_linear_regression(response = "mpg", features = c("wt", "cyl"))
Depending on the sparklyr version, this may print a message that no rows were dropped by the ‘na.omit’ call. Print the fitted model:
fit

For linear regression models produced by Spark, we can use summary() to learn more about the quality of the fit and the statistical significance of each predictor.
summary(fit)
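A fitted model is usually checked against the held-out test partition. The sketch below assumes a recent sparklyr release, where sdf_random_split() is the current name for the partitioning step and ml_predict() adds a prediction column; older releases may need sdf_partition() as shown earlier. It also uses the formula interface instead of response and features, which is an assumption about your installed version.

```r
library(sparklyr)
library(dplyr)

# Assumption: a local Spark installation; replace "local" with your master URL.
sc <- spark_connect(master = "local")
mtcars_tbl <- copy_to(sc, mtcars, overwrite = TRUE)

# Split into training and test sets with a fixed seed.
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  sdf_random_split(training = 0.5, test = 0.5, seed = 1099)

# Fit on the training set using the formula interface.
fit <- partitions$training %>%
  ml_linear_regression(mpg ~ wt + cyl)

# Score the test set; ml_predict() returns a Spark table with a prediction column.
predictions <- ml_predict(fit, partitions$test) %>%
  select(mpg, wt, cyl, prediction) %>%
  collect()

predictions
```

Comparing the mpg and prediction columns side by side gives a quick sense of how well the model generalises beyond the training rows.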

Spark machine learning supports a large array of algorithms and feature transformations, and as illustrated above, it is easy to chain these functions together with dplyr pipelines.

Extensions

The facilities used internally by sparklyr for its dplyr and machine learning interfaces are available to extension packages. Since Spark is a general-purpose cluster computing system, there are many potential applications for extensions (e.g., interfaces to custom machine learning pipelines or to third-party Spark packages).
Here’s a simple example that wraps a Spark text file line counting function with an R function:
First, write a CSV file:
tempfile <- tempfile(fileext = ".csv")
write.csv(nycflights13::flights, tempfile, row.names = FALSE, na = "")
Next, define an R interface to Spark line counting:
count_lines <- function(sc, path) {
  spark_context(sc) %>%
    invoke("textFile", path, 1L) %>%
    invoke("count")
}
Finally, call Spark to count the lines of the CSV:
count_lines(sc, tempfile)
This returns the number of lines in the temporary file.

Table Utilities

You can cache a table into memory using the following command:
tbl_cache(sc, "batting")
You can unload it from memory again using:
tbl_uncache(sc, "batting")

Connection Utilities

We can open the Spark web console with the spark_web function:
spark_web(sc)
Refer to the screenshot below:

We can see the log using the spark_log function:
spark_log(sc, n = 10)

Finally, we disconnect from Spark, using:
spark_disconnect(sc)
We hope you enjoyed learning about the different applications of sparklyr.
