What are the different functions in DataFrame?
- agg(*exprs)
- select(*cols)
- cov(col1, col2)
- sort(*cols, **kwargs)
- crosstab(col1, col2)
- where(condition)
- cube(*cols)
- withColumn(colName, col)
- filter(condition)
- avg(*cols)
- groupBy(*cols)
- max(*cols)
- join(other, on=None, how=None)
- mean(*cols)
- orderBy(*cols, **kwargs)
- replace(to_replace, value, subset)
- min(*cols)
- rollup(*cols)
How to read data from column?
from pyspark.sql.functions import *
airlinesDF.select(column("Origin"), col("Dest"), airlinesDF.Distance, "year").show(10)
- We can refer to a column using the column() or col() functions, DataFrame attribute notation, or a plain string in double quotes.
- Consider a table with three columns: Day, Month, Year. Create a table with one column containing the DATE in the format YYYY-MM-DD.
Using the expr() function:
airlinesDF.select("Origin", "Dest", "Distance", expr("to_date(concat(Year,Month,DayofMonth),'yyyyMMdd') as FlightDate")).show(1)
+------+----+--------+----------+
|Origin|Dest|Distance|FlightDate|
+------+----+--------+----------+
|   san| sfo|     447|1987-10-14|
+------+----+--------+----------+
Or using the to_date() column function with alias():
airlinesDF.select("Origin", "Dest", "Distance", to_date(concat("Year","Month","DayofMonth"), 'yyyyMMdd').alias("FlightDate")).show(1)
+------+----+--------+----------+
|Origin|Dest|Distance|FlightDate|
+------+----+--------+----------+
|   san| sfo|     447|1987-10-14|
+------+----+--------+----------+
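The Spark pattern 'yyyyMMdd' corresponds to Python's '%Y%m%d'; a plain-Python illustration of what to_date() does to the concatenated string (outside Spark, for intuition only):

```python
from datetime import datetime

# concat(Year, Month, DayofMonth) yields a string like "19871014";
# to_date(..., 'yyyyMMdd') parses it into a date value
s = "1987" + "10" + "14"
flight_date = datetime.strptime(s, "%Y%m%d").date()
print(flight_date.isoformat())  # 1987-10-14
```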
Convert the year into the proper format, e.g. 81 -> 1981, 11 -> 2011.
+-----+---+-----+----+-----------+
| name|day|month|year|         id|
+-----+---+-----+----+-----------+
|vikas| 23|    5|  81|          0|
| Ravi| 28|    1|2002| 8589934592|
|vikas| 23|    5|  81| 8589934593|
| John| 12|   12|   6|17179869184|
|kamal|  7|    8|  63|17179869185|
+-----+---+-----+----+-----------+
df3 = df1.withColumn("year", expr("""
case when year < 21 then cast(year as int) + 2000
when year < 100 then cast(year as int) + 1900
else year
end"""))
df3.show()
+-----+---+-----+----+-----------+
| name|day|month|year| id|
+-----+---+-----+----+-----------+
|vikas| 23| 5|1981| 0|
| Ravi| 28| 1|2002| 8589934592|
|vikas| 23| 5|1981| 8589934593|
| John| 12| 12|2006|17179869184|
| kamal| 7| 8|1963|17179869185|
+-----+---+-----+----+-----------+
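The case expression above implements a simple cutoff rule; a plain-Python sketch of the same mapping (the function name is illustrative):

```python
def normalize_year(y):
    """Expand a possibly two-digit year: < 21 -> 2000s, < 100 -> 1900s."""
    y = int(y)
    if y < 21:
        return y + 2000
    if y < 100:
        return y + 1900
    return y  # already a four-digit year

print(normalize_year(81), normalize_year(6), normalize_year(2002))
# 1981 2006 2002
```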
OR
from pyspark.sql.types import IntegerType

df4 = df1.withColumn("year", expr("""
case when year < 21 then year + 2000
when year < 100 then year + 1900
else year
end""").cast(IntegerType()))
df4.show()
+-----+---+-----+----+-----------+
| name|day|month|year| id|
+-----+---+-----+----+-----------+
|vikas| 23| 5|1981| 0|
| Ravi| 28| 1|2002| 8589934592|
|vikas| 23| 5|1981| 8589934593|
| John| 12| 12|2006|17179869184|
| kamal| 7| 8|1963|17179869185|
+-----+---+-----+----+-----------+
OR
df5 = df1.withColumn("day", col("day").cast(IntegerType())) \
.withColumn("month", col("month").cast(IntegerType())) \
.withColumn("year", col("year").cast(IntegerType()))
df6 = df5.withColumn("year", expr("""
case when year < 21 then year + 2000
when year < 100 then year + 1900
else year
end"""))
df6.show()
+-----+---+-----+----+-----------+
| name|day|month|year| id|
+-----+---+-----+----+-----------+
|vikas| 23| 5|1981| 0|
| Ravi| 28| 1|2002| 8589934592|
|vikas| 23| 5|1981| 8589934593|
| John| 12| 12|2006|17179869184|
| kamal| 7| 8|1963|17179869185|
+-----+---+-----+----+-----------+
OR
df7 = df5.withColumn("year", \
when(col("year") < 21, col("year") + 2000) \
.when(col("year") < 100, col("year") + 1900) \
.otherwise(col("year")))
df7.show()
+-----+---+-----+----+-----------+
| name|day|month|year| id|
+-----+---+-----+----+-----------+
|vikas| 23| 5|1981| 0|
| Ravi| 28| 1|2002| 8589934592|
|vikas| 23| 5|1981| 8589934593|
| John| 12| 12|2006|17179869184|
| kamal| 7| 8|1963|17179869185|
+-----+---+-----+----+-----------+
Add DOB column at the end of table.
df8 = df7.withColumn("dob", expr("to_date(concat(day,'/',month,'/',year), 'd/M/y')"))
df8.show()
+-----+---+-----+----+-----------+----------+
| name|day|month|year|         id|       dob|
+-----+---+-----+----+-----------+----------+
|vikas| 23|    5|1981|          0|1981-05-23|
| Ravi| 28|    1|2002| 8589934592|2002-01-28|
|vikas| 23|    5|1981| 8589934593|1981-05-23|
| John| 12|   12|2006|17179869184|2006-12-12|
|kamal|  7|    8|1963|17179869185|1963-08-07|
+-----+---+-----+----+-----------+----------+
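The to_date(concat(day,'/',month,'/',year), 'd/M/y') expression parses the three pieces into a date; the same result in plain Python (the helper name is illustrative):

```python
from datetime import date

def make_dob(day, month, year):
    # mirrors to_date(concat(day,'/',month,'/',year), 'd/M/y')
    return date(int(year), int(month), int(day)).isoformat()

print(make_dob(23, 5, 1981))  # 1981-05-23
```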
There is a running competition over a distance of 20 meters with 10 participants. Each person's time in seconds has been recorded.
+---+------+----------+-----------+
| id|Gender|      Name|runningtime|
+---+------+----------+-----------+
|id1|  Male|cloudvikas|         10|
|id2|Female|     shyam|         20|
+---+------+----------+-----------+
Calculate the running speed for each person and add it as a new column to the DataFrame.
+---+------+----------+-----------+-----------+
| id|Gender|      Name|runningtime|personSpeed|
+---+------+----------+-----------+-----------+
|id1|  Male|cloudvikas|         10|          2|
+---+------+----------+-----------+-----------+
If we want to do an operation on each element of a column in the
DataFrame, we have to use the withColumn() function.
The withColumn() function is defined on the DataFrame object.
This function can add a new column to the existing DataFrame or
it can replace a column with a new column containing new data.
We can read a CSV file and create a DataFrame from it:
personDf = spark.read.csv('personData.csv', header=True, inferSchema=True)
personDf.show(4)
+---+------+----------+-----------+
| id|Gender|      Name|runningtime|
+---+------+----------+-----------+
|id1|  Male|cloudvikas|         10|
|id2|Female|     shyam|         20|
+---+------+----------+-----------+
The personDf DataFrame has been created. We used the inferSchema
argument set to True. It means that PySpark SQL will infer the
schema on its own. Therefore, it is better to check it.
So let's print it using the printSchema() function.
In [3]: personDf.printSchema()
Here is the output:
root
|-- id: string (nullable = true)
|-- Gender: string (nullable = true)
|-- Name: string (nullable = true)
|-- runningtime: double (nullable = true)
Calculating person Speed for Each person and Adding It as a Column
How do we calculate the person speed?
Speed is defined as distance over time, which we can calculate using
the withColumn() function as an expression.
We have to calculate the speed of each person and add the results as
a new column. The following line of code serves this purpose.
personDf1 = personDf.withColumn('personSpeed', 20.0 / personDf.runningtime)
We know that the withColumn() function returns a new DataFrame.
The new column will be added to the newly created DataFrame called
personDf1. We can verify this using the show() function.
personDf1.show(4)
Here is the output:
+---+------+----------+-----------+-----------+
| id|Gender|      Name|runningtime|personSpeed|
+---+------+----------+-----------+-----------+
|id1|  Male|cloudvikas|         10|        2.0|
+---+------+----------+-----------+-----------+
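The expression 20.0 / runningtime is just distance over time; a plain-Python check of the arithmetic (names are illustrative):

```python
RACE_DISTANCE_M = 20.0  # race length from the problem statement

def person_speed(running_time_s):
    """Speed in meters per second for the 20 m race."""
    return RACE_DISTANCE_M / running_time_s

print(person_speed(10), person_speed(20))  # 2.0 1.0
```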
How do we select one or more columns, e.g. CL1, from DataFrame Df2?
Df3 = Df2.select("CL1")          # single column
Df4 = Df2.select("CL1", "CL2")   # multiple columns