SPARK-IQ-8

We can save a DataFrame into a managed table. What will happen when
1) the user saves data without using partitionBy and bucketBy?
2) the user saves data using partitionBy?
3) the user saves data using bucketBy?

-> When we save data into a managed table without partitionBy and bucketBy, Spark creates the managed table and the Parquet file(s) reside under its warehouse directory.


from pyspark.sql import *

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSQLTableDemo") \
        .enableHiveSupport() \
        .getOrCreate()  # enableHiveSupport() enables access to the Hive metastore

    logger = Log4j(spark)

    FINALDF = spark.read \
        .format("parquet") \
        .load("dataSource/")

    spark.sql("CREATE DATABASE IF NOT EXISTS example_db")
    spark.catalog.setCurrentDatabase("example_db")

    FINALDF.write \
        .mode("overwrite") \
        .saveAsTable("example_data_db")

    # The Spark catalog holds the table metadata, so list the tables under the database.
    logger.info(spark.catalog.listTables("example_db"))

Output data location:
spark-warehouse/
    example_db.db/
        example_data_db/
            part-0000abc.snappy.parquet
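Once the table is saved, it can be read back from the metastore. A minimal sketch, assuming the same warehouse, database, and table names as above:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[3]") \
    .appName("ReadManagedTableDemo") \
    .enableHiveSupport() \
    .getOrCreate()

spark.catalog.setCurrentDatabase("example_db")

# Either the catalog API or plain SQL works for a managed table.
table_df = spark.table("example_data_db")
table_df.show(5)

spark.sql("SELECT COUNT(*) FROM example_data_db").show()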

– Using partitionBy():
Let's apply partitionBy() on two columns:


from pyspark.sql import *

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSQLTableDemo") \
        .enableHiveSupport() \
        .getOrCreate()

    logger = Log4j(spark)

    FINALDF = spark.read \
        .format("parquet") \
        .load("dataSource/")

    spark.sql("CREATE DATABASE IF NOT EXISTS example_db")
    spark.catalog.setCurrentDatabase("example_db")

    FINALDF.write \
        .mode("overwrite") \
        .partitionBy("column1", "column2") \
        .saveAsTable("example_data_db")

    logger.info(spark.catalog.listTables("example_db"))

After running this, the data is written as partitions inside the managed table. The number of partition directories depends on the number of unique value combinations of the partition columns.

Output data location:
spark-warehouse/
    example_db.db/
        example_data_db/
            column1=a/
                column2=1/
            column1=b/
                column2=12/
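To estimate how many partition directories a write like this will create, you can count the distinct combinations of the partition columns before writing. A minimal sketch, assuming the FINALDF DataFrame and the placeholder column names used above:

distinct_combinations = FINALDF.select("column1", "column2").distinct().count()
print("Expected number of leaf partition directories:", distinct_combinations)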

– Using bucketBy():
If we want to fix the number of output buckets ourselves instead of depending on distinct values, we can use bucketBy():


from pyspark.sql import *

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSQLTableDemo") \
        .enableHiveSupport() \
        .getOrCreate()

    logger = Log4j(spark)

    FINALDF = spark.read \
        .format("parquet") \
        .load("dataSource/")

    spark.sql("CREATE DATABASE IF NOT EXISTS example_db")
    spark.catalog.setCurrentDatabase("example_db")

    FINALDF.write \
        .mode("overwrite") \
        .bucketBy(5, "column1", "column2") \
        .saveAsTable("example_data_db")  # the number of buckets is 5

    logger.info(spark.catalog.listTables("example_db"))

Based on hashing of the bucket columns, rows are distributed across the fixed number of buckets.
If you also want the data within each bucket to be sorted, you can add sortBy().
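Conceptually, each row's bucket is determined by hashing the bucketing columns and taking the result modulo the bucket count. The sketch below only illustrates that idea; Spark actually uses its own Murmur3-based hash, not Python's built-in hash():

# Illustration only -- not Spark's real hash function.
def bucket_for(row_key, num_buckets=5):
    return hash(row_key) % num_buckets

# A row with column1='a' and column2=1 lands in one of the 5 buckets:
print(bucket_for(("a", 1)))

The bucket specification of the saved table can be checked afterwards with spark.sql("DESCRIBE EXTENDED example_data_db").show(truncate=False).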

Here is the same write with sortBy() added:


from pyspark.sql import *

from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSQLTableDemo") \
        .enableHiveSupport() \
        .getOrCreate()

    logger = Log4j(spark)

    FINALDF = spark.read \
        .format("parquet") \
        .load("dataSource/")

    spark.sql("CREATE DATABASE IF NOT EXISTS example_db")
    spark.catalog.setCurrentDatabase("example_db")

    FINALDF.write \
        .mode("overwrite") \
        .bucketBy(5, "column1", "column2") \
        .sortBy("column1", "column2") \
        .saveAsTable("example_data_db")  # 5 buckets, sorted by column1 and column2 within each bucket

    logger.info(spark.catalog.listTables("example_db"))

What is the purpose of transformations on a DataFrame? They are used for tasks such as the following (a short sketch appears after this list):
  • Combining Data Frames
  • Aggregating and Summarizing
  • Applying Functions and built-in Transformations
  • Using Built-in and column-level functions
  • Creating and using UDFs
  • Referencing Rows/Columns
  • Creating Column Expressions
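A brief sketch touching a few of these points (aggregation, a built-in column-level function inside a column expression, and a UDF); the DataFrame df and its columns below are hypothetical and only for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, upper, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder \
    .master("local[3]") \
    .appName("TransformationSketch") \
    .getOrCreate()

# Hypothetical data and column names, just to demonstrate the transformations.
df = spark.createDataFrame(
    [("a1", "M", 1000.0), ("a2", "F", 1500.0), ("a3", "F", 1200.0)],
    ["id", "Gender", "salary"])

# Aggregating and summarizing
df.groupBy("Gender").agg(avg("salary").alias("avg_salary")).show()

# Built-in function inside a column expression
df.select(col("id"), upper(col("Gender")).alias("gender_upper")).show()

# Creating and using a UDF
label_udf = udf(lambda g: "Male" if g == "M" else "Female", StringType())
df.withColumn("gender_label", label_udf(col("Gender"))).show()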
We had one requirement in which we had to convert a date string into a date type.
Example: 22/3/2020 into 2020-03-22


from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

from lib.logger import Log4j


def to_date_df(df, fmt, fld):
    return df.withColumn(fld, to_date(fld, fmt))


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("RowDemo") \
        .getOrCreate()

    logger = Log4j(spark)

    my_schema = StructType([
        StructField("ID", StringType()),
        StructField("EventDate", StringType())])

    my_rows = [Row("123", "04/05/2021"), Row("124", "4/5/2021"), Row("125", "04/5/2021")]
    my_rdd = spark.sparkContext.parallelize(my_rows, 2)
    my_df = spark.createDataFrame(my_rdd, my_schema)

    my_df.printSchema()
    my_df.show()
    new_df = to_date_df(my_df, "M/d/y", "EventDate")
    new_df.printSchema()
    new_df.show()
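After the conversion, the second printSchema() call should report EventDate as a date column, roughly:

root
 |-- ID: string (nullable = true)
 |-- EventDate: date (nullable = true)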

UNIT TESTING:

from datetime import date
from unittest import TestCase

from pyspark.sql import *
from pyspark.sql.types import *

from RowDemo import to_date_df


class RowDemoTestCase(TestCase):

    @classmethod
    def setUpClass(cls) -> None:
        cls.spark = SparkSession.builder \
            .master("local[3]") \
            .appName("RowDemoTest") \
            .getOrCreate()

        my_schema = StructType([
            StructField("ID", StringType()),
            StructField("createdate", StringType())])

        my_rows = [Row("129", "04/02/2030"), Row("121", "4/2/2010"), Row("120", "04/3/2021")]
        my_rdd = cls.spark.sparkContext.parallelize(my_rows, 2)
        cls.my_df = cls.spark.createDataFrame(my_rdd, my_schema)

    def test_data_type(self):
        rows = to_date_df(self.my_df, "M/d/y", "createdate").collect()
        for row in rows:
            self.assertIsInstance(row["createdate"], date)

    def test_date_value(self):
        rows = to_date_df(self.my_df, "M/d/y", "createdate").collect()
        expected = {"129": date(2030, 4, 2), "121": date(2010, 4, 2), "120": date(2021, 4, 3)}
        for row in rows:
            self.assertEqual(row["createdate"], expected[row["ID"]])
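These tests can be run with the standard unittest runner, for example with python -m unittest, or by adding a minimal entry point to the test file (the file name RowDemoTest.py is assumed here):

if __name__ == "__main__":
    import unittest
    unittest.main()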

You want to read an employee CSV file (having 3 string-type columns and 1 float-type column) and perform the operations below:
read the file and print its schema.
The file employee.csv has four columns: id, Gender, Occupation, and timestamp.
The first three columns are of String data type.
The last column is of float or double data type.
Creating the Schema of the DataFrame
In our DataFrame, we have four columns. We define columns using the StructField() function.

from pyspark.sql.types import *
idColumn = StructField("id",StringType(),True)
gender = StructField("Gender",StringType(),True)
Occupation = StructField("Occupation",StringType(),True)
timestampColumn = StructField("timestamp",DoubleType(),True)

A StructField has been created for each column. Now we create the schema of the full DataFrame using a StructType object:

columnList = [idColumn, gender, Occupation,timestampColumn]
employeeDfSchema = StructType(columnList)
employeeDfSchema is the full schema of our DataFrame.

StructType(List(StructField(id,StringType,true), StructField(Gender,StringType,true), StructField(Occupation,StringType,true), StructField(timestamp,DoubleType,true)))
The schema of the four columns of the DataFrame can be observed by using the employeeDfSchema variable.

Reading a CSV File

empDf = spark.read.csv('data/employee.csv',header=True,schema=employeeDfSchema)

empDf.show(4)

empDf.printSchema()
Here is the output:

root
 |-- id: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- timestamp: double (nullable = true)
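As an alternative to building the schema from StructField objects, recent PySpark versions also accept a DDL-formatted string for the schema argument; a minimal sketch with the same column names:

empDf = spark.read.csv(
    'data/employee.csv',
    header=True,
    schema="id STRING, Gender STRING, Occupation STRING, timestamp DOUBLE")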

If we set the value of the inferSchema argument to True in the csv() function, PySpark SQL will try to infer the schema.

empDf = spark.read.csv('data/employee.csv', header=True, inferSchema=True)

empDf.show(4)

The DataFrame called empDf has been created, this time with the inferSchema argument set to True instead of an explicit schema.