今天我们来讲解akka-streams,这应该算akka框架下实现的一个很高级的工具。之前在学习akka streams的时候,我是觉得云里雾里的,感觉非常复杂,而且又难学,不过随着对akka源码的深入,才逐渐明白它到底是怎么一回事。下面介绍主要摘自akka官网,但会融入我的理解,以及部分源码,以减少大家学习的难度。
首先近几年流式计算很火,有各种各样的框架,比如spark、storm、flink等,当然前提是我们得有这样的需求。随着数据量越来越大,我们很难一次性处理全部的数据,只能采用流水线或周期性的取一部分数据进行加工。简单来说就是“分而治之”。
Actors是基于消息通信的异步机制,也可以用来处理流式数据。akka使actor变得稳定、可恢复,但我们还需要仔细的考虑数据过载的问题。比如某个actor处理消息过慢,导致后续消息积压在mailbox中。Actor的消息也可能丢失,必要时就需要重传。当以固定的模式处理流式数据的元素时,actor就显得力不从心了,或者我们需要花很大的代价来确保正确性、准确性。
所以,akka团队提供了一套Akka Streams API,主要目的是提供一套直观的、安全的方法来规范流式处理过程,这样我们就可以用有限的资源来高效的执行流式计算,当然再也不会有内存溢出的错误了。当然前提是有一套背压的机制,背压是“Reactive Streams”的核心概念,Akka是“Reactive Streams”的创建成员。这也就意味着我们可以在Akka Streams中无缝的与其他Reactive Streams实现进行交互。
Akka Streams与Reactive Streams完全解耦,前者关注在数据流转换的格式化,后者是用来定义通用的机制来跨异步边界移动数据而且不会丢失数据、缓存数据、耗尽资源。简单来说,Akka Streams是面向开发者的,它内部使用Reactive Streams接口来传递数据。其实,简单来说就是Akka Streams定义了一套开发者友好的API,并在内部把这些API转换成了Reactive Streams接口,并在内部用actor实现了Reactive Streams接口。
那Reactive Streams接口都有什么呢?
- Publisher
- Subscriber
- Subscription
- Processor
Reactive Streams由四个组件构成,分别为消息发布者、订阅者、订阅(或者称为令牌)、处理器。
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
Publisher貌似很简单,就只有subscribe接口,是订阅者调用的,用来订阅发布者的消息。发布者在订阅者调用request之后把消息push给订阅者。
public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
Subscriber也很简单,就是四个接口,分别为异步触发。当然了,是由Publisher触发调用的。onSubscribe告诉订阅者订阅成功,并返回了一个Subscription,通过Subscription订阅者可以告诉发布者发送指定数量的消息;onNext是发布者有消息时,调用订阅者这个接口来达到发布消息的目的的;onError通知订阅者,发布者出现了错误;onComplete通知订阅者消息发送完毕。当然这些接口都是异步的。
public interface Subscription { public void request(long n); public void cancel(); }
Subscription只有两个接口,请求n个消息,取消此次订阅。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
Processor代表一个处理阶段,同时继承了Subscriber,Publisher。
其实Reactive Streams只是通过上面的四个组件和相关的函数,对反应式流进行了一个框架性的约定,并没有具体的实现。简单来说,它只提供通用的、合适的解决方案,大家都按照这个规约来实现就好了。Akka Streams就是这样的一个实现,只不过又对其进行了封装,使其更加易用。
我们来看看Akka Streams的核心概念。Akka Streams是一个库,用有限的缓冲空间来处理、转换一系列数据。翻译成日常术语就是,它能够表达成对一系列数据处理的连,每个加工节点都是独立的(而且尽量并行的),同时只缓存有限数量的元素。当然了,有限的缓存这一点与Actor模型有很大不同,因为Akka Streams并不会去主动丢弃数据。
Akka Streams中的Stream就是一个active的移动、转换数据的进程。Element是流中南的一个处理单元。所有的转换操作把Elements从上游移动到下游。背压是一种流量控制手段,一种数据消费者通知生产者当前可用性的方法,它可以减慢上游生产者产生数据的速度。在Akka中,背压是非阻塞和异步的。Akka Streams中所有操作都是非阻塞的。Akka Streams的计算逻辑是用Graph来描述的,它定义了元素被处理的路径,但不一定是一个DAG。Operator是编译Graph的通用名称,常见的有map、filter。
Akka Streams有几个核心的概念,需要我们理解和掌握。
Source。这是一个只会产生数据的操作,它在下游可以接收的时候发送数据。
Sink。这是一个只有输入的操作。对数据的请求和接受有可能会减慢上游数据的产生速度。
Flow。这是只有一个输入和一个输出的操作,它连接上下游,传输数据。
RunnableGraph。这是一个同时具有Source和Sink的流,也就意味着它可以运行。简单来说就是,它可以被编译成actor拓扑了,数据可以经过actor进行流转并被处理。
val source = Source(1 to 10) val sink = Sink.fold[Int, Int](0)(_ + _) // connect the Source to the Sink, obtaining a RunnableGraph val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right) // materialize the flow and get the value of the FoldSink val sum: Future[Int] = runnable.run()
我们简要分析一下这几个核心概念的源码。
/** * A `Source` is a set of stream processing steps that has one open output. It can comprise * any number of internal sources and transformations that are wired together, or it can be * an “atomic” source, e.g. from a collection or a file. Materialization turns a Source into * a Reactive Streams `Publisher` (at least conceptually). */ final class Source[+Out, +Mat]( override val traversalBuilder: LinearTraversalBuilder, override val shape: SourceShape[Out]) extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat]
官网注释的最后一句话非常重要,它说Materialization把一个Source转换成了Reactive Streams规范中的Publisher,至少是概念上的。
/** * A `Sink` is a set of stream processing steps that has one open input. * Can be used as a `Subscriber` */ final class Sink[-In, +Mat]( override val traversalBuilder: LinearTraversalBuilder, override val shape: SinkShape[In]) extends Graph[SinkShape[In], Mat]
Sink可以被用作一个Subscriber。
/** * A `Flow` is a set of stream processing steps that has one open input and one open output. */ final class Flow[-In, +Out, +Mat]( override val traversalBuilder: LinearTraversalBuilder, override val shape: FlowShape[In, Out]) extends FlowOpsMat[Out, Mat] with Graph[FlowShape[In, Out], Mat]
/** * Flow with attached input and output, can be executed. */ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] {
官网对Flow和RunnableGraph的注释很简单,这其实非常不利于我们深层次的研究AkkaStreams的实现原理。但我们可以不负责任的猜一下。AkkaStreams的API首先被翻译成RecativeStreams相关的组件及其接口的调用,然后通过ActorSystem和actors实现这些核心组件,比如Publisher、Subscriber。当然了,考虑到这个编译过程的复杂性,这部分的源码估计要后面很久才能深入分析了。