Apache Beam Testing

So you decided to join the wagon of apache beam. Now that you have written your pipeline, you would like to test it.

Apache beam has a full framework for testing your code. You can test each function, Composites, and even a full pipeline. For the full options see https://beam.apache.org/documentation/pipelines/test-your-pipeline/.

Actually at first it seemed like it is very easy to test data pipelines since they are functional by design. So we have a group of functions where each one has an import and export, so it should be easy to test them.

The Problem

The problem is the evil of functional programing - Side Effects). My pipeline is an ETL that gets information for salesforce (and external api) and transforms the data. There are multiple steps (api calls) to get the data, from description of the data to polling count, and then actual jobs and downloads.

Since Apache Beam transformations are stateless, the implementation is in each transformation. From within each transformation we have access to the runtime Options parameters of the pipeline. So within each transformation we create the connection object and connect to the external API.

For the code this is all very nice, but for testing we need to mock the connection object. Since this object is created in the transformation object we don’t have a way to mock the object.

The standard way would be to use a dependency injection, but we don’t have an IOC framework for this. For those in scala there is the cake pattern , but it is very complicated and does not apply to java.

Options - IOC

Since we have the options class that is part of the pipeline framework, and we have access to it in all transformations, we will use it as our injector framework.

Options do not have to be only primitive types, they can be complex type that even include factory methods for creating them.

So, on our options interface we will add the following object:

public interface Options extends DataflowPipelineOptions {
   ...
   void setExternalServices(ExternalServices value);

   @Default.InstanceFactory(ExternalServices.ExternalServicesFactory.class)
   @JsonIgnore
   ExternalServices getExternalServices();
}

The object itself, will hold the reference to the external API services, and this class also has access to the options to get all parameters to initialize the connection.

public class ExternalServices {
   private Options options;
   private AppCredential appCredential;
   private BigQueryService bigQueryService;
   private SalesforceCredentials salesforceCredentials;
   private SalesforceUtils salesforceUtils;

   public ExternalServices(final Options options) {
       this.options = options;
   }

   public AppCredential getAppCredential() {
       if (appCredential == null) {
           appCredential = new AppCredential(options.getP12KFileName(), options.getServiceAccountId());
       }
       return appCredential;
   }

   @VisibleForTesting
   public void setAppCredential(AppCredential appCredential) {
       this.appCredential = appCredential;
   }

...

   /**
    * A {@link ExternalServicesFactory} able to create a {@link ExternalServices} using
    * any transport flags specified on the {@link PipelineOptions}.
    */
   public static class ExternalServicesFactory implements DefaultValueFactory<ExternalServices> {
       /**
        * Returns an instance of {@link GcpProjectUtil} based on the
        * {@link PipelineOptions}.
        */
       @Override
       public ExternalServices create(PipelineOptions options) {
           LoadSalesforceData.Options pipelineOptions = options.as(LoadSalesforceData.Options.class);
           return new ExternalServices(pipelineOptions);
       }
   }
}

External settings of the API services can be done via testing only, by adding the VisibleForTesting annotation.

Now code in my application will look like:

final SalesforceCredentials salesforceCredentials = options.getExternalServices().getSalesforceCredentials();

So code in my testing will look like:

BigQueryService bigQueryService = Mockito.mock(BigQueryService.class);
Mockito.when(bigQueryService.getDatasetId()).thenReturn("salesforce");
opts.getExternalServices().setBigQueryService(bigQueryService);

As you can see we have managed to mock out the classes that send external API requests, and we can now fully test our pipeline.