zoukankan      html  css  js  c++  java
  • 《精通并发与Netty》学习笔记(08

    本节通过案例介绍springboot与netty的集成

    第一步:新建Spring Initializr 项目

    我这里选择Gradle项目,也可选择Maven项目

    (注意:最好选择自己下载gradle,如下图)

    然后修改build.gradle文件,加入依赖(需要安装Lombok插件)

    plugins {
        id 'org.springframework.boot' version '2.1.5.RELEASE'
        id 'java'
    }
    
    apply plugin: 'io.spring.dependency-management'
    
    group = 'com.spring.netty'
    version = '0.0.1-SNAPSHOT'
    sourceCompatibility = '1.8'
    targetCompatibility = 1.8
    
    repositories {
        mavenCentral()
    }
    
    dependencies {
        compile(
                "junit:junit:4.12",
                "io.netty:netty-all:4.1.36.Final",
                "org.springframework.boot:spring-boot-starter",
                "org.springframework.boot:spring-boot-starter-test",
                "org.projectlombok:lombok:1.18.8"
     ) }

     接下来编写服务端程序

    package com.spring.netty.springbootnettydemo;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.util.concurrent.Future;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PreDestroy;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * description:
     * @since 2019/05/21
     **/
    @Component
    public class NettyTcpServer {
    
        private static final Logger log = LoggerFactory.getLogger(NettyTcpServer.class);
        //boss事件轮询线程组
        //处理Accept连接事件的线程,这里线程数设置为1即可,netty处理链接事件默认为单线程,过度设置反而浪费cpu资源
        private EventLoopGroup boss = new NioEventLoopGroup(1);
        //worker事件轮询线程组
        //处理hadnler的工作线程,其实也就是处理IO读写 。线程数据默认为 CPU 核心数乘以2
        private EventLoopGroup worker = new NioEventLoopGroup();
    
        @Autowired
        ServerChannelInitializer serverChannelInitializer;
    
        @Value("${netty.tcp.client.port}")
        private Integer port;
    
        //与客户端建立连接后得到的通道对象
        private Channel channel;
    
        /**
         * 存储client的channel
         * key:ip,value:Channel
         */
        public static Map<String, Channel> map = new ConcurrentHashMap<String, Channel>();
    
        /**
         * 开启Netty tcp server服务
         *
         * @return
         */
        public ChannelFuture start() {
            //启动类
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)//组配置,初始化ServerBootstrap的线程组
                    .channel(NioServerSocketChannel.class)///构造channel通道工厂//bossGroup的通道,只是负责连接
                    .childHandler(serverChannelInitializer)//设置通道处理者ChannelHandler////workerGroup的处理器
                    .option(ChannelOption.SO_BACKLOG, 1024)//socket参数,当服务器请求处理程全满时,用于临时存放已完成三次握手请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
                    .childOption(ChannelOption.SO_KEEPALIVE, true);//启用心跳保活机制,tcp,默认2小时发一次心跳
            //Future:异步任务的生命周期,可用来获取任务结果
            ChannelFuture channelFuture1 = serverBootstrap.bind(port).syncUninterruptibly();//绑定端口,开启监听,同步等待
            if (channelFuture1 != null && channelFuture1.isSuccess()) {
                channel = channelFuture1.channel();//获取通道
                log.info("Netty tcp server start success, port = {}", port);
            } else {
                log.error("Netty tcp server start fail");
            }
            return channelFuture1;
        }
    
        /**
         * 停止Netty tcp server服务
         */
        @PreDestroy
        public void destroy() {
            if (channel != null) {
                channel.close();
            }
            try {
                Future<?> future = worker.shutdownGracefully().await();
                if (!future.isSuccess()) {
                    log.error("netty tcp workerGroup shutdown fail, {}", future.cause());
                }
                Future<?> future1 = boss.shutdownGracefully().await();
                if (!future1.isSuccess()) {
                    log.error("netty tcp bossGroup shutdown fail, {}", future1.cause());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("Netty tcp server shutdown success");
        }
    
    }
    package com.spring.netty.springbootnettydemo;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.timeout.IdleStateHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * description: 通道初始化,主要用于设置各种Handler
     * @since 2019/05/21
     **/
    @Component
    public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        @Autowired
        ServerChannelHandler serverChannelHandler;
    
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            //IdleStateHandler心跳机制,如果超时触发Handle中userEventTrigger()方法
            pipeline.addLast("idleStateHandler",
                    new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));
            //字符串编解码器
            pipeline.addLast(
                    new StringDecoder(),
                    new StringEncoder()
            );
            //自定义Handler
            pipeline.addLast("serverChannelHandler", serverChannelHandler);
        }
    }
    package com.spring.netty.springbootnettydemo;
    
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    /**
     * description:
     * @since 2019/05/21
     **/
    @Component
    @ChannelHandler.Sharable
    @Slf4j
    public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
    
        /**
         * 拿到传过来的msg数据,开始处理
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("Netty tcp server receive msg : " + msg);
            ctx.channel().writeAndFlush(" response msg ").syncUninterruptibly();
        }
    
        /**
         * 活跃的、有效的通道
         * 第一次连接成功后进入的方法
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            log.info("tcp client " + getRemoteAddress(ctx) + " connect success");
            //往channel map中添加channel信息
            NettyTcpServer.map.put(getIPString(ctx), ctx.channel());
        }
    
        /**
         * 不活动的通道
         * 连接丢失后执行的方法(client端可据此实现断线重连)
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            //删除Channel Map中的失效Client
            NettyTcpServer.map.remove(getIPString(ctx));
            ctx.close();
        }
    
        /**
         * 异常处理
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);
            //发生异常,关闭连接
            log.error("引擎 {} 的通道发生异常,即将断开连接", getRemoteAddress(ctx));
            ctx.close();//再次建议close
        }
    
        /**
         * 心跳机制,超时处理
         *
         * @param ctx
         * @param evt
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            String socketString = ctx.channel().remoteAddress().toString();
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.READER_IDLE) {
                    log.info("Client: " + socketString + " READER_IDLE 读超时");
                    ctx.disconnect();//断开
                } else if (event.state() == IdleState.WRITER_IDLE) {
                    log.info("Client: " + socketString + " WRITER_IDLE 写超时");
                    ctx.disconnect();
                } else if (event.state() == IdleState.ALL_IDLE) {
                    log.info("Client: " + socketString + " ALL_IDLE 总超时");
                    ctx.disconnect();
                }
            }
        }
    
        /**
         * 获取client对象:ip+port
         *
         * @param ctx
         * @return
         */
        public String getRemoteAddress(ChannelHandlerContext ctx) {
            String socketString = "";
            socketString = ctx.channel().remoteAddress().toString();
            return socketString;
        }
    
        /**
         * 获取client的ip
         *
         * @param ctx
         * @return
         */
        public String getIPString(ChannelHandlerContext ctx) {
            String ipString = "";
            String socketString = ctx.channel().remoteAddress().toString();
            int colonAt = socketString.indexOf(":");
            ipString = socketString.substring(1, colonAt);
            return ipString;
        }
    
    }

    编写客户端代码

    package com.spring.netty.springbootnettydemo;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    /**
     * description:
     * @since 2019/05/21
     **/
    @Component
    public class NettyTcpClient {
    
        private static final Logger log = LoggerFactory.getLogger(NettyTcpClient.class);
    
        @Value(("${netty.tcp.server.host}"))
        String HOST;
        @Value("${netty.tcp.server.port}")
        int PORT;
    
        @Autowired
        ClientChannelInitializer clientChannelInitializer;
    
        //与服务端建立连接后得到的通道对象
        private Channel channel;
    
        /**
         * 初始化 `Bootstrap` 客户端引导程序
         *
         * @return
         */
        private final Bootstrap getBootstrap() {
            Bootstrap b = new Bootstrap();
            EventLoopGroup group = new NioEventLoopGroup();
            b.group(group)
                    .channel(NioSocketChannel.class)//通道连接者
                    .handler(clientChannelInitializer)//通道处理者
                    .option(ChannelOption.SO_KEEPALIVE, true);//心跳报活
            return b;
        }
    
        /**
         * 建立连接,获取连接通道对象
         *
         * @return
         */
        public void connect() {
            ChannelFuture channelFuture = getBootstrap().connect(HOST, PORT).syncUninterruptibly();
            if (channelFuture != null && channelFuture.isSuccess()) {
                channel = channelFuture.channel();
                log.info("connect tcp server host = {}, port = {} success", HOST, PORT);
            } else {
                log.error("connect tcp server host = {}, port = {} fail", HOST, PORT);
            }
        }
    
        /**
         * 向服务器发送消息
         *
         * @param msg
         * @throws Exception
         */
        public void sendMsg(Object msg) throws Exception {
            if (channel != null) {
                channel.writeAndFlush(msg).sync();
            } else {
                log.warn("消息发送失败,连接尚未建立!");
            }
        }
    
    }
    package com.spring.netty.springbootnettydemo;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.timeout.IdleStateHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * description: 通道初始化,主要用于设置各种Handler
     * @since 2019/05/21
     **/
    @Component
    public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        @Autowired
        ClientChannelHandler clientChannelHandler;
    
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            //IdleStateHandler心跳机制,如果超时触发Handle中userEventTrigger()方法
            pipeline.addLast("idleStateHandler",
                    new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES));
            //字符串编解码器
            pipeline.addLast(
                    new StringDecoder(),
                    new StringEncoder()
            );
            //自定义Handler
            pipeline.addLast("clientChannelHandler", clientChannelHandler);
        }
    }
    package com.spring.netty.springbootnettydemo;
    
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import org.springframework.stereotype.Component;
    
    /**
     * description:
     * @since 2019/05/21
     **/
    @Component
    @ChannelHandler.Sharable
    public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
    
        /**
         * 从服务器接收到的msg
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("Netty tcp client receive msg : " + msg);
        }
    
    }

    最后修改springboot启动程序

    package com.spring.netty.springbootnettydemo;
    
    import io.netty.channel.ChannelFuture;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class SpringbootNettyDemoApplication implements CommandLineRunner {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringbootNettyDemoApplication.class, args);
        }
    
        @Autowired
        NettyTcpServer nettyTcpServer;
        @Autowired
        NettyTcpClient nettyTcpClient;
    
        @Override
        public void run(String... args) throws Exception {
            //启动服务端
            ChannelFuture start = nettyTcpServer.start();
    
            //启动客户端,发送数据
            nettyTcpClient.connect();
            for (int i = 0; i < 10; i++) {
                nettyTcpClient.sendMsg("hello world" + i);
            }
    
            //服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程
            start.channel().closeFuture().syncUninterruptibly();
        }
    
    }

    运行测试效果如下:

    本节我们通过一个案例将springboot与netty结合实现了异步通信,下节我们将详细介绍Netty的相关知识

  • 相关阅读:
    华为实习日记——第二十三天
    华为实习日记——第二十二天
    华为实习日记——第二十一天
    华为实习日记——第二十天
    HDU 5102 The K-th Distance(模拟)
    HDU 4113 Construct the Great Wall(插头dp)
    UVALive 4849 String Phone(2-sat、01染色)
    HDU 4859 海岸线(最大流最小割)
    HDU 3879 Base Station(最大权闭合子图)
    POJ 3155 Hard Life(最大密度子图)
  • 原文地址:https://www.cnblogs.com/happy2010/p/10895209.html
Copyright © 2011-2022 走看看