Scheduling Dataflow pipelines

Google has a product called Dataflow, an engine for processing big data. Google took the dataflow model and externalized it as the Apache Beam model (see https://beam.apache.org). The idea behind Apache Beam is a generic model that handles both streaming and batch, including windowing, with the same code base. In addition, Apache Beam supports multiple runners, so the same pipeline can run on different engines such as Spark, Flink and others.
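To illustrate the unified model, here is a minimal, hedged sketch (the class and names below are mine, not from the original project): the same windowed transform counts elements per five-minute window whether the input collection is bounded (batch) or unbounded (streaming), and the runner is chosen separately.

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class WindowedCount {
    // The same transform works for a bounded source (e.g. files) or an
    // unbounded source (e.g. Pub/Sub); only the input PCollection differs.
    public static PCollection<KV<String, Long>> countPerWindow(PCollection<String> events) {
        return events
                .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
                .apply(Count.perElement());
    }
}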

This post describes how to deploy a scheduled pipeline to the Google Dataflow service.

I will start by saying that I am very disappointed with Google, as you will see how difficult it is to schedule a run of your pipeline. I had assumed this would be a built-in feature, since it is about as basic a feature as you can get.

Manual deploy

So how do you manually deploy or run your pipeline?

The basic concept behind the Apache Beam project is the ability to run the same pipeline on multiple engines. One of these engines is a local runner, which is the default if you don't specify one; it runs from your local IDE and lets you debug your project (DirectPipelineRunner). Another runner is the DataflowRunner, which runs the pipeline on the Google Dataflow platform. For a comparison of runners see the capability-matrix.
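As a rough sketch of selecting the runner programmatically (assuming a Beam 2.x Java SDK with the Dataflow runner dependency on the classpath; the project id and bucket below are placeholders):

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerSelection {
    public static void main(String[] args) {
        // Parse pipeline options (e.g. --project, --stagingLocation) from the command line.
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(DataflowPipelineOptions.class);
        // These could equally be passed as --runner=DataflowRunner etc. on the command line;
        // if no runner is specified at all, the local (direct) runner is used.
        options.setRunner(DataflowRunner.class);
        options.setProject("my-gcp-project");                 // placeholder project id
        options.setStagingLocation("gs://my-bucket/staging"); // placeholder bucket

        Pipeline pipeline = Pipeline.create(options);
        // ... build the pipeline transforms here ...
        pipeline.run();
    }
}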

After you have written your project with the Apache Beam SDK, the output of the project can be a group of jars or an uber jar. Deployment (to Google Cloud Storage) can likewise be done either as all the jars in a directory or as a single uber jar.

To deploy your project to Google, all you need to do is run the main class and specify the runner as DataflowRunner. You can run it from the command line with java, or via maven:

mvn compile exec:java -Dexec.mainClass=com.company.MainClass -Dexec.args="--runner=DataflowRunner <other parameters>"

This will upload all the jars to Google Cloud Storage, and the Dataflow platform will read them from there (you must supply the location in the cloud as part of the run parameters, for example: --stagingLocation=gs://bq-data3/SFtoBQApp/).

When making changes and rerunning, it is very convenient to upload the jars separately. The Dataflow uploader code compares the jars in the target directory to what is already in storage, and since usually only your own jar has changed, the upload is faster because only that single jar is uploaded. Once you move to production, you will want to deploy an uber jar. To deploy the uber jar use the filesToStage flag, for example: --filesToStage=./target/migration-dataflow-bundled-1.0.0-SNAPSHOT.jar

Keep in mind that when running on multiple machines, Dataflow needs to copy the code to the added worker nodes, and this is more efficient with an uber jar. For other parameters see specifying-exec-params.

Scheduled deploy

We now want to run our pipeline every X minutes.

The first issue we need to deal with is making sure that if the pipeline is currently running, we do not start it a second time in parallel.

Note

Apache Beam differs from Spark in how the pipeline is deployed. In Spark you deploy your job to the cluster and it runs in the cluster (one machine in the cluster is the driver), while in Apache Beam the execution graph is calculated before deployment, so all driver code actually runs on the client machine before the deployment. See the Dataflow Service Description.

Luckily Google supplies an API that you can use to query the jobs currently running in the cloud, and with that information decide whether to run or quit. For example code that checks job status, see DataFlowUtils.java.
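As a minimal sketch of such a check (assuming the google-api-services-dataflow client library and application-default credentials; the class and method names here are mine, and the author's DataFlowUtils.java may look different):

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.DataflowScopes;
import com.google.api.services.dataflow.model.Job;
import com.google.api.services.dataflow.model.ListJobsResponse;

import java.util.List;

public class DataflowJobCheck {

    /** Returns true if a job with the given name is currently running in the project. */
    public static boolean isJobRunning(String projectId, String jobName) throws Exception {
        GoogleCredential credential = GoogleCredential.getApplicationDefault()
                .createScoped(DataflowScopes.all());
        Dataflow dataflow = new Dataflow.Builder(
                GoogleNetHttpTransport.newTrustedTransport(),
                JacksonFactory.getDefaultInstance(),
                credential)
                .setApplicationName("schedule-dataflow")
                .build();

        // List the jobs in the project and look for one with our name that is still running.
        ListJobsResponse response = dataflow.projects().jobs().list(projectId).execute();
        List<Job> jobs = response.getJobs();
        if (jobs == null) {
            return false;
        }
        for (Job job : jobs) {
            if (jobName.equals(job.getName())
                    && "JOB_STATE_RUNNING".equals(job.getCurrentState())) {
                return true;
            }
        }
        return false;
    }
}

The scheduler app can call a check like this before invoking the pipeline's main class, and simply return without deploying if a job with the same name is already in the JOB_STATE_RUNNING state.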

Obviously we could use a tool like Jenkins or a simple Linux cron to deploy and run our application, but since the upload takes time, we would prefer a Google tool that sits in the same network location.

Google suggests two solutions. Both are built on the same idea: create an application server that runs the dataflow pipeline, then create a cron job that sends a REST request to the application server. There are other novel solutions, such as using Pub/Sub, but we will not discuss them here.

AppEngine Solution

I decided to go with the AppEngine solution, but found that the documentation was very lacking and the implementation was not simple at all.

To begin with, AppEngine has two types of apps: you can create a webapp based on Java 7 (standard env), or, if you want Java 8 and above, you need to go with Docker (flex env).

The problem is that the flex env is only supported in the US region, and once you create the AppEngine app you cannot change its region. So you need to delete your whole Google project and recreate it just to create the app.

The app itself

The app is a wrapper around the jar of the dataflow application, so the AppEngine app just calls the main class of the dataflow pipeline, exactly as we did above from the command line.

The disadvantage of this, of course, is that for any change in the dataflow pipeline code you need to recompile and redeploy the AppEngine app as well.

App.yaml

In app.yaml you need to set the service name and some basic configuration:

Configuration of runtime parameters

runtime: custom
env: flex
service: schedual-dataflow
handlers:
- url: /.*
  script: this field is required, but ignored
  secure: always  # Require HTTPS
manual_scaling:
  instances: 1

Notice that the main changes are runtime: custom and env: flex. Since this application runs inside a Docker container, we can use Java 8.

Dockerfile

Next we need to adjust the Dockerfile to include the jar:

FROM gcr.io/google_appengine/openjdk8
VOLUME /tmp
ADD migration-dataflow-appengine-1.0.0-SNAPSHOT-jar-with-dependencies.jar app.jar
CMD [ "java","-Djava.security.egd=file:/dev/./urandom","-jar","/app.jar"]

Note:

For Docker to find the jar and add it to the image, you must add the following to the pom.xml:

<app.stage.artifact>   
${project.build.directory}/migration-dataflow-appengine-1.0.0-SNAPSHOT-jar-with-dependencies.jar
</app.stage.artifact>

Main class

Since all the application needs to do is listen on port 8080 and run the dataflow main class, I decided to go with the lightweight SparkJava framework. Since I did not want to hard code any parameters passed to the main class, I created a generic handler that passes any parameters sent on the REST request through to the main class of the dataflow pipeline.

In addition, all AppEngine applications should implement the health check endpoint /_ah/health; for more information see how-instances-are-managed.

The main class will look like:

public static void main(String[] args) {
    port(8080);
    get("/dataflow/execute", (req, res) -> {
        try {
            LOG.info(LogUtils.prefixLog("start pipeline"));
            // The 'pipeline' query parameter holds the fully qualified main class of the dataflow pipeline
            String pipeline = req.queryParams(PIPELINE);
            if (pipeline == null)
                throw new RuntimeException("pipeline parameter is missing");

            // Look up the pipeline's main(String[]) method via reflection
            Class<?> aClass = Class.forName(pipeline);
            Method method = aClass.getMethod("main", String[].class);
            // Forward all remaining query parameters as command line arguments
            List<String> appArgs = getArgs(req, Arrays.asList(PIPELINE));
            String[] appParams = new String[appArgs.size()];
            appParams = appArgs.toArray(appParams);
            LOG.info(LogUtils.prefixLog("Running dataflow with {}"), appArgs.toString());
            method.invoke(null, (Object) appParams);
            return "Running with params:" + appArgs.toString();
        } catch (Exception e) {
            LOG.error(LogUtils.prefixLog(e.getMessage()));
            return "error: " + e.getMessage();
        }
    });

    get("/", (req, res) -> {
        LOG.info(LogUtils.prefixLog("request main"));
        return "Scheduler Running";
    });

    // Health check required by the AppEngine flexible environment
    get("/_ah/health", (req, res) -> "OK");
}
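The getArgs helper is not shown here; based on how it is called above, a hedged sketch of what it might do (it would live in the same class, with spark.Request and java.util imports assumed) could look like this:

// Hypothetical sketch, for illustration only: the real helper in the project may differ.
private static List<String> getArgs(spark.Request req, List<String> excluded) {
    List<String> args = new ArrayList<>();
    // Turn every query parameter except the excluded ones (e.g. "pipeline")
    // into a "--name=value" argument for the dataflow main class.
    for (String param : req.queryParams()) {
        if (!excluded.contains(param)) {
            args.add("--" + param + "=" + req.queryParams(param));
        }
    }
    return args;
}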

Cron

Last but not least, we create the cron.yaml (it can be placed anywhere in the project):

cron:
- description: Run SF Export/Import pipeline
  url: /dataflow/execute?pipeline=com.app.LoadSalesforceData&project=BQ-migration&datasetId=salesforce&numWorkers=10
  schedule: every 30 mins
  target: schedual-dataflow

Deployment

The first time, you need to deploy an empty project that is not a Docker project; for this we can deploy the demo app appengine-try-java. The reason is that AppEngine needs a default app without a version, but the Docker app always has a version (I really wish Google would fix this).

Make sure you have installed the Google Cloud SDK for the gcloud CLI commands.

To deploy the docker container, you need to run the following command:

mvn appengine:deploy

Once the app has been deployed, view the logs from the server by running:

gcloud app logs tail -s schedual-dataflow

In the GUI you should see the deployed service (image_0.png).

This only deploys the AppEngine application. The cron is actually a separate process in Google Cloud, so to deploy the cron definition you need to run:

gcloud app deploy cron.yaml

In the GUI you should see the new cron job (image_1.png).

Summary

Once you have set up the AppEngine app, the scheduling works very nicely. But as you can see above, it is far from simple to do all the setup.

For the full project have a look at schedule-dataflow-appengine