zoukankan      html  css  js  c++  java
  • Netty解析之schedule方法的实现原理和心跳实现代码

    一 定时任务队列

    如果我们想使用netty快速的开发心跳程序,简化下场景假设我们只需要客户端定期往服务端发送心跳消息,那么代码可以这么写

    public class Pinger extends ChannelInboundHandlerAdapter {
    
        private Random random = new Random();
        private int baseRandom = 5;
    
        private Channel channel;
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            this.channel = ctx.channel();
    
            ping(ctx.channel());
        }
    
        private void ping(Channel channel) {
            int second = Math.max(1, random.nextInt(baseRandom));
            System.out.println("next heart beat will send after " + second + "s.");
            ScheduledFuture<?> future = channel.eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    if (channel.isActive()) {
                        System.out.println("sending heart beat to the server...");
                        channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
                    } else {
                        System.err.println("The connection had broken, cancel the task that will send a heart beat.");
                        channel.closeFuture();
                        throw new RuntimeException();
                    }
                }
            }, second, TimeUnit.SECONDS);
    
            future.addListener(new GenericFutureListener() {
                @Override
                public void operationComplete(Future future) throws Exception {
                    if (future.isSuccess()) {
                        ping(channel);
                    }
                }
            });
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 当Channel已经断开的情况下, 仍然发送数据, 会抛异常, 该方法会被调用.
            cause.printStackTrace();
            ctx.close();
        }
    }

    上面的逻辑正是利用了  AbstractScheduledEventExecutor.schedule(Runnable command, long delay, TimeUnit unit) 

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            ObjectUtil.checkNotNull(command, "command");
            ObjectUtil.checkNotNull(unit, "unit");
            if (delay < 0) {
                delay = 0;
            }
            return schedule(new ScheduledFutureTask<Void>(
                    this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
        }
     <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
            if (inEventLoop()) {
                scheduledTaskQueue().add(task);//只是加入调度队列而已
            } else {
                execute(new Runnable() {
                    @Override
                    public void run() {
                        scheduledTaskQueue().add(task);
                    }
                });
            }
    
            return task;
        }

      正好复习下netty的EventLoop的结构,这个延时队列定义在  AbstractScheduledEventExecutor  

    public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
    
        Queue<ScheduledFutureTask<?>> scheduledTaskQueue;

      而Netty中还有一个普通队列 

    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    
        ...
    
        private final Queue<Runnable> taskQueue;

      而  public final class NioEventLoop extends SingleThreadEventLoop 又是继承了SingleThreadEventLoop,SingleThreadEventLoop又继承了 SingleThreadEventExecutor。

      所以每个NioEventLoop都有两个任务队列,一个是普通的任务队列,是一个延时任务队列

    二 队列的执行时机

        EpollEventLoop.run() 

    protected void run() {
            for (;;) {
                try {
                    int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            strategy = epollWait(WAKEN_UP_UPDATER.getAndSet(this, 0) == 1);
    
                            // 'wakenUp.compareAndSet(false, true)' is always evaluated
                            // before calling 'selector.wakeup()' to reduce the wake-up
                            // overhead. (Selector.wakeup() is an expensive operation.)
                            //
                            // However, there is a race condition in this approach.
                            // The race condition is triggered when 'wakenUp' is set to
                            // true too early.
                            //
                            // 'wakenUp' is set to true too early if:
                            // 1) Selector is waken up between 'wakenUp.set(false)' and
                            //    'selector.select(...)'. (BAD)
                            // 2) Selector is waken up between 'selector.select(...)' and
                            //    'if (wakenUp.get()) { ... }'. (OK)
                            //
                            // In the first case, 'wakenUp' is set to true and the
                            // following 'selector.select(...)' will wake up immediately.
                            // Until 'wakenUp' is set to false again in the next round,
                            // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                            // any attempt to wake up the Selector will fail, too, causing
                            // the following 'selector.select(...)' call to block
                            // unnecessarily.
                            //
                            // To fix this problem, we wake up the selector again if wakenUp
                            // is true immediately after selector.select(...).
                            // It is inefficient in that it wakes up the selector for both
                            // the first case (BAD - wake-up required) and the second case
                            // (OK - no wake-up required).
    
                            if (wakenUp == 1) {
                                Native.eventFdWrite(eventFd.intValue(), 1L);
                            }
                        default:
                            // fallthrough
                    }
    
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            if (strategy > 0) {
                                processReady(events, strategy);
                            }
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }

    1 调用selector的select找到就绪事件

    2 处理这些就绪事件

    3 执行定时队列和普通队列里的任务

    runAllTasks()就不细说了,简单地说就是定时任务是一个java中的priorityQueue,根据到期的时间来判断该不该出队。该方法首先执行出队,把已经到期的任务从定时队列放到普通队列里,也就是从scheduledTaskQueue放到 taskQueue 然后统一执行,普通队列的所有任务。呵呵,看着像不像redis的调度模型啊,果然填下代码都是抄啊。

    三 EpollEventLoop.run() 在何时被调用

      这里还是以服务端也就是ServerBootStrap为例

       AbstractBootstrap.doBind(final SocketAddress localAddress) 

    private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();//initAndRegister是同步调用不在这里启动
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);
                return promise;
            } else {

      重点在于doBind0

    private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }

    首先这段代码的调用是在主线程下调用的,而channel.eventLoop()拿到的线程池中用来执行的线程肯定不是主线程

    所以   SingleThreadEventExecutor.execute 

    public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {//这里的判断肯定不是true
                addTask(task);
            } else {
                startThread();
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
    private void startThread() {
            if (state == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    doStartThread();
                }
            }
        }
    private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();//这里就是run方法被调用的地方
                        success = true;

     四 谈谈Netty的future模式

      schedule方法会把代码中传进来的Runnable包成 ScheduledFutureTask 

    public void run() {
            assert executor().inEventLoop();
            try {
                if (periodNanos == 0) {
                    if (setUncancellableInternal()) {
                        V result = task.call();
                        setSuccessInternal(result);
                    }
                } else {

      线程的逻辑执行完了之后,会调用 setSuccessInternal。接着调用  PromiseTask.setSuccessInternal 

    protected final Promise<V> setSuccessInternal(V result) {
            super.setSuccess(result);
            return this;
        }

      DefaultPromise

     public Promise<V> setSuccess(V result) {
            if (setSuccess0(result)) {
                notifyListeners();
                return this;
            }
            throw new IllegalStateException("complete already: " + this);
        }

      notifyListeners()就会回调上面的listener也就是我们上面的逻辑

    future.addListener(new GenericFutureListener() {
                @Override
                public void operationComplete(Future future) throws Exception {
                    if (future.isSuccess()) {
                        ping(channel);
                    }
                }
            });

      如果发送一次心跳成功了,通过listener收到成功的通知就再一次执行ping方法

  • 相关阅读:
    光纤网卡与HBA卡区别
    Windows远程桌面相关
    port bridge enable命令导致的环路
    堡垒机jumpserver测试记录--使用
    堡垒机jumpserver测试记录--安装
    Centos6.5升级openssh、OpenSSL和wget
    linux抓包工具tcpdump使用总结
    iOS -视频缩略图的制作
    Mac 上视图的坐标系统原点位于左下角
    Mac
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14286332.html
Copyright © 2011-2022 走看看