Reactive Programming - real world example

Reactive Programming - real world example

Reactive Programming is the new hype and it is here to stay. The new version of Spring is fully based on reactive programing.

I would like to bring a real world example that will show the advantages of reactive programing even in a case where the source is a blocking api (most cases show the advantages of non blocking source like mongodb).

The source in our example is salesforce. Salesforce is a SAS CRM solution that has a rest api to update or retrieve information from the database. The rest api allows us to retrieve up to 2000 records per request (depending on the column count). So if we want to get 100,000 records we will need 50 requests. Each request is a round trip on the internet. We are using the force rest api which is a blocking rest client.

So let’s write some code using the java 8 stream feature to give us a stream of records from salesforce:

public <T> Stream<T> find(Query query, Class<T> type, String tableName) {
     List<T> records = new LinkedList<>();
		QueryResult<T> res = forceApi.query(query, type);
		while (!res.isDone()) {
			res = forceApi.queryMore(res.getNextRecordsUrl(), type);
			records.addAll(res.getRecords());
		}
		return records.stream();
}

This code will give us a stream of records, and will open to us all the features of streaming (map, filter, reduce…).

The consumer code would look similar to:

int value = salesforceDAO.find("select * from Account", "Account")
     .filter(account -> account.getAmount()>50)
     .mapToInt(account -> account.getAmount())
     .sum();

System.out.print(value);

What is the drawback? The stream will not be built until all 100,000 records are downloaded. This means that we need to wait until all 50 rest requests are done, and all records are in memory. So we are blocked, wasting both time and memory.

How would this code look like if we were using reactive style:

public <T> Flux<T> find(Query query, Class<T> type, String tableName) {
  return Flux.create(fluxSink -> {
     try {
        QueryResult<T> res = forceApi.query(query.getQuery(), type);
        for (T record : res.getRecords()) {
           fluxSink.next(record);
        }
        while (!res.isDone() && !fluxSink.isCancelled()) {
           res = forceApi.queryMore(res.getNextRecordsUrl(), type);
           for (T record : res.getRecords()) {
              fluxSink.next(record);
           }
        }
        fluxSink.complete();
     } catch (Throwable t) {
        fluxSink.error(t);
     }
  });
}

What would the consumer code look like:

salesforceDAO.find("select * from Account", "Account")
     .filter(account -> account.getAmount()>50)
     .map(account -> account.getAmount())
     .reduce((a,b) -> a+b)
     .subscribe(value ->{
        System.out.print(value);
     });

As you can see we do not accumulate the results to a stream, but pass them back to the calling code using the onNext method. This code will allow the calling code to start processing the data as it comes in, and you do not have to wait for all the data to download. This will save both time and memory since we do not need to have all records in memory at the same time.

Also just like the stream api, the reactive stream also support the full variety of methods like map, reduce and a lot more.

For more information see https://projectreactor.io/, or http://reactivex.io/

For complete code see: https://github.com/chaimt/TurelUtils/tree/master/src/main/java/com/force/dao

Backend/Data Architect

Backend Group
Thank you for your interest!

We will contact you as soon as possible.

Send us a message

Oops, something went wrong
Please try again or contact us by email at info@tikalk.com