zoukankan      html  css  js  c++  java
  • akka-typed(6)

    先谈谈akka-typed的router actor。route 分pool router, group router两类。我们先看看pool-router的使用示范:

          val pool = Routers.pool(poolSize = 4)(
            // make sure the workers are restarted if they fail
            Behaviors.supervise(WorkerRoutee()).onFailure[Exception](SupervisorStrategy.restart))
          val router = ctx.spawn(pool, "worker-pool")
    
          (0 to 10).foreach { n =>
            router ! WorkerRoutee.DoLog(s"msg $n")
          }

    上面例子里的pool是个pool-router,意思是一个有4个routees的routee池。每个routee都是通过WorkerRoutee()构建的,意味着routee池中只有一个种类的actor。pool-router是通过工厂方法直接在本地(JVM)构建(spawn)所有的routee。也就是说所有routee都是router的子actor。

    再看看group-router的使用例子:

    val serviceKey = ServiceKey[Worker.Command]("log-worker")
    
          // this would likely happen elsewhere - if we create it locally we
          // can just as well use a pool
          val workerRoutee = ctx.spawn(WorkerRoutee(), "worker-route")
          ctx.system.receptionist ! Receptionist.Register(serviceKey, workerRoutee)
    
          val group = Routers.group(serviceKey)
          val router = ctx.spawn(group, "worker-group")
    
          // the group router will stash messages until it sees the first listing of registered
          // services from the receptionist, so it is safe to send messages right away
          (0 to 10).foreach { n =>
            router ! WorkerRoutee.DoLog(s"msg $n")
          }

    group-router与pool-router有较多分别:

    1、routee是在router之外构建的,router是用一个key通过Receptionist获取同key的actor清单作为routee group的

    2、Receptionist是集群全局的。任何节点上的actor都可以发送注册消息在Receptionist上登记

    3、没有size限制,任何actor一旦在Receptionist上登记即变成routee,接受router管理

    应该说如果想把运算任务分配在集群里的各节点上并行运算实现load-balance效果,group-router是最合适的选择。不过对不同的运算任务需要多少routee则需要用户自行决定,不像以前akka-classic里通过cluster-metrics根据节点负载情况自动增减routee实例那么方便。

    Receptionist: 既然说到,那么就再深入一点介绍Receptionist的应用:上面提到,Receptionist是集群全局的。就是说任何节点上的actor都可以在Receptonist上注册形成一个生存在集群中不同节点的actor清单。如果Receptionist把这个清单提供给一个用户,那么这个用户就可以把运算任务配置到各节点上,实现某种意义上的分布式运算模式。Receptionist的使用方式是:通过向本节点的Receptionist发送消息去登记ActorRef,然后通过Receptionist发布的登记变化消息即可获取最新的ActorRef清单:

      val WorkerServiceKey = ServiceKey[Worker.TransformText]("Worker")
    
      ctx.system.receptionist ! Receptionist.Register(WorkerServiceKey, ctx.self)
    
      ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)

    Receptionist的登记和清单获取是以ServiceKey作为关联的。那么获取的清单内应该全部是一种类型的actor,只不过它们的地址可能是跨节点的,但它们只能进行同一种运算。从另一个角度说,一项任务是分布在不同节点的actor并行进行运算的。

    在上篇讨论里提过:如果发布-订阅机制是在两个actor之间进行的,那么这两个actor也需要在规定的信息交流协议框架下作业:我们必须注意消息类型,提供必要的消息类型转换机制。下面是一个Receptionist登记示范:

    object Worker {
    
      val WorkerServiceKey = ServiceKey[Worker.TransformText]("Worker")
    
      sealed trait Command
      final case class TransformText(text: String, replyTo: ActorRef[TextTransformed]) extends Command with CborSerializable
      final case class TextTransformed(text: String) extends CborSerializable
    
      def apply(): Behavior[Command] =
        Behaviors.setup { ctx =>
          // each worker registers themselves with the receptionist
          ctx.log.info("Registering myself with receptionist")
          ctx.system.receptionist ! Receptionist.Register(WorkerServiceKey, ctx.self)
    
          Behaviors.receiveMessage {
            case TransformText(text, replyTo) =>
              replyTo ! TextTransformed(text.toUpperCase)
              Behaviors.same
          }
        }
    }

    Receptionist登记比较直接:登记者不需要Receptionist返回消息,所以随便用ctx.self作为消息的sender。注意TransformText的replyTo: ActorRef[TextTransformed],代表sender是个可以处理TextTransformed消息类型的actor。实际上,在sender方是通过ctx.ask提供了TextTransformed的类型转换。

    Receptionist.Subscribe需要Receptionist返回一个actor清单,所以是个request/response模式。那么发送给Receptionist消息中的replyTo必须是发送者能处理的类型,如下:

      def apply(): Behavior[Event] = Behaviors.setup { ctx =>
        Behaviors.withTimers { timers =>
          // subscribe to available workers
          val subscriptionAdapter = ctx.messageAdapter[Receptionist.Listing] {
            case Worker.WorkerServiceKey.Listing(workers) =>
              WorkersUpdated(workers)
          }
          ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)
    
    ...
        }
      

    ctx.messageAdapter进行了一个从Receptionist.Listing返回类型到WorkersUpdated类型的转换机制登记:从Receptionist回复的List类型会被转换成WorkersUpdated类型,如下:

    ...
       Behaviors.receiveMessage {
          case WorkersUpdated(newWorkers) =>
            ctx.log.info("List of services registered with the receptionist changed: {}", newWorkers)
    
    ...

    另外,上面提过的TextTransformed转换如下:

              ctx.ask[Worker.TransformText,Worker.TextTransformed](selectedWorker, Worker.TransformText(text, _)) {
                case Success(transformedText) => TransformCompleted(transformedText.text, text)
                case Failure(ex) => JobFailed("Processing timed out", text)
              }

    ctx.ask将TextTransformed转换成TransformCompleted。完整的Behavior定义如下:

    object Frontend {
    
      sealed trait Event
      private case object Tick extends Event
      private final case class WorkersUpdated(newWorkers: Set[ActorRef[Worker.TransformText]]) extends Event
      private final case class TransformCompleted(originalText: String, transformedText: String) extends Event
      private final case class JobFailed(why: String, text: String) extends Event
    
    
      def apply(): Behavior[Event] = Behaviors.setup { ctx =>
        Behaviors.withTimers { timers =>
          // subscribe to available workers
          val subscriptionAdapter = ctx.messageAdapter[Receptionist.Listing] {
            case Worker.WorkerServiceKey.Listing(workers) =>
              WorkersUpdated(workers)
          }
          ctx.system.receptionist ! Receptionist.Subscribe(Worker.WorkerServiceKey, subscriptionAdapter)
    
          timers.startTimerWithFixedDelay(Tick, Tick, 2.seconds)
    
          running(ctx, IndexedSeq.empty, jobCounter = 0)
        }
      }
    
      private def running(ctx: ActorContext[Event], workers: IndexedSeq[ActorRef[Worker.TransformText]], jobCounter: Int): Behavior[Event] =
        Behaviors.receiveMessage {
          case WorkersUpdated(newWorkers) =>
            ctx.log.info("List of services registered with the receptionist changed: {}", newWorkers)
            running(ctx, newWorkers.toIndexedSeq, jobCounter)
          case Tick =>
            if (workers.isEmpty) {
              ctx.log.warn("Got tick request but no workers available, not sending any work")
              Behaviors.same
            } else {
              // how much time can pass before we consider a request failed
              implicit val timeout: Timeout = 5.seconds
              val selectedWorker = workers(jobCounter % workers.size)
              ctx.log.info("Sending work for processing to {}", selectedWorker)
              val text = s"hello-$jobCounter"
              ctx.ask[Worker.TransformText,Worker.TextTransformed](selectedWorker, Worker.TransformText(text, _)) {
                case Success(transformedText) => TransformCompleted(transformedText.text, text)
                case Failure(ex) => JobFailed("Processing timed out", text)
              }
              running(ctx, workers, jobCounter + 1)
            }
          case TransformCompleted(originalText, transformedText) =>
            ctx.log.info("Got completed transform of {}: {}", originalText, transformedText)
            Behaviors.same
    
          case JobFailed(why, text) =>
            ctx.log.warn("Transformation of text {} failed. Because: {}", text, why)
            Behaviors.same
    
        }

    现在我们可以示范用group-router来实现某种跨节点的分布式运算。因为group-router是通过Receptionist来实现对routees管理的,而Receptionist是集群全局的,意味着如果我们在各节点上构建routee,然后向Receptionist登记,就会形成一个跨节点的routee ActorRef清单。如果把任务分配到这个清单上的routee上去运算,应该能实现集群节点负载均衡的效果。下面我们就示范这个loadbalancer。流程很简单:在一个接入点 (serviceActor)中构建workersRouter,然后3个workerRoutee并向Receptionist登记,把接到的任务分解成子任务逐个发送给workersRouter。每个workerRoutee完成任务后将结果发送给一个聚合器Aggregator,Aggregator在核对完成接收所有workerRoutee返回的结果后再把汇总结果返回serverActor。先看看这个serverActor:

    object Service {
      val routerServiceKey = ServiceKey[WorkerRoutee.Command]("workers-router")
    
      sealed trait Command extends CborSerializable
    
      case class ProcessText(text: String) extends Command {
        require(text.nonEmpty)
      }
    
      case class WrappedResult(res: Aggregator.Response) extends Command
    
      def serviceBehavior(workersRouter: ActorRef[WorkerRoutee.Command]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
        val aggregator = ctx.spawn(Aggregator(), "aggregator")
        val aggregatorRef: ActorRef[Aggregator.Response] = ctx.messageAdapter(WrappedResult)
        Behaviors.receiveMessage {
          case ProcessText(text) =>
            ctx.log.info("******************** received ProcessText command: {} ****************",text)
            val words = text.split(' ').toIndexedSeq
            aggregator ! Aggregator.CountText(words.size, aggregatorRef)
            words.foreach { word =>
              workersRouter ! WorkerRoutee.Count(word, aggregator)
            }
            Behaviors.same
          case WrappedResult(msg) =>
            msg match {
              case Aggregator.Result(res) =>
                ctx.log.info("************** mean length of words = {} **********", res)
            }
            Behaviors.same
        }
      }
    
      def singletonService(ctx: ActorContext[Command], workersRouter: ActorRef[WorkerRoutee.Command]) = {
        val singletonSettings = ClusterSingletonSettings(ctx.system)
          .withRole("front")
        SingletonActor(
          Behaviors.supervise(
            serviceBehavior(workersRouter)
          ).onFailure(
            SupervisorStrategy
              .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
              .withMaxRestarts(3)
              .withResetBackoffAfter(10.seconds)
          )
          , "singletonActor"
        ).withSettings(singletonSettings)
      }
    
      def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
        val cluster = Cluster(ctx.system)
        val workersRouter = ctx.spawn(
          Routers.group(routerServiceKey)
            .withRoundRobinRouting(),
          "workersRouter"
        )
        (0 until 3).foreach { n =>
          val routee = ctx.spawn(WorkerRoutee(cluster.selfMember.address.toString), s"work-routee-$n")
          ctx.system.receptionist ! Receptionist.register(routerServiceKey, routee)
        }
        val singletonActor = ClusterSingleton(ctx.system).init(singletonService(ctx, workersRouter))
        Behaviors.receiveMessage {
          case job@ProcessText(text) =>
            singletonActor ! job
            Behaviors.same
        }
      }
    
    }

    整体goup-router和routee的构建是在apply()里,并把接到的任务转发给singletonActor。singletonActor是以serviceBehavior为核心的一个actor。在servceBehavior里把收到的任务分解并分别发送给workersRouter。值得注意的是:serviceBehavior期望接收从Aggregator的回应,它们之间存在request/response模式信息交流,所以需要Aggregator.Response到WrappedResult的类型转换机制。还有:子任务是通过workersRoute发送给个workerRoutee的,我们需要各workerRoutee把运算结果返给给Aggregator,所以发送给workersRouter的消息包含了Aggregator的ActorRef,如:workersRouter ! WorkerRoutee.Count(cnt,aggregatorRef)。

    Aggregator是个persistentActor, 如下:

     

    object Aggregator {
      sealed trait Command
      sealed trait Event extends  CborSerializable
      sealed trait Response
    
      case class CountText(cnt: Int, replyTo: ActorRef[Response]) extends Command
      case class MarkLength(word: String, len: Int) extends Command
      case class TextCounted(cnt: Int) extends Event
      case class LengthMarked(word: String, len: Int) extends Event
      case class Result(meanWordLength: Double) extends Response
    
      case class State(expectedNum: Int = 0, lens: List[Int] = Nil)
    
      var replyTo: ActorRef[Response] = _
    
      def commandHandler: (State,Command) => Effect[Event,State] = (st,cmd) => {
        cmd match {
          case CountText(cnt,ref) =>
            replyTo = ref
            Effect.persist(TextCounted(cnt))
          case MarkLength(word,len) =>
            Effect.persist(LengthMarked(word,len))
        }
      }
      def eventHandler: (State, Event) => State = (st,ev) => {
        ev match {
          case TextCounted(cnt) =>
            st.copy(expectedNum = cnt, lens = Nil)
          case LengthMarked(word,len) =>
            val state = st.copy(lens = len :: st.lens)
            if (state.lens.size >= state.expectedNum) {
              val meanWordLength = state.lens.sum.toDouble / state.lens.size
              replyTo ! Result(meanWordLength)
              State()
            } else state
        }
      }
      val takeSnapShot: (State,Event,Long) => Boolean = (st,ev,seq) => {
          if (st.lens.isEmpty) {
              if (ev.isInstanceOf[LengthMarked])
                true
              else
                false
          } else
             false
      }
      def apply(): Behavior[Command] = Behaviors.supervise(
        Behaviors.setup[Command] { ctx =>
          EventSourcedBehavior(
            persistenceId = PersistenceId("33","2333"),
            emptyState = State(),
            commandHandler = commandHandler,
            eventHandler = eventHandler
          ).onPersistFailure(
            SupervisorStrategy
              .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
              .withMaxRestarts(3)
              .withResetBackoffAfter(10.seconds)
          ).receiveSignal {
            case (state, RecoveryCompleted) =>
              ctx.log.info("**************Recovery Completed with state: {}***************",state)
            case (state, SnapshotCompleted(meta))  =>
              ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
            case (state,RecoveryFailed(err)) =>
              ctx.log.error("*************recovery failed with: {}***************",err.getMessage)
            case (state,SnapshotFailed(meta,err)) =>
              ctx.log.error("***************snapshoting failed with: {}*************",err.getMessage)
          }.snapshotWhen(takeSnapShot)
        }
      ).onFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      )
    }

    注意这个takeSnapShot函数:这个函数是在EventSourcedBehavior.snapshotWhen(takeSnapShot)调用的。传入参数是(State,Event,seqenceNr),我们需要对State,Event的当前值进行分析后返回true代表做一次snapshot。

    看看一部分显示就知道任务已经分配到几个节点上的routee:

    20:06:59.072 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.WorkerRoutee$ - ************** processing [this] on akka://ClusterSystem@127.0.0.1:51182 ***********
    20:06:59.072 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.WorkerRoutee$ - ************** processing [text] on akka://ClusterSystem@127.0.0.1:51182 ***********
    20:06:59.072 [ClusterSystem-akka.actor.default-dispatcher-36] INFO com.learn.akka.WorkerRoutee$ - ************** processing [be] on akka://ClusterSystem@127.0.0.1:51182 ***********
    20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.learn.akka.WorkerRoutee$ - ************** processing [will] on akka://ClusterSystem@127.0.0.1:51173 ***********
    20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-26] INFO com.learn.akka.WorkerRoutee$ - ************** processing [is] on akka://ClusterSystem@127.0.0.1:25251 ***********
    20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-13] INFO com.learn.akka.WorkerRoutee$ - ************** processing [the] on akka://ClusterSystem@127.0.0.1:51173 ***********
    20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.WorkerRoutee$ - ************** processing [that] on akka://ClusterSystem@127.0.0.1:25251 ***********
    20:06:59.236 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.WorkerRoutee$ - ************** processing [analyzed] on akka://ClusterSystem@127.0.0.1:25251 ***********

    这个例子的源代码如下:

    package com.learn.akka
    
    import akka.actor.typed._
    import akka.persistence.typed._
    import akka.persistence.typed.scaladsl._
    import scala.concurrent.duration._
    import akka.actor.typed.receptionist._
    import akka.actor.typed.scaladsl.Behaviors
    import akka.actor.typed.scaladsl._
    import akka.cluster.typed.Cluster
    import akka.cluster.typed.ClusterSingleton
    import akka.cluster.typed.ClusterSingletonSettings
    import akka.cluster.typed.SingletonActor
    import com.typesafe.config.ConfigFactory
    
    object WorkerRoutee {
      sealed trait Command extends CborSerializable
      case class Count(word: String, replyTo: ActorRef[Aggregator.Command]) extends Command
    
      def apply(nodeAddress: String): Behavior[Command] = Behaviors.setup {ctx =>
        Behaviors.receiveMessage[Command] {
          case Count(word,replyTo) =>
            ctx.log.info("************** processing [{}] on {} ***********",word,nodeAddress)
            replyTo ! Aggregator.MarkLength(word,word.length)
            Behaviors.same
        }
      }
    }
    object Aggregator {
      sealed trait Command
      sealed trait Event extends  CborSerializable
      sealed trait Response
    
      case class CountText(cnt: Int, replyTo: ActorRef[Response]) extends Command
      case class MarkLength(word: String, len: Int) extends Command
      case class TextCounted(cnt: Int) extends Event
      case class LengthMarked(word: String, len: Int) extends Event
      case class Result(meanWordLength: Double) extends Response
    
      case class State(expectedNum: Int = 0, lens: List[Int] = Nil)
    
      var replyTo: ActorRef[Response] = _
    
      def commandHandler: (State,Command) => Effect[Event,State] = (st,cmd) => {
        cmd match {
          case CountText(cnt,ref) =>
            replyTo = ref
            Effect.persist(TextCounted(cnt))
          case MarkLength(word,len) =>
            Effect.persist(LengthMarked(word,len))
        }
      }
      def eventHandler: (State, Event) => State = (st,ev) => {
        ev match {
          case TextCounted(cnt) =>
            st.copy(expectedNum = cnt, lens = Nil)
          case LengthMarked(word,len) =>
            val state = st.copy(lens = len :: st.lens)
            if (state.lens.size >= state.expectedNum) {
              val meanWordLength = state.lens.sum.toDouble / state.lens.size
              replyTo ! Result(meanWordLength)
              State()
            } else state
        }
      }
      val takeSnapShot: (State,Event,Long) => Boolean = (st,ev,seq) => {
          if (st.lens.isEmpty) {
              if (ev.isInstanceOf[LengthMarked])
                true
              else
                false
          } else
             false
      }
      def apply(): Behavior[Command] = Behaviors.supervise(
        Behaviors.setup[Command] { ctx =>
          EventSourcedBehavior(
            persistenceId = PersistenceId("33","2333"),
            emptyState = State(),
            commandHandler = commandHandler,
            eventHandler = eventHandler
          ).onPersistFailure(
            SupervisorStrategy
              .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
              .withMaxRestarts(3)
              .withResetBackoffAfter(10.seconds)
          ).receiveSignal {
            case (state, RecoveryCompleted) =>
              ctx.log.info("**************Recovery Completed with state: {}***************",state)
            case (state, SnapshotCompleted(meta))  =>
              ctx.log.info("**************Snapshot Completed with state: {},id({},{})***************",state,meta.persistenceId, meta.sequenceNr)
            case (state,RecoveryFailed(err)) =>
              ctx.log.error("*************recovery failed with: {}***************",err.getMessage)
            case (state,SnapshotFailed(meta,err)) =>
              ctx.log.error("***************snapshoting failed with: {}*************",err.getMessage)
          }.snapshotWhen(takeSnapShot)
        }
      ).onFailure(
        SupervisorStrategy
          .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
          .withMaxRestarts(3)
          .withResetBackoffAfter(10.seconds)
      )
    }
    object Service {
      val routerServiceKey = ServiceKey[WorkerRoutee.Command]("workers-router")
    
      sealed trait Command extends CborSerializable
    
      case class ProcessText(text: String) extends Command {
        require(text.nonEmpty)
      }
    
      case class WrappedResult(res: Aggregator.Response) extends Command
    
      def serviceBehavior(workersRouter: ActorRef[WorkerRoutee.Command]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
        val aggregator = ctx.spawn(Aggregator(), "aggregator")
        val aggregatorRef: ActorRef[Aggregator.Response] = ctx.messageAdapter(WrappedResult)
        Behaviors.receiveMessage {
          case ProcessText(text) =>
            ctx.log.info("******************** received ProcessText command: {} ****************",text)
            val words = text.split(' ').toIndexedSeq
            aggregator ! Aggregator.CountText(words.size, aggregatorRef)
            words.foreach { word =>
              workersRouter ! WorkerRoutee.Count(word, aggregator)
            }
            Behaviors.same
          case WrappedResult(msg) =>
            msg match {
              case Aggregator.Result(res) =>
                ctx.log.info("************** mean length of words = {} **********", res)
            }
            Behaviors.same
        }
      }
    
      def singletonService(ctx: ActorContext[Command], workersRouter: ActorRef[WorkerRoutee.Command]) = {
        val singletonSettings = ClusterSingletonSettings(ctx.system)
          .withRole("front")
        SingletonActor(
          Behaviors.supervise(
            serviceBehavior(workersRouter)
          ).onFailure(
            SupervisorStrategy
              .restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1)
              .withMaxRestarts(3)
              .withResetBackoffAfter(10.seconds)
          )
          , "singletonActor"
        ).withSettings(singletonSettings)
      }
    
      def apply(): Behavior[Command] = Behaviors.setup[Command] { ctx =>
        val cluster = Cluster(ctx.system)
        val workersRouter = ctx.spawn(
          Routers.group(routerServiceKey)
            .withRoundRobinRouting(),
          "workersRouter"
        )
        (0 until 3).foreach { n =>
          val routee = ctx.spawn(WorkerRoutee(cluster.selfMember.address.toString), s"work-routee-$n")
          ctx.system.receptionist ! Receptionist.register(routerServiceKey, routee)
        }
        val singletonActor = ClusterSingleton(ctx.system).init(singletonService(ctx, workersRouter))
        Behaviors.receiveMessage {
          case job@ProcessText(text) =>
            singletonActor ! job
            Behaviors.same
        }
      }
    
    }
    
    object LoadBalance {
      def main(args: Array[String]): Unit = {
        if (args.isEmpty) {
          startup("compute", 25251)
          startup("compute", 25252)
          startup("compute", 25253)
          startup("front", 25254)
        } else {
          require(args.size == 2, "Usage: role port")
          startup(args(0), args(1).toInt)
        }
      }
    
      def startup(role: String, port: Int): Unit = {
        // Override the configuration of the port when specified as program argument
        val config = ConfigFactory
          .parseString(s"""
          akka.remote.artery.canonical.port=$port
          akka.cluster.roles = [$role]
          """)
          .withFallback(ConfigFactory.load("cluster-persistence"))
    
        val frontEnd = ActorSystem[Service.Command](Service(), "ClusterSystem", config)
        if (role == "front") {
          println("*************** sending ProcessText command  ************")
          frontEnd ! Service.ProcessText("this is the text that will be analyzed")
        }
    
      }
    
    }

    cluster-persistence.conf

    akka.actor.allow-java-serialization = on
    akka {
      loglevel = INFO
      actor {
        provider = cluster
        serialization-bindings {
          "com.learn.akka.CborSerializable" = jackson-cbor
        }
      }
     remote {
        artery {
          canonical.hostname = "127.0.0.1"
          canonical.port = 0
        }
      }
      cluster {
        seed-nodes = [
          "akka://ClusterSystem@127.0.0.1:25251",
          "akka://ClusterSystem@127.0.0.1:25252"]
      }
      # use Cassandra to store both snapshots and the events of the persistent actors
      persistence {
        journal.plugin = "akka.persistence.cassandra.journal"
        snapshot-store.plugin = "akka.persistence.cassandra.snapshot"
      }
    }
    akka.persistence.cassandra {
      # don't use autocreate in production
      journal.keyspace = "poc"
      journal.keyspace-autocreate = on
      journal.tables-autocreate = on
      snapshot.keyspace = "poc_snapshot"
      snapshot.keyspace-autocreate = on
      snapshot.tables-autocreate = on
    }
    
    datastax-java-driver {
      basic.contact-points = ["192.168.11.189:9042"]
      basic.load-balancing-policy.local-datacenter = "datacenter1"
    }

    build.sbt

    name := "learn-akka-typed"
    
    version := "0.1"
    
    scalaVersion := "2.13.1"
    scalacOptions in Compile ++= Seq("-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")
    javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation")
    
    val AkkaVersion = "2.6.5"
    val AkkaPersistenceCassandraVersion = "1.0.0"
    
    
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
      "com.typesafe.akka" %% "akka-persistence-typed" % AkkaVersion,
      "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
      "com.typesafe.akka" %% "akka-serialization-jackson" % AkkaVersion,
      "com.typesafe.akka" %% "akka-persistence-cassandra" % AkkaPersistenceCassandraVersion,
      "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
      "ch.qos.logback"     % "logback-classic"             % "1.2.3"
    )
  • 相关阅读:
    windows8安装docker(tool box)
    windows8 使用docker创建第一个nodejs运行环境
    nodejs使用fetch获取WebAPI
    Nodejs获取Azure Active Directory AccessToken
    使用Bot Service创建Bot Framework
    安装Team Services Agent Win7
    Nodejs微信与Bot framework通过Direct Line连接
    修改cmd默认启动路径
    less配置
    javascript滚动到大于一定距离显示隐藏
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/13091012.html
Copyright © 2011-2022 走看看