If you are new to this page, then please first go through
FLATMAP AND MAP
flatMap()
- In the flatMap() function, each input element produces one or more elements in the output RDD.
- map() and flatMap() are similar in that both take an element from the input RDD and apply a function to that element.
- The difference between map and flatMap is that map returns exactly one element per input, while flatMap can return a list of elements, which is then flattened.
In detail:
- The Spark flatMap() transformation flattens the RDD/DataFrame column after applying the function on every element and returns a new RDD/DataFrame respectively.
- The returned RDD/DataFrame can have the same number of elements as the input, or more (or fewer, if the function returns empty collections).
- This is one of the major differences between flatMap() and map(): the map() transformation always returns the same number of elements as the input.
- Example:
val data = Seq("Project cloudvikas's",
  "Alex's Adventures in world",
  "Project cloudvikas's",
  "Adventures in world",
  "Project cloudvikas's")
val rdd=spark.sparkContext.parallelize(data)
rdd.foreach(println)
This displays the output line by line, exactly as given in the input.
- flatMap() Example
- Now, let's see an example of how to apply a Spark flatMap() transformation on an RDD. In the example below, it first splits each element in the RDD by space and then flattens the result.
val rdd1 = rdd.flatMap(f => f.split(" "))
rdd1.foreach(println)
This yields the output below; the resulting RDD contains a single word on each record.
Project
cloudvikas's
Alex's
Adventures
in
world
Project
cloudvikas's
Adventures
in
world
Project
cloudvikas's
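The map()/flatMap() difference described above can also be modeled in plain Python. This is only an illustrative sketch of the semantics, not Spark code:

```python
# Plain-Python sketch of the map()/flatMap() semantics (not Spark code).
data = ["Project cloudvikas's", "Adventures in world"]

# map(): exactly one output element per input element, so counts match.
mapped = [line.split(" ") for line in data]  # a list of lists

# flatMap(): apply the function, then flatten, so the count can grow.
flat_mapped = [word for line in data for word in line.split(" ")]

print(len(mapped))       # 2 elements, same as the input
print(len(flat_mapped))  # 5 elements after flattening
```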
Spark RDD flatMap() example
import org.apache.spark.sql.SparkSession

object FlatMapExample extends App {
  val spark: SparkSession = SparkSession.builder()
    .master("local[1]")
    .appName("sparkdata.com")
    .getOrCreate()

  val data = Seq("Project cloudvikas's",
    "Alex's Adventures in world",
    "Project cloudvikas's",
    "Adventures in world",
    "Project cloudvikas's")

  val rdd = spark.sparkContext.parallelize(data)
  rdd.foreach(println)

  val rdd1 = rdd.flatMap(f => f.split(" "))
  rdd1.foreach(println)
}
Using flatMap() on Spark DataFrame
flatMap() on a Spark DataFrame operates similarly to flatMap() on an RDD: it executes the specified function on every element of the DataFrame, splitting or merging elements as it goes, so the result count of flatMap() can differ from the input count.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

val arrayStructureData = Seq(
  Row("ram,,Smith", List("Java", "Scala", "C++"), "CA"),
  Row("john,Rose,", List("Spark", "Java", "C++"), "NJ"),
  Row("comfort,,Williams", List("CSharp", "VB", "R"), "NV")
)

val arrayStructureSchema = new StructType()
  .add("name", StringType)
  .add("languagesAtSchool", ArrayType(StringType))
  .add("currentState", StringType)

val df = spark.createDataFrame(
  spark.sparkContext.parallelize(arrayStructureData), arrayStructureSchema)

import spark.implicits._

//flatMap() Usage
val df2 = df.flatMap(f => f.getSeq[String](1).map((f.getString(0), _, f.getString(2))))
  .toDF("Name", "Language", "State")
df2.show(false)
This yields the output below after the flatMap() transformation. Notice that the input DataFrame has 3 records, but after exploding the "languagesAtSchool" array with flatMap(), the result has 9 rows.
+-----------------+--------+-----+
|Name             |Language|State|
+-----------------+--------+-----+
|ram,,Smith       |Java    |CA   |
|ram,,Smith       |Scala   |CA   |
|ram,,Smith       |C++     |CA   |
|john,Rose,       |Spark   |NJ   |
|john,Rose,       |Java    |NJ   |
|john,Rose,       |C++     |NJ   |
|comfort,,Williams|CSharp  |NV   |
|comfort,,Williams|VB      |NV   |
|comfort,,Williams|R       |NV   |
+-----------------+--------+-----+
flatMapValues()
• It works for Paired RDD.
• flatMapValues() is similar to flatMap(), but it applies the specified function to the values only (keys are unchanged) and then flattens the result.
Example:
x = sc.parallelize([('A', (1, 2, 3)), ('B', (4, 5))])
y = x.flatMapValues(lambda v: [i**2 for i in v])  # the function is applied to the entire value, then the result is flattened
print(x.collect())
print(y.collect())
OUTPUT>>
[('A', (1, 2, 3)), ('B', (4, 5))]
[('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]
filter()
• The filter() function returns a new RDD that contains only the elements that satisfy a predicate/condition.
val filterfunc = data.filter(x => x!=35)
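In plain-Python terms, the predicate above keeps every element except 35. This is only an illustrative sketch of the semantics, not Spark code:

```python
# Plain-Python sketch of filter(): keep only elements satisfying the predicate.
data = [34, 35, 36, 35, 37]
filtered = [x for x in data if x != 35]
print(filtered)  # [34, 36, 37]
```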
groupBy()
• The data is grouped according to the result of the specified expression.
• We pass a function (which defines the group for an element); it is applied to the source RDD and creates a new RDD consisting of the individual groups and the list of items in each group.
• groupBy is a transformation operation in Spark, hence its evaluation is lazy.
• It is a wide operation: data may be shuffled across multiple partitions to create the new RDD.
• This operation is costly, as it does not use a combiner local to a partition to reduce the data transfer.
• It is not recommended when you need to do further aggregation on the grouped data.
This function has three variants:
• groupBy(function)
• groupBy(function, [numPartition])
• groupBy(partitioner, function)
The first variant generates hash-partitioned output with the existing partitioner; the second generates hash-partitioned output with the number of partitions given by numPartition; the third generates output using the Partitioner object referenced by partitioner.
Basic groupBy example in Python:
# create RDD x with words
x = sc.parallelize(["John", "Jimmy", "Tina",
                    "Tho", "James", "Cory",
                    "Christ", "Jackeline", "Juan"], 3)
# apply the groupBy operation on x
y = x.groupBy(lambda word: word[0])
for t in y.collect():
    print((t[0], [i for i in t[1]]))
('J', ['John', 'Jimmy', 'James', 'Jackeline', 'Juan'])
('C', ['Cory', 'Christ'])
('T', ['Tina', 'Tho'])
groupByKey()
• It works for Paired RDD.
• The data is grouped according to the key of paired RDD.
x = sc.parallelize([('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)])
y = x.groupByKey()
print(x.collect())
print([(j[0], [i for i in j[1]]) for j in y.collect()])
OUTPUT>>
[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
[('A', [3, 2, 1]), ('B', [5, 4])]
sortBy()
• The data is sorted according to the result of the expression specified.
sortByKey()
• It works for Paired RDD.
• The data is sorted according to the key of the paired RDD.
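The two sort variants can be sketched in plain Python (illustrative only, not Spark code): sortByKey() orders by the key of each pair, while sortBy() orders by the result of an arbitrary expression.

```python
# Plain-Python sketch of sortBy()/sortByKey() on key/value pairs.
pairs = [('B', 5), ('A', 3), ('C', 1)]

# sortByKey(): order by the key of each pair.
by_key = sorted(pairs, key=lambda kv: kv[0])

# sortBy(): order by the result of an arbitrary expression (here, the value).
by_value = sorted(pairs, key=lambda kv: kv[1])

print(by_key)    # [('A', 3), ('B', 5), ('C', 1)]
print(by_value)  # [('C', 1), ('A', 3), ('B', 5)]
```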
reduceByKey()
• It works for Paired RDD.
• It combines values that have the same key; values are merged locally within each partition before the data is shuffled.
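What reduceByKey() computes can be sketched in plain Python with a dictionary, using addition as the combine function (illustrative only, not Spark code):

```python
# Plain-Python sketch of reduceByKey(): merge values that share a key.
pairs = [('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]
merged = {}
for k, v in pairs:
    merged[k] = merged.get(k, 0) + v  # the combine function here is addition
print(sorted(merged.items()))  # [('A', 6), ('B', 9)]
```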
mapPartitions()
• mapPartitions() applies the given function once to every partition of the RDD (rather than once per element) and returns a new RDD.
mapPartitionsWithIndex()
• It is like mapPartitions(), except it also provides the index of the partition.
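The per-partition behavior can be sketched in plain Python by treating each partition as a chunk of elements (illustrative only, not Spark code):

```python
# Plain-Python sketch: the function runs once per partition (a chunk of
# elements), not once per element.
partitions = [[1, 2], [3, 4, 5]]  # an RDD with two partitions

# mapPartitions(): the function receives an iterator over a whole partition.
sums = [sum(part) for part in partitions]

# mapPartitionsWithIndex(): the function also receives the partition index.
indexed = [(idx, sum(part)) for idx, part in enumerate(partitions)]

print(sums)     # [3, 12]
print(indexed)  # [(0, 3), (1, 12)]
```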
distinct()
• It returns a new RDD that contains the unique elements of the source RDD.
• It is used to remove duplicate data.
union()
• The union function returns the elements of both RDDs in a new RDD.
• The two RDDs should be of the same type.
• It is analogous to the set operation UNION, except that duplicates are not removed.
intersection()
• The intersection function returns only the common elements of both RDDs in the new RDD.
• It is same as set operation called INTERSECT.
subtract()
• subtract returns an RDD with the elements from self that are not in other.
• It is same as set operation called MINUS.
subtractByKey()
• It works for Paired RDD.
• subtractByKey() returns an RDD with the pairs from self whose keys are not present in other.
• It matches elements by key.
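The set-style operations above can be sketched in plain Python (illustrative only, not Spark code):

```python
# Plain-Python sketch of the set-style RDD operations above.
a = [1, 2, 2, 3]
b = [2, 3, 4]

distinct_a   = sorted(set(a))                # distinct(): duplicates removed
union_ab     = a + b                         # union(): duplicates are kept
intersection = sorted(set(a) & set(b))       # intersection(): common elements
subtracted   = [x for x in a if x not in b]  # subtract(): in self, not in other

print(distinct_a)    # [1, 2, 3]
print(union_ab)      # [1, 2, 2, 3, 2, 3, 4]
print(intersection)  # [2, 3]
print(subtracted)    # [1]
```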
JOINS
• A join combines the fields from two RDDs using common key values.
• It works for Paired RDD.
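An inner join on paired data can be sketched in plain Python (illustrative only, not Spark code): each pair of matching keys produces one output record.

```python
# Plain-Python sketch of join(): match key/value pairs by common key.
left = [('A', 1), ('B', 2)]
right = [('A', 'x'), ('A', 'y'), ('C', 'z')]

joined = [(k, (v, w)) for (k, v) in left for (k2, w) in right if k == k2]
print(joined)  # [('A', (1, 'x')), ('A', (1, 'y'))]
```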
coalesce()
• coalesce reduces the number of partitions.
• It can trigger shuffling depending on the shuffle flag.
• By default the shuffle flag is disabled, i.e., false.
• You cannot increase the number of partitions with coalesce.
repartition()
• repartition can be used to increase or decrease the number of partitions.
• Repartition does a full data shuffle and distributes the data equally among the partitions.
partitionBy()
• partitionBy is used to create the number of partitions depending on the specified partitioner.
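The idea of redistributing data across a target number of partitions can be sketched in plain Python; the round-robin split below is only an illustrative model of what a full shuffle achieves, not Spark's actual partitioning logic.

```python
# Plain-Python sketch of repartition(): redistribute the elements roughly
# evenly across the target number of partitions.
def repartition(elems, n):
    chunks = [[] for _ in range(n)]
    for i, e in enumerate(elems):  # round-robin redistribution
        chunks[i % n].append(e)
    return chunks

data = list(range(10))
print(repartition(data, 2))       # [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
print(len(repartition(data, 5)))  # 5 partitions
```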
Action
o An action applies a function to an RDD and produces non-RDD values.
o Actions trigger execution using the lineage graph.
o An action is used either to save the result to some location or to display it.
getNumPartitions
• It is used to get the number of partitions created for RDD.
collect()
• It returns the entire content of RDD.
collectAsMap()
• It converts Paired RDD into Map.
take()
• take returns specified number of elements from RDD.
first()
• first() returns the first element from the RDD.
top()
• It returns the specified number of top elements from the RDD.
• top() uses the default (descending) ordering of the data.
reduce()
• reduce() takes two elements at a time from the RDD and produces an output of the same type as the input elements; the supplied function should be commutative and associative.
max()
• max() returns the maximum element available in the RDD.
min()
• min() returns the minimum element available in the RDD.
sum()
• sum() returns the sum of the elements available in the RDD.
mean()
• mean() returns the mean of the elements available in the RDD.
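The common actions listed above can be sketched in plain Python on an ordinary list (illustrative only, not Spark code):

```python
# Plain-Python sketch of the common actions listed above.
data = [5, 1, 4, 2, 3]

print(data[:2])                        # take(2)  -> [5, 1]
print(data[0])                         # first()  -> 5
print(sorted(data, reverse=True)[:2])  # top(2)   -> [5, 4]
print(max(data))                       # max()    -> 5
print(min(data))                       # min()    -> 1
print(sum(data))                       # sum()    -> 15
print(sum(data) / len(data))           # mean()   -> 3.0
```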