SPARK-IQ-13


Explain Accumulator:
  • We can implement counters or sums using an accumulator.
  • Users can create a named or unnamed accumulator.
  • We can create a numeric accumulator by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() for Long or Double values respectively; a PySpark counter sketch follows this list.
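A minimal sketch of an accumulator used as a counter. Note that longAccumulator()/doubleAccumulator() belong to the Scala/Java SparkContext; in PySpark a plain accumulator() plays the same role. The SparkSession variable spark (available in the pyspark shell) and the sample data are assumptions.

#Create an accumulator initialised to 0 on the driver.
errorCount = spark.sparkContext.accumulator(0)

def countErrors(line):
    if "ERROR" in line:
        errorCount.add(1)      #updates happen on the executors

lines = spark.sparkContext.parallelize(["INFO ok", "ERROR bad", "ERROR worse"])
lines.foreach(countErrors)
print(errorCount.value)        #only the driver reads the final value: 2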
Why do we need DataFrame?
  • We use DataFrame because it provides better memory management and a more optimized execution plan than RDD (see the short sketch after this list).
  • Custom memory management: data is stored in off-heap memory in a binary format, which saves memory.
  • Java serialization is not used, because the data is already kept in this binary format.
  • As a result, garbage-collection overhead is also reduced.
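To illustrate the optimized execution plan, the hedged sketch below builds a tiny DataFrame (the column names and data are assumptions) and prints the plan chosen by the Catalyst optimizer. In the pyspark shell, spark already exists.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DFPlanDemo").getOrCreate()

#A small DataFrame with an assumed schema.
Df = spark.createDataFrame([(1, "vikas"), (2, "cloud")], ["Id", "Employee"])

#explain(True) prints the logical and physical plans produced by the optimizer.
Df.filter(Df.Id > 1).explain(True)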
How to save the contents of a DataFrame to a CSV file?
  • Use the DataFrameWriter class and its DataFrame.write.csv() function.
  • The DataFrame.write.csv() function has many arguments. Here we will discuss three of them: path, sep, and header.
  • The path argument defines the directory where the DataFrame will be written.
  • We can specify the data field separator using the sep argument.
  • If the value of the header argument is True, the header of the DataFrame will be written as the first line in the CSV file.
Write the DataFrame to a CSV file:

DF1.write.csv(path='csvFileDir', header=True, sep=',')

How to Save a DataFrame as a JSON File?
To save a DataFrame as a JSON file, use the DataFrameWriter class function json():

Df.write.json(path='jsonData')
Df.show(6)
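To read the JSON data back into a DataFrame (a minimal sketch, assuming the 'jsonData' path written above):

Df2 = spark.read.json('jsonData')
Df2.show(6)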
How will you read a Parquet file and save a DataFrame as a Parquet file?
#Read a Parquet file
Df = spark.read.parquet('employee')

Df.show(6)

+---+--------+
| Id|Employee|
+---+--------+
|  1|   vikas|
|  2|   cloud|
+---+--------+
#save a DataFrame as a Parquet file

Df.write.parquet(path='user/spark')
How will you save a DataFrame as an ORC file?
Df.write.orc(path='orcData')
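To read the ORC data back (a minimal sketch, assuming the 'orcData' path used above):

Df2 = spark.read.orc('orcData')
Df2.show(6)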
How will you read a table of data from MySQL and save the contents of a DataFrame to MySQL?
#Command for PySpark shell with a MySQL JDBC connector.

pyspark --driver-class-path ~/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar --packages mysql:mysql-connector-java:8.0.12

URL1 = "jdbc:mysql://localhost/DB1"

DF1 = spark.read.format("jdbc").options(url=URL1, database='DB1', dbtable='table1', user="root", password="").load()
#The options() function is used to set different options in this command. 
#In the options() function, we set the url, database, dbtable, user, and password values.

DF1.show()


+-----+-----+---+---+
|  cl1|  cl2|cl3|cl4|
+-----+-----+---+---+
|cloud|vikas|  A|100|
+-----+-----+---+---+

#save the contents of a DataFrame to MySQL.
Start the PySpark shell with the MySQL connector:
$ pyspark --driver-class-path ~/.ivy2/jars/mysql_mysql-connector-java-8.0.12.jar --packages mysql:mysql-connector-java:8.0.12

#Define the database URL and then save the DataFrame into the MySQL database.
dbURL = "jdbc:mysql://localhost/sqlbook"
Df.write.format("jdbc").options(url = dbURL,database ='sqlbook', 
dbtable ='mytab', user="root",password="").save()
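If the target table already exists, the default save mode raises an error; a hedged sketch using DataFrameWriter's mode() to append rows instead:

Df.write.format("jdbc").options(url=dbURL, database='sqlbook', dbtable='mytab', user="root", password="").mode("append").save()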
How will you read a table of data from a PostgreSQL database and save a DataFrame to PostgreSQL?
PostgreSQL JDBC connector command:

$ pyspark --driver-class-path ~/.ivy2/jars/org.postgresql_postgresql-42.2.4.jar --packages org.postgresql:postgresql:42.2.4

dbURL = "jdbc:postgresql://localhost/sqldb?user=
postgres&password=""

DfOne = spark.read.format("jdbc").options(url=dbURL, database='sqldb', dbtable='table1').load()
We can set different options in the options() function.

DfOne.show(2)
Here is the output:

+----------+------+---+
|        C1|    C2| C3|
+----------+------+---+
|CLOUDVIKAS| CLOUD|AWS|
|     SPARK|HADOOP|BIG|
+----------+------+---+

Save a DataFrame into PostgreSQL.
Start the PySpark shell with the PostgreSQL connector:

$ pyspark --driver-class-path ~/.ivy2/jars/org.postgresql_postgresql-42.2.4.jar --packages org.postgresql:postgresql:42.2.4

Define the database URL and then save the DataFrame into the database.

dbURL = "jdbc:postgresql://localhost/pysparksqldb?user=postgres&password=""
We are saving the DataFrame contents to the table.

Df.write.format("jdbc").options(url = dbURL,
database ='pysparksqlbook', dbtable ='mytab').save()
How will you read a table of data from a Cassandra database?
Cassandra connector command:

$ pyspark  --packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0

Read a data table from the Cassandra database:

Df = spark.read.format("org.apache.spark.sql.cassandra").options( keyspace="sqlbook", table="table1").load()

Df.show(1)
output:

+-----+------+----------+
|   C1|    C2|        C3|
+-----+------+----------+
|cloud|hadoop|cloudvikas|
+-----+------+----------+
