Asynchronous functions, actors and CPS

Using Scala we can easily create lock-free multi-threaded code, while still maintaining a familiar programming model.

 

There are two issues in writing multi-threaded code: synchronizing shared access to resources and running code asynchronously. In Java this is usually done with locks and by implementing interfaces that wrap the asynchronous code. This leads to code that is cumbersome to write, error-prone, and inefficient in how it uses threads (since a thread may block for an unspecified time waiting on a lock).

 

In Scala we have actors (of course there are actor libraries for Java too; they are just less popular). However, using actors means giving up type safety (we cannot know the exact set of messages an actor handles), and actors are cumbersome to use as a replacement for method calls.

 

Instead of just calling a method, one has to create a message and the actor needs to receive it and call the actual method. There are situations where actors provide the right abstraction, by being able to determine the order in which messages are handled, but this is not the focus of this post.

 

In this post I’d like to present an alternative way of programming that uses actors behind the scenes, giving front stage to objects and method calls.

 

First, let’s model an asynchronous method. Such a method handles the computation in another thread. Since we don’t want to block, we create a followup function that is called once the computation is done:


def mult(i: Int, j: Int)(followup: Int => Unit)

 

We use it like:

 

mult(3, 4) {r => println(r)}

 

In this case, implementing mult is trivial:

 

def mult(i: Int, j: Int)(followup: Int => Unit) =
  followup(i * j)


(In general, the value passed to followup should probably be modeled as an Either of value or error, but I’d like to keep things simple for this post.)
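As a rough sketch of that idea (it is not used in the rest of the post), the followup could receive an Either. The method ‘div’, the helper ‘describe’ and the choice of ArithmeticException are illustrative assumptions, picked only because integer division is a computation that can actually fail:

```scala
// Hypothetical example: the followup receives either an error or a value.
object EitherFollowup {
  def div(i: Int, j: Int)(followup: Either[Throwable, Int] => Unit): Unit = {
    // Capture the failure instead of throwing it at the caller.
    val result: Either[Throwable, Int] =
      try Right(i / j)
      catch { case e: ArithmeticException => Left(e) }
    followup(result)
  }

  // Helper for demonstration: renders the outcome as text.
  def describe(r: Either[Throwable, Int]): String = r match {
    case Right(v) => "Result: " + v
    case Left(e)  => "Failed: " + e.getMessage
  }
}
```

A caller would then write something like EitherFollowup.div(12, 4) { r => println(EitherFollowup.describe(r)) } and handle both cases in one place.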

 

The body of mult runs in the thread of the caller. This is not good in the general case where a method modifies the state of an object, or where other methods modify the state and we want to avoid collisions.

 

One way of synchronizing all methods is through the use of an actor. The trivial way is to create an internal actor inside the object and send it messages that do the computation:

 

import scala.actors.Actor._

class Obj {
  case class Mult(i: Int, j: Int, followup: Int => Unit)

  def mult(i: Int, j: Int)(followup: Int => Unit) =
    impl ! Mult(i, j, followup)

  // Internal actor: handles one message at a time, so calls never overlap.
  val impl = actor {
    loop {
      react {
        case Mult(i, j, followup) => followup(i * j)
      }
    }
  }
}
 

 

Of course this is very cumbersome. The messages are used internally and quite trivially. Instead, we can use the actor to evaluate functions that each method creates:

 

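import scala.actors.Actor._

trait AsyncSupport {
  val syncActor = actor {
    loop {
      react {
        // Each message is a function; the actor runs them one at a time.
        case f: Function0[_] => f()
      }
    }
  }

  // Wrap the body in a function and queue it on the actor.
  def sync(body: => Unit) = syncActor ! { () => body }
}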

 

(This is of course not a full implementation; it still needs handling of the actor’s exit and maybe an explicit start.)
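To give a feel for what exit handling might involve, here is a minimal sketch that swaps the actor for a single-thread executor from java.util.concurrent. This is a different mechanism from the one used in this post, chosen only because it runs without the actors library. The names ‘ExecutorAsyncSupport’, ‘ExecutorObj’, ‘ExecutorDemo’ and ‘shutdown’ are my own assumptions:

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for the actor: a single worker thread runs queued bodies in order.
trait ExecutorAsyncSupport {
  private val worker: ExecutorService = Executors.newSingleThreadExecutor()

  def sync(body: => Unit): Unit =
    worker.execute(new Runnable { def run(): Unit = body })

  // Explicit exit: refuse new work, then wait for queued bodies to finish.
  def shutdown(timeoutMillis: Long): Boolean = {
    worker.shutdown()
    worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)
  }
}

class ExecutorObj extends ExecutorAsyncSupport {
  def mult(i: Int, j: Int)(followup: Int => Unit): Unit =
    sync { followup(i * j) }
}

object ExecutorDemo {
  // Queues one call, shuts the worker down, and returns what followup saw.
  def run(): Int = {
    val obj = new ExecutorObj
    val seen = new AtomicInteger(0)
    obj.mult(3, 4) { r => seen.set(r) }
    obj.shutdown(5000)
    seen.get
  }
}
```

The point of the sketch is only the shape: ‘sync’ queues work, and something has to stop the worker once the object is no longer needed.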

 

Then implementing mult is:

 

class Obj extends AsyncSupport {
  def mult(i: Int, j: Int)(followup: Int => Unit) =
    sync{followup(i * j)}
} 

 

So far, so good. But as we create more and more such functions, the code becomes unreadable. For example, consider the expression (a + b) * (c + d). With asynchronous methods this would be:

 

add(a, b){r1 => add(c, d){r2 => mult(r1, r2)(followup)}}

 

One solution is to use a for comprehension to hide the use of followup functions. The code would then look like:

 

for (r1 <- add(a, b);
     r2 <- add(c, d);
     r  <- mult(r1, r2)) followup(r)

 

This has several drawbacks: first, it isn’t much more readable than the original code, second, it forces us to redesign our methods to conform to the rules of for comprehension, and thus make them less usable outside of its context.
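To make the second drawback concrete, here is a sketch of the kind of redesign a for comprehension would require. The wrapper class ‘Async’ and the object ‘AsyncOps’ are hypothetical names of my own; the methods no longer take a followup but return a wrapper that supplies ‘map’ and ‘flatMap’:

```scala
// Hypothetical wrapper: a computation that delivers its result to a followup.
final class Async[A](val run: (A => Unit) => Unit) {
  def map[B](f: A => B): Async[B] =
    new Async[B](k => run(a => k(f(a))))

  def flatMap[B](f: A => Async[B]): Async[B] =
    new Async[B](k => run(a => f(a).run(k)))
}

object AsyncOps {
  // The methods must now return Async[Int] instead of accepting a followup.
  def add(i: Int, j: Int): Async[Int] = new Async[Int](k => k(i + j))
  def mult(i: Int, j: Int): Async[Int] = new Async[Int](k => k(i * j))

  // (a + b) * (c + d); the compiler rewrites this into flatMap/map calls.
  def compute(a: Int, b: Int, c: Int, d: Int): Async[Int] =
    for (r1 <- add(a, b); r2 <- add(c, d); r <- mult(r1, r2)) yield r
}
```

With this design the followup is supplied at the very end, as in AsyncOps.compute(1, 2, 3, 4).run(followup), and ‘add’ and ‘mult’ can no longer be called in the simple style add(a, b)(followup).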

 

Instead, we can use CPS (continuation-passing style).

 

This post is not intended to introduce CPS, just to show its application here. I recommend searching for related articles (one linked below).

 

As a brief overview, CPS lets us take code that looks like this:

 

expr1 {a => expr2(a)}

 

And turn it “inside out” into something like

 

reset {
  val a = shift {expr1}
  expr2(a)
}

 

‘shift’ and ‘reset’ can be given other more descriptive names.

 

For example, it allows taking this code:

 

List("x","y","z").flatMap{left =>
  List(4,5,6).flatMap{right =>
    List((left, right))
  }
}

 

And transforming it to:

 

reset {
  val left = List("x","y","z")
  val right = List(4,5,6)
 
  List((left.reflect[Any], right.reflect[Any]))
}

 

(I didn’t include the definition of ‘reflect’; you can find it here: http://dcsobral.blogspot.com/2009/07/delimited-continuations-explained-in.html)

 

This is close to the syntactic sugar provided by a for comprehension:

 

for (left <- List("x","y","z");
     right <- List(4,5,6))
  yield (left, right)

 

So, in our case, the aim is to be able to rewrite

 

add(a, b){r1 => add(c, d){r2 => mult(r1, r2)(followup)}}

 

As something like:

 

val r1 = add(a,b)
val r2 = add(c,d)
mult(r1, r2)

 

One way is to completely transform ‘add’ and ‘mult’ into CPS methods.

 

def add(i: Int, j: Int) = shift {k: (Int => Unit) => sync{k(i + j)}}
def mult(i: Int, j: Int) = shift {k: (Int => Unit) => sync{k(i * j)}}

 

And use it:

 

scala> def followup(r: Int) = println("Result: " + r)
followup: (r: Int)Unit

scala> reset {
     |   val r1 = add(1,2)
     |   val r2 = add(3,4)
     |   val r = mult(r1, r2)
     |   followup(r)
     | }
Result: 21

 

A disadvantage of this approach is that our methods are now usable only in the context of CPS (‘reset’). This makes simple composition harder and rules out “straightforward” use for those who want it.

 

Let’s look again at the original definitions:

 

def mult(i: Int, j: Int)(followup: Int => Unit) =
    sync{followup(i * j)}

 

It is obvious that ‘followup’ is our ‘k’ from the CPS code.

 

So how do we go from a simple method with followup argument to a CPS method? With partial application:

 

scala> mult(3,4) _
res3: ((Int) => Unit) => Unit = <function1>

 

The result of the partial application is a function that, given a function (Int => Unit), will perform some computation.
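The same step can be tried in plain Scala, without the continuations plugin. In this small sketch the object name ‘PartialApplication’ and the value name ‘pending’ are my own, and ‘mult’ runs in the caller’s thread to keep the example self-contained. Instead of the trailing underscore, the type annotation on ‘pending’ is what asks the compiler for the conversion:

```scala
object PartialApplication {
  def mult(i: Int, j: Int)(followup: Int => Unit): Unit = followup(i * j)

  // Only the first parameter list is supplied. What remains is a function
  // that still expects the followup and does nothing until it gets one.
  val pending: (Int => Unit) => Unit = mult(3, 4)
}
```

Calling PartialApplication.pending(println) then performs the multiplication and hands the result to println.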

 

To use it, we define some syntactic sugar:

 

  type Followup[Result] = Result => Unit
 
  class CpsSupport[Result](f: Followup[Result] => Unit) {
    def ! = shift {k: Followup[Result] => f(k) }
  }
 
  implicit def cps[Result](f: Followup[Result] => Unit) = new CpsSupport[Result](f)

 

 

Then, our methods are defined in the standard way, without CPS, and the usage is:

 

scala> reset {
     |   val r1 = add(1, 2) _!
     |
     |   val r2 = add(3, 4) _!
     |
     |   val r = mult(r1, r2) _!
     |
     |   followup(r)
     | }

scala> Result: 21

 

An alternative way is to put the body of ‘!’ directly in ‘cps’:

 

def cps[Result](f: Followup[Result] => Unit) = shift {k: Followup[Result] => f(k) }


 

 

And use it as:

scala> reset {
     |   val r1 = cps(add(1, 2))
     |
     |   val r2 = cps(add(3, 4))
     |
     |   val r = cps(mult(r1, r2))
     |
     |   followup(r)
     | }

scala> Result: 21

 

 

(Note: in the ‘_!’ version, the blank lines between the method calls are required; otherwise Scala treats each line as an argument to the previous one and complains that ‘!’ takes no parameters.)

 

The methods ‘add’ and ‘mult’ need to explicitly use ‘sync’. This is both to guard their body (‘i * j’) and the call to followup, which may be any code, including code that modifies the object and therefore needs to be synced. Instead, the method ‘!’ can call sync. This can be done explicitly, or using an implicit adapter that receives ‘k’ and adapts it. I leave this as an exercise for the reader.
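As a hint for the explicit variant, here is a sketch of the idea without ‘shift’, so that it runs in plain Scala. It assumes a single-thread executor as a stand-in for the actor-backed ‘sync’, and the names ‘SyncedCall’, ‘callSynced’ and ‘demo’ are hypothetical. The method ‘mult’ itself no longer mentions ‘sync’; the call site wraps the whole invocation:

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object SyncedCall {
  // A method written without any call to sync.
  def mult(i: Int, j: Int)(followup: Int => Unit): Unit = followup(i * j)

  // Returns the result and the name of the thread the followup ran on.
  def demo(): (Int, String) = {
    // Stand-in for the actor-backed 'sync' (an assumption of this sketch).
    val worker: ExecutorService = Executors.newSingleThreadExecutor()
    def sync(body: => Unit): Unit =
      worker.execute(new Runnable { def run(): Unit = body })

    // Explicit variant: wrap the whole call, so both the method body and
    // the followup run on the worker thread.
    def callSynced[R](f: (R => Unit) => Unit)(k: R => Unit): Unit =
      sync { f(k) }

    val seen = new AtomicReference[(Int, String)]((0, ""))
    callSynced[Int](mult(3, 4)) { r =>
      seen.set((r, Thread.currentThread.getName))
    }
    worker.shutdown()
    worker.awaitTermination(5, TimeUnit.SECONDS)
    seen.get
  }
}
```

In the CPS version the same wrapping would go inside the body of ‘!’, around the call f(k).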

 

This adapter can further be used if the followup method is of arity higher than 1. In that case, ‘k’ will be a function receiving a tuple, and the adapter’s job will be to convert it to a FunctionN.
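For arity 2, the standard library already has the conversion in Function.untupled. The sketch below shows only that conversion; the names ‘ArityAdapter’, ‘adapt’, ‘divMod’ and ‘demo’ are hypothetical, and ‘divMod’ is just a stand-in for an asynchronous method whose followup takes two arguments:

```scala
object ArityAdapter {
  // 'k' receives a single tuple value; Function.untupled turns it into a
  // two-argument function (a Function2).
  def adapt[A, B](k: ((A, B)) => Unit): (A, B) => Unit = Function.untupled(k)

  // Hypothetical method whose followup takes two arguments.
  def divMod(i: Int, j: Int)(followup: (Int, Int) => Unit): Unit =
    followup(i / j, i % j)

  // Passes a tuple-receiving continuation to divMod through the adapter.
  def demo(): (Int, Int) = {
    var seen = (0, 0)
    val k: ((Int, Int)) => Unit = { pair => seen = pair }
    divMod(17, 5)(adapt(k))
    seen
  }
}
```

Higher arities would need one such adapter per FunctionN, which is where an implicit conversion becomes convenient.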
