akka-typed - EventSourcedBehavior in action

As mentioned earlier, the more important change in akka typed is the addition of EventSourcedBehavior. That is to say, an actor dedicated to EventSource mode is added, and eventually CQRS can be perfectly implemented together with other kinds of actors. The new actor, which I still call persistent actor, is a kind of actor that can maintain and maintain the running state. That is, the internal state of the actor can be stored in the database, and then a set of function functions can be used to provide the transformation of the state, namely, persistence. Of course, as an actor with EventSourcedBehavior, there must be universal actor properties, methods, message processing protocols and supervision. In this discussion, we will illustrate how EventSourcedBehavior maintains internal state and how it should be used as an actor through cases and source code.

Let's use the shopping cart example in the previous discussion, and add some message response mechanisms, mainly to report the shopping cart status:

object ItemInfo {
  case class Item(name: String, price: Double)
}

object MyCart {
 import ItemInfo._

  sealed trait Command 
  sealed trait Event extends CborSerializable
  sealed trait Response 

  //commands
  case class AddItem(item: Item) extends Command
  case object PayCart extends Command
  case class CountItems(replyTo: ActorRef[Response]) extends Command

  //event
  case class ItemAdded(item: Item) extends Event
  case object CartPaid extends Event

  //state
  case class CartLoad(load: List[Item] = Nil)

  //response
  case class PickedItems(items: List[Item]) extends Response
  case object CartEmpty extends Response

  val commandHandler: (CartLoad, Command) => Effect[Event,CartLoad] = { (state, cmd) =>
    cmd match {
      case AddItem(item) =>
        Effect.persist(ItemAdded(item))
      case PayCart =>
        Effect.persist(CartPaid)
      case CountItems(replyTo) =>
        Effect.none.thenRun { cart =>
          cart.load match {
            case Nil =>
              replyTo ! CartEmpty
            case listOfItems =>
              replyTo ! PickedItems(listOfItems)
          }
        }
    }
  }

  val eventHandler: (CartLoad,Event) => CartLoad = { (state,evt) =>
    evt match {
      case ItemAdded(item) =>
         state.copy(load = item :: state.load)
      case CartPaid =>
        state.copy(load = Nil)
    }
  }

  def apply(): Behavior[Command] = EventSourcedBehavior[Command,Event,CartLoad](
    persistenceId = PersistenceId("10","1013"),
    emptyState = CartLoad(),
    commandHandler = commandHandler,
    eventHandler = eventHandler
  )

}

object Shopper {

  import ItemInfo._

  sealed trait Command extends CborSerializable

  case class GetItem(item: Item) extends Command
  case object Settle extends Command
  case object GetCount extends Command

  case class WrappedResponse(res: MyCart.Response) extends Command

  def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val shoppingCart = ctx.spawn(MyCart(), "shopping-cart")
    val cartRef: ActorRef[MyCart.Response] = ctx.messageAdapter(WrappedResponse)
    Behaviors.receiveMessage { msg =>
      msg match {
        case GetItem(item) =>
          shoppingCart ! MyCart.AddItem(item)
        case Settle =>
          shoppingCart ! MyCart.PayCart
        case GetCount =>
          shoppingCart ! MyCart.CountItems(cartRef)
        case WrappedResponse(res) => res match {
          case MyCart.PickedItems(items) =>
            ctx.log.info("**************Current Items in Cart: {}*************", items)
          case MyCart.CartEmpty =>
            ctx.log.info("**************shopping cart is empty!***************")
        }
      }
      Behaviors.same
    }
  }

}


object ShoppingCart extends App {
  import ItemInfo._
  val shopper = ActorSystem(Shopper(),"shopper")
  shopper ! Shopper.GetItem(Item("banana",11.20))
  shopper ! Shopper.GetItem(Item("watermelon",4.70))
  shopper ! Shopper.GetCount
  shopper ! Shopper.Settle
  shopper ! Shopper.GetCount
  scala.io.StdIn.readLine()

  shopper.terminate()

}

In fact, there is also a reply mechanism embedded in EventSourcedBehavior. After a Command is processed, it must reply to the instruction side. Otherwise, the program cannot be compiled. As follows:

private def withdraw(acc: OpenedAccount, cmd: Withdraw): ReplyEffect[Event, Account] = {
  if (acc.canWithdraw(cmd.amount))
    Effect.persist(Withdrawn(cmd.amount)).thenReply(cmd.replyTo)(_ => Confirmed)
  else
    Effect.reply(cmd.replyTo)(Rejected(s"Insufficient balance ${acc.balance} to be able to withdraw ${cmd.amount}"))
}

