zoukankan      html  css  js  c++  java
  • AKKA集群中的分布式发布订阅

    集群中的分布式发布订阅

    如何向一个不知道在哪个节点上运行的actor发送消息呢?

    如何向集群中的所有actor发送感兴趣的主题的消息?

    这种模式提供了一个中介actor,akka.cluster.pubsub.DistributedPubSubMediator,它管理actor引用的注册,复制所有集群节点或者特定角色节点的对等actor的条目。

    DistributedPubSubMediator actor应该在所有的节点上或者特定角色的节点上启动。中介可以由DistributedPubSub扩展启动或者作为普通的actor启动。

    注册最终是一致的,即变化不会被其它的节点立即看到,但是通常它们会在几秒之后被完全复制到所有的节点上。只在自己的注册部分执行变化,这些变化被版本化了。变化的增量用gossip协议以可伸缩的方式传播到其它的节点。

    WeaklyUp状态的集群成员,如果这个特性使能了,将会参与到分布式发布订阅,即WeaklyUp状态节点的订阅者会收到发布的消息,如果发布者和订阅者在同一个网络分区。

    你可以通过任意节点上的中介者向任何其它节点注册的actor发送消息。

    有两种不同的消息投递模式,分别在发布和发送章节进行解释。

    发布Publish

    这是真正的发布订阅模式。这种模式的一种典型用法就是即时消息应用中的聊天室。

    Actor注册到被命名的主题。这将使能每一个节点上的多个订阅者。消息将会投递到这个主题的所有订阅者。

    为了提高效率,消息只会通过这个wire向每一个节点发送一次(有匹配的主题)然后就被投递到本地主题的所有订阅者。

    你用DistributedPubSubMediator.Subscribe方法将actor注册到本地中介者。成功的订阅和取消订阅由DistributedPubSubMediator.SubscribeAck和 DistributedPubSubMediator.UnsubscribeAck应答确认。这个确认消息意味着订阅已经注册了,但是它仍然需要花费一些时间复制到其它的节点上。

    你通过向本地的中介者发送DistributedPubSubMediator.Publish消息来发布消息。

    当actor终止时,它们会自动从注册表移除,或者你可以明确的使用DistributedPubSubMediator.Unsubscribe移除。

    订阅actor的一个例子:

    [java] view plain copy
     
    1. import akka.actor.ActorRef;  
    2.   
    3. import akka.actor.UntypedActor;  
    4.   
    5. import akka.cluster.pubsub.DistributedPubSub;  
    6.   
    7. import akka.cluster.pubsub.DistributedPubSubMediator;  
    8.   
    9. import akka.event.Logging;  
    10.   
    11. import akka.event.LoggingAdapter;  
    12.   
    13.    
    14.   
    15. publicclass Subscriber extends UntypedActor {  
    16.   
    17.    
    18.   
    19.     LoggingAdapter log = Logging.getLogger(getContext().system(), this);  
    20.   
    21.    
    22.   
    23.     public Subscriber() {  
    24.   
    25.         ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();  
    26.   
    27.         // subscribe to the topic named "content"  
    28.   
    29.         mediator.tell(new DistributedPubSubMediator.Subscribe("content", getSelf()), getSelf());  
    30.   
    31.     }  
    32.   
    33.    
    34.   
    35.     @Override  
    36.   
    37.     publicvoid onReceive(Object msg) {  
    38.   
    39.         if (msginstanceof String)  
    40.   
    41.             log.info("Got: {}", msg);  
    42.   
    43.         elseif (msginstanceof DistributedPubSubMediator.SubscribeAck)  
    44.   
    45.             log.info("subscribing");  
    46.   
    47.         else  
    48.   
    49.             unhandled(msg);  
    50.   
    51.     }  
    52.   
    53. }  

    订阅者可以再集群的多个节点上启动,所有的订阅者都会收到发布到"content"主题的消息:

    system.actorOf(Props.create(Subscriber.class), "subscriber1");

    //another node

    system.actorOf(Props.create(Subscriber.class), "subscriber2");

    system.actorOf(Props.create(Subscriber.class), "subscriber3");

    一个发布消息到"content"主题的简单actor:

    [java] view plain copy
     
    1. import akka.actor.ActorRef;  
    2.   
    3. import akka.actor.UntypedActor;  
    4.   
    5. import akka.cluster.pubsub.DistributedPubSub;  
    6.   
    7. import akka.cluster.pubsub.DistributedPubSubMediator;  
    8.   
    9.    
    10.   
    11. publicclass Publisher extends UntypedActor {  
    12.   
    13.    
    14.   
    15.     // activate the extension  
    16.   
    17.     ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();  
    18.   
    19.    
    20.   
    21.     @Override  
    22.   
    23.     publicvoid onReceive(Object msg) {  
    24.   
    25.         if (msginstanceof String) {  
    26.   
    27.             String in = (String) msg;  
    28.   
    29.             String out = in.toUpperCase();  
    30.   
    31.             mediator.tell(new DistributedPubSubMediator.Publish("content", out), getSelf());  
    32.   
    33.         } else {  
    34.   
    35.             unhandled(msg);  
    36.   
    37.         }  
    38.   
    39.     }  
    40.   
    41. }  

    它可以从集群中的任何地方发布消息到这个主题:

    //somewhere else

    ActorRef publisher = system.actorOf(Props.create(Publisher.class), "publisher");

    // after a while the subscriptions are replicated

    publisher.tell("hello", null);

    主题分组

    具有groupid的Actor也订阅被命名的主题。如果用group Id订阅,那么每一个发布到该主题(sendOneMessageToEachGroup标记为true)的消息都通过RoutingLogic(默认是随机的)投递到每一个订阅组的一个actor。

    如果所有的订阅actor都具有相同的group id,那么它工作起来就像Send(发送),每一个消息只投递到一个订阅者。

    如果所有的订阅actor具有不同的group名字,那么它工作起来就跟正常的Publish(发布)一样了,每一个消息都广播到所有的订阅者。

    注意:如果使用group id,那么它将是主题标识符的一部分。用sendOneMessageToEachGroup=false发送的消息将不会投递到以group id订阅的订阅者。用sendOneMessageToEachGroup=true发布的消息不会投递到没有用group id订阅的订阅者。

    发送Send

    这是点到点模式,每一个消息都投递到一个目的地,但是你仍然不知道目的地位于哪儿。这种模式的典型用法是即时消息应用中的私有聊天。它可以被用于发布任务到注册的worker,就像感知集群的router那样,routee会动态注册自己。

    消息会被投递到一个匹配路径的接收者,如果注册表中存在这样的接收者。如果多个条目匹配这个路径,因为它注册到了多个节点上,那么消息会通过提供的RoutingLogic (默认是随机的)投递到目的地。消息的发送者可以指定偏好本地近亲,即消息发送到一个相同的本地actor系统中的中介者actor,如果他存在的话,否则就会路由到其它匹配的条目。

    你会用DistributedPubSubMediator.Put 注册actor到本地的中介者。Put中的ActorRef必须与中介者属于相同的本地actor系统。没有地址信息的路径就是你发送消息的关键字。在每一个节点上,只有一个给定路径的actor,因为一个本地actor系统中的路径是唯一的。

    你用DistributedPubSubMediator.Send消息发送消息到具有目的actor路径(没有地址信息)的本地中介者。

    当actor终止时,它们自动从注册表中移除,或者你可以明确地使用DistributedPubSubMediator.Remove移除。

    目的actor的示例:

    [java] view plain copy
     
    1. import akka.actor.ActorRef;  
    2.   
    3. import akka.actor.UntypedActor;  
    4.   
    5. import akka.cluster.pubsub.DistributedPubSub;  
    6.   
    7. import akka.cluster.pubsub.DistributedPubSubMediator;  
    8.   
    9. import akka.event.Logging;  
    10.   
    11. import akka.event.LoggingAdapter;  
    12.   
    13.    
    14.   
    15. publicclass Destination extends UntypedActor {  
    16.   
    17.    
    18.   
    19.     LoggingAdapter log = Logging.getLogger(getContext().system(), this);  
    20.   
    21.    
    22.   
    23.     public Destination() {  
    24.   
    25.         ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();  
    26.   
    27.         // register to the path  
    28.   
    29.         mediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());  
    30.   
    31.     }  
    32.   
    33.    
    34.   
    35.     @Override  
    36.   
    37.     publicvoid onReceive(Object msg) {  
    38.   
    39.         if (msginstanceof String)  
    40.   
    41.             log.info("Got: {}", msg);  
    42.   
    43.         elseif (msginstanceof DistributedPubSubMediator.SubscribeAck)  
    44.   
    45.             log.info("subscribing");  
    46.   
    47.         else  
    48.   
    49.             unhandled(msg);  
    50.   
    51.     }  
    52.   
    53. }  

    Subscriber actors can be started on several nodes in the cluster, and all will receive messages published to the "content" topic.

    system.actorOf(Props.create(Destination.class), "destination");

    //another node

    system.actorOf(Props.create(Destination.class), "destination");

    向"content"主题发送消息的简单actor:

    [java] view plain copy
     
    1. import akka.actor.ActorRef;  
    2.   
    3. import akka.actor.UntypedActor;  
    4.   
    5. import akka.cluster.pubsub.DistributedPubSub;  
    6.   
    7. import akka.cluster.pubsub.DistributedPubSubMediator;  
    8.   
    9.    
    10.   
    11. publicclass Sender extends UntypedActor {  
    12.   
    13.    
    14.   
    15.     // activate the extension  
    16.   
    17.     ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();  
    18.   
    19.    
    20.   
    21.     @Override  
    22.   
    23.     publicvoid onReceive(Object msg) {  
    24.   
    25.         if (msginstanceof String) {  
    26.   
    27.             String in = (String) msg;  
    28.   
    29.             String out = in.toUpperCase();  
    30.   
    31.             booleanlocalAffinity = true;  
    32.   
    33.             mediator.tell(new DistributedPubSubMediator.Send("/user/destination",out,  
    34.   
    35.                     localAffinity), getSelf());  
    36.   
    37.         } else {  
    38.   
    39.             unhandled(msg);  
    40.   
    41.         }  
    42.   
    43.     }  
    44.   
    45. }  

    可以从集群的任何地方向主题发布消息:

    //somewhere else

    ActorRef sender = system.actorOf(Props.create(Publisher.class), "sender");

    // after a while the destinations are replicated

    sender.tell("hello", null);

    向用Put注册的actor广播消息也是可能的。向本地的中介者发送 DistributedPubSubMediator.SendToAll消息,包装消息会被投递到匹配路径的所有接收者。具有相同path的actor,没有地址信息,可以在不同的节点上注册。每一个节点只能有一个这样的actor,因为一个本地actor系统的路径是唯一的。

    这种模式的典型用法是广播消息给相同路径的接收者,例如不同节点上的3个actor执行相同的动作来实现冗余。你可以指定一个属性(allButSelf)来决定是否消息应该发送到自己节点上的匹配路径。

    Distributed发布订阅扩展

    在上面的例子中,中介者是通过akka.cluster.pubsub.DistributedPubSub扩展启动和访问的。在大多数场景下,这都是很方便的,也是极好的,但是最好要知道中介者actor也可以作为普通的actor启动,你可以具有多个不同中介者actor,这能够将大量的actor/主题分到不同的中介者。例如,你可能想要使用不同的集群角色的中介者。

    DistributedPubSub扩展可以用如下的属性进行配置:

    # Settings for the DistributedPubSub extension

    akka.cluster.pub-sub {

      # Actor name of the mediator actor, /system/distributedPubSubMediator

      name = distributedPubSubMediator

      # Start the mediator on members tagged with this role.

      # All members are used if undefined or empty.

      role = ""

      # The routing logic to use for 'Send'

      # Possible values: random, round-robin, broadcast

      routing-logic = random

      # How often the DistributedPubSubMediator should send out gossip information

      gossip-interval = 1s

      # Removed entries are pruned after this duration

      removed-time-to-live = 120s

      # Maximum number of elements to transfer in one message when synchronizing the registries.

      # Next chunk will be transferred in next round of gossip.

      max-delta-elements = 3000

      # The id of the dispatcher to use for DistributedPubSubMediator actors.

      # If not specified default dispatcher is used.

      # If specified you need to define the settings of the actual dispatcher.

      use-dispatcher = ""

    }

    推荐当actor系统启动时加载这个扩展,这需要定义akka.extensions配置属性。否则它在第一次使用时激活,然后过一会儿才能使用。

    akka.extensions = ["akka.cluster.pubsub.DistributedPubSub"]

    投递保证

    正如Akka的消息投递可靠性所言,在分布式发布订阅模式中的消息投递保证是最多一次投递at-most-once delivery。换句话说,消息可能会丢失。

    如果你正在寻找at-least-once投递保证,我们推荐Kafka Akka Streams integration

    依赖

    要使用分布式发布订阅,你得工程必须添加如下的依赖:

    sbt:

    "com.typesafe.akka" %% "akka-cluster-tools" % "2.4.16"

    maven:

    [html] view plain copy
     
    1. <dependency>  
    2.   <groupId>com.typesafe.akka</groupId>  
    3.   <artifactId>akka-cluster-tools_2.11</artifactId>  
    4.   <version>2.4.16</version>  
    5. </dependency>  
  • 相关阅读:
    【LeetCode】589.N叉树的前序遍历(递归+迭代,java实现,详细分析)
    百度网盘偷偷更新,终于实现免费不限速了!
    如何调整DOS窗口的宽高
    输入adb shell 时 提示error: more than one device and emulator
    logcat不显示信息
    安卓打开File Explorer里面不显示内容
    android查看源码的时候看不了
    This version of the rendering library is more recent than your version of ADT plug-in. Please update
    eclipse或者AS链接手机真机之后,logcat里面日志信息乱跳
    Android ADB使用之详细篇
  • 原文地址:https://www.cnblogs.com/vana/p/9071897.html
Copyright © 2011-2022 走看看