本节通过案例介绍springboot与netty的集成
第一步:新建Spring Initializr 项目
我这里选择Gradle项目,也可选择Maven项目
(注意:最好选择自己下载gradle,如下图)
然后修改build.gradle文件,加入依赖(需要安装Lombok插件)
plugins { id 'org.springframework.boot' version '2.1.5.RELEASE' id 'java' } apply plugin: 'io.spring.dependency-management' group = 'com.spring.netty' version = '0.0.1-SNAPSHOT' sourceCompatibility = '1.8' targetCompatibility = 1.8 repositories { mavenCentral() } dependencies { compile( "junit:junit:4.12", "io.netty:netty-all:4.1.36.Final", "org.springframework.boot:spring-boot-starter", "org.springframework.boot:spring-boot-starter-test",
"org.projectlombok:lombok:1.18.8"
) }
接下来编写服务端程序
package com.spring.netty.springbootnettydemo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * description: * @since 2019/05/21 **/ @Component public class NettyTcpServer { private static final Logger log = LoggerFactory.getLogger(NettyTcpServer.class); //boss事件轮询线程组 //处理Accept连接事件的线程,这里线程数设置为1即可,netty处理链接事件默认为单线程,过度设置反而浪费cpu资源 private EventLoopGroup boss = new NioEventLoopGroup(1); //worker事件轮询线程组 //处理hadnler的工作线程,其实也就是处理IO读写 。线程数据默认为 CPU 核心数乘以2 private EventLoopGroup worker = new NioEventLoopGroup(); @Autowired ServerChannelInitializer serverChannelInitializer; @Value("${netty.tcp.client.port}") private Integer port; //与客户端建立连接后得到的通道对象 private Channel channel; /** * 存储client的channel * key:ip,value:Channel */ public static Map<String, Channel> map = new ConcurrentHashMap<String, Channel>(); /** * 开启Netty tcp server服务 * * @return */ public ChannelFuture start() { //启动类 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker)//组配置,初始化ServerBootstrap的线程组 .channel(NioServerSocketChannel.class)///构造channel通道工厂//bossGroup的通道,只是负责连接 .childHandler(serverChannelInitializer)//设置通道处理者ChannelHandler////workerGroup的处理器 .option(ChannelOption.SO_BACKLOG, 1024)//socket参数,当服务器请求处理程全满时,用于临时存放已完成三次握手请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。 .childOption(ChannelOption.SO_KEEPALIVE, true);//启用心跳保活机制,tcp,默认2小时发一次心跳 //Future:异步任务的生命周期,可用来获取任务结果 ChannelFuture channelFuture1 = serverBootstrap.bind(port).syncUninterruptibly();//绑定端口,开启监听,同步等待 if (channelFuture1 != null && channelFuture1.isSuccess()) { channel = channelFuture1.channel();//获取通道 log.info("Netty tcp server start success, port = {}", port); } else { log.error("Netty tcp server start fail"); } return channelFuture1; } /** * 停止Netty tcp server服务 */ @PreDestroy public void destroy() { if (channel != null) { channel.close(); } try { Future<?> future = worker.shutdownGracefully().await(); if (!future.isSuccess()) { log.error("netty tcp workerGroup shutdown fail, {}", future.cause()); } Future<?> future1 = boss.shutdownGracefully().await(); if (!future1.isSuccess()) { log.error("netty tcp bossGroup shutdown fail, {}", future1.cause()); } } catch (InterruptedException e) { e.printStackTrace(); } log.info("Netty tcp server shutdown success"); } }
package com.spring.netty.springbootnettydemo; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; /** * description: 通道初始化,主要用于设置各种Handler * @since 2019/05/21 **/ @Component public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Autowired ServerChannelHandler serverChannelHandler; @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //IdleStateHandler心跳机制,如果超时触发Handle中userEventTrigger()方法 pipeline.addLast("idleStateHandler", new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES)); //字符串编解码器 pipeline.addLast( new StringDecoder(), new StringEncoder() ); //自定义Handler pipeline.addLast("serverChannelHandler", serverChannelHandler); } }
package com.spring.netty.springbootnettydemo; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * description: * @since 2019/05/21 **/ @Component @ChannelHandler.Sharable @Slf4j public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> { /** * 拿到传过来的msg数据,开始处理 * * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Netty tcp server receive msg : " + msg); ctx.channel().writeAndFlush(" response msg ").syncUninterruptibly(); } /** * 活跃的、有效的通道 * 第一次连接成功后进入的方法 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); log.info("tcp client " + getRemoteAddress(ctx) + " connect success"); //往channel map中添加channel信息 NettyTcpServer.map.put(getIPString(ctx), ctx.channel()); } /** * 不活动的通道 * 连接丢失后执行的方法(client端可据此实现断线重连) * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //删除Channel Map中的失效Client NettyTcpServer.map.remove(getIPString(ctx)); ctx.close(); } /** * 异常处理 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); //发生异常,关闭连接 log.error("引擎 {} 的通道发生异常,即将断开连接", getRemoteAddress(ctx)); ctx.close();//再次建议close } /** * 心跳机制,超时处理 * * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("Client: " + socketString + " READER_IDLE 读超时"); ctx.disconnect();//断开 } else if (event.state() == IdleState.WRITER_IDLE) { log.info("Client: " + socketString + " WRITER_IDLE 写超时"); ctx.disconnect(); } else if (event.state() == IdleState.ALL_IDLE) { log.info("Client: " + socketString + " ALL_IDLE 总超时"); ctx.disconnect(); } } } /** * 获取client对象:ip+port * * @param ctx * @return */ public String getRemoteAddress(ChannelHandlerContext ctx) { String socketString = ""; socketString = ctx.channel().remoteAddress().toString(); return socketString; } /** * 获取client的ip * * @param ctx * @return */ public String getIPString(ChannelHandlerContext ctx) { String ipString = ""; String socketString = ctx.channel().remoteAddress().toString(); int colonAt = socketString.indexOf(":"); ipString = socketString.substring(1, colonAt); return ipString; } }
编写客户端代码
package com.spring.netty.springbootnettydemo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * description: * @since 2019/05/21 **/ @Component public class NettyTcpClient { private static final Logger log = LoggerFactory.getLogger(NettyTcpClient.class); @Value(("${netty.tcp.server.host}")) String HOST; @Value("${netty.tcp.server.port}") int PORT; @Autowired ClientChannelInitializer clientChannelInitializer; //与服务端建立连接后得到的通道对象 private Channel channel; /** * 初始化 `Bootstrap` 客户端引导程序 * * @return */ private final Bootstrap getBootstrap() { Bootstrap b = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(); b.group(group) .channel(NioSocketChannel.class)//通道连接者 .handler(clientChannelInitializer)//通道处理者 .option(ChannelOption.SO_KEEPALIVE, true);//心跳报活 return b; } /** * 建立连接,获取连接通道对象 * * @return */ public void connect() { ChannelFuture channelFuture = getBootstrap().connect(HOST, PORT).syncUninterruptibly(); if (channelFuture != null && channelFuture.isSuccess()) { channel = channelFuture.channel(); log.info("connect tcp server host = {}, port = {} success", HOST, PORT); } else { log.error("connect tcp server host = {}, port = {} fail", HOST, PORT); } } /** * 向服务器发送消息 * * @param msg * @throws Exception */ public void sendMsg(Object msg) throws Exception { if (channel != null) { channel.writeAndFlush(msg).sync(); } else { log.warn("消息发送失败,连接尚未建立!"); } } }
package com.spring.netty.springbootnettydemo; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; /** * description: 通道初始化,主要用于设置各种Handler * @since 2019/05/21 **/ @Component public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> { @Autowired ClientChannelHandler clientChannelHandler; @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //IdleStateHandler心跳机制,如果超时触发Handle中userEventTrigger()方法 pipeline.addLast("idleStateHandler", new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES)); //字符串编解码器 pipeline.addLast( new StringDecoder(), new StringEncoder() ); //自定义Handler pipeline.addLast("clientChannelHandler", clientChannelHandler); } }
package com.spring.netty.springbootnettydemo; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import org.springframework.stereotype.Component; /** * description: * @since 2019/05/21 **/ @Component @ChannelHandler.Sharable public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> { /** * 从服务器接收到的msg * * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Netty tcp client receive msg : " + msg); } }
最后修改springboot启动程序
package com.spring.netty.springbootnettydemo; import io.netty.channel.ChannelFuture; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class SpringbootNettyDemoApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(SpringbootNettyDemoApplication.class, args); } @Autowired NettyTcpServer nettyTcpServer; @Autowired NettyTcpClient nettyTcpClient; @Override public void run(String... args) throws Exception { //启动服务端 ChannelFuture start = nettyTcpServer.start(); //启动客户端,发送数据 nettyTcpClient.connect(); for (int i = 0; i < 10; i++) { nettyTcpClient.sendMsg("hello world" + i); } //服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程 start.channel().closeFuture().syncUninterruptibly(); } }
运行测试效果如下:
本节我们通过一个案例将springboot与netty结合实现了异步通信,下节我们将详细介绍Netty的相关知识