一 Netty核心组件介绍
1.1、 channel
channel 是一个通道,我们通常说其是一个NIO的构造
1.2、回调
回调本质是一个方法,方法中的参数指向另一个方法的引用;
1.3 、Futrure
通知机制,当方法执行结束时会发一个通知消息;
1.4ChannelHandler
通道处理事件,即一般就是我们的处理业务逻辑的地方;常用的通道处理类 ChannelInboundHandler,SimpleChannelInboundHandler,ChannelHandlerAdapter;不同的通道处理类适配不同的适配器;如下图的处理类或者适配器就是我们常用的类;
二 入门应用
首先需要引入 netty依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.55.Final</version>
</dependency>
2.1、 服务端
- 首选 我们 需要创建一个reactor模型的线程组,这里我们选择的是NIO异步线程
- 其次我们创建一个服务引导类,即服务端辅助启动器ServerBootstrap;在 ServerBootstrap 我们添加线程组NioEventLoopGroup和事件处理ChildChannelHandler;然后将 ServerBootstrap 绑定方法传入的参数 端口,执行 sync同步阻塞;
- 等待同步阻塞完成后,我们调用通道的closeFuture方法和sync将 线程阻塞,直到 处理器执行完成;
- 最后调用线程组的shutdownGracefully 方法释放资源;
public void bind(int port) throws Exception{
// 配置线程组 实质是 reactor线程组
NioEventLoopGroup parentGroup = new NioEventLoopGroup();
// 启动 NIO
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 启动类
serverBootstrap.group(parentGroup)
.channel(NioServerSocketChannel.class)// 相当于 ServerSocketChannel
.option(ChannelOption.SO_BACKLOG,1024)//TCP参数
.childHandler(new ChildChannelHandler());// 处理事件
// 绑定端口 同步阻塞等待同步成功 channelFuture 异步操作通知回调
ChannelFuture channelFuture = serverBootstrap
.bind(port)
.sync();
// 同步阻塞等待服务监听端口关闭
channelFuture
.channel()
.closeFuture()
.sync();
// 关闭资源
parentGroup.shutdownGracefully();
}
其中的关键就是 new ChildChannelHandler()
, 当来一个端口时就会新建一个处理类,保证了监听多个端口的可能性;
/**
* @Author lsc
* <p>通道初始化 </p>
* @Param
* @Return
*/
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//管道(Pipeline)持有某个通道的全部处理器
ChannelPipeline pipeline = socketChannel.pipeline();
// 添加处理器
pipeline.addLast(new NettyServerHandler());
}
}
然后我们看下处理器 ChildChannelHandler,核心方法有三个
- channelRead,读取通道的消息;
- channelReadComplete 读取消息完毕后,执行的回调;
- exceptionCaught 异常出现执行的回调;
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 转为字节缓冲区
ByteBuf buf = (ByteBuf)msg;
// 字节数组
byte[] bytes = new byte[buf.readableBytes()];
// 缓冲区数据读入字节数组
buf.readBytes(bytes);
// 编码转为字符串
String body = (new String(bytes, "UTF-8"));
System.out.println(" get the data from client : " + body);
// 构造响应数据
String responseData = "那天刚刚好遇见你";
// 数据写入缓冲区
ByteBuf resp = Unpooled.copiedBuffer(responseData.getBytes());
// 写入数据响应
ChannelFuture channelFuture = ctx.writeAndFlush(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 写入 seocketChannel
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 异常关闭资源句柄
ctx.close();
}
}
2.2 、客户端
- 客户端我们也是创建 NIO线程组 NioEventLoopGroup;
- 使用 Bootstrap 进行辅助启动,在通道初始化的时候传入处理器 NettyClientHandler;
- bootstrap 连接时不止绑定了端口,还要绑定ip;
- 最后阻塞处理器执行完成后关闭资源
public void connect(int port, String host) throws InterruptedException {
// 创建线程组
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
// netty启动辅助类
Bootstrap bootstrap = new Bootstrap();
//
bootstrap.group(nioEventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 处理IO事件
ChannelPipeline pipeline = socketChannel.pipeline();
//
pipeline.addLast(new NettyClientHandler());
}
});
// 异步操作
ChannelFuture connect = bootstrap.connect(host, port).sync();
// 关闭客户端
connect.channel().closeFuture().sync();
// 退出线程组
nioEventLoopGroup.shutdownGracefully();
}
我们再来看下处理器NettyServerHandler
处理器的构造都是大同小异
- exceptionCaught 异常回调;
- channelActive 当与服务端连接成功后被调用的回调;
- channelRead: 通道读取消息的回调;
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
public NettyClientHandler() {
super();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.warn("Unexpected exception from downstream : [{}]" ,cause.getMessage());
}
/* *
* @Author lsc
* <p>触发回调 </p>
* @Param [ctx]
* @Return void
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
byte[] bytes = "关注公众号知识追寻者回复netty获取本教程源码".getBytes();
// 创建节字缓冲区
ByteBuf message = Unpooled.buffer(bytes.length);
// 将数据写入缓冲区
message.writeBytes(bytes);
// 写入数据
ctx.writeAndFlush(message);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 消息转为 字节缓冲区
ByteBuf buf = (ByteBuf)msg;
// 创建字节数组
byte[] bytes = new byte[buf.readableBytes()];
// 获得响应的数据写入字节数组
buf.readBytes(bytes);
// 字节数组转为字符串a
String body = new String(bytes, "UTF-8");
// 打印
System.out.println("get the data from server: "+body);
}
}
2.3 测试
启动服务端,监听8080端口
public static void main(String[] args) throws Exception {
NettyServer nettyServer = new NettyServer();
// 连接的ip d端口
nettyServer.bind(8080);
}
服务端启动成功
启动客户端,连接服务端,绑定监听端口
public static void main(String[] args) throws Exception {
NettyClient nettyClient = new NettyClient();
// 连接的ip d端口
nettyClient.connect(8080,"127.0.0.1");
}
客户端启动成功,与服务端连接后会收到服务端发的消息
由于服务端与客户端连接成功后,客户端会激活channelActive 方法,故 服务端也收到一条消息;
想获取本套教程源码和后续内容 关注 公众号 知识追寻者 , 后台回复 netty 获取源码;