SPARK-IQ-4

How will you define a schema explicitly?
	
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSchemaDemo") \
        .getOrCreate()

    logger = Log4j(spark)

    inputSchemaStruct = StructType([
        StructField("col1", DateType()),
        StructField("col2", StringType()),
        StructField("col3", IntegerType()),
        StructField("col4", IntegerType())
    ])

    inputSchemaDDL = """col1 DATE, col2 STRING, col3 INT, col4 INT"""
	
    inputTimeCsvDF = spark.read \
        .format("csv") \
        .option("header", "true") \
        .schema(inputSchemaStruct) \
        .option("mode", "FAILFAST") \
        .option("dateFormat", "M/d/y") \
        .load("data/input*.csv")

    inputTimeCsvDF.show(5)
    logger.info("CSV Schema:" + inputTimeCsvDF.schema.simpleString())
	
What is Spark DataFrameWriter API?

DataFrameWriter is the interface for describing how data (the result of executing a structured query) should be saved to an external data source. DataFrameWriter defaults to the parquet data source format.
You can change the default format using the spark.sql.sources.default configuration.

Its general structure is:

DataFrameWriter
    .format(…)
    .option(…)
    .partitionBy(…)
    .bucketBy(…)
    .sortBy(…)
    .save()

Take an example:

dataframe.write \
    .format("parquet") \
    .mode(saveMode) \
    .option("path", "/data/cloud") \
    .save()

.format –> there are many formats in which Spark can store data, such as CSV, JSON, AVRO, PARQUET, ORC, and JDBC. By default, Spark uses the parquet format if you do not specify one.
We can also store data in community-supported formats and sources such as Cassandra, MongoDB, AVRO, XML, HBase, and Redshift.
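
A minimal sketch, assuming a hypothetical dataframe and output paths; leaving out .format() falls back to parquet, and naming another built-in format switches it:

# Default format (parquet) because .format() is not called
dataframe.write.mode("overwrite").save("/data/out/parquet")

# Explicit built-in format
dataframe.write.format("json").mode("overwrite").save("/data/out/json")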

.mode –
Save mode (see the sketch after this list)

  1. append – adds new files at the given location, keeping the existing data
  2. overwrite – overwrites the existing data at the location
  3. errorIfExists – raises an error if data already exists at the location
  4. ignore – silently skips the write if data already exists at the location
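
A minimal sketch of the four modes, again assuming a hypothetical dataframe and output path:

dataframe.write.format("parquet").mode("append").save("/data/cloud")         # adds new files, keeps existing data
dataframe.write.format("parquet").mode("overwrite").save("/data/cloud")      # replaces existing data
dataframe.write.format("parquet").mode("errorIfExists").save("/data/cloud")  # fails if data already exists
dataframe.write.format("parquet").mode("ignore").save("/data/cloud")         # skips the write if data already exists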

Spark File Layout

  1. Number of files and file size
  2. Partitions and Buckets
  3. Sorted data

A DataFrame is partitioned, so when we write it to files, each partition produces one output file.
That is the default behavior, but we can change it while writing the data.
We can repartition the data before writing to files.

The 1st way is to use the repartition() transformation, as sketched below.
ex: dataframe.repartition(n)
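
A minimal sketch, assuming a hypothetical dataframe and output path; five partitions produce five output files:

dataframe.repartition(5) \
    .write \
    .format("parquet") \
    .mode("overwrite") \
    .save("/data/out/repartitioned")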

The 2nd way is to use the partitionBy() method:
DataFrameWriter.partitionBy(col1, col2)
Here we use key columns to partition the data; we can use a single column or composite key columns.
This key-based method is a powerful way to partition data, as the sketch below shows.
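
A minimal sketch, with hypothetical column names and output path; each distinct (country, state) combination gets its own sub-directory:

dataframe.write \
    .format("parquet") \
    .mode("overwrite") \
    .partitionBy("country", "state") \
    .save("/data/out/partitioned")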

The 3rd way is to use bucketBy(); it is only available for Spark managed tables.
DataFrameWriter.bucketBy(n, col1, col2)
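
A minimal sketch, with a hypothetical table name, bucket count, and columns; because bucketing needs a managed table, the writer ends with saveAsTable() instead of save():

dataframe.write \
    .format("parquet") \
    .mode("overwrite") \
    .bucketBy(4, "id", "country") \
    .saveAsTable("flight_data_bucketed")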

There are two more methods to study (combined in the sketch after this list):

  1. sortBy() – used together with bucketing.
  2. maxRecordsPerFile – an option used to limit the number of records per file.
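
A minimal sketch combining both, again with a hypothetical table name, columns, and limit; sortBy() only works together with bucketBy() on a managed table:

dataframe.write \
    .format("parquet") \
    .mode("overwrite") \
    .option("maxRecordsPerFile", 10000) \
    .bucketBy(4, "id") \
    .sortBy("id") \
    .saveAsTable("flight_data_sorted_buckets")
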
How will you remove duplicate records and drop columns?

from pyspark.sql.functions import expr, to_date

df9 = df7.withColumn("dob", to_date(expr("concat(day,'/',month,'/',year)"), 'd/M/y')) \
    .drop("day", "month", "year") \
    .dropDuplicates(["name", "dob"]) \
    .sort(expr("dob desc"))
df9.show()

+-----+-----------+----------+
| name|         id|       dob|
+-----+-----------+----------+
|kamal|17179869185|1963-08-07|
|vikas|          0|1981-05-23|
| Ravi| 8589934592|2002-01-28|
| John|17179869184|2006-12-12|
+-----+-----------+----------+