But this response mechanism is a side Effect. That is, the concatenation is implemented immediately after the Effect is generated. This action is before the eventHandler. Unable to revert to the latest status at this time.

When it comes to side effect, such as Effect.persist (). Thenrun (procedure sideeffect): when the event is successfully persisted, you can safely perform some other operations. For example, you can deduct inventory from the account as soon as the event affecting the inventory number is persist.

In the ShoppingCart example above, we did not find the State transition code such as Behaviors.same . It can only be that EventSourcedBehavior belongs to a higher-level Behavior. The State transition has been embedded in the eventHandler. Remember the style of this function (State, event) = > State. This State is the State.

Events persist in journal, if there is an exception in the journal during persist operation, EventSourcedBehavior has its own security supervision policy, as follows:

  def apply(): Behavior[Command] = EventSourcedBehavior[Command,Event,CartLoad](
    persistenceId = PersistenceId("10","1013"),
    emptyState = CartLoad(),
    commandHandler = commandHandler,
    eventHandler = eventHandler
  ).onPersistFailure(
    SupervisorStrategy
    .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
    .withMaxRestarts(3)
    .withResetBackoffAfter(10.seconds))

It's worth noting that this strategy only applies to PersistFailure (), which is used externally Behaviors.supervisor() package embedding cannot implement the effect of processing PersistFailure. However, the whole actor still needs a Backoff strategy, because in the event sourcedbehavior internal commandhandler, the event handler may also involve some database operations. Some kind of Backoff restart policy is required after the operation fails. Then we can add monitoring strategies for actors as follows:

  def apply(): Behavior[Command] =
    Behaviors.supervise(
      Behaviors.setup { ctx =>
        EventSourcedBehavior[Command, Event, CartLoad](
          persistenceId = PersistenceId("10", "1013"),
          emptyState = CartLoad(),
          commandHandler = commandHandler,
          eventHandler = eventHandler
        ).onPersistFailure(
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            .withMaxRestarts(3)
            .withResetBackoffAfter(10.seconds))
      }
    ).onFailure(
      SupervisorStrategy
        .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        .withMaxRestarts(3)
        .withResetBackoffAfter(10.seconds)
    )

Now this MyCart can be said to be a safe, strong and resilient actor.

Since it is a persistent actor, persistent management should also be the core function. EventSourcedBehavior provides the function of monitoring the persistence process by receiving signals, such as:

 def apply(): Behavior[Command] =
    Behaviors.supervise(
      Behaviors.setup[Command] { ctx =>
        EventSourcedBehavior[Command, Event, CartLoad](
          persistenceId = PersistenceId("10", "1013"),
          emptyState = CartLoad(),
          commandHandler = commandHandler,
          eventHandler = eventHandler
        ).onPersistFailure(
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            .withMaxRestarts(3)
            .withResetBackoffAfter(10.seconds)
        ).receiveSignal {
          case (state, RecoveryCompleted) =>
            ctx.log.info("**************Recovery Completed with state: {}***************",state)
          case (state, SnapshotCompleted(meta))  =>
            ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
          case (state,RecoveryFailed(err)) =>
            ctx.log.error("recovery failed with: {}",err.getMessage)
          case (state,SnapshotFailed(meta,err)) =>
            ctx.log.error("snapshoting failed with: {}",err.getMessage)
        }
      }
    ).onFailure(
      SupervisorStrategy
        .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        .withMaxRestarts(3)
        .withResetBackoffAfter(10.seconds)
    )

EventSourcedBehavior.receiveSignal It's a partial function:

  def receiveSignal(signalHandler: PartialFunction[(State, Signal), Unit]): EventSourcedBehavior[Command, Event, State]

The following is a list of EventSourcedBehavior Signal:

sealed trait EventSourcedSignal extends Signal

@DoNotInherit sealed abstract class RecoveryCompleted extends EventSourcedSignal
case object RecoveryCompleted extends RecoveryCompleted {
  def instance: RecoveryCompleted = this
}

final case class RecoveryFailed(failure: Throwable) extends EventSourcedSignal {
  def getFailure(): Throwable = failure
}

final case class SnapshotCompleted(metadata: SnapshotMetadata) extends EventSourcedSignal {
  def getSnapshotMetadata(): SnapshotMetadata = metadata
}

final case class SnapshotFailed(metadata: SnapshotMetadata, failure: Throwable) extends EventSourcedSignal {

  def getFailure(): Throwable = failure
  def getSnapshotMetadata(): SnapshotMetadata = metadata
}

