How do Spark RDD, DataFrame, and Dataset compare?
Parameter | RDD | DataFrame | Dataset |
---|---|---|---|
Spark APIs | RDD stands for Resilient Distributed Dataset: a read-only, partitioned collection of records. | The DataFrame API organizes data into named columns, conceptually like a table in a relational database. | The Dataset API extends the DataFrame API with a type-safe, object-oriented programming interface. |
Spark Release | Introduced in Spark 1.0. | Introduced in Spark 1.3. | Introduced in Spark 1.6. |
Data Formats | Handles structured and unstructured data, such as media streams or streams of text. An RDD does not infer a schema the way DataFrames and Datasets do. | Handles structured and semi-structured data, similar to a table in a database; DataFrames let Spark manage the schema. | Handles structured and unstructured data, represented as JVM objects (a row or a collection of rows). |
Compile-time type safety | Provides compile-time type safety. | Does not provide compile-time type safety; attribute errors are detected only at runtime. | Provides compile-time type safety. |
Optimization | There is no built-in optimization engine for RDDs. | Optimized by the Catalyst optimizer, which applies its tree-transformation framework in four phases: a) analyzing a logical plan to resolve references, b) logical plan optimization, c) physical planning, and d) code generation to compile parts of the query to Java bytecode. | Also uses the Catalyst optimizer to optimize the query plan. |
Serialization | Uses Java serialization. | Uses the Tungsten physical execution backend, which explicitly manages memory and generates bytecode for expression evaluation. | Uses encoders that handle conversion between JVM objects and the tabular representation, allowing on-demand access to individual attributes without deserializing the entire object. |
Garbage Collection | Incurs garbage-collection overhead from creating and destroying individual objects. | Garbage-collection overhead is largely avoided because data is stored off-heap. | Likewise avoids garbage-collection overhead on individual objects, since serialization goes through Tungsten. |
Efficiency | Efficiency drops when serialization is performed individually on Java and Scala objects. | Serialization uses off-heap memory, which reduces overhead, and bytecode is generated dynamically to increase efficiency. | Operations can be performed directly on serialized data, improving memory use. |
Schema Projection | The schema has to be defined manually. | Data can be analyzed without defining a schema for the files. | The schema is discovered automatically via the Spark SQL engine. |
Aggregation | The RDD API is slower for simple grouping and aggregation operations. | The DataFrame API is easy to use and is faster for exploratory analysis and for computing aggregate statistics on large datasets (see the sketch after this table). | Aggregation operations on Datasets are likewise faster than on RDDs. |
Spark SQL | No | Yes | Yes |
Catalyst optimizer | No | Yes | Yes |
Tungsten component | No | Yes | Yes |
Advanced encoders | No | No | Yes |
Aggregation speed | Slower than DataFrame and Dataset | Fastest of the three | Slower than DataFrame |
Usage | Used for low-level transformations and actions, when high-level abstractions are not needed. | Used when a high level of abstraction is needed. | Used when a high level of abstraction plus compile-time type safety is needed. |
Lazy Evaluation | Yes | Yes | Yes |
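
A minimal PySpark sketch contrasting the RDD and DataFrame APIs on the same grouping-and-counting task (the typed Dataset API exists only in Scala and Java, so it is not shown here). The application name and sample records are illustrative assumptions, not part of the original question.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("rdd-vs-df").getOrCreate()

# Illustrative sample data: (department, salary) pairs
pairs = [("sales", 100), ("sales", 200), ("hr", 150)]

# RDD API: low-level transformations, no schema, no Catalyst optimization
rdd_counts = (spark.sparkContext.parallelize(pairs)
              .map(lambda p: (p[0], 1))
              .reduceByKey(lambda a, b: a + b)
              .collect())

# DataFrame API: named columns, schema managed by Spark, optimized by Catalyst
df = spark.createDataFrame(pairs, ["dept", "salary"])
df_counts = df.groupBy("dept").count().collect()

print(rdd_counts)   # e.g. [('sales', 2), ('hr', 1)]
print(df_counts)    # e.g. [Row(dept='sales', count=2), Row(dept='hr', count=1)]

spark.stop()
```

Calling `df.groupBy("dept").count().explain()` would show the Catalyst-generated physical plan; the equivalent RDD pipeline receives no such optimization.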
Consider that you have records of employee IDs and their joining dates, captured in inconsistent string formats. Write a unit-test script that verifies the joining dates are parsed correctly (in the sample data below, every employee joined on 5 April 2020).
from datetime import date
from unittest import TestCase

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType

from RowDemo import to_date_df


class RowDemoTestCase(TestCase):

    @classmethod
    def setUpClass(cls) -> None:
        # Build a local Spark session shared by all tests in this class
        cls.spark = SparkSession.builder \
            .master("local[3]") \
            .appName("RowDemoTest") \
            .getOrCreate()

        # Joining dates arrive as strings in several inconsistent formats
        my_schema = StructType([
            StructField("ID", StringType()),
            StructField("EventDate", StringType())])

        my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020"),
                   Row("125", "04/5/2020"), Row("126", "4/05/2020")]
        my_rdd = cls.spark.sparkContext.parallelize(my_rows, 2)
        cls.my_df = cls.spark.createDataFrame(my_rdd, my_schema)

    def test_data_type(self):
        # After conversion, EventDate should be a Python date, not a string
        rows = to_date_df(self.my_df, "M/d/y", "EventDate").collect()
        for row in rows:
            self.assertIsInstance(row["EventDate"], date)

    def test_date_value(self):
        # Every format variant should parse to the same date: 5 April 2020
        rows = to_date_df(self.my_df, "M/d/y", "EventDate").collect()
        for row in rows:
            self.assertEqual(row["EventDate"], date(2020, 4, 5))
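
The test imports `to_date_df` from a `RowDemo` module that is not shown above. A minimal sketch of what such a helper might look like, assuming it simply casts the string column to a date with `pyspark.sql.functions.to_date` (the actual implementation in `RowDemo` may differ):

```python
from pyspark.sql.functions import col, to_date


def to_date_df(df, fmt, field):
    # Replace the string column `field` with its DateType equivalent,
    # parsed using the given date format pattern (e.g. "M/d/y").
    return df.withColumn(field, to_date(col(field), fmt))
```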