SPARK-IQ-9

What are the different functions in DataFrame?
  • agg(*exprs), select(*cols)
  • cov(col1, col2)
  • sort(*cols, **kwargs), crosstab(col1, col2), where(condition), cube(*cols)
  • withColumn(colName, col)
  • filter(condition)
  • avg(*cols), groupBy(*cols)
  • max(*cols), join(other, on=None, how=None), mean(*cols)
  • orderBy(*cols, **kwargs), replace(to_replace, value, subset), min(*cols)
  • rollup(*cols)
How to read data from a column?

from pyspark.sql.functions import *
airlinesDF.select(column("Origin"), col("Dest"), airlinesDF.Distance, "year").show(10)

  • We can reference a column using the column() or col() function, the DataFrame attribute (airlinesDF.Distance), or a plain string in double quotes.
  • Consider a table with three columns: Day, Month, Year. Create a table with one column holding the DATE in YYYY-MM-DD format.
Using the expr() function:

airlinesDF.select("Origin", "Dest", "Distance", expr("to_date(concat(Year,Month,DayofMonth),'yyyyMMdd') as FlightDate")).show(1)

+------+----+--------+----------+
|Origin|Dest|Distance|FlightDate|
+------+----+--------+----------+
|   SAN| SFO|     447|1987-10-14|
+------+----+--------+----------+


airlinesDF.select("Origin", "Dest", "Distance", to_date(concat("Year","Month","DayofMonth"), 'yyyyMMdd').alias("FlightDate")).show(1)

+------+----+--------+----------+
|Origin|Dest|Distance|FlightDate|
+------+----+--------+----------+
|   SAN| SFO|     447|1987-10-14|
+------+----+--------+----------+

Convert the two-digit year into proper four-digit format, e.g., 81 -> 1981, 11 -> 2011.
+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|vikas| 23|    5|  81|          0|
| Ravi| 28|    1|2002| 8589934592|
|vikas| 23|    5|  81| 8589934593|
| John| 12|   12|   6|17179869184|
|kamal|  7|    8|  63|17179869185|
+-----+---+-----+----+-----------+



df3 = df1.withColumn("year", expr("""
         case when year < 21 then cast(year as int) + 2000
         when year < 100 then cast(year as int) + 1900
         else year
         end"""))
df3.show()



+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|vikas| 23|    5|1981|          0|
| Ravi| 28|    1|2002| 8589934592|
|vikas| 23|    5|1981| 8589934593|
| John| 12|   12|2006|17179869184|
| kamal|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+



OR

from pyspark.sql.types import IntegerType

df4 = df1.withColumn("year", expr("""
         case when year < 21 then year + 2000
         when year < 100 then year + 1900
         else year
         end""").cast(IntegerType()))
df4.show()


+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|vikas| 23|    5|1981|          0|
| Ravi| 28|    1|2002| 8589934592|
|vikas| 23|    5|1981| 8589934593|
| John| 12|   12|2006|17179869184|
| kamal|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+

OR

df5 = df1.withColumn("day", col("day").cast(IntegerType())) \
         .withColumn("month", col("month").cast(IntegerType())) \
         .withColumn("year", col("year").cast(IntegerType())) 

df6 = df5.withColumn("year", expr("""
          case when year < 21 then year + 2000
          when year < 100 then year + 1900
          else year
          end"""))
df6.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|vikas| 23|    5|1981|          0|
| Ravi| 28|    1|2002| 8589934592|
|vikas| 23|    5|1981| 8589934593|
| John| 12|   12|2006|17179869184|
| kamal|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+

OR

df7 = df5.withColumn("year", \
                    when(col("year") < 21, col("year") + 2000) \
                    .when(col("year") < 100, col("year") + 1900) \
                    .otherwise(col("year")))
df7.show()

+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|vikas| 23|    5|1981|          0|
| Ravi| 28|    1|2002| 8589934592|
|vikas| 23|    5|1981| 8589934593|
| John| 12|   12|2006|17179869184|
| kamal|  7|    8|1963|17179869185|
+-----+---+-----+----+-----------+
Add a DOB column at the end of the table.

df8 = df7.withColumn("dob", expr("to_date(concat(day,'/',month,'/',year), 'd/M/y')"))
df8.show()

+-----+---+-----+----+-----------+----------+
| name|day|month|year|         id|       dob|
+-----+---+-----+----+-----------+----------+
|vikas| 23|    5|1981|          0|1981-05-23|
| Ravi| 28|    1|2002| 8589934592|2002-01-28|
|vikas| 23|    5|1981| 8589934593|1981-05-23|
| John| 12|   12|2006|17179869184|2006-12-12|
|kamal|  7|    8|1963|17179869185|1963-08-07|
+-----+---+-----+----+-----------+----------+


There is a running competition over a distance of 20 meters with 10 participants. The time in seconds has been noted for each person.

+---+------+----------+-----------+
| id|Gender|      Name|runningtime|
+---+------+----------+-----------+
|id1|  Male|cloudvikas|         10|
|id2|Female|     shyam|         20|
+---+------+----------+-----------+

Calculate the running speed for each person and add it as a new column to the DataFrame.

+---+------+----------+-----------+-----------+
| id|Gender|      Name|runningtime|personSpeed|
+---+------+----------+-----------+-----------+
|id1|  Male|cloudvikas|         10|        2.0|
+---+------+----------+-----------+-----------+

If we want to do an operation on each element of a column in the 
DataFrame, we have to use the withColumn() function.

The withColumn() function is defined on the DataFrame object. 
This function can add a new column to the existing DataFrame or 
it can replace a column with a new column containing new data. 

We can read a CSV file and create a DataFrame from it.

personDf = spark.read.csv('personData.csv',header=True, 
inferSchema=True)

personDf.show(4)

+---+------+----------+-----------+
| id|Gender|      Name|runningtime|
+---+------+----------+-----------+
|id1|  Male|cloudvikas|         10|
|id2|Female|     shyam|         20|
+---+------+----------+-----------+
The personDf DataFrame has been created. We used the inferSchema 
argument set to True. It means that PySpark SQL will infer the 
schema on its own. Therefore, it is better to check it. 
So let's print it using the printSchema() function.

In [3]:  personDf.printSchema()
Here is the output:

root
 |-- id: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- runningtime: double (nullable = true)
Calculating the Speed for Each Person and Adding It as a Column
How do we calculate the person speed? 
Speed is defined as distance over time, which we can calculate using 
the withColumn() function as an expression. 
We have to calculate the speed of each person and add the results as 
a new column. The following line of code serves this purpose.

personDf1 = personDf.withColumn('personSpeed', 20.0/personDf.runningtime)
We know that the withColumn() function returns a new DataFrame. 
The new column will be added to the newly created DataFrame called 
personDf1. We can verify this using the show() function.

personDf1.show(4)
Here is the output:

+---+------+----------+-----------+-----------+
| id|Gender|      Name|runningtime|personSpeed|
+---+------+----------+-----------+-----------+
|id1|  Male|cloudvikas|         10|        2.0|

How to select one column CL1 from DataFrame Df2?

Df3 = Df2.select("CL1")

To select multiple columns, pass multiple names:

Df3 = Df2.select("CL1", "CL2")