zoukankan      html  css  js  c++  java
  • netty源码分析

      1、Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。

      2、目前netty有3个版本netty3、netty4、netty5。3个版本的内容有所不同。neety3是核心的代码介绍。相对于netty4、和netty5的复杂性来说。netty3的源码是值得学习的。我这里解析了netty3的一些源码,仅供大家理解,也是为了方便大家理解做了很多简化。不代表作者的开发思路。

      3、我们先来看一张图(这张图是我在学习源码的时候扣的,哈哈)

      一、传统NIO流

      

      1)一个线程里面,存在一个selector,当然这个selector也承担起看大门和服务客人的工作。

      2)这里不管多少客户端进来,都是这个selector来处理。这样就就加大了这个服务员的工作量

      3)为了加入线程池,让多个selector同时工作,当时目的性都是一样的。

      4)虽然看大门的和服务客人的都是服务员,但是还是存在差别的。为了更好的处理多个线程的问题。所以这里netty就诞生了。

     二、netty框架

      

      理解:

      1)netty3的框架也是基于nio流做出来的。所以这里会详细介绍netty3框架的思路

      2)将看门的服务员和服务客人的服务员分开。形成两块(也就是2个线程池,也就是后面的boss和worker)

      3)当一个客人来的时候,首先boss,进行接待。然后boss分配工作给worker,这个,在两个线程池的工作下,有条不乱。

      4)原理:就是将看大门的selector和服务客人的selector分开。然后通过boss线程池,下发任务给对应的worker

      4、netty3源码分析

      1)加入对应的jar包。我这里为了了解源码用的是netty3的包。

        <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty</artifactId>
                <version>3.10.6.Final</version>
            </dependency>

      2)目录结构

      

      说明:

      a、NettyBoss、NettyWork是针对于selector做区分。虽然他们很多共性,我这里为了好理解,并没有做抽象类(忽略开发思路)。

      b、ThreadHandle是用来初始化线程池和对应的接口。

      c、Start为启动类

      3)NettyBoss(看大门的服务员,第一种线程selector)

    复制代码
    package com.troy.application.netty;
    

    import java.io.IOException;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Queue;
    import java.util.Set;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Executor;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class NettyBoss {

    </span><span style="color: #008000">//</span><span style="color: #008000">线程池</span>
    <span style="color: #0000ff">public</span> <span style="color: #0000ff">final</span><span style="color: #000000"> Executor executor;
    </span><span style="color: #008000">//</span><span style="color: #008000">boss选择器</span>
    <span style="color: #0000ff">protected</span><span style="color: #000000"> Selector selector;
    </span><span style="color: #008000">//</span><span style="color: #008000">原子变量,主要是用来保护线程安全。当本线程执行的时候,排除其他线程的执行</span>
    <span style="color: #0000ff">protected</span> <span style="color: #0000ff">final</span> AtomicBoolean wakenUp = <span style="color: #0000ff">new</span><span style="color: #000000"> AtomicBoolean();
    </span><span style="color: #008000">//</span><span style="color: #008000">队列,线程安全队列。</span>
    <span style="color: #0000ff">public</span> <span style="color: #0000ff">final</span> Queue&lt;Runnable&gt; taskQueue = <span style="color: #0000ff">new</span> ConcurrentLinkedQueue&lt;&gt;<span style="color: #000000">();
    </span><span style="color: #008000">//</span><span style="color: #008000">线程处理,这里主要是拿到work的线程池</span>
    <span style="color: #0000ff">protected</span><span style="color: #000000"> ThreadHandle threadHandle;
    
    </span><span style="color: #008000">//</span><span style="color: #008000">初始化</span>
    <span style="color: #0000ff">public</span><span style="color: #000000"> NettyBoss(Executor executor,ThreadHandle threadHandle) {
        </span><span style="color: #008000">//</span><span style="color: #008000">赋值</span>
        <span style="color: #0000ff">this</span>.executor =<span style="color: #000000"> executor;
        </span><span style="color: #0000ff">this</span>.threadHandle =<span style="color: #000000"> threadHandle;
        </span><span style="color: #0000ff">try</span><span style="color: #000000"> {
            </span><span style="color: #008000">//</span><span style="color: #008000">每一个线程选择器</span>
            <span style="color: #0000ff">this</span>.selector =<span style="color: #000000"> Selector.open();
        } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (IOException e) {
            e.printStackTrace();
        }
        </span><span style="color: #008000">//</span><span style="color: #008000">从线程中获取一个线程执行以下内容</span>
        executor.execute(() -&gt;<span style="color: #000000"> {
            </span><span style="color: #0000ff">while</span> (<span style="color: #0000ff">true</span><span style="color: #000000">) {
                </span><span style="color: #0000ff">try</span><span style="color: #000000"> {
                    </span><span style="color: #008000">//</span><span style="color: #008000">这里的目前就是排除其他线程同事执行,false因为这里处于阻塞状态,不用开启</span>
                    wakenUp.set(<span style="color: #0000ff">false</span><span style="color: #000000">);
                    </span><span style="color: #008000">//</span><span style="color: #008000">选择器阻塞</span>
    

    selector.select();
    //运行队列中的任务
    while (true) {
    final Runnable task = taskQueue.poll();
    if (task == null) {
    break;
    }
    //如果任务存在开始运行
    task.run();
    }
    //对进来的进行处理
    this.process(selector);
    }
    catch (Exception e) {
    e.printStackTrace();
    }
    }
    });
    }

    </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">void</span> process(Selector selector) <span style="color: #0000ff">throws</span><span style="color: #000000"> IOException {
        Set</span>&lt;SelectionKey&gt; selectedKeys =<span style="color: #000000"> selector.selectedKeys();
        </span><span style="color: #0000ff">if</span><span style="color: #000000"> (selectedKeys.isEmpty()) {
            </span><span style="color: #0000ff">return</span><span style="color: #000000">;
        }
        </span><span style="color: #0000ff">for</span> (Iterator&lt;SelectionKey&gt; i =<span style="color: #000000"> selectedKeys.iterator(); i.hasNext();) {
            SelectionKey key </span>=<span style="color: #000000"> i.next();
            i.remove();
            ServerSocketChannel server </span>=<span style="color: #000000"> (ServerSocketChannel) key.channel();
            </span><span style="color: #008000">//</span><span style="color: #008000"> 新客户端</span>
            SocketChannel channel =<span style="color: #000000"> server.accept();
            </span><span style="color: #008000">//</span><span style="color: #008000"> 设置为非阻塞</span>
            channel.configureBlocking(<span style="color: #0000ff">false</span><span style="color: #000000">);
            </span><span style="color: #008000">//</span><span style="color: #008000"> 获取一个worker</span>
            NettyWork nextworker = threadHandle.workeres[Math.abs(threadHandle.workerIndex.getAndIncrement() %<span style="color: #000000"> threadHandle.workeres.length)];
            </span><span style="color: #008000">//</span><span style="color: #008000"> 注册新客户端接入任务</span>
            Runnable runnable = () -&gt;<span style="color: #000000"> {
                </span><span style="color: #0000ff">try</span><span style="color: #000000"> {
                    </span><span style="color: #008000">//</span><span style="color: #008000">将客户端注册到selector中</span>
    

    channel.register(nextworker.selector, SelectionKey.OP_READ);
    }
    catch (ClosedChannelException e) {
    e.printStackTrace();
    }
    };
    //添加到work的队列中
    nextworker.taskQueue.add(runnable);
    if (nextworker.selector != null) {
    //这里的目前就是开启执行过程
    if (nextworker.wakenUp.compareAndSet(false, true)) {
    //放开本次阻塞,进行下一步执行
    nextworker.selector.wakeup();
    }
    }
    else {
    //任务完成移除线程
    taskQueue.remove(runnable);
    }
    System.out.println(
    "新客户端链接");
    }
    }
    }

    复制代码

      解释:

      a、初始化的时候,赋值线程池,和线程处理类(线程处理类目的是获取worker的工作线程)

      b、executor为线程池的执行过程。

      c、selector.select()为形成阻塞,wakenUp为了线程安全考核。在接入客户端的时候用selector.wakeup()来放开本次阻塞(很重要)。

      d、然后在worker安全队列中执行对应工作。(taskQueue的目前在boss和worker中的作用都是为了考虑线程安全,这里采用线程安全队列的目的是为了不直接操作其他线程)

      e、wakenUp.compareAndSet(false, true),这里是考虑并发问题。在本线程运行的时候,其他线程处于等待状态。这里也是为了线程安全考虑。

      4)NettyWork(服务客人的服务员,第二种selector)

    复制代码
    package com.troy.application.netty;
    

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Queue;
    import java.util.Set;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Executor;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class NettyWork {
    //线程池
    public final Executor executor;
    //boss选择器
    protected Selector selector;
    //原子变量,主要是用来保护线程安全。当本线程执行的时候,排除其他线程的执行
    protected final AtomicBoolean wakenUp = new AtomicBoolean();
    //队列,线程安全队列。
    public final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();

    </span><span style="color: #008000">//</span><span style="color: #008000">初始化</span>
    <span style="color: #0000ff">public</span><span style="color: #000000"> NettyWork(Executor executor) {
        </span><span style="color: #0000ff">this</span>.executor =<span style="color: #000000"> executor;
        </span><span style="color: #0000ff">try</span><span style="color: #000000"> {
            </span><span style="color: #008000">//</span><span style="color: #008000">每一个work也需要一个选择器用来管理通道</span>
            <span style="color: #0000ff">this</span>.selector =<span style="color: #000000"> Selector.open();
        } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (IOException e) {
            e.printStackTrace();
        }
        </span><span style="color: #008000">//</span><span style="color: #008000">从线程池中获取一个线程开始执行</span>
        executor.execute(() -&gt;<span style="color: #000000"> {
            </span><span style="color: #0000ff">while</span> (<span style="color: #0000ff">true</span><span style="color: #000000">) {
                </span><span style="color: #0000ff">try</span><span style="color: #000000"> {
                    </span><span style="color: #008000">//</span><span style="color: #008000">阻塞状态排除问题</span>
                    wakenUp.set(<span style="color: #0000ff">false</span><span style="color: #000000">);
                    </span><span style="color: #008000">//</span><span style="color: #008000">阻塞</span>
    

    selector.select();
    //处理work任务
    while (true) {
    final Runnable task = taskQueue.poll();
    if (task == null) {
    break;
    }
    //存在work任务开始执行
    task.run();
    }
    //处理任务
    this.process(selector);
    }
    catch (Exception e) {
    e.printStackTrace();
    }
    }
    });
    }

    </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">void</span> process(Selector selector) <span style="color: #0000ff">throws</span><span style="color: #000000"> IOException {
        Set</span>&lt;SelectionKey&gt; selectedKeys =<span style="color: #000000"> selector.selectedKeys();
        </span><span style="color: #0000ff">if</span><span style="color: #000000"> (selectedKeys.isEmpty()) {
            </span><span style="color: #0000ff">return</span><span style="color: #000000">;
        }
        Iterator</span>&lt;SelectionKey&gt; ite = <span style="color: #0000ff">this</span><span style="color: #000000">.selector.selectedKeys().iterator();
        </span><span style="color: #0000ff">while</span><span style="color: #000000"> (ite.hasNext()) {
            SelectionKey key </span>=<span style="color: #000000"> (SelectionKey) ite.next();
            </span><span style="color: #008000">//</span><span style="color: #008000"> 移除,防止重复处理</span>
    

    ite.remove();
    // 得到事件发生的Socket通道
    SocketChannel channel = (SocketChannel) key.channel();
    // 数据总长度
    int ret = 0;
    boolean failure = true;
    ByteBuffer buffer
    = ByteBuffer.allocate(1024);
    //读取数据
    try {
    ret
    = channel.read(buffer);
    failure
    = false;
    }
    catch (Exception e) {
    // ignore
    }
    //判断是否连接已断开
    if (ret <= 0 || failure) {
    key.cancel();
    System.out.println(
    "客户端断开连接");
    }
    else{
    System.out.println(
    "收到数据:" + new String(buffer.array()));
    //回写数据
    ByteBuffer outBuffer = ByteBuffer.wrap("收到 ".getBytes());
    channel.write(outBuffer);
    // 将消息回送给客户端
    }
    }
    }
    }

    复制代码

      解释:

      a、worker的执行方式基本上面和boss的方式是一样的,只不够是处理方式不一样

      b、这里需要注意的是,都是考虑线程队列执行。

      3)ThreadHandle(线程处理,这里主要是启动需要的东西)

    复制代码
    package com.troy.application.netty;
    

    import java.net.InetSocketAddress;
    import java.nio.channels.ClosedChannelException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.ServerSocketChannel;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ThreadHandle {

    </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">final</span> AtomicInteger bossIndex = <span style="color: #0000ff">new</span><span style="color: #000000"> AtomicInteger();
    </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span><span style="color: #000000"> NettyBoss[] bosses;
    </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">final</span> AtomicInteger workerIndex = <span style="color: #0000ff">new</span><span style="color: #000000"> AtomicInteger();
    </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span><span style="color: #000000"> NettyWork[] workeres;
    
    </span><span style="color: #0000ff">public</span><span style="color: #000000"> ThreadHandle(ExecutorService boss,ExecutorService work) {
        </span><span style="color: #0000ff">this</span>.bosses = <span style="color: #0000ff">new</span> NettyBoss[1<span style="color: #000000">];
        </span><span style="color: #008000">//</span><span style="color: #008000">初始化boss线程池</span>
        <span style="color: #0000ff">for</span> (<span style="color: #0000ff">int</span> i = 0; i &lt; bosses.length; i++<span style="color: #000000">) {
            bosses[i] </span>= <span style="color: #0000ff">new</span> NettyBoss(boss,<span style="color: #0000ff">this</span><span style="color: #000000">);
        }
        </span><span style="color: #0000ff">this</span>.workeres = <span style="color: #0000ff">new</span> NettyWork[Runtime.getRuntime().availableProcessors() * 2<span style="color: #000000">];
        </span><span style="color: #008000">//</span><span style="color: #008000">初始化work线程池</span>
        <span style="color: #0000ff">for</span> (<span style="color: #0000ff">int</span> i = 0; i &lt; workeres.length; i++<span style="color: #000000">) {
            workeres[i] </span>= <span style="color: #0000ff">new</span><span style="color: #000000"> NettyWork(work);
        }
    }
    
    </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">void</span><span style="color: #000000"> bind(InetSocketAddress inetSocketAddress) {
        </span><span style="color: #0000ff">try</span><span style="color: #000000"> {
            </span><span style="color: #008000">//</span><span style="color: #008000"> 获得一个ServerSocket通道</span>
            ServerSocketChannel serverChannel =<span style="color: #000000"> ServerSocketChannel.open();
            </span><span style="color: #008000">//</span><span style="color: #008000"> 设置通道为非阻塞</span>
            serverChannel.configureBlocking(<span style="color: #0000ff">false</span><span style="color: #000000">);
            </span><span style="color: #008000">//</span><span style="color: #008000"> 将该通道对应的ServerSocket绑定到port端口</span>
    

    serverChannel.socket().bind(inetSocketAddress);
    //获取一个boss线程
    NettyBoss nextBoss = bosses[Math.abs(bossIndex.getAndIncrement() % workeres.length)];
    //向boss注册一个ServerSocket通道
    Runnable runnable = () -> {
    try {
    //注册serverChannel到selector
    serverChannel.register(nextBoss.selector, SelectionKey.OP_ACCEPT);
    }
    catch (ClosedChannelException e) {
    e.printStackTrace();
    }
    };
    //加入任务队列
    nextBoss.taskQueue.add(runnable);
    if (nextBoss.selector != null) {
    //排除其他任务处理
    if (nextBoss.wakenUp.compareAndSet(false, true)) {
    //放开阻塞
    nextBoss.selector.wakeup();
    }
    }
    else {
    //移除任务
    nextBoss.taskQueue.remove(runnable);
    }
    }
    catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    复制代码

      解释:

      a、这里采用数组的形式,主要目的是考虑多个看门的,和多个服务客人的线程。为了好控制,好选择,哪一个来执行。

      b、端口的注册,在NettyBoss里面进行初始化的的原理都是一样的。

      4)start

    复制代码
    package com.troy.application.netty;
    

    import java.net.InetSocketAddress;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class Start {

    </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">static</span> <span style="color: #0000ff">void</span><span style="color: #000000"> main(String[] args) {
        </span><span style="color: #008000">//</span><span style="color: #008000">声明线程池</span>
        ExecutorService boss =<span style="color: #000000"> Executors.newCachedThreadPool();
        ExecutorService work </span>=<span style="color: #000000"> Executors.newCachedThreadPool();
        </span><span style="color: #008000">//</span><span style="color: #008000">初始化线程池</span>
        ThreadHandle threadHandle = <span style="color: #0000ff">new</span><span style="color: #000000"> ThreadHandle(boss,work);
        </span><span style="color: #008000">//</span><span style="color: #008000">声明端口</span>
        threadHandle.bind(<span style="color: #0000ff">new</span> InetSocketAddress(9000<span style="color: #000000">));
        System.out.println(</span>"start"<span style="color: #000000">);
    }
    

    }

    复制代码

      说明一下流程

      a、初始化boss和work。让boss线程池加入设定第一种boss的selector,并且处于阻塞状态。work的初始化也基本上是一样的,只不过换成了第二种selector线程池,处于阻塞状态。

      b、当线程处理类初始化监听端口的时候。就是选择boss中其中一个selector。声明一个线程先监听,加入boss的线程安全队列中。然后放开boss阻塞,向下执行。线程执行会监听对应端口并阻塞。

      c、当一个客户端接入的时候,boss中的selector会监听到对应端口。然后选择work线程中的一个selector给work分派任务。

      d、最后work中的selector来处理事务。

      4、源码下载:https://pan.baidu.com/s/1pKIxuMf

      5、本代码只是用于理解netty的实现过程,不代表开发思路。其中我为了简化代码,做了很多调整。目的就是压缩代码,方便理解。

  • 相关阅读:
    Innodb存储引擎
    Innodb学习之MySQL体系结构
    C# sql查询数据库返回单个值方法
    Welcome To SWPUNC-ACM
    P2184 贪婪大陆 题解
    线上Java调优-Arthas入门
    JVM调优学习笔记
    RabbitMQ博文收藏
    System.Net.WebException: 远程服务器返回错误: (405) 不允许的方法。
    随机过程-Brown运动
  • 原文地址:https://www.cnblogs.com/jpfss/p/10118306.html
Copyright © 2011-2022 走看看