Netty
目前3.0版本属于老版本,4.0.X属于稳定版本,5.0.X因为发生了重大bug基本被废弃。
netty使用自己本身的channelBuffer,更加优化了之前的NIO ByteBuffer。
netty的事件模型可以自己定制基于拦截器的事件模型,缓存支持零拷贝,同时支持编码解码,SSL、HTTP、WebSockets、Google Protocol Buffer Integration,需要环境支持jdk1.6以上就可以。
channelBuffer特点
1 可以自定义
2 可以实现零复制
3 不需要flip()进行复位
4 它通常比ByteBuffer
合并和切片ChannelBuffer
1 在通信层输出数据的时候,对数据进行合并或者切片,netty支持零复制通过channelBuffer方法
线程模型基本介绍
io基本传统模型
基本io模型当高并发的时候线程在服务端进行大量链接产生线程会导致并发容量较高,空闲时也在消耗cpu资源,所有事件处于监听状态。
reactor 事件模型
reactor 大致称呼有: 1 反应器模式 2 分发者模式 3 通知者模式
1 reactor 一个或多个请求同时传递给处理器 的一种模式 基于事件驱动
2 服务器端同时处理多个请求,并且分派到相应的线程进行处理。因此reactor 也叫dispatcher 模式 。
3 reactor 使用了 io复用监听事件,收到事件后进行分发给某个线程。这点就是网络高并发的处理关键
reactor 核心组成
1 reactor在一个单独的线程中运行,负责监听和派发。
2 handler 处理程序执行的事件,属于真正处理事情的,处理程序执行非阻塞。
单reactor单线程实现方式
1 客户端发送请求,单线程reactor 通过select阻塞事件收到客户端的请求,如果第一次accept,如果不是第一次链接调用handler处理业务请求。
2 reactor对象通过select监控客户端发送的请求事件,收到事件后通过diapatch进行分发
3 如果建立的是链接请求则调用acceptor对象创建一个handler处理后续业务
4 如果不是链接事件则调用对应的handler事件 进行分发
5 handler 会完成read---->业务处理----->send完成业务员处理。
单线程的优点 模型简单,没有多线程,进程通信,全部都在一个线程内完成。
缺点性能容易达到瓶颈,hadler处理业务的时候整个进程无法处理其他链接事件,容易达到瓶颈。
单reactor多线程模式
1 Reactor对象通过select 监听客户端请求,通过dispatch分发,如果是链接请求,调用acceptor进行阻塞 ,如果是业务处理请求调用handler进行业务处理 ,其中handler只是业务处理事件的响应,通过read进行业务员测处理分发到wworker线程池,worker线程池进行worker线程业务处理,并且将结果返回给handler通过send发送发回给客户端。
优点:可以充分利用cpu的处理能力
缺点:多线程共享数据复杂,reactor的响应处理在单线程运行。
主从reactor多线程
1 主线程ManReactor通过对1selecto监听事件链接,收到连接事件后对acceptor进行处理
2 当Acceptor处理链接事件后,MainReactor将分配给SubReactor
3 SubReactor 将链接加入到监听队列,并创建handler
4 当事件发生的时候,subReactor 调用对应的handler
5 handler 读取read事件,分发给worker线程
6 worker线程池分配给独立的worker线程去处理业务
7 worker线程在处理之后返回给handler ,handler通过send 返回给客户端
8 一个主的reactor主线程可以对应多个子的SubReactor子线程。
main分配给subReactor ,subReactor 又分配给worker
neety模型
简化版
客户端进入 Boss线程,boss线程通过accept 生成一个ServerChannel 在封装成NioSocketChannel,在由 NioSocketChannel注册到WorkerGroup 对象的selector上 去监听,如果有事件发生,则通过dispather 调配到handler
1 boosgroup 维护一个selector 只关注了是否链接accpet事件
2 当接收到accpet事件 就会获取到对应的socketChannel进一步封装成NioSocketChannel 并且注册到Worker线程的selector监听队列中。
3 当worker监听到事件的时候,就通过分发对应的通道去处理业务。handler 已经加入到通道中
netty模型进阶版
bossGroup在netty中可以有多个NioEventGroup线程,每一个线程对应一个NioEventGroup,通过自旋去监听事件,如果监听到就创建ScoketChannel 然后封装成NioSocketChannel,NioSocketChannel又注册到workerGroup,worker也对应循环,同时也维护了selector 让selector在监听。
netty工作模型详细版
1 netty 抽象出两种线程池,BossGroup BossGroup对应有一个NioEventGroup,而bossGroup 专门接收客户端的链接,
workerGroup负责网络的读写。
2 BossGroup和WorkerGroup 都是对应的NioEventLoopGroup
3 NioEventLoopGroup 是相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环其实是NioEventLoop
4 NioEventLoop 表示一个不断循环执行处理任务的线程,每个NioEventLoop 都有一个selector用于监听绑定在Socket的。网络通讯。
5 NioEventLoopGroup 可以有多个线程的,可以含有多个NioEventLoop,NioEventLoop是可以制定的
6 BossNioEventLoop执行有3步骤
6.1 轮询accept事件
6.2 处理事件,与client建立链接,生成NioSocketChannel 并且注册到NioEventloop的selector上
6.3处理任务队列的队列,即runAllTasks
7 WorkerNioEventLoopGroup
7.1 每一个workerNioEventLoop轮询read write事件。
7.2 处理io事件在NioSocketChannel 上进行处理 。
7.3 处理任务队列的其他队列,即runAllTasks 。
8 workerNioEventLoop 会使用到 pipeline 是管道包含了NioSocketChannel ,pipeline 是获取到NioSocketChannel (通道)类似一个处理器,管道中维护了很多处理器,对各种数据进行处理。
netty DemonTCP服务
netty在服务器监听端口号6688,客户端发送给服务器代码 及其注释
netty在服务器监听端口号6688,客户端发送给服务器代码 及其注释
// server端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.NettyRuntime;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyServer {
public static void main(String[] args) throws Exception {
int i = NettyRuntime.availableProcessors(); // cpu 线程核数
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* 1 设置两个线程组
* 2 设备boos线程的管道 ServerSocketChannel
* 3 设置线程队列链接数
* 4 设置链接状态
*/
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
/**
* 1 创建一个通道初始化对象 向这个workerGroup 给 pipline 设置处理器 增加一个handler
* 2 channel 既可以拿到 pipeline 通过pipeline 也可以得到channel
* 3 addLast 向管道最后添加处理器
*/
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline().addLast(new NettyServerHandler());
System.out.println(pipeline);
}
});
System.out.println("服务器已经准备好了");
/**
*绑定端口
*/
ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
System.out.println("监听端口绑定成功!");
}
}
});
/**
* 对关闭通道进行监听
*/
channelFuture.channel().closeFuture().sync();
} finally {
/**
* 优雅关闭
*/
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
// =========================serverHandler 自定义业务逻辑方法=======================
// serverHandler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
/**
* 继承ChannelInboundHandlerAdapter
* 自定义的handler 需要继承规定好的某个ChannelInboundHandlerAdapter 的适配器 不仅仅这一个
*/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
*
* @param ctx 是一个上下文对象 包含了 pipline 管道 channel 通道 等等各种信息
*
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/**
* 自定义 任务学习
*
*/
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try{
Thread.sleep(20*1000);
ByteBuf byteBuf = (ByteBuf)msg;
System.out.println("读取客户端数据:"+byteBuf.toString(CharsetUtil.UTF_8));
}catch (Exception ex){
ex.printStackTrace();
}
}
});
// 定时任务
ByteBuf byteBuf = (ByteBuf)msg;
System.out.println("读取客户端数据:"+byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
}
/**
* Unpooled 非持化的
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端",CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("发生异常,关闭通道~");
}
}
// client客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NeetyClient {
public static void main(String[] args) throws InterruptedException {
// 客户端 创建事件循环组
EventLoopGroup loopGroup = new NioEventLoopGroup();
// 使用neety的 包
Bootstrap bootstrap = new Bootstrap();
try{
// 设置参数
bootstrap.group(loopGroup). //设置线程组
channel(NioSocketChannel.class)// k客户端=通道类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler());
}
}) ; // handelr
System.out.println("客户端ok");
/**
* sync 不会让其阻塞
*/
ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8080).sync();
System.out.println(connectFuture);
connectFuture.channel().closeFuture().sync();
}finally {
loopGroup.shutdownGracefully();
}
}
}
//================================clientHandler============================================
/**
* 继承ChannelInboundHandlerAdapter
* 自定义的handler 需要继承规定好的某个ChannelInboundHandlerAdapter 的适配器 不仅仅这一个
*/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
*
* @param ctx 是一个上下文对象 包含了 pipline 管道 channel 通道 等等各种信息
*
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
/**
* 自定义 任务学习
*
*/
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try{
Thread.sleep(20*1000);
ByteBuf byteBuf = (ByteBuf)msg;
System.out.println("读取客户端数据:"+byteBuf.toString(CharsetUtil.UTF_8));
}catch (Exception ex){
ex.printStackTrace();
}
}
});
// 定时任务
ByteBuf byteBuf = (ByteBuf)msg;
System.out.println("读取客户端数据:"+byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("客户端地址:"+ctx.channel().remoteAddress());
}
/**
* Unpooled 非持化的
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello 客户端",CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("发生异常,关闭通道~");
}
}