zoukankan      html  css  js  c++  java
  • java线程异步转同步。

    最近有个项目在压测,TPS有点低。做了一些日志异步批量落地和redis数据预热后,TPS稍微提高了点,但还是没达标。

    研究了下项目的系统设计和技术栈。

    用的分布式服务架构,其中服务端使用Netty。而客户端为了同步获取响应结果,采用了socket短连接模式。

    为了使系统的性能有所提升,决定客户端也改用Netty框架并采用长连接的方式。

    Netty是Jboss开源的一款非常优秀的异步通信框架。目前很多主流的开源项目有使用Netty开发的,Dubbo/RocketMQ/Apache Synapse等。

    但Netty本身似乎没有提供同步等待响应的接口或方法。

    使用过Dubbo的都知道我们平常使用的dubbo发布/订阅端就是同步获取响应报文,虽然它本身是基于Netty开发。

    为了知道dubbo怎么实现netty响应结果异步转同步,看了点dubbo的源码。

    发现dubbo是使用future+lock+condition实现的。这里就不作展开了,有兴趣可以看一下Dubbo的DefaultFuture这个类。

    这里记录一下网上看到的一些异步转同步的方法。

    首先定义一个业务操作类,纯粹处理业务。

    /**
     * 业务操作类
     */
    public class TaskService {
    
        public String getNumber() {
            return UUID.randomUUID().toString();
        }
    }
    

      

    然后写一个线程执行接口,

    public interface Executor {
    
        /**
         * 异步执行
         */
        default void supplyAsync() {}
    
        /**
         * 异步执行并回调
         * @param callback
         */
        default void supplyAsync(Callback callback) {}
    
        /**
         * 同步执行
         */
        default void supplySync(){}
    
        /**
         * 回调接口
         */
        @FunctionalInterface
        public interface Callback{
            void call(Object o);
        }
    }
    

      

    接下来,写异步操作。

    /**
     * 异步任务
     */
    public class AsyncExecutor implements Executor {
    
        private TaskService service = new TaskService();
    
        public void supplyAsync() {
            this.supplyAsync(null);
        }
    
        public void supplyAsync(Callback callback) {
            // 另起线程异步执行。
            new Thread(() -> {
                System.out.println("running async task...");
                try {
                    // 子线程睡眠,主线程不休息。
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
    
                }
                String num = service.getNumber();
                // 如果有回调,则调用回调函数。
                if (callback != null){
                    callback.call(num);
                }
            }).start();
        }
    }
    

      

    测试代码:

    public static void main(String[] args) throws Exception {
            System.err.println("
    ===============   start main thread   ===============
    ");
            Executor executor;
            /**
             *  async
             */
            executor = new AsyncExecutor();
            executor.supplyAsync((o) -> {
                System.out.println(String.format("get number[%s] by async.", o));
            });
            System.err.println("
    ===============   main thread  over   ===============
    ");
        }
    

      

    输出:

    Connected to the target VM, address: '127.0.0.1:53015', transport: 'socket'
    
    ===============   start main thread   ===============
    
    
    ===============   main thread  over   ===============
    
    running async task...
    get number[de720331-2ca3-4b33-8dee-4a7d26ede037] by async.
    Disconnected from the target VM, address: '127.0.0.1:53015', transport: 'socket'
    

      

    我们可以看到,子线程在睡眠了2秒的时候,主线程并没有等待子线程执行完,而是继续往下执行。

    接下来,我们要将异步响应改成同步的。

    /**
     * 同步执行抽象类
     */
    public abstract class AbstractSyncExecutor implements Executor {
        // 异步执行器
        public AsyncExecutor executor = new AsyncExecutor();
    
        /**
         * 异步转同步
         */
        public void supplySync() {
            executor.supplyAsync(this::process);
            await();
        }
    
        /**
         *  线程等待
         */
        protected void await() {}
    
        /**
         * 回调
         * @param o
         */
        protected abstract void process(Object o);
    }
    

      

    第一种:Synchronized + wait + notify

    /**
     * 采用synchronized配合wait和notify。
     */
    public class SynchronizedExecutor extends AbstractSyncExecutor {
    
        @Override
        protected void process(Object o) {
            System.out.println(String.format("get number[%s] by synchronized.", o));
            synchronized (this) {
                notify();
            }
        }
    
        @Override
        public void await() {
            synchronized (this) {
                try {
                    // 主线程调用wait阻塞等待,直到回调方法调用notify或者notifyAll唤醒。
                    wait();
                } catch (InterruptedException e) {
    
                }
            }
        }
    }
    

      

    测试代码

    public static void main(String[] args) throws Exception {
            System.err.println("
    ===============   start main thread   ===============
    ");
            Executor executor;
    
            /**
             *  synchronized
             */
            executor = new SynchronizedExecutor();
            executor.supplySync();
    
            System.err.println("
    ===============   main thread  over   ===============
    ");
        }
    

      

    执行结果

    Connected to the target VM, address: '127.0.0.1:53235', transport: 'socket'
    
    ===============   start main thread   ===============
    
    running async task...
    get number[05c5db70-ec3b-4411-b080-21e5b4cddf79] by synchronized.
    
    ===============   main thread  over   ===============
    
    Disconnected from the target VM, address: '127.0.0.1:53235', transport: 'socket'
    

      

    可以看到内容已经顺序输出了。

    第二种:reentrantLock + condition

    /**
     * 使用lock + condition
     */
    public class ReentrantLockExecutor extends AbstractSyncExecutor {
    
        private Lock lock = new ReentrantLock();
        private Condition condition;
    
        public ReentrantLockExecutor() {
            this.condition = lock.newCondition();
        }
    
        @Override
        protected void process(Object o) {
            System.out.println(String.format("get number[%s] by lockAndCondition.", o));
            lock.lock();
            try {
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
    
        @Override
        protected void await() {
            lock.lock();
            try {
                // 阻塞等待直到回调函数唤醒
                condition.await();
            } catch (Exception e) {
    
            } finally {
                lock.unlock();
            }
        }
    }
    

      

    测试代码

        public static void main(String[] args) throws Exception {
            System.err.println("
    ===============   start main thread   ===============
    ");
            Executor executor;
    
            /**
             *  reentrantLock
             */
            executor = new ReentrantLockExecutor();
            executor.supplySync();
    
            System.err.println("
    ===============   main thread  over   ===============
    ");
        }
    

      

    执行结果

    Connected to the target VM, address: '127.0.0.1:53254', transport: 'socket'
    
    ===============   start main thread   ===============
    
    running async task...
    get number[6ad17c2c-a3ce-4fc6-a03a-fdb7d84acf7e] by lockAndCondition.
    
    ===============   main thread  over   ===============
    
    Disconnected from the target VM, address: '127.0.0.1:53254', transport: 'socket'
    

      

    第三种:countDownLatch

    /**
     * CountDownLatch
     */
    public class CountDownLatchExecutor extends AbstractSyncExecutor {
    
        // 假设每笔调用都创建一个CountDownLatchExecutor,那么从发起到响应只算一次操作,这里设置为1就可以了。
        private CountDownLatch latch = new CountDownLatch(1);
    
        @Override
        public void process(Object o) {
            System.out.println(String.format("get number[%s] by countDownLatch.", o));
            // latch count - 1  变成0, 主线程继续执行
            latch.countDown();
        }
    
        @Override
        protected void await() {
            try {
                // 阻塞直到latch count=0
                latch.await();
            } catch (InterruptedException e) {
                latch.countDown();
            }
        }
    }
    

      

    测试代码

        public static void main(String[] args) throws Exception {
            System.err.println("
    ===============   start main thread   ===============
    ");
            Executor executor;
    
            /**
             * countDownLatch
             */
            executor = new CountDownLatchExecutor();
            executor.supplySync();
    
            System.err.println("
    ===============   main thread  over   ===============
    ");
        }
    

      

    执行结果

    Connected to the target VM, address: '127.0.0.1:53325', transport: 'socket'
    
    ===============   start main thread   ===============
    
    running async task...
    get number[aa51089e-6f74-4ae7-b463-6a04ca73adcf] by countDownLatch.
    
    ===============   main thread  over   ===============
    
    Disconnected from the target VM, address: '127.0.0.1:53325', transport: 'socket'
    

      

    第四种:CyclicBarrier

    /**
     * CyclicBarrier
     */
    public class CyclicBarrierExecutor extends AbstractSyncExecutor {
        
        // 假设每笔调用都创建一个CountDownLatchExecutor
        CyclicBarrier barrier = new CyclicBarrier(2);
    
        @Override
        protected void process(Object o) {
            try {
                System.out.println(String.format("get number[%s] by cyclicBarrier.", o));
                // await线程数量=2,当前线程被唤醒
                barrier.await();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        protected void await() {
            try {
                // await线程数为1,等待直至所有线程都到达
                barrier.await();
            } catch (Exception e) {
    
            }
        }
    }
    

      

    与CountDownLatch相反。CyclicBarrier是做加操作。当await线程达到初始parties数时,当前线程就被唤醒。我们需要在主线程await一次,回调线程await一次,然后主线程唤醒。即:CyclicBarrier的栅栏数parties设置为2。

    测试代码

        public static void main(String[] args) throws Exception {
            System.err.println("
    ===============   start main thread   ===============
    ");
            Executor executor;
    
            /**
             * CyclicBarrier
             */
            executor = new CyclicBarrierExecutor();
            executor.supplySync();
    
            System.err.println("
    ===============   main thread  over   ===============
    ");
        }
    

      

    执行结果

    Connected to the target VM, address: '127.0.0.1:53527', transport: 'socket'
    
    ===============   start main thread   ===============
    
    running async task...
    get number[2409d6c4-7bb2-4838-a6ef-3ce42e4a18c7] by cyclicBarrier.
    
    ===============   main thread  over   ===============
    
    Disconnected from the target VM, address: '127.0.0.1:53527', transport: 'socket'
    

      

    第五种:Future  + countDownLatch。

    public class SyncFuture<T> implements Future<T> {
        private CountDownLatch latch = new CountDownLatch(1);
        private T resp;
    
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }
    
        @Override
        public boolean isCancelled() {
            return false;
        }
    
        @Override
        public boolean isDone() {
            if (this.resp != null) {
                return true;
            }
            return false;
        }
    
        @Override
        public T get() throws InterruptedException, ExecutionException {
            latch.await();
            return this.resp;
        }
    
        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            if (latch.await(timeout, unit)) {
                return this.resp;
            }
            return null;
        }
    
        public void set(T resp) {
            this.resp = resp;
            latch.countDown();
        }
    }
    

      

    public class FutureExecutor extends AbstractSyncExecutor {
        private SyncFuture future;
    
        public FutureExecutor(SyncFuture future) {
            this.future = future;
        }
    
        @Override
        public void process(Object o) {
            future.set(o);
        }
    }
    

      

    测试代码

        public static void main(String[] args) throws Exception {
            System.err.println("
    ===============   start main thread   ===============
    ");
    
            /**
             * future + countDownLatch
             */
            SyncFuture<String> future = new SyncFuture<>();
            new FutureExecutor(future).supplySync();
            // Object resp = futureExecutor.get();
            // Object resp = futureExecutor.get(1, TimeUnit.SECONDS);
            Object resp = future.get(3, TimeUnit.SECONDS);
            System.out.println(String.format("get number[%s] by futureAndCountDownLatch.", resp));
    
            System.err.println("
    ===============   main thread  over   ===============
    ");
        }
    

      

    执行结果

    Connected to the target VM, address: '127.0.0.1:53590', transport: 'socket'
    
    ===============   start main thread   ===============
    
    running async task...
    get number[942e6c15-1eb2-4572-9a1f-dcb7d262387b] by futureAndCountDownLatch.
    
    ===============   main thread  over   ===============
    
    Disconnected from the target VM, address: '127.0.0.1:53590', transport: 'socket'
    

      

    前面四种方式都大同小异。主线程阻塞等待,子线程的回调函数里面唤醒主线程。

    只有最后一种不太一样。而Dubbo就是使用最后一种方式,只是把其中的countDownLatch换成了condition+lock。

  • 相关阅读:
    运算符重载
    poj2329dfs
    poj2349最小生成树prim
    poj1258最小生成树prim
    read 一个防止找不到就写一下的输入模板
    CentOS7下安装ngnix
    CentOS7下安装mysql
    CentOS7下安装rabbitmq
    在window 2008r2开发服务器上安装MSMQ消息队列
    spark快速大数据分析学习笔记(1)
  • 原文地址:https://www.cnblogs.com/braska/p/12982488.html
Copyright © 2011-2022 走看看