Explain Accumulator:
- We can implement counters or sums using an accumulator.
- Users can create a named or an unnamed accumulator.
- We can create a numeric accumulator by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() to accumulate Long or Double values respectively (see the sketch below).
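A minimal PySpark sketch of a counter accumulator (the sample data and variable names are illustrative; note that the Python API exposes SparkContext.accumulator(), while longAccumulator()/doubleAccumulator() belong to the Scala/Java API):
#assumes an active SparkSession named spark
sc = spark.sparkContext
errorCount = sc.accumulator(0)          #numeric accumulator starting at 0
def countErrors(line):
    if "ERROR" in line:
        errorCount.add(1)               #executors add to the accumulator
sc.parallelize(["ok", "ERROR x", "ERROR y"]).foreach(countErrors)
print(errorCount.value)                 #driver reads the final value: 2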
Why do we need DataFrame?
- We use DataFrame because it provides custom memory management and an optimized execution plan, which RDDs do not (see the sketch after this list).
- Custom memory management: data is stored in off-heap memory in a binary format, which saves memory.
- Serialization: costly Java serialization is avoided because the data is already in this binary format.
- Garbage collection overhead is also reduced, since the data is kept outside the JVM heap.
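To see the optimized execution plan that the Catalyst optimizer builds for a DataFrame, explain() can be used (a small sketch; the columns and the filter are made up for illustration):
df = spark.createDataFrame([(1, 'vikas'), (2, 'cloud')], ['Id', 'Employee'])
df.filter(df.Id > 1).explain()   #prints the optimized physical plan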
How to save the contents of a DataFrame to a CSV file?
- Use the DataFrameWriter class via the DataFrame.write.csv() function.
- The DataFrame.write.csv() function takes many arguments. Here we will discuss three of them: path, sep, and header.
- The path argument defines the directory where DataFrame will be written.
- We can specify the data field separator using the sep argument.
- If the value of the header argument is True, the header of the DataFrame will be written as the first line in the CSV file.
Write the DataFrame to a CSV file:
DF1.write.csv(path='csvFileDir', header=True, sep=',')
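To check the result, the CSV directory can be read back with matching options (a small sketch; 'csvFileDir' is the directory written above):
DF2 = spark.read.csv('csvFileDir', header=True, sep=',')
DF2.show()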
How to Save a DataFrame as a JSON File?
To save a DataFrame as a JSON file, use the DataFrameWriter function json().
Df.write.json(path='jsonData')
Df.show(6)
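The saved JSON data can be read back the same way (a small sketch; 'jsonData' is the directory written above):
DfJson = spark.read.json('jsonData')
DfJson.show(6)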
How will you read a Parquet file and save a DataFrame as a Parquet file?
#Read a Parquet file
Df = spark.read.parquet('employee')
Df.show(6)
+---+--------+
| Id|Employee|
+---+--------+
|  1|   vikas|
|  2|   cloud|
+---+--------+
#save a DataFrame as a Parquet file
Df.write.parquet(path='user/spark')
How will you save a DataFrame as an ORC file?
Df.write.orc(path='orcData')
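Reading the ORC data back works the same way (a small sketch; 'orcData' is the directory written above):
DfOrc = spark.read.orc('orcData')
DfOrc.show()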
How will you read a table of data from MySQL and save the contents of a DataFrame to MySQL?
#Command for PySpark shell with a MySQL JDBC connector.
pyspark --driver-class-path ~/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar --packages mysql:mysql-connector-java:8.0.12
URL1 = "jdbc:mysql://localhost/DB1"
DF1 = spark.read.format("jdbc").options(url=URL1, database='DB1', dbtable='table1', user="root", password="").load()
#The options() function is used to set different options in this command.
#In the options() function, we set the url, the database, the dbtable, and the user and password of the database.
DF1.show()
+-----+-----+---+---+
|  cl1|  cl2|cl3|cl4|
+-----+-----+---+---+
|cloud|vikas|  A|100|
+-----+-----+---+---+
#Save the contents of a DataFrame to MySQL.
Start the PySpark shell with the MySQL connector:
$pyspark --driver-class-path ~/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar --packages mysql:mysql-connector-java:8.0.12
#Define the database URL and then save the DataFrame into the MySQL database.
dbURL = "jdbc:mysql://localhost/sqlbook"
Df.write.format("jdbc").options(url = dbURL,database ='sqlbook',
dbtable ='mytab', user="root",password="").save()
How will you read a table of data from a PostgreSQL database and save a DataFrame to PostgreSQL?
PostgreSQL JDBC connector command:
$pyspark --driver-class-path ~/.ivy2/jars/org.postgresql_postgresql-42.2.4.jar --packages org.postgresql:postgresql:42.2.4
dbURL = "jdbc:postgresql://localhost/sqldb?user=
postgres&password=""
DfOne = spark.read.format("jdbc").options(url=dbURL, database='sqldb', dbtable='table1').load()
We can set different options in the options() function.
DfOne.show(2)
Here is the output:
+----------+------+---+
|        C1|    C2| C3|
+----------+------+---+
|CLOUDVIKAS| CLOUD|AWS|
|     SPARK|HADOOP|BIG|
+----------+------+---+
Save a DataFrame into PostgreSQL.
Start the PySpark shell with the PostgreSQL connector:
$pyspark --driver-class-path ~/.ivy2/jars/org.postgresql_postgresql-42.2.4.jar --packages org.postgresql:postgresql:42.2.4
Define the database URL and then save the DataFrame into the database.
dbURL = "jdbc:postgresql://localhost/pysparksqldb?user=postgres&password=""
We are saving the DataFrame contents to the table.
Df.write.format("jdbc").options(url = dbURL,
database ='pysparksqlbook', dbtable ='mytab').save()
How will you read a table of data from a Cassandra database?
Cassandra connector command:
$ pyspark --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0
Read a table from the Cassandra database:
Df = spark.read.format("org.apache.spark.sql.cassandra").options( keyspace="sqlbook", table="table1").load()
Df.show(1)
output:
+-----+------+----------+
|   C1|    C2|        C3|
+-----+------+----------+
|cloud|hadoop|cloudvikas|
+-----+------+----------+