Explain broadcast variables in Spark. How are they useful in performance tuning?
- Broadcast variables increase the efficiency of joins between small and large RDDs.
- A broadcast variable keeps a read-only variable cached on every machine instead of shipping a copy of it with each task.
- We create a broadcast variable by calling SparkContext.broadcast(v), and we access its contents through the value method.
- Instead of sending the data along with every task, Spark distributes broadcast variables to the machines using efficient broadcast algorithms, which reduces communication cost.
Explain a use case for broadcast variables.
Assume a file contains two-letter state codes that you want to transform into full state names (for example, UK to United Kingdom, NY to New York, etc.).
Instead of distributing this lookup information along with every task over the network, we can use a broadcast variable to cache it on each machine; tasks then use the cached copy while executing their transformations.
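The lookup described above can be sketched as follows; the object and variable names are illustrative, and getOrElse is used so unknown codes pass through unchanged.
import org.apache.spark.sql.SparkSession

object StateLookupSketch extends App {
  val spark = SparkSession.builder().appName("state-lookup").master("local").getOrCreate()
  val sc = spark.sparkContext

  val stateNames = Map("UK" -> "United Kingdom", "NY" -> "New York")
  // Shipped once per executor, not once per task.
  val bcStates = sc.broadcast(stateNames)

  val codes = sc.parallelize(Seq("UK", "NY", "UK"))
  // Each task reads the cached copy through .value.
  val fullNames = codes.map(code => bcStates.value.getOrElse(code, code))

  println(fullNames.collect().mkString(", "))
  spark.stop()
}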
How does Spark Broadcast work?
When you run a Spark RDD or DataFrame job that defines and uses broadcast variables, Spark does the following:
- Spark breaks the job into stages at shuffle boundaries; distributed shuffling happens between stages, and actions are executed within each stage.
- Each stage is then broken into tasks.
- Spark broadcasts the common (reusable) data needed by the tasks within each stage.
- The broadcast data is cached in serialized form and deserialized before each task runs.
Broadcast variables are not sent to the executors by the sc.broadcast(variable) call itself; instead, they are shipped to the executors when they are first used.
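The lazy shipping described above also means a broadcast variable has a lifecycle you can manage explicitly. A minimal sketch (names are illustrative) using the unpersist() and destroy() methods of org.apache.spark.broadcast.Broadcast:
import org.apache.spark.sql.SparkSession

object BroadcastLifecycle extends App {
  val spark = SparkSession.builder().appName("bc-lifecycle").master("local").getOrCreate()
  val sc = spark.sparkContext

  val bc = sc.broadcast(Map("NY" -> "New York"))
  // Nothing is shipped yet: executors fetch the value on first use inside a task.
  val out = sc.parallelize(Seq("NY")).map(code => bc.value(code)).collect()
  println(out.mkString)

  bc.unpersist() // drop cached copies on the executors; re-fetched if used again
  bc.destroy()   // release all state; the variable can no longer be used
  spark.stop()
}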
How to create a Broadcast variable
A Spark broadcast variable is created using the broadcast(v) method of the SparkContext class.
In Spark shell
scala> val broadcastVar = sc.broadcast(Array(0, 1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(0, 1, 2, 3)
Spark RDD Broadcast variable example
This defines commonly used data (country and state lookups) in Map variables, distributes them using SparkContext.broadcast(), and then uses the broadcast variables in an RDD map() transformation.
import org.apache.spark.sql.SparkSession

object RDDBroadcast extends App {

  val spark = SparkSession.builder()
    .appName("cloudvikas.com")
    .master("local")
    .getOrCreate()

  val states = Map(("NY","New York"), ("CA","California"), ("FL","Florida"))
  val countries = Map(("USA","United States of America"), ("IN","India"))

  // Ship the lookup tables to every executor once, instead of with every task.
  val broadcastStates = spark.sparkContext.broadcast(states)
  val broadcastCountries = spark.sparkContext.broadcast(countries)

  val data = Seq(("ram","Smith","USA","CA"),
    ("mohan","Rose","USA","NY"),
    ("shyam","Jones","USA","FL")
  )

  val rdd = spark.sparkContext.parallelize(data)

  val rdd2 = rdd.map(f => {
    val country = f._3
    val state = f._4
    // Look up the full names in the cached broadcast copies.
    val fullCountry = broadcastCountries.value(country)
    val fullState = broadcastStates.value(state)
    (f._1, f._2, fullCountry, fullState)
  })

  println(rdd2.collect().mkString("\n"))
}
Spark DataFrame Broadcast variable example
This defines commonly used data (country and state lookups) in Map variables, distributes them using SparkContext.broadcast(), and then uses the broadcast variables in a DataFrame map() transformation.
import org.apache.spark.sql.SparkSession

object BroadcastExample extends App {

  val spark = SparkSession.builder()
    .appName("cloudvikas.com")
    .master("local")
    .getOrCreate()

  // "CA" is included so that every state code in the data below has a match.
  val states = Map(("NY","New York"), ("CA","California"), ("FL","Florida"))
  val countries = Map(("USA","United States of America"), ("IN","India"))

  val broadcastStates = spark.sparkContext.broadcast(states)
  val broadcastCountries = spark.sparkContext.broadcast(countries)

  val data = Seq(("ram","Smith","USA","CA"),
    ("mohan","Rose","USA","NY"),
    ("shyam","Jones","USA","FL")
  )

  val columns = Seq("firstname","lastname","country","state")
  import spark.implicits._
  val df = data.toDF(columns:_*)

  val df2 = df.map(row => {
    val country = row.getString(2)
    val state = row.getString(3)
    // Look up the full names in the cached broadcast copies.
    val fullCountry = broadcastCountries.value(country)
    val fullState = broadcastStates.value(state)
    (row.getString(0), row.getString(1), fullCountry, fullState)
  }).toDF(columns:_*)

  df2.show(false)
}
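The first answer mentioned that broadcasting improves joins between small and large datasets. For DataFrames, Spark exposes this directly as a broadcast join hint through org.apache.spark.sql.functions.broadcast, without managing a broadcast variable yourself. A minimal sketch (object and column names are illustrative):
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinSketch extends App {
  val spark = SparkSession.builder().appName("bc-join").master("local").getOrCreate()
  import spark.implicits._

  val people = Seq(("ram", "NY"), ("mohan", "FL")).toDF("name", "state")
  val states = Seq(("NY", "New York"), ("FL", "Florida")).toDF("code", "full")

  // broadcast() hints Spark to ship the small table to every executor,
  // so the large side is joined in place without a shuffle.
  val joined = people.join(broadcast(states), $"state" === $"code")
  joined.select("name", "full").show(false)
  spark.stop()
}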