Why is the Parquet file format recommended?
1. Parquet is a columnar file format that provides optimizations to speed up queries.
2. In Spark SQL, Parquet supports both reading and writing, and it captures the schema of the original data.
3. It can reduce data storage significantly (a commonly quoted figure is around 75% compared to uncompressed text formats).
4. It reduces I/O operations.
5. It fetches only the specific columns you need to access (see the sketch after this list).
6. It consumes less space on disk.
7. It supports type-specific encoding.
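For example, because Parquet is columnar, selecting only the columns you need lets Spark skip the rest of the data on disk. A minimal sketch, assuming a hypothetical file data/inputfile.parquet with columns id and amount:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[3]").appName("ParquetColumnPruning").getOrCreate()

# Only the 'id' and 'amount' columns are read from disk; the remaining columns are skipped.
df = spark.read.parquet("data/inputfile.parquet").select("id", "amount")
df.show(5)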
Let me explain in detail:
Read a CSV file and print its default schema:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType
from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSchemaDemo") \
        .getOrCreate()

    logger = Log4j(spark)

    # Read the CSV files without a schema; every column defaults to String.
    inputfiledf = spark.read \
        .format("csv") \
        .option("header", "true") \
        .load("data/cloud*.csv")

    inputfiledf.show(5)
    logger.info("CSV Schema:" + inputfiledf.schema.simpleString())
After running this script, every field's data type is String.
Now let's try inferSchema.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType
from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSchemaDemo") \
        .getOrCreate()

    logger = Log4j(spark)

    # Ask Spark to infer column types by sampling the data.
    inputfiledf = spark.read \
        .format("csv") \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .load("data/cloud*.csv")

    inputfiledf.show(5)
    logger.info("CSV Schema:" + inputfiledf.schema.simpleString())
After execution, the integer fields get an integer data type in the schema, but the date field is still String.
That's why we don't rely on inferSchema.
So we have two options for defining a DataFrame schema:
1. Implicit and 2. Explicit
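With the explicit option, we define the schema ourselves and pass it to the reader. A minimal sketch, assuming the same cloud*.csv files with hypothetical columns id, name and created_date, and a d/M/y date format (adjust both to your data):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType

# Hypothetical column names and types -- replace with the actual columns in your CSV.
explicit_schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
    StructField("created_date", DateType())
])

spark = SparkSession.builder.master("local[3]").appName("ExplicitSchemaDemo").getOrCreate()

inputfiledf = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("dateFormat", "d/M/y") \
    .schema(explicit_schema) \
    .load("data/cloud*.csv")

# All three columns now come back with the types defined above, including DateType.
inputfiledf.printSchema()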
In the implicit way, we load the data from a Parquet file. Parquet stores the schema along with the data, so each column comes back in the correct data type.
Let's check:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, StringType, IntegerType
from lib.logger import Log4j

if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .master("local[3]") \
        .appName("SparkSchemaDemo") \
        .getOrCreate()

    logger = Log4j(spark)

    # Parquet files carry their own schema, so neither inferSchema nor an explicit schema is needed.
    inputfiledf = spark.read \
        .format("parquet") \
        .load("data/inputfile*.parquet")

    inputfiledf.show(5)
    logger.info("Parquet Schema:" + inputfiledf.schema.simpleString())
After running this, the date field comes back as a date type and the integer fields as integers.
So whatever input file format we start with, we should try to convert it into Parquet.
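A minimal sketch of such a conversion, assuming inputfiledf has already been read with the correct schema and data/parquet/ is a hypothetical output path:

# Write the DataFrame out as Parquet; the schema travels with the data.
inputfiledf.write \
    .format("parquet") \
    .mode("overwrite") \
    .save("data/parquet/")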
How will you find out the partition count?
If you want to check the partition count, execute the code below:
logger.info("Num Partitions before: " + str(flightTimeParquetDF.rdd.getNumPartitions()))
It will give you the partition count. After execution, we got a partition count of 2.
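The "before" in that log message hints that the partitioning is usually changed afterwards. A minimal sketch of repartitioning and checking the count again (the target of 5 partitions is an arbitrary assumption here):

# Redistribute the data across 5 partitions and verify the new count.
partitionedDF = flightTimeParquetDF.repartition(5)
logger.info("Num Partitions after: " + str(partitionedDF.rdd.getNumPartitions()))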
How can we control output file size?
We can control the output file size using the maxRecordsPerFile option.
FINALDF.write \
    .format("json") \
    .mode("overwrite") \
    .option("path", "data/json/") \
    .partitionBy("column1", "column2") \
    .option("maxRecordsPerFile", 10000) \
    .save()
With maxRecordsPerFile set to 10000, each output file will contain at most 10,000 records; partitionBy additionally creates a separate subdirectory for every distinct combination of column1 and column2.