lagom中的stream 流数据处理是基于akka stream的,异步的处理流数据的。如下看代码:
流式service好处是:
A: 并行: hellos.mapAsync(8, name -> helloService.hello(name).invoke())), 八个线程并行处理;
B: 异步: 返回completedFuture, 使用基于Web Socket的方式。
C: 全双工:
package com.example.hello.stream.impl; import akka.NotUsed; import akka.stream.javadsl.Source; import com.lightbend.lagom.javadsl.api.ServiceCall; import com.example.hello.hello.api.HelloService; import com.example.hello.stream.api.StreamService; import javax.inject.Inject; import static java.util.concurrent.CompletableFuture.completedFuture; /** * Implementation of the HelloString. */ public class StreamServiceImpl implements StreamService { private final HelloService helloService; private final StreamRepository repository; @Inject public StreamServiceImpl(HelloService helloService, StreamRepository repository) { this.helloService = helloService; this.repository = repository; } @Override public ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> directStream() { return hellos -> completedFuture( hellos.mapAsync(8, name -> helloService.hello(name).invoke())); } @Override public ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> autonomousStream() { return hellos -> completedFuture( hellos.mapAsync(8, name -> repository.getMessage(name).thenApply( message -> String.format("%s, %s!", message.orElse("Hello"), name) )) ); } }
调用streamed service 接口的方式:
Source<String, ?> response = await(streamService.directStream().invoke( Source.from(Arrays.asList("a", "b", "c")) .concat(Source.maybe()))); List<String> messages = await(response.take(3).runWith(Sink.seq(), mat)); assertEquals(Arrays.asList("Hello, a!", "Hello, b!", "Hello, c!"), messages);
private <T> T await(CompletionStage<T> future) throws Exception { //等待10秒 拿结果
return future.toCompletableFuture().get(10, TimeUnit.SECONDS);
}