zoukankan      html  css  js  c++  java
  • netty EventLoop线程与当前线程的问题

    模拟客户端向服务端发送消息:

    客户端部分代码如下,当连接激活触发消息发送,采用线程池的形式,分多个线程向服务端发送同一消息

      @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel type: " + ctx.channel().getClass().getSimpleName());
            NioSocketChannel socketChannel = (NioSocketChannel) ctx.channel();
            ByteBuf byteBuf = Unpooled.copiedBuffer("test thread:", CharsetUtil.UTF_8).retain();
            int x = 1;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    socketChannel.writeAndFlush(byteBuf.duplicate());
                    System.out.println(Thread.currentThread().getName() + " " + byteBuf);
                }
            };
            Executor executor = Executors.newCachedThreadPool();
            for(int i = 0; i <= 10; i++){
                executor.execute(runnable);
            }
    
        }

    服务端代码如下,接收到消息并打印

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf in = (ByteBuf)msg;
            System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
            //ctx.write(in); // send the msg to client and no flush
        }

    服务端与客户端均未进行连接关闭操作(始终保持连接)。

    服务端打印结果如下(只接受到部分客户端消息),常规思想,理应收到客户端发来的所有消息:10条 “test thread:”, 结果只收到两条

    Server received: test thread:test thread:

    原因:writeAndFlush()方法最终会调用AbstractChannelHandlerContext类的如下代码

    private void write(Object msg, boolean flush, ChannelPromise promise) {
            AbstractChannelHandlerContext next = this.findContextOutbound();
            Object m = this.pipeline.touch(msg, next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    next.invokeWrite(m, promise);
                }
            } else {
                Object task;
                if (flush) {
                    task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise);
                } else {
                    task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise);
                }
    
                safeExecute(executor, (Runnable)task, promise, m);
            }
    
        }

    方法逻辑大致如下:

    1.获取channelPipeline中的head节点

    2.获取当前channel的eventLoop对象

    3.判断当前channel的eventLoop对象中的线程是否是当前线程

    4.如果是EventLoop线程,则直接执行writeAndFlush方法,也就是执行写入并且刷新到channelSocket中去

    5.如果不是EventLoop线程,则会创建一个AbstractWriteTask,然后将这个task添加到这个channel的eventLoop中去 

    原因:

    分析到这里就可以总结问题的所在了,如果执行channel的writeAndFlush的线程不是work线程池中的线程,那么就会先将这个发送消息封装成一个task,然后添加到这个channel所属的eventLoop中的阻塞队列中去,

    然后通过EventLoop的循环来从队列中获取任务来执行。一旦task添加到队列中完成,write方法就会返回。那么当下一个客户端再执行write方法时,由于msg内容是同一个对象,就会将前一个msg的内容给覆盖了。

    从而就会出现发送给多个客户端的内容不同,但是接收到的内容是相同的内容。而本例中,执行channel的write方法的线程确实不是eventLoop线程,因为我们采用了线程池来处理业务,当channel发送数据给服务器之后,

    服务器解析channel中发送来的请求,然后执行业务处理,而执行业务的操作是采用线程池的方式实现的,所以最终通过channel发送数据给客户端的时候实际的线程是线程池中的线程,而并不是channel所属的EventLoop中的线程。

    总结:

    Netty中的work线程池中的EventLoop并不是一个纯粹的IO线程,除了有selector轮询IO操作之外,还会处理系统的Task和定时任务。

    系统的task是通过EventLoop的execute(Runnable task)方法实现,EventLoop内部有一个LinkedBlockingQueue阻塞队列保存task,task一般都是由于用户线程发起的IO操作。

    每个客户端有一个channel,每一个channel会绑定一个EventLoop,所以每个channel的所以IO操作默认都是由这个EventLoop中的线程来执行。然后用户可以在自定义的线程中执行channel的方法。

    当用户线程执行channel的IO操作时,并不会立即执行,而是将IO操作封装成一个Task,然后添加到这个channel对应的EventLoop的队列中,然后由这个EventLoop中的线程来执行。所以channel的所有IO操作最终还是

    由同一个EventLoop中的线程来执行的,只是发起channel的IO操作的线程可以不是任何线程。

    采用将IO操作封装成Task的原因主要是防止并发操作导致的锁竞争,因为如果不用task的方式,那么用户线程和IO线程就可以同时操作网络资源,就存储并发问题,所以采用task的方式实现了局部的无锁化。

    所以线程池固然好用,netty固然强大,但是如果没有深入理解,稍有不慎就可能会出现意想不到的BUG。

  • 相关阅读:
    Linux入门之常用命令(12) mount
    Linux入门之运维(1) 系统监控 vmstat top
    【转】常用Maven插件
    【转】Mapreduce部署与第三方依赖包管理
    ssh (免密码登录、开启服务)
    增大hadoop client内存
    ubuntu12.04添加程序启动器到Dash Home
    jquery fadeOut 异步
    jquery 全选 全不选 反选
    js 上传文件预览
  • 原文地址:https://www.cnblogs.com/yelao/p/11674169.html
Copyright © 2011-2022 走看看