zoukankan      html  css  js  c++  java
  • talent-aio源码阅读小记(二)

    我们上一篇提到了talent-aio的四类Task:DecodeRunnableHandlerRunnableSendRunnableCloseRunnable,并且分析了这些task的基类AbstractQueueRunnable。在这篇文章中,我们就来分析一下这几个task是如何相互协作来处理输入、输出数据以及客户端连接的。

    talent-aio 基础类介绍


    在开始介绍前,我们需要先介绍几个talent-aio的基础类和接口,在处理输入输出的整个流程中,都需要用到它们。两个基础的类为:

    ChannelContext
    GroupContext

    其中ChannelContext类包含:

    1. AsynchronousSocketChannel对象、与该连接有关的
    2. DecodeRunnableHandlerRunnableSendRunnableCloseRunnable
    3. GroupContext
    4. ReadCompletionHandler
      ChannelContext类代表一个socket连接,封装了:连接对应的AsynchronousSocketChannel对象;解码、处理接收到的数据和发送数据、关闭连接的几个Runnable;该连接所属的GroupContext引用;该连接的ReadCompletionHandler等。

    在创建ChannelContext的过程中,首先记录accept返回的asynchronousSocketChannel:

         public void setAsynchronousSocketChannel(AsynchronousSocketChannel
                             asynchronousSocketChannel)
        {
            this.asynchronousSocketChannel = asynchronousSocketChannel;
    
            if (asynchronousSocketChannel != null)
            {
                try
                {
                    clientNode = getClientNode(asynchronousSocketChannel);
                } catch (IOException e)
                {
                    log.error(e.toString(), e);
                }
            } else
            {
                clientNode = null;
            }
        }

    而后设置GroupContext,并创建几个Runnable,最终将自己加入GroupContext管理的连接集合connections和对端集合clientNodes中:

        public void setGroupContext(GroupContext<Ext, P, R> groupContext)
        {
            this.groupContext = groupContext;
    
            if (groupContext != null)
            {
                decodeRunnable = new DecodeRunnable<>(this, 
    groupContext.getDecodeExecutor());
                closeRunnable = new CloseRunnable<>(this, null, 
    null, groupContext.getCloseExecutor());
    
                handlerRunnableNormPrior = new HandlerRunnable<>(this, 
    groupContext.getHandlerExecutorNormPrior());
    
                sendRunnableNormPrior = new SendRunnable<>(this, 
    groupContext.getSendExecutorNormPrior());
    
                groupContext.getConnections().add(this);
                groupContext.getClientNodes().put(this);
            }
        }

    而GroupContext管理着当前实例的所有连接connections、连接对应的用户users以及用户间的分组关系groups:

        protected ClientNodes<Ext, P, R> clientNodes = new ClientNodes<>();
        protected Connections<Ext, P, R> connections = new Connections<>();
        protected Groups<Ext, P, R> groups = new Groups<>();
        protected Users<Ext, P, R> users = new Users<>();

    除此之外,GroupContext还管理着一些线程池:

    /**
     * 解码线程池
    */
    private SynThreadPoolExecutor<SynRunnableIntf> decodeExecutor;
    
    /**
     * 关闭连接的线程池
     */
    private SynThreadPoolExecutor<SynRunnableIntf> closeExecutor;
    
    /**
     * 业务处理线程池
     */
    private SynThreadPoolExecutor<SynRunnableIntf> handlerExecutorNormPrior;
    
    /**
     * 消息发送线程池
     */
    private SynThreadPoolExecutor<SynRunnableIntf> sendExecutorNormPrior;

    在处理输入输出的过程中我们会用到上面这两个类。

    talent-aio的主要的业务逻辑都在AioHandler接口的实现类中,AioHandler接口为:

    public interface AioHandler<Ext, P extends Packet, R>
    {
    
        /**
         * 处理消息包
         *
         * @param packet the packet
         * @return the r
         * @author: tanyaowu
         * @创建时间: 2016年11月15日 上午11:38:52
         */
        R handler(P packet,
     ChannelContext<Ext, P, R> channelContext) throws Exception;
    
        /**
         * 编码
         *
         * @param packet the packet
         * @return the byte buffer
         * @author: tanyaowu
         * @创建时间: 2016年11月15日 上午11:38:52
         */
        ByteBuffer encode(P packet, 
    ChannelContext<Ext, P, R> channelContext);
    
        /**
         * 根据ByteBuffer解码成业务需要的Packet对象.
         *
         * @param buffer the buffer
         * @return the t
         * @throws AioDecodeException the aio decode exception
         */
        P decode(ByteBuffer buffer, ChannelContext<Ext, P, R> channelContext)
     throws AioDecodeException;
    
    }

    三个方法分别用来数据解包、数据封包以及处理数据包。

    处理流程


    读取数据

    首先,是处理读取数据:

        public void completed(Integer result, ByteBuffer byteBuffer)
        {
            GroupContext<Ext, P, R> groupContext 
      = channelContext.getGroupContext();
            if (result > 0)
            {
                byteBuffer.limit(byteBuffer.position());
                byteBuffer.position(0);
                DecodeRunnable<Ext, P, R> decodeRunnable
     = channelContext.getDecodeRunnable();
                decodeRunnable.addMsg(byteBuffer);
    
                groupContext.getDecodeExecutor().execute(decodeRunnable);
    
            } else if (result == 0)
            {
                log.error("读到的数据长度为0");
            } else if (result < 0)
            {
                Aio.close(channelContext, null, "读数据时返回" + result);
            }
    
            if (AioUtils.checkBeforeIO(channelContext))
            {
                AsynchronousSocketChannel asynchronousSocketChannel 
    = channelContext.getAsynchronousSocketChannel();
                ByteBuffer newByteBuffer = 
          ByteBuffer.allocate(
    channelContext.getGroupContext().getReadBufferSize()
    );
                asynchronousSocketChannel.read(newByteBuffer,
     newByteBuffer, this);
            }
    
        }

    在读取数据后,将byteBuffer提交到decodeRunnable的数据队列中,而后继续调用read方法读取对端发送来的数据。

    数据组包

    在decodeRunnable中,调用AioHandler的decode方法来获取数据包,然后提交给HandlerRunnable处理。在处理数据时主要考虑了半包和粘包的情况:

    @Override
    public void runTask()
    {
        ConcurrentLinkedQueue<ByteBuffer> queue
     = getMsgQueue();
    
        ByteBuffer byteBuffer = null;
        label_1: while ((size = queue.size()) > 0)
        {
            byteBuffer = queue.poll();
            if (byteBuffer != null)
            {
                if (lastByteBuffer != null)
                {
                    byteBuffer.position(0);
                    byteBuffer = 
                ByteBufferUtils.composite(lastByteBuffer, byteBuffer);
                    lastByteBuffer = null;
                }
            } else {
                break label_1;
            }
    
            try
            {
                byteBuffer.position(0);
                label_2: while (true)
                {
                    int initPosition = byteBuffer.position();
                    P packet = 
            channelContext.
            getGroupContext().
            getAioHandler().
            decode(byteBuffer, channelContext);
    
                    if (packet == null)// 数据不够,组不了包
                    {
                        if (log.isDebugEnabled())
                        {
                            log.debug("{},数据不够,组不了包", 
                                 channelContext.toString());
                        }
                        byteBuffer.position(initPosition);
                        lastByteBuffer = byteBuffer;
                        continue label_1;
                    } else //组包成功
                    {        
                        channelContext.getStat().
                setLatestTimeOfReceivedPacket(
                  SystemTimer.currentTimeMillis()
                );    
                        int afterDecodePosition = byteBuffer.position();
                        int len = afterDecodePosition - initPosition;
                        AioListener<Ext, P, R> aioListener
           = channelContext.getGroupContext().getAioListener();
                        if (aioListener != null)
                        {
                              aioListener.
    onAfterDecoded(channelContext, packet, len);
                        }
                        submit(packet, len);
                        channelContext.getGroupContext().
                                      getGroupStat().
                                      getReceivedPacket().
                                      incrementAndGet();
    
                                      channelContext.getGroupContext().
                                      getGroupStat().
                                      getReceivedBytes().
                                      addAndGet(len);
    
                      if (byteBuffer.hasRemaining())//组包后,还剩有数据
                      {
                            if (log.isDebugEnabled())
                            {
                                log.debug("{}组包后,还剩有数据:{}", 
              channelContext, byteBuffer.limit() - byteBuffer.position());
                            }
                                continue label_2;
                            } else//组包后,数据刚好用完
                            {
                                lastByteBuffer = null;
                                log.debug("{},组包后,数据刚好用完", 
                                                    channelContext);
                                continue label_1;
                            }
                        }
                    }
                } catch (AioDecodeException e)
                {
                    log.error(channelContext.toString(), e);
                    Aio.close(channelContext, e, "解码异常:" 
                                    + e.getMessage());
                    break label_1;
                } finally
                {
    
                }
            }
        }

    在这个循环中,首先从队列中获取数据,获取到数据后,看看是否存在粘包多出来的数据lastByteBuffer, 如果存在则将两部分数据合并。而后调用AioHandler的decode方法处理数据,如果由于半包导致解包失败,则继续从队列中获取数据,组合起来尝试解包;如果解包成功,则将多余的数据放在lastByteBuffer,并且更新各种统计信息。最后,将组好的数据包通过submit方法传递给HandlerRunnable处理。

    数据处理

    HandlerRunnable的逻辑相对比较简单,从数据队列中获取组好的包,并调用doPacket处理数据包:

        @Override
        public void runTask()
        {
            ConcurrentLinkedQueue<P> queue = getMsgQueue();
            P packet = null;
            while ((packet = queue.poll()) != null)
            {
                doPacket(packet);
            }
        }

    在doPacket中,调用AioHandler的hanlder解除处理数据包:

    groupContext.getAioHandler().handler(packet, channelContext);

    发送数据

    发送数据使用Aio类的静态方法send将packet添加到SendRunnable的数据队列中,而后将sendRunnable提交到线程池中运行:

    public static <Ext, P extends Packet, R> void send(
    ChannelContext<Ext, P, R> channelContext,
     P packet
    )
    {
        if (channelContext == null)
        {
            log.error("channelContext == null");
            return;
        }
        SendRunnable<Ext, P, R> sendRunnable
       = AioUtils.selectSendRunnable(channelContext, packet);
        sendRunnable.addMsg(packet);
        SynThreadPoolExecutor<SynRunnableIntf> synThreadPoolExecutor
       = AioUtils.selectSendExecutor(channelContext, packet);
        synThreadPoolExecutor.execute(sendRunnable);
    }

    在线程池中,会尝试一次发送所有等待发送的packet,不过对单次发送的packet设置了一个上限,而后对每个packet编码,汇总到一个ByteBuffer中:

    for ( int i = 0; i < queueSize; i++ )
    {
        if ( (packet = queue.poll() ) != null )
        {
            ByteBuffer byteBuffer = aioHandler.encode( packet, channelContext );
            allBytebufferCapacity += byteBuffer.limit();
            packetCount++;
            byteBuffers[i] = byteBuffer;
    
            if ( aioListener != null )
            {
                try
                {
                    aioListener.onBeforeSent( channelContext, packet );
                } catch ( Exception e )
                {
                    log.error( e.toString(), e );
                }
            }
        } else{
            break;
        }
    }
    ByteBuffer allByteBuffer = ByteBuffer.allocate( allBytebufferCapacity );
    for ( ByteBuffer byteBuffer : byteBuffers )
    {
        if ( byteBuffer != null )
        {
            byteBuffer.flip();
            allByteBuffer.put( byteBuffer );
        }
    }

    最终在sendRunnable的sendByteBuffer中完成数据发送,注意发送前需要获取一个信号量,保证同一时间对一个连接只有一个线程在调用发送动作,并在writeCompleteHandler中记录当前连接发送数据,并更新当前连接活动时间,为keepalive做准备:

    public void sendByteBuffer( ByteBuffer byteBuffer, Integer packetCount ) {
        if ( byteBuffer == null )
        {
            log.error( "byteBuffer is null" );
            return;
        }
    
        if ( !AioUtils.checkBeforeIO( channelContext ) )
        {
            return;
        }
    
        byteBuffer.flip();
        AsynchronousSocketChannel  asynchronousSocketChannel    
                                                  = channelContext.getAsynchronousSocketChannel();
        WriteCompletionHandler<Ext, P, R>  writeCompletionHandler        
                                                 = channelContext.getWriteCompletionHandler();
        try
        {
            writeCompletionHandler.getWriteSemaphore().acquire();
        } catch ( InterruptedException e )
        {
            log.error( e.toString(), e );
        }
        asynchronousSocketChannel.write( byteBuffer, packetCount, writeCompletionHandler );
    }

    注意发送时如果队列中有待发送数据将直接在当前进程中继续运行runTask,而不是重新提交Runnable到线程池中,以尽快发送数据:

    if (queue.size() > 0) {
        runTask();
    }

    关闭连接

    在关闭连接时,首先将解码,处理,发送runnable全部停止,然后清空其数据队列,最后检查该连接是否已经 调用过关闭,如果没有,就将CloseRunnable提交到线程池中执行:

    public static < Ext, P extends Packet, R > void close( ChannelContext<Ext, P, R> channelContext, Throwable t, String remark, boolean isRemove )
    {
        channelContext.getDecodeRunnable().clearMsgQueue();
        channelContext.getHandlerRunnableNormPrior().clearMsgQueue();
        channelContext.getSendRunnableNormPrior().clearMsgQueue();
    
        channelContext.getDecodeRunnable().setCanceled( true );
        channelContext.getHandlerRunnableNormPrior().setCanceled( true );
        channelContext.getSendRunnableNormPrior().setCanceled( true );
    
        CloseRunnable<Ext, P, R> closeRunnable = channelContext.getCloseRunnable();
        if ( closeRunnable.isWaitingExecute() )
        {
            log.error( "{},已经在等待关闭
    本次关闭备注:{}
    第一次的备注:{}
    本次关闭异常:{}
    第一次时异常:{}", channelContext, remark, closeRunnable.getRemark(), t, closeRunnable.getT() );
            return;
        }
        synchronized (closeRunnable) {
            if ( closeRunnable.isWaitingExecute() ) /* double check */
            {
                return;
            }
            closeRunnable.setRemove( isRemove );
            closeRunnable.setRemark( remark );
            closeRunnable.setT( t );
            closeRunnable.getExecutor().execute( closeRunnable );
            closeRunnable.setWaitingExecute( true );
        }
    }

    在CloseRunnable的runTask中,主要执行关闭连接,清理连接数据的逻辑:

    //关闭连接
    try
    {
        AsynchronousSocketChannel asynchronousSocketChannel = channelContext.getAsynchronousSocketChannel();
        if ( asynchronousSocketChannel != null )
        {
            asynchronousSocketChannel.close();
        }
    } catch ( Throwable e )
    {
        log.error( e.toString() );
    }
    
    //清理连接数据
    //删除集合中的维护信息 start
    /*删除集合中的维护信息 start */
    try
    {
        groupContext.getConnections().remove( channelContext );
    } catch ( Throwable e )
    {
        log.error( e.toString(), e );
    }
    
    try
    {
        groupContext.getClientNodes().remove( channelContext );
    } catch ( Throwable e )
    {
        log.error( e.toString(), e );
    }
    
    try
    {
        groupContext.getUsers().unbind( channelContext );
    } catch ( Throwable e )
    {
        log.error( e.toString(), e );
    }
    
    try
    {
        groupContext.getGroups().unbind( channelContext );
    } catch ( Throwable e )
    {
        log.error( e.toString(), e );
    }
    
    channelContext.setClosed( true );
    channelContext.getGroupContext().getGroupStat().getClosed().incrementAndGet();
    /*删除集合中的维护信息 end */

    除此以外,如果连接是因为出错被关闭的,还会根据ReconnConf的配置进行断线重连操作,这个我们将在后文中讲解。

    本篇总结


    本篇着重讲解了talent-aio如何处理半包、粘包的情况,对输入进行解包,并处理输入数据包,并返回数据的。
    在本系列的下一篇中,会讲解talent-aio的断线重连、keepalive等功能是如何实现的,并且接受其是如何定制线程以及executor的。最后,还将给出一个简单的使用talent-io的IM server的示例。预知后事如何,请听下回分解~

    转:http://www.jianshu.com/p/6e3c4d99e72e

  • 相关阅读:
    线程中死锁的demo
    发布.net core程序碰到的问题
    .net core Identity学习(三) 第三方认证接入
    .net Identity学习(二)OAuth
    .net core Identity学习(一)注册登录
    Git常用操作
    log4net使用
    c#中的Quartz
    jquery中的deferred
    .net core应用部署在IIS上
  • 原文地址:https://www.cnblogs.com/hd-zg/p/6960985.html
Copyright © 2011-2022 走看看