zoukankan      html  css  js  c++  java
  • 基于netty框架的Socket传输

    一、Netty框架介绍

    什么是netty?先看下百度百科的解释:

            Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
    也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。
           “快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括FTP,SMTP,HTTP,各种二进制,文本协议,并经过相当精心设计的项目,最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性

    为什么好多大公司都在使用netty框架?主要是基于netty框架的以下几个特点决定的:

    1)健壮性,2)功能齐全,3)可定制,4)扩展性

    二、框架优点

           传统的RPC性能差,主要是由于客户端和远程调用采用了同步阻塞IO线程,当客户端的并发压力增大后,同步阻塞会由于频繁的等待导致I/O线程堵塞,线程无法高效的工作,IO处理能力自然会降低。影响性能的三个因素:第一,IO模型,IO模型在一定程度上决定了框架的性能。第二、协议,如:HTTP、TCP/IP等,协议选择的不同,性能模型也不同,通常情况下,内部私有协议的性能比较优,这是由于内部设计决定的。第三、线程,数据报文的接收、读取、编码、解码等,线程模型的不同,性能也不同。相比于传统的RPC框架,netty的优点主要体现在以下几个方面:

    1. API使用简单,封装非常完善,开发门槛低
    2. 功能上强大,预置了多种编码解码功能,多种协议支持
    3. 定制能力强,可以对ChannelHandler对通信框架灵活扩展
    4. 性能高,Reactor线程模型调度,ChannelFuture-Listener,通过Listener机制主动推送结果
    5. 版本成熟稳定,社区活跃,版本更新快,出现的Bug会被很快的修复,同时,有心功能的加入,经历了大规模的商业应用考验,质量的到了充分的验证。已经广泛应用到互联网、大数据、企业应用、电信软件、网络游戏等热门行业,他可以满足不同的商业标准。

    三、Netty架构分析

             Netty是一个基于三层网络架构模型的框架,三层网络架构分析包括调度层、链条传递层以及业务逻辑层。

    1. Reactor通信调度层,是一个模型,

    NIO线程池组件{

            监听网络读写连接

            业务调度处理

    NIO,AIO,配合NIO通道NioSocketChannel组件

    }

            Netty通过内部select巡查机制,能够实现IO多路复用,通过把多个IO阻塞复用到同一个select的阻塞上,从而能够使系统即使在单线程的情况下,也能够同时处理多个请求。这样就使得netty实现了IO多路复用的优势,与传统多线程相比,大大减少了系统的开销,因为系统不必创建新的线程和销毁线程了,减少了系统的维护难度,节省了资源。

    ByteBuffer池化支持,不用手动切换标志位,实现零拷贝。传统的Socket读写,基本是使用堆内存进行,即jvm事先会把堆内存拷贝到内存中,然后再写入Socket,而netty采用的是DIRECT BUFFERS,不需要经过jvm内存拷贝,在堆外内存直接进行Socket读写,这样就少了一次缓冲区的内存拷贝,从而实现零拷贝。

           2.Pipleline职责链条传递

           拦截处理向前向后事件,外部传入的消息包对象,有POJO信息抽象,上层也只需要处理逻辑,类似SpringIOC处理BeanDefince。不同的Handler节点的功能也不同,通常情况下需要编码解码等,它可以完成外部协议到内部POJO对象的转化,这样上层只需要关注业务逻辑,不需要知道底层的协议和线程模型,从而实现解耦。

          3.构建逻辑业务处理单元

           底层的协议隔离,上层处理逻辑框架并不需要关心底层协议是什么。Netty框架的分层设计使得开发人员不需要关注协议框架的实现,只需要关注服务层的业务逻辑开发即可,实现了简单化。

           之前有个项目是基于传统Socket和线程池的技术实现的,但是在高并发的时候发现并发能力不足,压测的时候发现TPS达不到理想值,所以经过考虑,决定使用netty框架来解决此问题。同样,netty框架也分为客户端和服务端,经过整理,先写一个demo初探netty框架,下面是代码的实现过程。

    首先是服务端,服务端包含两个方面,第一、服务端Server的主要作用就是通过辅助引导程序,设置NIO的连接方式处理客户端请求,通过绑定特定端口、设定解码方式以及监听来实现整个线程的处理请求;第二、服务端Handler需要继承ChannelInboundHandlerAdapter类,handler类的主要作用是读取客户端数据,处理业务,抛出异常,响应客户端请求。代码如下:

    服务端Server:

    public class Server {
    
    private static Log logger = LogFactory.getLog(Server.class);
    
    private int port;
    
    public Server(int port) {
    
            super();
    
            this.port = port;
    
    }
    
    public  void start(){
    
            ServerBootstrap b = new ServerBootstrap();//引导辅助程序
    
            EventLoopGroup group = new NioEventLoopGroup();//通过nio方式来接收连接和处理请求
    
            try {
    
                   b.group(group);
    
                   b.channel(NioServerSocketChannel.class);//设置nio类型的channnel
    
                   b.localAddress(new InetSocketAddress(port));//设置监听端口
    
                   //b.option(ChannelOption.SO_BACKLOG, 2048);
    
                   b.childHandler(new ChannelInitializer<SocketChannel>() {//有连接到达时会创建一个channel
    
                          @Override
    
                          protected void initChannel(SocketChannel ch) throws Exception {
    
                                 //注册handler
    
                                 ch.pipeline().addLast(new ByteArrayDecoder());
    
                                 ch.pipeline().addLast(new ByteArrayEncoder());
    
                                 ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
    
                                 ch.pipeline().addLast(new ServerHandler());
    
                                
    
                          }
    
                   });//.option(ChannelOption.SO_BACKLOG, 2048).childOption(ChannelOption.SO_KEEPALIVE, true);
    
                   ChannelFuture f = b.bind().sync();//配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功
    
                   logger.info(Server.class.getName()+"开始监听:"+f.channel().localAddress());
    
                   f.channel().closeFuture().sync();//应用程序会一直等待直到channel关闭
    
            } catch (Exception e) {
    
                   e.printStackTrace();
    
            } finally {
    
                   try {
    
                          //关闭EventLoopGroup,释放掉所有资源包括创建的线程
    
                          group.shutdownGracefully().sync();
    
                   } catch (InterruptedException e) {
    
                          e.printStackTrace();
    
                   }
    
            }
    
    }
    
    }
    View Code

    服务端Handler 

    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
    private static Log logger=LogFactory.getLog(ServerHandler.class);
    
    @Override
    
    public void channelActive(ChannelHandlerContext ctx){
    
            logger.info(ctx.channel().localAddress().toString()+"通道活跃....");
    
    }
    
    @Override
    
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    
            logger.error(ctx.channel().localAddress().toString()+"通道不活跃....");
    
    }
    
    /**
    
     *
    
     * 读取客户端传过来的消息
    
     */
    
    @Override
    
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            //业务处理类
    
            logger.info("开始业务处理....");
    
            new SocketController(ctx,msg).run();
    
    }
    
    @Override
    
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    
            //出现异常,关闭连
    
            logger.error("服务端出现异常:"+cause.getMessage(),cause);
    
            ctx.close();
    
    }
    
    @Override
    
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    
            logger.info("服务端完成请求!");
    
            ctx.flush();
    
    }
    
    }
    View Code

    客户端代码

        客户端主要是用来向服务端发送数据,同样包含两个方面,第一、Client主要通过设定端口和IP和服务器建立连接,进行数据包的编码;第二、ClientHandler 需要继承 SimpleChannelInboundHandler<ByteBuf>类,针对不同的传输方式,继承不同的类,handler类同样处理业务请求,响应服务端的请求。代码如下:

    客户端Client:

    public class Client {
        private static Log logger=LogFactory.getLog(Client.class);
        private String host;
        private int port;
        public Client(String host, int port) {
            super();
            this.host = host;
            this.port = port;
        }
        public void connect(){
            EventLoopGroup workGroup=new NioEventLoopGroup();
            Bootstrap bootstrap=new Bootstrap();
            bootstrap.group(workGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    logger.info("客户端触发连接......");
                    ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new ClientHandler());
                }
            });
            //客户端开始连接
            try {
                logger.info("连接到服务器......");
                ChannelFuture future=bootstrap.connect(host,port).sync();
                //等待连接关闭
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }finally{
                workGroup.shutdownGracefully();
            }
        }
    }
    View Code

    客户端Handler:

    public class ClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
        private static Log logger=LogFactory.getLog(ClientHandler.class);
        /**
         * 向服务端发送消息
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            logger.info(ctx.channel().localAddress().toString()+"客户点活跃...");
            //向服务端写字符串
            logger.info("客户端连接服务端,开始发送数据.....");
            String string ="hello server!";
            System.out.println("发送数据为:"+string);
            ByteBuf buf=ctx.alloc().buffer(4*string.length());
            buf.writeBytes(string.getBytes());
            ctx.writeAndFlush(buf);
            logger.info("发送完毕...");
        }
        
        /**
         * 读取服务端返回来的消息
         */
        @Override
        protected void channelRead0(ChannelHandlerContext arg0, ByteBuf in) throws Exception {
            logger.info("开始接受服务端数据");
            byte[] b=new byte[in.readableBytes()];
            in.readBytes(b);
            String string=new String(b);
            logger.info("服务端发送的数据为:"+string);
            in.release();
        }
        
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            logger.info("客户端异常:"+cause.getMessage(),cause);
        }
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            logger.info("客户端完成请求....");
            ctx.flush();    
        }
    }
    View Code

    服务端启动:

    public class ServerMain {
    
    private static Log logger=LogFactory.getLog(ServerMain.class);
    
    private static Server server =new Server(55550);
    
    public static void main(String[] args) {
    
            logger.info("服务端启动.......");
    
            server.start();
    
    }
    
    }
    View Code 

    客户端启动类:

    public class Test {
    
    private static Client client = new Client("127.0.0.1", 55550);
    
    public static void main(String[] args) throws UnknownHostException, IOException {
    
            client.connect();
    
    }
    
    }
    View Code

    测试结果:

    服务端:

     

    客户端:

    总结:

           以上只是一个netty框架初探的小Demo,学习使用netty框架的开始,这里面涉及到了很多的技术以及非常多的组件,比如:Channels、Callbacks、Futures、Events和handlers等等,需要进一步的学习,另外,消息的编码解码、粘包、拆包的方式方法、消息格式的转换以及报文格式大小限制都需要进一步的研究学习。

  • 相关阅读:
    C# 实现 Snowflake算法生成唯一性Id
    kafka可视化客户端工具(Kafka Tool)的基本使用(转)
    docker 安装kafka
    Model类代码生成器
    使用docker 部署rabbitmq 镜像
    Vue 增删改查 demo
    git 提交代码到库
    Android ble蓝牙问题
    mac 配置 ssh 到git (Could not resolve hostname github.com, Failed to connect to github.com port 443 Operation timed out)
    okhttp
  • 原文地址:https://www.cnblogs.com/10158wsj/p/8428347.html
Copyright © 2011-2022 走看看