Deploy Spark on Kubernetes

Introduction

YARN has been the default orchestration platform for tools from the Hadoop ecosystem. This has started to change in recent times, especially with Spark, which integrates very well with storage platforms like S3 and isn't tightly coupled with other Hadoop ecosystem tools. Kubernetes is rapidly becoming the default orchestration platform for Spark, with an object storage platform used for storage. In this post we take a deep dive into creating and deploying Spark containers on a Kubernetes cluster. As Spark needs data to work on, we'll configure this cluster to use the S3 API for storage operations.

Build Spark Container

The first step in deploying an application on Kubernetes is to create a container image. While several projects offer official container images, Apache Spark doesn't offer one as of writing this post. So, we'll just go ahead and create our own Spark container. Let's start with the Dockerfile.

FROM java:openjdk-8-jdk

ENV hadoop_ver 2.8.2
ENV spark_ver 2.4.4

RUN mkdir -p /opt && \
    cd /opt && \
    curl -fSL https://archive.apache.org/dist/hadoop/common/hadoop-${hadoop_ver}/hadoop-${hadoop_ver}.tar.gz | \
        tar -zx && \
    ln -s hadoop-${hadoop_ver} hadoop && \
    echo Hadoop ${hadoop_ver} installed in /opt

RUN mkdir -p /opt && \
    cd /opt && \
    curl -fSL https://archive.apache.org/dist/spark/spark-${spark_ver}/spark-${spark_ver}-bin-without-hadoop.tgz | \
        tar -zx && \
    ln -s spark-${spark_ver}-bin-without-hadoop spark && \
    echo Spark ${spark_ver} installed in /opt

ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin
ENV HADOOP_HOME=/opt/hadoop
ENV PATH=$PATH:$HADOOP_HOME/bin
ENV LD_LIBRARY_PATH=$HADOOP_HOME/lib/native

# Maven Central is served over HTTPS from repo1.maven.org; -f makes curl fail on HTTP errors.
RUN curl -fSL https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.8.2/hadoop-aws-2.8.2.jar -o /opt/spark/jars/hadoop-aws-2.8.2.jar
RUN curl -fSL https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.3/httpclient-4.5.3.jar -o /opt/spark/jars/httpclient-4.5.3.jar
RUN curl -fSL https://repo1.maven.org/maven2/joda-time/joda-time/2.9.9/joda-time-2.9.9.jar -o /opt/spark/jars/joda-time-2.9.9.jar
RUN curl -fSL https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-core/1.11.712/aws-java-sdk-core-1.11.712.jar -o /opt/spark/jars/aws-java-sdk-core-1.11.712.jar
RUN curl -fSL https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.11.712/aws-java-sdk-1.11.712.jar -o /opt/spark/jars/aws-java-sdk-1.11.712.jar
RUN curl -fSL https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-kms/1.11.712/aws-java-sdk-kms-1.11.712.jar -o /opt/spark/jars/aws-java-sdk-kms-1.11.712.jar
RUN curl -fSL https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-s3/1.11.712/aws-java-sdk-s3-1.11.712.jar -o /opt/spark/jars/aws-java-sdk-s3-1.11.712.jar

ADD start-common.sh start-worker start-master /
ADD core-site.xml /opt/spark/conf/core-site.xml
ADD spark-defaults.conf /opt/spark/conf/spark-defaults.conf
ENV PATH $PATH:/opt/spark/bin

In this Dockerfile, we download Apache Spark and Hadoop from their official release locations. Additionally, we fetch the relevant jar files from Maven Central. Once everything is downloaded and unpacked under /opt, we add the configuration files to the image. This gives you the opportunity to add any configuration that is important for your environment.

Instead of going through this step, we could have pointed to a pre-built image and been done with it. But this step lets readers see what's under the hood of their Spark containers, and lets power users make modifications to suit their needs.

This Dockerfile and the other relevant configuration files used in this example are available in this GitHub repository. To use the repo, clone it locally:

git clone git@github.com:devshlabs/spark-kubernetes.git

Now make any changes your environment requires. Then build the image and push it to the container registry you'd like to use. I have used Docker Hub in this example.

cd spark-kubernetes/spark-container
docker build . -t mydockerrepo/spark:2.4.4
docker push mydockerrepo/spark:2.4.4

Remember to replace mydockerrepo with the actual repository name.

Deploy Spark on Kubernetes

Now that the Spark container is built and available to be pulled, let's deploy this image as both the Spark master and the workers. The first step is to create the Spark master. We'll use a Kubernetes ReplicationController resource for this. In this example, I have used a single replica of the Spark master. For a production system with HA requirements, you may want at least three Spark master replicas; note that Spark's standalone mode also needs ZooKeeper-based recovery configured before standby masters can take over.

kind: ReplicationController
apiVersion: v1
metadata:
  name: spark-master-controller
spec:
  replicas: 1
  selector:
    component: spark-master
  template:
    metadata:
      labels:
        component: spark-master
    spec:
      hostname: spark-master-hostname
      subdomain: spark-master-headless
      containers:
        - name: spark-master
          image: mydockerrepo/spark:2.4.4
          imagePullPolicy: Always
          command: ["/start-master"]
          ports:
            - containerPort: 7077
            - containerPort: 8080
          resources:
            requests:
              cpu: 100m

For the Spark master nodes to be discoverable by the Spark worker nodes, we'll also need to create a headless service.
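As a rough illustration of what such a headless service might contain, the sketch below writes a manifest to a hypothetical file, spark-master-headless-sketch.yaml. The service name is assumed to match the subdomain field in the controller above, and the port names are made up for readability; the spark-master-service.yaml in the repository remains the authoritative version.

```shell
# Hypothetical file name, used only for this sketch; the repository's
# spark-master-service.yaml is the manifest the post actually deploys.
cat > spark-master-headless-sketch.yaml <<'EOF'
kind: Service
apiVersion: v1
metadata:
  # Assumed to match the "subdomain" field of the master pod template.
  name: spark-master-headless
spec:
  # "None" makes the service headless: DNS resolves to the pod IP directly.
  clusterIP: None
  selector:
    component: spark-master
  ports:
    - name: spark
      port: 7077
    - name: webui
      port: 8080
EOF
```

With the hostname and subdomain set as in the controller, and assuming the default cluster domain, the master pod should then be resolvable as spark-master-hostname.spark-master-headless.<namespace>.svc.cluster.local.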

Once you have checked out the GitHub repository and are inside the spark-kubernetes directory, you can start the Spark master and the relevant services:

kubectl create -f spark-master-controller.yaml
kubectl create -f spark-master-service.yaml

Now make sure the services and the master pod are up and running. Once they are, we can deploy the Spark worker nodes. The default number of worker replicas is 2; you can change it as needed.

kubectl create -f spark-worker-controller.yaml

Finally, confirm that everything is running as expected using the command below.

kubectl get all

You should see something like this:

NAME                               READY     STATUS    RESTARTS   AGE
po/spark-master-controller-5rgz2   1/1       Running   0          9m
po/spark-worker-controller-0pts6   1/1       Running   0          9m
po/spark-worker-controller-cq6ng   1/1       Running   0          9m

NAME                         DESIRED   CURRENT   READY     AGE
rc/spark-master-controller   1         1         1         9m
rc/spark-worker-controller   2         2         2         9m

NAME               CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
svc/spark-master   10.108.94.160   <none>        7077/TCP,8080/TCP   9m
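If you'd rather check readiness from a script than by reading the listing, a small helper around kubectl wait is one option. This is a minimal sketch: the function name wait_for_component and the 120s default timeout are my own choices, and it assumes a kubectl version recent enough to include the wait subcommand. The component=spark-master label comes from the controller manifest above.

```shell
# wait_for_component LABEL_VALUE [TIMEOUT]
# Blocks until every pod labelled component=LABEL_VALUE reports Ready,
# or fails once TIMEOUT (default 120s, an arbitrary choice) has elapsed.
wait_for_component() {
  kubectl wait --for=condition=Ready pod \
    -l "component=$1" \
    --timeout="${2:-120s}"
}
```

For example, wait_for_component spark-master would block until the master pod is ready. The same call should work for the workers if you pass whatever component label spark-worker-controller.yaml assigns to them.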

Submitting jobs to this Spark cluster

Let's now submit a job to see if it works. As a prerequisite, you'll need valid AWS S3 credentials and a bucket with sample data. I have used a sample file from Kaggle, available here: https://www.kaggle.com/datasnaek/youtube-new#USvideos.csv. Download the file and upload it to an S3 bucket as data.csv. Let's call this bucket s3-data-bucket, so the file is available at s3-data-bucket/data.csv.
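One way to do the upload is with the AWS CLI. This is an assumption on my part; any S3 client will do. The helper below is a sketch with a made-up name, upload_sample, and it relies on the CLI already being configured with your credentials.

```shell
# upload_sample LOCAL_FILE BUCKET
# Copies LOCAL_FILE to s3://BUCKET/data.csv with the AWS CLI.
# Credentials come from the CLI's own configuration, not from this script.
upload_sample() {
  aws s3 cp "$1" "s3://$2/data.csv"
}
```

With the Kaggle file in the current directory, upload_sample USvideos.csv s3-data-bucket would place it at the path used in the rest of this post.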

Once the data is available, exec into the Spark master pod. For example, if the pod is called spark-master-controller-5rgz2, use the command

kubectl exec -it spark-master-controller-5rgz2 -- /bin/bash

Once you're logged in, start the Spark shell like so:

export SPARK_DIST_CLASSPATH=$(hadoop classpath)
spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.132.147:4040
Spark context available as 'sc' (master = spark://spark-master:7077, app id = app-20170405152342-0000).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_221)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

Now, let's give Spark the details of our S3 target. Type the configurations below at the Scala prompt we saw above:

sc.hadoopConfiguration.set("fs.s3a.endpoint", "https://s3.amazonaws.com")
sc.hadoopConfiguration.set("fs.s3a.access.key", "s3-access-key")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "s3-secret-key")
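Typing keys at the prompt is fine for a quick test. As an alternative sketch, the same three settings could be written as spark.hadoop.* properties, which Spark copies into the Hadoop configuration, before the shell is started. The function name write_s3a_conf is hypothetical, and the sketch assumes the keys are exposed to the pod as AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables, which the setup in this post does not do by itself.

```shell
# write_s3a_conf FILE
# Appends the S3A endpoint and credentials to FILE in spark-defaults.conf
# format. Aborts if either credential variable is unset or empty.
write_s3a_conf() {
  : "${AWS_ACCESS_KEY_ID:?must be set}" "${AWS_SECRET_ACCESS_KEY:?must be set}"
  {
    printf 'spark.hadoop.fs.s3a.endpoint    %s\n' "https://s3.amazonaws.com"
    printf 'spark.hadoop.fs.s3a.access.key  %s\n' "$AWS_ACCESS_KEY_ID"
    printf 'spark.hadoop.fs.s3a.secret.key  %s\n' "$AWS_SECRET_ACCESS_KEY"
  } >> "$1"
}
```

Inside the container this might be called as write_s3a_conf /opt/spark/conf/spark-defaults.conf before running spark-shell, which keeps the secrets out of both the image and the shell history.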

Now, submit the Spark job. Simply paste the code below at the Scala prompt. Remember to change the S3-related fields.

// The Spark shell already provides `spark` (SparkSession) and `sc` (SparkContext),
// so no extra imports, SparkConf or SQLContext are needed here.
val youtubeDF = spark.read.format("csv").option("sep", ",").option("inferSchema", "true").option("header", "true").load("s3a://s3-data-bucket/data.csv")

// Register the DataFrame as a temporary view so it can be queried with SQL.
// createOrReplaceTempView replaces the deprecated registerTempTable.
youtubeDF.createOrReplaceTempView("popular")

// Select the title and view count of each video and print the first rows.
val fltCountsql = spark.sql("select s.title, s.views from popular s")
fltCountsql.show()

Finally, you can update the Spark deployment using the kubectl patch command. For example, you can add more worker nodes when the load is higher and remove them again as the load goes down.
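A minimal sketch of such a patch is shown below. The helper name scale_workers is my own, and the controller name spark-worker-controller is taken from the kubectl get all output earlier in this post.

```shell
# scale_workers N
# Patches the worker ReplicationController so that it runs N replicas.
scale_workers() {
  kubectl patch rc spark-worker-controller \
    -p "{\"spec\":{\"replicas\":$1}}"
}
```

You might run scale_workers 4 when load picks up and scale_workers 2 once it drops again. The command kubectl scale rc spark-worker-controller --replicas=4 should achieve the same result without hand-written JSON.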