

Description

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.

Version    Module name
3.2.0      spark/3.2.0
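
To check which Spark versions are currently installed, you can query the module system with the standard Lmod commands (the versions shown may differ from the table above):

Code Block
languagebash
module avail spark
module spider spark/3.2.0
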
Dependencies

The following dependencies are loaded with the spark module:

...

Some users may also want to load Apache Hive, R, and one of the many Python modules. The conda/spark environment, which provides both R and Python, is recommended and is loaded by default in the Jupyter Notebook Open OnDemand application.
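
For example, the modules used throughout this page can be loaded together as follows:

Code Block
languagebash
module load spark
module load anaconda3
module load conda/spark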

Running Spark on Sol and Hawk

Running Spark jobs on Sol and Hawk is not the same as running them on a traditional Hadoop/Spark cluster. To run Spark jobs, you need to:

  1. allocate compute resources using a traditional SLURM submit script,
  2. start a standalone Spark cluster, and
  3. submit jobs to the standalone Spark cluster using spark-submit, Jupyter Notebooks, or RStudio (a condensed sketch of these steps follows this list).
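
The following is a minimal interactive sketch of these three steps, assuming the helper scripts and modules described in the rest of this page; your_script.py is a hypothetical placeholder for your own application. The batch-script equivalent is shown in the next section.

Code Block
languagebash
# 1. allocate compute resources (here: 2 nodes, 36 cores each) interactively
interact -p health -N 2 -n 2 -c 36

# 2. start a standalone Spark cluster on the allocated nodes
module load spark anaconda3 conda/spark
source /share/Apps/spark/lurc/bin/spark.config
/share/Apps/spark/lurc/bin/spark-start.sh

# 3. submit jobs to the cluster (your_script.py is a placeholder)
spark-submit --master spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT} your_script.py

# tear down the cluster before exiting the allocation
/share/Apps/spark/lurc/bin/spark-stop.sh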

Creating a Spark cluster

The first step in running Spark jobs is to create a standalone Spark cluster by requesting resources from SLURM. As with any traditional HPC workload, request the number of nodes, total cores or cores per node, maximum wall time, and partition as needed. In the following example script, we request 2 nodes on the health partition with 2 total tasks and 36 cpus per task, for a total of 72 cpus in the Spark cluster. Once you have the resources, you create the cluster by assigning one cpu as the master and the remaining cpus as workers. At the end of your job, you need to tear down the Spark cluster by stopping the workers and the master.

...

Code Block
languagebash
titleSample Submit script
collapsetrue
#!/bin/bash

#SBATCH -p health
#SBATCH -N 2
#SBATCH --ntasks=2
#SBATCH --cpus-per-task=36
#SBATCH -t 120
#SBATCH -o spark-%j.out
#SBATCH -e spark-%j.err



module load spark
cd ${SLURM_SUBMIT_DIR}

# Load the python or R module that you want to use
module load anaconda3
module load conda/spark

# Set up environment variables
source /share/Apps/spark/lurc/bin/spark.config

# Start the history server, master and worker process
/share/Apps/spark/lurc/bin/spark-start.sh

# Enter commands to run your spark jobs

# After your job is complete, stop all spark processes
/share/Apps/spark/lurc/bin/spark-stop.sh


Submitting jobs

There are multiple ways to submit Spark jobs once you have a standalone cluster.

...

Expand
titleStarting a Spark cluster interactively


Code Block
languagebash
themeMidnight
[2021-10-28 10:32.44] ~/spark
[alp514.sol](1037): interact -p health -N 2 -n 2 -c 36
manpath: warning: $MANPATH set, ignoring /etc/man_db.conf
System Architecture: avx512
Processor Family: skylake

Due to MODULEPATH changes, the following have been reloaded:
  1) intel/20.0.3     2) mvapich2/2.3.4     3) python/3.8.6

[2021-10-28 10:32.50] ~/spark
[alp514.sol-e617](1000): module load spark anaconda3 conda/spark

Lmod is automatically replacing "python/3.8.6" with "anaconda3/2020.07".

(spark)[2021-10-28 10:33.01] ~/spark
[alp514.sol-e617](1001): source /share/Apps/spark/lurc/bin/spark.config

====================================================================
Log Files: /share/ceph/scratch/alp514/6567337/logs
History Log Files: /share/ceph/scratch/alp514/6567337/history
Worker Log Files: /share/ceph/scratch/alp514/6567337/work
Local Files: /home/alp514/.spark/6567337
====================================================================

(spark)[2021-10-28 10:33.11] ~/spark
[alp514.sol-e617](1002): /share/Apps/spark/lurc/bin/spark-start.sh

====================================================================
SPARK Master        : spark://192.168.4.37:7077
SPARK Master WebUI  : http://sol-e617.cc.lehigh.edu:8080
SPARK History WebUI : http://sol-e617.cc.lehigh.edu:18080
SPARK Worker WebUIs : http://sol-e617.cc.lehigh.edu:8081
SPARK Worker WebUIs : http://sol-e618.cc.lehigh.edu:8081
====================================================================
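
To view these WebUIs from your local machine, open an SSH tunnel through the login node, similar to the Jupyter tunneling instructions later on this page. A minimal sketch using the master WebUI address printed above (substitute your own username and the compute node assigned to your job):

Code Block
languagebash
# forward local port 8080 to the Spark master WebUI on the compute node
ssh -N -L 8080:sol-e617.cc.lehigh.edu:8080 alp514@sol.cc.lehigh.edu
# then browse to http://localhost:8080 on your local machine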



Spark interactive shell 

Load the Spark Module

Code Block
languagebash
module load spark

...

Code Block
languagebash
# extract the master URL (spark://host:port) from the master log file
SPARK_MASTER=$(grep "Starting Spark master" ${SPARK_LOG_DIR}/master.err | cut -d " " -f 9)


Connect to the master using the Spark interactive shell in one of the following languages:

Scala
Code Block
languagebash
spark-shell --master ${SPARK_MASTER}

...

Expand
titleExample


Code Block
languagebash
themeMidnight
(spark)[2021-10-28 10:33.49] ~/spark
[alp514.sol-e617](1004): spark-shell --master spark://192.168.4.37:7077
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/share/Apps/lusoft/opt/spack/linux-centos8-x86_64/gcc-8.3.1/spark/3.2.0-xpdy6ov/jars/spark-unsafe_2.12-3.2.0.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-10-28 10:34:09,542 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://sol-e617.cc.lehigh.edu:4040
Spark context available as 'sc' (master = spark://192.168.4.37:7077, app id = app-20211028103410-0000).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/

Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.2)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val NUM_SAMPLES=1000
NUM_SAMPLES: Int = 1000

scala> val count = sc.parallelize(1 to NUM_SAMPLES).filter { _ =>
     |      val x = math.random
     |      val y = math.random
     |      x*x + y*y < 1
     |  }.count()
count: Long = 794

scala> println(s"Pi is roughly ${4.0 * count / NUM_SAMPLES}")
Pi is roughly 3.176

scala>



Python (pyspark)
Code Block
languagebash
pyspark --master ${SPARK_MASTER}

...

Expand
titleExample


Code Block
languagebash
themeMidnight
(spark)[2021-10-28 10:49.56] ~/spark
[alp514.sol-e617](1027): pyspark --master spark://192.168.4.37:7077
Python 3.8.6 | packaged by conda-forge | (default, Jan 25 2021, 23:21:18)
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/share/Apps/lusoft/opt/spack/linux-centos8-x86_64/gcc-8.3.1/spark/3.2.0-xpdy6ov/jars/spark-unsafe_2.12-3.2.0.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-10-28 10:50:06,387 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0
      /_/

Using Python version 3.8.6 (default, Jan 25 2021 23:21:18)
Spark context Web UI available at http://sol-e617.cc.lehigh.edu:4040
Spark context available as 'sc' (master = spark://192.168.4.37:7077, app id = app-20211028105007-0000).
SparkSession available as 'spark'.
>>> from operator import add
>>> from random import random
>>> partitions =100
>>> n = 100000 * partitions
>>> def f(_):
...     x = random() * 2 - 1
...     y = random() * 2 - 1
...     return 1 if x ** 2 + y ** 2 <= 1 else 0
...
>>> count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
>>> print("Pi is roughly %f" % (4.0 * count / n))
Pi is roughly 3.140600
>>>


R (SparkR)
Code Block
languagebash
sparkR --master ${SPARK_MASTER}
# or
R

...

Expand
titleExample


Code Block
languagebash
themeMidnight
(spark)[2021-10-28 11:00.03] ~/spark
[alp514.sol-e617](1048): R

R version 4.1.1 (2021-08-10) -- "Kick Things"
Copyright (C) 2021 The R Foundation for Statistical Computing
Platform: x86_64-conda-linux-gnu (64-bit)

R is free software and comes with ABSOLUTELY NO WARRANTY.
You are welcome to redistribute it under certain conditions.
Type 'license()' or 'licence()' for distribution details.

  Natural language support but running in an English locale

R is a collaborative project with many contributors.
Type 'contributors()' for more information and
'citation()' on how to cite R or R packages in publications.

Type 'demo()' for some demos, 'help()' for on-line help, or
'help.start()' for an HTML browser interface to help.
Type 'q()' to quit R.

[Previously saved workspace restored]

Spark package found in SPARK_HOME: /share/Apps/lusoft/opt/spack/linux-centos8-x86_64/gcc-8.3.1/spark/3.2.0-xpdy6ov
Launching java with spark-submit command /share/Apps/lusoft/opt/spack/linux-centos8-x86_64/gcc-8.3.1/spark/3.2.0-xpdy6ov/bin/spark-submit   sparkr-shell /share/ceph/scratch/alp514/6567337/RtmpqAAAhc/backend_port1956d55242d5d2
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/share/Apps/lusoft/opt/spack/linux-centos8-x86_64/gcc-8.3.1/spark/3.2.0-xpdy6ov/jars/spark-unsafe_2.12-3.2.0.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-10-28 11:00:08,491 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2021-10-28 11:00:09,480 WARN r.SQLUtils: SparkR: enableHiveSupport is requested for SparkSession but Spark is not built with Hive or spark.sql.catalogImplementation is not set to 'hive', falling back to without Hive support.

 Welcome to
    ____              __
   / __/__  ___ _____/ /__
  _\ \/ _ \/ _ `/ __/  '_/
 /___/ .__/\_,_/_/ /_/\_\   version  3.2.0
    /_/


 SparkSession available as 'spark'.
> library(SparkR)
> localDF <- data.frame(name=c("John", "Smith", "Sarah"), age=c(19, 23, 18))
> df <- createDataFrame(localDF)
> printSchema(df)
root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
> path <- file.path(Sys.getenv("SPARK_HOME"), "examples/src/main/resources/people.json")
> peopleDF <- read.json(path)
> printSchema(peopleDF)
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
> createOrReplaceTempView(peopleDF, "people")
> teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
> teenagersLocalDF <- collect(teenagers)
> print(teenagersLocalDF)
    name
1 Justin
>


spark-submit interactively

You can submit Spark jobs written in Java, Python, Scala, R, or SQL using the spark-submit command.

...

Code Block
languagebash
# common options passed to every spark-submit call; the eventLog settings feed the history server
SPARK_SUBMIT_OPTS="--master spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT \
   --total-executor-cores $((SLURM_NTASKS * SLURM_CPUS_PER_TASK)) \
   --conf spark.eventLog.enabled=true \
   --conf spark.eventLog.dir=file://$TMPDIR/spark-events \
   "

spark-submit ${SPARK_SUBMIT_OPTS} /share/Apps/examples/spark/examples/src/main/r/dataframe.R > dataframe.out
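
The same options can be used with Python applications; for example, the following sketch assumes the Spark example programs are available under ${SPARK_HOME}/examples, as in a standard Spark installation:

Code Block
languagebash
spark-submit ${SPARK_SUBMIT_OPTS} ${SPARK_HOME}/examples/src/main/python/pi.py 1000 > pi.out
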
Jupyter Notebooks from a compute node interactively

Instead of using pyspark or sparkR, you can use Jupyter Notebooks to run Python and R scripts on the Spark cluster. For this, you need a Python or R environment that includes Jupyter Notebook or Lab. The example script provided uses the spark conda environment, which has Python 3.8.6 with pyspark, R 4.1.1 with sparklyr, and Jupyter installed. If you use a different environment, you will need to make the appropriate modifications.

...

Code Block
languagebash
title/share/Apps/examples/spark/start_jupyter.sh
collapsetrue
#!/bin/bash

# This script assumes that the variables set by /share/Apps/spark/lurc/bin/spark.config are available.

export XDG_RUNTIME_DIR=""
export PYTHONHASHSEED=0
ipnport=$(shuf -i8000-9999 -n1)
ipnip=$(hostname -s)
# extract the master URL (spark://host:port) from the master log file
SPARK_MASTER=$(grep "Starting Spark master" ${SPARK_LOG_DIR}/master.err | cut -d " " -f 9)
export PYSPARK_DRIVER_PYTHON='jupyter'
# change notebook to lab if you prefer Jupyter Lab server
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port=$ipnport --ip=$ipnip"
#export PYSPARK_SUBMIT_ARGS=" \
#  --master spark://${SPARK_MASTER_HOST}:${SPARK_MASTER_PORT} \
#  --driver-memory 2G \
#  --executor-memory ${SPARK_MEM} \
#  --total-executor-cores $(( ${SLURM_NTASKS:-1} * ${SLURM_CPUS_PER_TASK:-1} ))  \
#  --conf \"spark.eventLog.enabled=true\" \
#  --conf \"spark.eventLog.dir=file://$TMPDIR/spark-events\"  \
#  pyspark-shell \
#"

## print tunneling instructions to standard output
echo -e "
    Copy/Paste this in your local terminal to ssh tunnel with remote
    -----------------------------------------------------------------
    ssh -N -L $ipnport:$ipnip:$ipnport ${USER}@sol.cc.lehigh.edu
    -----------------------------------------------------------------

    Then open a browser on your local machine to the following address
    ------------------------------------------------------------------
    localhost:$ipnport
    ------------------------------------------------------------------
    and use the token that appears below to login.

    OR replace \"$ipnip\" in the address below with \"localhost\" and copy
    to your local browser.
    "

# Start Jupyter Notebook using the pyspark command
# pyspark # If you use this command, uncomment the PYSPARK_SUBMIT_ARGS lines above  
pyspark --master ${SPARK_MASTER} \
  --driver-memory 2G \
  --executor-memory ${SPARK_MEM} \
  --total-executor-cores $(( ${SLURM_NTASKS:-1} * ${SLURM_CPUS_PER_TASK:-1} )) \
  --conf "spark.eventLog.enabled=true" \
  --conf "spark.eventLog.dir=file://$TMPDIR/spark-events"


Submit jobs from a SLURM script

This is an extension of the SLURM example script above, combined with the spark-submit commands and the start_jupyter.sh script.

Code Block
languagebash
title/share/Apps/examples/spark/spark.slurm
collapsetrue
#!/bin/bash

#SBATCH -p health
#SBATCH -N 2
#SBATCH --ntasks=4
#SBATCH --cpus-per-task=18
#SBATCH -t 120
#SBATCH -o spark-%j.out
#SBATCH -e spark-%j.err



module load spark
cd ${SLURM_SUBMIT_DIR}

module load anaconda3
module load conda/spark

# Start SPARK cluster
source /share/Apps/spark/lurc/bin/spark.config

/share/Apps/spark/lurc/bin/spark-start.sh

which python
which pyspark
which R
which sparkR

python --version
Rscript -e "R.version.string"

# common options passed to every spark-submit call below
SPARK_SUBMIT_OPTS="--master spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT \
   --driver-memory 2G \
   --executor-memory ${SPARK_MEM} \
   --total-executor-cores $((SLURM_NTASKS * SLURM_CPUS_PER_TASK)) \
   --conf spark.eventLog.enabled=true \
   --conf spark.eventLog.dir=file://$TMPDIR/spark-events \
   "

echo $SPARK_SUBMIT_OPTS

spark-submit ${SPARK_SUBMIT_OPTS} pi.py $(( ${SPARK_WORKER_CORES:-1} * 100 )) 2> pi.err
spark-submit ${SPARK_SUBMIT_OPTS} pi_with_pythonstartup.py 2> pistart.err
spark-submit ${SPARK_SUBMIT_OPTS} pi_pyspark.py 2> pispark.err
spark-submit ${SPARK_SUBMIT_OPTS} text_spark.py 2> text.err
spark-submit ${SPARK_SUBMIT_OPTS} dfsql_spark.py 2> dfsql.err
spark-submit ${SPARK_SUBMIT_OPTS} wordcount.py Complete_Shakespeare.txt 2> wordcount.err

spark-submit ${SPARK_SUBMIT_OPTS} dataframe.R 2> dataframe.err
spark-submit ${SPARK_SUBMIT_OPTS} data-manipulation.R nycflights.csv 2> data-manipulation.err
spark-submit ${SPARK_SUBMIT_OPTS} RSparkSQLExample.R 2> RSparkSQLExample.err

spark-submit --master spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT pi.py $(( ${SPARK_WORKER_CORES:-1} * 10000 )) 2> pi.err

# Run pyspark through jupyter notebooks
#sh start_jupyter.sh

# Stop the SPARK Cluster
/share/Apps/spark/lurc/bin/spark-stop.sh

module unload conda/spark
module unload anaconda3
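
Submit the script as a regular batch job and monitor it with the usual SLURM tools, for example:

Code Block
languagebash
sbatch spark.slurm
# check the queue; replace <jobid> with the job id reported by sbatch
squeue -u $USER
tail -f spark-<jobid>.out
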
  
Open OnDemand Interactive Apps
  • Use the dropdown menu under Interactive Apps and select Spark + Jupyter or Spark + RStudio
  • Spark + Jupyter: uses the spark environment provided by the anaconda3/2020.07 module.
    • Select a different conda environment OR
    • Enter the commands to launch a conda environment of your choice in the text box
    • If you prefer Jupyter Lab, check the radio button for "Use JupyterLab instead of Jupyter Notebook?"
  • Spark + RStudio: As of Nov 2021, only R 4.1.2 is available for use.
  • Enter the resources you wish to use for your Spark job
    • By default, one task will be launched per node.
    • Enter the number of cpus you want per task
    • For example, to run a 2-node Spark job on the health partition with 36 workers on each node,
      • Enter 36 in the "Number of cpus per task" box, and
      • 2 in the "Number of nodes" box.
  • Click the "Launch" button to submit your job and wait for resources to become available.
  • When resources are available, a standalone Spark cluster will be created for you. Setting up a Spark cluster will take a while - go grab a coffee.
  • If you want to monitor the Spark cluster only
    • Click the link next to Session ID
    • Open the output.log file (this will be created only when your job starts and may take up to a few minutes) to see information on the Spark Master, Master WebUI and History WebUI.
  • Once the cluster is set up (this can take a few minutes):
  • Spark + Jupyter: Click on the "Connect to Jupyter" button to start Jupyter Lab or Notebook.
  • Spark + RStudio: Click on the "Connect to RStudio Server" button to start RStudio.
    • Due to the way RStudio handles user-created environment variables, only the following variables are available from the RStudio session using the Sys.getenv command:
      • SPARK_HOME, HIVE_HOME, JAVA_HOME, HADOOP_HOME, SPARK_MASTER_HOST, SPARK_MASTER_PORT, SPARK_WORKER_CORES, SPARK_WORKER_MEMORY, SPARK_EXECUTOR_MEMORY, SPARK_HISTORY_OPTS

...