object SnapshotMetadata {

  /**
   * @param persistenceId id of persistent actor from which the snapshot was taken.
   * @param sequenceNr sequence number at which the snapshot was taken.
   * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
   *                  in milliseconds from the epoch of 1970-01-01T00:00:00Z.
   */
  def apply(persistenceId: String, sequenceNr: Long, timestamp: Long): SnapshotMetadata =
    new SnapshotMetadata(persistenceId, sequenceNr, timestamp)
}

/**
 * Snapshot metadata.
 *
 * @param persistenceId id of persistent actor from which the snapshot was taken.
 * @param sequenceNr sequence number at which the snapshot was taken.
 * @param timestamp time at which the snapshot was saved, defaults to 0 when unknown.
 *                  in milliseconds from the epoch of 1970-01-01T00:00:00Z.
 */
final class SnapshotMetadata(val persistenceId: String, val sequenceNr: Long, val timestamp: Long) {
  override def toString: String =
    s"SnapshotMetadata($persistenceId,$sequenceNr,$timestamp)"
}

final case class DeleteSnapshotsCompleted(target: DeletionTarget) extends EventSourcedSignal {
  def getTarget(): DeletionTarget = target
}

final case class DeleteSnapshotsFailed(target: DeletionTarget, failure: Throwable) extends EventSourcedSignal {
  def getFailure(): Throwable = failure
  def getTarget(): DeletionTarget = target
}

final case class DeleteEventsCompleted(toSequenceNr: Long) extends EventSourcedSignal {
  def getToSequenceNr(): Long = toSequenceNr
}

final case class DeleteEventsFailed(toSequenceNr: Long, failure: Throwable) extends EventSourcedSignal {
  def getFailure(): Throwable = failure
  def getToSequenceNr(): Long = toSequenceNr
}

Of course, one of the reasons that EventSourcedBehavior has the ability of self-healing is that it has a mechanism to replay persistent events. It would be unrealistic to repeat all the historical events in each startup. You must use snapshot to condense historical events:

  def apply(): Behavior[Command] =
    Behaviors.supervise(
      Behaviors.setup[Command] { ctx =>
        EventSourcedBehavior[Command, Event, CartLoad](
          persistenceId = PersistenceId("10", "1013"),
          emptyState = CartLoad(),
          commandHandler = commandHandler,
          eventHandler = eventHandler
        ).onPersistFailure(
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            .withMaxRestarts(3)
            .withResetBackoffAfter(10.seconds)
        ).receiveSignal {
          case (state, RecoveryCompleted) =>
            ctx.log.info("**************Recovery Completed with state: {}***************",state)
          case (state, SnapshotCompleted(meta))  =>
            ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
          case (state,RecoveryFailed(err)) =>
            ctx.log.error("recovery failed with: {}",err.getMessage)
          case (state,SnapshotFailed(meta,err)) =>
            ctx.log.error("snapshoting failed with: {}",err.getMessage)
        }.snapshotWhen {
          case (state,CartPaid,seqnum) =>
            ctx.log.info("*****************snapshot taken at: {} with state: {}",seqnum,state)
            true
          case (state,event,seqnum) => false
        }.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
      }
    ).onFailure(
      SupervisorStrategy
        .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        .withMaxRestarts(3)
        .withResetBackoffAfter(10.seconds)
    )

Here is the source code of this demonstration:

build.sbt

name := "learn-akka-typed"

version := "0.1"

scalaVersion := "2.13.1"
scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")

val AkkaVersion = "2.6.5"
val AkkaPersistenceCassandraVersion = "1.0.0"


libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
  "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
  "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
  "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
  "ch.qos.logback"     % "logback-classic"             % "1.2.3"
)

application.conf

akka.actor.allow-java-serialization = on
akka {
  loglevel = DEBUG
  actor {
    serialization-bindings {
      "com.learn.akka.CborSerializable" = jackson-cbor
    }
  }
  # use Cassandra to store both snapshots and the events of the persistent actors
  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"
    snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
  }

}
akka.persistence.cassandra {
  # don't use autocreate in production
  journal.keyspace = "poc"
  journal.keyspace-autocreate = on
  journal.tables-autocreate = on
  snapshot.keyspace = "poc_snapshot"
  snapshot.keyspace-autocreate = on
  snapshot.tables-autocreate = on
}

