SparkJob testing with Live Data

There is this one Spark job which gave us quite a headache. In the beginning, it seemed quite simple. Extract the data, aggregate it in some way or another, filter it, enrich it, and in the end, save it to a database. The real “challenge” here was to loop over the source data a couple of times and create different aggregations to be saved in the resulting database, in the most efficient manner. Simple enough. The whole pipeline was written in Scala on Apache Spark with not too many lines of code.

There were even some tests. Unit tests. ScalaTest, of course, ensuring the different modules and functions performed as expected. Some of the tests were purely logical tests; for others, we extracted some real data, saved it in a file, and read it into the tests.

And then we started deployment and all hell broke loose.

Testing data-driven applications/pipelines

For a service application, the input is much more defined and the environment is much more controlled. A request which does not conform to a specific standard is more likely to be discarded than anything else. There is usually some form of interaction between the client and the server, which gives the client feedback on problematic input.

None of the above is true for data-driven applications, specifically data pipelines. Of course, the internal logic can be tested with Unit tests, but events cannot be simply discarded when not conforming to a specific standard. The pipeline must not fail, the data must not be lost.

For this reason, it is not enough to just write some tests with specific input, a snapshot of the data. It is imperative to test the pipeline using real live data at any given time.

So there I was, trying to figure out how to tell my team leader that my pipeline is correct and it is the data that is wrong.

Data Pipeline Integration Tests

So my team leader Eran Shemesh had this great idea. Run a pipeline, which tests your pipeline. It is quite easy when you think of it, although getting all the smaller pieces to work together is quite tedious. So here it goes:

1. Break apart your pipeline into testable pieces such as ETL

2. Write another pipeline that uses the main pipeline as an API

3. Your test should test the outcome of the main pipeline functionality

Using this setup, I can run my testing pipeline in the same environment as the main pipeline, even extracting the same data and running my tests on real live data.

Depending on how I’ve written my code, I can control which data is being loaded and which data is being run through the transformation functions. I could even control the amount of data that is being run in this testing pipeline. But most importantly, I can run this against real live data and gain higher confidence in mission-critical pipelines.

And since this is a testing pipeline, I will want to run it automatically using my CI system.
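To make the three steps above concrete, here is a minimal sketch of the idea. It is not our actual code: `Trade`, `Total`, `MainPipeline` and `TestPipeline` are hypothetical names, and plain Scala collections stand in for Spark Datasets so the example runs on its own. The main pipeline exposes its steps as separate functions, and the testing pipeline calls them like an API, with a `sampleSize` parameter controlling how much data flows through.

```scala
import scala.util.Try

// Hypothetical record types; the real pipeline would work on Spark Datasets.
final case class Trade(currency: String, amount: Double)
final case class Total(currency: String, sum: Double)

// "Main pipeline": every ETL step is a separate, callable function.
object MainPipeline {
  // Parse "currency,amount" lines; malformed lines are skipped, not fatal.
  def extract(source: Seq[String]): Seq[Trade] =
    source.flatMap { line =>
      line.split(",") match {
        case Array(cur, amt) => Try(amt.trim.toDouble).toOption.map(Trade(cur.trim, _))
        case _               => None
      }
    }

  // Sum the amounts per currency, sorted by currency for a stable result.
  def transform(trades: Seq[Trade]): Seq[Total] =
    trades
      .groupBy(_.currency)
      .map { case (currency, ts) => Total(currency, ts.map(_.amount).sum) }
      .toSeq
      .sortBy(_.currency)
}

// "Testing pipeline": uses the main pipeline as an API and checks its outcome.
object TestPipeline {
  def run(source: Seq[String], sampleSize: Int): Boolean = {
    val trades = MainPipeline.extract(source.take(sampleSize))
    val totals = MainPipeline.transform(trades)
    // Invariant 1: exactly one total per currency.
    val unique = totals.map(_.currency).distinct.size == totals.size
    // Invariant 2: aggregation must not create or lose money.
    val conserved = math.abs(totals.map(_.sum).sum - trades.map(_.amount).sum) < 1e-9
    unique && conserved
  }
}
```

The checks here are invariants rather than fixed expected values, which is what makes them usable on data you have never seen before.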

OK, nice, but what can we test?

Well, the interesting thing is, we can write tests that were previously unavailable to us in a local testing environment. Remember, our tests run in a Spark environment. Following is a list of possible tests that can be run:

  • I can run many different pipelines just to see if the data matches.
  • I can do some statistical analysis of the previous data and check if the new data is within those bounds.
  • I can run my tests on previous data and compare the output to the current real output.
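As an illustration of the statistical check in the list above, here is a small sketch. It assumes the metric of interest (say, a daily record count) has already been collected into a plain sequence; `BoundsCheck`, `fromHistory` and the default of three standard deviations are my own choices for the example, not something the pipeline prescribes.

```scala
object BoundsCheck {
  final case class Bounds(lower: Double, upper: Double) {
    def contains(x: Double): Boolean = x >= lower && x <= upper
  }

  // Bounds are the historical mean plus or minus k (population) standard deviations.
  def fromHistory(history: Seq[Double], k: Double): Bounds = {
    require(history.nonEmpty, "history must not be empty")
    val mean = history.sum / history.size
    val variance = history.map(v => math.pow(v - mean, 2)).sum / history.size
    val std = math.sqrt(variance)
    Bounds(mean - k * std, mean + k * std)
  }

  // True when the new value falls inside the bounds derived from previous data.
  def withinBounds(history: Seq[Double], current: Double, k: Double = 3.0): Boolean =
    fromHistory(history, k).contains(current)
}
```

In a Spark job the mean and standard deviation would more likely come from an aggregation over the previous output, but the comparison itself stays the same.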

What we cannot test

On the other hand, it is important to note what we cannot test.

You have no control over the data which is being processed. There could be “missing” data, e.g. data points not present while your testing pipeline is running. Or what about a batch job running for 2 hours? Are 2 hours of live data significant enough for us to determine that the tests passed?

As with all tests, these are complementary, not a replacement. They will not replace your regular unit tests or any other tests running sample data through your pipeline.

Implementation

Our technology stack is a pretty standard Apache Spark stack:

  1. Apache Spark
  2. Scala
  3. Maven

Jenkins is used for CI/CD automation, and our deployment model is based on Docker, K8s, Helm, and the Spark Operator.

Project Setup

We created a multi-module Maven project consisting of the following modules:

  1. main pipeline
  2. testing pipeline

Given a project on currency aggregation, the setup looks like the following:

currency-aggregation
 — currency-aggregation-pipeline
 — currency-aggregation-test-pipeline

Each one of these projects has a Dockerfile associated with it, and in the case of the main pipeline, there is also a Helm chart.

Main Pipeline

This is essentially the business logic. The important aspect of this project in the current context is that the functionality can be easily tested and each part of the “ETL” can be used and tested separately.

Testing Pipeline

This is where all the magic happens. The important aspect of this project in the current context is that the “testing pipeline” project has the “main pipeline” as a dependency. This is essentially what allows us to write tests and call the various functions we want to test.

Testing Framework/Library

As our basic Testing Framework, we used ScalaTest. But we did not want to simply run the tests. We wanted the tests to run after initiating a Spark Session. The idea is to pass the SparkSession to each test to be run.

For this purpose, we wrote an abstract class from which all our tests need to inherit.

import org.apache.spark.sql.SparkSession
import org.scalatest.Suite
abstract class SparkTest(sparkSession: SparkSession, args: Array[String]) extends Suite

The purpose of this abstract class is twofold:

  1. Conform to the standard and make sure the test can receive the SparkSession
  2. Recognize these specific tests

The second point is quite important, as we wrote a small helper class that scans for all of these tests and invokes them, passing the SparkSession object as a constructor parameter. The main function of this `TestRunner` helper class can be seen here:

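def runTests(testClasses: Iterable[Class[_ <: T]], args: Array[String]): Seq[TestResult] = {
  // Every test class is expected to expose a (SparkSession, Array[String]) constructor
  val constructorArgs = Array[AnyRef](sparkSession, args)
  testClasses.map(c => {
    // Instantiate the suite reflectively through its first public constructor
    c.getConstructors()(0).newInstance(constructorArgs: _*).asInstanceOf[Suite]
  }).flatMap(suite => {
    // Pair every test name with the suite that owns it
    suite.testNames.map(name => (name, suite))
  }).map { case (name, test) =>
    println(s"Running test $name")
    val reporter = new PassFailReporter()
    test.run(Some(name), Args(reporter))
    // Block until the reporter has seen the outcome of this test
    reporter.waitForCompletion
  }.toSeq
}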
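If you want to play with the reflective instantiation step without a cluster, the sketch below isolates it. Everything in it is a stand-in I made up for the example: `FakeSession` replaces `SparkSession`, `LiveCheck` plays the role of `SparkTest`, and the two checks are trivial. The only point is that a shared constructor shape lets a runner build any test class from a list of classes.

```scala
// Hypothetical stand-in for SparkSession so the sketch runs without Spark.
final class FakeSession(val appName: String)

// Marker base class, analogous to SparkTest: it fixes the constructor shape.
abstract class LiveCheck(session: FakeSession, args: Array[String]) {
  def passes: Boolean
}

class SessionIsNamed(session: FakeSession, args: Array[String])
    extends LiveCheck(session, args) {
  def passes: Boolean = session.appName.nonEmpty
}

class HasArguments(session: FakeSession, args: Array[String])
    extends LiveCheck(session, args) {
  def passes: Boolean = args.nonEmpty
}

object ReflectiveRunner {
  // Build one instance per class through its first public constructor.
  def instantiate(
      classes: Seq[Class[_ <: LiveCheck]],
      session: FakeSession,
      args: Array[String]
  ): Seq[LiveCheck] = {
    // The args array is passed as a single constructor argument, not expanded.
    val constructorArgs = Array[AnyRef](session, args)
    classes.map { c =>
      c.getConstructors()(0).newInstance(constructorArgs: _*).asInstanceOf[LiveCheck]
    }
  }
}
```

Relying on `getConstructors()(0)` only works as long as each test class has exactly one public constructor with that shape, which is what the abstract base class is meant to encourage.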

And of course, we had the main class, something which would kick this whole process in motion. The SparkTestRunner class:

def main(args: Array[String]): Unit = {
  // Build the session from the "spark" section of the application config
  val sparkSession: SparkSession = SparkSession.builder()
    .config(new SparkConf().setAll(ConfigFactory.load().configAsMap("spark")))
    .getOrCreate()

  // Run every SparkTest found under the package and fold the results into one flag
  val allTestsPassed = new TestRunner[SparkTest](sparkSession)
    .runTests("com.orgname", args)
    .map { testResult =>
      println(testResult.statusString())
      testResult.result
    }.foldLeft(true)(_ && _)

  if (!allTestsPassed) {
    println("Test Failures")
    // Exit non-zero so the failure is visible to whatever launched the job
    sys.exit(-1)
  } else {
    println("All Tests Succeeded")
  }
}

Handling Asynchronous Tests

We run the newly instantiated tests using the run function of the Suite trait:

org.scalatest.Suite.run(testName: Option[String], args: Args): Status = {…}

What is important to know is that while this function returns a Status object, it is the status of whether the test was able to run, not the outcome of the tests. This works asynchronously.

In order to deal with the asynchronicity of this function, we needed to implement a reporter that inherits from the org.scalatest.Reporter trait and makes the result available synchronously. Luckily for us, we had a great example, which can be found as a private class at org.scalatest.tools.Runner.PassFailReporter. Although that class is private, I will leave it to your imagination on how to solve this puzzle.
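Without giving the puzzle away, the general technique of turning an asynchronous callback into a blocking call can be sketched with a latch. This is not the ScalaTest class: `TestEvent`, `Succeeded`, `Failed` and `BlockingReporter` are hypothetical stand-ins for the real event and reporter types, kept free of ScalaTest so the example runs by itself.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical stand-ins for the events a test framework would emit.
sealed trait TestEvent
final case class Succeeded(testName: String) extends TestEvent
final case class Failed(testName: String, message: String) extends TestEvent

// Records the outcome of a single test and lets the caller block until it arrives.
final class BlockingReporter {
  private val done = new CountDownLatch(1)
  @volatile private var outcome: Option[TestEvent] = None

  // Called by the framework, possibly from another thread.
  def apply(event: TestEvent): Unit = {
    outcome = Some(event)
    done.countDown()
  }

  // Blocks until an event arrives or the timeout expires; true only on success.
  def waitForCompletion(timeoutSeconds: Long): Boolean =
    done.await(timeoutSeconds, TimeUnit.SECONDS) &&
      outcome.exists(_.isInstanceOf[Succeeded])
}

object BlockingReporterDemo {
  // Simulates a framework that reports the outcome from a background thread.
  def runAsync(event: TestEvent): Boolean = {
    val reporter = new BlockingReporter
    val worker = new Thread(() => {
      Thread.sleep(50)
      reporter(event)
    })
    worker.start()
    reporter.waitForCompletion(5)
  }
}
```

A timeout is worth having here: if the event never arrives, a reporter that waits forever would hang the whole CI stage instead of failing it.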

CI/CD Setup

And here is where this whole beauty comes together. During the CI run, the pipelines are built and the images are created. Once those stages have passed, the Helm chart is applied with the image switched from the main pipeline image to the testing pipeline Docker image.

stage('Run Test Pipeline') {
    def applicationState = sh returnStdout: true, script: """
        helm template testing target/helm --set image.versionOverride=${buildVersion}-TEST --set application.mainClass=com.pipeline.test.infra.SparkTestRunner --namespace 'test-space' > test.yaml
        kubectl apply -f test.yaml -n 'test-space'
        sleep 10;
        podname=\$(kubectl get sparkapplication [app_name] -n 'test-space' -o 'jsonpath={..status.driverInfo.podName}');
        kubectl logs -f \$podname -n 'test-space'
        kubectl get sparkapplication testing-users-aggregation -n 'test-space' -o 'jsonpath={..status.applicationState}'
    """

    if (applicationState.contains("FAILED")) {
        error("$applicationState")
    }
}

Conclusion

Leveraging the existing infrastructure of tools, libraries, and techniques, we were able to test our pipelines with real live data. By just adding another pipeline or project, we can now easily implement changes and compare outcomes to actual production data and outcomes. We can run “heavy” loads on the tests and take the pipelines to a new level by running the tests in a production-like environment, not having to rely on excerpts of data from some time in the past.

Of course, there are drawbacks, such as longer testing cycles and “bulkier” projects. Also, the project itself and the CI/CD process just got a bit “messier”.

But on the other hand, we have just found a new level of confidence in our testing.


SparkJob testing with Live Data was originally published in Everything Full Stack on Medium, where people are continuing the conversation by highlighting and responding to this story.

Backend Architect & Tech Lead

Backend Group
