Spark-DataFrame


What is the comparison between Spark RDD, DataFrame, and Dataset?

1.Definition
Spark RDD APIs – RDD stands for Resilient Distributed Dataset. It is a read-only, partitioned collection of records. Spark DataFrame APIs – data organized into named columns; it is conceptually equivalent to a table in a relational database. Spark Dataset APIs – a Dataset is an extension of the DataFrame API that provides a type-safe, object-oriented programming interface.

2.Spark Release
RDD – The RDD API was released in the 1.0 release.
DataFrames – The DataFrame API was released in the 1.3 release.
DataSet – The Dataset API was released in the Spark 1.6 release.

3.Data Formats

  • RDD – It works with structured and unstructured data. RDD does not infer the schema the way DataFrames and Datasets do.
  • DataFrame – It works with structured and semi-structured data. It is similar to a table in a database. DataFrames allow Spark to manage the schema.
  • DataSet – It works with structured and unstructured data. It represents data in the form of JVM objects of a row or a collection of row objects.

4.Compile-time type safety
RDD – It supports compile-time type safety.
DataFrame – It does not provide compile-time type safety. It detects attribute errors only at runtime.
DataSet – It provides compile-time type safety.

5.Optimization
RDD – There is no built-in optimization engine for RDDs.
DataFrame – Optimization is done using the Catalyst optimizer. It uses the Catalyst tree-transformation framework in four phases:
a) Analyzing a logical plan to resolve references.
b) Logical plan optimization.
c) Physical planning.
d) Code generation to compile parts of the query to Java bytecode.

Dataset – It uses the same DataFrame Catalyst optimizer for optimizing the query plan.

6.Serialization
RDD – It uses Java serialization.

DataFrame – It provides a Tungsten physical execution backend that explicitly manages memory and generates bytecode for expression evaluation.

DataSet – The Dataset API has the concept of an encoder, which handles conversion between JVM objects and their tabular representation. It allows on-demand access to individual attributes without deserializing the entire object.

7.Garbage Collection
RDD – There is garbage collection overhead from creating and destroying individual objects.
DataFrame – The garbage collection overhead is avoided here.
DataSet – There is also no need for the garbage collector to destroy objects, because serialization takes place through Tungsten.

8.Efficiency
RDD – Efficiency decreases when serialization is performed individually on Java and Scala objects.
DataFrame – Off-heap memory is used for serialization, which reduces the overhead; bytecode is also generated dynamically to increase efficiency.
DataSet – It allows performing operations on serialized data, improving memory use.

9.Programming Language Support
RDD – Java, Scala, Python, and R languages.
DataFrame – Java, Python, Scala, and R.
DataSet – The Dataset API is currently available only in Scala and Java. As of Spark 2.1.1, Python and R are not supported.

10.Schema Projection
RDD – We have to define the schema manually.
DataFrame – We can analyze the dataset without defining the schema of our files.
DataSet – It auto-discovers the schema because it uses the Spark SQL engine.

11.Aggregation:

  • RDD — RDD API is slower to perform simple grouping and aggregation operations.
  • DataFrame — DataFrame API is very easy to use. It is faster for exploratory analysis, creating aggregated statistics on large data sets.
  • DataSet — With Datasets, aggregation operations on large data sets are also fast.

The Spark documentation encourages you to avoid groupBy-style operations and instead suggests combineByKey or one of its derived operations (reduceByKey or aggregateByKey). These operations perform a partial aggregation before and after the shuffle (in the map and reduce phases, in Hadoop terminology), so your execution times will improve.
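A minimal PySpark RDD sketch of this idea (the sample data and lambdas are illustrative, not from the original):

word_pairs = spark.sparkContext.parallelize([("a", 1), ("b", 1), ("a", 1)])

# Preferred: reduceByKey combines values within each partition before the shuffle.
counts = word_pairs.reduceByKey(lambda x, y: x + y)

# Avoid for large data: groupByKey ships every value across the network before grouping.
counts_slow = word_pairs.groupByKey().mapValues(sum)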

Explain below transformations respective to Dataframe?
Narrow Dependency Transformation
Wide Dependency Transformation

Narrow Dependency Transformation : A transformation performed independently on a single partition to produce valid results.

Df3 = Df2.filter(Df2.Gender == 'Male')

Wide Dependency Transformation: A transformation that requires data from other partitions to produce correct results. groupBy, join, and orderBy are examples of wide dependency transformations.

Df3 = Df2.groupBy("country")

Consider you have records of employee ids and their joining dates. Write a unit testing script to verify the employees who joined on 5th April 2020.
from datetime import date
from unittest import TestCase

from pyspark.sql import *
from pyspark.sql.types import *

from RowDemo import to_date_df


class RowDemoTestCase(TestCase):

    @classmethod
    def setUpClass(cls) -> None:
        cls.spark = SparkSession.builder \
            .master("local[3]") \
            .appName("RowDemoTest") \
            .getOrCreate()

        my_schema = StructType([
            StructField("ID", StringType()),
            StructField("EventDate", StringType())])

        my_rows = [Row("123", "04/05/2020"), Row("124", "4/5/2020"), Row("125", "04/5/2020"), Row("126", "4/05/2020")]
        my_rdd = cls.spark.sparkContext.parallelize(my_rows, 2)
        cls.my_df = cls.spark.createDataFrame(my_rdd, my_schema)

    def test_data_type(self):
        rows = to_date_df(self.my_df, "M/d/y", "EventDate").collect()
        for row in rows:
            self.assertIsInstance(row["EventDate"], date)

    def test_date_value(self):
        rows = to_date_df(self.my_df, "M/d/y", "EventDate").collect()
        for row in rows:
            self.assertEqual(row["EventDate"], date(2020, 4, 5))
What are the External Sources for Spark?

These are examples of external sources:
1. JDBC data sources: Oracle, SQL Server, PostgreSQL
2. NoSQL data systems: Cassandra, MongoDB
3. Cloud data warehouses: Snowflake, Redshift
4. Stream integrators: Kafka, Kinesis

Why is the Parquet file format recommended?

Parquet is a columnar file format that provides optimizations to speed up queries, and it is an efficient format supported by many data processing systems.
In Spark SQL, Parquet is supported for both reading and writing, and it preserves the schema of the original data.
It can also reduce data storage substantially (often cited as around 75% compared with plain text formats).
Below are some advantages of storing data in Parquet format (a short sketch follows the list):
Reduces IO operations.
Fetches only the specific columns that you need to access.
Consumes less space.
Supports type-specific encoding.
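As a small illustration of the column-pruning advantage, here is a sketch; the DataFrame, path, and column names are hypothetical:

# Write a DataFrame as Parquet, then read back only the columns we need.
df.write.mode("overwrite").parquet("data/employee_parquet")

# Only the 'id' and 'salary' columns are read from disk, reducing IO.
subset_df = spark.read.parquet("data/employee_parquet").select("id", "salary")
subset_df.show(5)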

Let me explain in detail:
Read csv file and print its default schema

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSchemaDemo") \
        .getOrCreate()

    logger = Log4j(spark)
	
    inputfiledf = spark.read \
        .format("csv") \
        .option("header", "true") \
        .load("data/cloud*.csv")

    inputfiledf.show(5)
    logger.info("CSV Schema:" + inputfiledf.schema.simpleString())

After running this script, every field's data type is string.
Now let's try inferSchema.



from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSchemaDemo") \
        .getOrCreate()

    logger = Log4j(spark)
	
    inputfiledf = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("data/cloud*.csv")

    inputfiledf.show(5)
    logger.info("CSV Schema:" + inputfiledf.schema.simpleString())

After execution, we get an integer data type in the schema for the integer fields, but the date field is still a string.

That's why we don't rely on inferSchema.
So we have two options for defining a schema for a DataFrame:
1. Implicit and 2. Explicit
In the implicit way, we can use the Parquet file format to load data; Parquet stores the data with the correct data types as part of its schema.

Let's check:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSchemaDemo") \
        .getOrCreate()

    logger = Log4j(spark)

    inputfiledf = spark.read \
        .format("parquet") \
        .load("data/inputfile*.parquet")

    inputfiledf.show(5)
    logger.info("Parquet Schema:" + inputfiledf.schema.simpleString())

After running this, we can see that the date field has a date type and the integer fields have an integer type as well.
So whatever input file format we receive, we should try to convert it into the Parquet file format.

How will you define Schema explicitly?
	
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSchemaDemo") \
        .getOrCreate()

    logger = Log4j(spark)

    inputSchemaStruct = StructType([
        StructField("col1", DateType()),
        StructField("col2", StringType()),
        StructField("col3", IntegerType()),
        StructField("col4", IntegerType())
    ])

    inputSchemaDDL = """col1 DATE, col2 STRING, col3 INT, col4 INT"""

    inputTimeCsvDF = spark.read \
        .format("csv") \
        .option("header", "true") \
        .schema(inputSchemaStruct) \
        .option("mode", "FAILFAST") \
        .option("dateFormat", "M/d/y") \
        .load("data/input*.csv")

    inputTimeCsvDF.show(5)
    logger.info("CSV Schema:" + inputTimeCsvDF.schema.simpleString())
	
What is Spark DataFrameWriter API?

DataFrameWriter is the interface for describing how data (the result of executing a structured query) should be saved to an external data source. DataFrameWriter defaults to the Parquet data source format.
You can change the default format through the spark.sql.sources.default configuration, as sketched below.
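For example, assuming we want JSON as the default data source format, it can be set on the SparkSession builder (a sketch, not from the original):

spark = SparkSession.builder \
    .appName("WriterDemo") \
    .config("spark.sql.sources.default", "json") \
    .getOrCreate()

# Now df.write.save(...) and spark.read.load(...) default to JSON instead of Parquet.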

Its general structure is:
DataFrameWriter
.format(...)
.option(...)
.partitionBy(...)
.bucketBy(...)
.sortBy(...)
.save()

Take an example:
dataframe.write
.format("parquet")
.mode(saveMode)
.option("path", "/data/cloud")
.save()

.format –> Spark can store data in many file formats, such as CSV, JSON, AVRO, PARQUET, ORC, and JDBC. By default, Spark uses the Parquet file format if you do not specify one.
With community connectors we can also write to sources such as Cassandra, MongoDB, HBase, and Redshift, and to formats such as Avro and XML.

.mode –
Save Mode

  1. append — appends the data to any existing data at the given location
  2. overwrite — overwrites the existing data at the location
  3. errorIfExists — raises an error if data already exists at the location
  4. ignore — silently skips the write if data already exists at the location

Spark File Layout

  1. Number of files and file size
  2. Partitions and Buckets
  3. Sorted data

A DataFrame is partitioned, so when we write data to files, each partition produces one output file.
That is the default behavior, but we can change it while writing data to files.
We can repartition data before writing to files.
The 1st way is to use the repartition transformation.
ex: dataframe.repartition(n)

The 2nd way is to use the partitionBy method.
DataFrameWriter.partitionBy(col1, col2)
Here we use key columns to partition the data; we can use a single column or composite key columns.
The key-based method is a powerful way to partition data.

The 3rd way is to use bucketBy; it is only available for Spark managed tables.
DataFrameWriter.bucketBy(n, col1, col2)

There are two more methods to study:

  1. sortBy() -- it is used together with bucketing.
  2. maxRecordsPerFile -- it is used to limit the number of records per file.
How will you store data into Avro file?

First, we should include the spark-avro package in the spark-defaults.conf file to use the Avro format.
Write the line below in spark-defaults.conf:

spark.jars.packages org.apache.spark:spark-avro_2.11:2.4.5

Next, we can use the DataFrameWriter API to write data into an Avro file.


from pyspark.sql import *
from pyspark.sql.functions import spark_partition_id

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSchemaDemo") \
        .getOrCreate()

    logger = Log4j(spark)

    flightTimeParquetDF = spark.read \
        .format("parquet") \
        .load("dataSource/cloud*.parquet")

 
    flightTimeParquetDF.write \
        .format("avro") \
        .mode("overwrite") \
        .option("path", "data/avro/") \
        .save()
How will you find out partitions count?

If you want to check partitions count then execute below code:

logger.info("Num Partitions before: " + str(flightTimeParquetDF.rdd.getNumPartitions()))

It will give you the partition count. Post-execution, we got a partition count of 2.

In what scenario are the number of output files and the number of partitions not the same?

They do not have to match; the number of output files and the number of partitions can differ.
Sometimes a partition has no records, so no output file is created for that partition.
Consider you have two partitions and one partition is empty; then only one output file would be created.

logger.info("Num Partitions before: " + str(flightTimeParquetDF.rdd.getNumPartitions()))
    flightTimeParquetDF.groupBy(spark_partition_id()).count().show()  # displays the record count grouped by partition
How will you get the same number of output files as the partition count?

We can use the repartition transformation. It will create output files based on the partition count.
Example:

    FinalDF= spark.read \
        .format("parquet") \
        .load("dataSource/final*.parquet")

    partitionedDF = FinalDF.repartition(5)
    logger.info("Num Partitions after: " + str(partitionedDF.rdd.getNumPartitions()))
    partitionedDF.groupBy(spark_partition_id()).count().show()

    partitionedDF.write \
        .format("avro") \
        .mode("overwrite") \
        .option("path", "data/avro/") \
        .save()
 	
Post-execution, we will get 5 output files because we repartitioned into 5 partitions.
Why do we use partitionBy in a DataFrame?

If we want the output data organized by particular columns, we can use the partitionBy method.
Example:

 FINALDF.write \
        .format("json") \
        .mode("overwrite") \
        .option("path", "data/json/") \
        .partitionBy("column1", "column2") \
        .save()

It will create output files based on the partition columns.
We get one directory per column1 value, and the column2 directories are nested under column1.

–json
  column1=A
    column2=1
      part-001234.json
      part-001234.json.crc
    column2=2
      part-0012134.json
      part-0012134.json.crc
  column1=B
    column2=3
    column2=4

Note: the file part-001234.json won't contain column1 and column2, because these columns are encoded in the directory names used by partitionBy.

How can we control output file size?

We can control the output file size using the maxRecordsPerFile option.

 FINALDF.write \
        .format("json") \
        .mode("overwrite") \
        .option("path", "data/json/") \
        .partitionBy("column1", "column2") \
        .option("maxRecordsPerFile", 10000) \
        .save()  # at most 10000 records per output file
Where do we store Table metadata in Spark?

Catalog Metastore

What are the types of Table in Spark?

Managed Table
Unmanaged Table(External Table)

For a managed table, Spark manages both the metadata and the data.
All managed tables are stored under the directory configured by spark.sql.warehouse.dir, as sketched below.
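A minimal sketch of pointing the warehouse to a custom location; the path and app name are illustrative:

spark = SparkSession.builder \
    .appName("ManagedTableDemo") \
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

# Tables created with saveAsTable() will now be stored under /tmp/spark-warehouse.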

Why do we need enableHiveSupport in spark? and how can we use this?

In Spark, if a user wants to connect to Hive (use the Hive metastore), we enable it with enableHiveSupport().

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSQLTableDemo") \
        .enableHiveSupport() \
        .getOrCreate()
Why do we save a DataFrame into a managed table instead of a Parquet file?

After saving output data into tables/files, we may need this data in Tableau, Power BI, or other tools for reporting or analysis.
We can save the output data in Parquet/Avro file format, but whenever we read data from these files we need the DataFrame reader API.
On the other hand, if we save the data into a managed table, we can access it through a JDBC/ODBC connection in SQL fashion, and there are many SQL tools that can read managed table data.

What is the default database name in Spark?
default
We can save a DataFrame into a managed table. What will happen when
1) the user saves data without using partitionBy and bucketBy?
2) the user saves data using partitionBy?
3) the user saves data using bucketBy?

-> When we save data into a managed table without partitionBy and bucketBy, the managed table is created and the Parquet files reside under it.


from pyspark.sql import *

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSQLTableDemo") \
        .enableHiveSupport() \
        .getOrCreate()  # enableHiveSupport() gives access to the Hive metastore

    logger = Log4j(spark)

    FINALDF = spark.read \
        .format("parquet") \
        .load("dataSource/")

    spark.sql("CREATE DATABASE IF NOT EXISTS example_db")
    spark.catalog.setCurrentDatabase("example_db")

    FINALDF.write \
        .mode("overwrite") \
        .saveAsTable("example_data_db")

    # the Spark catalog holds the table metadata, so we log the list of tables in the database
    logger.info(spark.catalog.listTables("example_db"))

Output data location:
spark-warehouse
  example_db.db
    example_data_db
      part-0000abc.snappy.parquet

–Using partitionBy:
Let's apply partitionBy on two columns:


from pyspark.sql import *

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSQLTableDemo") \
        .enableHiveSupport() \
        .getOrCreate()

    logger = Log4j(spark)

    FINALDF = spark.read \
        .format("parquet") \
        .load("dataSource/")

    spark.sql("CREATE DATABASE IF NOT EXISTS example_db")
    spark.catalog.setCurrentDatabase("example_db")

    FINALDF.write \
        .mode("overwrite") \
        .partitionBy("column1", "column2") \
        .saveAsTable("example_data_db")

    logger.info(spark.catalog.listTables("example_db"))

After running this, the data is stored as partitions inside the managed table. The number of partition directories depends on the number of unique value combinations of the partition columns.

Output data location:
spark-warehouse
  example_db.db
    example_data_db
      column1=a
        column2=1
      column1=b
        column2=12

–Using bucketBy():
If we want to fix the number of buckets, we can use bucketBy:


from pyspark.sql import *

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSQLTableDemo") \
        .enableHiveSupport() \
        .getOrCreate()

    logger = Log4j(spark)

    FINALDF = spark.read \
        .format("parquet") \
        .load("dataSource/")

    spark.sql("CREATE DATABASE IF NOT EXISTS example_db")
    spark.catalog.setCurrentDatabase("example_db")

    FINALDF.write \
        .mode("overwrite") \
        .bucketBy(5, "column1", "column2") \
        .saveAsTable("example_data_db")  # number of buckets is 5

    logger.info(spark.catalog.listTables("example_db"))

Based on hashing, the data is stored in different buckets.
If you want the data within each bucket to be sorted, you can use sortBy.

If we also want the bucketed data to be sorted, we can add sortBy:


from pyspark.sql import *

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSQLTableDemo") \
        .enableHiveSupport() \
        .getOrCreate()

    logger = Log4j(spark)

    FINALDF = spark.read \
        .format("parquet") \
        .load("dataSource/")

    spark.sql("CREATE DATABASE IF NOT EXISTS example_db")
    spark.catalog.setCurrentDatabase("example_db")

    FINALDF.write \
        .mode("overwrite") \
        .bucketBy(5, "column1", "column2") \
        .sortBy("column1", "column2") \
        .saveAsTable("example_data_db")  # 5 buckets, sorted within each bucket

    logger.info(spark.catalog.listTables("example_db"))
What is the purpose of transformations in a DataFrame?
  • Combining Data Frames
  • Aggregating and Summarizing
  • Applying Functions and built-in Transformations
  • Using Built-in and column-level functions
  • Creating and using UDFs
  • Referencing Rows/Columns
  • Creating Column Expressions
We got a requirement to convert a date string into a date type.
EXAMPLE: 22/3/2020 INTO 2020-03-22


from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

from lib.logger import Log4j


def to_date_df(df, fmt, fld):
    return df.withColumn(fld, to_date(fld, fmt))


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("RowDemo") \
        .getOrCreate()

    logger = Log4j(spark)

    my_schema = StructType([
        StructField("ID", StringType()),
        StructField("EventDate", StringType())])

    my_rows = [Row("123", "04/05/2021"), Row("124", "4/5/2021"), Row("125", "04/5/2021")]
    my_rdd = spark.sparkContext.parallelize(my_rows, 2)
    my_df = spark.createDataFrame(my_rdd, my_schema)

    my_df.printSchema()
    my_df.show()
    new_df = to_date_df(my_df, "M/d/y", "EventDate")
    new_df.printSchema()
    new_df.show()

UNIT TESTING:

from datetime import date
from unittest import TestCase

from pyspark.sql import *
from pyspark.sql.types import *

from RowDemo import to_date_df


class RowDemoTestCase(TestCase):

    @classmethod
    def setUpClass(cls) -> None:
        cls.spark = SparkSession.builder \
            .master("local[3]") \
            .appName("RowDemoTest") \
            .getOrCreate()

        my_schema = StructType([
            StructField("ID", StringType()),
            StructField("createdate", StringType())])

        my_rows = [Row("129", "03/08/2020"), Row("121", "3/8/2020"), Row("120", "03/8/2020")]
        my_rdd = cls.spark.sparkContext.parallelize(my_rows, 2)
        cls.my_df = cls.spark.createDataFrame(my_rdd, my_schema)

    def test_data_type(self):
        rows = to_date_df(self.my_df, "M/d/y", "createdate").collect()
        for row in rows:
            self.assertIsInstance(row["createdate"], date)

    def test_date_value(self):
        rows = to_date_df(self.my_df, "M/d/y", "createdate").collect()
        for row in rows:
            self.assertEqual(row["createdate"], date(2020, 3, 8))
What are the different functions in Dataframe?

agg(*exprs), select(*cols),
cov(col1, col2),
sort(*cols, **kwargs), crosstab(col1, col2), where(condition), cube(*cols),
withColumn(colName, col),
filter(condition),
avg(*cols), groupBy(*cols),
max(*cols), join(other, on=None, how=None), mean(*cols),
orderBy(*cols, **kwargs), replace(to_replace, value, subset), min(*cols),
rollup(*cols)

How to read data from column?

from pyspark.sql.functions import *
airlinesDF.select(column("Origin"), col("Dest"), airlinesDF.Distance, "year").show(10)

We can refer to a column with the column() or col() functions, the DataFrame attribute, or a plain string column name.

Consider we have one table with 3 columns: Day, Month, and Year. Create one column with DATE in the format YYYY-MM-DD.

Using the expr function:

airlinesDF.select("Origin", "Dest", "Distance", expr("to_date(concat(Year, Month, DayofMonth), 'yyyyMMdd') as FlightDate")).show(1)

|Origin|Dest|Distance|FlightDate|
|   san| sfo|    4471|1987-10-14|

Or using the to_date() column function:

airlinesDF.select("Origin", "Dest", "Distance", to_date(concat("Year", "Month", "DayofMonth"), "yyyyMMdd").alias("FlightDate")).show(1)

|Origin|Dest|Distance|FlightDate|
|   san| sfo|    4471|1987-10-14|

Convert the year into a proper format, e.g., 81 -> 1981, 11 -> 2011.
+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|vikas| 23|    5|  81|          0|
| Ravi| 28|    1|2002| 8589934592|
|vikas| 23|    5|  81| 8589934593|
| John| 12|   12|   6|17179869184|
|kamal|  7|    8|  63|17179869185|
+-----+---+-----+----+-----------+



df3 = df1.withColumn("year", expr("""
         case when year < 21 then cast(year as int) + 2000
         when year < 100 then cast(year as int) + 1900
         else year
         end"""))
df3.show()



+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|vikas| 23|    5|1981|          0|
| Ravi| 28|    1|2002| 8589934592|
|vikas| 23|    5|1981| 8589934593|
| John| 12|   12|2006|17179869184|
| kamal|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+



OR

df4 = df1.withColumn("year", expr("""
         case when year < 21 then year + 2000
         when year < 100 then year + 1900
         else year
         end""").cast(IntegerType()))
df4.show()


+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|vikas| 23|    5|1981|          0|
| Ravi| 28|    1|2002| 8589934592|
|vikas| 23|    5|1981| 8589934593|
| John| 12|   12|2006|17179869184|
| kamal|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+

OR

df5 = df1.withColumn("day", col("day").cast(IntegerType())) \
         .withColumn("month", col("month").cast(IntegerType())) \
         .withColumn("year", col("year").cast(IntegerType())) 

df6 = df5.withColumn("year", expr("""
          case when year < 21 then year + 2000
          when year < 100 then year + 1900
          else year
          end"""))
df6.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|vikas| 23|    5|1981|          0|
| Ravi| 28|    1|2002| 8589934592|
|vikas| 23|    5|1981| 8589934593|
| John| 12|   12|2006|17179869184|
| kamal|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+

OR

df7 = df5.withColumn("year", \
                    when(col("year") < 21, col("year") + 2000) \
                    .when(col("year") < 100, col("year") + 1900) \
                    .otherwise(col("year")))
df7.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|vikas| 23|    5|1981|          0|
| Ravi| 28|    1|2002| 8589934592|
|vikas| 23|    5|1981| 8589934593|
| John| 12|   12|2006|17179869184|
| kamal|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+
Add DOB column at the end of table.

df8 = df7.withColumn("dob", expr("to_date(concat(day,'/',month,'/',year), 'd/M/y')"))
df8.show()

+-----+---+-----+----+-----------+----------+
| name|day|month|year|         id|       dob|
+-----+---+-----+----+-----------+----------+
|vikas| 23|    5|1981|          0|1981-05-23|
| Ravi| 28|    1|2002| 8589934592|2002-01-28|
|vikas| 23|    5|1981| 8589934593|1981-05-23|
| John| 12|   12|2006|17179869184|2006-12-12|
|kamal|  7|    8|1963|17179869185|1963-08-07|
+-----+---+-----+----+-----------+----------+


How will you remove duplicate names and drop some columns?

df9 = df7.withColumn("dob", to_date(expr("concat(day,'/',month,'/',year)"), 'd/M/y')) \
    .drop("day", "month", "year") \
    .dropDuplicates(["name", "dob"]) \
    .sort("dob")
df9.show()

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
|kamal|17179869185|1963-08-07|
|vikas|          0|1981-05-23|
| Ravi| 8589934592|2002-01-28|
| John|17179869184|2006-12-12|
+-----+-----------+----------+

Why do we need Performance tuning in Spark?
  • In Spark, data is processed in memory so sometimes issues happen related to CPU, Network Bandwidth, or memory.
  • In some cases, the data being processed does not fit into memory, which creates a big performance penalty.
  • If the shuffled data blocks are large, they create network pressure during shuffling.
How to improve performance tuning in Spark?

we have below performance tuning techniques

  1. Data Serialization
  2. Memory Tuning
  3. Tuning Data Structures
  4. Serialized RDD Storage
  5. Garbage Collection Tuning
  6. Level of Parallelism
  7. Broadcast Large Variables
  8. Data Locality

Data Serialization:

There are 2 serialization libraries: Java serialization and Kryo serialization. Java serialization is quite slow and leads to large serialized formats for many classes. Because of that, we face network bandwidth and memory issues during data movement.

In the case of Kryo serialization, it is up to 10x faster than Java serialization. To use Kryo, we can set the configuration below:

sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  • We can register custom classes with Kryo. Kryo will still work without registering custom classes, but it is wasteful, because the full class name then has to be stored with each object.

To register custom classes with Kryo, we can use the registerKryoClasses method

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[Myexample]))
val sc = new SparkContext(conf)

If your objects are too large to fit in the serialization buffer, you can increase the maximum buffer size:

spark.kryoserializer.buffer.max 64m
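A minimal PySpark sketch of these settings (registerKryoClasses itself is only exposed in the Scala/Java API; the app name and buffer value here are illustrative):

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf() \
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .set("spark.kryoserializer.buffer.max", "64m")

spark = SparkSession.builder.appName("KryoDemo").config(conf=conf).getOrCreate()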

MEMORY_ONLY_SER and MEMORY_AND_DISK_SER are two storage levels that support serialized RDDs. With MEMORY_ONLY_SER, RDDs are stored as serialized objects, one byte array per partition.

With MEMORY_AND_DISK_SER, partitions that do not fit in memory are spilled to disk.

val words = sc.textFile("words")
words.persist(StorageLevel.MEMORY_ONLY_SER)

It adds extra CPU cycles due to deserialization.

Memory Tuning: There are 3 ways to tune memory usage:

Amount of memory used by objects.

Cost of accessing those objects

Overhead of garbage collection

In Spark, There are 2 ways where memory optimization is done.

1.Driver level

2.Executor level

Driver level: commands to set the driver memory:

Spark shell:

spark-shell --driver-memory 4g

Spark submit:

spark-submit --driver-memory 4g

Executor level: commands to set the executor memory:

Spark shell:

spark-shell --executor-memory 4g

Spark submit:

spark-submit --executor-memory 4g

Tuning Data Structure

  • Prefer array of objects and primitive types in data structure design.
  • The fastutil library provides convenient collection classes for primitive types.
  • Avoid nested structures with a lot of small objects and pointers.
  • Use numeric IDs or enumeration objects instead of strings for keys.

Serialized RDD Storage:

  • Whenever objects become large then we can reduce memory usage by storing them in serialized form (MEMORY_ONLY_SER)
  • Doing so, each RDD partition is stored in form of a large byte array.

Level of Parallelism:

  • Increase the level of parallelism so that each task's input set is smaller. That way the data fits into memory and OutOfMemory errors are avoided.
  • For that, we can pass the level of parallelism as an argument to pair-RDD operations (for example, reduceByKey).
  • Set the config property spark.default.parallelism to change the default.
  • It is generally recommended to have 2-3 tasks per CPU core in your cluster.
  • Repartitioning is an expensive operation because it moves data around; use coalesce() instead, but only if you are decreasing the number of partitions.
  • If a collection is used only once, there is no point in repartitioning it; repartitioning is useful only if the data is used multiple times in key-oriented operations.

spark-shell --conf spark.default.parallelism=10

This sets a new value for the default parallelism; see the sketch below.
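A small PySpark sketch showing both ways; the file path, lambdas, and numbers are illustrative, and spark is assumed to be the active SparkSession:

from pyspark import SparkConf

# Option 1: set the default parallelism on the configuration used to build the session.
conf = SparkConf().set("spark.default.parallelism", "10")

# Option 2: pass numPartitions directly to a pair-RDD operation.
pairs = spark.sparkContext.textFile("data/words.txt") \
    .flatMap(lambda line: line.split()) \
    .map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b, numPartitions=10)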

Garbage Collection

G1GC is the recommended garbage collector algorithm for performance tuning.

By default, Spark uses 60% of the executor memory to cache RDDs and the remaining 40% for regular objects. In some cases RDDs don't need 60%, so we can reduce this limit to improve performance.

spark-shell --conf spark.storage.memoryFraction=0.4

You can measure GC frequency and time spent by adding -verbose:gc -XX:+PrintGCDetails and -XX:+PrintGCTimeStamps to Java options.

 Broadcasting large variables

If tasks use any large object from the driver program, we should use a broadcast variable. The broadcast variable is sent to each worker node only once, instead of being shipped with every task.

Data Locality

Computation is faster when the data and the code that operates on it are together. If they are separated, one must move to the other. Since code size is much smaller than data, it is faster to ship the serialized code to where the data is.

Set the configuration:

spark.locality.wait (and the related spark.locality.* properties); a short sketch follows.
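A minimal sketch of tuning the locality wait; 3s is the documented default for spark.locality.wait and is used here only as an illustration:

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Increase the wait if your tasks are long-running and data-local slots free up slowly.
conf = SparkConf().set("spark.locality.wait", "3s")
spark = SparkSession.builder.appName("LocalityDemo").config(conf=conf).getOrCreate()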

Using compression to improve performance

If Data is small then both disk I/O and network I/O become faster. That’s why data is compressed. By doing so, storage space will be saved. LZO and Snappy compression formats are used.

What are the various levels of persistence in Apache Spark?

The various storage/persistence levels in Spark are –

  • MEMORY_ONLY
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK
  • MEMORY_AND_DISK_SER
  • DISK_ONLY
  • OFF_HEAP
Explain Broadcast variable.

A broadcast variable increases the efficiency of joins between small and large RDDs. A broadcast variable keeps a read-only variable cached on every machine instead of shipping a copy of it with tasks. We create a broadcast variable v by calling SparkContext.broadcast(v), and we can access its value by calling the value method.
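A minimal PySpark sketch; the lookup data is made up for illustration:

# Cache a small lookup table on every executor instead of shipping it with each task.
country_lookup = spark.sparkContext.broadcast({"IN": "India", "US": "United States"})

codes = spark.sparkContext.parallelize(["IN", "US", "IN"])
names = codes.map(lambda code: country_lookup.value.get(code, "Unknown")).collect()
# ['India', 'United States', 'India']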

Explain Accumulator:

Accumulators are variables that can only be added to through an associative and commutative operation. We can implement counters or sums using an accumulator. Users can create named or unnamed accumulators. In Scala, we can create a numeric accumulator by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() for Long or Double values respectively.
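A minimal PySpark sketch; PySpark exposes accumulators through SparkContext.accumulator(), and the data here is illustrative:

bad_records = spark.sparkContext.accumulator(0)

def check(line):
    if line.strip() == "":
        bad_records.add(1)  # per-task increments are merged on the driver

spark.sparkContext.parallelize(["a", "", "b", ""]).foreach(check)
print(bad_records.value)  # 2, readable only on the driver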

Why do we need Dataframe?

We use DataFrames because they provide better memory management and a more optimized execution plan than RDDs.
Custom memory management: data is stored in off-heap memory in a binary format, which saves memory.
Java serialization is not used here,
and the garbage collection overhead is avoided as well.

Optimized execution plan: whenever we execute a query, an optimized execution plan is generated with the help of the Catalyst optimizer, which provides better performance.

How will you read a JSON file and perform the below operations:
Show data, printSchema, show column names, describe the DataFrame.
If you have a JSON file with age and name as columns and you want the count of records, the mean age, and the min and max age values, how will you get them?
Spark SQL:
1)Create SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('cloudvikas').getOrCreate()
2)Read JSON file
df = spark.read.json('cloudvikas.json')
3)Show Data
df.show()
4)PrintSchema
df.printSchema()
5) Show columns name
df.columns
6)Describe dataframe 
df.describe()
7) If you have a JSON file with age and name as columns and you want the count of records, the mean age, and the min and max age values, how will you get them?
df.describe().show()
It gives the count of all records.
It provides the mean, max, min, and stddev values.
8)Perform SQL :
from pyspark.sql.types import (StructField,StringType,IntegerType,
StructType)
data_schema = [StructField('age',IntegerType(),True),StructField
('name',StringType(),True)]
final_struc = StructType(fields=data_schema)
df=spark.read.json('cloudvikas.json',schema=final_struc)
df.printSchema()
You want to read an Employee CSV file (having 3 string-type columns and 1 float-type column) and perform the below operations:
read it and print the schema.
employee.csv has four columns—id, Gender, Occupation, and timestamp.
The first three columns are of string data type.
The last column is of float or double data type.

Creating the Schema of the DataFrame
In our DataFrame, we have four columns. We define columns using the StructField() function.

from pyspark.sql.types import *
idColumn = StructField("id",StringType(),True)
gender = StructField("Gender",StringType(),True)
Occupation = StructField("Occupation",StringType(),True)
timestampColumn = StructField("timestamp",DoubleType(),True)

StructField for each column is created. Now we have to create a schema of the full DataFrame using a StructType object

columnList = [idColumn, gender, Occupation,timestampColumn]
employeeDfSchema = StructType(columnList)
employeeDfSchema is the full schema of our DataFrame:

StructType(List(StructField(id,StringType,true), StructField(Gender,StringType,true), StructField(Occupation,StringType,true), StructField(timestamp,DoubleType,true)))

The schema of the four columns of the DataFrame can be observed using the employeeDfSchema variable.

Reading a CSV File

empDf = spark.read.csv('data/employee.csv',header=True,schema=employeeDfSchema)

empDf.show(4)

empDf.printSchema()
Here is the output:

root
 |-- id: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- timestamp: double (nullable = true)

If we set the value of the inferSchema argument to True in the csv() function, PySpark SQL will try to infer the schema.

empDf = spark.read.csv('employee.csv',header=True, inferSchema=True)

empDf.show(4)

The DataFrame called empDf has been created. We used the inferSchema argument set to True.

How to save the contents of a DataFrame to a CSV file?
  • use the DataFrameWriter class and its DataFrame.write.csv() function
  • the DataFrame.write.csv() function has many arguments. Currently, we will discuss its three arguments—path, sep, and header.
  • The path argument defines the directory where DataFrame will be written.
  • We can specify the data field separator using the sep argument.
  • If the value of the header argument is True, the header of the DataFrame will be written as the first line in the CSV file.

write the DataFrame to a CSV file.

DF1.write.csv(path='csvFileDir', header=True,sep=',')

How to Save a DataFrame as a JSON File?
To save a DataFrame as a JSON file, use the DataFrameWriter class 
function called json()

Df.write.json(path='jsonData')
Df.show(6)
How will you read a Parquet file and save a DataFrame as a Parquet file.
#Read a Parquet file
Df = spark.read.parquet('employee')

Df.show(6)

+---+--------+
| Id|Employee|
+---+--------+
|  1|   vikas|
|  2|   cloud|
+---+--------+

#save a DataFrame as a Parquet file

Df.write.parquet(path='user/spark')
How will you save a DataFrame as an ORC file?
Df.write.orc(path='orcData')
How will you read a table of data from MySQL and save the contents of a DataFrame to MySQL.?
#Command for PySpark shell with a MySQL JDBC connector.

pyspark --driver-class-path ~/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar --packages mysql:mysql-connector-java:8.0.12

URL1 = "jdbc:mysql://localhost/DB1"

DF1 = spark.read.format("jdbc").options(url =URL1, database ='DB1', dbtable ='table1', user="root",password="").load();
#The options() function is used to set different options in this command. 
#In the options() function, we set the value of url, the value of database, the value of table, and the user and password of the database.

DF1.show()


+-----+-----+---+---+
|  cl1|  cl2|cl3|cl4|
+-----+-----+---+---+
|cloud|vikas|  A|100|
+-----+-----+---+---+

#save the contents of a DataFrame to MySQL.
Start the PySpark shell with the MySQL connector:
$ pyspark --driver-class-path ~/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar --packages mysql:mysql-connector-java:8.0.12

#Define the database URL and then save the DataFrame into the MySQL database.
dbURL = "jdbc:mysql://localhost/sqlbook"
Df.write.format("jdbc").options(url=dbURL, database='sqlbook', dbtable='mytab', user="root", password="").save()
How will you read a table of data from a PostgreSQL database and save to postgresql?
PostgreSQL JDBC connector command:

$ pyspark --driver-class-path ~/.ivy2/jars/org.postgresql_postgresql-42.2.4.jar --packages org.postgresql:postgresql:42.2.4

dbURL = "jdbc:postgresql://localhost/sqldb?user=postgres&password="

DfOne = spark.read.format("jdbc").options(url =dbURL, database ='sqldb', dbtable ='table1').load();
We can set different options in the options() function.

DfOne.show(2)
Here is the output:

+----------+------+---+
|        C1|    C2| C3|
+----------+------+---+
|CLOUDVIKAS| CLOUD|AWS|
|     SPARK|HADOOP|BIG|
+----------+------+---+

Save a DataFrame into PostgreSQL.
Start the PySpark shell with the PostgreSQL connector:

$ pyspark --driver-class-path ~/.ivy2/jars/org.postgresql_postgresql-42.2.4.jar --packages org.postgresql:postgresql:42.2.4

Define the database URL and then save the DataFrame into the database.

dbURL = "jdbc:postgresql://localhost/pysparksqldb?user=postgres&password="
We are saving the DataFrame contents to the table.

Df.write.format("jdbc").options(url=dbURL, database='pysparksqldb', dbtable='mytab').save()
How will you read a table of data from a Cassandra database?
Cassandra connector command:

$ pyspark  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0

read data tables from the Cassandra database.

Df = spark.read.format("org.apache.spark.sql.cassandra").options( keyspace="sqlbook", table="table1").load()

Df.show(1)
output:

+-----+------+----------+
|   C1|    C2|        C3|
+-----+------+----------+
|cloud|hadoop|cloudvikas|
+-----+------+----------+

How will you read Data from Apache Hive?
Step 1:
hive> show databases;
output:

OK
cloudvikas
default
Time taken: 3 seconds, Fetched: 2 row(s)

The database is called cloudvikas, so we have to specify this database 
using the use command.

Step 2:
hive> use cloudvikas;
Here is the output:

OK
Time taken: 0.125 seconds

After using the database, we are going to create a table named 
cloudvikastable using the following command.

step 3:
hive> create table cloudvikastable (
       >  c1 string,
       >  c2 string,
       >   c3 float
        >)
        > row format delimited
        > fields terminated by ',';

hive> show tables;
Here is the output:

OK
cloudvikastable
Time taken: 0.118 seconds, Fetched: 1 row(s)

step 4:
hive> load data local inpath 'cloudvikaspath.csv' overwrite into table 
cloudvikastable;

output:

Loading data to table cloudvikas.cloudvikastable
OK
Time taken: 5.39 seconds

step 5:
hive> select * from cloudvikastable limit 2;
Here is the output:

OK
mentA    10W    60.0
mentB    10W    68.0

Time taken: 0.532 seconds, Fetched: 2 row(s)
We have displayed some rows of the cloudvikastable table. Now we will
read this table data using PySpark SQL.

step 6:
spark.table() function: We can read the table from Hive through this 
function.

The table name is provided in the format <databaseName>.<tableName>. 
the database's name is cloudvikas and the table's name is 
cloudvikastable. The argument value to the table function will be 
cloudvikas.cloudvikastable.

DF = spark.table('cloudvikas.cloudvikastable')

DF.show(1)
output:

+-----+---+----+
|   C1| C2|  C3|
+-----+---+----+
|mentA|10W|60.0|
+-----+---+----+

There is a running competition. The race is for 20 meters. The number of participants in the competition is 10. The time in seconds has been noted.

id Gender Name runningtime

id1 Male cloudvikas 10

id2 Female shyam 20

Calculate the running speed for each person and add it as a new column to the DataFrame.

id Gender Name runningtime personSpeed

id1 Male cloudvikas 10 2

If we want to do an operation on each element of a column in the 
DataFrame, we have to use the withColumn() function.

The withColumn() function is defined on the DataFrame object. 
This function can add a new column to the existing DataFrame or 
it can replace a column with a new column containing new data. 

we can read a CSV file and create a DataFrame from it.

personDf = spark.read.csv('personData.csv',header=True, 
inferSchema=True)

personDf.show(4)

+----+------+-----------+----------------+
|  id|Gender| Name|runningtime|
+----+------+-----------+----------------+
| id1|  Male| cloudvikas|           10|
| id2|Female|    shyam|           20|

The personDf DataFrame has been created. We used the inferSchema 
argument set to True. It means that PySpark SQL will infer the 
schema on its own. Therefore, it is better to check it. 
So let's print it using the printSchema() function.

In [3]:  personDf.printSchema()
Here is the output:

root
 |-- id: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- runningtime: double (nullable = true)
Calculating person Speed for Each person and Adding It as a Column
How do we calculate the person speed? 
Speed is defined as distance over time, which we can calculate using 
the withColumn() function as an expression. 
We have to calculate the speed of each person and add the results as 
a new column. The following line of code serves this purpose.

personDf1 = personDf.withColumn('personSpeed',20.0/personDf.runningtime)
We know that the withColumn() function returns a new DataFrame. 
The new column will be added to the newly created DataFrame called 
personDf1. We can verify this using the show() function.

personDf1.show(4)
Here is the output:

+---+------+-----------+----------------+------------------+
| id|Gender| Name|runningtime|      personSpeed|
+---+------+-----------+----------------+------------------+
|id1|  Male| cloudvikas|           10| 2|

How to select one column CL1 from dataframe DF2?

Df3 = Df2.select("CL1")

Df3 = Df2.select("CL1","CL2")

How do you display the person id and the distance covered in 2 hours, where col1 is the id and col2 is the speed in km/h?
df2.show(2)
ID	Speed
vikas	4
mohan	6

Df5 = Df2.select("id", Df2.Speed * 2)
DataFrame DF2 has details of male and female students. How will you find only the male students' details?
Df3 = Df2.filter(Df2.Gender == 'Male')

Df3.show()
output:

+----+------+-----------+----------------+------------+
|  id|Gender| Occupation|TimeInSecond|Speed|
+----+------+-----------+----------------+------------+
| id1|  Male| Programmer|           16|       1|
| id3|  Male|    Manager|           15|       1|
In DataFrame DF1, how will you find records where Occupation is Programmer and age > 25? DF1 has 3 columns: col1 is Occupation, col2 is age, and col3 is place.

Df1.filter((Df1.Occupation =='Programmer') & (Df1.age > 25) ).show(3)

How will you drop columns in Dataframe?

The drop() function is used to drop one or more columns from a DataFrame.
We can mention dropped column names in the argument.
It returns a new DataFrame. The new DataFrame will not contain the dropped columns.

Df4 = Df2.drop("id")

Df5 = Df2.drop("id", "Name")
Suppose you have one table having employee details. It has 2 columns: name and salary.
How will you add a 3rd column, Label, whose value depends on salary?
It should return High if the salary is more than 20000; otherwise its value should be Low.
We need the udf() function because we have to create a UDF that can produce these labels.

from pyspark.sql.functions import udf

We are going to create a Python function named labelsalary. 
This function will take the salary  as the input and return High or Low, 
depending on the conditions.

def labelsalary(salary) :
     if salary > 20000 :
         return "High"
     else :
         return "Low"

 labelsalary(10000)

 'Low'

 labelsalary(30000)

 'High'
 
 Let's create a PySpark SQL UDF using labelsalary.

labelsalaryUdf = udf(labelsalary)

The new DataFrame called tempDf2 is created with a new column label using the withColumn() function:

tempDf2 = tempDf.withColumn("label",labelsalaryUdf(tempDf.salary))

tempDf2.show()
Here is the output:

+----+-------------+-----+
| name|salary|label|
+----+-------------+-----+
|ram|         12000|  Low|
|vikas|         30000| High|
How will you perform order by on a DataFrame?
DfSorted1 = Df.orderBy("TimeInSecond", ascending=True)
DfSorted2 = Df.orderBy("TimeInSecond", ascending=False)
DfSorted3 = Df.orderBy("Occupation", "TimeInSecond", ascending=[False, True])

Where will you use sortWithinPartitions?

In PySpark SQL, partition-wise sorting is executed using the sortWithinPartitions(*cols, **kwargs) function. The sortWithinPartitions() function returns a new DataFrame. Arguments are similar to the orderBy() function’s argument.

sortedPartitions = Df.sortWithinPartitions("name", "TimeInSecond", ascending=[False, True])

How will you remove duplicate records from a DataFrame?
We can use the drop_duplicates() function.
This function can remove duplicate rows, optionally conditioned on a subset of columns.
If no column is specified, all the columns of every record are checked.

duplicateDf = spark.read.orc(path='duplicate')

drop all the duplicate records.

noDuplicateDf1 = duplicateDf.drop_duplicates()

Remove duplicates from column1.
noDuplicateDf2 = duplicateDf.drop_duplicates(['col1'])

Remove duplicate from col1,col2:
noDuplicateDf3 = duplicateDf.drop_duplicates(['col1','col2'])
How will you retrieve some records from a given DataFrame?

There are two DataFrame functions that can be used to get samples from DataFrames.
The first function—sample(withReplacement, fraction, seed=None)—returns a sample from a DataFrame as a new DataFrame.
Its first argument, withReplacement, specifies whether a record may appear more than once in the sample; the second argument, fraction, is the sampling fraction.
Since the sample is taken randomly using a random number mechanism, the seed is used internally for the random number generation.

The second function—sampleBy(col, fractions, seed=None)—will perform sampling conditioned on some column of the DataFrame which we provide as the col argument. The fractions argument is used to provide a sample fraction of each strata. This argument takes its value as a dictionary.

Example:
Df1.show(2)

+---+---+---+
|cl1|cl2|cl3|
+---+---+---+
| c1| d2|  9|
| c1| d2| 11|

To fetch 50% of the records as a sample without replacement.

sampleWithoutKeyConsideration = noDuplicateDf1.sample(withReplacement=False, fraction=0.5, seed=200)

sampleWithoutKeyConsideration.show()

To allow the same record to be selected more than once in the sample, set withReplacement to True.

sampleWithoutKeyConsideration1 = noDuplicateDf1.sample(withReplacement=True, fraction=0.5, seed=200)

Sampling Data from the noDuplicateDf1 DataFrame Conditioned on the cl1 Column
Column cl1 has two values—c1 and c2. To perform stratified sampling, we use the sampleBy() function and condition on column cl1, which we provide as the value of the col argument.
We are looking for equal representation in the output DataFrame from the c1 and c2 strata. Therefore, we have provided {'c1': 0.5, 'c2': 0.5} as the value of fractions.

sampleWithKeyConsideration = noDuplicateDf1.sampleBy(col='cl1', fractions={'c1':0.5, 'c2':0.5},seed=200)
How will you find the most frequently occurring values in the columns of a DataFrame?

we can use the freqItems() function.

First we are going to calculate the frequent items in column col1.

DataDf.freqItems(cols=['col1']).show()
+--------------+
|col1_freqItems|
+--------------+
|      [c1, c2]|
+--------------+
Now we are going to determine the frequent items in the col1 and col2 columns.

DataDf.freqItems(cols=['col1','col2']).show()

+--------------+--------------+
|col1_freqItems|col2_freqItems|
+--------------+--------------+
|      [c1, c2]|      [d2, d1]|
+--------------+--------------+

How will you use groupby in dataframe?
Calculating the Required Means

groupedOndf = DF2.groupby(["admit"]).mean()

Grouping by Gender

groupedDF3 = DF2.groupby(["gender"]).mean()

DF4 = DF2.groupby(["admit", "department"]).mean()
How will you perform different types of JOINS in Dataframe?
innerDf = TableDF1.join(TableDF2, TableDF1.CL1 == TableDF2.CL1, how= "inner")

leftOuterDf = TableDF1.join(TableDF2, TableDF1.CL1 == TableDF2.CL1, how= "left")

rightOuterDf = TableDF1.join(TableDF2, TableDF1.CL1 == TableDF2.CL1, how= "right")

outerDf = TableDF1.join(TableDF2, TableDF1.CL1 == TableDF2.CL1, how= "outer")

How will you perform UNION between 2 dataframes?

DFUNION = DF1.union(DF2)

Suppose you have two DataFrames, DF1 and DF2, and you have to create DF3 from them. DF1 has 2 columns, C1 and C2, and DF2 has C3 and C4. DF3 should be the column-wise combination of DF1 and DF2; for example, DF3 should have c1, c2, c3, c4.

DF1.show(3)
 col1 col2 
 1	ram	
 2	vikas
 3	cloud
 
 DF2.show(3)
  col3 col4 
 4	JOB	
 5	EMPLOYEE
 6	SELF
 
 DF3.show(3)
  col1 col2 COL3 COL4
 1	ram		4	JOB
 2	vikas	5	EMPLOYEE
 3	cloud	6	SELF

Solution:

We have to add a new column to both DataFrames; each needs a new column containing an integer sequence.
We can create this new column using the monotonically_increasing_id() function.
We can get this function from the pyspark.sql.functions submodule.



from pyspark.sql.functions import monotonically_increasing_id

In the following line of code, we are going to add a new column that contains a sequence of integers, starting from zero.

DF1 = DF1.withColumn("id",monotonically_increasing_id() )

DF1.show()
 col1 col2 id
 1	ram	0
 2	vikas 1
 3	cloud 2
 

The following line of code will add the same column to the DF2 DataFrame.

DF2 = DF2.withColumn( "id",monotonically_increasing_id() )

DF2.show()

  col3 col4 id
 4	JOB	0
 5	EMPLOYEE 1
 6	SELF 2


We are now going to perform an inner join on DF1 and DF2. An inner join is performed on the id column.

Df4 = DF1.join(DF2,DF1.id == DF2.id, how='inner')
Df3 = Df4.drop("id")

Df3.show()
  col1 col2 COL3 COL4
 1	ram		4	JOB
 2	vikas	5	EMPLOYEE
 3	cloud	6	SELF
How will you remove missing value records in a DataFrame?
We have tools in PySparkSQL to handle missing data in DataFrames. 
dropna() and fillna() are two important functions that deal with 
missing or null values.

DF1.show()

CL1	CL2	CL3
1	1	4
2	3	
3		2
4	

The dropna() function can remove rows that have null data. 
It has 3 arguments. The first argument is how, and it can take two 
values—any or all. 
If the value of how is all, a row will be removed only if all the 
values of the row are null. 
If the value of how is any, the row will be removed if any of its 
values are null.

The second argument of the dropna() function is thresh. The default 
value for thresh is None. It has an integer value. The thresh argument 
overwrites the first argument, how. If thresh is set to the integer n,
 all rows where the number of non-null values is less than n will be 
removed.

The last argument of the dropna() function is subset. This is an 
optional name of columns to be considered.

The second important function is fillna(). The first argument is a 
value. The fillna() function will replace any null value with the 
value argument.

We can typecast the datatype using the cast() function inside the 
withColumn() function, as shown in this code.

DF1 =  DF1.withColumn("CL2", DF1.CL2.cast(DoubleType())).withColumn
("CL3", DF1.CL3.cast(DoubleType()))

DF1.show()

+-----+-----+-----+
|  CL1|  CL2|  CL3|
+-----+-----+-----+
1	1	4
2	3	NULL
3	NULL	2
4	NULL	NULL
we can observe nulls in the DataFrame.

Step: Dropping the Rows that Have Null Values
We can drop the rows that have null values using the dropna() function. 
We are going to set the value of the how argument to any. That way, 
rows two, three, and four will be removed.

DF1.dropna(how ='any').show()
Here is the output:

+-----+-----+-----+
|  CL1|  CL2|  CL3|
+-----+-----+-----+
1	1	4
Since no row has all of its values null, the all value of how won't affect 
the DataFrame.

In [11]: DF1.dropna(how ='all').show()
Here is the output:

+-----+-----+-----+
|  CL1|  CL2|  CL3|
+-----+-----+-----+
1	1	4
2	3	NULL
3	NULL	2
4	NULL	NULL
Step: Dropping Rows that Have Null Values Using the thresh Argument
If the thresh value is set to 2, any row containing fewer than 
two non-null values will be removed. Only the fourth row has 
fewer than two non-null values (it has only one), so it is the only 
row that will be removed.

DF1.dropna(how ='all',thresh=2).show()

+-----+-----+-----+
|  CL1|  CL2|  CL3|
+-----+-----+-----+
1	1	4
2	3	NULL
3	NULL	2
We can observe that in the output, only the fourth row was removed. 
Now let's change the value of thresh from 2 to 3. In that case, 
rows two and three are also removed.

DF1.dropna(how ='all',thresh=3).show()

+-----+-----+-----+
|  CL1|  CL2|  CL3|
+-----+-----+-----+
1	1	4
Step: Filling in the Missing Values with Some Number
we can replace null values with zeros using the fillna().

DF1.fillna(value=0).show()
Here is the output:

+-----+-----+-----+
|  CL1|  CL2|  CL3|
+-----+-----+-----+
1	1	4
2	3	0
3	0	2
4	0	0

Write a query to find the high score, low score, total number of users (irrespective of status),
and total number of active users (based on enrollment.status='active' and user.status=TRUE)
per course.

course
cid cname
mth Maths
phy Physics
chm Chemistry

user
uid uname status
u1 John TRUE
u2 Harry TRUE
u3 Ted FALSE
u4 Pattrick TRUE

enrollment
eid cid uid score status
1 mth u1 50 active
2 mth u2 60 inactive
3 mth u3 70 active
4 phy u1 40 active
5 phy u2 80 active

df_course = spark.createDataFrame([("mth","Maths"),("phy","Physics"),("chm","Chemistry")], ["course_cid", "course_cname"])

df_course.show()

+----------+------------+
|course_cid|course_cname|
+----------+------------+
|       mth|       Maths|
|       phy|     Physics|
|       chm|   Chemistry|
+----------+------------+

df_user = spark.createDataFrame([("u1","John","True"),("u2","Harry","True"),("u3","Ted","False"),("u4","Petrick","True")], ["user_uid", "user_uname","user_status"])

df_user.show()

+--------+----------+-----------+
|user_uid|user_uname|user_status|
+--------+----------+-----------+
|      u1|      John|       True|
|      u2|     Harry|       True|
|      u3|       Ted|      False|
|      u4|   Petrick|       True|
+--------+----------+-----------+

df_enrollment = spark.createDataFrame([(1,"mth","u1",50,"active"),(2,"mth","u2",60,"inactive"),(3,"mth","u3",70,"active"),(4,"phy","u1",40,"active"),(5,"phy","u2",80,"active")], ["enrollment_eid", "enrollment_cid","enrollment_uid","enrollment_score","enrollment_status"])

df_enrollment.show()

+--------------+--------------+--------------+----------------+-----------------+
|enrollment_eid|enrollment_cid|enrollment_uid|enrollment_score|enrollment_status|
+--------------+--------------+--------------+----------------+-----------------+
|             1|           mth|            u1|              50|           active|
|             2|           mth|            u2|              60|         inactive|
|             3|           mth|            u3|              70|           active|
|             4|           phy|            u1|              40|           active|
|             5|           phy|            u2|              80|           active|
+--------------+--------------+--------------+----------------+-----------------+

joined_df = df_enrollment.join(df_course, df_enrollment.enrollment_cid == df_course.course_cid,how='right')

joined_df1 = joined_df.join(df_user, joined_df.enrollment_uid == df_user.user_uid,how='left')

joined_df1.show()

+--------------+--------------+--------------+----------------+-----------------+----------+------------+--------+----------+-----------+
|enrollment_eid|enrollment_cid|enrollment_uid|enrollment_score|enrollment_status|course_cid|course_cname|user_uid|user_uname|user_status|
+--------------+--------------+--------------+----------------+-----------------+----------+------------+--------+----------+-----------+
|             3|           mth|            u3|              70|           active|       mth|       Maths|      u3|       Ted|      False|
|          null|          null|          null|            null|             null|       chm|   Chemistry|    null|      null|       null|
|             4|           phy|            u1|              40|           active|       phy|     Physics|      u1|      John|       True|
|             1|           mth|            u1|              50|           active|       mth|       Maths|      u1|      John|       True|
|             5|           phy|            u2|              80|           active|       phy|     Physics|      u2|     Harry|       True|
|             2|           mth|            u2|              60|         inactive|       mth|       Maths|      u2|     Harry|       True|
+--------------+--------------+--------------+----------------+-----------------+----------+------------+--------+----------+-----------+

import pyspark.sql.functions as F

result_1=joined_df1.groupBy('course_cid','course_cname').agg(F.min('enrollment_score') ,F.max('enrollment_score'),F.count('enrollment_uid'))

result_1_new=result_1.toDF("result1_course_cid","result1_course_cname","low_score","high_score","Total_users")

result_1_new.show(truncate=False)

+------------------+--------------------+---------+----------+-----------+
|result1_course_cid|result1_course_cname|low_score|high_score|Total_users|
+------------------+--------------------+---------+----------+-----------+
|phy               |Physics             |40       |80        |2          |
|mth               |Maths               |50       |70        |3          |
|chm               |Chemistry           |null     |null      |0          |
+------------------+--------------------+---------+----------+-----------+

import pyspark.sql.functions as F

result_2=joined_df1.filter("enrollment_status == 'active'").filter("user_status == 'True'").groupBy('course_cid','course_cname').agg(F.count('enrollment_status'))

result_2_new=result_2.toDF("result2_course_cid","result2_course_cname","active_user")

result_2_new.show(truncate=False)

+------------------+--------------------+-----------+
|result2_course_cid|result2_course_cname|active_user|
+------------------+--------------------+-----------+
|phy               |Physics             |2          |
|mth               |Maths               |1          |
+------------------+--------------------+-----------+

finalresult=result_1_new.join(result_2_new, result_1_new.result1_course_cid == result_2_new.result2_course_cid,how='full')

finalresult.select("result1_course_cid","result1_course_cname","high_score","low_score","Total_users","active_user").orderBy("result1_course_cid", ascending=False).toDF("cid","cname","high_score","low_score","total_users","active_users").show(truncate=False)

+---+---------+----------+---------+-----------+------------+
|cid|cname    |high_score|low_score|total_users|active_users|
+---+---------+----------+---------+-----------+------------+
|phy|Physics  |80        |40       |2          |2           |
|mth|Maths    |70        |50       |3          |1           |
|chm|Chemistry|null      |null     |0          |null        |
+---+---------+----------+---------+-----------+------------+

Write DataFrame code to generate the top salary and employee name department-wise from the following table data:

Employee
emp_id emp_name dept_id salary

1 Sachin IT 5000
2 Virat Tech 6000
3 Rohit IT 9000
4 Dhoni Tech 5000

Dept
dept_id dept_name
IT Information Tech
Tech Technology

Output
emp_name dept_name salary
Rohit Information Tech 9000
Virat Technology 6000

from pyspark.sql.types import StructType

schema1 = StructType().add("emp_id", "string").add("emp_name", "string").add("dept_id", "string").add("salary", "int")
schema2 = StructType().add("dept_id", "string").add("dept_name", "string")
df_emp = spark.read.format("csv").option("header", True).schema(schema1).load("employee.csv")
df_dept = spark.read.format("csv").option("header", True).schema(schema2).load("Dept.csv")
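
To actually compute the department-wise top salary, one option is a window function; the following is a sketch that assumes the df_emp and df_dept DataFrames loaded above:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rank employees by salary within each department and keep the top one.
w = Window.partitionBy("dept_id").orderBy(F.col("salary").desc())

top_salary_df = df_emp.withColumn("rank", F.row_number().over(w)) \
    .filter("rank = 1") \
    .join(df_dept, "dept_id") \
    .select("emp_name", "dept_name", "salary")

top_salary_df.show()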