zoukankan      html  css  js  c++  java
  • AKKA 集群中的发布与订阅Distributed Publish Subscribe in Cluster

    Distributed Publish Subscribe in Cluster

    基本定义

    在单机环境下订阅与发布是很常用的,然而在集群环境是比较麻烦和不好实现的;

    AKKA已经提供了相应的实现,集群环境各节点之间的actor相互订阅发布感兴的主题的消息,

    关键依赖媒介actor: akka.cluster.pubsub.DistributedPubSubMediator


    订阅:

    DistributedPubSubMediator.Subscribe方法将actor注册到本地中介者。
    成功的订阅和取消订阅由DistributedPubSubMediator.SubscribeAck和DistributedPubSubMediator.UnsubscribeAck应答确认。这个确认消息意味着订阅已经注册了,但是它仍然需要花费一些时间复制到其它的节点上。节点之间发现与注册会有一定延迟,可能造成消息不会立即送达!

    发布:

    你通过向本地的中介者发送DistributedPubSubMediator.Publish消息来发布消息。
    当actor终止时,它们会自动从注册表移除,或者你可以明确的使用DistributedPubSubMediator.Unsubscribe移除。

    实现示例

    package pubsub
    
    import akka.actor.AbstractActor
    import akka.actor.ActorRef
    import akka.actor.ActorSystem
    import org.slf4j.LoggerFactory
    import scala.PartialFunction
    import scala.runtime.BoxedUnit
    import akka.cluster.pubsub.DistributedPubSubMediator
    import akka.actor.Nobody.tell
    import akka.actor.Props
    import java.time.Clock.system
    import akka.cluster.pubsub.DistributedPubSub
    import akka.actor.Nobody.tell
    import com.typesafe.config.ConfigFactory
    
    
    /**
     * Created by: tankx
     * Date: 2019/7/16
     * Description: 发布订阅模式
     */
    /**
     * 定义发布者
     */
    class Pub() : AbstractActor() {
    
        private var log = LoggerFactory.getLogger(Pub::class.java)
    
        var mediator: ActorRef = DistributedPubSub.get(context.system).mediator()
    
        override fun createReceive(): Receive {
            return receiveBuilder().matchAny(this::receive).build()
        }
    
        private fun receive(msg: Any) {
    
            log.info("派发事件:$msg")
            if (msg is String) {
                mediator.tell(DistributedPubSubMediator.Publish(topA, msg), self)
            }
    
    
        }
    
    
    }
    
    /**
     * 定义订阅者
     */
    class Sub() : AbstractActor() {
    
        private var log = LoggerFactory.getLogger(Sub::class.java)
    
        override fun preStart() {
            //注册订阅
            var mediator = DistributedPubSub.get(getContext().system()).mediator()
            mediator.tell(DistributedPubSubMediator.Subscribe(topA, self), self)
    
            println("注册订阅")
            //ActorRef.noSender()不会接收订阅信息DistributedPubSubMediator.SubscribeAck
            //mediator.tell(DistributedPubSubMediator.Subscribe(topA, self), ActorRef.noSender())
    
            //移除订阅
            //DistributedPubSub.get(getContext().system()).mediator().tell(DistributedPubSubMediator.Unsubscribe(topA, self), ActorRef.noSender())
        }
    
    
        override fun createReceive(): Receive {
            return receiveBuilder().matchAny(this::receive).build()
        }
    
        private fun receive(msg: Any) {
    
            when (msg) {
                is String -> log.info("收到事件: $msg")
                is DistributedPubSubMediator.SubscribeAck -> log.info("订阅事件:$msg")
                else -> log.info("无对应类型")
            }
    
    
        }
    
    
    }
    
    //定义主题
    var topA: String = "topa"
    
    fun getSystem(port: Int): ActorSystem {
    
        val config = ConfigFactory.parseString(
            "akka.remote.netty.tcp.port=$port"
        ).withFallback(
            ConfigFactory.load("application_pub.conf")
        )
    
        var actorSystem = ActorSystem.create("custerPubSystem", config);
    
    
        return actorSystem
    }
    
    
    fun main() {
    
        var system = getSystem(3660);
    
        var subActor = system.actorOf(Props.create(Sub::class.java))
        Thread.sleep(1000)//让sub 完全起来
    
    //    var pubActor = system.actorOf(Props.create(Pub::class.java))
    //    pubActor.tell("hello", ActorRef.noSender())
    //
    //    pubActor.tell("world", ActorRef.noSender())
    //
    //    Thread.sleep(3000)
    
    }

    上面订阅启动后,再启动一个节点派发事件

    package pubsub
    
    import akka.actor.ActorRef
    import akka.actor.Props
    import akka.cluster.pubsub.DistributedPubSub
    import akka.cluster.pubsub.DistributedPubSubMediator
    
    /**
     * Created by: tankx
     * Date: 2019/7/18
     * Description:
     */
    fun main() {
    
        var system = getSystem(3661);
    
        Thread.sleep(3000)
    
        var mediator: ActorRef = DistributedPubSub.get(system).mediator()
      
        for (i in 1..1000) {
            mediator.tell(DistributedPubSubMediator.Publish(topA, "消息XXXXXX"), ActorRef.noSender())
    
            Thread.sleep(2000)
        }
    
    }

    配置文件

    akka {
      actor {
        provider = "akka.cluster.ClusterActorRefProvider"
      }
    
      cluster {
        seed-nodes = [
          "akka.tcp://custerPubSystem@127.0.0.1:3660"
        ]
      }
    
      remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
      }
    
    }

    依赖JAR

        compile("com.typesafe.akka:akka-actor_2.13:$akkaVersion")
        compile("com.typesafe.akka:akka-remote_2.13:$akkaVersion")
        compile("com.typesafe.akka:akka-cluster-tools_2.13:$akkaVersion")

    结果:

    2019-07-18 20:19:55.941 [custerPubSystem-akka.actor.default-dispatcher-4] INFO  pubsub.Sub 77- 收到事件: 消息XXXXXX
    2019-07-18 20:19:55.942 [custerPubSystem-akka.actor.default-dispatcher-4] INFO  pubsub.Sub 77- 收到事件: 消息XXXXXX

    结论:

    AKKA 集群中的发布与订阅在节点之间的Actor之间广播消息,监听自己关心的主题消息做相应逻辑,是非常方便与很多场景适用的

  • 相关阅读:
    js前端分享功能
    git常用命令
    webstorm中.vue报错
    页面重绘重排
    浏览器渲染引擎总结
    javascript中的this总结
    cookie、session、sessionid 与jsessionid
    promise和Angular中的 $q, defer
    C++11之nullptr
    C++ 输入ctrl+z 不能再使用cin的问题
  • 原文地址:https://www.cnblogs.com/tankaixiong/p/11209895.html
Copyright © 2011-2022 走看看