zoukankan      html  css  js  c++  java
  • Java异步非阻塞编程的几种方式

    简介: Java异步非阻塞编程的几种方式

     

    一、 从一个同步的Http调用说起

    一个很简单的业务逻辑,其他后端服务提供了一个接口,我们需要通过接口调用,获取到响应的数据。

    逆地理接口:通过经纬度获取这个经纬度所在的省市区县以及响应的code:

    curl-i"http://xxx?latitude=31.08966221524924&channel=amap7a&near=false&longitude=105.13990312814713"
    {"adcode":"510722"}

    服务端执行,最简单的同步调用方式:

     

    服务端响应之前,IO会阻塞在:
    java.net.SocketInputStream#socketRead0 的native方法上:

     

    通过jstack日志,可以发现,此时这个Thread会一直在runable的状态:

    "main"#1 prio=5 os_prio=31 tid=0x00007fed0c810000 nid=0x1003 runnable [0x000070000ce14000]   java.lang.Thread.State: RUNNABLE
            at java.net.SocketInputStream.socketRead0(Native Method)
            at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
            at java.net.SocketInputStream.read(SocketInputStream.java:171)
            at java.net.SocketInputStream.read(SocketInputStream.java:141)
            at org.apache.http.impl.conn.LoggingInputStream.read(LoggingInputStream.java:84)
            at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
            at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153)
            at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282)
            at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138)
            at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56)
            at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259)
            at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163)
            at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165)
            at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273)
            at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125)
            at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272)
            at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
            at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
            at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
            at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
            at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
            at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
            at com.amap.aos.async.AsyncIO.blockingIO(AsyncIO.java:207)
                    .......

    线程模型示例:

     

    同步最大的问题是在IO等待的过程中,线程资源没有得到充分的利用,对于大量IO场景的业务吞吐量会有一定限制。

    二 、JDK NIO & Future

    在JDK 1.5 中,JUC提供了Future抽象:

     

     

    当然并不是所有的Future都是这样实现的,如
    io.netty.util.concurrent.AbstractFuture 就是通过线程轮询去。

    这样做的好处是,主线程可以不用等待IO响应,可以去做点其他的,比如说再发送一个IO请求,可以等到一起返回:

    "main"#1 prio=5 os_prio=31 tid=0x00007fd7a500b000 nid=0xe03 waiting on condition [0x000070000a95d000]   java.lang.Thread.State: WAITING (parking)
            at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076ee2d768> (a java.util.concurrent.CountDownLatch$Sync)
            at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
            at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
            at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
            at org.asynchttpclient.netty.NettyResponseFuture.get(NettyResponseFuture.java:162)
            at com.amap.aos.async.AsyncIO.futureBlockingGet(AsyncIO.java:201)
            .....
    "AsyncHttpClient-2-1"#11 prio=5 os_prio=31 tid=0x00007fd7a7247800 nid=0x340b runnable [0x000070000ba94000]   java.lang.Thread.State: RUNNABLE
            at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
            at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
            at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:117)
            at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
    - locked <0x000000076eb00ef0> (a io.netty.channel.nio.SelectedSelectionKeySet)
    - locked <0x000000076eb00f10> (a java.util.Collections$UnmodifiableSet)
    - locked <0x000000076eb00ea0> (a sun.nio.ch.KQueueSelectorImpl)
            at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
            at io.netty.channel.nio.NioEventLoop.select(NioEventLoop.java:693)
            at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353)
            at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
            at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
            at java.lang.Thread.run(Thread.java:748)

     

    主线程在等待结果返回过程中依然需要等待,没有根本解决此问题。

    三 、使用Callback回调方式

    第二节中,依然需要主线程等待,获取结果,那么可不可以在主线程完成发送请求后,再也不用关心这个逻辑,去执行其他的逻辑?那就可以使用Callback机制。

     

    如此一来,主线程再也不需要关心发起IO后的业务逻辑,发送完请求后,就可以彻底去干其他事情,或者回到线程池中再供调度。如果是HttpServer,那么需要结合Servlet 3.1的异步Servlet。

     

     

    使用Callback方式,从线程模型中看,发现线程资源已经得到了比较充分的利用,整个过程中已经没有线程阻塞。

    四、 Callback hell

    回调地狱,当Callback的线程还需要执行下一个IO调用的时候,这个时候进入回调地狱模式。

    典型的应用场景如,通过经纬度获取行政区域adcode(逆地理接口),然后再根据获得的adcode,获取当地的天气信息(天气接口)。

    在同步的编程模型中,几乎不会涉及到此类问题。

     

    Callback方式的核心缺陷

    五、 JDK 1.8 CompletableFuture

    那么有没有办法解决Callback Hell的问题?当然有,JDK 1.8中提供了CompletableFuture,先看看它是怎么解决这个问题的。

    将逆地理的Callback逻辑,封装成一个独立的CompletableFuture,当异步线程回调时,调用 future.complete(T) ,将结果封装。

     

    将天气执行的Call逻辑,也封装成为一个独立的CompletableFuture ,完成之后,逻辑同上。

     

    compose衔接,whenComplete输出:

     

    每一个IO操作,均可以封装为独立的CompletableFuture,从而避免回调地狱。

    CompletableFuture,只有两个属性:

    • result:Future的执行结果 (Either the result or boxed AltResult)。
    • stack:操作栈,用于定义这个Future接下来操作的行为 (Top of Treiber stack of dependent actions)。

    weatherFuture这个方法是如何被调用的呢?

    通过堆栈可以发现,是在
    reverseCodeFuture.complete(result) 的时候,并且也将获得的adcode作为参数执行接下来的逻辑。

     

    这样一来,就完美解决回调地狱问题,在主的逻辑中,看起来像是在同步的进行编码。

    六、 Vert.x Future

    Info-Service中,大量使用的 Vert.x Future 也是类似的解决的方案,不过设计上使用Handler的概念。

     

    核心执行的逻辑差不多:

     

    这当然不是Vertx的全部,当然这是题外话了。

    七 、Reactive Streams

    异步编程对吞吐量以及资源有好处,但是有没有统一的抽象去解决此类问题内,答案是 Reactive Streams。

    核心抽象:Publisher Subscriber Processor Subscription ,整个包里面,只有这四个接口,没有实现类。

     


    在JDK 9里面,已经被作为一种规范封装到 java.util.concurrent.Flow :

     

     

    一个简单的例子:

     

    八、 Reactor & Spring 5 & Spring WebFlux

    Flux & Mono

     

     

    作者:开发者小助手_LS

    原文链接

    本文为阿里云原创内容,未经允许不得转载

  • 相关阅读:
    把影响集中到一个点
    How to avoid Over-fitting using Regularization?
    适定性问题
    Numerical Differentiation 数值微分
    What Every Computer Scientist Should Know About Floating-Point Arithmetic
    Generally a good method to avoid this is to randomly shuffle the data prior to each epoch of training.
    What is the difference between iterations and epochs in Convolution neural networks?
    Every norm is a convex function
    Moore-Penrose Matrix Inverse 摩尔-彭若斯广义逆 埃尔米特矩阵 Hermitian matrix
    perl 类里的函数调用其他类的函数
  • 原文地址:https://www.cnblogs.com/yunqishequ/p/14445656.html
Copyright © 2011-2022 走看看