zoukankan      html  css  js  c++  java
  • Netty框架问题记录1--多线程下批量发送消息导致消息被覆盖

    业务背景

    项目是基于Netty实现的实时课堂项目,课堂中老师需要对试卷进行讲解,则老师向服务器发送一个打开试卷信息的请求,服务器获取试卷信息,将试卷信息发送给所有的客户端(学生和老师)。

    发送给学生的时候需要在试卷信息中加上本人得分的信息。

     实现方式大致如下:

    1 Paper paper = getPaper(paperId); // 根据试卷ID获取试卷详细信息
    2 for(Client client : allClients){
    3    paper.setMyScore(getMyScore(client.getUserId())); //根据userId获取本人得分
    4   client.send(paper); //向客户端发送数据
    5 }

    结果:学生A收到的得分是学生B的得分,也就是发送给clientA的paper数据被发送给clientB的paper数据给覆盖了,因为paper对象是同一个

    原因分析:

    虽然发送给所有客户端的信息都是paper对象,但是是在for循环里面执行的send方法,也就是说理论上应该是clientA的send方法执行完了之后才会执行clientB的send方法,也就是说理论上应该是学生A收到的paper信息之后学生B才会收到paper信息。

    所以得出的结论猜想就是send方法不是同步执行的,而是异步的。追踪代码进行分析

    第四行的代码client.send(paper) 实际就是调用了Channel的writeAndFlush方法

    追踪到AbstractChannel的实现如下: 

    1 @Override
    2     public ChannelFuture writeAndFlush(Object msg) {
    3         return pipeline.writeAndFlush(msg);
    4     }

     执行了ChannelPipeline的writeAndFlush方法,跟踪实现类DefaultChannelPipeline的实现如下:

    1 @Override
    2     public final ChannelFuture writeAndFlush(Object msg) {
    3         return tail.writeAndFlush(msg);
    4     }

    执行的是ChannelHandlerContext的writeAndFlush方法,跟踪实现类AbstractChannelHandlerContext实现如下:

    1 @Override
    2     public ChannelFuture writeAndFlush(Object msg) {
    3         return writeAndFlush(msg, newPromise());
    4     }

    执行了内部的writeAndFlush方法,继续跟踪如下:

     1 @Override
     2     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
     3         if (msg == null) {
     4             throw new NullPointerException("msg");
     5         }
     6 
     7         if (isNotValidPromise(promise, true)) {
     8             ReferenceCountUtil.release(msg);
     9             // cancelled
    10             return promise;
    11         }
    12 
    13         write(msg, true, promise);
    14 
    15         return promise;
    16     }

    write方法如下:

     1 private void write(Object msg, boolean flush, ChannelPromise promise) {
     2         AbstractChannelHandlerContext next = findContextOutbound();
     3         final Object m = pipeline.touch(msg, next);
     4         EventExecutor executor = next.executor();
     5         if (executor.inEventLoop()) { //判断当前线程是否是EventLoop线程
     6             if (flush) {
     7                 next.invokeWriteAndFlush(m, promise);
     8             } else {
     9                 next.invokeWrite(m, promise);
    10             }
    11         } else {
    12             AbstractWriteTask task;
    13             if (flush) {
    14                 task = WriteAndFlushTask.newInstance(next, m, promise);
    15             }  else {
    16                 task = WriteTask.newInstance(next, m, promise);
    17             }
    18             safeExecute(executor, task, promise, m);
    19         }
    20     }

    跟踪到这里终于有所发现了,方法逻辑大致如下:

    1.获取channelPipeline中的head节点

    2.获取当前channel的eventLoop对象

    3.判断当前channel的eventLoop对象中的线程是否是当前线程

    4.如果是EventLoop线程,则直接执行writeAndFlush方法,也就是执行写入并且刷新到channelSocket中去

    5.如果不是EventLoop线程,则会创建一个AbstractWriteTask,然后将这个task添加到这个channel的eventLoop中去 

    分析到这里就可以总结问题的所在了,如果执行channel的writeAndFlush的线程不是work线程池中的线程,那么就会先将这个发送消息封装成一个task,然后添加到这个channel所属的eventLoop中的阻塞队列中去,

    然后通过EventLoop的循环来从队列中获取任务来执行。一旦task添加到队列中完成,write方法就会返回。那么当下一个客户端再执行write方法时,由于msg内容是同一个对象,就会将前一个msg的内容给覆盖了。

    从而就会出现发送给多个客户端的内容不同,但是接收到的内容是相同的内容。而本例中,执行channel的write方法的线程确实不是eventLoop线程,因为我们采用了线程池来处理业务,当channel发送数据给服务器之后,

    服务器解析channel中发送来的请求,然后执行业务处理,而执行业务的操作是采用线程池的方式实现的,所以最终通过channel发送数据给客户端的时候实际的线程是线程池中的线程,而并不是channel所属的EventLoop中的线程。

    总结:

    Netty中的work线程池中的EventLoop并不是一个纯粹的IO线程,除了有selector轮询IO操作之外,还会处理系统的Task和定时任务。

    系统的task是通过EventLoop的execute(Runnable task)方法实现,EventLoop内部有一个LinkedBlockingQueue阻塞队列保存task,task一般都是由于用户线程发起的IO操作。

    每个客户端有一个channel,每一个channel会绑定一个EventLoop,所以每个channel的所以IO操作默认都是由这个EventLoop中的线程来执行。然后用户可以在自定义的线程中执行channel的方法。

    当用户线程执行channel的IO操作时,并不会立即执行,而是将IO操作封装成一个Task,然后添加到这个channel对应的EventLoop的队列中,然后由这个EventLoop中的线程来执行。所以channel的所有IO操作最终还是

    由同一个EventLoop中的线程来执行的,只是发起channel的IO操作的线程可以不是任何线程。

    采用将IO操作封装成Task的原因主要是防止并发操作导致的锁竞争,因为如果不用task的方式,那么用户线程和IO线程就可以同时操作网络资源,就存储并发问题,所以采用task的方式实现了局部的无锁化。

    所以线程池固然好用,netty固然强大,但是如果没有深入理解,稍有不慎就可能会出现意想不到的BUG。

  • 相关阅读:
    Bootstrap 网格系统(Grid System)实例2
    Bootstrap 网格系统(Grid System)实例1
    Bootstrap 网格系统(Grid System)
    Bootstrap CSS概览
    Bootstrap响应式布局(1)
    46、谈谈你对面向对象的理解?
    算法--练习题1
    堆排序----赠品2
    计数排序----赠品1
    45、如何使用python删除一个文件?
  • 原文地址:https://www.cnblogs.com/jackion5/p/11265198.html
Copyright © 2011-2022 走看看