  • akka-typed(7)

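      While using akka-typed I found that many things have been simplified and made more convenient. Supervision is one example: wrapping a Behavior in Behaviors.supervise() is all it takes to give the actor a SupervisorStrategy.restartWithBackoff strategy. The cluster-aware group router is also easy to use, and then there is cluster-sharding. Below we walk through an example to show how cluster-sharding is used in practice.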
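
As a rough illustration of the supervision point, the following sketch wraps a behavior with `Behaviors.supervise` and a `restartWithBackoff` strategy. The `SupervisedWorker` actor, its `Process` message and the backoff values are hypothetical and not part of the article's example:

```scala
import scala.concurrent.duration._
import akka.actor.typed.{ Behavior, SupervisorStrategy }
import akka.actor.typed.scaladsl.Behaviors

object SupervisedWorker {
  // Hypothetical protocol, used only for this illustration.
  sealed trait Command
  final case class Process(payload: String) extends Command

  // The unsupervised behavior: fails on an empty payload.
  private def worker: Behavior[Command] =
    Behaviors.receiveMessage {
      case Process(payload) =>
        if (payload.isEmpty) throw new IllegalArgumentException("empty payload")
        Behaviors.same
    }

  // Wrap the behavior so that an IllegalArgumentException restarts the actor
  // after a delay that grows from 1 second up to 30 seconds (assumed values).
  def apply(): Behavior[Command] =
    Behaviors
      .supervise(worker)
      .onFailure[IllegalArgumentException](
        SupervisorStrategy.restartWithBackoff(minBackoff = 1.second, maxBackoff = 30.seconds, randomFactor = 0.2))
}
```

The supervised behavior is used like any other `Behavior[Command]`, for example when spawning a child actor or as an entity behavior.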

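    First, sharding is a mechanism for deploying actors of a given kind, called entities, across multiple nodes of a cluster. Entities are created dynamically: the ClusterSharding system decides on which node to create an entity based on how loaded each node is (by default, the allocation strategy favors the region hosting the fewest shards), and hands back a ShardRegion, which acts as the construction tool and message broker for that kind of entity. In other words, we can address the same kind of computation to any entity by entityId, but which node of the cluster that entity lives on cannot be chosen by hand; it is decided entirely by ClusterSharding. Let us first design an actor with simple functionality and examine how it works as an entity: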

    object Counter {
      sealed trait Command extends CborSerializable
      case object Increment extends Command
      final case class GetValue(replyTo: ActorRef[Response]) extends Command
      case object StopCounter extends Command
      private case object Idle extends Command
    
      sealed trait Response extends CborSerializable
      case class SubTtl(entityId: String, ttl: Int) extends Response
    
    
      val TypeKey = EntityTypeKey[Command]("Counter")
    
      def apply(nodeAddress: String, entityContext: EntityContext[Command]): Behavior[Command] = {
        Behaviors.setup { ctx =>
          def updated(value: Int): Behavior[Command] = {
            Behaviors.receiveMessage[Command] {
              case Increment =>
                ctx.log.info("******************{} counting at {},{}",ctx.self.path,nodeAddress,entityContext.entityId)
                updated(value + 1)
              case GetValue(replyTo) =>
                ctx.log.info("******************{} get value at {},{}",ctx.self.path,nodeAddress,entityContext.entityId)
                replyTo ! SubTtl(entityContext.entityId,value)
                Behaviors.same
              case Idle =>
                entityContext.shard ! ClusterSharding.Passivate(ctx.self)
                Behaviors.same
              case StopCounter =>
                Behaviors.stopped(() => ctx.log.info("************{} stopping ... passivated for idling.", entityContext.entityId))
            }
          }
          ctx.setReceiveTimeout(30.seconds, Idle)
          updated(0)
        }
      }
    }

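    The cluster-sharding mechanism works like this: a ShardRegion for a given EntityType is deployed on every node (or on designated nodes). The system can then create entities of that type on any node where a ShardRegion is deployed, and ClusterSharding routes each message to the right recipient according to its entityId. Let us see how a ShardRegion is deployed: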

    object EntityManager {
      sealed trait Command
      case class AddOne(counterId: String) extends Command
      case class GetSum(counterId: String ) extends Command
      case class WrappedTotal(res: Counter.Response) extends Command
    
    
      def apply(): Behavior[Command] = Behaviors.setup { ctx =>
        val cluster = Cluster(ctx.system)
        val sharding = ClusterSharding(ctx.system)
        val entityType = Entity(Counter.TypeKey) { entityContext =>
          Counter(cluster.selfMember.address.toString,entityContext)
        }.withStopMessage(Counter.StopCounter)
        sharding.init(entityType)
    
        val counterRef: ActorRef[Counter.Response] = ctx.messageAdapter(ref => WrappedTotal(ref))
    
        Behaviors.receiveMessage[Command] {
          case AddOne(cid) =>
            val entityRef: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, cid)
            entityRef ! Counter.Increment
            Behaviors.same
          case GetSum(cid) =>
            val entityRef: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, cid)
            entityRef ! Counter.GetValue(counterRef)
            Behaviors.same
          case WrappedTotal(ttl) =>
            ttl match {
              case Counter.SubTtl(eid, subttl) =>
                ctx.log.info("***********************{} total: {} ", eid, subttl)
            }
            Behaviors.same
        }
      }
    
    }

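    It is very simple: a single call, sharding.init(entityType), deploys sharding on a node, and it is through this call that the system builds the ShardRegion. The entityType represents a template for a particular kind of actor. Here is its factory function: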

    object Entity {
    
      /**
       * Defines how the entity should be created. Used in [[ClusterSharding#init]]. More optional
       * settings can be defined using the `with` methods of the returned [[Entity]].
       *
       * @param typeKey A key that uniquely identifies the type of entity in this cluster
       * @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId)
       * @tparam M The type of message the entity accepts
       */
      def apply[M](typeKey: EntityTypeKey[M])(
          createBehavior: EntityContext[M] => Behavior[M]): Entity[M, ShardingEnvelope[M]] =
        new Entity(createBehavior, typeKey, None, Props.empty, None, None, None, None, None)
    }

    This function takes an EntityTypeKey and a function createBehavior that builds the Behavior, and produces an Entity. The Entity type is defined as follows:

    final class Entity[M, E] private[akka] (
        val createBehavior: EntityContext[M] => Behavior[M],
        val typeKey: EntityTypeKey[M],
        val stopMessage: Option[M],
        val entityProps: Props,
        val settings: Option[ClusterShardingSettings],
        val messageExtractor: Option[ShardingMessageExtractor[E, M]],
        val allocationStrategy: Option[ShardAllocationStrategy],
        val role: Option[String],
        val dataCenter: Option[DataCenter]) {
    
      /**
       * [[akka.actor.typed.Props]] of the entity actors, such as dispatcher settings.
       */
      def withEntityProps(newEntityProps: Props): Entity[M, E] =
        copy(entityProps = newEntityProps)
    
      /**
       * Additional settings, typically loaded from configuration.
       */
      def withSettings(newSettings: ClusterShardingSettings): Entity[M, E] =
        copy(settings = Option(newSettings))
    
      /**
       * Message sent to an entity to tell it to stop, e.g. when rebalanced or passivated.
       * If this is not defined it will be stopped automatically.
       * It can be useful to define a custom stop message if the entity needs to perform
       * some asynchronous cleanup or interactions before stopping.
       */
      def withStopMessage(newStopMessage: M): Entity[M, E] =
        copy(stopMessage = Option(newStopMessage))
    
      /**
       *
       * If a `messageExtractor` is not specified the messages are sent to the entities by wrapping
       * them in [[ShardingEnvelope]] with the entityId of the recipient actor. That envelope
       * is used by the [[HashCodeMessageExtractor]] for extracting entityId and shardId. The number of
       * shards is then defined by `numberOfShards` in `ClusterShardingSettings`, which by default
       * is configured with `akka.cluster.sharding.number-of-shards`.
       */
      def withMessageExtractor[Envelope](newExtractor: ShardingMessageExtractor[Envelope, M]): Entity[M, Envelope] =
        new Entity(
          createBehavior,
          typeKey,
          stopMessage,
          entityProps,
          settings,
          Option(newExtractor),
          allocationStrategy,
          role,
          dataCenter)
    
      /**
       * Allocation strategy which decides on which nodes to allocate new shards,
       * [[ClusterSharding#defaultShardAllocationStrategy]] is used if this is not specified.
       */
      def withAllocationStrategy(newAllocationStrategy: ShardAllocationStrategy): Entity[M, E] =
        copy(allocationStrategy = Option(newAllocationStrategy))
    
      /**
       *  Run the Entity actors on nodes with the given role.
       */
      def withRole(newRole: String): Entity[M, E] = copy(role = Some(newRole))
    
      /**
       * The data center of the cluster nodes where the cluster sharding is running.
       * If the dataCenter is not specified then the same data center as current node. If the given
       * dataCenter does not match the data center of the current node the `ShardRegion` will be started
       * in proxy mode.
       */
      def withDataCenter(newDataCenter: DataCenter): Entity[M, E] = copy(dataCenter = Some(newDataCenter))
    
      private def copy(
          createBehavior: EntityContext[M] => Behavior[M] = createBehavior,
          typeKey: EntityTypeKey[M] = typeKey,
          stopMessage: Option[M] = stopMessage,
          entityProps: Props = entityProps,
          settings: Option[ClusterShardingSettings] = settings,
          allocationStrategy: Option[ShardAllocationStrategy] = allocationStrategy,
          role: Option[String] = role,
          dataCenter: Option[DataCenter] = dataCenter): Entity[M, E] = {
        new Entity(
          createBehavior,
          typeKey,
          stopMessage,
          entityProps,
          settings,
          messageExtractor,
          allocationStrategy,
          role,
          dataCenter)
      }
    }

    It offers a number of methods for controlling how an Entity is created and how it operates.

    Next we deploy this EntityManager as the RootBehavior on several nodes:

    object ClusterShardingApp  {
      def main(args: Array[String]): Unit = {
        if (args.isEmpty) {
          startup("shard", 25251)
          startup("shard", 25252)
          startup("shard", 25253)
          startup("front", 25254)
        } else {
          require(args.size == 2, "Usage: role port")
          startup(args(0), args(1).toInt)
        }
      }
    
      def startup(role: String, port: Int): Unit = {
        // Override the configuration of the port when specified as program argument
        val config = ConfigFactory
          .parseString(s"""
          akka.remote.artery.canonical.port=$port
          akka.cluster.roles = [$role]
          """)
          .withFallback(ConfigFactory.load("cluster"))
    
        val entityManager = ActorSystem[EntityManager.Command](EntityManager(), "ClusterSystem", config)
    ...
    }

    In total, three nodes with role=shard and one front node are configured.
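
Roles only label nodes; by themselves they do not limit where entities are created. Because `EntityManager` calls `sharding.init` on every node without `withRole`, the `front` node can host entities too, and the run log further down does show entity 9015 counting on port 25254. A minimal sketch, assuming the goal is to keep entities on the `shard` nodes only, uses the `withRole` method from the `Entity` definition quoted above. The `RoleRestrictedEntity` object, its `Command` protocol and the placeholder behavior are hypothetical stand-ins for the article's `Counter`:

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.{ Entity, EntityTypeKey }

object RoleRestrictedEntity {
  // Hypothetical protocol standing in for Counter.Command.
  sealed trait Command
  case object Stop extends Command

  val TypeKey: EntityTypeKey[Command] = EntityTypeKey[Command]("RoleRestricted")

  // Placeholder behavior: the entity simply stops when it receives Stop.
  def behavior(entityId: String): Behavior[Command] =
    Behaviors.receiveMessage { case Stop => Behaviors.stopped }

  // Same construction as in EntityManager, plus a role restriction:
  // entities of this type should only run on nodes carrying the "shard" role.
  val entity =
    Entity(TypeKey)(entityContext => behavior(entityContext.entityId))
      .withStopMessage(Stop)
      .withRole("shard")
}
```

The resulting value would be passed to `sharding.init` exactly as before. With a role set, nodes that lack the role are expected to start their ShardRegion in proxy mode, so the front node could still send messages to entities without hosting any.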

    On the front node, we send messages to the entities with entityId 9013, 9014, 9015 and 9016:

      def startup(role: String, port: Int): Unit = {
        // Override the configuration of the port when specified as program argument
        val config = ConfigFactory
          .parseString(s"""
          akka.remote.artery.canonical.port=$port
          akka.cluster.roles = [$role]
          """)
          .withFallback(ConfigFactory.load("cluster"))
    
        val entityManager = ActorSystem[EntityManager.Command](EntityManager(), "ClusterSystem", config)
        if (role == "front") {
          entityManager ! EntityManager.AddOne("9013")
          entityManager ! EntityManager.AddOne("9014")
          entityManager ! EntityManager.AddOne("9013")
          entityManager ! EntityManager.AddOne("9015")
          entityManager ! EntityManager.AddOne("9013")
          entityManager ! EntityManager.AddOne("9014")
          entityManager ! EntityManager.AddOne("9014")
          entityManager ! EntityManager.AddOne("9013")
          entityManager ! EntityManager.AddOne("9015")
          entityManager ! EntityManager.AddOne("9015")
          entityManager ! EntityManager.AddOne("9016")
          entityManager ! EntityManager.GetSum("9014")
          entityManager ! EntityManager.GetSum("9015")
          entityManager ! EntityManager.GetSum("9013")
          entityManager ! EntityManager.GetSum("9016")
        }
      }

    Part of the resulting output is shown below:

    15:12:10.073 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/786/9014 counting at akka://ClusterSystem@127.0.0.1:25253,9014
    15:12:10.106 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/786/9014 counting at akka://ClusterSystem@127.0.0.1:25253,9014
    15:12:10.106 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/786/9014 counting at akka://ClusterSystem@127.0.0.1:25253,9014
    15:12:10.106 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 counting at akka://ClusterSystem@127.0.0.1:25251,9013
    15:12:10.107 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 counting at akka://ClusterSystem@127.0.0.1:25251,9013
    15:12:10.107 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 counting at akka://ClusterSystem@127.0.0.1:25251,9013
    15:12:10.107 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 counting at akka://ClusterSystem@127.0.0.1:25251,9013
    15:12:10.109 [ClusterSystem-akka.actor.default-dispatcher-19] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/787/9015 counting at akka://ClusterSystem@127.0.0.1:25254,9015
    15:12:10.110 [ClusterSystem-akka.actor.default-dispatcher-19] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/787/9015 counting at akka://ClusterSystem@127.0.0.1:25254,9015
    15:12:10.110 [ClusterSystem-akka.actor.default-dispatcher-19] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/787/9015 counting at akka://ClusterSystem@127.0.0.1:25254,9015
    15:12:10.110 [ClusterSystem-akka.actor.default-dispatcher-19] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/787/9015 get value at akka://ClusterSystem@127.0.0.1:25254,9015
    15:12:10.112 [ClusterSystem-akka.actor.default-dispatcher-18] INFO com.learn.akka.EntityManager$ - ***********************9015 total: 3
    15:12:10.149 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/786/9014 get value at akka://ClusterSystem@127.0.0.1:25253,9014
    15:12:10.149 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/785/9013 get value at akka://ClusterSystem@127.0.0.1:25251,9013
    15:12:10.169 [ClusterSystem-akka.actor.default-dispatcher-18] INFO com.learn.akka.EntityManager$ - ***********************9014 total: 3
    15:12:10.169 [ClusterSystem-akka.actor.default-dispatcher-18] INFO com.learn.akka.EntityManager$ - ***********************9013 total: 4
    15:12:10.171 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/788/9016 counting at akka://ClusterSystem@127.0.0.1:25251,9016
    15:12:10.171 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ******************akka://ClusterSystem/system/sharding/Counter/788/9016 get value at akka://ClusterSystem@127.0.0.1:25251,9016
    15:12:10.172 [ClusterSystem-akka.actor.default-dispatcher-18] INFO com.learn.akka.EntityManager$ - ***********************9016 total: 1
    
    15:19:32.176 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ************9013 stopping ... passivated for idling.
    15:19:52.529 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ************9014 stopping ... passivated for idling.
    15:19:52.658 [ClusterSystem-akka.actor.default-dispatcher-3] INFO com.learn.akka.Counter$ - ************9016 stopping ... passivated for idling.
    15:19:52.662 [ClusterSystem-akka.actor.default-dispatcher-14] INFO com.learn.akka.Counter$ - ************9015 stopping ... passivated for idling.
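
The actor paths in the log have the form `.../sharding/Counter/<shardId>/<entityId>`, with shard ids 785 to 788 for entities 9013 to 9016. These values appear consistent with the hash-code based extraction mentioned in the `withMessageExtractor` comment quoted earlier. The sketch below reproduces that mapping in plain Scala, assuming the default `akka.cluster.sharding.number-of-shards` of 1000 and the rule `abs(entityId.hashCode) % numberOfShards`; `ShardIdSketch` is a hypothetical helper, not an Akka API:

```scala
object ShardIdSketch {
  // Assumption: the default value of akka.cluster.sharding.number-of-shards is in effect.
  val numberOfShards: Int = 1000

  // Hash-code based rule: absolute hash code of the entityId, modulo the number of shards.
  def shardId(entityId: String): String =
    (math.abs(entityId.hashCode) % numberOfShards).toString

  def main(args: Array[String]): Unit =
    Seq("9013", "9014", "9015", "9016").foreach { id =>
      // Prints shard ids 785, 786, 787 and 788, matching the paths in the log.
      println(s"entityId=$id -> shardId=${shardId(id)}")
    }
}
```

Each of the four entities therefore falls into its own shard, which is why they can end up on different nodes.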

    Below is the complete source code for this demo:

    ClusterSharding.scala

    package com.learn.akka
    import scala.concurrent.duration._
    import akka.actor.typed._
    import akka.actor.typed.scaladsl._
    import akka.cluster.sharding.typed.scaladsl.EntityContext
    import akka.cluster.sharding.typed.scaladsl.Entity
    //#sharding-extension
    import akka.cluster.sharding.typed.ShardingEnvelope
    import akka.cluster.sharding.typed.scaladsl.ClusterSharding
    import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
    import akka.cluster.sharding.typed.scaladsl.EntityRef
    import com.typesafe.config.ConfigFactory
    import akka.cluster.typed.Cluster

    // Marker trait bound to jackson-cbor in cluster.conf.
    // Define it here only if it is not already declared elsewhere in this package.
    trait CborSerializable
    //#counter
    object Counter {
      sealed trait Command extends CborSerializable
      case object Increment extends Command
      final case class GetValue(replyTo: ActorRef[Response]) extends Command
      case object StopCounter extends Command
      private case object Idle extends Command
    
      sealed trait Response extends CborSerializable
      case class SubTtl(entityId: String, ttl: Int) extends Response
    
    
      val TypeKey = EntityTypeKey[Command]("Counter")
    
      def apply(nodeAddress: String, entityContext: EntityContext[Command]): Behavior[Command] = {
        Behaviors.setup { ctx =>
          def updated(value: Int): Behavior[Command] = {
            Behaviors.receiveMessage[Command] {
              case Increment =>
                ctx.log.info("******************{} counting at {},{}",ctx.self.path,nodeAddress,entityContext.entityId)
                updated(value + 1)
              case GetValue(replyTo) =>
                ctx.log.info("******************{} get value at {},{}",ctx.self.path,nodeAddress,entityContext.entityId)
                replyTo ! SubTtl(entityContext.entityId,value)
                Behaviors.same
              case Idle =>
                entityContext.shard ! ClusterSharding.Passivate(ctx.self)
                Behaviors.same
              case StopCounter =>
                Behaviors.stopped(() => ctx.log.info("************{} stopping ... passivated for idling.", entityContext.entityId))
            }
          }
          ctx.setReceiveTimeout(30.seconds, Idle)
          updated(0)
        }
      }
    }
    object EntityManager {
      sealed trait Command
      case class AddOne(counterId: String) extends Command
      case class GetSum(counterId: String ) extends Command
      case class WrappedTotal(res: Counter.Response) extends Command
    
    
      def apply(): Behavior[Command] = Behaviors.setup { ctx =>
        val cluster = Cluster(ctx.system)
        val sharding = ClusterSharding(ctx.system)
        val entityType = Entity(Counter.TypeKey) { entityContext =>
          Counter(cluster.selfMember.address.toString,entityContext)
        }.withStopMessage(Counter.StopCounter)
        sharding.init(entityType)
    
        val counterRef: ActorRef[Counter.Response] = ctx.messageAdapter(ref => WrappedTotal(ref))
    
        Behaviors.receiveMessage[Command] {
          case AddOne(cid) =>
            val entityRef: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, cid)
            entityRef ! Counter.Increment
            Behaviors.same
          case GetSum(cid) =>
            val entityRef: EntityRef[Counter.Command] = sharding.entityRefFor(Counter.TypeKey, cid)
            entityRef ! Counter.GetValue(counterRef)
            Behaviors.same
          case WrappedTotal(ttl) =>
            ttl match {
              case Counter.SubTtl(eid, subttl) =>
                ctx.log.info("***********************{} total: {} ", eid, subttl)
            }
            Behaviors.same
        }
      }
    
    }
    
    object ClusterShardingApp  {
      def main(args: Array[String]): Unit = {
        if (args.isEmpty) {
          startup("shard", 25251)
          startup("shard", 25252)
          startup("shard", 25253)
          startup("front", 25254)
        } else {
          require(args.size == 2, "Usage: role port")
          startup(args(0), args(1).toInt)
        }
      }
    
      def startup(role: String, port: Int): Unit = {
        // Override the configuration of the port when specified as program argument
        val config = ConfigFactory
          .parseString(s"""
          akka.remote.artery.canonical.port=$port
          akka.cluster.roles = [$role]
          """)
          .withFallback(ConfigFactory.load("cluster"))
    
        val entityManager = ActorSystem[EntityManager.Command](EntityManager(), "ClusterSystem", config)
        if (role == "front") {
          entityManager ! EntityManager.AddOne("9013")
          entityManager ! EntityManager.AddOne("9014")
          entityManager ! EntityManager.AddOne("9013")
          entityManager ! EntityManager.AddOne("9015")
          entityManager ! EntityManager.AddOne("9013")
          entityManager ! EntityManager.AddOne("9014")
          entityManager ! EntityManager.AddOne("9014")
          entityManager ! EntityManager.AddOne("9013")
          entityManager ! EntityManager.AddOne("9015")
          entityManager ! EntityManager.AddOne("9015")
          entityManager ! EntityManager.AddOne("9016")
          entityManager ! EntityManager.GetSum("9014")
          entityManager ! EntityManager.GetSum("9015")
          entityManager ! EntityManager.GetSum("9013")
          entityManager ! EntityManager.GetSum("9016")
        }
    
      }
    
    }

    cluster.conf

    akka {
      actor {
        provider = cluster
    
        serialization-bindings {
          "com.learn.akka.CborSerializable" = jackson-cbor
        }
      }
      remote {
        artery {
          canonical.hostname = "127.0.0.1"
          canonical.port = 0
        }
      }
      cluster {
        seed-nodes = [
          "akka://ClusterSystem@127.0.0.1:25251",
          "akka://ClusterSystem@127.0.0.1:25252"]
      }
    }
  • Original article: https://www.cnblogs.com/tiger-xc/p/13100185.html