datastax-java-driver {
  basic.contact-points = ["192.168.11.189:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}

ShoppingCart.scala

package com.learn.akka

import akka.actor.typed._
import akka.persistence.typed._
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.scaladsl._
import scala.concurrent.duration._

object ItemInfo {
  case class Item(name: String, price: Double)
}

object MyCart {
 import ItemInfo._

  sealed trait Command
  sealed trait Event extends CborSerializable
  sealed trait Response

  //commands
  case class AddItem(item: Item) extends Command
  case object PayCart extends Command
  case class CountItems(replyTo: ActorRef[Response]) extends Command

  //event
  case class ItemAdded(item: Item) extends Event
  case object CartPaid extends Event

  //state
  case class CartLoad(load: List[Item] = Nil)

  //response
  case class PickedItems(items: List[Item]) extends Response
  case object CartEmpty extends Response

  val commandHandler: (CartLoad, Command) => Effect[Event,CartLoad] = { (state, cmd) =>
    cmd match {
      case AddItem(item) =>
        Effect.persist(ItemAdded(item))
      case PayCart =>
        Effect.persist(CartPaid)
      case CountItems(replyTo) =>
        Effect.none.thenRun { cart =>
          cart.load match {
            case Nil =>
              replyTo ! CartEmpty
            case listOfItems =>
              replyTo ! PickedItems(listOfItems)
          }
        }
    }
  }

  val eventHandler: (CartLoad,Event) => CartLoad = { (state,evt) =>
    evt match {
      case ItemAdded(item) =>
         state.copy(load = item :: state.load)
      case CartPaid =>
        state.copy(load = Nil)
    }
  }

  def apply(): Behavior[Command] =
    Behaviors.supervise(
      Behaviors.setup[Command] { ctx =>
        EventSourcedBehavior[Command, Event, CartLoad](
          persistenceId = PersistenceId("10", "1013"),
          emptyState = CartLoad(),
          commandHandler = commandHandler,
          eventHandler = eventHandler
        ).onPersistFailure(
          SupervisorStrategy
            .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
            .withMaxRestarts(3)
            .withResetBackoffAfter(10.seconds)
        ).receiveSignal {
          case (state, RecoveryCompleted) =>
            ctx.log.info("**************Recovery Completed with state: {}***************",state)
          case (state, SnapshotCompleted(meta))  =>
            ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
          case (state,RecoveryFailed(err)) =>
            ctx.log.error("recovery failed with: {}",err.getMessage)
          case (state,SnapshotFailed(meta,err)) =>
            ctx.log.error("snapshoting failed with: {}",err.getMessage)
        }.snapshotWhen {
          case (state,CartPaid,seqnum) =>
            ctx.log.info("*****************snapshot taken at: {} with state: {}",seqnum,state)
            true
          case (state,event,seqnum) => false
        }.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
      }
    ).onFailure(
      SupervisorStrategy
        .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
        .withMaxRestarts(3)
        .withResetBackoffAfter(10.seconds)
    )
}

object Shopper {

  import ItemInfo._

  sealed trait Command extends CborSerializable

  case class GetItem(item: Item) extends Command
  case object Settle extends Command
  case object GetCount extends Command

  case class WrappedResponse(res: MyCart.Response) extends Command

  def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
    val shoppingCart = ctx.spawn(MyCart(), "shopping-cart")
    val cartRef: ActorRef[MyCart.Response] = ctx.messageAdapter(WrappedResponse)
    Behaviors.receiveMessage { msg =>
      msg match {
        case GetItem(item) =>
          shoppingCart ! MyCart.AddItem(item)
        case Settle =>
          shoppingCart ! MyCart.PayCart
        case GetCount =>
          shoppingCart ! MyCart.CountItems(cartRef)
        case WrappedResponse(res) => res match {
          case MyCart.PickedItems(items) =>
            ctx.log.info("**************Current Items in Cart: {}*************", items)
          case MyCart.CartEmpty =>
            ctx.log.info("**************shopping cart is empty!***************")
        }
      }
      Behaviors.same
    }
  }

}


object ShoppingCart extends App {
  import ItemInfo._
  val shopper = ActorSystem(Shopper(),"shopper")
  shopper ! Shopper.GetItem(Item("banana",11.20))
  shopper ! Shopper.GetItem(Item("watermelon",4.70))
  shopper ! Shopper.GetCount
  shopper ! Shopper.Settle
  shopper ! Shopper.GetCount
  scala.io.StdIn.readLine()

  shopper.terminate()

}

Tags: Scala snapshot Database Java

Posted on Fri, 05 Jun 2020 22:01:29 -0700 by candy2126