How do Spark's RDD, DataFrame, and Dataset APIs compare?
Spark APIs
- RDD: stands for Resilient Distributed Dataset, a read-only, partitioned collection of records.
- DataFrame: data organized into named columns, analogous to a table in a relational database.
- Dataset: an extension of the DataFrame API that provides a type-safe, object-oriented programming interface.

Spark Release
- RDD: Spark 1.0
- DataFrame: Spark 1.3
- Dataset: Spark 1.6

Data Formats
- RDD: handles structured and unstructured data, such as media streams or streams of text; it does not infer a schema the way DataFrames and Datasets do.
- DataFrame: handles structured and semi-structured data, similar to a table in a database; DataFrames allow Spark to manage the schema.
- Dataset: handles structured and unstructured data, representing it as JVM objects of a row or a collection of row objects.

Compile-time type safety
- RDD: supports compile-time type safety.
- DataFrame: does not support compile-time checks; attribute errors are detected only at runtime.
- Dataset: provides compile-time type safety.

Optimization
- RDD: no built-in optimization engine.
- DataFrame: optimized by the Catalyst optimizer, which applies its tree-transformation framework in four phases:
  a) analyzing a logical plan to resolve references;
  b) logical plan optimization;
  c) physical planning;
  d) code generation to compile parts of the query to Java bytecode.
- Dataset: uses the same Catalyst optimizer to optimize the query plan.

Serialization
- RDD: uses Java serialization.
- DataFrame: provides the Tungsten physical execution backend, which explicitly manages memory and generates bytecode for expression evaluation.
- Dataset: an encoder handles conversion between JVM objects and the tabular representation, allowing on-demand access to individual attributes without deserializing the entire object.

Garbage Collection
- RDD: incurs garbage-collection overhead from creating and destroying individual objects.
- DataFrame: garbage-collection costs are avoided through off-heap memory management.
- Dataset: likewise, the garbage collector is not needed to destroy objects because serialization takes place through Tungsten.

Efficiency
- RDD: efficiency decreases when serialization is performed individually on Java and Scala objects.
- DataFrame: off-heap memory is used for serialization, which reduces overhead, and bytecode is generated dynamically to increase efficiency.
- Dataset: allows performing operations on serialized data, improving memory use.

Schema Projection
- RDD: the schema must be defined manually.
- DataFrame / Dataset: files can be analyzed without defining a schema; the Spark SQL engine auto-discovers it.

Aggregation
- RDD: slower for simple grouping and aggregation operations.
- DataFrame: very easy to use and faster for exploratory analysis and for creating aggregated statistics on large data sets.
- Dataset: faster for performing aggregation operations on data sets.

Usage
- RDD: used for low-level transformations and actions, when high-level abstractions are not needed.
- DataFrame / Dataset: used when a high level of abstraction is required.

Lazy Evaluation
- RDD: yes
- DataFrame: yes
- Dataset: yes
Suppose you have records of employee IDs and their joining dates. Write a unit-testing script that verifies the employees' joining dates parse correctly to April 5, 2020.
from datetime import date
from unittest import TestCase

from pyspark.sql import Row, SparkSession
from pyspark.sql.types import StringType, StructField, StructType

from RowDemo import to_date_df

class RowDemoTestCase(TestCase):

    @classmethod
    def setUpClass(cls) -> None:
        cls.spark = SparkSession.builder \
            .master("local[3]") \
            .appName("RowDemoTest") \
            .getOrCreate()

        my_schema = StructType([
            StructField("ID", StringType()),
            StructField("EventDate", StringType())])

        my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020"), Row("125", "04/5/2020"), Row("126", "4/05/2020")]
        my_rdd = cls.spark.sparkContext.parallelize(my_rows, 2)
        cls.my_df = cls.spark.createDataFrame(my_rdd, my_schema)

    def test_data_type(self):
        rows = to_date_df(self.my_df, "M/d/y", "EventDate").collect()
        for row in rows:
            self.assertIsInstance(row["EventDate"], date)

    def test_date_value(self):
        rows = to_date_df(self.my_df, "M/d/y", "EventDate").collect()
        for row in rows:
            self.assertEqual(row["EventDate"], date(2020, 4, 5))