zoukankan      html  css  js  c++  java
  • Lagom学习 六 Akka Stream

    lagom中的stream 流数据处理是基于akka stream的,异步的处理流数据的。如下看代码:

    流式service好处是:   

             A: 并行:  hellos.mapAsync(8, name -> helloService.hello(name).invoke())),  八个线程并行处理;

            B: 异步: 返回completedFuture, 使用基于Web Socket的方式。

            C: 全双工: 

    package com.example.hello.stream.impl;
    
    import akka.NotUsed;
    import akka.stream.javadsl.Source;
    import com.lightbend.lagom.javadsl.api.ServiceCall;
    import com.example.hello.hello.api.HelloService;
    import com.example.hello.stream.api.StreamService;
    
    import javax.inject.Inject;
    
    import static java.util.concurrent.CompletableFuture.completedFuture;
    
    /**
     * Implementation of the HelloString.
     */
    public class StreamServiceImpl implements StreamService {
    
      private final HelloService helloService;
      private final StreamRepository repository;
    
      @Inject
      public StreamServiceImpl(HelloService helloService, StreamRepository repository) {
        this.helloService = helloService;
        this.repository = repository;
      }
    
      @Override
      public ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> directStream() {
        return hellos -> completedFuture(
          hellos.mapAsync(8, name ->  helloService.hello(name).invoke()));
      }
    
      @Override
      public ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> autonomousStream() {
        return hellos -> completedFuture(
            hellos.mapAsync(8, name -> repository.getMessage(name).thenApply( message ->
                String.format("%s, %s!", message.orElse("Hello"), name)
            ))
        );
      }
    }

    调用streamed service 接口的方式:

      Source<String, ?> response = await(streamService.directStream().invoke(
                    Source.from(Arrays.asList("a", "b", "c"))
                            .concat(Source.maybe())));
            List<String> messages = await(response.take(3).runWith(Sink.seq(), mat));
            assertEquals(Arrays.asList("Hello, a!", "Hello, b!", "Hello, c!"), messages);

    private <T> T await(CompletionStage<T> future) throws Exception {   //等待10秒 拿结果
    return future.toCompletableFuture().get(10, TimeUnit.SECONDS);
    }
  • 相关阅读:
    删除MSSQL危险存储过程的代码
    给年轻工程师的十大忠告[转贴]
    HTML中利用堆栈方式对Table进行行排序
    年轻人宣言:青春符号
    刘亦菲小龙女绝美剧照
    精巧完整的日历程序
    XSLT快速参考
    酒吧里经典的英文歌曲专集(4CD)
    检测系统颜色与使用字体
    SQL Server实用操作小技巧集合
  • 原文地址:https://www.cnblogs.com/liufei1983/p/8486082.html
Copyright © 2011-2022 走看看