注意
本文的代码,为了学习方便,简化代码复杂度,未考虑拆包、粘包等情况的处理。所以仅供学习使用,不能用于实际环境。
阻塞IO,BIO
Java1.1发布的IO是BIO。阻塞地连接之后,通过流进行同步阻塞地通讯。
同步阻塞连接
因同步阻塞地监听连接,如果服务端只有单线程进程处理,每个请求必须等待前一请求处理完毕才开始处理新请求。
所以,一般情况下,服务端每接收一个请求,可交派给一个线程处理这个请求,这样,在处理环节实现异步。其逻辑图如下:
阻塞通讯
客户端与服务端之间的通讯是通过流进行传输的,而流是单向的、阻塞的,即通讯效率依赖于对方以及网络。其逻辑图如下:
代码实例
服务端:
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nicchagil.ioexercise.Constant;
public class MyHelloServer {
private static Logger logger = LoggerFactory.getLogger(MyHelloServer.class);
public static void main(String[] args) {
try (ServerSocket serverSocket = new ServerSocket(Constant.SERVER_PORT)) {
while (true) {
Socket socket = serverSocket.accept(); // 阻塞接收
new HelloBusinessThread(socket).start();
}
} catch (IOException e) {
logger.error("输入输出异常", e);
// throw new RuntimeException(e);
}
}
}
服务端业务线程:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HelloBusinessThread extends Thread {
Logger logger = LoggerFactory.getLogger(HelloBusinessThread.class);
Socket socket = null;
public HelloBusinessThread(Socket socket) {
super();
this.socket = socket;
}
@Override
public void run() {
try {
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String result = bufferedReader.readLine();
logger.info("接收一个请求:" + result);
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.println("hello");
logger.info("发送一个响应");
printWriter.flush();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
客户端:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nicchagil.ioexercise.Constant;
public class MyHelloClient {
private static Logger logger = LoggerFactory.getLogger(MyHelloClient.class);
public static void main(String[] args) throws IOException {
try (Socket socket = new Socket(Constant.SERVER_HOST, Constant.SERVER_PORT);) {
OutputStream outputStream = socket.getOutputStream();
PrintWriter printWriter = new PrintWriter(outputStream);
printWriter.println("hi");
logger.info("发送一个请求");
printWriter.flush();
InputStream inputStream = socket.getInputStream();
InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String result = bufferedReader.readLine();
logger.info("收到一个答复:" + result);
} catch (UnknownHostException e) {
logger.error("无法找到此主机", e);
// throw new RuntimeException(e);
} catch (IOException e) {
logger.error("输入输出异常", e);
// throw new RuntimeException(e);
}
System.in.read();
}
}
非阻塞IO,NIO
JDK1.4后,推出了NIO,为非阻塞IO。
其在Unix中依赖select、poll、epoll
调用,在JDK1.5 update10
之后,使用的是epoll
调用。
多路复用器,Selector
多路复用器,通过单线程轮询多条Channel是否就绪,如果就绪,则获取对应的SelectionKey,从中去获取就绪的Channel进行后续的IO操作。
通道与缓冲区,Channel与Buffer
流,是单向的。而通道,是全双工的,即双向的。
缓冲区有3个属性,position、limit、capacity,分别表示位置、限制位、容量。
比如flip(),翻转缓冲区:
public final Buffer flip() {
limit = position; // 将原位置赋予限制位
position = 0; // 位置置0
mark = -1;
return this;
}
加入写入完毕是这样的:
flip()后开始读取是这样的:
比如clear(),清除缓冲区:
public final Buffer clear() {
position = 0; // 位置置0
limit = capacity; // 容量赋予限制位
mark = -1;
return this;
}
比如rewind(),重绕缓冲区:
public final Buffer rewind() {
position = 0; // 位置置0
mark = -1;
return this;
}
代码实例
先写一个工具类:
package com.nicchagil.ioexercise.nio.util;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NIOUtil {
private static Logger logger = LoggerFactory.getLogger(NIOUtil.class);
/**
* 将信息写入通道
*/
public static void writeToChannel(SocketChannel socketChannel, String message) {
if (message == null || message.trim().length() == 0) {
return;
}
byte[] bytes = null;
try {
bytes = message.getBytes("UTF-8"); // 转换为字节数组
} catch (UnsupportedEncodingException e1) {
throw new RuntimeException(e1);
}
ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length); // 开辟缓冲区
byteBuffer.put(bytes); // 放入缓冲区
byteBuffer.flip(); // 切换读取缓冲区模式
try {
socketChannel.write(byteBuffer); // 写入通道
// logger.info("发送 -> {}", message);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/**
* 读取并转换为String
*/
public static String readToString(SocketChannel socketChannel, SelectionKey selectionKey) throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocate(64); // 开辟缓冲区
int readByteNum = socketChannel.read(byteBuffer); // 读取数据
/* 有数据 */
if (readByteNum > 0) {
byteBuffer.flip();
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
String message = new String(bytes, "UTF-8");
return message;
}
/* 无数据,无处理 */
if (readByteNum == 0) {
}
/* 小于零,表示连接已关闭 */
if (readByteNum < 0) {
NIOUtil.cancelSelectionKey(selectionKey);
socketChannel.close();
}
return null;
}
/**
* 取消/关闭SelectionKey
*/
public static void cancelSelectionKey(SelectionKey selectionKey) {
if (selectionKey == null) {
return;
}
selectionKey.cancel(); // 取消SelectionKey
if (selectionKey.channel() != null) {
try {
selectionKey.channel().close(); // 关闭通道
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
服务端:
package com.nicchagil.ioexercise.nio.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nicchagil.ioexercise.Constant;
import com.nicchagil.ioexercise.nio.util.NIOUtil;
public class NIOServer {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public NIOServer() throws Exception {
selector = Selector.open(); // 多路复用器
/* 配置通道 */
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 非阻塞
serverSocketChannel.socket().bind(new InetSocketAddress(Constant.SERVER_HOST, Constant.SERVER_PORT), 512); // 监听的主机、端口,和挂起的最大连接数
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 通道绑定多路复用器,并监听“连接”事件
Set<SelectionKey> selectionKeys = null;
Iterator<SelectionKey> iterator = null;
SelectionKey selectionKey = null;
while (true) {
try {
this.logger.info("polling...");
selector.select(); // 阻塞轮询,当轮询通道中有IO就绪时,返回
selectionKeys = selector.selectedKeys(); // 获取就绪通道的SelectionKey
this.logger.info("当前就绪的通道数 -> {}", selectionKeys.size());
iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
selectionKey = iterator.next();
iterator.remove();
try {
this.handle(selectionKey); // 处理该通道业务
} catch (IOException e) {
this.logger.error("通道{}出现异常:{}", selectionKey, e);
NIOUtil.cancelSelectionKey(selectionKey);
}
}
} catch (IOException e) {
this.logger.error("多路复用监听出现异常", e);
throw new RuntimeException(e);
}
}
}
/**
* 处理通道业务
*/
public void handle(SelectionKey selectionKey) throws IOException {
if (!selectionKey.isValid()) { // 无效快速失败
this.logger.info("连接无效");
return;
}
/* 连接事件 */
if (selectionKey.isAcceptable()) {
this.accept(selectionKey);
}
/* 读取事件 */
else if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel)selectionKey.channel(); // 转换为连接Socket
String message = NIOUtil.readToString(socketChannel, selectionKey);
this.logger.info("message -> {}", message);
if (message != null) {
NIOUtil.writeToChannel(socketChannel, "hi, client");
}
}
}
/**
* 接受连接
*/
private void accept(SelectionKey selectionKey) throws IOException {
this.logger.info("开始连接事件");
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectionKey.channel(); // 转换为服务器Socket
SocketChannel socketChannel = serverSocketChannel.accept(); // 接收一个连接
socketChannel.configureBlocking(false); // 非阻塞
socketChannel.register(selector, SelectionKey.OP_READ); // 通道绑定多路复用器,并监听“读取”事件
}
public static void main(String[] args) throws Exception {
new NIOServer();
}
}
客户端:
package com.nicchagil.ioexercise.nio.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nicchagil.ioexercise.Constant;
import com.nicchagil.ioexercise.nio.util.NIOUtil;
public class NIOClient {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private Selector selector; // 多路复用器
private SocketChannel socketChannel; // 通道
public NIOClient() throws IOException {
this.selector = Selector.open(); // 多路复用器
/* 配置通道 */
this.socketChannel = SocketChannel.open();
this.socketChannel.configureBlocking(false); // 非阻塞
/* 连接服务器 */
this.connect();
Set<SelectionKey> selectionKeys = null;
Iterator<SelectionKey> iterator = null;
SelectionKey selectionKey = null;
while (true) {
try {
this.selector.select(); // 阻塞轮询,当轮询通道中有IO就绪时,返回
selectionKeys = this.selector.selectedKeys(); // 获取就绪通道的SelectionKey
iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
selectionKey = iterator.next();
iterator.remove();
try {
this.handle(selectionKey); // 处理该通道业务
} catch (IOException e) {
this.logger.error("通道{}出现异常:{}", selectionKey, e);
NIOUtil.cancelSelectionKey(selectionKey);
}
}
} catch (IOException e) {
this.logger.error("多路复用监听出现异常", e);
throw new RuntimeException(e);
}
}
}
/**
* 连接服务器
*/
public void connect() throws IOException {
boolean connect = this.socketChannel.connect(new InetSocketAddress(Constant.SERVER_HOST, Constant.SERVER_PORT));
if (connect) { // 连接成功
socketChannel.register(selector, SelectionKey.OP_READ);
NIOUtil.writeToChannel(socketChannel, "hi, server");
} else { // 连接失败
this.socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
/**
* 处理通道业务
*/
public void handle(SelectionKey selectionKey) throws IOException {
if (!selectionKey.isValid()) { // 无效快速失败
// this.logger.info("连接无效");
return;
}
/* 连接事件 */
if (selectionKey.isConnectable()) {
this.connect(selectionKey);
}
/* 读取事件 */
else if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel)selectionKey.channel(); // 转换为连接Socket
String message = NIOUtil.readToString(socketChannel, selectionKey);
this.logger.info("message -> {}", message);
}
}
/**
* 连接
*/
private void connect(SelectionKey selectionKey) throws IOException {
if (socketChannel.finishConnect()) { // 完成连接
socketChannel.register(selector, SelectionKey.OP_READ);
NIOUtil.writeToChannel(socketChannel, "hi, server");
}
}
public static void main(String[] args) throws Exception {
new NIOClient();
}
}
异步IO,AIO
JDK1.7提供了NIO 2.0,包含异步IO,对应UNIX网络编程的AIO。
它是真正的异步非阻塞IO,不需多路复用器轮询,操作完成回调CompletionHandler
接口(从代码上可以看到,有好几个处理器实现CompletionHandler
,比如连接处理器、读取处理器、发送处理器)。
代码实例
首先将一些重复的代码提到一个公用类中:
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AIOUtil {
private static Logger logger = LoggerFactory.getLogger(AIOUtil.class);
/**
* 通过通道发送消息
* @param socketChannel 异步套接字通道
* @param message 消息
*/
public static void write(AsynchronousSocketChannel socketChannel, String message) {
byte[] bytes = message.getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
byteBuffer.put(bytes);
byteBuffer.flip();
socketChannel.write(byteBuffer, byteBuffer, new WriteHandler(socketChannel)); // 写消息
}
/**
* 关闭通道
* @param socketChannel 异步套接字通道
*/
public static void close(AsynchronousSocketChannel socketChannel) {
try {
socketChannel.close();
} catch (IOException e) {
logger.error("关闭套接字通道异常:{}", e);
throw new RuntimeException(e);
}
}
}
服务器入口类:
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import com.nicchagil.ioexercise.Constant;
public class Server {
public static void main(String[] args) throws Exception {
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(); // 服务器套接字通道
serverSocketChannel.bind(new InetSocketAddress(Constant.SERVER_PORT)); // 监听端口
/*
* 接收一个连接,此方法初始一个异步操作来接收一个连接。
* 处理器参数是一个完成处理器,当接收到连接或连接失败时被回调完成处理器。
* 为了能并发处理新连接,完成处理器并不是被初始线程直接调用。
*/
serverSocketChannel.accept(serverSocketChannel, new AcceptHandler()); // 接收一个连接
System.in.read();
}
}
接收连接处理器:
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 接收连接处理器
*/
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void completed(AsynchronousSocketChannel socketChannel, AsynchronousServerSocketChannel serverSocketChannel) {
serverSocketChannel.accept(serverSocketChannel, this); // 服务器Socket继续接收请求
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
/*
* 从通道中读取字节到缓冲区,此方法初始一个异步读取操作,从通道中读取字节到缓冲区。
* 处理器参数是一个完成处理器,读取完成或失败时被调用。
* 读取的字节数会传递给处理器,如没有可读取的字节,则传递-1。
*/
socketChannel.read(byteBuffer, byteBuffer, new ReadHandler(socketChannel)); // 读取消息
}
@Override
public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
this.logger.error("接收连接异常:{}", exc);
}
}
读取消息处理器:
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 读取消息处理器
*/
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private AsynchronousSocketChannel socketChannel;
public ReadHandler(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
/**
* 读取消息
*/
@Override
public void completed(Integer result, ByteBuffer byteBuffer) {
byteBuffer.flip(); // 翻转缓冲区
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
try {
String message = new String(bytes, "UTF-8");
this.logger.info("接收到消息 -> {}", message);
} catch (UnsupportedEncodingException e) {
this.logger.info("接收消息异常:{}", e);
}
// 向客户端发送消息
AIOUtil.write(socketChannel, "hi, client");
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
this.logger.info("接收消息异常");
AIOUtil.close(socketChannel);
}
}
发送消息处理器:
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 发送消息处理器
*/
public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private AsynchronousSocketChannel socketChannel;
public WriteHandler(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (buffer.hasRemaining()) {
socketChannel.write(buffer, buffer, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
this.logger.info("发送消息异常");
AIOUtil.close(socketChannel);
}
}
Netty,NIO框架
NIO的API太过复杂,开发者极容易出错,所以如果有一个NIO框架对我们项目的开发有好处,而Netty是一个优秀的NIO框架。
代码实例
首先定义了些常量:
public interface Constant {
String HOST = "127.0.0.1";
Integer PORT = 60000;
String DELIMITER = "<!#%&(@$^*)>";
}
服务端与服务端入口:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nicchagil.nettyexercise.common.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
public class MyServer {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private Integer port;
public MyServer(Integer port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); // 事件循环群组
try {
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 启动类
serverBootstrap.group(eventLoopGroup) // 指定事件循环群组
.channel(NioServerSocketChannel.class) // 指定通道类型
.localAddress(this.port) // 指定监听端口
.childHandler(new ChannelInitializer<SocketChannel>() { // 指定通道初始化器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline();
/* 分隔符方式分包 */
ByteBuf delimiterByteBuf = Unpooled.copiedBuffer(Constant.DELIMITER.getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiterByteBuf)); // 指定单条消息最大长度和分隔符
ch.pipeline().addLast(new StringDecoder());
channelPipeline.addLast(new MyServerHandler()); // 指定数据入站处理器
}
});
ChannelFuture cf = serverBootstrap.bind().sync(); // 服务器同步绑定
cf.channel().closeFuture().sync(); // 关闭服务器通道
} finally {
eventLoopGroup.shutdownGracefully().sync(); // 释放线程池资源
}
}
public static void main(String[] args) throws Exception {
new MyServer(Constant.PORT).start();
}
}
服务端处理器:
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class MyServerHandler extends ChannelInboundHandlerAdapter {
public static AtomicInteger counter = new AtomicInteger();
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.logger.info("通道被激活");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
this.logger.info("第{}次读取信息 -> {}", counter.incrementAndGet(), msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
this.logger.info("读取完成");
ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("ok...", CharsetUtil.UTF_8));
// channelFuture.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) throws Exception {
this.logger.error("出现异常 -> {}", throwable);
ctx.close();
}
}
客户端与客户端入口:
import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nicchagil.nettyexercise.common.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class MyClient {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private final String host;
private final int port;
public MyClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(this.host, this.port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MyClientHandler());
}
});
ChannelFuture cf = bootstrap.connect().sync();
cf.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
new MyClient(Constant.HOST, Constant.PORT).start();
System.in.read();
}
}
客户端处理器:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.nicchagil.nettyexercise.common.Constant;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class MyClientHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.logger.info("通道被激活");
for (int i = 0; i < 100; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, Netty, abcdefghijklmnopqrstuvwsyz, 123456" + Constant.DELIMITER, CharsetUtil.UTF_8));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
this.logger.info("读取信息 -> {}", msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
this.logger.info("读取完成");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) throws Exception {
this.logger.error("出现异常 -> {}", throwable);
ctx.close();
}
}