Scala actors design patterns - part 3

Actors process messages one at a time. But what if handling a message is a read-only operation? Why shouldn't such messages be processed simultaneously?

 

I’ll first show a direct implementation of the pattern and then a trait that can be reused.

 

First, assume we have two messages that stand for a read operation and a write operation (the parameter i is used only for print debugging, to keep track of which message is being processed):

 

case class Read(i: Int)
case class Write(i: Int)

 

We would like to process Read messages simultaneously. The actor itself cannot do that, but it can start another actor which will. So the strategy is to forward Read handling to a temporary actor and maintain a count of the active readers, so that we don’t process a Write message while any Read message is being handled. When the temporary actor finishes handling a Read, it sends a ReadFinished message to the “parent” actor. This message is handled before any others (it does not wait at the end of the queue), because the parent first polls its mailbox for ReadFinished with reactWithin(0). Another aspect is that we want to prevent starvation of writers, so once a Write message is encountered we stop processing new Read messages; the reads already in progress then finish and the Write message can be processed.

 

Here is the code:

 

import scala.actors.{Actor, TIMEOUT}
import scala.actors.Actor._

class RwActor extends Actor {
    private case class ReadFinished()
   
    var readers = 0
    var writer: Option[Write] = None
   
    def executeWrite(w: Write) = println("no readers, executing writer " + w.i)
   
    def executeRead(parent: Actor, r: Read) = actor {
       println("Executing reader " + r.i)
       Thread.sleep(2000)
       println("Finished reader " + r.i)
       parent ! ReadFinished()
    }
   
    def readerFinished = {
       readers -= 1
       println("Readers: " + readers)
       if (readers == 0 && writer.isDefined) {
               executeWrite(writer.get)
               writer = None
       }
    }
   
    def act =
       loop {
         reactWithin(0) {
           case ReadFinished() => readerFinished
           case TIMEOUT =>
               react {
                 case w@Write(i) if writer.isEmpty =>
                   println("Got writer " + i)
                   if (readers == 0) executeWrite(w)
                   else writer = Some(w)
                 case r: Read if writer.isEmpty =>
                   readers += 1
                   executeRead(Actor.self, r)
                 case ReadFinished() => readerFinished
               }
           }
       }
} 

Here is how it looks when used:

 

scala> val rw = new RwActor
rw: RwActor = RwActor@cc74e7

scala> rw.start
res12: scala.actors.Actor = RwActor@cc74e7

scala> rw ! Read(0)
Executing reader 0

scala> rw ! Read(1)
Executing reader 1

scala> rw ! Write(0)
Got writer 0

scala> rw ! Read(2)

scala>
Finished reader 0
Readers: 1
Finished reader 1
Readers: 0
no readers, executing writer 0
Executing reader 2
Finished reader 2

 

 

You can see that Read(1) starts before Read(0) has finished, while Read(2) is held back until the Write has been processed.
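The ordering in the session above comes down to a small piece of bookkeeping: a reader count, at most one pending writer, and messages that stay in the mailbox while a writer is waiting. As a rough sketch, that bookkeeping can be modelled without any actors at all. Everything here (RwModel, ReadArrived, WriteArrived, ReadDone, State, step, run) is a hypothetical name invented for illustration, and the deferred vector only approximates how unmatched messages remain in an actor's mailbox.

```scala
// Hypothetical, actor-free model of the reader/writer bookkeeping.
// It is an illustration only, not part of the actor code in this post.
object RwModel {
  sealed trait Event
  final case class ReadArrived(i: Int) extends Event   // a Read message arrives
  final case class WriteArrived(i: Int) extends Event  // a Write message arrives
  final case class ReadDone(i: Int) extends Event      // a temporary reader reports back

  final case class State(
    readers: Int = 0,                        // reads currently in progress
    writer: Option[Int] = None,              // the single pending write, if any
    deferred: Vector[Event] = Vector.empty,  // stands in for messages left in the mailbox
    log: Vector[String] = Vector.empty
  )

  def step(s: State, e: Event): State = e match {
    // Completion notices are always handled, whatever the current state.
    case ReadDone(i) =>
      val s1 = s.copy(readers = s.readers - 1, log = s.log :+ s"finished reader $i")
      (s1.readers, s1.writer) match {
        case (0, Some(w)) =>
          // Last reader gone: run the pending write, then revisit deferred messages in order.
          val afterWrite = s1.copy(writer = None, deferred = Vector.empty,
                                   log = s1.log :+ s"executing writer $w")
          s1.deferred.foldLeft(afterWrite)(step)
        case _ => s1
      }
    // A write runs immediately only when no read is in progress.
    case WriteArrived(i) if s.writer.isEmpty =>
      if (s.readers == 0) s.copy(log = s.log :+ s"executing writer $i")
      else s.copy(writer = Some(i), log = s.log :+ s"got writer $i")
    // New reads are accepted only while no write is pending.
    case ReadArrived(i) if s.writer.isEmpty =>
      s.copy(readers = s.readers + 1, log = s.log :+ s"executing reader $i")
    // Anything else waits, which is what keeps the writer from starving.
    case other =>
      s.copy(deferred = s.deferred :+ other)
  }

  def run(events: Seq[Event]): Vector[String] =
    events.foldLeft(State())(step).log
}
```

Feeding run the sequence ReadArrived(0), ReadArrived(1), WriteArrived(0), ReadArrived(2), ReadDone(0), ReadDone(1), ReadDone(2) gives a log in which "executing writer 0" appears only after both "finished reader 0" and "finished reader 1", and "executing reader 2" appears only after the writer has run.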

 

To create a more generic actor, we can create a trait to tag Read-Only messages:

 

trait ReadOnly

 

Then an RwActor trait can override react to wrap the partial function that is passed to it:

 

trait RwActor extends Actor {
    private case class ReadFinished()
 
    var readers = 0
    var writer: Option[Any] = None
   
    def readerFinished(f: PartialFunction[Any, Unit]) = {
       readers -= 1
       println("Readers: " + readers)
       if (readers == 0 && writer.isDefined) {
           println("Executing writer " + writer.get); f(writer.get)
           writer = None
       }
    }
 
    override def react(f: PartialFunction[Any, Unit]): Nothing = {
       reactWithin(0) {
         case ReadFinished() => readerFinished(f)
 
         case TIMEOUT =>
             super.react {
                 case ReadFinished() => readerFinished(f)
 
                 case m if f.isDefinedAt(m) && writer.isEmpty =>
                     m match {
                         case m: ReadOnly =>
                             readers += 1
                             val original = Actor.self
                             actor {println("Executing reader " + m); f(m); original ! ReadFinished()}
                         case m =>
                             if (readers == 0) {println("Executing writer " + m); f(m)}
                             else writer = Some(m)
                     }
                            
             }
       }
    } 
}
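The central trick in the trait is that the caller's partial function is never run directly: it is wrapped, and the wrapper decides how each message should be treated before delegating to it. A minimal sketch of that wrapping, stripped of all actor machinery, might look like the following. WrapSketch and classify are hypothetical names, and the Get and Add messages are defined locally just to keep the block self-contained.

```scala
// Hypothetical sketch: wrapping a handler so each message is classified
// before the original partial function runs. No actors are involved.
object WrapSketch {
  trait ReadOnly                                 // marker for read-only messages
  final case class Get(n: Int) extends ReadOnly  // illustrative read-only message
  final case class Add(i: Int)                   // illustrative mutating message

  // Returns a new partial function defined exactly where f is defined.
  // It runs f and reports which path the message took.
  def classify(f: PartialFunction[Any, Unit]): PartialFunction[Any, String] = {
    case m: ReadOnly if f.isDefinedAt(m) =>
      f(m)
      "read"   // the trait above would hand this to a temporary actor
    case m if f.isDefinedAt(m) =>
      f(m)
      "write"  // the trait above would run or park this, depending on the reader count
  }
}
```

Because both cases are guarded by f.isDefinedAt, a message the original handler does not understand is not matched by the wrapper either, so in the actor version it would simply stay in the mailbox.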

 

 

To use:

 

case class Add(i: Int)
case class Get(n: Int) extends ReadOnly
 
val rw = new RwActor {
   def act = loop {
       react {
          case Add(i) => println("Adding " + i)
          case Get(i) => println("Getting " + i); Thread.sleep(2000)
       }
   }
}
 

