zoukankan      html  css  js  c++  java
  • Netty-Mina

    Netty网络应用框架和Mina类似,都封装了Socket。


    Mina入门
    长连接的基本概念
    连接后一直与服务器保持长期连接

    长连接的基本原理
    底层都是基于TCP/IP协议
    通过Socket,ServerSocket与服务器保持连接
    服务端一般使用ServerSocket建立监听,监听客户端与之连接
    客户端使用Socket,指定端口和IP地址,与服务端进行连接

    长连接的意义
    通过长连接,可以实现服务器主动向客户端推送消息。
    通过长连接,可以减少客户端对服务器的轮询,减小服务器的压力
    通信效率远高于HTTP

    Mina的优势
    非常适合C/S架构的通讯框架
    apache出品
    使用非常简单,减小学习成本

    Mina整体讲解
    客户端和服务端通信时主要经过三个步骤:
    1.服务器将数据发送到Session中,2.Session将数据发送到过滤链中,3.过滤链将数据过滤以后才发送到客户端

    Mian核心类讲解
    IOService接口、及其相关子类
    Responsibilities: 职责
    AbstractIoService:默认的方法、默认的成员变量、过滤链、线程池等.

    IOAcceptor接口、及其相关子类,服务器端最重要的类
    //接口继承关系 //支持TCP协议 //UDP协议
    IOService<——IOAcceptor<——SocketAcceptor、DatagramAcceptor
    SocketAcceptor<——NioSocketAcceptor(类、TCP协议的监听器)
    DatagramAcceptor<——NioDatagramAcceptor(类、UDP协议的监听器)

    IOConnector接口/及其相关子类
    //支持TCP协议 //UDP协议
    IOService<——IOConnector<——SocketConnector、DatagramConnector
    SocketConnector<——NioSocketConnector(类)
    DatagramConnector<——DatagramConnector(类)

    Filter接口、及其相关子类
    LoggingFilter: 记录mina所有日志
    ProtocolCodecFilter: 数据转化过滤器
    CompressionFilter: 数据压缩过滤器
    //HTTPS之所以支持加密传输,就是因为它在HTTP协议和TCP协议之间加了一层SSL协议数据加密
    SSLFilter数据加密过滤器
    通过继承IoFilterAdapter自己可以实现过滤器

    IOSession类、
    状态建立后会返回一个 IOSession的对象,之后就可以read()/write()读写数据到服务器,也可以closed掉
    receive buffer size:设置接收数据缓存区大小
    sending buffer size:设置数据发送缓存区大小
    ldel time: 设置状态恢复时间
    write timeout: 设置写数据超时时间,等

    Handler类、应用层较重要类、所有业务逻辑都要在Handler中完成
    sessionCreated/sessionOper/sessionClosed 事件监听
    messageReceived/messageSend 事件监听
    exceptionCaught 异常监听


    Mina服务器搭建
    调用Mina为我们提供的服务器API。

    IOAcceptor acceptor = new NioSocketAcceptor();
    //添加的日志过滤器
    acceptor.getFilterChain().addLast("logger",new LoggingFilter());
    acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
    //添加事件处理
    acceptor.setHandler( new DemoServerHandler());
    acceptor.getSessionConfig().SetReadBufferSize(2048);设置读缓存区大小
    acceptor.getSessionConfig().SetIdleTime(IdleStatus.BOTH_IDLE,10);//Session的空闲时间
    //监听客户端的链接
    try{
    acceptor.bind(new InetSocketAddress(9123));//指定监听端口
    }catch(Exception e){

    }

    //该类,负责session对象的创建监听以及消息发送和接收的监听
    private static class DemeServerHandler extends IoHandlerAdapter{

    //session创建以后,会调这个方法
    @Override
    public void sessionCreated(IoSession session)throws Exception{
    super.sessionCreated(session);
    }

    //session打开后,会调这个方法
    @Override
    public void sessionOpened(IoSession session)throws Exception{
    super.sessionOpened(session);
    }

    //服务器接收到消息后,会调这个方法
    @Override
    public void messageReceived(IoSession sessionm,Object message)throws Exception{
    super.messageReceived(session,message);

    String str = message.toString();
    Date date = new Date();
    session.write(date.toString());
    System.out.println("接收到的数据:"+str);
    }

    //服务器发送消息后,会调这个方法
    @Override
    public void messageSent(IoSession session,Object message)throws Exception{
    super.messageSent(session,message);
    }

    //session关闭后,会调这个方法
    @Override
    public void sessionClosed(IoSession session)throws Exception{
    super.sessionClosed(session);
    }

    }

    Mina客户端搭建
    创建一个Service,用来与远程服务器连接

    封装一个ConnectionManager类来提供与服务器的连接与断开方法

    在Service中启动线程,调用ConnectionManager完成连接的创建

    //构建者模式,
    public class ConnectionConfig{
    private Context context;
    private String ip;
    private int port;
    private int readBufferSize;
    private long connectionTimeout;

    public static class Builder{
    private Context context;
    private String ip = "192.168.1.16";
    private int port = 9123;
    private int readBufferSize = 10240;
    private long connectionTimeout = 10000;

    public Builder(Context context){
    this.context = context;
    }
    //构建者标准写法
    public Builder setIP(String ip){
    this.ip = ip;
    return this;
    }
    public Builder setPort(int port){
    this.port = port;
    return this;
    }
    public Builder setReadBufferSize(int size){
    this.readBufferSize = size;
    return this;
    }
    public Builder setConnectionTimeout(int timeout){
    this.connectionTimeout = timeout;
    return this;
    }

    private void applyConfig(ConnectionConfig config){
    config.context = this.context;
    config.ip = this.ip;
    config.port = this.port;
    config.connectionTimeout = this.connectionTimeout;
    }

    public ConnectionConfig builder(){
    ConnectionConfig config = new ConnectionConfig();
    applyConfig(config);
    return config;
    }
    }
    }

    public class ConnectionManager{

    public static final String BROADCAST_ACTION = "com.commonlibrary.mina";
    public static final String MESSAGE = "message";

    //动态的为连接配置、是构建者模式、更为灵活
    private ConnectionConfig mConfig;
    //确保不会内存溢出、用弱引用进行包装
    private WeakReference<Context> mContext;
    //连接对象
    private NioSocketConnector mConnection;
    //session对象
    private IoSession mSession;
    //服务器地址
    private InetSocketAddress mAddress;

    public ConnectionManager(ConnectionConfig config){
    this.mConfig = config;
    this.mContext = new WeakReference<Context>(config.getContext());
    init();
    }

    private void init(){//初始化方法
    mAddress = new InetSocketAddress(mConfig.getIp(),mConfig.getPort());
    mConnection = new NioSocketConnector();
    //配置参数
    mConnection.getSessionConfig().setReadBufferSize(mConfig.getReadBufferSize());
    //配置过滤器
    mConnection.getFilterChain().addLast("logging", new LoggingFilter());
    mConnection.getFilterChain().addLast("codec", new ProtocolCodecFilter(
    new ObjectSerializationCodecFactory()));
    //业务处理类Handler
    mConnection.setHandler(new DefaultHandler(mContext.get()));
    }

    public boolean connect(){//连接方法
    try{
    ConnectFuture future = mConnection.connect();
    future.awaitUniterruptibly();
    mSession = future.getSession();
    }catch(Exception e){
    return false;
    }
    return mSession == null ? false : true;

    }
    public void disConnection(){//断开连接方法
    mConnection.dispose();
    mConnection = null;
    mSession = null;
    mAddress = null;
    mContext = null;
    }

    private static class DefultHandler extends IoHandlerAdapter{
    private Context mContext;

    DefaultHandler(Context context){
    this.mContext = context
    }

    @Override
    public void sessionOpened(IoSession session)throws Exception{
    //将我们的session保存到我的session manager类中,
    }

    @Override
    public void messageReceived(IoSession session,Object message)throws Exception{
    if (mContext != null){
    //局部广播、安全
    Intent intent = new Intent(BROADCAST_ACTION);
    intent.putExtra(MESSAGE,message.toString());
    LocalBroadcastManager.getInstance(mContext).sendBroadcast(intent);
    }
    }
    }


    }

    public class minaService extends Service{

    private ConnectionThread thread;

    @Oversion
    public void onCreate(){
    super.onCreate();
    thread = new ConnectionThread("mina",getApplicationContext());//创建时初始化线程
    thread.start();//打开
    }

    @Oversion
    public int onStartCommand(Intent intent,int flags,int startId){
    return super.onStartCommand(intent,flags,startId);
    }

    @Oversion
    public void onDestroy(){//关闭释放掉
    super.onDestroy();
    thread.disConnection();
    }
    @Nullable
    @Oversion
    public IBinder onBind(Intent intent){
    return null;
    }
    //线程类,负责调用connection manager类来完成与服务器的连接
    public class ConnectionThread extends HandlerThread{
    private Context context;
    boolean isConnection;
    ConnectionManager mManager;

    ConnectionThread (String name,Context context){
    super(name);
    this.context = context;
    ConnectionConfig_config = new ConnectionConfig().Builder(context)
    .setIP("192.168.1.16")
    .setPort(9123)
    .setReadBufferSize(10240)
    .setConnectionTimeout(10000).builder();
    }

    @Override
    protected void onLooperPrepared(){/开始连接服务器
    for(;;){
    isConnection = mManager.connect();//完成服务器的连接
    if(isConnection){
    break;
    }
    try{
    Threaddd.sleep(3000);
    }catch(Exception e){

    }
    }
    }
    @Override
    public void disConnection(){//断开连接
    mManager.disConnection();//完成服务器的断开
    }

    }
    }

    Mina客户端与服务器通信

    public class SessionManager{
    private static SessionManager mInstance = null;
    private IoSession mSession;//与服务器通信的对象

    public static SessionManager getInstance(){
    if(mInstance == null){
    synchronized(SessionManager.class){
    if(mInstance == null){
    mInstance = new SessionManager();
    }
    }
    }
    return mInstance;
    }
    private SessionManager(){}

    public void setSession(IoSession session){this.mSession = session;}

    public void writeToServer(Object msg){//将对象写到服务器
    if(mSesssion != null){
    mSession.write(msg);
    }
    }

    public void closeSession(){
    if(mSession != null){
    mSession.closeOnFlush();
    }
    }

    public void removeSession(){this.mSession = null;}
    }

    //Mina测试类
    public class MinaTestActivity extends BaseActivity implements View.OnClickListener{
    @Bind(R.id.start_servie_view)
    protected TextView mConnectView;
    @Bind(R.id.send_view)
    protected TextView mSendView;

    //自定义了一个广播接收器
    private MessageBroadcastReveiver receiver = new MessageBroadcastReveiver();

    @Override
    protected void onCreate(Bundle savedInstanceState){
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_mina_layout);

    initButterknife();
    initView();
    registerBroadcast();注册广播接收器
    }

    private void registerBroadcast(){
    IntentFilter filter = new IntentFilter("com.commonlibrary.mina");
    LocalBroadcastManager.getInstance(this)
    .registerReceiver(receiver,filter);

    }

    private void unregisterBroadcast(){
    LocalBroadcastManager.getInstance(this)
    .unregisterReceiver(receiver);
    }

    private void initView(){
    mConnectView.setOnClickListener(this);
    mSendView.setOnClickListener(this);
    }

    @Override
    protected void onDestroy(){
    super.onDestroy();
    stopService(new Intent(this,MinaService.class));//关闭,MianService
    unregisterBroadcast()//清空动态注册的广播接收器
    }

    @Overrid
    public void onClick(View v){
    switch(v.getId()){
    case R.id.send_view:
    SessionManager.getInstance().writeToServer("123");发送
    break;
    case R.id.start_servie_view:
    Intent intent = new Intent(this,MinaService.class);启动
    startService(intent);
    break;
    }
    }
    //接收mina发送来的消息,并更新UI
    private class MessageBroadcastReveiver extends BroadcastReceiver{
    @Override
    public void onReceive(Context context,Intent,intent){
    setTitle(intent.getStringExtra("message"));
    }
    }
    }


    -----------------------------------------------------------------------------------------------------

    不错的搭建案例:https://baijiahao.baidu.com/s?id=1654057194889667404&wfr=spider&for=pc,ID:java的架构师技术栈

    阿里巴巴的分布式服务框架Dubbo,底层通讯框架使用了Netty

    Netty介绍
    高性能事件驱动、异步非堵塞,Jboss开源,Java所写
    支持http,webSocket,破肉的霸服,百纳瑞,UDP,TCP
    基于NIO的客户端,服务端编程框架
    Netty使用场景
    多线程并发领域
    大数据领域
    异步通信领域

    IO通信
    BIO: 客户端的个数和服务端的个数相同。 阻塞同步的
    伪异步IO:线程池负责连接,线程池阻塞。 阻塞同步的
    NIO: 缓存区Buffer对象,任何操作都是:读到缓冲区和写到缓冲, 非阻塞同步的
    通道Channel,读,写,可以二者同时进行,
    多路复用器Selector,会不断的轮询注册的'拆No'。

    AIO: 主动通知程序,读写方法异步,连接注册读写事件和回调函数 非阻塞异步的

    Netty入门
    API简单,入门门槛低,性能高,成熟稳定

    WebSocket入门
    H5提出的的协议规范,
    握手机制
    解决客户端与服务端实时通讯技术
    服务器主动传送数据给客户端

    WebSocket建立连接:1.客户端发起握手请求,2.服务端响应请求,3.建立连接

    WebSocket生命周期:1打开事件,2.消息事件,3.错误事件,4.关闭事件

    WebSocket关闭连接:两种关闭方式:1.服务器关闭底层TCP连接,2.客户端发起TCP Close

    Netty实现WebSocket通信案例
    Netty开发服务端:
    /**
    * 存储整个工程的全局配置 类
    * @author AAA
    *
    */
    public class NettyConfig {
    private static final String GlobalEventExecutor = null;
    /**
    * 存储每一个客户端接入进来时的channel对象
    */
    public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    }


    /**
    *接收/处理/响应客户端websocket请求的核心业务处理类
    * @author AAA
    *
    */
    public class MyWebSocketHandler extends SimpleChannelInboundHandler<Object> {

    private WebSocketServerHandshaker handshaker;
    private static final String WEB_SOCKET_URL="ws://localhost:8080/";
    //客户端与服务端创建连接的时候调用
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    NettyConfig.group.add(ctx.channel());
    System.out.println("客户端与服务端连接开启");
    }

    //客户端与服务端断开连接的时候调用
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    NettyConfig.group.remove(ctx.channel());
    System.out.println("客户端与服务端连接关闭");
    }

    //服务端接收客户端发送过来的结束之后调用
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    ctx.flush();
    }

    //工程出现异常的时候调用
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    ctx.close();
    }

    //服务端处理客户端websocket请求的核心方法
    @Override
    protected void messageReceived()(ChannelHandlerContext context,Object msg) throws Exception {

    if(msg instanceof FullHttpRequest){//处理客户端向服务端发起http握手请求的业务
    handHttpRequest(context,(FullHttpRequest) msg);//调用 客户端向服务端发起http握手请求的业务
    }else if(msg instanceof WebSocketFrame){//处理websocket连接业务
    handWebsocketFrame(context,(WebSocketFrame) msg);
    }

    }

    //处理客户端与服务端之前的websocket业务
    private void handWebsocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame){

    //判断是否是关闭websocket指令
    if(frame instanceof CloseWebSocketFrame){
    handshaker.close(ctx.channel(),(CloseWebSocketFrame) frame.retain());
    }

    //判断是否是ping消息
    if(frame instanceof CloseWebSocketFrame){
    ctx.channel().write(new PingWebSocketFrame(frame.content().retain()));
    }

    //判断是否是二进制消息,如果是二进制消息,抛出异常
    if(!(frame instanceof TextWebSocketFrame)){
    System.out.println("目前我们不支持二进制消息");
    throw new RuntimeException("{"+this.getClass().getName()+"},不支持消息");
    }
    //返回应答消息
    String request = ((TextWebSocketFrame) frame).text();
    System.out.println("服务端收到客户端的消息");
    TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()
    +ctx.channel().id()
    //群发,服务端向每个连接上来的客户端群发消息
    NettyConfig.group.writeAndFlush(tws);

    }

    //处理客户端向服务端发起http握手请求的业务
    private void handHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req){
    if(!req.getDecoderResult().isSuccess()||!("websocket".equals(req.headers().get("upgrade")))){
    sendHttResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
    return;
    }
    WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory
    (WEB_SOCKET_URL, null, false);
    handshaker = wsFactory.newHandshaker(req);
    if(handshaker == null){
    WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel())
    }else{
    handshaker.handshake(ctx.channel(), req);
    }
    }

    //服务端向客户端响应信息
    private void sendHttResponse(ChannelHandlerContext ctx,FullHttpRequest req,
    DefaultFullHttpResponse res){
    if(res.getStatus().code() != 200){
    ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(),CharsetUtil.UTF_8);
    res.content().writeBytes(buf);
    buf.release();
    }
    //服务端向客户端发送数据
    ChannelFuture f = ctx.channel().writeAndFlush(res);
    if(res.getStatus().code() != 200){
    f.addListener(ChannelFutureListener.CLOSE);
    }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext arg0, Object arg1) throws Exception {
    // TODO Auto-generated method stub

    }

    }


    /**
    * 初始化连接时,加载各个组件
    * @author AAA
    *
    */
    public class MyWebSocketChannelHandler extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel e) throws Exception {

    e.pipeline().addLast("http-codec",new HttpServerCodec());
    e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
    e.pipeline().addLast("http-chunked",new ChunkedWriteHandler());
    e.pipeline().addLast("handler",new MyWebSocketHandler());


    }

    }


    /**
    * WebSocket启动类
    * @author AAA
    *
    */
    public class Main {
    public static void main(String[] args) {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workGroup = new NioEventLoopGroup()
    try{
    ServerBootstrap b =new ServerBootstrap();
    b.group(bossGroup,workGroup);
    b.channel(NioSctpServerChannel.class);
    b.childHandler(new MyWebSocketChannelHandler());
    System.out.println("服务端开启等待客户端连接。。。");
    Channel ch = b.bind(8888).sync().channel();
    ch.closeFuture().sync();

    }catch(Exception e ){
    e.printStackTrace();
    }finally {
    bossGroup.shutdownGracefully();
    workGroup.shutdownGracefully();
    }


    }
    }

    HTML开发客户端:

    <html>
    <head>
    <meta http-equiv="Content-Type" content="text/html; charset = utf-8"/>
    <title>WebSocket客户端</title>
    <script type="text/javascript">
    var socket;
    if(!window.WebSocket){
    window.WebSocket = window.MozWebSocket;
    }
    if(window.WebSocket){
    socket = new WebSocket("ws://localhost:8888/websocket");
    socket.onmessage = function(event){
    var ta = document.getElementById('responseContent');
    ta.value += event.data + " ";
    };

    socket.onopen = function(event){
    var ta = document.getElementById('responseContent');
    ta.value = "你当前的浏览器支持WebSocket,请继续操作 ";
    };

    socket.onclose = function(event){
    var ta = document.getElementById('responseContent');
    ta.value = null;
    ta.value = "WebSocket连接已关闭 ";
    };
    }else{
    alert("您的浏览器不支持WebSocket");
    }

    function send(message){
    if(!window.WebSocket){
    return;
    }
    if(socket.readyState == WebSocket.OPEN){
    socket.send(message);
    }else{
    alert("WebSocket连接没有建立成功");
    }
    }
    </script>
    </head>
    <body>
    <form onsubmit="return false;">
    <input type="text" name="message" value=""/>
    <br/><br/>
    <input type="button" value="发送WebSocket请求消息" onclick="send(this.form.message.value)">
    <hr color="red">
    <h2>客户端接收到服务端返回的应答消息</h2>
    <textarea id="responseContent" style="1024px;height:300px"></textarea>
    </form>
    </body>
    </html>


    实现服务端与客户端的实时交互

    ---------------------------------------------------------------------------------------------------------
    Java读源码之Netty深入剖析
    Netty是什么?
    异步事件驱动框架,用于快速开发高性能客户端和服务端

    封装了JDK底层BIO和NIO模型,提供高度可以的API

    自带编解码器解决拆包粘包问题,用户只要关心业务逻辑

    精心设计的reactor线程模型支持高并发海量连接

    自带何种协议栈让你处理任何一种通用协议都几乎不用亲自动手

    Netty基本组件

    服务端首先去监听服务端口、客户端去连接服务端、服务端每次写数据、每隔五秒写一次

    监听端口 NioEventLoop:新连接的接入、连接当前存在的数据流读写
    新连接 Channel: 读写操作
    接收数据 ByteBuf: 服务端接收客户端数据
    业务逻辑 ChannelHandler:
    发送数据
    业务逻辑 ChannelHanler:


    Netty和Mina的对比:
    1、都是Trustin Lee的作品,Netty更晚;
    2、Mina将内核和一些特性的联系过于紧密,使得用户在不需要这些特性的时候无法脱离,相比下性能会有所下降,Netty解决了这个设计问题;
    3、Netty的文档更清晰,很多Mina的特性在Netty里都有;
    4、Netty更新周期更短,新版本的发布比较快;
    5、它们的架构差别不大,Mina靠apache生存,而Netty靠jboss,和jboss的结合度非常高,Netty有对google protocal buf的支持,有更完整的ioc容器支持(spring,guice,jbossmc和osgi);
    6、Netty比Mina使用起来更简单,Netty里你可以自定义的处理upstream events 或/和 downstream events,可以使用decoder和encoder来解码和编码发送内容;
    7、Netty和Mina在处理UDP时有一些不同,Netty将UDP无连接的特性暴露出来;而Mina对UDP进行了高级层次的抽象,可以把UDP当成"面向连接"的协议,而要Netty做到这一点比较困难。

    Netty服务端启动
    分为4个过程:1.创建服务端Channel
    bind()用户代码入口
    initAndRegister()初始化并注册
    newChannel()创建服务端channel
    //反射创建服务端Channel
    newSocket()通过jdk来创建底层jdk channel
    NioServerSocketChannelConfig()tcp参数配置类
    AbstractNioChannel()
    configureBlocking(false)(非)阻塞模式
    AbstractChannel()创建id、unsafe、pipeline(重点、用于逻辑处理)
    2.初始化服务端Channel
    init()初始化入口
    set ChannelOptions、ChannelAttrs
    set ChildOptions、ChlidAttrs
    config handler配置服务端pipeline
    add ServerBootstrapAcceptor添加连接器
    3.注册selector
    AbstractChannel.register(channel)入口
    this.eventLoop = eventLoop绑定线程
    resgiterO()实际注册
    doRegister()调用jdk底层注册
    invokeHandlerAddedIfNeeded()事件的回调
    fireChannelRegistered()传播事件
    4.端口绑定
    AbstractUnsafe.bind()入口
    doBind()
    JavaChannel.bing()jdk底层绑定
    pipeline.fireChannelActive()传播事件
    HeadContext.readIfIsAutoRead()重新绑定新连接

    服务端启动核心路径总结:newChannel()——》init()——》doBind()——》register()

    默认情况下,Netty服务端器多少线程?何时启动?
    Netty是如何解决jdk空轮训bug的?
    Netty如何保证异步串行无锁化?

    NioEventLoop创建
    new NioEventLoopGroup()线程组,默认2*cpu
    new ThreadPerTaskExecutor()线程创建器
    for(){newChild()}构造NioEventLoop
    chooserFactory.newChooser()线程选择器


    ThreadPerTaskExecuteor线程执行器
    //每次执行任务都会创建一个线程实体
    //NioEventLoop线程命名规则nioEventLoop-1-xx

    newchild()
    //保存线程执行器ThreadPerTaskExecutor
    //创建一个MpscQueue
    //创建一个selector

    chooserFactory.newChooser()
    //为新连接绑定NioEventLoop


    NioEventLoop启动:分为两种触发器
    服务端启动绑定端口
    bind()->execute(task)入口
    startThread()->doStartThread()创建线程
    ThreadPerTaskExecutor.execute()
    thread = Thread.currentThread()
    NioEventLoop.run启动(Neety运转的核心)
    run()-> for(;;)
    select()检查是否有io事件
    //deadline以及任务穿插逻辑处理
    //阻塞是select
    //避免jdk空轮询的bug
    processSelectedKeys()处理io事件
    //selected keySet优化
    //processSelectedKeysOptimized()对key做处理
    runAllTasks()(执行逻辑)处理异步任务队列
    //task的分类和添加
    //任务的聚合
    //任务的执行
    新连接接入通过chooser绑定一个NioEventLoop

    第五章 Netty新连接接入

    两个问题:
    Netty是在哪里检测有新连接接入的?
    新连接是怎样注册到NioEventLoop线程的?

    Netty新连接接入处理逻辑:
    检测新连接
    processSelectedKey(key,channel)入口
    NioMessageUnsafe.read()
    doReadMessages()while循环
    javaChannel.accept();
    创建NioSocketChannel
    new NioSocketChannel(parent,ch)入口
    AbstractNioByteChannel(p,ch,op_read)
    configureBlocking(false) & save op
    create id,unsafe,pipeline
    new NioSocketChannelConfig()
    setTcpNoDelay(true)禁止Nagle算法
    Channel的分类
    NioServerSocketChannel
    NioSocketChannel
    Unsafe
    Channel的层级
    AbstractChannel
    AbstractNioChannel
    AbstractNioByteChannel
    NioSocketChannel
    NioByteUnsafe
    NioSocketChannelConfig
    AbstractNioMessageChannel
    NioMessageUnsafe
    NioServerSocketChannel
    NioServerSocketChannelConfig

    分配线程及注册selector
    服务端Channel的pipeline构成
    Head - ServerBootstrapAcceptor - Tail
    ServerBootstrapAcceptor
    //添加childHandler
    //设置options和attrs
    //NioEventLoop并注册selector

    向selector注册读事件
    NioSocketChannel读事件的注册

    第六章pipeline
    三个问题:
    netty是如何判断ChannelHandler类型的?
    对于ChannelHandler的添加应该遵循什么样的顺序?
    用户手动触发事件传播,不动的触发方式有什么的区别?

    pipline初始化:
    pipline在创建Channel的时候被创建
    pipline节点数据结构:ChannelHandlerContext
    pipline中的两大哨兵:head和tail

    添加ChannelHandler
    //判断是否重复添加
    //创建节点并添加至链表
    //回调添加完成事件

    删除ChannelHandler
    //找到节点
    //连接的删除
    //回调删除Handler事件

    inBound事件的传播
    //何为inBound事件以及ChannelInboundHandler
    //ChannelRead事件的传播
    //SimpleInBoundHandler处理器

    outBound事件的创博
    //可谓outBound事件以及ChannelOutBoundHandler
    //write()事件的传播

    异常的传播
    //异常触发链
    //异常处理的最佳实践

    pipeline总结:pipeline的初始化、pipeline的数据结构是双向列表结构
    添加删除ChannelHandler
    默认pipeline中存在两种类型节点:Head和Tail
    三个问题:
    netty是如何判断ChannelHandler类型的?
    对于ChannelHandler的添加应该遵循什么样的顺序?
    用户手动触发事件传播,不同的触发方式有什么样的区别


    第七章ByteBuf(内存分配、最为底层的、
    主要负责把数据从底层读到ByteBuf传递应用程序,处理完后封装为ByteBuf写会到Io)
    三个问题:
    Netty的内存类别有哪些?
    如何减少多线程内存分配之间的竞争?
    不同大小的内存如何进行分配的?

    内存与内存管理器的抽象
    不同规格大小和不同类别的内存的分配策略
    内存的回收过程

    ByteBuf结构:
    0 <= readerIndex <= writerIndex <= capacity
    (读数据) (写数据) (扩容)
    read,write,set方法

    mark和reset方法(保持读或写完数据、后可以返回原样)

    ByteBuf分类:
    AbstractByteBuf
    PooledHeapByteBuf
    UnpooledUnsafeDirectByteBuf
    UnpooledDirectByteBuf
    PooledUnsafeHeapByteBuf
    UnpooledHeapByteBuf
    PooledUnsafeDirectByteBuf
    PooledDirectByteBuf
    UnpooledUnsafeHeapByteBuf
    Pooled和Unpooled(预先分配、直接分配)
    Unsafe和非Unsafe(直接拿到ByteBuf内存、不会依赖JDK底层的Unsafe对象)
    Heap和Direct (在堆上内存分配的、调用JDK的API内存分配的、)

    ByteBufAllocator(内存分配管理器)
    ByteBufAllocator功能:分配内存、在堆上内存分配、对外内存分配
    AbstractByteBufAllocator
    UnpooledByteBufAllocator
    //heap内存的分配
    //direct内存的分配
    PooledByteBufAllocator
    //拿到线程局部缓存PoolThreadCache
    //在线程局部缓存的Area上进行内存分配

    directArena分配direct内存的流程
    //从对象池里面拿到PooledByteBuf进行复用
    //从缓存上进行内存分配
    //从内存堆里面进行内存分配

    内存规格介绍:0、tiny 512B、small 8k、 normal 16M、 huge
    (0--8k SubPage) (Page) (Chunk)

    缓存数据结构:MemoryRegionCache
    queue: chunk handler chunk handler chunk handler
    sizeClass: tiny(0~512B) small(512B~8k) normal(8k~16M)
    size: N*16B 512B、1k、2k、4k 8k、16、32k

    命中缓存的分配流程
    找到对应size的MemoryRegionCache
    从queue中弹出一个entry给ByteBuf初始化
    将弹出的entry扔到对象池进行服用
    arena/chunk /page/subpage
    page级别的内存分配:allocateNormal()
    尝试在现有的chunk上分配
    创建一个chunk进行内存分配
    初始化PooledByteBuf
    subpage级别的内存分配:allocateTiny()
    定位一个Subpage对象
    初始化Subpage
    初始化PooledByteBuf
    ByteBuf的释放:
    连续的内存区段加到缓存
    标记连续的内存区段为未使用
    ByteBuf加到对象池
    总结:
    ByteBuf的api和分类
    分配pooled内存的总步骤
    不同规格的pooled内存分配与释放


    第八章 Netty解码(把二进制数据流解析成一个定义协议的数据包,也就是ByteBuf)

    两个问题:
    解码器抽象的解码过程
    答:(ByteToMessageDecoder)
    netty里面有哪些拆箱即用的解码器
    答:有如下四中!!!

    解码器基类
    ByteToMessageDecoder解码步骤
    累加字节流
    调用子类的decode方法进行解析
    将解析到的ByteBuf向下传播

    Netty中常见的解码器分析
    基于固定长度解码器(netty中较为简单的解码器)
    基于行解码器: LineBasedFrameDecoder
    基于分隔符解码器:DelimiterBasedFrameDecoder
    基于长度域解码器:LengthFieldBasedFrameDecoder
    长度域解码器步骤
    计算需要抽取的数据包长度
    跳过字节逻辑处理
    丢弃模式下的处理

    第九章 Netty编码

    一个问题:
    如何把对象变成字节流,最终写到socket底层?

    write And Flush()
    Head - encoder - ... - biz - Tail
    从tail节点开始往前传播
    逐个调用channelHandler的write方法
    逐个调用channelHandler的flush方法

    编码器处理逻辑:MessageToByteEncoder
    //匹配对象
    //分配内存
    //编码实现
    //释放对象
    //传播数据
    //释放内存

    write-写buffer队列
    direct化ByteBuf(如果不是对外内存、就转换为对外内存)
    插入写队列
    设置写状态
    flush-刷新buffer队列
    添加刷新标志并设置写状态
    遍历buffer队列,过滤ByteBuf
    调用jdk底层api进行自旋写

    第十章 Netty性能优化工具类解析
    两大性能优化工具类:
    FastThreadLocal的实现机制
    FastThreadLocal的创建
    FastThreadLocal的get()方法实现
    获取ThreadLocalMap
    直接通过索引取出对象
    初始化
    FastThreadlocal的set()方法实现
    获取ThreadLocalMap
    直接通过索引set对象

    Recycler(对象池,减少GC的压力、不需要每次new对象、)
    Recycler的创建
    Thread
    ratioMask控制对象回收频率
    maxCapacity池子最大大小
    maxDelayedQueues
    (head)(prev)(cursor )
    availableSharedCapacity 缓存最大的个数
    Recycler获取对象
    获取当前线程的Stack
    从Stack里面弹出对象
    创建对象并绑定到Stack
    回收对象到Recycler
    同线程回收对象

    异线程回收对象:一个对象在一个线程中创建、在另一个线程中回收
    获取WeakOrderQueue
    创建WeakOrderQueue
    将对象追加到WeakOrderQueue
    异线程收割对象

    第十一章 Netty设计模式的应用
    单例模式:ReadTimeoutException/MqttEncoder
    一个类全局只有一个对象
    延迟创建
    避免线程安全问题

    策略模式:DefaultEventExecutorChooserFactory
    封装一系列可相互替换的算法家族
    动态选择某一个策略

    装饰者模式:WrappedByteBuf

    观察者模式:
    观察者和被观察者
    观察者订阅消息,被观察者发布消息
    订阅则能收到,取消订阅则收不到

    迭代器模式:
    迭代器接口
    对容器里面各个对象进行访问

    责任链模式:(使得多个对象都有机会处理同一个对象,)
    责任处理器接口 :ChannelHandler
    创建链、添加删除责任处理器接口 :ChannelPipeline
    上下文 :ChannelHandlerContext
    责任终止机制

    第十二章 Netty高并发性能调优

    单机百万连接调优
    如何模拟百万连接
    /data/centos6.5/server/server.jar
    netty-study mvn package -DskipTests maven打成jar包:
    突破局部文件句柄限制
    ulimit -n
    vi /etc/security/limits.conf添加如下:
    * hard nofile 1000000 (最大文件数)
    * soft nofile 1000000
    exit退出虚拟机
    server vagrant reload重启虚拟机
    server vagrant ssh登录虚拟机
    ./start.sh启动客户端/服务端
    突破全局文件句柄限制
    cat /proc/sys/fs/file-max
    sudo -s
    echo 20000 > /proc/sys/fs/file-max

    sudo vi /etc/sysctl.conf添加如下:
    fs.file-max = 1000000
    sudo sysctl -p

    Netty应用级别性能调优

    完结!

    -----------------------------------------------------------------------------------

    在我看来Channel、Buffer、Selector构成了核心的API
    Java NIO的主要Channel实现:包含UDP/TCP/网络IO/文件IO
    FileChannel
    DatagramChannel
    SocketChannel
    ServerSocketChannel
    Java NIO的主要Buffer的实现:
    ByteBuffer
    CharBuffer
    DoubleBuffer
    FloatBuffer
    intBuffer
    LongBuffer
    ShortBuffer
    MappedByteBuffer,表示内存映射文件,
    S

  • 相关阅读:
    const 函数
    为什么要进行初始化(C语言)
    关于矩阵的逆
    在写论文的参考文献时,有的段落空格很大,有的段落则正常,原因及解决方法(wps)
    C#递归搜索指定目录下的文件或目录
    try catch finally 执行顺序面试题总结
    关于try catch finally的执行顺序解释 都有return
    C# 序列号和反序列化 对象写入bin
    C# 序列化和反序列化 详解 对象和XML文件之间
    在C#中,Json的序列化和反序列化的几种方式总结
  • 原文地址:https://www.cnblogs.com/Bkxk/p/9371747.html
Copyright © 2011-2022 走看看