Managing State in Akka Actors

/media/akka1.png

Preface

Reactive programming is not a new concept, but it has been gaining popularity over the last couple of years, especially with frameworks and libraries such as Vert.x, RxJava, Akka and Reactor. Even Spring WebFlux is becoming the new standard for building RESTful APIs with reactive streams in mind. The Reactive Manifesto is also worth reading in order to understand the goals and promises of this approach.

In this post I would like to present a common problem when working with such tools, and I will demonstrate it with the Akka framework (I assume you already know a bit about it, since this is not an introductory Akka tutorial).

One of the key features of Akka actors is their ability to manage state in a thread-safe manner. Each actor reacts to the messages in its own mailbox, and Akka guarantees that those messages are processed one at a time, by only one thread at any given moment (similar in spirit to the Reactor pattern, or to the Node.js event loop). The actor therefore does not need to handle concurrency, context switching or locking around its own state. Each actor is in fact an isolated piece of state (even though state can be shared between actors).
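As a minimal sketch of that guarantee, the actor below keeps a plain counter with no locks while several threads send it messages at once. It assumes classic Akka actors (akka-actor) on the classpath; `CounterActor`, `Increment`, `GetCount` and `CounterDemo` are hypothetical names used only for this illustration and are not part of the word-count example.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical messages for this sketch only
case object Increment
case object GetCount

// The counter is plain mutable state with no locks: an actor handles
// one message at a time, so receive never runs concurrently for the
// same actor instance.
class CounterActor extends Actor {
  private var count = 0

  override def receive: Receive = {
    case Increment => count += 1
    case GetCount  => sender() ! count
  }
}

object CounterDemo {
  def run(): Int = {
    val system = ActorSystem("counter-demo")
    try {
      val counter = system.actorOf(Props(new CounterActor), "counter")

      // Four threads send 1000 increments each, concurrently
      val threads = (1 to 4).map { _ =>
        new Thread(new Runnable {
          def run(): Unit = (1 to 1000).foreach(_ => counter ! Increment)
        })
      }
      threads.foreach(_.start())
      threads.foreach(_.join())

      // Ask for the total only after all increments have been enqueued
      implicit val timeout: Timeout = Timeout(3.seconds)
      Await.result((counter ? GetCount).mapTo[Int], 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling `CounterDemo.run()` should return 4000 even though nothing in `CounterActor` is synchronised; the same counter shared directly between the four threads would need a lock or an atomic.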

/media/actorNetwork.png

Tutorial Steps

But how can we handle state? Let’s start with a simple problem: a word-count example. The task is broken into these parts:

  1. Build a Generator/Producer actor (in the future this can be an infinite stream source, such as a stream of real-time events, a Twitter stream, etc.)

  2. Build an “Aggregator” that holds the word count (this is the state in our system) and prints the statistics (“Stats” in this article) to the screen every 5 seconds. In the future we can expose this as a REST endpoint.

  3. Refactor our code to use an Akka feature around state and behaviour.

The Producer

You can start with the Lightbend predefined template and refactor the code. Assuming you already have an SBT project with the actor dependencies, let’s add our first actor and its data source object.

note: The source code for this article can be found here

First we need a “dummy” data source

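import scala.util.Random

object DataSource {
  // Random source used to pick a line; it is never reassigned, so a val is enough
  val rnd: Random = new Random()

  val lines: List[String] =
    List(
      "the cow jumped over the moon",
      "an apple a day keeps the doctor away",
      "four score and seven years ago",
      "snow white and the seven dwarfs",
      "i am at two with nature")

  // Returns one of the sample lines, chosen at random
  def getMessage: String =
    lines(rnd.nextInt(lines.length))
}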

Then we will add an actor that sends this data to another actor, called the StatsActor. In a more complex pipeline the Generator would know nothing about its destination or the structure of the pipeline; it would simply pass the data on to whichever ActorRef it is given.

We need some case classes for this demo; I’ve put them in a model.scala file:

case object StartProducer
case class Message(line: String)
case object GetStats
case class StatsResult(stats: Map[String, Int])

...and the Generator actor with its companion object:

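import akka.actor.{Actor, ActorRef, Props}
import scala.concurrent.duration._
import scala.language.postfixOps

object GeneratorActor {
  def props(statsActor: ActorRef): Props = Props(new GeneratorActor(statsActor))
}

class GeneratorActor(statsActor: ActorRef) extends Actor {
  import DataSource._
  // The scheduler needs an implicit ExecutionContext; use the actor's dispatcher
  import context.dispatcher

  // Internal message that triggers sending the next line
  case object Tick

  override def receive: Receive = {
    case StartProducer =>
      println("Producer started")
      // Send Tick to ourselves immediately and then every 750 ms.
      // Note: newer Akka versions (2.6+) deprecate schedule in favour of
      // scheduleWithFixedDelay / scheduleAtFixedRate.
      context.system.scheduler.schedule(
        0 milliseconds,
        750 milliseconds,
        self,
        Tick)

    case Tick =>
      // Wrap one random line in a Message and pass it downstream
      statsActor ! Message(getMessage)
  }
}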

I am using a scheduler here to send the data every 750 milliseconds, since this is just a simulation.

In the Tick handler we simply build a new Message which holds a single line and send it to the StatsActor. So far, nothing new.

Building the StatsActor (a.k.a ‘state’)

Our first attempt at the Stats actor holds the state in the form of a Map[String,Int]. Since functional programming in Scala favours immutable objects, we will use an immutable map. But we keep on changing this map, right? Inserting more and more words requires us to modify it, and we still want to stick to Scala’s principles, since those principles help us keep our code thread-safe without synchronisation mechanisms.
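The tension between "immutable map" and "state that changes" can be seen in a few lines of plain Scala. This is only a sketch; `ImmutableMapDemo` and its fields are hypothetical names and do not appear in the project.

```scala
object ImmutableMapDemo {
  val before: Map[String, Int] = Map("the" -> 2, "cow" -> 1)

  // "Updating" an immutable map returns a new map; `before` is left untouched
  val after: Map[String, Int] = before + ("the" -> (before("the") + 1))

  // To remember the new map, something has to point at it:
  // here a var whose reference is re-assigned while each map stays immutable
  var state: Map[String, Int] = before
  state = state + ("moon" -> 1)
}
```

The map values never change; only the reference that says "this is the current map" does. That reference is exactly what the actor has to keep somewhere.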

package com.ts.stats

import akka.actor.Actor

class StatsActor extends Actor {

  // Counts the occurrences of each word in a single line
  private def wordsToMapCount(line: String): Map[String, Int] = {
    line.split("\\W+")
      .foldLeft(Map.empty[String, Int])(
        (map: Map[String, Int], next: String) => map + (next -> (map.getOrElse(next, 0) + 1)))
  }

  // Running totals: the map is immutable, the reference to it is not
  var statsMap: Map[String, Int] = Map.empty[String, Int].withDefaultValue(0)

  override def receive: Receive = {

    case Message(line: String) =>
      val currentMap = wordsToMapCount(line)
      // Add the counts of this line to the running totals
      statsMap ++= currentMap.map {
        case (k, v) => k -> (v + statsMap(k))
      }

    case GetStats =>
      sender() ! StatsResult(statsMap)
  }
}
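To see what the Message handler does to the state, the counting and merging steps can be tried outside the actor. The sketch below copies the counting logic and extracts the merge into a hypothetical `merge` helper (the actor itself does this inline); `MergeDemo` is likewise a name invented for the illustration.

```scala
object MergeDemo {
  // Same counting logic as wordsToMapCount in the actor
  def wordsToMapCount(line: String): Map[String, Int] =
    line.split("\\W+").foldLeft(Map.empty[String, Int]) { (acc, word) =>
      acc + (word -> (acc.getOrElse(word, 0) + 1))
    }

  // Hypothetical helper: add the counts of one line to the running totals
  def merge(totals: Map[String, Int], delta: Map[String, Int]): Map[String, Int] =
    totals ++ delta.map { case (k, v) => k -> (v + totals.getOrElse(k, 0)) }

  // Counts for a single line: "the" appears twice, every other word once
  val first: Map[String, Int] = wordsToMapCount("the cow jumped over the moon")

  // Totals after two lines: "the" now has a count of 3
  val totals: Map[String, Int] =
    merge(merge(Map.empty, first), wordsToMapCount("snow white and the seven dwarfs"))
}
```

Only the keys that appear in the new line are recomputed; all other entries of the running totals are carried over unchanged by `++`.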

As you can see, we manage our state with a variable. Of course we could have used a persistence layer (Redis, Couchbase, MongoDB, Aerospike or any other key/value store), which would allow us to scale in the future and avoid a single point of failure. But for this discussion let’s keep it simple, since adding a distributed persistence layer is not possible in all cases.

Wait a minute. Do I see a “var” in Scala code?

/media/omg.jpg

So we do have a “var” in our code, even though the map itself is immutable and each change to it produces a new map.

We need a bit more code to print the results: instead of building a REST endpoint, we will add another timer that requests the stats so they can be printed to the screen. In the Generator actor, add this code under the previous scheduler:

context.system.scheduler.schedule(
  0 milliseconds,
  5 seconds,
  statsActor,
  GetStats)

And then another event handler for the StatsResult message :

case StatsResult(stats) =>
  println(stats)

Once we run this little app, the word-count output is printed to the screen every 5 seconds.

Encapsulating State by using State “Container”

Storing the state in a single var is not so terrible for this solution, but we can do better. There is a feature in Akka that allows us to maintain a “behaviour change” rather than a variable. I am referring to “become”.

Consider the following modification to the Stats Actor :

import akka.actor.{Actor, Props}

object StatsActor {

  // Immutable container for the actor's state
  case class StatsMaps(wordCounts: Map[String, Int] = Map.empty[String, Int].withDefaultValue(0))

  def props(): Props = Props[StatsActor]()

  // Counts the occurrences of each word in a single line
  def wordsToMapCount(line: String): Map[String, Int] = {
    line.split("\\W+")
      .foldLeft(Map.empty[String, Int])(
        (map: Map[String, Int], next: String) => map + (next -> (map.getOrElse(next, 0) + 1)))
  }
}

class StatsActor extends Actor {

  import StatsActor._

  // Start with an empty state
  override def receive: Receive = updated(StatsMaps())

  // Builds the behaviour for a given state
  private def updated(stats: StatsMaps): Receive = {

    case Message(line: String) =>
      val currentMap = wordsToMapCount(line)
      val wcMap = stats.wordCounts ++ currentMap.map { case (k, v) => k -> (v + stats.wordCounts(k)) }
      // Swap in a new behaviour that closes over the updated state
      context.become(updated(stats.copy(wordCounts = wcMap)))

    case GetStats =>
      sender() ! StatsResult(stats.wordCounts)

    case _ => throw new IllegalStateException("Unknown event was sent to StatsActor")
  }

}

So we got rid of the “var” while maintaining state as a behaviour change. You can think of the “become” feature as a context or finite-state-machine mechanism: once it is invoked with new data/state, the actor “becomes” that state.
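One way to build intuition for why this works is to model it without Akka at all. In the plain-Scala sketch below, a behaviour is just a function that closes over its state, and "become" replaces the current function. `TinyActor`, `BecomeModel` and the string messages are hypothetical and only illustrate the idea; this is not how Akka is implemented.

```scala
object BecomeModel {
  // A behaviour is a function from a message to an action
  type Behaviour = PartialFunction[Any, Unit]

  // Hypothetical stand-in for an actor (NOT Akka's implementation)
  class TinyActor {
    // The only mutable slot: which behaviour handles the next message
    private var current: Behaviour = counting(0)

    // Records the answer to the last "get" message
    var lastReply: Option[Int] = None

    private def become(next: Behaviour): Unit = current = next

    // The state `n` lives in the closure, not in a field of the actor
    private def counting(n: Int): Behaviour = {
      case "inc" => become(counting(n + 1))
      case "get" => lastReply = Some(n)
    }

    // Deliver one message; unknown messages are silently dropped here
    def send(msg: Any): Unit = if (current.isDefinedAt(msg)) current(msg)
  }
}
```

After `send("inc")` twice and then `send("get")`, `lastReply` is `Some(2)`. The mutable reference has not disappeared; it has moved out of our code and into the slot that holds the current behaviour, which in the real actor is managed by the framework.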

This technique is best reserved for cases where you have multiple behaviours and each one might have multiple states. It does come with a drawback: managing a “var” is much easier, and I admit that “become” can get complicated when you try to debug and maintain the code.
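As a rough illustration of "multiple behaviours, each with its own state", here is a sketch of an actor that switches between a running and a paused behaviour while carrying a total along. It assumes classic Akka actors on the classpath; `PausableCounter`, `PausableDemo` and all of the messages are hypothetical and not part of the article's project.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical messages for this sketch only
case object Pause
case object Resume
case object GetTotal
case class Add(n: Int)

class PausableCounter extends Actor {
  override def receive: Receive = running(0)

  // Behaviour 1: accumulate values; the total is carried as a parameter
  private def running(total: Int): Receive = {
    case Add(n)   => context.become(running(total + n))
    case Pause    => context.become(paused(total))
    case GetTotal => sender() ! total
  }

  // Behaviour 2: same state, different reaction to Add
  private def paused(total: Int): Receive = {
    case Add(_)   => // ignored while paused
    case Resume   => context.become(running(total))
    case GetTotal => sender() ! total
  }
}

object PausableDemo {
  def run(): Int = {
    val system = ActorSystem("pausable-demo")
    try {
      val counter = system.actorOf(Props(new PausableCounter), "pausable")
      counter ! Add(2)
      counter ! Pause
      counter ! Add(5) // dropped: the actor is paused
      counter ! Resume
      counter ! Add(3)
      implicit val timeout: Timeout = Timeout(3.seconds)
      Await.result((counter ? GetTotal).mapTo[Int], 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

`PausableDemo.run()` should return 5. With a single var you would also need a flag for "paused" and a check in every handler; with become, each behaviour only lists the messages that make sense in that mode.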

note: The code for phase 2 can be found here

Anyhow it is a nice tool worth knowing.
