  • Akka(12): Distributed Computing: Cluster-Singleton, Letting Computation Move Automatically Between Cluster Nodes

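      Many applications need exactly one instance of a certain kind of Actor in the whole system. In a cluster, that instance may live on any node, but it is guaranteed to be the only one. Akka's Cluster-Singleton supports this Singleton Actor pattern: when the node hosting the instance runs into trouble and has to leave the cluster, an identical Actor is automatically created on another node and control is handed over to it. Because this is a newly created Actor, its internal state is lost in the process. Typical uses of a Singleton-Actor include wrapping an external interface that accepts only one connection, or acting as an aggregator whose internal state accumulates the results computed by many other Actors. If the Singleton-Actor does carry internal state, a PersistentActor can be used to restore that state automatically. This makes Cluster-Singleton a practical pattern that applies in many situations.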

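      The uniqueness that defines the Cluster-Singleton pattern is also the source of its weaknesses, which deserve special attention: the single instance can easily be overloaded, it cannot be guaranteed to stay online at all times, and message delivery to it cannot be guaranteed. Users have to handle these cases explicitly in their code.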
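
    One common way to cope with the delivery concern is to have the sender resend a message until it is acknowledged. The following is a minimal sketch of that idea in plain Scala, with no Akka involved. The names AckRetryModel, Send and sendUntilAcked are hypothetical and exist only for this illustration; the Send function stands in for "send the message and report whether an acknowledgement came back in time".

```scala
import scala.annotation.tailrec

object AckRetryModel {
  // Hypothetical transport: returns true when an acknowledgement came back.
  type Send = () => Boolean

  // Resend until acknowledged or until maxAttempts is exhausted.
  // Returns the number of the attempt that was acknowledged, or None if none was.
  def sendUntilAcked(send: Send, maxAttempts: Int): Option[Int] = {
    @tailrec
    def loop(attempt: Int): Option[Int] =
      if (attempt > maxAttempts) None
      else if (send()) Some(attempt)
      else loop(attempt + 1)
    loop(1)
  }
}
```

    Resending gives at-least-once behaviour: if only the acknowledgement is lost, the receiver sees the same command twice, so commands should either be safe to repeat or carry an identifier that lets the receiver discard duplicates.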

    Let's design an example to get to know Cluster-Singleton. First, here is what the Singleton-Actor does:

    class SingletonActor extends PersistentActor with ActorLogging {
      import SingletonActor._
      val cluster = Cluster(context.system)
    
      var freeHoles = 0
      var freeTrees = 0
      var ttlMatches = 0
    
      override def persistenceId = self.path.parent.name + "-" + self.path.name
    
      def updateState(evt: Event): Unit = evt match {
        case AddHole =>
          if (freeTrees > 0) {
            ttlMatches += 1
            freeTrees -= 1
          } else freeHoles += 1
        case AddTree =>
          if (freeHoles > 0) {
            ttlMatches += 1
            freeHoles -= 1
          } else freeTrees += 1
    
      }
    
      override def receiveRecover: Receive = {
        case evt: Event => updateState(evt)
        case SnapshotOffer(_,ss: State) =>
          freeHoles = ss.nHoles
          freeTrees = ss.nTrees
          ttlMatches = ss.nMatches
      }
    
      override def receiveCommand: Receive = {
        case Dig =>
          persist(AddHole){evt =>
            updateState(evt)
          }
          sender() ! AckDig   //notify sender message received
          log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    
        case Plant =>
          persist(AddTree) {evt =>
            updateState(evt)
          }
          sender() ! AckPlant   //notify sender message received
          log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    
        case Disconnect =>  //this node exits cluster. expect switch to another node
          log.info(s"${cluster.selfAddress} is leaving cluster ...")
          cluster.leave(cluster.selfAddress)
        case CleanUp =>
          //clean up ...
          self ! PoisonPill
      }
    
    }

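    This SingletonActor is a special kind of Actor: it extends PersistentActor, so it has to implement PersistentActor's abstract members. It maintains several pieces of internal state, the running totals freeHoles, freeTrees and ttlMatches. The scenario it simulates is tree planting: on a Dig command it produces an AddHole event that registers a hole and updates the current state; on a Plant command it produces an AddTree event and updates the state in the same way. Because the Cluster-Singleton pattern cannot guarantee message delivery, the replies AckDig and AckPlant are added so that the sender can resend a message when necessary. Note that the persist handler runs only after the event has been written to the journal, so each log line shows the state as it was before the current command took effect. We use cluster.selfAddress to confirm which cluster node currently hosts the actor.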
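
    To make the matching rule and the recovery step easier to follow, here is a small sketch that restates updateState as a pure function and models recovery as a fold over the recorded events. It is plain Scala without Akka; the object name PlantingModel and the functions update and replay are assumptions made for this illustration, while Event, AddHole, AddTree and State mirror the definitions in the article's code.

```scala
object PlantingModel {
  sealed trait Event
  case object AddHole extends Event
  case object AddTree extends Event

  // Same fields as the State case class in the article.
  final case class State(nHoles: Int, nTrees: Int, nMatches: Int)

  val empty: State = State(0, 0, 0)

  // Pure counterpart of updateState: returns a new State instead of mutating vars.
  def update(s: State, evt: Event): State = evt match {
    case AddHole =>
      if (s.nTrees > 0) s.copy(nTrees = s.nTrees - 1, nMatches = s.nMatches + 1)
      else s.copy(nHoles = s.nHoles + 1)
    case AddTree =>
      if (s.nHoles > 0) s.copy(nHoles = s.nHoles - 1, nMatches = s.nMatches + 1)
      else s.copy(nTrees = s.nTrees + 1)
  }

  // Recovery modelled as a left fold over the journal,
  // starting from a snapshot if one exists, otherwise from the empty state.
  def replay(events: Seq[Event], from: State = empty): State =
    events.foldLeft(from)(update)
}
```

    For example, replaying AddHole, AddTree, AddTree, AddTree, AddHole from the empty state gives State(0, 1, 2). Starting instead from a snapshot State(0, 0, 1) taken after the first two events and replaying only the remaining three gives the same result, which is the property the SnapshotOffer case in receiveRecover relies on.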

    A ClusterSingletonManager has to be created and deployed on every cluster node that may host the SingletonActor, as follows:

      def create(port: Int) = {
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
            .withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]"))
            .withFallback(ConfigFactory.load())
        val singletonSystem = ActorSystem("SingletonClusterSystem",config)
    
        startupSharedJournal(singletonSystem, (port == 2551), path =
            ActorPath.fromString("akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/store"))
    
        val singletonManager = singletonSystem.actorOf(ClusterSingletonManager.props(
          singletonProps = Props[SingletonActor],
          terminationMessage = CleanUp,
          settings = ClusterSingletonManagerSettings(singletonSystem).withRole(Some("singleton"))
        ), name = "singletonManager")
    
      }

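    As the code shows, ClusterSingletonManager is itself an Actor, and ClusterSingletonManager.props configures the SingletonActor it manages. Our main goal is to verify that when the current node fails and has to leave the cluster, the SingletonActor automatically moves to another node that is still online. ClusterSingletonManager works as follows: it is first created and deployed on all selected cluster nodes, and the SingletonActor is then started on the oldest of them, that is, the one that joined first. When that node is no longer available (it has left the cluster, or has been removed after becoming unreachable), the SingletonActor is automatically recreated on the next-oldest node.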
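
    The selection rule described above can be pictured with a toy model. This is not Akka's implementation, only a sketch of the "oldest node with the required role hosts the singleton" idea in plain Scala. Member, joinOrder, singletonHost and leave are hypothetical names, and the join order is assumed to follow the order in which the nodes were started.

```scala
object OldestNodeModel {
  // A cluster member and the order in which it joined (smaller means older).
  final case class Member(address: String, joinOrder: Int, roles: Set[String])

  // The singleton is hosted by the oldest member that carries the required role.
  def singletonHost(members: Seq[Member], role: String): Option[Member] = {
    val eligible = members.filter(_.roles.contains(role))
    if (eligible.isEmpty) None else Some(eligible.minBy(_.joinOrder))
  }

  // A member leaving the cluster is simply dropped from the membership list.
  def leave(members: Seq[Member], address: String): Seq[Member] =
    members.filterNot(_.address == address)
}
```

    With members on ports 2551, 55839 and 55840 carrying the singleton role and a frontend-only member on 55843, the host is 2551 at first, then 55839 after 2551 leaves, then 55840 after 55839 leaves. The frontend node is never chosen because it lacks the role.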

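    ClusterSingletonProxy is also an Actor. It reaches the SingletonActor by exchanging messages with the ClusterSingletonManager: it dynamically tracks where the live SingletonActor is and gives users an ActorRef to it. The following code calls the SingletonActor through the proxy: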

    object SingletonUser {
      def create = {
        val config = ConfigFactory.parseString("akka.cluster.roles=[frontend]")
          .withFallback(ConfigFactory.load())
        val suSystem = ActorSystem("SingletonClusterSystem",config)
    
        val singletonProxy = suSystem.actorOf(ClusterSingletonProxy.props(
          singletonManagerPath = "/user/singletonManager",
          settings = ClusterSingletonProxySettings(suSystem).withRole(None)
        ), name= "singletonUser")
    
    
        import suSystem.dispatcher
        //send Dig messages every 3 seconds to SingletonActor through proxy
        suSystem.scheduler.schedule(0.seconds,3.second,singletonProxy,SingletonActor.Dig)
    
        //send Plant messages every 2 seconds (after a 1-second delay) to SingletonActor through proxy
        suSystem.scheduler.schedule(1.seconds,2.second,singletonProxy,SingletonActor.Plant)
    
        //send Disconnect to the hosting node every 15 seconds, starting after 10 seconds
        suSystem.scheduler.schedule(10.seconds,15.seconds,singletonProxy,SingletonActor.Disconnect)
      }
    
    }

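    We send Dig and Plant messages to the SingletonActor through the ClusterSingletonProxy at different intervals. In addition, every 15 seconds (after an initial 10-second delay) we send a Disconnect message telling the node that hosts the SingletonActor to start leaving the cluster. The following code runs the demo: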
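
    To see when each message is due, the three schedules can be laid out on a timeline. The sketch below is plain Scala and does not use the Akka scheduler; ScheduleTimeline and fireTimes are hypothetical names, and the function only computes the nominal fire times of a repeating schedule with a given initial delay and interval.

```scala
import scala.concurrent.duration._

object ScheduleTimeline {
  // Nominal fire times of a repeating schedule, up to and including `horizon`.
  def fireTimes(initialDelay: FiniteDuration,
                interval: FiniteDuration,
                horizon: FiniteDuration): List[FiniteDuration] =
    Iterator.iterate(initialDelay)(_ + interval).takeWhile(_ <= horizon).toList

  // The three schedules used by SingletonUser.
  def dig(horizon: FiniteDuration): List[FiniteDuration] = fireTimes(0.seconds, 3.seconds, horizon)
  def plant(horizon: FiniteDuration): List[FiniteDuration] = fireTimes(1.second, 2.seconds, horizon)
  def disconnect(horizon: FiniteDuration): List[FiniteDuration] = fireTimes(10.seconds, 15.seconds, horizon)
}
```

    Within the first 40 seconds, Disconnect is due at 10, 25 and 40 seconds, which agrees with the 15-second gap between the two "is leaving cluster" lines in the log output. Plant fires more often than Dig, which is why freeTrees tends to grow over time.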

    package clustersingleton.demo
    
    import clustersingleton.sa.SingletonActor
    import clustersingleton.frontend.SingletonUser
    
    object ClusterSingletonDemo extends App {
    
      SingletonActor.create(2551)    //seed-node
      
      SingletonActor.create(0)   //ClusterSingletonManager node
      SingletonActor.create(0)
      SingletonActor.create(0)
      SingletonActor.create(0)
    
      SingletonUser.create     //ClusterSingletonProxy node
    
    }

    The output is as follows:

    [INFO] [07/09/2017 20:17:28.210] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [07/09/2017 20:17:28.334] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:2551]
    [INFO] [07/09/2017 20:17:28.489] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [07/09/2017 20:17:28.493] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55839]
    [INFO] [07/09/2017 20:17:28.514] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [07/09/2017 20:17:28.528] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55840]
    [INFO] [07/09/2017 20:17:28.566] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [07/09/2017 20:17:28.571] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55841]
    [INFO] [07/09/2017 20:17:28.595] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [07/09/2017 20:17:28.600] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55842]
    [INFO] [07/09/2017 20:17:28.620] [main] [akka.remote.Remoting] Starting remoting
    [INFO] [07/09/2017 20:17:28.624] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://SingletonClusterSystem@127.0.0.1:55843]
    [INFO] [07/09/2017 20:17:28.794] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton]
    [INFO] [07/09/2017 20:17:28.817] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=0,ttlMatches=0
    [INFO] [07/09/2017 20:17:29.679] [SingletonClusterSystem-akka.actor.default-dispatcher-14] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=1,freeTrees=0,ttlMatches=0
    ...
    [INFO] [07/09/2017 20:17:38.676] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] akka.tcp://SingletonClusterSystem@127.0.0.1:2551 is leaving cluster ...
    [INFO] [07/09/2017 20:17:39.664] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=1,ttlMatches=4
    [INFO] [07/09/2017 20:17:40.654] [SingletonClusterSystem-akka.actor.default-dispatcher-21] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=2,ttlMatches=4
    [INFO] [07/09/2017 20:17:41.664] [SingletonClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:2551:freeHoles=0,freeTrees=1,ttlMatches=5
    [INFO] [07/09/2017 20:17:42.518] [SingletonClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton]
    [INFO] [07/09/2017 20:17:43.653] [SingletonClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=2,ttlMatches=5
    [INFO] [07/09/2017 20:17:43.672] [SingletonClusterSystem-akka.actor.default-dispatcher-15] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=1,ttlMatches=6
    [INFO] [07/09/2017 20:17:45.665] [SingletonClusterSystem-akka.actor.default-dispatcher-14] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=2,ttlMatches=6
    [INFO] [07/09/2017 20:17:46.654] [SingletonClusterSystem-akka.actor.default-dispatcher-19] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=3,ttlMatches=6
    ...
    [INFO] [07/09/2017 20:17:53.673] [SingletonClusterSystem-akka.actor.default-dispatcher-20] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] akka.tcp://SingletonClusterSystem@127.0.0.1:55839 is leaving cluster ...
    [INFO] [07/09/2017 20:17:55.654] [SingletonClusterSystem-akka.actor.default-dispatcher-13] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=4,ttlMatches=9
    [INFO] [07/09/2017 20:17:55.664] [SingletonClusterSystem-akka.actor.default-dispatcher-24] [akka.tcp://SingletonClusterSystem@127.0.0.1:55839/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55839:freeHoles=0,freeTrees=3,ttlMatches=10
    [INFO] [07/09/2017 20:17:56.646] [SingletonClusterSystem-akka.actor.default-dispatcher-5] [akka.tcp://SingletonClusterSystem@127.0.0.1:55843/user/singletonUser] Singleton identified at [akka.tcp://SingletonClusterSystem@127.0.0.1:55840/user/singletonManager/singleton]
    [INFO] [07/09/2017 20:17:57.662] [SingletonClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://SingletonClusterSystem@127.0.0.1:55840/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55840:freeHoles=0,freeTrees=4,ttlMatches=10
    [INFO] [07/09/2017 20:17:58.652] [SingletonClusterSystem-akka.actor.default-dispatcher-23] [akka.tcp://SingletonClusterSystem@127.0.0.1:55840/user/singletonManager/singleton] State on akka.tcp://SingletonClusterSystem@127.0.0.1:55840:freeHoles=0,freeTrees=5,ttlMatches=10

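    The output shows that as nodes leave the cluster, the SingletonActor automatically moves to another cluster node and keeps running. The counters also carry on from where they stopped (for example, ttlMatches=5 on port 2551 is followed by ttlMatches=5 and then 6 on port 55839), which shows that the persisted state was recovered on the new node.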

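    It is worth stressing that such a complex clustered, distributed program can be built with so little code, which shows that Akka is a practical programming tool with broad prospects!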

    Here is the complete source code for this demo:

    build.sbt

    name := "cluster-singleton"
    
    version := "1.0"
    
    scalaVersion := "2.11.9"
    
    resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
    
    val akkaversion = "2.4.8"
    
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor" % akkaversion,
      "com.typesafe.akka" %% "akka-remote" % akkaversion,
      "com.typesafe.akka" %% "akka-cluster" % akkaversion,
      "com.typesafe.akka" %% "akka-cluster-tools" % akkaversion,
      "com.typesafe.akka" %% "akka-cluster-sharding" % akkaversion,
      "com.typesafe.akka" %% "akka-persistence" % "2.4.8",
      "com.typesafe.akka" %% "akka-contrib" % akkaversion,
      "org.iq80.leveldb" % "leveldb" % "0.7",
      "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8")

    application.conf

    akka.actor.warn-about-java-serializer-usage = off
    akka.log-dead-letters-during-shutdown = off
    akka.log-dead-letters = off
    
    
    akka {
      loglevel = INFO
      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
      }
    
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    
      cluster {
        seed-nodes = [
          "akka.tcp://SingletonClusterSystem@127.0.0.1:2551"]
        log-info = off
      }
    
      persistence {
        journal.plugin = "akka.persistence.journal.leveldb-shared"
        journal.leveldb-shared.store {
          # DO NOT USE 'native = off' IN PRODUCTION !!!
          native = off
          dir = "target/shared-journal"
        }
        snapshot-store.plugin = "akka.persistence.snapshot-store.local"
        snapshot-store.local.dir = "target/snapshots"
      }
    }

    SingletonActor.scala

    package clustersingleton.sa
    
    import akka.actor._
    import akka.cluster._
    import akka.persistence._
    import com.typesafe.config.ConfigFactory
    import akka.cluster.singleton._
    import scala.concurrent.duration._
    import akka.persistence.journal.leveldb._
    import akka.util.Timeout
    import akka.pattern._
    
    
    object SingletonActor {
      sealed trait Command
      case object Dig extends Command
      case object Plant extends Command
      case object AckDig extends Command    //acknowledge
      case object AckPlant extends Command   //acknowledge
    
      case object Disconnect extends Command   //force node to leave cluster
      case object CleanUp extends Command      //clean up when actor ends
    
      sealed trait Event
      case object AddHole extends Event
      case object AddTree extends Event
    
      case class State(nHoles: Int, nTrees: Int, nMatches: Int)
    
      def create(port: Int) = {
        val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
            .withFallback(ConfigFactory.parseString("akka.cluster.roles=[singleton]"))
            .withFallback(ConfigFactory.load())
        val singletonSystem = ActorSystem("SingletonClusterSystem",config)
    
        startupSharedJournal(singletonSystem, (port == 2551), path =
            ActorPath.fromString("akka.tcp://SingletonClusterSystem@127.0.0.1:2551/user/store"))
    
        val singletonManager = singletonSystem.actorOf(ClusterSingletonManager.props(
          singletonProps = Props[SingletonActor],
          terminationMessage = CleanUp,
          settings = ClusterSingletonManagerSettings(singletonSystem).withRole(Some("singleton"))
        ), name = "singletonManager")
    
      }
    
      def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
        // Start the shared journal on one node (don't crash this SPOF)
        // This will not be needed with a distributed journal
        if (startStore)
          system.actorOf(Props[SharedLeveldbStore], "store")
        // register the shared journal
        import system.dispatcher
        implicit val timeout = Timeout(15.seconds)
        val f = (system.actorSelection(path) ? Identify(None))
        f.onSuccess {
          case ActorIdentity(_, Some(ref)) =>
            SharedLeveldbJournal.setStore(ref, system)
          case _ =>
            system.log.error("Shared journal not started at {}", path)
            system.terminate()
        }
        f.onFailure {
          case _ =>
            system.log.error("Lookup of shared journal at {} timed out", path)
            system.terminate()
        }
      }
    
    
    }
    
    class SingletonActor extends PersistentActor with ActorLogging {
      import SingletonActor._
      val cluster = Cluster(context.system)
    
      var freeHoles = 0
      var freeTrees = 0
      var ttlMatches = 0
    
      override def persistenceId = self.path.parent.name + "-" + self.path.name
    
      def updateState(evt: Event): Unit = evt match {
        case AddHole =>
          if (freeTrees > 0) {
            ttlMatches += 1
            freeTrees -= 1
          } else freeHoles += 1
        case AddTree =>
          if (freeHoles > 0) {
            ttlMatches += 1
            freeHoles -= 1
          } else freeTrees += 1
    
      }
    
      override def receiveRecover: Receive = {
        case evt: Event => updateState(evt)
        case SnapshotOffer(_,ss: State) =>
          freeHoles = ss.nHoles
          freeTrees = ss.nTrees
          ttlMatches = ss.nMatches
      }
    
      override def receiveCommand: Receive = {
        case Dig =>
          persist(AddHole){evt =>
            updateState(evt)
          }
          sender() ! AckDig   //notify sender message received
          log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    
        case Plant =>
          persist(AddTree) {evt =>
            updateState(evt)
          }
          sender() ! AckPlant   //notify sender message received
          log.info(s"State on ${cluster.selfAddress}:freeHoles=$freeHoles,freeTrees=$freeTrees,ttlMatches=$ttlMatches")
    
        case Disconnect =>  //this node exits cluster. expect switch to another node
          log.info(s"${cluster.selfAddress} is leaving cluster ...")
          cluster.leave(cluster.selfAddress)
        case CleanUp =>
          //clean up ...
          self ! PoisonPill
      }
    
    }

    SingletonUser.scala

    package clustersingleton.frontend
    import akka.actor._
    import clustersingleton.sa.SingletonActor
    import com.typesafe.config.ConfigFactory
    import akka.cluster.singleton._
    import scala.concurrent.duration._
    
    object SingletonUser {
      def create = {
        val config = ConfigFactory.parseString("akka.cluster.roles=[frontend]")
          .withFallback(ConfigFactory.load())
        val suSystem = ActorSystem("SingletonClusterSystem",config)
    
        val singletonProxy = suSystem.actorOf(ClusterSingletonProxy.props(
          singletonManagerPath = "/user/singletonManager",
          settings = ClusterSingletonProxySettings(suSystem).withRole(None)
        ), name= "singletonUser")
    
    
        import suSystem.dispatcher
        //send Dig messages every 3 seconds to SingletonActor through proxy
        suSystem.scheduler.schedule(0.seconds,3.second,singletonProxy,SingletonActor.Dig)
    
        //send Plant messages every 2 seconds (after a 1-second delay) to SingletonActor through proxy
        suSystem.scheduler.schedule(1.seconds,2.second,singletonProxy,SingletonActor.Plant)
    
        //send Disconnect to the hosting node every 15 seconds, starting after 10 seconds
        suSystem.scheduler.schedule(10.seconds,15.seconds,singletonProxy,SingletonActor.Disconnect)
      }
    
    }

    ClusterSingletonDemo.scala

    package clustersingleton.demo
    
    import clustersingleton.sa.SingletonActor
    import clustersingleton.frontend.SingletonUser
    
    object ClusterSingletonDemo extends App {
    
      SingletonActor.create(2551)    //seed-node
    
      SingletonActor.create(0)   //ClusterSingletonManager node
      SingletonActor.create(0)
      SingletonActor.create(0)
      SingletonActor.create(0)
    
      SingletonUser.create     //ClusterSingletonProxy node
    
    }


  • Original article: https://www.cnblogs.com/tiger-xc/p/7144879.html