zoukankan      html  css  js  c++  java
  • Netty框架原理

    用这张图表示的就是一个基本的Netty框架

    通过创建两个线程池,一个负责接入, 一个负责处理

    public class Start {
        public static void main(String[] args) {
                
            //初始化线程
            NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());   //1
            
            //获取服务类
            ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);
            
            //绑定端口
            bootstrap.bind(new InetSocketAddress(80));
            
            System.out.println("start");
        }
    }
    NioSelectorRunnablePool  相当于一个线程池操作类
    public class NioSelectorRunnablePool {
        
        /**
         * boss 线程数组
         */
        private final AtomicInteger bossIndex = new AtomicInteger();
        private Boss[] bosses;
        
        /**
         * worker线程数组
         */
        private final AtomicInteger workerIndex = new AtomicInteger();
        private Worker[] workers;
    
        public NioSelectorRunnablePool(Executor boss, Executor worker) {
            //初始化boss线程  即接入线程 
            initBoss(boss, 1);
            //根据当前核心数*2 初始化处理线程
            initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);
        }
    
        /**
         * 初始化boss线程组
         * @param boss
         * @param count
         */
        private void initBoss(Executor boss, int count) {
            this.bosses = new NioServerBoss[count];
            for (int i = 0; i < bosses.length; i++) {
                //线程池数组
                bosses[i] = new NioServerBoss(boss, "boss thread" + (i+1), this);
            }
        }
    
        /**
         * 初始化worker线程
         * @param worker worker线程池
         * @param count 线程数
         */
        private void initWorker(Executor worker, int count) {
            this.workers = new NioServerWorker[count];
            for (int i = 0; i < bosses.length; i++) {
                workers[i] = new NioServerWorker(worker, "worker thread" + (i+1), this);
            }        
        }
        
        /**
         * 获取下一个boss线程
         * @return
         */
        public Boss nextBoss() {
            return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];
        }
        
        /**
         * 获取下一个work线程
         * @return
         */
        public Worker nextWorkr() {
            return workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];
        }
    }
    初始化两个线程池 NioServerBoss 和NioServerWorker  两个类都实现 各自的Boss  和 Worker 接口   继承 了 AbstractNioSelector 抽象Selector
    public interface Boss {
        
        /**
         * 加入一个新的ServerSocket
         * @param serverChannel
         */
        public void registerAcceptChannelTask(ServerSocketChannel serverChannel);
    }
    Boos
    public interface Worker {
        /**
         * 加入一个新的客户端会话
         * @param channel
         */
        public void registerNewChannelTask(SocketChannel channel);
    
    }
    Worker
    /**
    *@Description 抽象selector线程类
    *@autor:mxz
    *2018-08-17
    **/
    public abstract class AbstractNioSelector implements Runnable{
        /**
         * 线程池
         */
        private Executor executor;
        private String threadName;
        
        /**
         * 选择器wakenUp状态标记
         */
        protected final AtomicBoolean wakenUp = new AtomicBoolean();
        
        /**
         * 线程管理对象,存储线程池数组
         */
        private NioSelectorRunnablePool selectorRunnablePool;
    
        protected Selector selector;
    
        /**
         * 任务队列
         */
        private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
        
    
        public AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
            this.executor = executor;
            this.threadName = threadName;
            this.selectorRunnablePool = selectorRunnablePool;
            //一个线程 加入一个selector
            openSelector();
        }
        
        /**
         * 获取线程管理对象
         * @return
         */
        protected NioSelectorRunnablePool  getselectorRunnablePool() {
            return this.selectorRunnablePool;
        }
        /**
         * 获取selector并启动线程
         */
        private void openSelector() {
            try {
                this.selector = Selector.open();
            } catch (IOException e) {
                throw new RuntimeException("Failed to create a selector");
            }
            //线程池执行该线程
            executor.execute(this);
        }
    
        @Override
        public void run() {
            Thread.currentThread().setName(this.threadName);
            
            while (true) {
                try {
                    wakenUp.set(false);
                    //当注册事件到达时,方法返回,否则该方法会一直阻塞
                    select(selector);
                    
                    //运行任务队列中的任务
                    processTaskQueue();
                    
                    process(selector);
                } catch (Exception e) {
                    
                }
            }
        }
        /**
         * 注册一个任务并激活selector 重新执行
         * @param task
         */
        protected final void registerTask(Runnable task) {
            taskQueue.add(task);
            
            Selector selector = this.selector;
            
            if (selector != null) {
                if (wakenUp.compareAndSet(false, true)) {
                    //会首先唤醒Boss  接入总线线程  唤醒阻塞在selector上的线程, 去做其他事情,例如注册channel改变interestOps的值
                    selector.wakeup();
                }
            } else {
                taskQueue.remove(task);
            }
        }
        
        /**
         *  
         */
        
        //执行队列中的任务
        private void processTaskQueue() {
            for(;;) {
                final Runnable task = taskQueue.poll();
                if (task == null) {
                    break;
                }
                task.run();
            }
        }
    
        /**
         * selector抽象方法
         * @param selector
         * @return
         * @throws IOException
         */
        protected abstract int select(Selector selector) throws IOException;
    
        /**
         * selector的业务处理
         * @param selector
         * @return
         * @throws IOException
         */
        protected abstract void process(Selector selector) throws IOException;
    }

    执行openSelector() 创建 selector  execute 执行线程  执行各自的 select()

    public class NioServerBoss extends AbstractNioSelector implements Boss {
    
        public NioServerBoss(Executor boss, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
            super(boss, threadName, selectorRunnablePool);
        }
    
        @Override
        protected int select(Selector selector) throws IOException {
            return selector.select();
        }
    
        @Override
        protected void process(Selector selector) throws IOException {
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            if (selectedKeys.isEmpty()) {
                return;
            }
            
            for (SelectionKey key : selectedKeys) {
                selectedKeys.remove(key);
                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                //新客户端
                SocketChannel channel = server.accept();
                //设置为非阻塞
                channel.configureBlocking(false);
                //获取一个worker
                Worker nextworker = getselectorRunnablePool().nextWorkr();
                //注册新客户端介入任务给另一个线程任务队列加入新任务
                nextworker.registerNewChannelTask(channel);
                System.out.println("新客户连接");
            }
        }
    
        @Override
        public void registerAcceptChannelTask(final ServerSocketChannel serverChannel) {
            final Selector selector = this.selector;
            registerTask(new Runnable() {
                @Override
                public void run() {
                    try {
                        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                    } catch (ClosedChannelException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    
    }
    public class NioServerWorker extends AbstractNioSelector implements Worker{
    
        public NioServerWorker(Executor worker, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
            super(worker, threadName, selectorRunnablePool);
        }
    
        @Override
        protected int select(Selector selector) throws IOException {
            return selector.select();
        }
    
        @Override
        protected void process(Selector selector) throws IOException {
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            if (selectedKeys.isEmpty()) {
                return;
            }
            Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                //移除 防止重复提交
                iterator.remove();
                
                //得到事件发生的socket通道
                SocketChannel channel = (SocketChannel) key.channel();
    
                //数据总长度
                int ret = 0;
                boolean failure = true;
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //读取数据
                try {
                    ret = channel.read(buffer);
                    failure = false;
                } catch (Exception e) {
                }
                //判断是否连接已断开
                if (ret <= 0 || failure) {
                    key.channel();
                    System.out.println("客户端已断开连接");
                } else {
                    System.out.println("收到数据:" + new String (buffer.array()));
                    
                    //回 写数据
                    ByteBuffer outBuf = ByteBuffer.wrap("收到
    ".getBytes());
                    channel.write(outBuf);// 将消息发送到客户端
                }
            }
        }

    此时都会selector.selector() 阻塞等待连接

    此时再看 bootstrap.bind(new InetSocketAddress(80));  会调用 

    public class ServerBootstrap {
    
        private NioSelectorRunnablePool selectorRunnablePool;
    
        public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {
            this.selectorRunnablePool = selectorRunnablePool;
        }
    
        public void bind(final InetSocketAddress localAddress) {
            try {
                //获得一个ServerSocket通道
                ServerSocketChannel serverChannel = ServerSocketChannel.open();
                //设置通道为非阻塞
                serverChannel.configureBlocking(false);
                //将该通道对应的serverSocket绑定到port
                serverChannel.socket().bind(localAddress);
                
                //获取一个boss线程
                Boss nextBoss = selectorRunnablePool.nextBoss();
                //向当前boss 线 程注册一个ServerSocket通道
                nextBoss.registerAcceptChannelTask(serverChannel);
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    
    } 

    这个时候通过获取下一个线程注入任务池(其实就一个) 这里可以看AbstractSelector 中的nextBoss 和nextWorker 方法 从线程数组循环拿出线程

    这个时候会将当前通道在selector上注册 OP_ACCEPT  的事件 并将这个任务添加到Taskqueue

    /**
         * 注册一个任务并激活selector 重新执行
         * @param task
         */
        protected final void registerTask(Runnable task) {
            taskQueue.add(task);
            
            Selector selector = this.selector;
            
            if (selector != null) {
                if (wakenUp.compareAndSet(false, true)) {
                    //会首先唤醒Boss  接入总线线程  唤醒阻塞在selector上的线程, 去做其他事情,例如注册channel改变interestOps的值
                    selector.wakeup();
                }
            } else {
                taskQueue.remove(task);
            }
        }

    这个时候会唤醒selector  不过唤醒的是boss的selector

    唤醒后之前的阻塞会继续往下执行

              wakenUp.set(false);
                    //当注册事件到达时,方法返回,否则该方法会一直阻塞
                    select(selector);
                    
                    //运行任务队列中的任务
                    processTaskQueue();
                    
                    process(selector);

    先执行任务  也就是 channel regiter 到 selector 上

    执行 Boss中的 process 的方法

        @Override
        protected void process(Selector selector) throws IOException {
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            if (selectedKeys.isEmpty()) {
                return;
            }
            
            for (SelectionKey key : selectedKeys) {
                selectedKeys.remove(key);
                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                //新客户端
                SocketChannel channel = server.accept();
                //设置为非阻塞
                channel.configureBlocking(false);
                //获取一个worker
                Worker nextworker = getselectorRunnablePool().nextWorkr();
                //注册新客户端介入任务给另一个线程任务队列加入新任务
                nextworker.registerNewChannelTask(channel);
                System.out.println("新客户连接");
            }
        }

    同理再去 走相同的路线 把获取到的通道绑到 worker的selector上


     
  • 相关阅读:
    try,catch,finally的简单问题
    设置类可序列化,写入VIewState
    jQuery实现购物车物品数量的加减 (针对GirdView的类似事件)
    js获取Gridview中的控件id
    asmx ASp.net AJAX使用 ScriptManager
    js返回上一页并刷新,JS实现关闭当前子窗口,刷新父窗口
    asp.net(c#)网页跳转七种方法小结
    在触发器中回滚和提交
    redis 缓存对象、列表
    spring cloud 停止服务
  • 原文地址:https://www.cnblogs.com/mxz1994/p/9512832.html
Copyright © 2011-2022 走看看