Deploy Spark on Kubernetes

Introduction
YARN has long been the default orchestration platform for tools in the Hadoop ecosystem, but that has started to change. This is especially true for Spark, which integrates well with storage platforms like S3 and isn't tightly coupled to other Hadoop ecosystem tools. Kubernetes is rapidly becoming the default orchestration platform for Spark, with an object storage platform used for storage. In this post we take a deep dive into building and deploying Spark containers on a Kubernetes cluster. Since Spark needs data to work with, we'll configure this cluster to use the S3 API for storage operations.
Build Spark Container
The first step in deploying an application on Kubernetes is to create a container. While many projects publish official container images, Apache Spark doesn't offer one as of this writing, so we'll go ahead and create our own Spark container. Let's start with the Dockerfile.
FROM java:openjdk-8-jdk

ENV hadoop_ver 2.8.2
ENV spark_ver 2.4.4

# Download and unpack Hadoop under /opt and symlink it to /opt/hadoop
RUN mkdir -p /opt && \
    cd /opt && \
    curl https://archive.apache.org/dist/hadoop/common/hadoop-${hadoop_ver}/hadoop-${hadoop_ver}.tar.gz | \
        tar -zx && \
    ln -s hadoop-${hadoop_ver} hadoop && \
    echo Hadoop ${hadoop_ver} installed in /opt

# Download and unpack the "without hadoop" Spark build and symlink it to /opt/spark
RUN mkdir -p /opt && \
    cd /opt && \
    curl https://archive.apache.org/dist/spark/spark-${spark_ver}/spark-${spark_ver}-bin-without-hadoop.tgz | \
        tar -zx && \
    ln -s spark-${spark_ver}-bin-without-hadoop spark && \
    echo Spark ${spark_ver} installed in /opt

ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin
ENV HADOOP_HOME=/opt/hadoop
ENV PATH=$PATH:$HADOOP_HOME/bin
ENV LD_LIBRARY_PATH=$HADOOP_HOME/lib/native

# Fetch the S3A connector and the AWS SDK jars from Maven Central so Spark can talk to S3
RUN curl https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.2/hadoop-aws-2.8.2.jar -o /opt/spark/jars/hadoop-aws-2.8.2.jar
RUN curl https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.3/httpclient-4.5.3.jar -o /opt/spark/jars/httpclient-4.5.3.jar
RUN curl https://repo1.maven.org/maven2/joda-time/joda-time/2.9.9/joda-time-2.9.9.jar -o /opt/spark/jars/joda-time-2.9.9.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.712/aws-java-sdk-core-1.11.712.jar -o /opt/spark/jars/aws-java-sdk-core-1.11.712.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.11.712/aws-java-sdk-1.11.712.jar -o /opt/spark/jars/aws-java-sdk-1.11.712.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.712/aws-java-sdk-kms-1.11.712.jar -o /opt/spark/jars/aws-java-sdk-kms-1.11.712.jar
RUN curl https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.712/aws-java-sdk-s3-1.11.712.jar -o /opt/spark/jars/aws-java-sdk-s3-1.11.712.jar

# Startup scripts and Spark/Hadoop configuration
ADD start-common.sh start-worker start-master /
ADD core-site.xml /opt/spark/conf/core-site.xml
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ENV PATH $PATH:/opt/spark/bin
In this Dockerfile, we download Apache Spark and Hadoop from their official release locations and fetch the required jar files from Maven Central. Once everything is downloaded and unpacked to the right locations, we add the important configuration files to the image. This gives you an easy opportunity to add any configuration that matters for your environment.
Instead of going through this step, we could have pointed to a pre-built image and been done with it. Building it ourselves, however, shows you what's under the hood of your Spark containers, and power users can make whatever modifications their environment requires.
This Dockerfile and the other configuration files used in this example are available in this GitHub repository. To use the repo, clone it locally:
git clone git@github.com:devshlabs/spark-kubernetes.git
Now you can make any changes required for your environment. Then build the image and push it to the container registry you'd like to use. I have used Docker Hub in this example.
cd spark-kubernetes/spark-container
docker build . -t mydockerrepo/spark:2.4.4
docker push mydockerrepo/spark:2.4.4
Remember to replace mydockerrepo with the actual repository name.
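You can also sanity-check the image locally before deploying it. The commands below simply assume Docker is available on your workstation; they confirm that the configuration files and the S3A/AWS jars ended up where the Dockerfile put them:
# Inspect the Spark configuration baked into the image
docker run --rm mydockerrepo/spark:2.4.4 cat /opt/spark/conf/spark-defaults.conf
# Confirm the hadoop-aws and AWS SDK jars landed in /opt/spark/jars
docker run --rm mydockerrepo/spark:2.4.4 ls /opt/spark/jars | grep aws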
Deploy Spark on Kubernetes
Now that the Spark container is built and available to be pulled, let's deploy this image as both Spark master and worker. The first step is to create the Spark master. We'll use a Kubernetes ReplicationController resource to do this. In this example I use a single replica of the Spark master; for a production system with HA requirements you may want at least 3 master replicas.
kind: ReplicationController
apiVersion: v1
metadata:
  name: spark-master-controller
spec:
  replicas: 1
  selector:
    component: spark-master
  template:
    metadata:
      labels:
        component: spark-master
    spec:
      hostname: spark-master-hostname
      subdomain: spark-master-headless
      containers:
        - name: spark-master
          image: mydockerrepo/spark:2.4.4
          imagePullPolicy: Always
          command: ["/start-master"]
          ports:
            - containerPort: 7077
            - containerPort: 8080
          resources:
            requests:
              cpu: 100m
For the Spark master nodes to be discoverable by the Spark worker nodes, we'll also need to create a headless service.
Once you have checked out the GitHub repository and are inside the spark-kubernetes directory, you can create the Spark master and the relevant services:
kubectl create -f spark-master-controller.yaml
kubectl create -f spark-master-service.yaml
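One way to verify that the master came up cleanly is to check its pod and logs. The pod name below is an example (use whatever kubectl get pods reports in your cluster), and the grep pattern matches the line a standalone Spark master typically logs once it is up:
# List the master pod created by the ReplicationController
kubectl get pods -l component=spark-master
# A healthy master logs a "Starting Spark master at spark://..." line
kubectl logs spark-master-controller-5rgz2 | grep "Starting Spark master"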
Once the master pod and its services are up and running, we can deploy the Spark worker nodes. The default number of Spark worker replicas is set to 2; change it as needed.
kubectl create -f spark-worker-controller.yaml
Finally, confirm that everything is running as expected using the command below:
kubectl get all
You should see something like this:
NAME                               READY   STATUS    RESTARTS   AGE
po/spark-master-controller-5rgz2   1/1     Running   0          9m
po/spark-worker-controller-0pts6   1/1     Running   0          9m
po/spark-worker-controller-cq6ng   1/1     Running   0          9m

NAME                         DESIRED   CURRENT   READY   AGE
rc/spark-master-controller   1         1         1       9m
rc/spark-worker-controller   2         2         2       9m

NAME               CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
svc/spark-master   10.108.94.160   <none>        7077/TCP,8080/TCP   9m
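The master's web UI listens on port 8080 and is exposed through the spark-master service shown above. If you'd like to look at it from your workstation without creating an Ingress, a simple kubectl port-forward is enough:
# Forward local port 8080 to the spark-master service inside the cluster
kubectl port-forward svc/spark-master 8080:8080
# The UI is now reachable at http://localhost:8080 until you stop the port-forward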
Submitting jobs to this Spark cluster
Let's now submit a job to see if it works. As a prerequisite, you'll need valid AWS S3 credentials and a bucket with some sample data. I used Kaggle to download a sample file, available here: https://www.kaggle.com/datasnaek/youtube-new#USvideos.csv. Download the file and put it in an S3 bucket. Let's call this bucket s3-data-bucket, so the file is available at s3-data-bucket/data.csv.
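If you have the AWS CLI configured with the same credentials, uploading the downloaded file is a one-liner (USvideos.csv is the filename from the Kaggle dataset above; substitute your own bucket name for s3-data-bucket):
# Copy the Kaggle CSV into the bucket under the key the Spark job will read
aws s3 cp USvideos.csv s3://s3-data-bucket/data.csv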
Once the data is available, exec into the Spark master pod. For example, if the pod is called spark-master-controller-5rgz2, use the command:
kubectl exec -it spark-master-controller-5rgz2 -- /bin/bash
Once you're logged in, start the Spark shell like so:
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.132.147:4040
Spark context available as 'sc' (master = spark://spark-master:7077, app id = app-20170405152342-0000).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_221)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
Now, let's tell Spark the details of our S3 target. Type the configuration below into the scala prompt we saw above:
sc.hadoopConfiguration.set("fs.s3a.endpoint", "https://s3.amazonaws.com")
sc.hadoopConfiguration.set("fs.s3a.access.key", "s3-access-key")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "s3-secret-key")
Now, submit the Spark job by pasting the contents below into the scala prompt. Remember to change the S3-related fields.
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.IntParam
import org.apache.spark.sql.SQLContext
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils

val conf = new SparkConf().setAppName("YouTube")
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
import sqlContext._

// Read the CSV from S3 into a DataFrame, inferring the schema from the header row
val youtubeDF = spark.read.format("csv").option("sep", ",").option("inferSchema", "true").option("header", "true").load("s3a://s3-data-bucket/data.csv")

// Register the DataFrame as a temporary table so it can be queried with SQL
youtubeDF.registerTempTable("popular")

val fltCountsql = sqlContext.sql("select s.title,s.views from popular s")
fltCountsql.show()
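The interactive shell is convenient for exploring, but you can also submit work to the same cluster non-interactively with spark-submit from inside the master pod. A minimal smoke test might look like this, assuming the examples jar ships at its usual location in the Spark 2.4.4 distribution inside the container:
# Spark was built without Hadoop, so point it at the Hadoop jars first
export SPARK_DIST_CLASSPATH=$(hadoop classpath)
# Run the bundled SparkPi example against the standalone master
spark-submit --master spark://spark-master:7077 \
  --class org.apache.spark.examples.SparkPi \
  /opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar 100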
Finally, you can update the Spark deployment using the kubectl patch command. For example, you can add more worker replicas when the load is high and remove them again as the load goes down, as shown below.
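Scaling the worker ReplicationController could look like this (kubectl scale is a shorter alternative to kubectl patch that achieves the same result):
# Scale the workers up when load increases...
kubectl patch rc spark-worker-controller -p '{"spec":{"replicas":4}}'
# ...and back down again when it subsides
kubectl patch rc spark-worker-controller -p '{"spec":{"replicas":2}}'
# Equivalent using kubectl scale
kubectl scale rc spark-worker-controller --replicas=2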