zoukankan      html  css  js  c++  java
  • RocketMQ的异步调用

    这个异步调用方法中传入一个final 回调对象。

        public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
                                    final InvokeCallback invokeCallback)
                throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
            System.out.println("NettyRemotingAbstract.invokeAsyncImpl()****************");
            final int opaque = request.getOpaque();
            //超时信号量锁
            boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
            if (acquired) {
                final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
    
                //这个ResonseFuture的封装的思路。
                final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
                this.responseTable.put(opaque, responseFuture);
                try {
                    //----》执行io请求,添加channelfuture监听器
                    channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture f) throws Exception {
                            //这里只是发送成功!!!!!!!!!!!!!!!!!!!!!!!!!
                            if (f.isSuccess()) {
                                responseFuture.setSendRequestOK(true);
                                return;
                            } else {
                                responseFuture.setSendRequestOK(false);
                            }
                            //这里只是发送成功,返回设为nul。putResponse -> NULL
                            responseFuture.putResponse(null);
                            responseTable.remove(opaque);
                            try {
                                //发送成功后还没有返回消息时的,调用回调方法。参见:MQClientAPIImpl.sendMessageAsync(..)
                                System.out.println("**************NettyRemotingAbstract.invokeAsyncImpl().callback()");
                                responseFuture.executeInvokeCallback();
                            } catch (Throwable e) {
                                plog.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                            } finally {
                                responseFuture.release();
                            }
    
                            plog.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                        }
                    });
                } catch (Exception e) {
                    responseFuture.release();
                    plog.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                    throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
                }
            } else {
                String info =
                        String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", //
                                timeoutMillis, //
                                this.semaphoreAsync.getQueueLength(), //
                                this.semaphoreAsync.availablePermits()//
                        );
                plog.warn(info);
                throw new RemotingTooMuchRequestException(info);
            }
        }

    我们往上面看看这个回调对象的回调方法:

     

  • 相关阅读:
    殷浩详解DDD:如何避免写流水账代码?
    如何从 0 到 1 开发 PyFlink API 作业
    探秘RocketMQ源码——Series1:Producer视角看事务消息
    教父郭盛华透露:PHP编程语言中多个代码执行缺陷
    互联网用户仍然容易受到黑客社会工程学攻击
    揭秘郭盛华在世界的排名,才华与颜值并存的男神
    什么是逆向工程?黑客是如何构建可利用的漏洞?
    人工智能时代,计算机网络主要面临哪些安全威胁?
    【2020-10-01】国庆堵车不堵心
    【2020-09-30】走起来慢,但实际很快
  • 原文地址:https://www.cnblogs.com/guazi/p/6675760.html
Copyright © 2011-2022 走看看