对于Event Bus,也就是事件总线,普通场景下个人建议不要使用。Event Bus会使本来就复杂的消息通信更加复杂, 如果不用,开发过程中你明确知道跟某个actor通信的都有哪些actor,也就是说他们之间的通信协议是明确的。仅仅做到这一点,就会使actor系统很复杂了,再用个Event Bus把事件发送出去,会导致消息更加分散,某种意义上也是一种耦合。比如你把消息A发布出去,但却不知道谁在订阅它,如果某个版本升级你不消息忘了发布这个消息,那其他actor还能正常工作吗?这明显是给自己找麻烦。
废话不多说,来看看它的实现。当然EventBus实现比较复杂,简单起见,我们只分析Event Stream。
// this provides basic logging (to stdout) until .start() is called below val eventStream = new EventStream(this, DebugEventStream) eventStream.startStdoutLogger(settings)
/** * An Akka EventStream is a pub-sub stream of events both system and user generated, * where subscribers are ActorRefs and the channels are Classes and Events are any java.lang.Object. * EventStreams employ SubchannelClassification, which means that if you listen to a Class, * you'll receive any message that is of that type or a subtype. * * The debug flag in the constructor toggles if operations on this EventStream should also be published * as Debug-Events */ class EventStream(sys: ActorSystem, private val debug: Boolean) extends LoggingBus with SubchannelClassification
Akka EventStream是一个发布-订阅事件流,包括系统和用户产生的数据。订阅某个特定类型的消息,不一定会收到对应的消息,前提是你自己或系统调用EventStream的发布接口把消息发布了出去。
/** * Classification which respects relationships between channels: subscribing * to one channel automatically and idempotently subscribes to all sub-channels. */ trait SubchannelClassification { this: EventBus ⇒
class DeadLetterListener extends Actor { def receive = { case d: DeadLetter ⇒ println(d) } } val listener = system.actorOf(Props[DeadLetterListener]) system.eventStream.subscribe(listener, classOf[DeadLetter])
/** * ''Must'' be called after actor system is "ready". * Starts system actor that takes care of unsubscribing subscribers that have terminated. */ def startUnsubscriber(): Unit = // sys may be null for backwards compatibility reasons if (sys ne null) EventStreamUnsubscriber.start(sys, this)
/** * INTERNAL API * * Provides factory for [[akka.event.EventStreamUnsubscriber]] actors with **unique names**. * This is needed if someone spins up more [[EventStream]]s using the same [[akka.actor.ActorSystem]], * each stream gets it's own unsubscriber. */ private[akka] object EventStreamUnsubscriber { private val unsubscribersCount = new AtomicInteger(0) final case class Register(actor: ActorRef) final case class UnregisterIfNoMoreSubscribedChannels(actor: ActorRef) private def props(eventStream: EventStream, debug: Boolean) = Props(classOf[EventStreamUnsubscriber], eventStream, debug) def start(system: ActorSystem, stream: EventStream) = { val debug = system.settings.config.getBoolean("akka.actor.debug.event-stream") system.asInstanceOf[ExtendedActorSystem] .systemActorOf(props(stream, debug), "eventStreamUnsubscriber-" + unsubscribersCount.incrementAndGet()) } }
/** * INTERNAL API * * Watches all actors which subscribe on the given eventStream, and unsubscribes them from it when they are Terminated. * * Assumptions note: * We do not guarantee happens-before in the EventStream when 2 threads subscribe(a) / unsubscribe(a) on the same actor, * thus the messages sent to this actor may appear to be reordered - this is fine, because the worst-case is starting to * needlessly watch the actor which will not cause trouble for the stream. This is a trade-off between slowing down * subscribe calls * because of the need of linearizing the history message sequence and the possibility of sometimes * watching a few actors too much - we opt for the 2nd choice here. */ protected[akka] class EventStreamUnsubscriber(eventStream: EventStream, debug: Boolean = false) extends Actor
override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { if (subscriber eq null) throw new IllegalArgumentException("subscriber is null") if (debug) publish(Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel)) registerWithUnsubscriber(subscriber) super.subscribe(subscriber, channel) }
@tailrec private def registerWithUnsubscriber(subscriber: ActorRef): Unit = { // sys may be null for backwards compatibility reasons if (sys ne null) initiallySubscribedOrUnsubscriber.get match { case value @ Left(subscribers) ⇒ if (!initiallySubscribedOrUnsubscriber.compareAndSet(value, Left(subscribers + subscriber))) registerWithUnsubscriber(subscriber) case Right(unsubscriber) ⇒ unsubscriber ! EventStreamUnsubscriber.Register(subscriber) } }
/** Either the list of subscribed actors, or a ref to an [[akka.event.EventStreamUnsubscriber]] */ private val initiallySubscribedOrUnsubscriber = new AtomicReference[Either[Set[ActorRef], ActorRef]](Left(Set.empty))
def subscribe(subscriber: Subscriber, to: Classifier): Boolean = subscriptions.synchronized { val diff = subscriptions.addValue(to, subscriber) addToCache(diff) diff.nonEmpty }
// must be lazy to avoid initialization order problem with subclassification private lazy val subscriptions = new SubclassifiedIndex[Classifier, Subscriber]()
@volatile private var cache = Map.empty[Classifier, Set[Subscriber]]
def publish(event: Event): Unit = { val c = classify(event) val recv = if (cache contains c) cache(c) // c will never be removed from cache else subscriptions.synchronized { if (cache contains c) cache(c) else { addToCache(subscriptions.addKey(c)) cache(c) } } recv foreach (publish(event, _)) }
protected def classify(event: AnyRef): Class[_] = event.getClass
protected def publish(event: AnyRef, subscriber: ActorRef) = { if (sys == null && subscriber.isTerminated) unsubscribe(subscriber) else subscriber ! event }
publish呢?就是调用subscriber的! 方法,把消息发送出去。