zoukankan      html  css  js  c++  java
  • 搭建生产级的Netty项目

    Netty是Trustin Lee在2004年开发的一款高性能的网络应用程序框架。相比于JDK自带的NIO,Netty做了相当多的增强,且隔离了jdk nio的实现细节,API也比较友好,还支持流量整形等高级特性。在我们常见的一些开源项目中已经普遍的应用到了Netty,比如Dubbo、Elasticsearch、Zookeeper等。

    Netty的具体开发

    提示:因代码相对较多,这里只展示其主要部分,至于项目中用到的编解码器、工具类,请直接拉到最后下载源码!也欢迎顺手给个Star~

    需要的依赖
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
    </dependency>
    
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>io.dropwizard.metrics</groupId>
        <artifactId>metrics-core</artifactId>
        <version>4.1.1</version>
    </dependency>
    <dependency>
        <groupId>io.dropwizard.metrics</groupId>
        <artifactId>metrics-jmx</artifactId>
        <version>4.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.29.Final</version>
    </dependency>
    
    Client端代码
    package com.example.nettydemo.client;
    
    import com.example.nettydemo.client.codec.*;
    import com.example.nettydemo.client.codec.dispatcher.OperationResultFuture;
    import com.example.nettydemo.client.codec.dispatcher.RequestPendingCenter;
    import com.example.nettydemo.client.codec.dispatcher.ResponseDispatcherHandler;
    import com.example.nettydemo.common.RequestMessage;
    import com.example.nettydemo.common.string.StringOperation;
    import com.example.nettydemo.util.IdUtil;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioChannelOption;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    import javax.net.ssl.SSLException;
    import java.util.concurrent.ExecutionException;
    
    public class Client {
    
        public static void main(String[] args) throws InterruptedException, ExecutionException, SSLException {
    
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
    
            //客户端连接服务器最大允许时间,默认为30s
            bootstrap.option(NioChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000); //10s
    
            NioEventLoopGroup group = new NioEventLoopGroup();
            try {
    
                bootstrap.group(group);
    
                RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
                LoggingHandler loggingHandler = new LoggingHandler(LogLevel.INFO);
    
                bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
    
                        pipeline.addLast(new FrameDecoder());
                        pipeline.addLast(new FrameEncoder());
    
                        pipeline.addLast(new ProtocolEncoder());
                        pipeline.addLast(new ProtocolDecoder());
    
                        pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
                        pipeline.addLast(new OperationToRequestMessageEncoder());
    
    //                    pipeline.addLast(loggingHandler);
    
                    }
                });
    
                //连接服务
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888);
                //因为future是异步执行,所以需要先连接上后,再进行下一步操作
                channelFuture.sync();
    
                long streamId = IdUtil.nextId();
                /**
                 * 发送数据测试,按照定义的规则组装数据
                 */
    //            OrderOperation orderOperation =  new OrderOperation(1001, "你好啊,hi");
                RequestMessage requestMessage = new RequestMessage(streamId, new StringOperation(1234, "你好啊,hi"));
    
                //将future放入center
                OperationResultFuture operationResultFuture = new OperationResultFuture();
                requestPendingCenter.add(streamId, operationResultFuture);
    
                //发送消息
                for (int i = 0; i < 10; i++) {
                    channelFuture.channel().writeAndFlush(requestMessage);
                }
    
                //阻塞等待结果,结果来了之后会调用ResponseDispatcherHandler去set结果
    //            OperationResult operationResult = operationResultFuture.get();
    //            //将结果打印
    //            System.out.println("返回:"+operationResult);
    
                channelFuture.channel().closeFuture().get();
    
            } finally {
                group.shutdownGracefully();
            }
    
        }
    
    }
    
    
    Server端代码
    package com.example.nettydemo.server;
    
    import com.example.nettydemo.server.codec.FrameDecoder;
    import com.example.nettydemo.server.codec.FrameEncoder;
    import com.example.nettydemo.server.codec.ProtocolDecoder;
    import com.example.nettydemo.server.codec.ProtocolEncoder;
    import com.example.nettydemo.server.handler.MetricsHandler;
    import com.example.nettydemo.server.handler.ServerIdleCheckHandler;
    import com.example.nettydemo.server.handler.ServerProcessHandler;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioChannelOption;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.flush.FlushConsolidationHandler;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.traffic.GlobalTrafficShapingHandler;
    import io.netty.util.concurrent.DefaultThreadFactory;
    import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor;
    import lombok.extern.slf4j.Slf4j;
    
    import javax.net.ssl.SSLException;
    import java.security.cert.CertificateException;
    import java.util.concurrent.ExecutionException;
    
    /**
     * netty server 入口
     */
    @Slf4j
    public class Server {
    
    
        public static void main(String... args) throws InterruptedException, ExecutionException, CertificateException, SSLException {
    
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            //设置channel模式,因为是server所以使用NioServerSocketChannel
            serverBootstrap.channel(NioServerSocketChannel.class);
    
            //最大的等待连接数量
            serverBootstrap.option(NioChannelOption.SO_BACKLOG, 1024);
            //设置是否启用 Nagle 算法:用将小的碎片数据连接成更大的报文 来提高发送效率。
            //如果需要发送一些较小的报文,则需要禁用该算法
            serverBootstrap.childOption(NioChannelOption.TCP_NODELAY, true);
    
            //设置netty自带的log,并设置级别
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
    
            //thread
            //用户指定线程名
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
            NioEventLoopGroup workGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
            UnorderedThreadPoolEventExecutor businessGroup = new UnorderedThreadPoolEventExecutor(10, new DefaultThreadFactory("business"));
    
            //只能使用一个线程,因GlobalTrafficShapingHandler比较轻量级
            NioEventLoopGroup eventLoopGroupForTrafficShaping = new NioEventLoopGroup(0, new DefaultThreadFactory("TS"));
    
            try {
                //设置react方式
                serverBootstrap.group(bossGroup, workGroup);
    
                //metrics
                MetricsHandler metricsHandler = new MetricsHandler();
    
                //trafficShaping流量整形
                //long writeLimit 写入时控制, long readLimit 读取时控制 具体设置看业务修改
                GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(eventLoopGroupForTrafficShaping, 10 * 1024 * 1024, 10 * 1024 * 1024);
    
    
                //log
                LoggingHandler debugLogHandler = new LoggingHandler(LogLevel.DEBUG);
                LoggingHandler infoLogHandler = new LoggingHandler(LogLevel.INFO);
    
                //设置childHandler,按执行顺序放
                serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
    
                        ChannelPipeline pipeline = ch.pipeline();
    
                        pipeline.addLast("debugLog", debugLogHandler);
                        pipeline.addLast("tsHandler", globalTrafficShapingHandler);
                        pipeline.addLast("metricHandler", metricsHandler);
                        pipeline.addLast("idleHandler", new ServerIdleCheckHandler());
    
                        pipeline.addLast("frameDecoder", new FrameDecoder());
                        pipeline.addLast("frameEncoder", new FrameEncoder());
                        pipeline.addLast("protocolDecoder", new ProtocolDecoder());
                        pipeline.addLast("protocolEncoder", new ProtocolEncoder());
    
                        pipeline.addLast("infoLog", infoLogHandler);
                        //对flush增强,减少flush次数牺牲延迟增强吞吐量
                        pipeline.addLast("flushEnhance", new FlushConsolidationHandler(10, true));
                        //为业务处理指定单独的线程池
                        pipeline.addLast(businessGroup, new ServerProcessHandler());//businessGroup,
                    }
                });
    
                //绑定端口并阻塞启动
                ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();
    
                channelFuture.channel().closeFuture().sync();
    
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
                businessGroup.shutdownGracefully();
                eventLoopGroupForTrafficShaping.shutdownGracefully();
            }
    
        }
    
    }
    

    最后

    以上介绍了Netty的基本用法,在代码中也做了一部分的关键注释,但可能还会有许多不足,也不可能满足所有人的要求,大家可根据自己的实际需求去改造此项目。附上源码地址netty源码

    持续学习,记录点滴。更多文章请访问 文章首发

  • 相关阅读:
    提升Android编译速度
    NYOJ 158 省赛来了
    浅谈 ZipArchive 类
    块状元素的text-align对齐属性
    BestCoder Round #2 1001 TIANKENG’s restaurant
    Saltstack运行cmd.run重新启动tomcat后出现日志乱码(15)
    【HRS项目】Axure兴许问题解决---与SVN结合
    软件质量之道:PCLint之中的一个
    字典树 一种高速插入查询数据结构
    【JS】JavaScript引擎的内部执行机制
  • 原文地址:https://www.cnblogs.com/qupengkun/p/12614607.html
Copyright © 2011-2022 走看看