场景
使用Netty实现简单群聊。服务端实现监控客户端上下线及通知、群聊消息转发。
实现
客户端与服务端使用String类型的消息进行发送与接收,因此客户端与服务端都需要首先添加Netty封装的用于网络传输的字符串编解码处理器(StringDecoder/StringEncoder),否则将无法成功打印消息。
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// String decoder: converts data carried in a ByteBuf into a String
pipeline.addLast("decoder", new StringDecoder());
// String encoder: encodes a String into a ByteBuf for network transmission
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(new ServerHandler());
}
});
服务端
package others.netty.groupChat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Netty group-chat server.
 * Builds a server whose per-connection pipeline is a String decoder, a String
 * encoder and a {@link ServerHandler}, binds it to the configured port and then
 * blocks until the server channel is closed.
 *
 * @author makeDoBetter
 * @version 1.0
 * @date 2021/4/25 10:36
 * @since JDK 1.8
 */
public class Sever {
    /** TCP port the server binds to. */
    private final int port;

    /**
     * Creates a server for the given port.
     *
     * @param port TCP port passed to {@code ServerBootstrap.bind}
     */
    public Sever(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks the calling thread until the server channel closes.
     * Both event loop groups are shut down gracefully on the way out, whether the
     * method ends normally, by interruption, or because the bind failed.
     */
    public void run() {
        // One boss thread for the server channel, default-sized worker group for client channels.
        NioEventLoopGroup boss = new NioEventLoopGroup(1);
        NioEventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap sever = new ServerBootstrap();
        try {
            sever.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // String decoder: converts data carried in a ByteBuf into a String
                            pipeline.addLast("decoder", new StringDecoder());
                            // String encoder: encodes a String into a ByteBuf for network transmission
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new ServerHandler());
                        }
                    });
            ChannelFuture channelFuture = sever.bind(port);
            // Register the listener BEFORE sync(): sync() rethrows the failure cause,
            // so a listener added after it could never report a failed bind.
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("服务端启动完成");
                    } else {
                        System.out.println("服务端启动失败");
                    }
                }
            });
            channelFuture.sync();
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

    /**
     * Entry point: starts the server on port 1234.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new Sever(1234).run();
    }
}
自定义服务处理器,用于处理客户端连接与断开、消息转发等。
package others.netty.groupChat;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
* 实现客户端上下线状态、消息通知
*
* @author makeDoBetter
* @version 1.0
* @date 2021/4/25 10:47
* @since JDK 1.8
*/
public class ServerHandler extends SimpleChannelInboundHandler<String> {
/**
* 定义一个全局的单线程的静态变量,用于存储整个连接的客户端集合
*/
private static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//不需要遍历,group可以直接处理
group.writeAndFlush("[客户端]" + channel.remoteAddress() + "上线");
group.add(channel);
System.out.println(group.toString());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//不需要手动remove()当前channel,group会自动剔除离线channel
group.writeAndFlush("[客户端]" + channel.remoteAddress() + "下线");
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
Channel self = ctx.channel();
//实现消息的转发,忽略自身
group.forEach(channel -> {
if (self != channel) {
System.out.println(msg + "发送到客户端" + channel.remoteAddress());
channel.writeAndFlush("[客户端]" + self.remoteAddress() + "说:" + msg + "
");
}
});
}
}
客户端
package others.netty.groupChat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
/**
 * Group-chat client: connects to the server, sends every line typed on standard
 * input, and relies on {@link ClientHandler} to print the messages it receives.
 *
 * @author makeDoBetter
 * @version 1.0
 * @date 2021/4/25 11:23
 * @since JDK 1.8
 */
public class Client {
    /** Host name or address of the server. */
    private final String host;
    /** TCP port of the server. */
    private final int port;

    /**
     * Creates a client for the given server endpoint.
     *
     * @param host server host passed to {@code Bootstrap.connect}
     * @param port server port passed to {@code Bootstrap.connect}
     */
    public Client(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and then forwards standard-input lines to it until
     * standard input has no more lines. The event loop group is shut down
     * gracefully on the way out, including when the connection attempt fails.
     */
    public void run() {
        NioEventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        try {
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // String decoder: converts data carried in a ByteBuf into a String
                            pipeline.addLast("decoder", new StringDecoder());
                            // String encoder: encodes a String into a ByteBuf for network transmission
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new ClientHandler());
                        }
                    });
            ChannelFuture channelFuture = bootstrap.connect(host, port);
            // Register the listener BEFORE sync(): sync() rethrows the failure cause,
            // so a listener added after it could never report a failed connection.
            channelFuture.addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
                    System.out.println("====连接成功=====");
                } else {
                    System.out.println("客户端连接失败");
                }
            });
            channelFuture.sync();
            Channel channel = channelFuture.channel();
            // Scanner is deliberately not closed: closing it would close System.in.
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                // nextLine() strips the line terminator, so the text is sent without one.
                String line = scanner.nextLine();
                channel.writeAndFlush(line);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    /**
     * Entry point: connects to 127.0.0.1:1234.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new Client("127.0.0.1", 1234).run();
    }
}
客户端处理程序,打印转发到当前客户端的消息。
package others.netty.groupChat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Client-side inbound handler that prints every text message it receives.
 *
 * @author makeDoBetter
 * @version 1.0
 * @date 2021/4/25 11:37
 * @since JDK 1.8
 */
public class ClientHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Prints the received message to standard output with leading and trailing
     * whitespace removed.
     *
     * @param ctx context of the channel the message arrived on (unused)
     * @param msg decoded text received on the channel
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        final String text = msg.trim();
        System.out.println(text);
    }
}