zoukankan      html  css  js  c++  java
  • java线程异步转同步。

    最近有个项目在压测,TPS有点低。做了一些日志异步批量落地和redis数据预热后,TPS稍微提高了点,但还是没达标。

    研究了下项目的系统设计和技术栈。

    用的分布式服务架构,其中服务端使用Netty。而客户端为了同步获取响应结果,采用了socket短连接模式。

    为了使系统的性能有所提升,决定客户端也改用Netty框架并采用长连接的方式。

    Netty是Jboss开源的一款非常优秀的异步通信框架。目前很多主流的开源项目有使用Netty开发的,Dubbo/RocketMQ/Apache Synapse等。

    但Netty本身似乎没有提供同步等待响应的接口或方法。

    使用过Dubbo的都知道我们平常使用的dubbo发布/订阅端就是同步获取响应报文,虽然它本身是基于Netty开发。

    为了知道dubbo怎么实现netty响应结果异步转同步,看了点dubbo的源码。

    发现dubbo是使用future+lock+condition实现的。这里就不作展开了,有兴趣可以看一下Dubbo的DefaultFuture这个类。

    这里记录一下网上看到的一些异步转同步的方法。

    首先定义一个业务操作类,纯粹处理业务。

    /**
     * 业务操作类
     */
    public class TaskService {
    
        public String getNumber() {
            return UUID.randomUUID().toString();
        }
    }
    

      

    然后写一个线程执行接口,

    public interface Executor {
    
        /**
         * 异步执行
         */
        default void supplyAsync() {}
    
        /**
         * 异步执行并回调
         * @param callback
         */
        default void supplyAsync(Callback callback) {}
    
        /**
         * 同步执行
         */
        default void supplySync(){}
    
        /**
         * 回调接口
         */
        @FunctionalInterface
        public interface Callback{
            void call(Object o);
        }
    }
    

      

    接下来,写异步操作。

    /**
     * 异步任务
     */
    public class AsyncExecutor implements Executor {
    
        private TaskService service = new TaskService();
    
        public void supplyAsync() {
            this.supplyAsync(null);
        }
    
        public void supplyAsync(Callback callback) {
            // 另起线程异步执行。
            new Thread(() -> {
                System.out.println("running async task...");
                try {
                    // 子线程睡眠,主线程不休息。
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
    
                }
                String num = service.getNumber();
                // 如果有回调,则调用回调函数。
                if (callback != null){
                    callback.call(num);
                }
            }).start();
        }
    }
    

      

    测试代码:

    public static void main(String[] args) throws Exception {
            System.err.println("
    ===============   start main thread   ===============
    ");
            Executor executor;
            /**
             *  async
             */
            executor = new AsyncExecutor();
            executor.supplyAsync((o) -> {
                System.out.println(String.format("get number[%s] by async.", o));
            });
            System.err.println("
    ===============   main thread  over   ===============
    ");
        }
    

      

    输出:

    Connected to the target VM, address: '127.0.0.1:53015', transport: 'socket'
    
    ===============   start main thread   ===============
    
    
    ===============   main thread  over   ===============
    
    running async task...
    get number[de720331-2ca3-4b33-8dee-4a7d26ede037] by async.
    Disconnected from the target VM, address: '127.0.0.1:53015', transport: 'socket'
    

      

    我们可以看到,子线程在睡眠了2秒的时候,主线程并没有等待子线程执行完,而是继续往下执行。

    接下来,我们要将异步响应改成同步的。

    /**
     * 同步执行抽象类
     */
    public abstract class AbstractSyncExecutor implements Executor {
        // 异步执行器
        public AsyncExecutor executor = new AsyncExecutor();
    
        /**
         * 异步转同步
         */
        public void supplySync() {
            executor.supplyAsync(this::process);
            await();
        }
    
        /**
         *  线程等待
         */
        protected void await() {}
    
        /**
         * 回调
         * @param o
         */
        protected abstract void process(Object o);
    }
    

      

    第一种:Synchronized + wait + notify

    /**
     * 采用synchronized配合wait和notify。
     */
    public class SynchronizedExecutor extends AbstractSyncExecutor {
    
        @Override
        protected void process(Object o) {
            System.out.println(String.format("get number[%s] by synchronized.", o));
            synchronized (this) {
                notify();
            }
        }
    
        @Override
        public void await() {
            synchronized (this) {
                try {
                    // 主线程调用wait阻塞等待,直到回调方法调用notify或者notifyAll唤醒。
                    wait();
                } catch (InterruptedException e) {
    
                }
            }
        }
    }
    

      

    测试代码

    public static void main(String[] args) throws Exception {
            System.err.println("
    ===============   start main thread   ===============
    ");
            Executor executor;
    
            /**
             *  synchronized
             */
            executor = new SynchronizedExecutor();
            executor.supplySync();
    
            System.err.println("
    ===============   main thread  over   ===============
    ");
        }
    

      

    执行结果

    Connected to the target VM, address: '127.0.0.1:53235', transport: 'socket'
    
    ===============   start main thread   ===============
    
    running async task...
    get number[05c5db70-ec3b-4411-b080-21e5b4cddf79] by synchronized.
    
    ===============   main thread  over   ===============
    
    Disconnected from the target VM, address: '127.0.0.1:53235', transport: 'socket'
    

      

    可以看到内容已经顺序输出了。

    第二种:reentrantLock + condition

    /**
     * 使用lock + condition
     */
    public class ReentrantLockExecutor extends AbstractSyncExecutor {
    
        private Lock lock = new ReentrantLock();
        private Condition condition;
    
        public ReentrantLockExecutor() {
            this.condition = lock.newCondition();
        }
    
        @Override
        protected void process(Object o) {
            System.out.println(String.format("get number[%s] by lockAndCondition.", o));
            lock.lock();
            try {
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        protected void await() {
            lock.lock();
            try {
                // 阻塞等待直到回调函数唤醒
                condition.await();
            } catch (Exception e) {
    
            } finally {
                lock.unlock();
            }
        }
    }
    

      

    测试代码

        public static void main(String[] args) throws Exception {
            System.err.println("
    ===============   start main thread   ===============
    ");
            Executor executor;
    
            /**
             *  reentrantLock
             */
            executor = new ReentrantLockExecutor();
            executor.supplySync();
    
            System.err.println("
    ===============   main thread  over   ===============
    ");
        }
    

      

    执行结果

    Connected to the target VM, address: '127.0.0.1:53254', transport: 'socket'
    
    ===============   start main thread   ===============
    
    running async task...
    get number[6ad17c2c-a3ce-4fc6-a03a-fdb7d84acf7e] by lockAndCondition.
    
    ===============   main thread  over   ===============
    
    Disconnected from the target VM, address: '127.0.0.1:53254', transport: 'socket'
    

      

    第三种:countDownLatch

    /**
     * CountDownLatch
     */
    public class CountDownLatchExecutor extends AbstractSyncExecutor {
    
        // 假设每笔调用都创建一个CountDownLatchExecutor,那么从发起到响应只算一次操作,这里设置为1就可以了。
        private CountDownLatch latch = new CountDownLatch(1);
    
        @Override
        public void process(Object o) {
            System.out.println(String.format("get number[%s] by countDownLatch.", o));
            // latch count - 1  变成0, 主线程继续执行
            latch.countDown();
        }
    
        @Override
        protected void await() {
            try {
                // 阻塞直到latch count=0
                latch.await();
            } catch (InterruptedException e) {
                latch.countDown();
            }
        }
    }
    

      

    测试代码

        public static void main(String[] args) throws Exception {
            System.err.println("
    ===============   start main thread   ===============
    ");
            Executor executor;
    
            /**
             * countDownLatch
             */
            executor = new CountDownLatchExecutor();
            executor.supplySync();
    
            System.err.println("
    ===============   main thread  over   ===============
    ");
        }
    

      

    执行结果

    Connected to the target VM, address: '127.0.0.1:53325', transport: 'socket'
    
    ===============   start main thread   ===============
    
    running async task...
    get number[aa51089e-6f74-4ae7-b463-6a04ca73adcf] by countDownLatch.
    
    ===============   main thread  over   ===============
    
    Disconnected from the target VM, address: '127.0.0.1:53325', transport: 'socket'
    

      

    第四种:CyclicBarrier

    /**
     * CyclicBarrier
     */
    public class CyclicBarrierExecutor extends AbstractSyncExecutor {
        
        // 假设每笔调用都创建一个CountDownLatchExecutor
        CyclicBarrier barrier = new CyclicBarrier(2);
    
        @Override
        protected void process(Object o) {
            try {
                System.out.println(String.format("get number[%s] by cyclicBarrier.", o));
                // await线程数量=2,当前线程被唤醒
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        protected void await() {
            try {
                // await线程数为1,等待直至所有线程都到达
                barrier.await();
            } catch (Exception e) {
    
            }
        }
    }
    

      

    与CountDownLatch相反。CyclicBarrier是做加操作。当await线程达到初始parties数时,当前线程就被唤醒。我们需要在主线程await一次,回调线程await一次,然后主线程唤醒。即:CyclicBarrier的栅栏数parties设置为2。

    测试代码

        public static void main(String[] args) throws Exception {
            System.err.println("
    ===============   start main thread   ===============
    ");
            Executor executor;
    
            /**
             * CyclicBarrier
             */
            executor = new CyclicBarrierExecutor();
            executor.supplySync();
    
            System.err.println("
    ===============   main thread  over   ===============
    ");
        }
    

      

    执行结果

    Connected to the target VM, address: '127.0.0.1:53527', transport: 'socket'
    
    ===============   start main thread   ===============
    
    running async task...
    get number[2409d6c4-7bb2-4838-a6ef-3ce42e4a18c7] by cyclicBarrier.
    
    ===============   main thread  over   ===============
    
    Disconnected from the target VM, address: '127.0.0.1:53527', transport: 'socket'
    

      

    第五种:Future  + countDownLatch。

    public class SyncFuture<T> implements Future<T> {
        private CountDownLatch latch = new CountDownLatch(1);
        private T resp;
    
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }
    
        @Override
        public boolean isCancelled() {
            return false;
        }
    
        @Override
        public boolean isDone() {
            if (this.resp != null) {
                return true;
            }
            return false;
        }
    
        @Override
        public T get() throws InterruptedException, ExecutionException {
            latch.await();
            return this.resp;
        }
    
        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            if (latch.await(timeout, unit)) {
                return this.resp;
            }
            return null;
        }
    
        public void set(T resp) {
            this.resp = resp;
            latch.countDown();
        }
    }
    

      

    public class FutureExecutor extends AbstractSyncExecutor {
        private SyncFuture future;
    
        public FutureExecutor(SyncFuture future) {
            this.future = future;
        }
    
        @Override
        public void process(Object o) {
            future.set(o);
        }
    }
    

      

    测试代码

        public static void main(String[] args) throws Exception {
            System.err.println("
    ===============   start main thread   ===============
    ");
    
            /**
             * future + countDownLatch
             */
            SyncFuture<String> future = new SyncFuture<>();
            new FutureExecutor(future).supplySync();
            // Object resp = futureExecutor.get();
            // Object resp = futureExecutor.get(1, TimeUnit.SECONDS);
            Object resp = future.get(3, TimeUnit.SECONDS);
            System.out.println(String.format("get number[%s] by futureAndCountDownLatch.", resp));
    
            System.err.println("
    ===============   main thread  over   ===============
    ");
        }
    

      

    执行结果

    Connected to the target VM, address: '127.0.0.1:53590', transport: 'socket'
    
    ===============   start main thread   ===============
    
    running async task...
    get number[942e6c15-1eb2-4572-9a1f-dcb7d262387b] by futureAndCountDownLatch.
    
    ===============   main thread  over   ===============
    
    Disconnected from the target VM, address: '127.0.0.1:53590', transport: 'socket'
    

      

    前面四种方式都大同小异。主线程阻塞等待,子线程的回调函数里面唤醒主线程。

    只有最后一种不太一样。而Dubbo就是使用最后一种方式,只是把其中的countDownLatch换成了condition+lock。

  • 相关阅读:
    poj 1789 每个字符串不同的字母数代表两个结点间的权值 (MST)
    poj 1251 poj 1258 hdu 1863 poj 1287 poj 2421 hdu 1233 最小生成树模板题
    poj 1631 最多能有多少条不交叉的线 最大非降子序列 (LIS)
    hdu 5256 最少修改多少个数 能使原数列严格递增 (LIS)
    hdu 1025 上面n个点与下面n个点对应连线 求最多能连有多少条不相交的线 (LIS)
    Gym 100512F Funny Game (博弈+数论)
    UVa 12714 Two Points Revisited (水题,计算几何)
    UVa 12717 Fiasco (BFS模拟)
    UVa 12718 Dromicpalin Substrings (暴力)
    UVa 12716 && UVaLive 6657 GCD XOR (数论)
  • 原文地址:https://www.cnblogs.com/braska/p/12982488.html
Copyright © 2011-2022 走看看