zoukankan      html  css  js  c++  java
  • Netty NioEventLoop自定义任务处理

    NioEventLoop添加自定义任务处理,可分为普通任务和定时任务;

    NioEventLoop继承关系图

     

    NioEventLoop顶层的接口为JUC包下的java.util.concurrent.Executor;

    Executor是一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务为一个实现了Runnable接口的类,一般来说,Runnable任务开辟在新线程中的使用方法为:

    new Thread(new Runnable() {
        @Override
        public void run() {
    
        }
    }).start();
    

    而在Executor中,不用显示地创建线程,只需在匿名内部类实现run方法逻辑或传入实现了Runnable接口的类;

    executor.execute(new Runnable() {
        @Override
        public void run() {
            
        }
    });
    

      

    添加自定义的普通任务提交到channel对应的NioEventLoop的taskQueue中;

    io.netty.util.concurrent.SingleThreadEventExecutor#taskQueue

    io.netty.util.concurrent.SingleThreadEventExecutor#execute

     

    io.netty.channel.nio.NioEventLoop#run

    io.netty.channel.nio.NioEventLoop#run 实现 io.netty.util.concurrent.SingleThreadEventExecutor#run;

    NioEventLoop需要同时处理I/O事件和非I/O任务,为了保证两者的任务能够得到足够的CPU时间执行,Netty提供了I/O比例供用户定制;如果I/O操作多于定时任务和普通任务,则可以将I/O比例调大,反之则调小,默认为50%;

    io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long) 执行非I/O任务;

    添加自定义定时任务提交到channel对应的NioEventLoop的scheduleTaskQueue中;

    看下面的调用栈

    io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(java.lang.Runnable, long, java.util.concurrent.TimeUnit)

    ->io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask<V>)

     

    ->io.netty.util.concurrent.AbstractScheduledEventExecutor#scheduledTaskQueue

    io.netty.util.concurrent.AbstractScheduledEventExecutor#scheduledTaskQueue

    最终可以发现scheduledTaskQueue在这初始化;

    io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long) 该方法用于执行非I/O任务

    io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromScheduledTaskQueue

    io.netty.util.concurrent.AbstractScheduledEventExecutor#pollScheduledTask(long)

    fetchFromScheduledTaskQueue方法:首先从定时任务消息队列中弹出消息进行处理,如果消息队列为空,则退出循环;根据当前的时间戳进行判断,如果该定时任务已经超时或正处于超时(scheduledTask.deadlineNanos() <= naonoTIme)条件成立,则将该定时任务添加到taskQueue中,同时从定时任务的队列中删除;定时任务如果没有超时,循环直接退出,不处理;

    io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long) 继续执行taskQueue中原有的任务和从定时任务队列中复制已经超时或正处于超时的定时任务也就是定时任务的延时时间已到,开始执行定时任务里的逻辑);因此当有一个普通任务,一个定时任务,不论哪个写前面,定时任务都会在普通任务后面执行;

    测试如下:

    pom.xml

        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.30.Final</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.25</version>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.25</version>
            </dependency>
        </dependencies>
    

      

    log4j.properties

    log4j.rootLogger=DEBUG,consoleOut
    
    # Console output...
    log4j.appender.consoleOut=org.apache.log4j.ConsoleAppender
    log4j.appender.consoleOut.layout=org.apache.log4j.PatternLayout
    log4j.appender.consoleOut.layout.ConversionPattern=%d %5p %t (%c{1}:%M:%L) - %m%n 
    

      

    重写ServerHandler的channelRead0方法如下:

    /**
     * 读取客户端发生的数据
     * @param ctx 上下文对象,含有channel,管道pipeline
     * @param msg 客户端发送的数据
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        byte[] buffer = new byte[msg.readableBytes()];
        msg.readBytes(buffer);
     
        //将buffer转成字符串
        String message = new String(buffer, Charset.forName("utf-8"));
        logger.info("服务器接收到数据:{} ", message);
     
        ctx.channel().eventLoop().schedule(() -> {
            logger.info("发送定时消息测试...开始");
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
     
            logger.info("发送定时消息测试...结束");
        }, 0, TimeUnit.SECONDS);
     
        ctx.channel().eventLoop().execute(() -> {
            try {
                logger.info("发送异步消息测试...开始");
                Thread.sleep(1000 * 1);
                logger.info("发送异步消息测试...结束");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
     
    }
    

      

    结果如下:

  • 相关阅读:
    最近迷上用dvd字幕学习英语
    原始套接字
    c语言socket编程
    inet_aton和inet_network和inet_addr三者比较
    用man来查找c函数库
    ubuntu的系统日志配置文件的位置
    复制文件
    vim复制粘贴解密(转)
    vim的自动补齐功能
    两个数据结构ip和tcphdr
  • 原文地址:https://www.cnblogs.com/coder-zyc/p/14354886.html
Copyright © 2011-2022 走看看