Reactive Programming and locking

Programming paradigms have changed over the years. Lately we have seen a move from object-oriented toward functional programming by way of the reactive extensions. This change brings new challenges. One that I would like to share with you is how to lock a process.

There are times when we expose a REST API that does some processing, and we want to make sure that this processing is never executed more than once at the same time. To do this we use the locks that ship with Java. For example:

public List<BatchInfo> calcData() {
     syncLock.lock();
     try {
        List<BatchInfo> dataList = new ArrayList<>();
        // "batches" stands in for whatever is being processed
        for (BatchInfo data : batches) {
          dataList.add(data);
        }
        return dataList;
     } finally {
        syncLock.unlock();
     }
}

As you can see, the syncLock (a ReentrantLock) makes sure that this code never runs more than once at the same time. Now let's write this code in the reactive paradigm:

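public Flux<BatchInfo> calcData() {
     syncLock.lock();
     try {
        // "batches" stands in for whatever is being processed.
        // This only describes the work: nothing is iterated
        // until someone subscribes to the returned Flux.
        return Flux.fromIterable(batches);
     } finally {
        syncLock.unlock();
     }
}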

The code looks almost the same, so we expect it to work. But simple debugging shows that it does not do what we thought. A reactive pipeline is similar to an SQL execution plan: the method above builds the plan and returns it, and the plan only executes when the caller subscribes to the Flux. By then the method has already returned and the finally block has released the lock. So the lock only guards the creation of the plan, not the actual run of the plan. To use the lock properly, take it inside the code that produces the data:

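public Flux<BatchInfo> calcData() {
  return Flux.create(fluxSink -> {
     // Runs once per subscriber, so the lock is held while data is produced
     syncLock.lock();
     try {
        // "batches" stands in for whatever is being processed
        for (BatchInfo data : batches) {
           fluxSink.next(data);
        }
        fluxSink.complete();
     } catch (Exception e) {
        fluxSink.error(e);
     } finally {
        syncLock.unlock();
     }
  });
}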

In this code the lambda passed to Flux.create is the producer of the Flux, and it runs each time someone subscribes. The lock needs to be in the code that produces the data, and not in the code that builds the data pipeline.
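
The difference between the two lock placements can be checked without any reactive library. The sketch below is only an illustration under assumptions: a plain java.util.function.Supplier stands in for the Flux, calling get() stands in for subscribe, and the class and method names (LockPlacementDemo, lockWhileBuilding, lockInsideProducer) are made up for this example. Each supplier reports whether the lock is held at the moment the deferred work runs.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class LockPlacementDemo {
    static final ReentrantLock syncLock = new ReentrantLock();

    // Lock taken while the "plan" is built: it is released in the finally
    // block before the returned supplier ever runs.
    static Supplier<Boolean> lockWhileBuilding() {
        syncLock.lock();
        try {
            return () -> syncLock.isHeldByCurrentThread();
        } finally {
            syncLock.unlock();
        }
    }

    // Lock taken inside the deferred work (the "producer"): it is held
    // for as long as the work runs.
    static Supplier<Boolean> lockInsideProducer() {
        return () -> {
            syncLock.lock();
            try {
                return syncLock.isHeldByCurrentThread();
            } finally {
                syncLock.unlock();
            }
        };
    }

    public static void main(String[] args) {
        // get() plays the role of subscribe() in this stand-in
        System.out.println("lock while building, held during work: "
                + lockWhileBuilding().get());   // prints false
        System.out.println("lock inside producer, held during work: "
                + lockInsideProducer().get());  // prints true
    }
}
```

The first variant mirrors a method that locks around the construction of the pipeline, the second mirrors the Flux.create version, where the lock lives in the code that actually produces the data.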

So, as you can see, you can also lock processes in reactive programming so that they do not run twice at the same time, but the way to do it is different from standard object-oriented programming.
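
If you want to convince yourself that a lock placed in the producer really keeps runs from overlapping, a small experiment along these lines may help. It is a sketch with plain threads rather than Reactor: produce() is a hypothetical stand-in for the producer lambda, each thread plays the role of one subscriber, and the names SerializedRunsDemo and maxConcurrentRuns are invented for the example. It records the highest number of producers that were ever inside the locked section at once.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class SerializedRunsDemo {
    static final ReentrantLock syncLock = new ReentrantLock();
    static final AtomicInteger running = new AtomicInteger();
    static final AtomicInteger maxRunning = new AtomicInteger();

    // Stand-in for the producer: takes the lock, then "processes" for a while.
    static void produce() {
        syncLock.lock();
        try {
            int now = running.incrementAndGet();
            maxRunning.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(20); // simulated processing time
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            running.decrementAndGet();
        } finally {
            syncLock.unlock();
        }
    }

    // Starts one thread per simulated subscriber and returns the largest
    // number of producers that were inside the locked section together.
    static int maxConcurrentRuns(int subscribers) {
        running.set(0);
        maxRunning.set(0);
        Thread[] threads = new Thread[subscribers];
        for (int i = 0; i < subscribers; i++) {
            threads[i] = new Thread(SerializedRunsDemo::produce);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return maxRunning.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent runs: " + maxConcurrentRuns(4)); // prints 1
    }
}
```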