PySpark Interview Questions


QUESTION 1: How to create a Spark project in PyCharm and run a simple program?

Ans: First we have to install PyCharm and Python. Follow the steps below to complete this task.

Step 1: Download and install PyCharm Community Edition.

https://www.jetbrains.com/pycharm/download/#section=windows

Python version: 2.7

Step 2: Create the project PySparkPractice and navigate to Settings.

Ensure that the interpreter is selected as Python 2.7.

Step 3: Create the SQLPackege package and create a demo file.

Run this program and you will get the output.

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os

# Point HADOOP_HOME at the folder that contains bin\winutils.exe (Windows only)
os.environ['HADOOP_HOME'] = 'C:\\hadoop'

# Build a local SparkSession with the MySQL connector and spark-xml packages
spark = SparkSession.builder.master('local') \
    .config('spark.jars.packages', 'mysql:mysql-connector-java:5.1.44,com.databricks:spark-xml_2.11:0.4.1') \
    .appName('demoapp').getOrCreate()
print(spark)

# Read an XML file; each <contact> element becomes one row
df = spark.read.format('xml').options(rowTag='contact').load('C:\\Users\\cloudvikas\\Documents\\booknew.xml')
df.show()
df.printSchema()

# Select the nested email IDs
df2 = df.select('communications.communication.emailid')
df2.show()

# Write the result as a Parquet table, overwriting any previous output
df2.coalesce(1).write.saveAsTable('output2', format='parquet', mode='overwrite',
                                  path='C:\\Users\\cloudvikas\\PycharmProjects\\Project1\\SQLPackege\\spark-warehouse\\ouput2')

# Register a temporary view and query it with Spark SQL
df2.registerTempTable("Students")
result = spark.sql("select * from Students")
print("result")
result.show()
spark.stop()
QUESTION 2: Why do we have to configure winutils as the Hadoop path in a Spark program?

Ans: If we have to use Hadoop functionality in a Windows environment, then we have to configure the Hadoop path to point at winutils:

Step 1: Write the code below in your driver class:

import os
os.environ['HADOOP_HOME'] = 'C:\\hadoop'

Step 2: Download winutils and keep it under the same path mentioned above (in its bin subfolder):

https://github.com/steveloughran/winutils/blob/master/hadoop-2.7.1/bin/winutils.exe

If you don't do this, you can get the following error in your program:

IOException: (null) entry in command string: null chmod 0644

Question 3) How to overwrite files with the saveAsTable command?

Ans: Pass mode='overwrite' to saveAsTable:

df2.coalesce(1).write.saveAsTable('output2', format='parquet', mode='overwrite', path='C:\\Users\\cloudvikas\\PycharmProjects\\Project1\\SQLPackege\\spark-warehouse\\ouput2')

Question 4) How to launch Jupyter and execute a simple PySpark program?

Ans: We are going to discuss two ways to implement this.

Way 1:

Step 1: Install Python 3.7.5 and run the command below in cmd to install Jupyter:

pip install jupyter

First navigate to:

C:\Users\cloudvikas\AppData\Local\Programs\Python\Python37\Scripts

Then execute this command in cmd.

C:\Spark\spark-2.4.5-bin-hadoop2.6\bin>pip install jupyter
Collecting jupyter
  Downloading https://files.pythonhosted.org/packages/83/df/0f5dd132200728a86190397e1ea87cd76244e42d39ec5e88efd25b2abd7e/jupyter-1.0.0-py2.py3-none-any.whl
Collecting qtconsole (from jupyter)
  Downloading https://files.pythonhosted.org/packages/d6/de/2a0bda85367881e27370a206a561326a99fbb05ab9402f4c4ad59761eec4/qtconsole-4.7.1-py2.py3-none-any.whl (117kB)
     |████████████████████████████████| 122kB 939kB/s
Collecting jupyter-console (from jupyter)
  Downloading https://files.pythonhosted.org/packages/0a/89/742fa5a80b552ffcb6a8922712697c6e6828aee7b91ee4ae2b79f00f8401/jupyter_console-6.1.0-py2.py3-none-any.whl
Collecting notebook (from jupyter)
  Downloading https://files.pythonhosted.org/packages/b1/f1/0a67f09ef53a342403ffa66646ee39273e0ac79ffa5de5dbe2f3e28b5bdf/notebook-6.0.3-py3-none-any.whl (9.7MB)
     |████████████████████████████████| 9.7MB 731kB/s
Collecting ipykernel (from jupyter)
  Downloading https://files.pythonhosted.org/packages/d7/62/d1a5d654b7a21bd3eb99be1b59a608cc18a7a08ed88495457a87c40a0495/ipykernel-5.1.4-py3-none-any.whl (116kB)
     |████████████████████████████████| 122kB 1.3MB/s
Collecting ipywidgets (from jupyter)
  Downloading https://files.pythonhosted.org/packages/56/a0/dbcf5881bb2f51e8db678211907f16ea0a182b232c591a6d6f276985ca95/ipywidgets-7.5.1-py2.py3-none-any.whl (121kB)
     |████████████████████████████████| 122kB 1.1MB/s
Collecting nbconvert (from jupyter)
  Downloading https://files.pythonhosted.org/packages/79/6c/05a569e9f703d18aacb89b7ad6075b404e8a4afde2c26b73ca77bb644b14/nbconvert-5.6.1-py2.py3-none-any.whl (455kB)
     |████████████████████████████████| 460kB 1.3MB/s
Collecting jupyter-core (from qtconsole->jupyter)
  Downloading https://files.pythonhosted.org/packages/63/0d/df2d17cdf389cea83e2efa9a4d32f7d527ba78667e0153a8e676e957b2f7/jupyter_core-4.6.3-py2.py3-none-any.whl (83kB)
     |████████████████████████████████| 92kB 2.0MB/s
Collecting ipython-genutils (from qtconsole->jupyter)
  Downloading https://files.pythonhosted.org/packages/fa/bc/9bd3b5c2b4774d5f33b2d544f1460be9df7df2fe42f352135381c347c69a/ipython_genutils-0.2.0-py2.py3-none-any.whl
Collecting traitlets (from qtconsole->jupyter)
  Downloading https://files.pythonhosted.org/packages/ca/ab/872a23e29cec3cf2594af7e857f18b687ad21039c1f9b922fac5b9b142d5/traitlets-4.3.3-py2.py3-none-any.whl (75kB)
     |████████████████████████████████| 81kB 2.6MB/s
Collecting pygments (from qtconsole->jupyter)
  Downloading https://files.pythonhosted.org/packages/2d/68/106af3ae51daf807e9cdcba6a90e518954eb8b70341cee52995540a53ead/Pygments-2.6.1-py3-none-any.whl (914kB)
     |████████████████████████████████| 921kB 1.6MB/s
Collecting qtpy (from qtconsole->jupyter)
  Downloading https://files.pythonhosted.org/packages/cd/fd/9972948f02e967b691cc0ca1f26124826a3b88cb38f412a8b7935b8c3c72/QtPy-1.9.0-py2.py3-none-any.whl (54kB)
     |████████████████████████████████| 61kB 2.0MB/s
Collecting jupyter-client>=4.1 (from qtconsole->jupyter)
  Downloading https://files.pythonhosted.org/packages/40/75/4c4eb43749e59db3c1c7932b50eaf8c4b8219b1b5644fe379ea796f8dbe5/jupyter_client-6.0.0-py3-none-any.whl (104kB)
     |████████████████████████████████| 112kB 3.3MB/s
Collecting ipython (from jupyter-console->jupyter)
  Downloading https://files.pythonhosted.org/packages/61/6f/69f1eec859ce48a86660529b166b6ea466f0f4ab98e4fc0807b835aa22c6/ipython-7.13.0-py3-none-any.whl (780kB)
     |████████████████████████████████| 788kB 1.7MB/s
Collecting prompt-toolkit!=3.0.0,!=3.0.1,<3.1.0,>=2.0.0 (from jupyter-console->jupyter)
  Downloading https://files.pythonhosted.org/packages/ab/29/d744cee89937b7e52a5c20ca237a6c77298f757965eb3eb0c653df1bfb14/prompt_toolkit-3.0.4-py3-none-any.whl (351kB)
     |████████████████████████████████| 358kB 1.1MB/s
Collecting prometheus-client (from notebook->jupyter)
  Downloading https://files.pythonhosted.org/packages/b3/23/41a5a24b502d35a4ad50a5bb7202a5e1d9a0364d0c12f56db3dbf7aca76d/prometheus_client-0.7.1.tar.gz
Collecting Send2Trash (from notebook->jupyter)
  Downloading https://files.pythonhosted.org/packages/49/46/c3dc27481d1cc57b9385aff41c474ceb7714f7935b1247194adae45db714/Send2Trash-1.5.0-py3-none-any.whl
Collecting pyzmq>=17 (from notebook->jupyter)
  Downloading https://files.pythonhosted.org/packages/26/35/d50bbc114c4da8a344efedc82eeb010b6eb2cc1906ccf89c725259a0d0fe/pyzmq-19.0.0-cp38-cp38-win32.whl (910kB)
     |████████████████████████████████| 911kB 1.1MB/s
Collecting jinja2 (from notebook->jupyter)
  Downloading https://files.pythonhosted.org/packages/27/24/4f35961e5c669e96f6559760042a55b9bcfcdb82b9bdb3c8753dbe042e35/Jinja2-2.11.1-py2.py3-none-any.whl (126kB)
     |████████████████████████████████| 133kB 3.2MB/s
Collecting nbformat (from notebook->jupyter)
  Downloading https://files.pythonhosted.org/packages/ac/eb/de575b7a64e7ab8d8c95e4c180ccc36deda3f1379186c4ee7adf6c2f1586/nbformat-5.0.4-py3-none-any.whl (169kB)
     |████████████████████████████████| 174kB 3.2MB/s
Collecting terminado>=0.8.1 (from notebook->jupyter)
  Downloading https://files.pythonhosted.org/packages/ff/96/1d9a2c23990aea8f8e0b5c3b6627d03196a73771a17a2d9860bbe9823ab6/terminado-0.8.3-py2.py3-none-any.whl
Collecting tornado>=5.0 (from notebook->jupyter)
  Downloading https://files.pythonhosted.org/packages/30/38/91349845eea916575ddbe59a08866de3661b1e4564d42a845545066d51c4/tornado-6.0.4-cp38-cp38-win32.whl (416kB)
     |████████████████████████████████| 419kB 3.3MB/s
Collecting widgetsnbextension~=3.5.0 (from ipywidgets->jupyter)
  Downloading https://files.pythonhosted.org/packages/6c/7b/7ac231c20d2d33c445eaacf8a433f4e22c60677eb9776c7c5262d7ddee2d/widgetsnbextension-3.5.1-py2.py3-none-any.whl (2.2MB)
     |████████████████████████████████| 2.2MB 1.1MB/s
Collecting mistune<2,>=0.8.1 (from nbconvert->jupyter)
  Downloading https://files.pythonhosted.org/packages/09/ec/4b43dae793655b7d8a25f76119624350b4d65eb663459eb9603d7f1f0345/mistune-0.8.4-py2.py3-none-any.whl
Collecting defusedxml (from nbconvert->jupyter)
  Downloading https://files.pythonhosted.org/packages/06/74/9b387472866358ebc08732de3da6dc48e44b0aacd2ddaa5cb85ab7e986a2/defusedxml-0.6.0-py2.py3-none-any.whl
Collecting testpath (from nbconvert->jupyter)
  Downloading https://files.pythonhosted.org/packages/1b/9e/1a170feaa54f22aeb5a5d16c9015e82234275a3c8ab630b552493f9cb8a9/testpath-0.4.4-py2.py3-none-any.whl (163kB)
     |████████████████████████████████| 174kB 652kB/s
Collecting entrypoints>=0.2.2 (from nbconvert->jupyter)
  Downloading https://files.pythonhosted.org/packages/ac/c6/44694103f8c221443ee6b0041f69e2740d89a25641e62fb4f2ee568f2f9c/entrypoints-0.3-py2.py3-none-any.whl
Collecting pandocfilters>=1.4.1 (from nbconvert->jupyter)
  Downloading https://files.pythonhosted.org/packages/4c/ea/236e2584af67bb6df960832731a6e5325fd4441de001767da328c33368ce/pandocfilters-1.4.2.tar.gz
Collecting bleach (from nbconvert->jupyter)
  Downloading https://files.pythonhosted.org/packages/6e/87/f88b0e33914420fe11fe8b820c10d045a342c2a015e79ad8309de4bba820/bleach-3.1.1-py2.py3-none-any.whl (150kB)
     |████████████████████████████████| 153kB 819kB/s
Collecting pywin32>=1.0; sys_platform == "win32" (from jupyter-core->qtconsole->jupyter)
  Downloading https://files.pythonhosted.org/packages/9f/cb/d693f7cdaed51d83bfec5c77a9fa895c1a5a18cf242ba53b06b98ee366dd/pywin32-227-cp38-cp38-win32.whl (8.4MB)
     |████████████████████████████████| 8.4MB 504kB/s
Collecting decorator (from traitlets->qtconsole->jupyter)
  Downloading https://files.pythonhosted.org/packages/ed/1b/72a1821152d07cf1d8b6fce298aeb06a7eb90f4d6d41acec9861e7cc6df0/decorator-4.4.2-py2.py3-none-any.whl
Collecting six (from traitlets->qtconsole->jupyter)
  Downloading https://files.pythonhosted.org/packages/65/eb/1f97cb97bfc2390a276969c6fae16075da282f5058082d4cb10c6c5c1dba/six-1.14.0-py2.py3-none-any.whl
Collecting python-dateutil>=2.1 (from jupyter-client>=4.1->qtconsole->jupyter)
  Downloading https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl (227kB)
     |████████████████████████████████| 235kB 409kB/s
Collecting colorama; sys_platform == "win32" (from ipython->jupyter-console->jupyter)
  Downloading https://files.pythonhosted.org/packages/c9/dc/45cdef1b4d119eb96316b3117e6d5708a08029992b2fee2c143c7a0a5cc5/colorama-0.4.3-py2.py3-none-any.whl
Collecting pickleshare (from ipython->jupyter-console->jupyter)
  Downloading https://files.pythonhosted.org/packages/9a/41/220f49aaea88bc6fa6cba8d05ecf24676326156c23b991e80b3f2fc24c77/pickleshare-0.7.5-py2.py3-none-any.whl
Collecting backcall (from ipython->jupyter-console->jupyter)
  Downloading https://files.pythonhosted.org/packages/84/71/c8ca4f5bb1e08401b916c68003acf0a0655df935d74d93bf3f3364b310e0/backcall-0.1.0.tar.gz
Requirement already satisfied: setuptools>=18.5 in c:\users\cloudvikas\appdata\local\programs\python\python38-32\lib\site-packages (from ipython->jupyter-console->jupyter) (41.2.0)
Collecting jedi>=0.10 (from ipython->jupyter-console->jupyter)
  Downloading https://files.pythonhosted.org/packages/01/67/333e2196b70840f411fd819407b4e98aa3150c2bd24c52154a451f912ef2/jedi-0.16.0-py2.py3-none-any.whl (1.1MB)
     |████████████████████████████████| 1.1MB 2.2MB/s
Collecting wcwidth (from prompt-toolkit!=3.0.0,!=3.0.1,<3.1.0,>=2.0.0->jupyter-console->jupyter)
  Downloading https://files.pythonhosted.org/packages/58/b4/4850a0ccc6f567cc0ebe7060d20ffd4258b8210efadc259da62dc6ed9c65/wcwidth-0.1.8-py2.py3-none-any.whl
Collecting MarkupSafe>=0.23 (from jinja2->notebook->jupyter)
  Downloading https://files.pythonhosted.org/packages/93/b8/95b1c38f5b00ed2c0d16cf65f2b07a5ae73eeacf66d2010c0e934737d1d9/MarkupSafe-1.1.1-cp38-cp38-win32.whl
Collecting jsonschema!=2.5.0,>=2.4 (from nbformat->notebook->jupyter)
  Downloading https://files.pythonhosted.org/packages/c5/8f/51e89ce52a085483359217bc72cdbf6e75ee595d5b1d4b5ade40c7e018b8/jsonschema-3.2.0-py2.py3-none-any.whl (56kB)
     |████████████████████████████████| 61kB 787kB/s
Collecting pywinpty>=0.5; os_name == "nt" (from terminado>=0.8.1->notebook->jupyter)
  Downloading https://files.pythonhosted.org/packages/5d/97/8e43c2152a638cdb83d45644eb125c752abe67249f94bb3e3e29b0709685/pywinpty-0.5.7.tar.gz (49kB)
     |████████████████████████████████| 51kB 1.7MB/s
Collecting webencodings (from bleach->nbconvert->jupyter)
  Downloading https://files.pythonhosted.org/packages/f4/24/2a3e3df732393fed8b3ebf2ec078f05546de641fe1b667ee316ec1dcf3b7/webencodings-0.5.1-py2.py3-none-any.whl
Collecting parso>=0.5.2 (from jedi>=0.10->ipython->jupyter-console->jupyter)
  Downloading https://files.pythonhosted.org/packages/da/15/888f80e429a971d3838124adde705d7b07650aa3a59f4db07d61f653b8cd/parso-0.6.2-py2.py3-none-any.whl (97kB)
     |████████████████████████████████| 102kB 1.1MB/s
Collecting attrs>=17.4.0 (from jsonschema!=2.5.0,>=2.4->nbformat->notebook->jupyter)
  Downloading https://files.pythonhosted.org/packages/a2/db/4313ab3be961f7a763066401fb77f7748373b6094076ae2bda2806988af6/attrs-19.3.0-py2.py3-none-any.whl
Collecting pyrsistent>=0.14.0 (from jsonschema!=2.5.0,>=2.4->nbformat->notebook->jupyter)
  Downloading https://files.pythonhosted.org/packages/90/aa/cdcf7ef88cc0f831b6f14c8c57318824c9de9913fe8de38e46a98c069a35/pyrsistent-0.15.7.tar.gz (107kB)
     |████████████████████████████████| 112kB 939kB/s
Installing collected packages: pywin32, decorator, ipython-genutils, six, traitlets, jupyter-core, tornado, wcwidth, prompt-toolkit, colorama, pygments, pickleshare, backcall, parso, jedi, ipython, pyzmq, python-dateutil, jupyter-client, ipykernel, qtpy, qtconsole, jupyter-console, prometheus-client, Send2Trash, MarkupSafe, jinja2, attrs, pyrsistent, jsonschema, nbformat, pywinpty, terminado, mistune, defusedxml, testpath, entrypoints, pandocfilters, webencodings, bleach, nbconvert, notebook, widgetsnbextension, ipywidgets, jupyter
  Running setup.py install for backcall ... done
  Running setup.py install for prometheus-client ... done
  Running setup.py install for pyrsistent ... done
  Running setup.py install for pywinpty ... done
  Running setup.py install for pandocfilters ... done
Successfully installed MarkupSafe-1.1.1 Send2Trash-1.5.0 attrs-19.3.0 backcall-0.1.0 bleach-3.1.1 colorama-0.4.3 decorator-4.4.2 defusedxml-0.6.0 entrypoints-0.3 ipykernel-5.1.4 ipython-7.13.0 ipython-genutils-0.2.0 ipywidgets-7.5.1 jedi-0.16.0 jinja2-2.11.1 jsonschema-3.2.0 jupyter-1.0.0 jupyter-client-6.0.0 jupyter-console-6.1.0 jupyter-core-4.6.3 mistune-0.8.4 nbconvert-5.6.1 nbformat-5.0.4 notebook-6.0.3 pandocfilters-1.4.2 parso-0.6.2 pickleshare-0.7.5 prometheus-client-0.7.1 prompt-toolkit-3.0.4 pygments-2.6.1 pyrsistent-0.15.7 python-dateutil-2.8.1 pywin32-227 pywinpty-0.5.7 pyzmq-19.0.0 qtconsole-4.7.1 qtpy-1.9.0 six-1.14.0 terminado-0.8.3 testpath-0.4.4 tornado-6.0.4 traitlets-4.3.3 wcwidth-0.1.8 webencodings-0.5.1 widgetsnbextension-3.5.1
WARNING: You are using pip version 19.2.3, however version 20.0.2 is available.
You should consider upgrading via the 'python -m pip install --upgrade pip' command.

Once it is installed, we can move on to Step 2.

Step 2: Run the command below to start the Jupyter notebook.

Navigate to the Python -> Scripts folder and open cmd.

Run the jupyter notebook command in cmd.

C:\Python27\Scripts>jupyter notebook
[I 10:43:00.847 NotebookApp] Writing notebook server cookie secret to C:\Users\cloudvikas\AppData\Roaming\jupyter\runtime\notebook_cookie_secret
[W 10:43:01.988 NotebookApp] Terminals not available (error was No module named 'winpty.cywinpty')
[I 10:43:01.989 NotebookApp] Serving notebooks from local directory: C:\Python27\Scripts
[I 10:43:01.989 NotebookApp] The Jupyter Notebook is running at:
[I 10:43:01.989 NotebookApp] http://localhost:8888/?token=e20c5af58e28aef74db963a4e3169042d1a54fa71061fa87
[I 10:43:01.989 NotebookApp]  or 
[I 10:43:01.989 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 10:43:02.158 NotebookApp]

    To access the notebook, open this file in a browser:
        file:///C:/Users/cloudvikas/AppData/Roaming/jupyter/runtime/nbserver-12400-open.html
    Or copy and paste one of these URLs:
        http://localhost:8888/?token=e20c5af58e28aef74db963a4e3169042d1a54fa71061fa87
     or 

Step 3:

We can see the URL link; it opens the Jupyter notebook in the browser.

Launch Jupyter by opening the URL in the browser.

Create a new Python 3 notebook and execute a PySpark program.

Select Python 3.

The SparkSession will be active.
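For illustration, a minimal PySpark program you could run in the new notebook might look like the sketch below (assuming the pyspark package is installed in the same Python environment; the app name and sample data are made up):

from pyspark.sql import SparkSession

# Start (or reuse) a local SparkSession inside the notebook
spark = SparkSession.builder.master('local[*]').appName('jupyter-demo').getOrCreate()

# Build a small DataFrame in memory and display it
df = spark.createDataFrame([(1, 'vikas'), (2, 'cloud')], ['id', 'name'])
df.show()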

Way 2: Through Anaconda:

Step 1: Install Gnu On Windows

1. Download and install Gnu On Windows (Gow) from https://github.com/bmatzelle/gow/releases/download/v0.8.0/Gow-0.8.0.exe. Select the default options when prompted during the installation of Gow.

Step 2: Install Anaconda and Jupyter Notebook

  1. Download and install Anaconda.

Select the Python 3.7 version.

Once it is downloaded, the downloaded file name looks like:

Anaconda3-2020.02-Windows-x86_64

Select both options when prompted during the installation of Anaconda.

  2. Open “Anaconda Prompt” by finding it in the Windows (Start) Menu.

Step 3: Anaconda with Jupyter notebook

Install findspark with conda, to access the Spark instance from the Jupyter notebook. Check the current installation in Anaconda Cloud. At the time of writing:

conda install -c conda-forge findspark
(base) C:\Users\cloudvikas>conda install -c conda-forge findspark
Collecting package metadata (current_repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: C:\Users\cloudvikas\Anaconda3

  added / updated specs:
    - findspark


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    certifi-2019.11.28         |           py37_0         148 KB  conda-forge
    conda-4.8.3                |   py37hc8dfbb8_0         3.1 MB  conda-forge
    findspark-1.3.0            |             py_1           6 KB  conda-forge
    python_abi-3.7             |          1_cp37m           4 KB  conda-forge
    ------------------------------------------------------------
                                           Total:         3.2 MB

The following NEW packages will be INSTALLED:

  findspark          conda-forge/noarch::findspark-1.3.0-py_1
  python_abi         conda-forge/win-64::python_abi-3.7-1_cp37m

The following packages will be UPDATED:

  conda                       pkgs/main::conda-4.8.2-py37_0 --> conda-forge::conda-4.8.3-py37hc8dfbb8_0

The following packages will be SUPERSEDED by a higher-priority channel:

  certifi                                         pkgs/main --> conda-forge


Proceed ([y]/n)? y


Downloading and Extracting Packages
python_abi-3.7       | 4 KB      | ################################################################################################################################ | 100%
findspark-1.3.0      | 6 KB      | ################################################################################################################################ | 100%
certifi-2019.11.28   | 148 KB    | ################################################################################################################################ | 100%
conda-4.8.3          | 3.1 MB    | ################################################################################################################################ | 100%
Preparing transaction: done
Verifying transaction: done
Executing transaction: done

(base) C:\Users\cloudvikas>

Step 4: Download and Install Spark

Go to the Spark home page, and download the .tgz file of version 2.3.2 from the Apache Spark download page.

Extract the file to your chosen directory (7-Zip can open .tgz files). In my case, it was C:\spark. There is another compressed directory inside the tar; extract it (into the same location) as well.

Set up the environment variables:

SPARK_HOME  = C:\spark\spark-2.3.2-bin-hadoop2.7
HADOOP_HOME = C:\spark\spark-2.3.2-bin-hadoop2.7

Add the following path to PATH environment variable:

C:\spark\spark-2.3.2-bin-hadoop2.7\bin

Step 5: Download and set up winutils.exe

In the Hadoop binaries repository, https://github.com/steveloughran/winutils, choose your Hadoop version, then go to bin and download the winutils.exe file. In my case: https://github.com/steveloughran/winutils/blob/master/hadoop-2.7.1/bin/winutils.exe

Save winutils.exe into the bin directory of your Spark installation, the SPARK_HOME\bin directory. In my case: C:\spark\spark-2.3.2-bin-hadoop2.7\bin.

Step 6: PySpark with Jupyter notebook

Install findspark with conda, to access the Spark instance from the Jupyter notebook (this is the same command as in Step 3):

conda install -c conda-forge findspark

Open your Python Jupyter notebook, and write inside:

import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

The last line will output the SPARK_HOME path. It is just a test; you can delete it.
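For example, after findspark.init() you can start a SparkSession directly in the notebook. The sketch below is illustrative (the app name is made up and 'local[*]' simply uses all local cores):

import findspark
findspark.init()  # makes the pyspark package importable using SPARK_HOME

from pyspark.sql import SparkSession

# Start a local SparkSession from inside the Jupyter notebook
spark = SparkSession.builder.master('local[*]').appName('findspark-demo').getOrCreate()
print(spark.version)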

In this way, we can set up Jupyter using Anaconda and execute a simple PySpark program.

QUESTION 5: How to connect to a MySQL database through a Jupyter notebook?

Prerequisite:

Install MySQL Workbench.

https://mysql-workbench.en.uptodown.com/windows/download/269553

After downloading, the file name will be:

mysql-workbench-6-3-4-64-bit-multi-win

To install it, we also have to install the software below (ignore this if it is already installed):

https://visualstudio.microsoft.com/downloads/?q=Visual+C%2B%2B+Redistributable+for+Visual+Studio+2019

Once it is installed, open it.

************************************************************

Run the code below in Jupyter:

from pyspark.sql import SparkSession

# Create the SparkSession (driver process) with the MySQL connector package
sparkdriver = SparkSession.builder.master('local').\
config('spark.jars.packages', 'mysql:mysql-connector-java:5.1.44').\
appName('cloudvikas').getOrCreate()

# Read the MySQL table through JDBC into a DataFrame
df1 = sparkdriver.read.format('jdbc').\
option('url', 'jdbc:mysql://localhost:3306').\
option('driver', 'com.mysql.jdbc.Driver').\
option('user', 'root').\
option('password', 'cloudvikas').\
option('dbtable', 'dbsample.cloudvikastable').\
load()

df1.printSchema()
df1.show(5)

QUESTION 6: How will you read a JSON file in PySpark?

Ans:

Step 1: Create a JSON file:

[{
  "id": 1,
  "first_name": "Jeanette",
  "last_name": "Penddreth",
  "email": "vikas1",
  "gender": "Female",
  "ip_address": "26.58.193.2"
}, {
  "id": 2,
  "first_name": "Giavani",
  "last_name": "Frediani",
  "email": "vikas2",
  "gender": "Male",
  "ip_address": "229.179.4.212"
}, {
  "id": 3,
  "first_name": "Noell",
  "last_name": "Bea",
  "email": "vikas3",
  "gender": "Female",
  "ip_address": "180.66.162.255"
}, {
  "id": 4,
  "first_name": "Willard",
  "last_name": "Valek",
  "email": "vikas4",
  "gender": "Male",
  "ip_address": "67.76.188.26"
}]

Step 2: Write Spark SQL code:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import os

os.environ['HADOOP_HOME'] = 'C:\\hadoop'
spark = SparkSession.builder.master('local') \
    .config('spark.jars.packages', 'mysql:mysql-connector-java:5.1.44,com.databricks:spark-xml_2.11:0.4.1') \
    .appName('demoapp').getOrCreate()
print(spark)

# A JSON dataset is pointed to by path.
# The path can be either a single text file or a directory storing text files
path = "C:\\Users\\cloudvikas\\Downloads\\example_2.json"
df1 = spark.read.json(path)

# The inferred schema can be visualized using the printSchema() method
df1.printSchema()

# Creates a temporary view using the DataFrame
df1.createOrReplaceTempView("student")

# SQL statements can be run by using the sql methods provided by spark
data1 = spark.sql("select * from student")
data1.show()

Step 3: Execute this script.

We are executing it in PyCharm.

There is an issue found while executing:

C:\Users\cloudvikas\.conda\envs\Project1\python.exe C:/Users/cloudvikas/PycharmProjects/Project1/SQLPackege/jsonread.py
Ivy Default Cache set to: C:\Users\cloudvikas\.ivy2\cache
The jars for the packages stored in: C:\Users\cloudvikas\.ivy2\jars
:: loading settings :: url = jar:file:/C:/opt/spark/spark-2.4.5-bin-hadoop2.6/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
mysql#mysql-connector-java added as a dependency
com.databricks#spark-xml_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-352043a3-f1c0-486c-828b-1e428f6afe60;1.0
	confs: [default]
	found mysql#mysql-connector-java;5.1.44 in central
	found com.databricks#spark-xml_2.11;0.4.1 in central
:: resolution report :: resolve 363ms :: artifacts dl 5ms
	:: modules in use:
	com.databricks#spark-xml_2.11;0.4.1 from central in [default]
	mysql#mysql-connector-java;5.1.44 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-352043a3-f1c0-486c-828b-1e428f6afe60
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/13ms)
20/03/19 09:39:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/19 09:39:46 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
<pyspark.sql.session.SparkSession object at 0x000001A3CA613548>
20/03/19 09:39:48 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
root
 |-- _corrupt_record: string (nullable = true)

Traceback (most recent call last):
  File "C:\Users\cloudvikas\.conda\envs\Project1\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\cloudvikas\.conda\envs\Project1\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o47.showString.
: org.apache.spark.sql.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).json(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).json(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().;
	at org.apache.spark.sql.execution.datasources.json.JsonFileFormat.buildReader(JsonFileFormat.scala:120)
	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:/Users/cloudvikas/PycharmProjects/Project1/SQLPackege/jsonread.py", line 14, in <module>
    data1.show()
  File "C:\Users\cloudvikas\.conda\envs\Project1\lib\site-packages\pyspark\sql\dataframe.py", line 380, in show
    print(self._jdf.showString(n, 20, vertical))
  File "C:\Users\cloudvikas\.conda\envs\Project1\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\cloudvikas\.conda\envs\Project1\lib\site-packages\pyspark\sql\utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the\nreferenced columns only include the internal corrupt record column\n(named _corrupt_record by default). For example:\nspark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()\nand spark.read.schema(schema).json(file).select("_corrupt_record").show().\nInstead, you can cache or save the parsed results and then send the same query.\nFor example, val df = spark.read.schema(schema).json(file).cache() and then\ndf.filter($"_corrupt_record".isNotNull).count().;'

Process finished with exit code 1

The issue is related to the JSON file:

Since Spark 2.3, queries over raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column.

We must keep each record on a single line when we are using a JSON file.

Reason:

While processing JSON data, Spark considers each new line a complete JSON record, so a pretty-printed multi-line file fails.

You should keep each complete JSON record on a single line, in compact form, by removing all whitespace and newlines.

Like

{"a":{"b":1}}

If you want multiple JSON records in a single file, keep them like this:

{"a":{"b":1}}
{"a":{"b":2}}
{"a":{"b":3}} 

Let's update the file in that way:

Through Notepad++ -> Edit option -> Blank Operations.


[{ "id": 1, "first_name": "Jeanette", "last_name": "Penddreth", "email": "vikas1", "gender": "Female", "ip_address": "26.58.193.2" }, { "id": 2, "first_name": "Giavani", "last_name": "Frediani", "email": "vikas2", "gender": "Male", "ip_address": "229.179.4.212" }, { "id": 3, "first_name": "Noell", "last_name": "Bea", "email": "vikas3", "gender": "Female", "ip_address": "180.66.162.255" }, { "id": 4, "first_name": "Willard", "last_name": "Valek", "email": "vikas4", "gender": "Male", "ip_address": "67.76.188.26" }]

Step 4: Execute the same script again.

The script executes successfully and produces the output:

C:\Users\cloudvikas\.conda\envs\Project1\python.exe C:/Users/cloudvikas/PycharmProjects/Project1/SQLPackege/jsonread.py
Ivy Default Cache set to: C:\Users\cloudvikas\.ivy2\cache
The jars for the packages stored in: C:\Users\cloudvikas\.ivy2\jars
:: loading settings :: url = jar:file:/C:/opt/spark/spark-2.4.5-bin-hadoop2.6/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
mysql#mysql-connector-java added as a dependency
com.databricks#spark-xml_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-dbcd4693-5986-4263-82fc-61708f7c5df3;1.0
	confs: [default]
	found mysql#mysql-connector-java;5.1.44 in central
	found com.databricks#spark-xml_2.11;0.4.1 in central
:: resolution report :: resolve 295ms :: artifacts dl 4ms
	:: modules in use:
	com.databricks#spark-xml_2.11;0.4.1 from central in [default]
	mysql#mysql-connector-java;5.1.44 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-dbcd4693-5986-4263-82fc-61708f7c5df3
	confs: [default]
	0 artifacts copied, 2 already retrieved (0kB/11ms)
20/03/19 09:51:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/03/19 09:51:43 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
<pyspark.sql.session.SparkSession object at 0x000001DCFA163508>
20/03/19 09:51:45 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
root
 |-- email: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- id: long (nullable = true)
 |-- ip_address: string (nullable = true)
 |-- last_name: string (nullable = true)

+------+----------+------+---+--------------+---------+
| email|first_name|gender| id|    ip_address|last_name|
+------+----------+------+---+--------------+---------+
|vikas1|  Jeanette|Female|  1|   26.58.193.2|Penddreth|
|vikas2|   Giavani|  Male|  2| 229.179.4.212| Frediani|
|vikas3|     Noell|Female|  3|180.66.162.255|      Bea|
|vikas4|   Willard|  Male|  4|  67.76.188.26|    Valek|
+------+----------+------+---+--------------+---------+


Process finished with exit code 0

Finally, we have completed this solution and learned the following:

Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. This conversion can be done using spark.read.json on a JSON file.

Note that a file offered as a JSON file is not a typical JSON file: each line must contain a separate, self-contained valid JSON object. For a regular multi-line JSON file, set the multiLine option to True.
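For example, the original pretty-printed file could also be read as-is by enabling multiLine, as in the sketch below (it assumes the same spark session and example file path used above):

# Read a pretty-printed (multi-line) JSON array by enabling the multiLine option
df_multi = spark.read.option("multiLine", True).json("C:\\Users\\cloudvikas\\Downloads\\example_2.json")
df_multi.printSchema()
df_multi.show()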

Question 7: How to store output to a MySQL table?

Run the command below:

First we have to create a DataFrame with some columns (the column count depends on the schema of the output). Then we write the DataFrame data into the MySQL table. Here I am assuming two columns.

# Select the two columns to be written (df_local is the existing DataFrame)
df_local2 = df_local.select('column1', 'column2')

# Write the DataFrame into the MySQL table through JDBC
df_local2.write.format('jdbc').\
option('url', 'jdbc:mysql://localhost:3306').\
option('driver', 'com.mysql.jdbc.Driver').\
option('user', 'root').\
option('password', 'vikas').\
option('dbtable', 'dbsample.table').\
save()

When will you use Batch Analytics?

Batch Analytics
• Analytics based on data collected over a period of time is called batch analytics.
• Batch processing jobs can take hours, or perhaps even days.
• Batch processing involves three separate processes.

Examples of batch processing are:

  1. Payroll processing
  2. Billing activities

• Apache Hadoop is a great tool for batch analytics.

What is Real Time Analytics?

• Analytics based on immediate data, for instant results, is called real-time analytics.
• Real-time analytics requires continual input, constant processing, and a steady output of data.
• Examples of real-time processing are data streaming and radar systems, where immediate processing is crucial to make the system work properly.
• Apache Spark and Apache Storm are great tools for real-time analytics.

What is Spark?

• Spark is an open-source framework implemented by the Apache Software Foundation.
• Spark is a general-purpose, clustered, in-memory computing system for real-time processing.
• Spark is used for faster data analytics.
• Spark provides an API to program the entire cluster with implicit data parallelism and fault tolerance.
• Spark was built on top of Hadoop MapReduce.
• Spark is not intended to replace Hadoop; it is an extension of Hadoop.
• MapReduce and Spark can be used together, where MapReduce is used for batch processing and Spark for real-time processing.

Key features of Spark:

  1. In-Memory Computation
  2. Speed
    • Runs programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.
  3. Real Time Processing
    • Real-time computation and low latency because of in-memory computation.
  4. Polyglot
    • Write applications quickly in Java, Scala, Python, or R.
  5. Lazy Evaluation
    • Delays evaluation till needed.
  6. Deployment
    • Spark runs on Hadoop, Mesos, standalone, or in the cloud. It can access diverse data sources including HDFS, Cassandra, HBase, and S3.

What is SparkContext?

• A SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster.
• The SparkContext allows your Spark driver application to access the cluster through a resource manager. The resource manager can be YARN or Spark's cluster manager.
• Only one SparkContext may be active per JVM.
• In the Spark shell, a SparkContext is already created for the user with the name sc (a minimal sketch of creating one explicitly follows below).
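Outside the shell, a SparkContext can be created explicitly. This is only an illustrative sketch (the app name is made up):

from pyspark import SparkConf, SparkContext

# Build a SparkContext explicitly (in the pyspark shell it already exists as sc)
conf = SparkConf().setMaster('local[*]').setAppName('sparkcontext-demo')
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.count())   # 5

sc.stop()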

What is RDD?

RDD stands for Resilient Distributed Dataset.
• Resilient because RDDs are immutable (they can't be modified once created).
• Distributed because the data is distributed across the cluster.
• Dataset because it holds data.
• RDD is the main abstraction or logical data unit in Spark programming.
• RDDs are collections of records with the following properties:
  • Immutable
  • Partitioned
  • Fault-tolerant
  • Lazily evaluated
  • Can be persisted
  • Created by coarse-grained operations

What is Directed Acyclic Graph?

• A DAG is a graph data structure whose edges are directional and which does not have any loops or cycles.
• A DAG in Apache Spark is a set of vertices and edges, where the vertices represent the RDDs and the edges represent the operations to be applied to the RDDs.

What is Transformation and Actions?

• Transformations are lazy operations on an RDD that create one or many new RDDs.
• Transformations are functions that take an RDD as the input and produce one or many RDDs as the output.
• Transformations are lazy, i.e. they are not executed immediately; they are executed only after an action is called.

• Actions are RDD operations that produce non-RDD values.
• Actions trigger execution, using the lineage graph to load the real data into the RDD.
• Actions trigger the execution of RDD transformations to return values.
• The values of actions are returned to the driver or written to an external storage system. Actions bring the laziness of RDDs into motion (see the sketch after this list).
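A minimal sketch of the difference, assuming sc is an active SparkContext:

# Transformations are lazy: nothing is computed at these lines
rdd = sc.parallelize([1, 2, 3, 4, 5])
squared = rdd.map(lambda x: x * x)           # transformation -> new RDD
even = squared.filter(lambda x: x % 2 == 0)  # transformation -> new RDD

# The action triggers execution of the whole lineage and returns a non-RDD value
print(even.collect())   # [4, 16]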

Explain Lineage and Lazy Evaluation.

RDD lineage is also called an RDD operator graph or RDD dependency graph.
• When we create new RDDs based on existing RDDs, Spark manages these dependencies using the lineage graph.
• RDD lineage is built as a result of applying transformations to the RDD, and it creates a logical execution plan.

• Lazy evaluation is an evaluation strategy that delays the evaluation of an expression until its value is needed.
• Lazy evaluation helps to optimize disk and memory usage in Spark.
• Transformations are lazy in nature, so when we call a transformation on an RDD, it does not execute immediately.
• The execution starts only when an action is triggered.

Benefits of lazy evaluation:
• Increases manageability
• Saves computation and increases speed
• Reduces time and space complexity

Write a word count program using Python.

file = sc.textFile("spark/Lab1/hello.txt")

file.collect()

words = file.flatMap(lambda line: line.split(" "))

words.collect()

wordNum = words.map(lambda word: (word, 1))

wordNum.collect()

wordCount = wordNum.reduceByKey(lambda sum, b: sum + b)

wordCount.collect()

sorted = wordCount.sortByKey()

sorted.collect()

descSorted = wordCount.sortByKey(False)

descSorted.collect()

The same logic as a single chained expression:

sorted = sc.textFile("spark/Lab1/hello.txt") \
      .flatMap(lambda line: line.split(" ")) \
      .map(lambda word: (word, 1)) \
      .reduceByKey(lambda sum, b: sum + b) \
      .sortByKey()

descSorted = sc.textFile("spark/Lab1/hello.txt") \
      .flatMap(lambda line: line.split(" ")) \
      .map(lambda word: (word, 1)) \
      .reduceByKey(lambda sum, b: sum + b) \
      .sortByKey(False)

What is RDD and how can it be created?

RDD stands for Resilient Distributed Dataset.

  • Resilient because RDDs are immutable (they can't be modified once created).
  • Distributed because the data is distributed across the cluster.
  • Dataset because it holds data.

RDD is the main abstraction or logical data unit in Spark programming.

RDDs are collections of records with the following properties:

  • Immutable
  • Partitioned
  • Fault tolerant
  • Lazily evaluated
  • Can be persisted
  • Created by coarse-grained operations

Creating an RDD

An RDD can be created in the following ways:

  1. From an external file
  2. By parallelizing collections
  3. From another RDD

1) From an external file

            rdd = sc.textFile("cloud.txt")

2) By parallelizing collections

                        lines = sc.parallelize([10, 20, 30, 40, 10])

3) From another RDD (a short sketch follows below)
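A minimal sketch of the third way, deriving an RDD from an existing RDD through a transformation (assuming sc is an active SparkContext):

rdd = sc.parallelize([10, 20, 30, 40, 10])
rdd2 = rdd.map(lambda x: x * 2)   # new RDD derived from an existing RDD
print(rdd2.collect())             # [20, 40, 60, 80, 20]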

Explain Actions and Transformations:-

There are two types of operations that you can perform on an RDD:

  1. Transformations
  2. Actions

1) Transformation

A transformation applies some function to an RDD and creates a new RDD. It does not modify the original RDD when you apply the function. There are two kinds of transformations:

  • Narrow transformations:

o When each partition of the parent RDD is used by at most one partition of the child RDD, the transformation is called a narrow transformation. All the elements that are required to compute the records in a single partition live in a single partition of the parent RDD.

o A narrow transformation does not require distributing the data across the partitions. Examples:

flatMap, map, filter, etc.

  • Wide transformations:

o When multiple child partitions may depend on a single partition of the parent RDD, the transformation is called a wide transformation; it requires shuffling data across partitions. Examples: groupByKey, reduceByKey.

2) Action

  • An action applies some function to an RDD and produces non-RDD values.
  • Actions trigger execution using the lineage graph.
  • An action is used to either save a result to some location or to display it.
Explain RDD Lineage Graph

  • When you apply a transformation, Spark creates a lineage.
  • A lineage keeps track of all the transformations that have to be applied on that RDD, including where it has to read the data from.

rdd1 = sc.parallelize(range(1, 11))

rdd2 = rdd1.filter(lambda n: n % 2 == 0)

rdd2.count()

  • sc.parallelize() and rdd1.filter() do not get executed immediately. They only get executed once we call an action on the RDD: rdd2.count().
  • The lineage graph helps Spark recompute any intermediate RDD in case of failure. This is how Spark achieves fault tolerance.
Explain Pair RDDs:-

  • RDDs which contain key/value pairs are called pair RDDs.
  • These RDDs expose operations that allow us to act on each key in parallel or regroup data across the network, e.g. reduceByKey(), join(), etc.
  • There are a number of ways in which a pair RDD can be created. Spark provides some APIs for loading data that return pair RDDs (a minimal sketch follows below).
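A minimal sketch of a pair RDD, assuming sc is an active SparkContext:

# A pair RDD: each element is a (key, value) tuple
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

# Act on the values of each key in parallel
totals = pairs.reduceByKey(lambda x, y: x + y)
print(totals.collect())   # e.g. [('a', 4), ('b', 2)]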

Explain Partitions

• Partition is the basic unit of parallelism in an RDD.
• In Hadoop, we divide a data set into multiple blocks and store them on different data nodes; in Spark, we divide the RDD into multiple partitions and store them on worker nodes (data nodes), where they are computed in parallel across all the nodes.
• In Hadoop, we need to replicate the data for fault tolerance, but in Spark replication is not required because fault tolerance is provided by the RDDs themselves.
• Spark automatically decides the number of partitions required and divides the RDD into partitions.

rdd1 = sc.textFile("cloud.txt", 3)

rdd3 = sc.parallelize(range(1, 116), 4)

• We can also specify the number of partitions required when we create the RDD, as in the lines above.

• When we execute an action, one task is launched per partition: the more partitions, the more parallelism.
• Partitions never span multiple machines, i.e., all tuples in the same partition are guaranteed to be on the same machine.
• Each machine in the cluster can contain one or more partitions.
• The total number of partitions is configurable. By default, it equals the total number of cores on all executor nodes.
• We can access the number of partitions as follows: rdd.getNumPartitions()
• We can access the partitioner information as follows: rdd.partitioner
• We can access the data from the partitions as follows: rdd.glom().collect()

EX1:-
rdd3 = sc.parallelize(range(1, 21), 3)
rdd3.getNumPartitions()
rdd3.partitioner
rdd3.glom().collect()
rdd3.saveAsTextFile("user/output2")

Types of Partitions

  1. HashPartitioner
  2. RangePartitioner

HashPartitioner

  • It uses the hashCode() method of the key to determine the partition in Spark.
  • Keys with the same hash code end up in the same partition:

partition = key.hashCode() % numPartitions

RangePartitioner

  • It uses ranges to distribute the data to the respective partitions, based on the range in which a key falls.
  • When the records are sortable, range partitioning divides the records into almost equal ranges.
  • RangePartitioner sorts the records by key first and then divides them into the given number of partitions.

3) Custom Partitioners

  • We can also define our own partitioner class.
  • To implement a custom partitioner in Scala/Java, we need to follow these steps:
    i. Extend the org.apache.spark.Partitioner class.
    ii. Override three methods:
       • numPartitions: Int: returns the number of partitions
       • getPartition(key: Any): Int: returns the partition ID (0 to numPartitions-1) for the given key
       • equals(): to ensure that the correct partitioner object is used
  • In PySpark, custom partitioning is done by passing a partitioning function to partitionBy (a sketch follows below).
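A minimal PySpark sketch of custom partitioning with partitionBy; the data, partition count, and partitioning function are illustrative, and sc is assumed to be an active SparkContext:

# Pair RDD whose keys we want to route to specific partitions
pairs = sc.parallelize([("apple", 1), ("banana", 2), ("mango", 3), ("zebra", 4)])

def first_letter_partitioner(key):
    # Keys starting with 'a'..'m' go to partition 0, the rest to partition 1
    return 0 if key[0].lower() <= 'm' else 1

partitioned = pairs.partitionBy(2, first_letter_partitioner)
print(partitioned.glom().collect())
# e.g. [[('apple', 1), ('banana', 2), ('mango', 3)], [('zebra', 4)]]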

FLATMAP AND MAP

flatMap()

  • In the flatMap() function, each input element produces zero, one, or more elements in the output RDD.
  • map and flatMap are similar in that they take an element from the input RDD and apply a function to that element.
  • The difference between map and flatMap is that map returns exactly one element per input, while flatMap can return a list of elements, which is then flattened (see the sketch after this list).
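A minimal sketch of the difference, assuming sc is an active SparkContext:

lines = sc.parallelize(["hello world", "hi"])

# map: exactly one output element per input element (each line becomes a list)
print(lines.map(lambda line: line.split(" ")).collect())
# [['hello', 'world'], ['hi']]

# flatMap: the returned lists are flattened into individual elements
print(lines.flatMap(lambda line: line.split(" ")).collect())
# ['hello', 'world', 'hi']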

flatMapValues()

  • It works for paired RDDs.
  • flatMapValues() is similar to flatMap, but it applies the specified operation to the values only, keeping the keys unchanged.

    rdd1 = sc.parallelize([(1, 2), (3, 4), (5, 6)])

filter()

  • filter() returns a new RDD that contains only the elements that satisfy a predicate/condition.

groupBy()

  • The data is grouped according to the result of the specified expression.

groupByKey()

  • It works for paired RDDs.
  • The data is grouped according to the key of the paired RDD.

sortBy()

  • The data is sorted according to the result of the specified expression.

sortByKey()

  • It works for paired RDDs.
  • The data is sorted according to the key of the paired RDD.

reduceByKey()

  • It works for paired RDDs.
  • It is used to combine the values with the same key; partial combining happens before the data is shuffled.

mapPartitions()

  • mapPartitions() iterates over every partition of the RDD (one call per partition) and returns a new RDD.

mapPartitionsWithIndex()

  • It is like mapPartitions, except that it also provides the index of the partition.

distinct()

  • It returns a new RDD that contains the unique elements of the source RDD.
  • It is used to remove duplicate data.

union()

  • union returns the elements of both RDDs in a new RDD.
  • The two RDDs should be of the same type.
  • It is the same as the set operation UNION.

intersection()

  • intersection returns only the common elements of both RDDs in a new RDD.
  • It is the same as the set operation INTERSECT.

subtract()

  • subtract returns an RDD with the elements from self that are not in the other RDD.
  • It is the same as the set operation MINUS.

subtractByKey()

  • It works for paired RDDs.
  • subtractByKey() returns an RDD with the elements from self whose keys are not in the other RDD.
  • It matches elements by key.

JOINS

  • A join combines the fields from two RDDs using common key values.
  • It works for paired RDDs.

coalesce()

  • coalesce reduces the number of partitions.
  • It can trigger shuffling depending on the shuffle flag.
  • By default the shuffle flag is disabled, i.e. False.
  • You cannot increase the number of partitions with coalesce.

repartition()

  • repartition can be used to increase or decrease the number of partitions.
  • repartition does a full data shuffle and equally distributes the data among the partitions (a short sketch contrasting coalesce and repartition follows below).

partitionBy()

  • partitionBy is used to create the given number of partitions using the specified partitioner.
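A minimal sketch contrasting coalesce and repartition, assuming sc is an active SparkContext:

rdd = sc.parallelize(range(100), 8)    # start with 8 partitions
print(rdd.getNumPartitions())          # 8

smaller = rdd.coalesce(4)              # reduce partitions; no full shuffle by default
print(smaller.getNumPartitions())      # 4

larger = rdd.repartition(16)           # full shuffle; can increase the partition count
print(larger.getNumPartitions())       # 16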

Action

o An action applies some function to an RDD and produces non-RDD values.

o Actions trigger execution using the lineage graph.

o An action is used to either save a result to some location or to display it.

getNumPartitions()

  • It is used to get the number of partitions created for the RDD.

collect()

  • It returns the entire content of the RDD.

collectAsMap()

  • It converts a paired RDD into a map.

take()

  • take returns the specified number of elements from the RDD.

first()

  • first returns the first element of the RDD.

top()

  • It returns the specified number of top elements from the RDD.
  • top uses the default ordering of the data.

reduce()

  • reduce() takes two elements as input from the RDD and produces an output of the same type as the input elements.

max()

  • max() returns the maximum element available in the RDD.

min()

  • min() returns the minimum element available in the RDD.

sum()

  • sum() returns the sum of the elements available in the RDD.

mean()

  • mean() returns the mean of the elements available in the RDD.

What is the need for SparkSession in Spark?

From Spark 2.0, SparkSession is the new entry point for Spark applications. It includes all the APIs that were available in the different contexts: SparkContext, SQLContext, StreamingContext, and HiveContext.
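A minimal sketch of creating a SparkSession (the app name is illustrative):

from pyspark.sql import SparkSession

# One entry point for everything from Spark 2.0 onwards
spark = SparkSession.builder.appName('session-demo').getOrCreate()

# The older SparkContext is still reachable through the session
sc = spark.sparkContext
print(spark.version)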

What is Catalyst optimizer?

Spark SQL deals with both SQL queries and the DataFrame API.

-> The Catalyst optimizer uses advanced programming language features that allow you to build an extensible query optimizer.

The Catalyst optimizer supports both rule-based and cost-based optimization.

-> In rule-based optimization, a set of rules determines how to execute the query.

Cost-based optimization finds the most suitable way to carry out the SQL statement: when you execute a query, multiple plans are generated using the rules, their costs are computed, and the cheapest plan is chosen.
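To see the optimizer at work, you can print the plans for a simple query. This sketch assumes an existing SparkSession named spark:

# Inspect the plans that the Catalyst optimizer produces for a simple query
df = spark.range(10).filter("id > 5").select("id")
df.explain(True)  # prints the parsed, analyzed, and optimized logical plans plus the physical plan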

What are the fundamentals of Catalyst Optimizer?

The Catalyst optimizer uses standard pattern matching. Catalyst contains trees and sets of rules to manipulate the trees. There are specific libraries to process relational queries. Analysis, query optimization, physical planning, and code generation are the steps that compile parts of queries into Java bytecode.

Trees

The main data type is a tree, and it contains node objects. These objects are immutable in nature.

A node can have one or more children, and new nodes are always subclasses of the TreeNode class.

The objects can be manipulated using functional transformations.

Run application locally on 8 cores

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[8] \
/path/to/examples.jar \
100

Run on a Spark standalone cluster in client deploy mode

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000

Run on a Spark standalone cluster in cluster deploy mode with supervise

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000

Run on a YARN cluster

export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \  # can be client for client mode
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000

Run a Python application on a Spark standalone cluster

./bin/spark-submit \
--master spark://207.184.161.138:7077 \
examples/src/main/python/pi.py \
1000

Run on a Mesos cluster in cluster deploy mode with supervise

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master mesos://207.184.161.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
http://path/to/examples.jar \
1000

Run on a Kubernetes cluster in cluster deploy mode

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master k8s://xx.yy.zz.ww:443 \
--deploy-mode cluster \
--executor-memory 20G \
--num-executors 50 \
http://path/to/examples.jar \
1000

Do I need Hadoop to run Spark?

Answer: No, but if we run on a cluster, we need a shared file system (e.g. NFS mounted on each node). If we have this type of file system, we can just deploy Spark in standalone mode.

What is the Default level of parallelism in Spark?

Answer) The default level of parallelism is the number of partitions.

Is it possible to have multiple SparkContexts in a single JVM?

Answer) Yes, if spark.driver.allowMultipleContexts is set to true (default: false). When it is true, multiple SparkContexts can run in a single JVM.

What is the advantage of broadcasting values across Spark Cluster? 

Answer) Spark transfers the broadcast value to each executor only once; tasks running on that executor can then share it without incurring repeated network transfers.

How do you disable Info Message when running Spark Application?

Answer) Navigate to the $SPARK_HOME/conf directory and modify the log4j.properties file: change the log level from INFO to ERROR.
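Alternatively (a sketch assuming an existing SparkSession named spark), the log level can be changed per application without editing any file:

spark.sparkContext.setLogLevel("ERROR")   # suppress INFO and WARN messages for this application only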

In your project, you have access to a cluster of 12 nodes, where each node has 2 Intel(R) Xeon(R) CPU E5-2650 2.00GHz processors with 8 cores each. How will you tune the application and observe its performance?

For tuning, we have to follow the points below:

1) Monitor the application: check whether the cluster is under-utilized and how many resources the application actually uses. Monitoring can be done with tools such as Ganglia, which reports CPU, memory and network usage.

2) Serialization and memory: decide which serializer to use and how much driver and executor memory the application needs. These parameters can be tuned based on your requirements, for example:
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 5g
spark.executor.memory 3g
spark.executor.extraJavaOptions -XX:MaxPermSize=2G -XX:+UseG1GC
spark.driver.extraJavaOptions -XX:MaxPermSize=6G -XX:+UseG1GC

What is Lazy evaluated RDD mean?

Answer) Lazy evaluation means the data inside an RDD is not loaded or transformed until an action is executed that triggers the execution.
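A minimal sketch of lazy evaluation, assuming a SparkContext sc and a hypothetical input path:

rdd = sc.textFile("/tmp/input.txt")              # nothing is read yet
upper = rdd.map(lambda line: line.upper())       # transformation only, still nothing executed
print(upper.count())                             # action: the file is read and the map runs now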

How would you control the number of partitions of a RDD?

Answer) By using the repartition or coalesce operations.
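For example (a sketch assuming a SparkContext sc):

rdd = sc.parallelize(range(1000), 8)
more = rdd.repartition(16)     # full shuffle; can increase or decrease the partition count
fewer = rdd.coalesce(4)        # no shuffle by default; can only decrease the partition count
print(more.getNumPartitions(), fewer.getNumPartitions())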

Data is spread in all the nodes of cluster, how spark tries to process this data?

Answer) By default, Spark tries to read data into an RDD from the nodes that are close to it (data locality). To optimize transformation operations, Spark creates partitions to hold the data chunks.

What is coalesce transformation?

Answer) The coalesce transformation is used to change the number of partitions. It triggers an RDD shuffle only if its second boolean parameter, shuffle, is set to true (the default is false).

What is Shuffling?

Answer) Shuffling is the process of redistributing data across partitions, which may move it between JVMs or even across the network when it is redistributed among executors.

What is the difference between groupByKey and reduceByKey ?

Answer) Prefer reduceByKey or combineByKey over groupByKey.
groupByKey shuffles all the data, which is slow.
reduceByKey shuffles only the results of the sub-aggregations computed in each partition.
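A small PySpark sketch of the difference, assuming a SparkContext sc:

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
# groupByKey ships every value across the network before summing:
slow_sums = pairs.groupByKey().mapValues(lambda vals: sum(vals))
# reduceByKey sums within each partition first and shuffles only the partial sums:
fast_sums = pairs.reduceByKey(lambda x, y: x + y)
print(fast_sums.collect())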

What is checkpointing?

Answer) Checkpointing is the process of truncating the RDD lineage graph and saving the RDD to a reliable distributed file system such as HDFS. RDD checkpointing writes the actual intermediate RDD data, not just the lineage.
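A minimal sketch, assuming a SparkContext sc and a hypothetical checkpoint directory:

sc.setCheckpointDir("hdfs:///tmp/checkpoints")        # reliable storage for checkpoint files
doubled = sc.parallelize(range(100)).map(lambda x: x * 2)
doubled.checkpoint()                                  # mark the RDD for checkpointing
doubled.count()                                       # the checkpoint is materialized when an action runs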

What is stage, with regards to Spark Job execution?

Answer)A stage is a set of parallel tasks, one per partition of an RDD, that compute partial results of a function executed as part of a Spark job. 

What is Speculative Execution of a tasks?

Answer) Speculative execution is a health-check procedure that monitors task execution times. If a task is running significantly slower than its peers, a new copy of it is launched in parallel on another worker, and the result of whichever copy finishes first is used.
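Speculation is controlled through configuration; a sketch of enabling it when building the session (the application name is hypothetical):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("speculation-demo")                    # hypothetical application name
         .config("spark.speculation", "true")            # enable speculative execution
         .config("spark.speculation.multiplier", "1.5")  # how much slower than the median a task must be to be re-launched
         .getOrCreate())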

Which all cluster manager can be used with Spark?

Answer) Apache Mesos, Hadoop YARN, Spark standalone, and Kubernetes (from Spark 2.3 onwards).

Hadoop uses replication to achieve fault tolerance. How is this achieved in Apache Spark?

The data storage model in Apache Spark is based on RDDs. Instead of replicating data, RDDs achieve fault tolerance through lineage: lost partitions are recomputed from the transformations recorded in the lineage graph.

What are the various levels of persistence in Apache Spark?

Apache Spark automatically persists the intermediate data from shuffle operations; however, it is often recommended that users call the persist() method on an RDD they plan to reuse. Spark has various persistence levels to store RDDs on disk, in memory, or as a combination of both, with different replication levels.

The various storage/persistence levels in Spark are –

  • MEMORY_ONLY
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK
  • MEMORY_AND_DISK_SER
  • DISK_ONLY
  • OFF_HEAP
What is the difference between persist() and cache()?

persist() allows the user to specify the storage level, whereas cache() uses the default storage level (MEMORY_ONLY for RDDs).
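A short sketch with an RDD, assuming a SparkContext sc:

from pyspark import StorageLevel

rdd = sc.parallelize(range(1000))
rdd.cache()                                  # uses the default level (MEMORY_ONLY for RDDs)
rdd.unpersist()                              # the level of an already-persisted RDD cannot be changed directly
rdd.persist(StorageLevel.MEMORY_AND_DISK)    # persist() accepts an explicit storage level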

What are the various data sources available in SparkSQL?
  • Parquet file
  • JSON Datasets
  • Hive tables
What is the advantage of a Parquet file?

Parquet is a columnar format file that helps to:

  • Limit I/O operations
  • Consume less space
  • Fetch only the required columns.
What is Catalyst framework?

The Catalyst framework is the optimization framework in Spark SQL. It allows Spark to automatically transform SQL queries, and new optimization rules can be added to build a faster processing system.

When running Spark applications, is it necessary to install Spark on all the nodes of YARN cluster?

Spark does not need to be installed on every node when running under YARN or Mesos, because Spark executes on top of these cluster managers without requiring any change to the cluster; the required Spark libraries are shipped to the worker nodes when the job is submitted.

How can you trigger automatic clean-ups in Spark to handle accumulated metadata?

You can trigger the clean-ups by setting the parameter spark.cleaner.ttl, or by dividing long-running jobs into different batches and writing the intermediate results to disk.

What is lineage graph?

The representation of the dependencies between RDDs is known as the lineage graph. Whenever a partition of a persisted RDD is lost, the lost data can be recomputed using the lineage graph information.

Why is there a need for broadcast variables with Apache Spark?

Broadcast variables are read-only variables cached in memory on every machine. Using broadcast variables eliminates the need to ship a copy of the variable with every task, so data can be processed faster.

They are useful for storing a lookup table in memory, which makes retrieval far more efficient than an RDD lookup().
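A minimal broadcast sketch, assuming a SparkContext sc and a hypothetical lookup table:

country_names = {"IN": "India", "US": "United States"}        # small lookup table
bc_names = sc.broadcast(country_names)                        # shipped to each executor once

codes = sc.parallelize(["IN", "US", "IN"])
print(codes.map(lambda c: bc_names.value.get(c, "unknown")).collect())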

How can you minimize data transfers when working with Spark?
  1. Using broadcast variables: broadcasting a small dataset enhances the efficiency of joins between a small and a large RDD.
  2. Using accumulators: accumulators update the values of variables in parallel while the job executes (a short sketch follows this list).
  3. Avoiding operations that trigger shuffles, such as the ByKey operations and repartition, wherever possible.
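A minimal accumulator sketch, assuming a SparkContext sc:

acc = sc.accumulator(0)                                  # numeric accumulator owned by the driver
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: acc.add(x))
print(acc.value)                                         # read the total on the driver after the action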
Why is Spark RDD immutable?

- Immutable data is always safe to share across multiple processes as well as multiple threads.
- Since an RDD is immutable, we can recreate it at any time from the lineage graph.
- If the computation is time-consuming, we can cache the RDD, which results in a performance improvement.

By Default, how many partitions are created in RDD in Apache Spark?

By default, the number of partitions equals the value of spark.default.parallelism, which is normally the total number of cores in the cluster; for files read from HDFS, it equals the number of input blocks.

What is the difference between DAG and Lineage?

Lineage graph
When a new RDD is created from an existing RDD, the new RDD contains a pointer to its parent RDD. All such dependencies between RDDs are recorded in a graph; this graph is called the lineage graph.

Directed Acyclic Graph (DAG)
A DAG is a combination of vertices and edges: the vertices represent RDDs and the edges represent the operations to be applied to them.
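You can inspect the lineage of an RDD with toDebugString (a sketch assuming a SparkContext sc; depending on the PySpark version the method may return bytes rather than a string):

rdd = sc.parallelize(range(10)).map(lambda x: x * 2).filter(lambda x: x > 5)
print(rdd.toDebugString())     # shows each parent RDD and its dependency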

What is the difference between Caching and Persistence in Apache Spark?

Cache and persist are both optimization techniques for Spark computations.

With cache, only the MEMORY_ONLY storage level is available: intermediate results are saved in memory only.

Persist supports different storage levels: MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2 and MEMORY_AND_DISK_2.

Explain about Spark SQL.

The two main components when using Spark SQL are DataFrame and SQLContext.

DataFrame

A DataFrame is a distributed collection of data organized into named columns.

DataFrames can be created from different data sources such as:

  • Existing RDDs
  • Structured data files
  • JSON datasets
  • Hive tables
  • External databases

 

SQLContext

  • Spark SQL provides SQLContext to encapsulate all relational functionality in Spark.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
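The PySpark equivalent (a sketch assuming a SparkContext sc; the sample rows and column names are hypothetical, and in Spark 2.x SparkSession exposes the same functionality):

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
df = sqlContext.createDataFrame([(1, "alice"), (2, "bob")], ["id", "name"])
df.show()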

How do I skip a header from CSV files in Spark?

Answer) Spark 2.x: spark.read.format("csv").option("header", "true").load("filePath")

How to read multiple text files into a single RDD?

Answer) sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")

List the advantage of Parquet file in Apache Spark. 

Answer) Parquet is a columnar format supported by many data processing systems. The benefits of columnar storage are:
1)Columnar storage limits IO operations.
2)Columnar storage can fetch specific columns that you need to access.
3)Columnar storage consumes less space.
4)Columnar storage gives better-summarized data and follows type-specific encoding.
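A short PySpark sketch of writing and reading Parquet, assuming a SparkSession spark and hypothetical paths and column names:

df = spark.read.json("/tmp/orders.json")                      # hypothetical input
df.write.mode("overwrite").parquet("/tmp/orders_parquet")     # columnar output
spark.read.parquet("/tmp/orders_parquet").select("order_id", "order_date").show()   # only these columns are read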

How to read JSON file in Spark?

To read data from a text file, we have APIs in SparkContext. But many projects use specific file formats such as Parquet and JSON, and SparkContext has no direct API to read those.

SQLContext has APIs to read industry-standard formats (JSON, Parquet, ORC). JSON files can be read through the read and load commands.

val ordersdf = sqlContext.read.json("/orders")

ordersdf.show

To see its schema, we can use the command below:

ordersdf.printSchema

If you want to see only two columns from the DataFrame, we can use the command below:

ordersdf.select("col1", "col2").show

We can use the load command as well:

sqlContext.load("/user/vikas", "json").show
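The same steps in PySpark (a sketch assuming an SQLContext named sqlContext; the paths and column names are the hypothetical ones used above):

ordersdf = sqlContext.read.json("/orders")
ordersdf.show()
ordersdf.printSchema()
ordersdf.select("col1", "col2").show()
sqlContext.read.load("/user/vikas", format="json").show()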

Write some of the Date functions in Spark.

current_date

current_timestamp

date_add

date_format

date_sub

datediff

day

dayofmonth

to_date
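A brief usage sketch of a few of these functions, assuming a SparkSession spark and a hypothetical single-column DataFrame:

from pyspark.sql import functions as F

df = spark.createDataFrame([("2017-12-12 00:00:00",)], ["ts"])
df.select(
    F.current_date().alias("today"),
    F.to_date(df.ts).alias("as_date"),
    F.date_format(df.ts, "yyyyMMdd").alias("yyyymmdd"),
    F.date_add(F.to_date(df.ts), 7).alias("next_week"),
    F.datediff(F.current_date(), df.ts).alias("days_since")
).show()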

Write transformation logic to convert a date (2017-12-12 00:00:00) to 20171212.

Step 1: Let's create an RDD first by reading the text file.

val orders = sc.textFile("/public/retail_db/orders")

Its first element can be seen as below:

orders.first

Assume the 1st column is the order id, the 2nd is the order date and the 3rd is the order customer id.

Step 2: Extract the date first (str below stands for one record of the RDD).

str.split(",")(1).substring(0,10)

-- output: 2013-07-25

Step 3: Remove the "-" characters.

str.split(",")(1).substring(0,10).replace("-","")

-- output: 20130725

Step 4: Convert to Int.

str.split(",")(1).substring(0,10).replace("-","").toInt

-- output: Int = 20130725

Step 5: Apply the logic to every record and print 10 outputs.

val orderDates = orders.map(str => str.split(",")(1).substring(0,10).replace("-","").toInt)

orderDates.take(10).foreach(println)
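The same logic end to end in PySpark (a sketch; the path and column positions follow the example above):

orders = sc.textFile("/public/retail_db/orders")
order_dates = orders.map(lambda line: int(line.split(",")[1][:10].replace("-", "")))
for value in order_dates.take(10):
    print(value)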