zoukankan      html  css  js  c++  java
  • Java高并发网络编程(四)Netty

    在网络应用开发的过程中,直接使用JDK提供的NIO的API,比较繁琐,而且想要进行性能提升,还需要结合多线程技术。

    由于网络编程本身的复杂性,以及JDK API开发的使用难度较高,所以在开源社区中,涌现出来了很多对JDK NIO进行封装、增强的网络编程框架,比如Netty、Mina等。

    一、Netty简介

    https://netty.io/ 官网

    Netty是一个高性能、高可扩展性的异步事件驱动的网络应用程序框架,它极大简化了TCP和UDP客户端和服务器开发等网络编程。

    Netty重要的四个内容:

    • Reactor线程模型:一种高性能的多线程程序设计思路
    • Netty中自己定义的Channel概念:增强版的通道概念
    • ChannelPipeline职责链设计模式:事件处理机制
    • 内存管理:增强的ByteBuf缓冲区

    整体结构图

    二、Netty线程模型

     为了让NIO处理更好的利用多线程特性,Netty实现了Reactor线程模型。

    Reactor模型中有四个核心概念:

    • Resources资源(请求/任务)
    • Synchronous Event Demultiplexer同步事件复用器
    • Dispatcher分配器
    • Request Handler请求处理器

     Netty启动时会构建多个Reactor

    EventLoopGroup初始化过程

     

     

    两组EventLoopGroup(Main&Sub)处理不同通道不同的事件

    public final class EchoServer {
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    
        public static void main(String[] args) throws Exception {
         // Configure the server.
    // 创建EventLoopGroup accept线程组 NioEventLoop
    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    // 创建EventLoopGroup I/O线程组 EventLoopGroup workerGroup
    = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try {
    // 服务端启动引导工具类 ServerBootstrap b
    = new ServerBootstrap();
    // 配置服务端处理的reactor线程组以及服务端的其他配置 b.group(bossGroup, workerGroup2) .channel(NioServerSocketChannel.
    class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler((ChannelInitializer)(ch)-> { ChannelPipeline p = ch.pipeline(); p.addLast(new EchoServerHandler); }); // 通过bind启动服务器 ChannelFuture f = b.bind(PORT).sync(); // 阻塞主线程,直到网络服务被关闭 f.channel().closeFuture().sync(); } finally { // 关闭线程组 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

     

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);

    追踪NioEventLoopGroup源码,会发现是创造很多NioEventLoop

    public class NioEventLoopGroup extends MultithreadEventLoopGroup {
        ...
        public NioEventLoopGroup(int nThreads) {
                this(nThreads, (Executor) null);
         }
        ...

    追踪到父类

    public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    
        private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
    
        private static final int DEFAULT_EVENT_LOOP_THREADS;
    
        static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
        }
    
        /**
         * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
         */
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); // 默认是cpu核数*2
        }
    ...

    追踪父类

    public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    // 代码省略
        
        // 多线程的事件执行器
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {// Tony: 如果执行器为空,则创建一个

    // EventLoop都是通过executor创建线程并执行它的 executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); // 线程创建器,源码见下面 } // EventLoop是EventExecutor接口的具体实现 children = new EventExecutor[nThreads]; for (int i = 0; i < nThreads; i ++) { boolean success = false; try {
    // 有多个实现方法,见下面 返回NioEventLoop children[i]
    = newChild(executor, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { // Let the caller handle the interruption. Thread.currentThread().interrupt(); break; } } } } } chooser = chooserFactory.newChooser(children); final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length); Collections.addAll(childrenSet, children); readonlyChildren = Collections.unmodifiableSet(childrenSet); }

    ThreadPerTaskExecutor创建线程

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            this.threadFactory = threadFactory;
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }

    NioEventLoopGroup

    public class NioEventLoopGroup extends MultithreadEventLoopGroup {
      // 省略代码
    
        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }
    }

    返回NioEventLoop,也传入了executor,用来帮助创建线程执行任务

    看NioEventLoop的具体实现

    public final class NioEventLoop extends SingleThreadEventLoop {
        // 代码省略
        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            final SelectorTuple selectorTuple = openSelector();
            selector = selectorTuple.selector;
            unwrappedSelector = selectorTuple.unwrappedSelector;
            selectStrategy = strategy;
        }
    // 代码省略

    selector是NIO的selector

     NioEventLoop将通道注册到EventLoop的selector上,进行事件轮询

    不断追踪NioEventLoop

    最顶层是

    public interface Executor {
    
        /**
         * Executes the given command at some time in the future.  The command
         * may execute in a new thread, in a pooled thread, or in the calling
         * thread, at the discretion of the {@code Executor} implementation.
         *
         * @param command the runnable task
         * @throws RejectedExecutionException if this task cannot be
         * accepted for execution
         * @throws NullPointerException if command is null
         */
        void execute(Runnable command);
    }

    execute由SingleThreadEventExecutor实现

    提交任务

     public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
        // 省略代码  
        @Override
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
            //  判断execute方法的调用者是不是EventLoop同一个线程
            boolean inEventLoop = inEventLoop();
            addTask(task);// 增加到任务队列
            if (!inEventLoop) {// 不是同一个线程,则调用启动方法
                startThread();
                if (isShutdown()) {
                    boolean reject = false;
                    try {
                        if (removeTask(task)) {
                            reject = true;
                        }
                    } catch (UnsupportedOperationException e) {
                        // The task queue does not support removal so the best thing we can do is to just move on and
                        // hope we will be able to pick-up the task before its completely terminated.
                        // In worst case we will log on termination.
                    }
                    if (reject) {
                        reject();
                    }
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }

    startThread

        private void startThread() {
            if (state == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    try {
                        doStartThread();// Tony: 未启动,则触发启动
                    } catch (Throwable cause) {
                        STATE_UPDATER.set(this, ST_NOT_STARTED);
                        PlatformDependent.throwException(cause);
                    }
                }
            }
        }
    
        private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {// 这里的executor是初始化EventLoop的时候传进来的
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {// 创建线程开始执行run方法,所以,每个EventLoop都是执行run
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        for (;;) {
                            int oldState = state;
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
    
                        // Check if confirmShutdown() was called at the end of the loop.
                        if (success && gracefulShutdownStartTime == 0) {
                            if (logger.isErrorEnabled()) {
                                logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                        SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                        "be called before run() implementation terminates.");
                            }
                        }
    
                        try {
                            // Run all remaining tasks and shutdown hooks.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }
                        } finally {
                            try {
                                cleanup();
                            } finally {
                                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                threadLock.release();
                                if (!taskQueue.isEmpty()) {
                                    if (logger.isWarnEnabled()) {
                                        logger.warn("An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                                    }
                                }
    
                                terminationFuture.setSuccess(null);
                            }
                        }
                    }
                }
            });
        }

    @Override
        protected void run() {// 有任务提交后,被触发执行
            for (;;) {// 执行两件事selector,select的事件 和 taskQueue里面的内容
                try {
                    try {
                        switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
    
                        case SelectStrategy.BUSY_WAIT:
                            // fall-through to SELECT since the busy-wait is not supported with NIO
    
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
    
                            // 'wakenUp.compareAndSet(false, true)' is always evaluated
                            // before calling 'selector.wakeup()' to reduce the wake-up
                            // overhead. (Selector.wakeup() is an expensive operation.)
                            //
                            // However, there is a race condition in this approach.
                            // The race condition is triggered when 'wakenUp' is set to
                            // true too early.
                            //
                            // 'wakenUp' is set to true too early if:
                            // 1) Selector is waken up between 'wakenUp.set(false)' and
                            //    'selector.select(...)'. (BAD)
                            // 2) Selector is waken up between 'selector.select(...)' and
                            //    'if (wakenUp.get()) { ... }'. (OK)
                            //
                            // In the first case, 'wakenUp' is set to true and the
                            // following 'selector.select(...)' will wake up immediately.
                            // Until 'wakenUp' is set to false again in the next round,
                            // 'wakenUp.compareAndSet(false, true)' will fail, and therefore
                            // any attempt to wake up the Selector will fail, too, causing
                            // the following 'selector.select(...)' call to block
                            // unnecessarily.
                            //
                            // To fix this problem, we wake up the selector again if wakenUp
                            // is true immediately after selector.select(...).
                            // It is inefficient in that it wakes up the selector for both
                            // the first case (BAD - wake-up required) and the second case
                            // (OK - no wake-up required).
    
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                            // fall through
                        default:
                        }
                    } catch (IOException e) {
                        // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                        // the selector and retry. https://github.com/netty/netty/issues/8566
                        rebuildSelector0();
                        handleLoopException(e);
                        continue;
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {// 处理事件
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    
        private static void handleLoopException(Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);
    
            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    
        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {// 处理事件
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
            // check if the set is empty and if so just return to not create garbage by
            // creating a new Iterator every time even if there is nothing to process.
            // See https://github.com/netty/netty/issues/597
            if (selectedKeys.isEmpty()) {
                return;
            }
            // 获取selector所有选中的事件(ServerSocketChannel主要是OP_ACCEPT,SocketChannle主要是OP_READ)
            Iterator<SelectionKey> i = selectedKeys.iterator();
            for (;;) {
                final SelectionKey k = i.next();
                final Object a = k.attachment();
                i.remove();
    
                if (a instanceof AbstractNioChannel) {//  处理niochannel事件
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (!i.hasNext()) {
                    break;
                }
    
                if (needsToSelectAgain) {
                    selectAgain();
                    selectedKeys = selector.selectedKeys();
    
                    // Create the iterator again to avoid ConcurrentModificationException
                    if (selectedKeys.isEmpty()) {
                        break;
                    } else {
                        i = selectedKeys.iterator();
                    }
                }
            }
        }

    EventLoop自身实现了Executor接口,当调用executor方法提交任务时,则判断是否启动,未启动则调用内置的executor创建新线程来触发run方法执行

    channel注册到selector上

    请求

    服务端启动的过程,服务端的启动就是Bind绑定端口的过程

    回到EchoServer

    追踪bind源码

     // Start the server.
    ChannelFuture f = b.bind(PORT).sync();

    bind绑定端口并创建通道

     public ChannelFuture bind(int inetPort) {
            return bind(new InetSocketAddress(inetPort)); // 绑定端口的入口代码
        }
    
        /**
         * Create a new {@link Channel} and bind it.
         */
        public ChannelFuture bind(String inetHost, int inetPort) {
            return bind(SocketUtils.socketAddress(inetHost, inetPort));
        }
    
        /**
         * Create a new {@link Channel} and bind it.
         */
        public ChannelFuture bind(InetAddress inetHost, int inetPort) {
            return bind(new InetSocketAddress(inetHost, inetPort));
        }
    
        /**
         * Create a new {@link Channel} and bind it.
         */
        public ChannelFuture bind(SocketAddress localAddress) {
            validate();
            if (localAddress == null) {
                throw new NullPointerException("localAddress");
            }
            return doBind(localAddress);// 真正干事的代码
        }
    
        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();// 创建/初始化ServerSocketChannel对象,并注册到Selector
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
            // 等注册完成之后,再绑定端口。 防止端口开放了,却不能处理请求
            if (regFuture.isDone()) {
                // At this point we know that the registration was complete and successful.
                ChannelPromise promise = channel.newPromise();
                doBind0(regFuture, channel, localAddress, promise);// 实际操作绑定端口的代码
                return promise;
            } else {
                // Registration future is almost always fulfilled already, but just in case it's not.
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                            // IllegalStateException once we try to access the EventLoop of the Channel.
                            promise.setFailure(cause);
                        } else {
                            // Registration was successful, so set the correct executor to use.
                            // See https://github.com/netty/netty/issues/2586
                            promise.registered();
    
                            doBind0(regFuture, channel, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    
        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel(); // 通道
                init(channel); // 初始化通道
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            //(一开始初始化的group)MultithreadEventLoopGroup里面选择一个eventLoop进行绑定
            ChannelFuture regFuture = config().group().register(channel); // register见下面
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    
            // If we are here and the promise is not failed, it's one of the following cases:
            // 1) If we attempted registration from the event loop, the registration has been completed at this point.
            //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
            // 2) If we attempted registration from the other thread, the registration request has been successfully
            //    added to the event loop's task queue for later execution.
            //    i.e. It's safe to attempt bind() or connect() now:
            //         because bind() or connect() will be executed *after* the scheduled registration task is executed
            //         because register(), bind(), and connect() are all bound to the same thread.
    
            return regFuture;
        }
    
        abstract void init(Channel channel) throws Exception;
    
        private static void doBind0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {//这里向EventLoop提交任务,一旦有任务提交则会触发EventLoop的轮询
                    if (regFuture.isSuccess()) {// 本质又绕回到channel的bind方法上面。
                        channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    @Override
    public ChannelFuture register(Channel channel){
        return next().register(channel); // 根据选择器,选择一个合适的NioEventLoop进行注册(SingleEventLoop)
    }

    追踪register代码...

    netty中的Channel是一个抽象的概念,可以理解为对JDK NIO Channel的增强和拓展。增加了很多属性和方法,下面罗列几个常见的属性和方法:

    三、责任链设计模式

     责任链(Chain of Responsibility Pattern)为请求创建了一个处理对象的链

    发起请求具体处理请求的过程进行解耦:职责链上的处理者负责处理请求,客户只需要将请求发送到职责链上即可,无须关心请求的处理细节和请求的传递。

     handler是具体处理请求的程序

     实现责任链模式4个要素:处理器抽象类、具体的处理器实现类、保存处理器信息、处理执行

     

     责任链代码示例

    // -----链表形式调用------netty就是类似的这种形式
    public class PipelineDemo {
        /**
         * 初始化的时候造一个head,作为责任链的开始,但是并没有具体的处理
         */
        public HandlerChainContext head = new HandlerChainContext(new AbstractHandler() {
            @Override
            void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
                handlerChainContext.runNext(arg0);
            }
        });
    
        public void requestProcess(Object arg0) {
            this.head.handler(arg0);
        }
    
        public void addLast(AbstractHandler handler) {
            HandlerChainContext context = head;
            while (context.next != null) {
                context = context.next;
            }
            context.next = new HandlerChainContext(handler);
        }
    
    
        public static void main(String[] args) {
            PipelineDemo pipelineChainDemo = new PipelineDemo();
            pipelineChainDemo.addLast(new Handler2());
            pipelineChainDemo.addLast(new Handler1());
            pipelineChainDemo.addLast(new Handler1());
            pipelineChainDemo.addLast(new Handler2());
    
            // 发起请求
            pipelineChainDemo.requestProcess("火车呜呜呜~~");
    
        }
    }
    
    /**
     * handler上下文,我主要负责维护链,和链的执行
     */
    class HandlerChainContext {
        HandlerChainContext next; // 下一个节点
        AbstractHandler handler;
    
        public HandlerChainContext(AbstractHandler handler) {
            this.handler = handler;
        }
    
        void handler(Object arg0) {
            this.handler.doHandler(this, arg0);
        }
    
        /**
         * 继续执行下一个
         */
        void runNext(Object arg0) {
            if (this.next != null) {
                this.next.handler(arg0);
            }
        }
    }
    
    // 处理器抽象类
    abstract class AbstractHandler {
        /**
         * 处理器,这个处理器就做一件事情,在传入的字符串中增加一个尾巴..
         */
        abstract void doHandler(HandlerChainContext handlerChainContext, Object arg0); // handler方法
    }
    
    // 处理器具体实现类
    class Handler1 extends AbstractHandler {
        @Override
        void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
            arg0 = arg0.toString() + "..handler1的小尾巴.....";
            System.out.println("我是Handler1的实例,我在处理:" + arg0);
            // 继续执行下一个
            handlerChainContext.runNext(arg0);
        }
    }
    
    // 处理器具体实现类
    class Handler2 extends AbstractHandler {
        @Override
        void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
            arg0 = arg0.toString() + "..handler2的小尾巴.....";
            System.out.println("我是Handler2的实例,我在处理:" + arg0);
            // 继续执行下一个
            handlerChainContext.runNext(arg0);
        }
    }

     Netty中的ChannelPipeline责任链

     Nettty中定义了很多事件

     Pipeline中的handler是什么?

     ChannelPipeline是线程安全的,ChannelHandler可以在任何时候添加或删除。

    例如,可以在即将交换敏感信息时插入加密处理程序,并在交换后删除。

    一般操作,初始化的时候增加进去,较少删除。下面是Pipeline中管理handler的API:

     源码查看

    handler执行分析

    分析registered入站事件的处理

     源码查看 从bind()进入

     

    bind出站事件分析

     源码查看

     

    分析accept入站事件的处理

     源码查看

     

    read入站事件的处理

     源码查看

    四、零拷贝机制

    JDK ByteBuffer存在一些缺点

    • 无法动态扩容。长度是固定的,不能动态扩展和收缩,当数据大于ByteBuffer容量时,会发生索引越界异常。
    • API使用复杂。读写的时候需要手工调用flip()和rewind()等方法,使用时需要非常谨慎的使用这些api,否则容易出错。

    1.Netty自己的ByteBuf

    ByteBuf是为解决ByteBuffer的问题和满足网络应用程序开发人员的日常需求而设计的。

    ByteBuf的增强

    •  API操作便捷性
    • 动态扩容
    • 多种ByteBuf实现
    • 高效的零拷贝机制

     2.ByteBuf操作

    ByteBuf三个重要属性:capacity容量、readerIndex读取位置、writeIndex写入位置

    提供了两个指针变量来支持顺序读和写操作,分别时readerIndex和写操作writeIndex

    常用方法定义:

       

     

     

    下图显示了一个缓冲区是如何被两个指针分割成三个区域的:

    示例代码

    /**
     * bytebuf的常规API操作示例
     */
    public class ByteBufDemo {
        @Test
        public void apiTest() {
            //  +-------------------+------------------+------------------+
            //  | discardable bytes |  readable bytes  |  writable bytes  |
            //  |                   |     (CONTENT)    |                  |
            //  +-------------------+------------------+------------------+
            //  |                   |                  |                  |
            //  0      <=       readerIndex   <=   writerIndex    <=    capacity
    
            // 1.创建一个非池化的ByteBuf,大小为10个字节
            ByteBuf buf = Unpooled.buffer(10);
            System.out.println("原始ByteBuf为====================>" + buf.toString());
            System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
    
            // 2.写入一段内容
            byte[] bytes = {1, 2, 3, 4, 5};
            buf.writeBytes(bytes);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
            System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
    
            // 3.读取一段内容
            byte b1 = buf.readByte();
            byte b2 = buf.readByte();
            System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
            System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
            System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
    
            // 4.将读取的内容丢弃
            buf.discardReadBytes();
            System.out.println("将读取的内容丢弃后ByteBuf为========>" + buf.toString());
            System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
    
            // 5.清空读写指针
            buf.clear();
            System.out.println("将读写指针清空后ByteBuf为==========>" + buf.toString());
            System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
    
            // 6.再次写入一段内容,比第一段内容少
            byte[] bytes2 = {1, 2, 3};
            buf.writeBytes(bytes2);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes2));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
            System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
    
            // 7.将ByteBuf清零
            buf.setZero(0, buf.capacity());
            System.out.println("将内容清零后ByteBuf为==============>" + buf.toString());
            System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "
    ");
    
            // 8.再次写入一段超过容量的内容
            byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
            buf.writeBytes(bytes3);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes3));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
            System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
            //  随机访问索引 getByte
            //  顺序读 read*
            //  顺序写 write*
            //  清除已读内容 discardReadBytes
            //  清除缓冲区 clear
            //  搜索操作
            //  标记和重置
            //  完整代码示例:参考
            // 搜索操作 读取指定位置 buf.getByte(1);
            //
        }
    
    }

    Unpooled推荐的方式创建buf

    可以动态扩容

    运行结果

    原始ByteBuf为====================>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
    1.ByteBuf中的内容为===============>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    
    写入的bytes为====================>[1, 2, 3, 4, 5]
    写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 5, cap: 10)
    2.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
    
    读取的bytes为====================>[1, 2]
    读取一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 2, widx: 5, cap: 10)
    3.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
    
    将读取的内容丢弃后ByteBuf为========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
    4.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
    
    将读写指针清空后ByteBuf为==========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 0, cap: 10)
    5.ByteBuf中的内容为===============>[3, 4, 5, 4, 5, 0, 0, 0, 0, 0]
    
    写入的bytes为====================>[1, 2, 3]
    写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
    6.ByteBuf中的内容为===============>[1, 2, 3, 4, 5, 0, 0, 0, 0, 0]
    
    将内容清零后ByteBuf为==============>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 3, cap: 10)
    7.ByteBuf中的内容为================>[0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    
    写入的bytes为====================>[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
    写入一段内容后ByteBuf为===========>UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 14, cap: 64)
    8.ByteBuf中的内容为===============>[0, 0, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

    动态扩容

    io.netty.buffer.AbstractByteBuf下:

      final void ensureWritable0(int minWritableBytes) {
            ensureAccessible();
            if (minWritableBytes <= writableBytes()) {
                return;
            }
            final int writerIndex = writerIndex();
            if (checkBounds) {
                if (minWritableBytes > maxCapacity - writerIndex) {
                    throw new IndexOutOfBoundsException(String.format(
                            "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                            writerIndex, minWritableBytes, maxCapacity, this));
                }
            }
    
            // Normalize the current capacity to the power of 2.
            int minNewCapacity = writerIndex + minWritableBytes;
            int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);
    
            int fastCapacity = writerIndex + maxFastWritableBytes();
            // Grow by a smaller amount if it will avoid reallocation
            if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {
                newCapacity = fastCapacity;
            }
    
            // Adjust to the new capacity.
            capacity(newCapacity);
        }

    calculateNewCapacity

        @Override
        public int calculateNewCapacity(int minNewCapacity, int maxCapacity) { // minNewCapacity:14 maxCapacity:2147483647
            checkPositiveOrZero(minNewCapacity, "minNewCapacity");
            if (minNewCapacity > maxCapacity) { // minCapacity:14
                throw new IllegalArgumentException(String.format(
                        "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                        minNewCapacity, maxCapacity));
            } // 阈值4M,这个阈值的用意:容量要求4M以内,每次扩容以2的倍数进行计算,超过4M容量,另外的计算方式。
            final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
    
            if (minNewCapacity == threshold) { // 新容量的最小要求,如果等于阈值,则立刻返回
                return threshold;
            }
    
            // If over threshold, do not double but just increase by threshold.
            if (minNewCapacity > threshold) {
                int newCapacity = minNewCapacity / threshold * threshold;
                if (newCapacity > maxCapacity - threshold) {
                    newCapacity = maxCapacity;
                } else {
                    newCapacity += threshold;
                }
                return newCapacity;
            }
            // 如果容量要求没超过阈值,则从64字节开始,不断增加一倍,直至满足新容量最小要求
            // Not over threshold. Double up to 4 MiB, starting from 64.
            int newCapacity = 64;
            while (newCapacity < minNewCapacity) {
                newCapacity <<= 1;
            }
    
            return Math.min(newCapacity, maxCapacity);
        }

    选择合适的ByteBuf实现:

    了解核心的3个维度的划分方式,8种具体实现

     在使用中,都是使用ByteBufAllocator分配器进行申请,同时分配器具有内存管理的功能

    堆外内存示例

    /**
     * 堆外内存的常规API操作示例
     */
    public class DirectByteBufDemo {
        @Test
        public void apiTest() {
            //  +-------------------+------------------+------------------+
            //  | discardable bytes |  readable bytes  |  writable bytes  |
            //  |                   |     (CONTENT)    |                  |
            //  +-------------------+------------------+------------------+
            //  |                   |                  |                  |
            //  0      <=       readerIndex   <=   writerIndex    <=    capacity
    
            // 1.创建一个非池化的ByteBuf,大小为10个字节
            ByteBuf buf = Unpooled.directBuffer(10);
            System.out.println("原始ByteBuf为====================>" + buf.toString());
            // System.out.println("1.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    "); 堆外内存不能用buf.array()
    
            // 2.写入一段内容
            byte[] bytes = {1, 2, 3, 4, 5};
            buf.writeBytes(bytes);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
            //System.out.println("2.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
    
            // 3.读取一段内容
            byte b1 = buf.readByte();
            byte b2 = buf.readByte();
            System.out.println("读取的bytes为====================>" + Arrays.toString(new byte[]{b1, b2}));
            System.out.println("读取一段内容后ByteBuf为===========>" + buf.toString());
           //System.out.println("3.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
    
            // 4.将读取的内容丢弃
            buf.discardReadBytes();
            System.out.println("将读取的内容丢弃后ByteBuf为========>" + buf.toString());
            //System.out.println("4.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
    
            // 5.清空读写指针
            buf.clear();
            System.out.println("将读写指针清空后ByteBuf为==========>" + buf.toString());
            //System.out.println("5.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
    
            // 6.再次写入一段内容,比第一段内容少
            byte[] bytes2 = {1, 2, 3};
            buf.writeBytes(bytes2);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes2));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
           // System.out.println("6.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
    
            // 7.将ByteBuf清零
            buf.setZero(0, buf.capacity());
            System.out.println("将内容清零后ByteBuf为==============>" + buf.toString());
           // System.out.println("7.ByteBuf中的内容为================>" + Arrays.toString(buf.array()) + "
    ");
    
            // 8.再次写入一段超过容量的内容
            byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
            buf.writeBytes(bytes3);
            System.out.println("写入的bytes为====================>" + Arrays.toString(bytes3));
            System.out.println("写入一段内容后ByteBuf为===========>" + buf.toString());
           // System.out.println("8.ByteBuf中的内容为===============>" + Arrays.toString(buf.array()) + "
    ");
            //  随机访问索引 getByte
            //  顺序读 read*
            //  顺序写 write*
            //  清除已读内容 discardReadBytes
            //  清除缓冲区 clear
            //  搜索操作
            //  标记和重置
            //  完整代码示例:参考
            // 搜索操作 读取指定位置 buf.getByte(1);
            //
        }
    
    }

    Unsafe的实现

     

    内存复用

     

    PooledByteBuf对象、内存复用

    3.零拷贝机制

    Netty的零拷贝机制,是一种应用层的实现。和底层的JVM、操作系统内存机制并无过多关联。

     代码示例

    /**
     * 零拷贝示例
     */
    public class ZeroCopyTest {
        @org.junit.Test
        public void wrapTest() {
            byte[] arr = {1, 2, 3, 4, 5};
            ByteBuf byteBuf = Unpooled.wrappedBuffer(arr);
            System.out.println(byteBuf.getByte(4));
            arr[4] = 6;
            System.out.println(byteBuf.getByte(4));
        } // java数组转为buf 5 arr修改为6后,byteBuf也变为6,说明两者用的是相同的数据,零拷贝
    
        @org.junit.Test
        public void sliceTest() {
            ByteBuf buffer1 = Unpooled.wrappedBuffer("hello".getBytes());
            ByteBuf newBuffer = buffer1.slice(1, 2);
            newBuffer.unwrap();
            System.out.println(newBuffer.toString());
        } // 拆分。不会动原来的buf,还保留原来buf的地址
    
        @org.junit.Test
        public void compositeTest() {
            ByteBuf buffer1 = Unpooled.buffer(3);
            buffer1.writeByte(1);
            ByteBuf buffer2 = Unpooled.buffer(3);
            buffer2.writeByte(4);
            CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
            CompositeByteBuf newBuffer = compositeByteBuf.addComponents(true, buffer1, buffer2);
            System.out.println(newBuffer);
        } // 合并。还保留原来buf的信息
    
    }
  • 相关阅读:
    20155313 杨瀚 《网络对抗技术》实验九 Web安全基础
    20155313 杨瀚 《网络对抗技术》实验八 Web基础
    20155313 杨瀚 《网络对抗技术》实验七 网络欺诈防范
    20155313 杨瀚 《网络对抗技术》实验六 信息搜集与漏洞扫描
    20155313 杨瀚 《网络对抗技术》实验五 MSF基础应用
    20155313 杨瀚 《网络对抗技术》实验四 恶意代码分析
    20155313 杨瀚 《网络对抗技术》实验三 免杀原理与实践
    20155313 杨瀚 《网络对抗技术》实验二 后门原理与实践
    20155313 杨瀚 《网络对抗技术》实验一 PC平台逆向破解(5)M
    20155313 2017-2018-1 《信息安全系统设计基础》课程总结
  • 原文地址:https://www.cnblogs.com/aidata/p/11524388.html
Copyright © 2011-2022 走看看