SPARK-IQ-6

How will you get the same number of output files as the partition count?

We can use the repartition transformation. Spark writes one output file per partition, so repartitioning a DataFrame into N partitions produces N output files.
Example:

    from pyspark.sql.functions import spark_partition_id

    finalDF = spark.read \
        .format("parquet") \
        .load("dataSource/final*.parquet")

    # Redistribute the data into exactly 5 partitions
    partitionedDF = finalDF.repartition(5)
    print("Num Partitions after: " + str(partitionedDF.rdd.getNumPartitions()))
    partitionedDF.groupBy(spark_partition_id()).count().show()

    partitionedDF.write \
        .format("avro") \
        .mode("overwrite") \
        .option("path", "data/avro/") \
        .save()
 	
After execution, we will get 5 output files, because we asked for 5 partitions.
Why do we use partitionBy when writing a DataFrame?

If we want the output data organized by particular columns, we can use the partitionBy method of the DataFrameWriter.
Example:

    finalDF.write \
        .format("json") \
        .mode("overwrite") \
        .option("path", "data/json/") \
        .partitionBy("column1", "column2") \
        .save()

It will create a directory hierarchy based on the partition columns: one directory per column1 value, with the column2 directories nested under it.

data/json/
    column1=A/
        column2=1/
            part-001234.json
            part-001234.json.crc
        column2=2/
            part-0012134.json
            part-0012134.json.crc
    column1=B/
        column2=3/
        column2=4/

Note: the part-001234.json files won't contain the column1 and column2 values, because columns used in partitionBy are encoded in the directory names rather than stored in the data files.
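As a quick check (a minimal sketch, reusing the output path from the example above), Spark restores the partition columns from the directory names when the partitioned output is read back:

    # Partition discovery: Spark infers column1 and column2
    # from the directory names under the base path.
    readBackDF = spark.read \
        .format("json") \
        .load("data/json/")

    readBackDF.printSchema()  # schema includes column1 and column2 again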

How can we control output file size?

We can control the number of records per output file with the maxRecordsPerFile option.

    # maxRecordsPerFile caps each output file at 10000 records;
    # Spark splits a partition into multiple files if needed.
    finalDF.write \
        .format("json") \
        .mode("overwrite") \
        .option("path", "data/json/") \
        .partitionBy("column1", "column2") \
        .option("maxRecordsPerFile", 10000) \
        .save()
Where do we store Table metadata in Spark?

In the catalog metastore. By default Spark uses an in-memory catalog (or an embedded Derby-backed metastore when Hive support is enabled), and it can also be configured to use an external Hive metastore.
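A minimal sketch of inspecting that metadata through the spark.catalog API (the session builder settings here are illustrative):

from pyspark.sql import SparkSession

# Enable Hive support so table metadata persists in the Hive metastore
spark = SparkSession.builder \
    .appName("CatalogDemo") \
    .enableHiveSupport() \
    .getOrCreate()

# List the databases and tables tracked in the catalog metastore
print(spark.catalog.listDatabases())
print(spark.catalog.listTables("default"))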

Suppose you have a table of employee details with 2 columns, name and salary.
How will you add a 3rd column, label, whose value depends on salary?
It should hold High if the salary is more than 20000, otherwise Low.
We need the udf() function, because we have to wrap a plain Python function that creates the labels.

from pyspark.sql.functions import udf
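For a runnable example, assume tempDf holds the two-column employee data shown in the output further below (a minimal sketch):

tempDf = spark.createDataFrame([("ram", 12000), ("vikas", 30000)], ["name", "salary"])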

We are going to create a Python function named labelsalary. It takes the salary as input and returns High or Low, depending on the condition.

def labelsalary(salary):
    if salary > 20000:
        return "High"
    else:
        return "Low"

labelsalary(10000)   # returns 'Low'
labelsalary(30000)   # returns 'High'
 
 Let's create a PySpark SQL UDF using labelsalary.

labelsalaryUdf = udf(labelsalary)

A new DataFrame called tempDf2 is created with a new column label using the withColumn() function:

tempDf2 = tempDf.withColumn("label", labelsalaryUdf(tempDf.salary))

tempDf2.show()
Here is the output:

+-----+------+-----+
| name|salary|label|
+-----+------+-----+
|  ram| 12000|  Low|
|vikas| 30000| High|
+-----+------+-----+
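As a follow-up sketch (the view name and SQL function name here are hypothetical), the same Python function can be registered for use in Spark SQL, with an explicit return type instead of the default StringType:

from pyspark.sql.types import StringType

# Register the Python function so it can be called from SQL
spark.udf.register("labelsalary_sql", labelsalary, StringType())

tempDf.createOrReplaceTempView("employees")
spark.sql("SELECT name, salary, labelsalary_sql(salary) AS label FROM employees").show()
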
Write a query to find the high score, low score, total number of users (irrespective of status),
and total number of active users (based on enrollment.status='active' and user.status=TRUE)
for each course.

course
cid cname
mth Maths
phy Physics
chm Chemistry

user
uid uname status
u1 John TRUE
u2 Harry TRUE
u3 Ted FALSE
u4 Patrick TRUE

enrollment
eid cid uid score status
1 mth u1 50 active
2 mth u2 60 inactive
3 mth u3 70 active
4 phy u1 40 active
5 phy u2 80 active

df_course = spark.createDataFrame(
    [("mth", "Maths"), ("phy", "Physics"), ("chm", "Chemistry")],
    ["course_cid", "course_cname"])

df_course.show()

+----------+------------+
|course_cid|course_cname|
+----------+------------+
|       mth|       Maths|
|       phy|     Physics|
|       chm|   Chemistry|
+----------+------------+

df_user = spark.createDataFrame(
    [("u1", "John", "True"), ("u2", "Harry", "True"),
     ("u3", "Ted", "False"), ("u4", "Patrick", "True")],
    ["user_uid", "user_uname", "user_status"])

df_user.show()

+--------+----------+-----------+
|user_uid|user_uname|user_status|
+--------+----------+-----------+
|      u1|      John|       True|
|      u2|     Harry|       True|
|      u3|       Ted|      False|
|      u4|   Patrick|       True|
+--------+----------+-----------+

df_enrollment = spark.createDataFrame(
    [(1, "mth", "u1", 50, "active"), (2, "mth", "u2", 60, "inactive"),
     (3, "mth", "u3", 70, "active"), (4, "phy", "u1", 40, "active"),
     (5, "phy", "u2", 80, "active")],
    ["enrollment_eid", "enrollment_cid", "enrollment_uid",
     "enrollment_score", "enrollment_status"])

df_enrollment.show()

+--------------+--------------+--------------+----------------+-----------------+
|enrollment_eid|enrollment_cid|enrollment_uid|enrollment_score|enrollment_status|
+--------------+--------------+--------------+----------------+-----------------+
|             1|           mth|            u1|              50|           active|
|             2|           mth|            u2|              60|         inactive|
|             3|           mth|            u3|              70|           active|
|             4|           phy|            u1|              40|           active|
|             5|           phy|            u2|              80|           active|
+--------------+--------------+--------------+----------------+-----------------+

joined_df = df_enrollment.join(df_course,
    df_enrollment.enrollment_cid == df_course.course_cid, how='right')

joined_df1 = joined_df.join(df_user,
    joined_df.enrollment_uid == df_user.user_uid, how='left')

joined_df1.show()

+--------------+--------------+--------------+----------------+-----------------+----------+------------+--------+----------+-----------+
|enrollment_eid|enrollment_cid|enrollment_uid|enrollment_score|enrollment_status|course_cid|course_cname|user_uid|user_uname|user_status|
+--------------+--------------+--------------+----------------+-----------------+----------+------------+--------+----------+-----------+
|             3|           mth|            u3|              70|           active|       mth|       Maths|      u3|       Ted|      False|
|          null|          null|          null|            null|             null|       chm|   Chemistry|    null|      null|       null|
|             4|           phy|            u1|              40|           active|       phy|     Physics|      u1|      John|       True|
|             1|           mth|            u1|              50|           active|       mth|       Maths|      u1|      John|       True|
|             5|           phy|            u2|              80|           active|       phy|     Physics|      u2|     Harry|       True|
|             2|           mth|            u2|              60|         inactive|       mth|       Maths|      u2|     Harry|       True|
+--------------+--------------+--------------+----------------+-----------------+----------+------------+--------+----------+-----------+

import pyspark.sql.functions as F

result_1 = joined_df1.groupBy('course_cid', 'course_cname').agg(
    F.min('enrollment_score'),
    F.max('enrollment_score'),
    F.count('enrollment_uid'))

result_1_new = result_1.toDF("result1_course_cid", "result1_course_cname",
                             "low_score", "high_score", "Total_users")

result_1_new.show(truncate=False)

+------------------+--------------------+---------+----------+-----------+
|result1_course_cid|result1_course_cname|low_score|high_score|Total_users|
+------------------+--------------------+---------+----------+-----------+
|phy               |Physics             |40       |80        |2          |
|mth               |Maths               |50       |70        |3          |
|chm               |Chemistry           |null     |null      |0          |
+------------------+--------------------+---------+----------+-----------+

result_2 = joined_df1 \
    .filter("enrollment_status == 'active'") \
    .filter("user_status == 'True'") \
    .groupBy('course_cid', 'course_cname') \
    .agg(F.count('enrollment_status'))

result_2_new = result_2.toDF("result2_course_cid", "result2_course_cname", "active_user")

result_2_new.show(truncate=False)

+------------------+--------------------+-----------+
|result2_course_cid|result2_course_cname|active_user|
+------------------+--------------------+-----------+
|phy               |Physics             |2          |
|mth               |Maths               |1          |
+------------------+--------------------+-----------+

finalresult = result_1_new.join(result_2_new,
    result_1_new.result1_course_cid == result_2_new.result2_course_cid, how='full')

finalresult.select("result1_course_cid", "result1_course_cname", "high_score",
                   "low_score", "Total_users", "active_user") \
    .orderBy("result1_course_cid", ascending=False) \
    .toDF("cid", "cname", "high_score", "low_score", "total_users", "active_users") \
    .show(truncate=False)

+---+---------+----------+---------+-----------+------------+
|cid|cname    |high_score|low_score|total_users|active_users|
+---+---------+----------+---------+-----------+------------+
|phy|Physics  |80        |40       |2          |2           |
|mth|Maths    |70        |50       |3          |1           |
|chm|Chemistry|null      |null     |0          |null        |
+---+---------+----------+---------+-----------+------------+
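As a design note, the same report can be computed in one pass with conditional aggregation, avoiding the second aggregate and the full outer join; this is a sketch using the column names above (F.count skips the nulls that F.when produces when the condition is false):

result = joined_df1.groupBy('course_cid', 'course_cname').agg(
    F.max('enrollment_score').alias('high_score'),
    F.min('enrollment_score').alias('low_score'),
    F.count('enrollment_uid').alias('total_users'),
    F.count(F.when((F.col('enrollment_status') == 'active') &
                   (F.col('user_status') == 'True'), True)).alias('active_users'))

result.orderBy('course_cid', ascending=False).show(truncate=False)

One minor difference from the join-based version: Chemistry gets 0 active users here instead of null.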


Write DataFrame code to generate the top salary and employee name per department from the following table data:

Employee
emp_id emp_name dept_id salary

1 Sachin IT 5000
2 Virat Tech 6000
3 Rohit IT 9000
4 Dhoni Tech 5000

Dept
dept_id dept_name
IT Information Tech
Tech Technology

Output
emp_name dept_name salary
Rohit Information Tech 9000
Virat Technology 6000

from pyspark.sql.types import StructType

schema1 = StructType().add("emp_id", "string").add("emp_name", "string") \
    .add("dept_id", "string").add("salary", "integer")
schema2 = StructType().add("dept_id", "string").add("dept_name", "string")
df1 = spark.read.format("csv").option("header", True).schema(schema1).load("employee.csv")
df2 = spark.read.format("csv").option("header", True).schema(schema2).load("Dept.csv")
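
The code above only reads the two tables; a minimal sketch of the rest of the answer, ranking salaries per department with a window function (the file names follow the assumption above):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Join employees to departments, then rank salaries within each department
joined = df1.join(df2, on="dept_id", how="inner")
w = Window.partitionBy("dept_id").orderBy(F.col("salary").desc())

top_salary = joined.withColumn("rnk", F.dense_rank().over(w)) \
    .filter(F.col("rnk") == 1) \
    .select("emp_name", "dept_name", "salary")

top_salary.show()
# +--------+----------------+------+
# |emp_name|       dept_name|salary|
# +--------+----------------+------+
# |   Rohit|Information Tech|  9000|
# |   Virat|      Technology|  6000|
# +--------+----------------+------+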