Apache Spark is one of the leading frameworks for analyzing data in real time with Spark Structured Streaming. This talk explains, step by step, how to run Spark Structured Streaming jobs on Kubernetes and how to access the Spark UI to see job metrics and progress.

This talk was given by Shardul Srivastava, Engineering Manager at AirAsia, as part of DoK Day at KubeCon NA 2021.

Shardul Srivastava 0:00

Before I go into the details, let’s talk about the basics of Spark. Spark is one of the most popular big data frameworks, and it has become the norm in the industry: when it comes to data processing, everybody goes to Spark, and in real-time streaming, too, Spark has been making a lot of noise. Spark originally had the Spark Streaming (DStream) API, but that earlier platform was not really built for real-time streaming, so the project introduced a new framework aimed at it, called Structured Streaming. Structured Streaming rivals the Apache Flink framework: it is very good, and it lets you process data at any scale and in real time. For example, if you have click events coming in, you can process all the data for a customer as it arrives, analyze it, and send the result to the user at the same time; if you want to send recommendations of any sort, you can do it right away from Spark Structured Streaming. Because Spark has a very solid foundation and supports distributed processing, you can run it on multiple nodes and process even terabytes of data in minutes. Apart from this, Spark also supports machine learning through its MLlib library. MLlib doesn’t cover everything you get in frameworks like TensorFlow or PyTorch, but it has a lot of machine learning features, and it is all part of the core Spark framework.

So when it comes to running your Spark jobs, Spark supports a lot of cluster managers, which lets you run it in many environments. You can run it standalone: download the Spark distribution and just run it, without any external environment or cluster manager. You can run it on Apache Mesos as well, which has first-class support. One of the most widely used cluster managers with Spark was Hadoop YARN; it was the preferred cluster manager until Kubernetes came in. Spark has had native Kubernetes support since 2.3, but it was experimental, so running Spark on Kubernetes in production environments was not recommended. From release 3.1.1 onwards it is GA, so you can use it in production environments as well, and that’s amazing!
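Each of these cluster managers is selected through the master URL you pass to Spark. As a rough Python sketch, here is the form each one takes; the host names are placeholders, the helper name is hypothetical, and the fallback ports (7077 for Spark’s standalone master, 5050 for Mesos, 443 for the Kubernetes API server) are assumptions to check against your own setup. Note that local[*] is the way to just run Spark on one machine with no cluster manager, while Spark’s own standalone cluster manager uses a spark:// URL.

```python
def master_url(manager: str, host: str = "", port: int = 0) -> str:
    """Return a Spark master URL for the given cluster manager (illustrative only)."""
    if manager == "local":
        # No cluster manager at all: driver and executors share one JVM, using all cores.
        return "local[*]"
    if manager == "standalone":
        # Spark's own standalone cluster manager; 7077 is its default master port.
        return f"spark://{host}:{port or 7077}"
    if manager == "mesos":
        # Apache Mesos master; 5050 is the default Mesos master port.
        return f"mesos://{host}:{port or 5050}"
    if manager == "yarn":
        # YARN locates the ResourceManager through HADOOP_CONF_DIR / YARN_CONF_DIR.
        return "yarn"
    if manager == "kubernetes":
        # Kubernetes API server; Spark expects the port to be given explicitly.
        return f"k8s://https://{host}:{port or 443}"
    raise ValueError(f"unknown cluster manager: {manager}")


if __name__ == "__main__":
    for name in ("local", "standalone", "mesos", "yarn", "kubernetes"):
        print(name, "->", master_url(name, host="master.example.com"))
```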

So how do you submit a Spark application? Whatever your Spark application does, whether it is Structured Streaming, Spark Streaming, or basic Spark code that pulls data from, for example, S3 or GCS and runs some analysis or aggregation, you have to submit it with something called spark-submit. The Spark distribution comes with a lot of utility scripts, and one of them is spark-submit. When you download the Spark distribution, it is inside the bin folder, so you run ./bin/spark-submit, define the main class of your application, and then the master. The master could be an Apache Mesos master, YARN, or a Kubernetes cluster. Spark supports two deploy modes, client and cluster, and going forward we are going to talk about Kubernetes in cluster mode. Then comes a ton of configuration; I’m going to talk about how we can get rid of those configurations and do it in a better way on Kubernetes. So again, Spark has first-class support for Kubernetes, but everything goes through spark-submit: whatever you want to run on Spark, you have to submit using this CLI tool, ./bin/spark-submit, and give it all the configuration options.

How do you run Spark on Kubernetes? When you run Spark on Kubernetes, spark-submit contacts the Kubernetes API server. In the example I showed you, there is a part where you define the master URL; when you run Spark on Kubernetes, you give the Kubernetes API server URL there so that Spark can communicate with the API server. That is where it starts by creating the Spark driver pod, and the driver then creates the executor pods and manages your job on them, so everything is managed from one central place. In this case, the client is spark-submit. When you run spark-submit with all those configuration options, you are basically saying, “Hey Spark, I want to run my application on Kubernetes, and here’s the address of the Kubernetes API server.” It then uses your credentials to talk to the API server and provisions the driver pod, which in turn provisions the executor pods. The code is transferred to the executor pods, and that’s where your actual code runs in distributed mode. How do you do this? When you submit your application to Kubernetes using spark-submit, you have to give the address of the Kubernetes API server in the form k8s:// followed by the API server hostname and port. Usually, if you are inside the cluster, you don’t have to worry about this, because the address is always the same. But say you are running it from somewhere else, for example against EKS: then EKS will have its own cluster address, and the API server URL, and even the port, will be different. So you have to provide it, and then you give the deploy mode.
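The choice between an in-cluster and an external API server address can be sketched in Python. This is an illustration, not part of Spark: it assumes the well-known in-cluster address https://kubernetes.default.svc:443 when the KUBERNETES_SERVICE_HOST environment variable (which Kubernetes injects into pods) is present, and otherwise requires an explicit address such as an EKS endpoint. The helper name k8s_master is hypothetical.

```python
import os
from typing import Mapping, Optional

# Well-known in-cluster DNS name of the Kubernetes API server (HTTPS on port 443).
IN_CLUSTER_API_SERVER = "https://kubernetes.default.svc:443"


def k8s_master(api_server: Optional[str] = None,
               env: Optional[Mapping[str, str]] = None) -> str:
    """Build the k8s:// master URL for spark-submit."""
    env = os.environ if env is None else env
    if api_server is None:
        # Kubernetes injects KUBERNETES_SERVICE_HOST into pods, so its presence
        # is a simple hint that we are running inside the cluster.
        if "KUBERNETES_SERVICE_HOST" not in env:
            raise ValueError("outside the cluster, pass the API server address explicitly")
        api_server = IN_CLUSTER_API_SERVER
    if "://" not in api_server:
        # Spark would also assume https when no scheme is given; make it explicit.
        api_server = "https://" + api_server
    return "k8s://" + api_server


if __name__ == "__main__":
    print(k8s_master("my-cluster.example.com:6443", env={}))
```

For example, k8s_master("my-cluster.example.com:6443", env={}) returns k8s://https://my-cluster.example.com:6443.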
The deploy mode should always be cluster in the case of Kubernetes. Client mode is supported, but it comes with a lot of restrictions and you can run into a lot of limitations, so it’s better to always run in cluster mode. Next, you give the name of your Spark application. The driver pod gets a name based on it, something like spark-pi-driver, and the same naming convention is used for your executor pods. Then you give the class name, which is the main class of your application. This is just an example using the SparkPi application that the Spark repository provides; if you have your own main class, give its name here.

Then you can give a lot of configuration options. Kubernetes is vast: you can run deployments, you can run pods, and pods have a lot of configuration, so all of that configuration goes here, in --conf. Finally, at the end, you pass the JAR that contains all the code, including the class you specified.
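Putting these pieces together, a cluster-mode submission to Kubernetes can be assembled as an argument list. The Python sketch below only builds and prints the command; the function name is hypothetical, the API server address is a placeholder, and the examples JAR path assumes the layout of the gcr.io/spark-operator/spark:v3.1.1 image.

```python
import shlex
from typing import Dict, List, Optional


def spark_submit_k8s(api_server: str, app_name: str, main_class: str, app_jar: str,
                     conf: Optional[Dict[str, str]] = None) -> List[str]:
    """Build the argv for a cluster-mode spark-submit against a Kubernetes API server."""
    argv = [
        "./bin/spark-submit",
        "--master", f"k8s://{api_server}",   # e.g. k8s://https://host:port
        "--deploy-mode", "cluster",          # cluster mode: the driver runs in a pod
        "--name", app_name,                  # also used to name the driver/executor pods
        "--class", main_class,               # entry point inside the JAR
    ]
    for key, value in (conf or {}).items():
        argv += ["--conf", f"{key}={value}"]  # one --conf per Spark property
    argv.append(app_jar)                      # the application JAR comes last
    return argv


if __name__ == "__main__":
    cmd = spark_submit_k8s(
        api_server="https://api.my-cluster.example.com:6443",  # hypothetical address
        app_name="spark-pi",
        main_class="org.apache.spark.examples.SparkPi",
        # Assumed location of the examples JAR inside a Spark 3.1.1 image.
        app_jar="local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar",
        conf={
            "spark.executor.instances": "2",
            "spark.kubernetes.container.image": "gcr.io/spark-operator/spark:v3.1.1",
        },
    )
    print(shlex.join(cmd))
```

Keeping the command as a list rather than one string avoids shell-quoting problems if you later pass it to subprocess.run.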

So, as I mentioned, whenever you submit an application to a Kubernetes cluster, the pods come up, and the pods need all those details: for example, your image pull policy, which image to use, and, if you’re pulling the image from a private registry, the pull secrets as well. Spark supports most of these configurations, and some are not supported, which I’m going to talk about later. For example, you can set the container image, the executor container image, everything; you can customize a lot. But these are tons of options, and passing them through spark-submit can be problematic. Most of the configuration options are listed in the Spark configuration documentation, where you can find each of them and what it means. Take the service account, for example. The driver creates the executor pods, so it needs a Role (or ClusterRole) that grants permission to create pods; that role is bound to a service account through a RoleBinding, and the service account in turn is attached to your driver. So you have to give that service account name as well. If you’re not using any other tool, you have to do all of that manually, which is a lot of work with spark-submit. So I’ve already mentioned a lot of things that are problematic with spark-submit: on Kubernetes, it makes it hard to manage things declaratively through configuration. Let’s say you want to define ten settings for your pod: you have to add all of them to your command. Now imagine you are running 100 jobs: every one of them needs the same arguments repeated on its own command line.
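To see why this gets tedious, here is a Python sketch that turns the pod-level settings just mentioned into --conf flags, using the corresponding Spark properties (spark.kubernetes.container.image, its pullPolicy and pullSecrets variants, and spark.kubernetes.authenticate.driver.serviceAccountName). The registry, secret name and helper names are hypothetical.

```python
from typing import Dict, List


def pod_settings_conf(image: str, pull_policy: str, pull_secrets: List[str],
                      service_account: str) -> Dict[str, str]:
    """Spark properties for the pod-level settings discussed above."""
    return {
        "spark.kubernetes.container.image": image,
        "spark.kubernetes.container.image.pullPolicy": pull_policy,  # Always, Never or IfNotPresent
        # Spark expects a comma-separated list of secret names here.
        "spark.kubernetes.container.image.pullSecrets": ",".join(pull_secrets),
        "spark.kubernetes.authenticate.driver.serviceAccountName": service_account,
    }


def conf_flags(conf: Dict[str, str]) -> List[str]:
    """Expand a property dict into repeated --conf key=value arguments."""
    flags: List[str] = []
    for key, value in conf.items():
        flags += ["--conf", f"{key}={value}"]
    return flags


if __name__ == "__main__":
    shared = conf_flags(pod_settings_conf(
        image="registry.example.com/spark:3.1.1",  # hypothetical private image
        pull_policy="IfNotPresent",
        pull_secrets=["regcred"],                  # hypothetical secret name
        service_account="spark",
    ))
    # Every one of 100 jobs has to carry the same flags on its own command line.
    jobs = {f"job-{i}": list(shared) for i in range(100)}
    print(sum(len(flags) for flags in jobs.values()), "arguments in total")
```

Running it prints 800 arguments in total: eight per job, copied onto each of the 100 command lines.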
Writing those command-line arguments 100 times is a lot of work. The whole point of Kubernetes is that it has tools like Kustomize and Helm that let you customize a lot of things, and that is what makes Kubernetes easy to use. If you’re using spark-submit, you lose all of that.

So we have a solution for this, and what is it? It is the Spark on K8s operator. Some folks at Google created this operator and open-sourced it, and the good thing is that they actively maintain it. The Spark on K8s operator follows the Kubernetes operator pattern, and it uses something called a mutating admission webhook, which lets it intercept requests to the API server and update the pod spec. For example, let’s say I have written an operator for a very particular case: in AWS, we have to add X-Ray as a sidecar container to every pod.

It is just a small snippet, but we have to add it to each and every deployment and pod. So how do you do it automatically? I can create a Kubernetes operator that uses a mutating webhook to update the pod spec and add those details, that is, add the X-Ray sidecar container to all the pods. That’s one of the use cases for a mutating admission webhook, and the Spark on Kubernetes operator works on the same principle: it updates your pod spec based on certain details. It provides two CRDs, SparkApplication and ScheduledSparkApplication, and in this talk I’m going to talk primarily about SparkApplication. If you are a fan of Kubernetes, you know kubectl is just heaven; it makes your work so easy. That’s why the Spark on Kubernetes operator also comes with a command-line tool that helps you maintain and manage Spark applications.
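To make the X-Ray sidecar example concrete, here is a minimal Python sketch of the decision a mutating admission webhook makes: it receives an AdmissionReview (admission.k8s.io/v1) for a pod and answers with a base64-encoded JSON Patch that appends a container. The sidecar name, the amazon/aws-xray-daemon image and its UDP port 2000 are assumptions, and the HTTPS server, TLS certificate and MutatingWebhookConfiguration that a real webhook needs are left out.

```python
import base64
import json

# Hypothetical sidecar definition; the image name and UDP port 2000 are assumptions.
XRAY_SIDECAR = {
    "name": "xray-daemon",
    "image": "amazon/aws-xray-daemon",
    "ports": [{"containerPort": 2000, "protocol": "UDP"}],
}


def mutate(admission_review: dict) -> dict:
    """Answer an AdmissionReview for a pod, adding the sidecar if it is missing."""
    request = admission_review["request"]
    containers = request["object"]["spec"]["containers"]
    patch = []
    if not any(c.get("name") == XRAY_SIDECAR["name"] for c in containers):
        # JSON Patch (RFC 6902): "/-" appends to the end of the containers array.
        patch.append({"op": "add", "path": "/spec/containers/-", "value": XRAY_SIDECAR})

    response = {"uid": request["uid"], "allowed": True}  # uid must echo the request
    if patch:
        response["patchType"] = "JSONPatch"
        response["patch"] = base64.b64encode(json.dumps(patch).encode()).decode()
    return {"apiVersion": "admission.k8s.io/v1", "kind": "AdmissionReview", "response": response}


if __name__ == "__main__":
    review = {"request": {"uid": "1234", "object": {"spec": {"containers": [
        {"name": "app", "image": "my-app:1.0"}]}}}}
    print(json.dumps(mutate(review), indent=2))
```

Checking for an existing container with the same name keeps the patch idempotent, so a pod that already has the sidecar is admitted unchanged.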

How does it really work? We create a YAML manifest for the custom resource, in this case a SparkApplication, and submit it to the cluster. Because the operator is running, it picks up the new resource and creates the driver pod, which then creates the executor pods; with the webhook enabled, the mutating webhook kicks in and patches those pods as they are created. The driver and executor pods run on your cluster, but they are not independent: the operator manages their whole lifecycle, from creation to deletion. That’s the entire workflow for the Kubernetes operator.

So how do you install the Spark on Kubernetes operator? You can install it with Helm, and the installation is very straightforward: you just add the repo. Helm has deprecated the central stable chart repository, so every project now hosts its own repo; you add the Spark operator repo and then install from it. Use helm install for the first installation and helm upgrade for subsequent updates. You define the release name and then give the chart, which is spark-operator/spark-operator. In this case, I’m installing the operator in the namespace spark, and since I’m passing the --create-namespace option, Helm creates that namespace automatically if it doesn’t exist. By default, this operator doesn’t have the webhook enabled. When the webhook is not enabled, the operator won’t update your pod specs with the extra settings you define; it will only manage the lifecycle of your driver and executor pods. You have to explicitly set webhook.enable=true, and then it creates a certificate and registers the webhook so it can update your pods. One important thing here is that you have to tell the Spark operator where you intend to run your Spark jobs, because when you install the operator it creates a Role and RoleBinding that allow managing the resources in that namespace. That’s why you need to specify the Spark job namespace (the sparkJobNamespace value). In this case, I’m using the spark namespace both to install the operator and to run the jobs, so specifying it is redundant. But if you install the Spark operator in, say, a spark-operator namespace and run the jobs in the spark namespace, then you have to specify the Spark job namespace here.
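The installation steps above can be collected into a short Python sketch that prints the Helm commands. The chart repository URL is assumed to be the project’s GitHub Pages address, the helper name is hypothetical, and helm upgrade --install is used so that the same command works for the first install and for later upgrades.

```python
import shlex
from typing import List, Optional

# Assumed chart repository URL for the GoogleCloudPlatform spark-on-k8s-operator project.
REPO_URL = "https://googlecloudplatform.github.io/spark-on-k8s-operator"


def operator_install_commands(release: str = "spark-operator",
                              namespace: str = "spark",
                              job_namespace: str = "spark",
                              chart_version: Optional[str] = None) -> List[List[str]]:
    """Return the helm commands to add the chart repo and install the operator."""
    add_repo = ["helm", "repo", "add", "spark-operator", REPO_URL]
    # "upgrade --install" installs on the first run and upgrades on later runs.
    install = [
        "helm", "upgrade", "--install", release, "spark-operator/spark-operator",
        "--namespace", namespace, "--create-namespace",
        "--set", "webhook.enable=true",                 # the webhook is off by default
        "--set", f"sparkJobNamespace={job_namespace}",  # where the Spark jobs will run
    ]
    if chart_version:
        install += ["--version", chart_version]
    return [add_repo, install]


if __name__ == "__main__":
    for cmd in operator_install_commands():
        print(shlex.join(cmd))
```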

Next, whenever you do this default installation, the service account that you have to reference in every SparkApplication manifest you write gets a default name derived from the release name, something like <release-name>-spark, which looks weird. You can override it by setting the value serviceAccounts.spark.name=spark. I also always set the log level (the logLevel value) higher, because then you know what is going on inside the operator: what details it is receiving and what it is doing. That is very important when you debug any job: whether the operator is updating it, whether it is updating the pod spec, whether it is even receiving the intercepted requests. All of those details end up in the log, so you can tune the log level to however much detail you need for debugging. Finally, you define the version of the operator chart and wait for it to be installed. That’s how you do a Spark on K8s operator installation.

Now let’s see how to write the Spark application, that is, the custom resource. If you were creating a pod, you would write apiVersion: v1 and kind: Pod. Similarly, since we are creating a SparkApplication custom resource, you specify kind: SparkApplication and the API version. The API version could be sparkoperator.k8s.io/v1beta1 or v1beta2; there are multiple versions, each with different functionality, and you can use either of them. Then you specify the namespace. In this case, when I installed the operator, I used the namespace spark, so I have to specify the namespace spark here as well.
If I do not do that, the Spark operator will never be able to intercept those requests and create your driver and executor pods. Then, in the spec, you define what type of application it is: Scala or Python? In this case, it’s a Scala application. You set the mode to cluster, and then you define an image; in this case, gcr.io/spark-operator/spark:v3.1.1, which already contains the Spark examples JAR. That is what I’m running: the JAR contains the main class org.apache.spark.examples.SparkPi. You also have to define the Spark version, and your application has to be compatible with that Spark version. Then you define the driver and executor details: how many cores you want to allocate to the driver pod, how many cores to each executor pod, how many executor instances, and the memory as well. Then there are labels. The number of executor pods depends on the value you set for instances, so you can set a label on the executor pods as well, and that helps when you’re debugging, because you can see the logs from all the executor pods at once. Once you’ve created this spark-pi manifest, all you have to do is run kubectl apply -f with the YAML file, and that’s it: your application has been submitted to the cluster. Once you have submitted the application, you can check its status by describing it with kubectl -n spark describe sparkapplication spark-pi, giving the namespace of the Spark application you want to describe. It gives you all the details: whether the application has started or not, and any errors. This is very useful with Spark applications, because all the details are right there.
So if you want to debug your Spark application (whether it’s running, what its status is, whether any executor pods failed), it is all right there when you describe the SparkApplication.
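The manifest described above can be sketched in Python as a plain dictionary and printed as JSON, which kubectl apply -f accepts just as it accepts YAML. The field names are assumed to follow the operator’s sparkoperator.k8s.io/v1beta2 SparkApplication spec; the resource sizes, the app label and the examples JAR path are illustrative assumptions modeled on the operator’s SparkPi example, so check them against the version you install.

```python
import json


def spark_pi_application(namespace: str = "spark", service_account: str = "spark") -> dict:
    """SparkApplication custom resource for the SparkPi example (illustrative values)."""
    labels = {"app": "spark-pi"}  # hypothetical label, handy for filtering pods and logs
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        # Must match the namespace the operator watches (sparkJobNamespace).
        "metadata": {"name": "spark-pi", "namespace": namespace},
        "spec": {
            "type": "Scala",
            "mode": "cluster",
            "image": "gcr.io/spark-operator/spark:v3.1.1",
            "imagePullPolicy": "IfNotPresent",
            "mainClass": "org.apache.spark.examples.SparkPi",
            # Assumed location of the examples JAR inside the image.
            "mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar",
            "sparkVersion": "3.1.1",
            "driver": {"cores": 1, "memory": "512m",
                       "serviceAccount": service_account, "labels": labels},
            "executor": {"cores": 1, "instances": 2, "memory": "512m", "labels": labels},
        },
    }


if __name__ == "__main__":
    # Usage: python spark_pi.py > spark-pi.json && kubectl apply -f spark-pi.json
    print(json.dumps(spark_pi_application(), indent=2))
```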

Then again, whenever you submit an application, it has its own UI, and you can access that UI as well. Every time you submit an application, the operator creates a UI service specifically for that application, named after it (<application-name>-ui-svc), so your application name plays a crucial role here. Your driver and executor pods are also named after it. You can port-forward to that service and access the Spark UI. Now here’s the nice part: you don’t have to do all of this, running kubectl port-forward and so on. As I mentioned, the Spark on Kubernetes operator has a very nice command-line interface called sparkctl, which lets you do everything you would do with kubectl, but with fewer arguments. Before you can use it, though, you have to build it yourself, because you can’t install it with brew or a similar package manager. To build it, clone the repo, go into the sparkctl folder of the Spark on Kubernetes operator repo, and run go build, which produces the sparkctl binary. Now, if you want to list the applications, it is as simple as sparkctl list -n spark, which lists all the Spark applications in the spark namespace. You don’t have to type kubectl -n spark get sparkapplications with all those big words. Apart from that, you can check the status with sparkctl as well: run sparkctl status with the name of your application and the namespace where you submitted it. The good thing about sparkctl is that it gives you a very convenient way of getting the logs in one place.
You don’t have to use kubectl with -l and a label selector to find the executor pods and then fetch their logs. You can just run sparkctl log spark-pi -n spark to get the application’s logs (the driver’s by default, or a specific executor’s if you pass its ID). That’s very, very helpful when you are running Spark Structured Streaming jobs. Apart from this, you can access the UI as well.

You can just run sparkctl forward spark-pi -n spark, giving the application name and its namespace, and it forwards a local port to the Spark UI of that application.
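The commands from this last part can be summarized in a small Python sketch that builds both the kubectl and the sparkctl forms. It assumes the UI service follows the <application-name>-ui-svc naming described above and that the Spark UI listens on its default port, 4040; the helper names are hypothetical.

```python
from typing import List, Optional


def ui_service_name(app_name: str) -> str:
    """The operator names the UI service after the application."""
    return f"{app_name}-ui-svc"


def kubectl_ui_forward(app_name: str, namespace: str, local_port: int = 4040) -> List[str]:
    """kubectl form: forward a local port to the UI service (Spark UI default port 4040)."""
    return ["kubectl", "-n", namespace, "port-forward",
            f"svc/{ui_service_name(app_name)}", f"{local_port}:4040"]


def sparkctl(subcommand: str, app_name: Optional[str] = None, namespace: str = "spark") -> List[str]:
    """sparkctl form: list, status, log or forward, scoped to a namespace with -n."""
    argv = ["sparkctl", subcommand]
    if app_name is not None:
        argv.append(app_name)
    return argv + ["-n", namespace]


if __name__ == "__main__":
    print(" ".join(kubectl_ui_forward("spark-pi", "spark")))
    print(" ".join(sparkctl("list")))
    for sub in ("status", "log", "forward"):
        print(" ".join(sparkctl(sub, "spark-pi")))
```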

Thank you, everyone!