zoukankan      html  css  js  c++  java
  • 跟着seata学Netty之怎么写同步发送逻辑

    看了seata的TCC部分源码,看到TC调用每个分支事务的提交逻辑,看到了如何利用netty进行同步调用的写法,感觉挺好

    AbstractNettyRemoting # sendSync

    protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
            if (timeoutMillis <= 0) {
                throw new FrameworkException("timeout should more than 0ms");
            }
            if (channel == null) {
                LOGGER.warn("sendSync nothing, caused by null channel.");
                return null;
            }
    
            MessageFuture messageFuture = new MessageFuture();
            messageFuture.setRequestMessage(rpcMessage);
            messageFuture.setTimeout(timeoutMillis);
            futures.put(rpcMessage.getId(), messageFuture);
    
            channelWritableCheck(channel, rpcMessage.getBody());
    
            channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
                if (!future.isSuccess()) {
                    MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
                    if (messageFuture1 != null) {
                        messageFuture1.setResultMessage(future.cause());
                    }
                    destroyChannel(future.channel());
                }
            });
    
            try {
                return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (Exception exx) {

      MessageFuture 是seata 实现的future类,是对CompletableFuture的封装

    public class MessageFuture {
        private RpcMessage requestMessage;
        private long timeout;
        private long start = System.currentTimeMillis();
        private transient CompletableFuture<Object> origin = new CompletableFuture<>();
        
        public Object get(long timeout, TimeUnit unit) throws TimeoutException,
            InterruptedException {
            Object result = null;
            try {
                result = origin.get(timeout, unit);
            } catch (ExecutionException e) {
                throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
            } catch (TimeoutException e) {
                throw new TimeoutException("cost " + (System.currentTimeMillis() - start) + " ms");
            }
    
            if (result instanceof RuntimeException) {
                throw (RuntimeException)result;
            } else if (result instanceof Throwable) {
                throw new RuntimeException((Throwable)result);
            }
    
            return result;
        }
        
        public void setResultMessage(Object obj) {
            origin.complete(obj);
        }

     重点就是这个setResultMessage 啥时候被调用的了

    AbstractNettyRemotingServer # ServerHandler

    class ServerHandler extends ChannelDuplexHandler {
    
            /**
             * Channel read.
             *
             * @param ctx the ctx
             * @param msg the msg
             * @throws Exception the exception
             */
            @Override
            public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
                if (!(msg instanceof RpcMessage)) {
                    return;
                }
                processMessage(ctx, (RpcMessage) msg);
            }
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
            }
            Object body = rpcMessage.getBody();
            if (body instanceof MessageTypeAware) {
                MessageTypeAware messageTypeAware = (MessageTypeAware) body;
                final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
                if (pair != null) {
                    if (pair.getSecond() != null) {
                        try {
                            pair.getSecond().execute(() -> {
                                try {
                                    pair.getFirst().process(ctx, rpcMessage);

      ServerOnResponseProcessor # process

    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
            MessageFuture messageFuture = futures.remove(rpcMessage.getId());
            if (messageFuture != null) {
                messageFuture.setResultMessage(rpcMessage.getBody());

      原理就是在服务端发送前,构造Future,然后将Future缓存在map,key是发送报文的id号。同时Netty的服务端注册handler时,实现channelRead中,把从客户端的报文中的id好再取出来,这样就能取出来future,根据报文的结果调用 

      CompletableFuture.complete 这样在同步发送的逻辑里

    try {
                return messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (Exception exx) {

      就可以从阻塞中返回了

       

  • 相关阅读:
    [leetcode]算法题目
    JQuery功能查询页
    [C语言]一个很实用的服务端和客户端进行TCP通信的实例
    Siege——多线程编程最佳实例
    CodeIgniter框架中关于URL(index.php)的那些事
    web压测工具http_load原理分析
    【JAVA】文件各行打乱
    【JAVA】HashMap的原理及多线程下死循环的原因
    【JAVA】高并发优化细节点
    【Linux】日志分析工具grep sed sort
  • 原文地址:https://www.cnblogs.com/juniorMa/p/15200807.html
Copyright © 2011-2022 走看看