代码基于第二个例子,支持多客户端的连接,在线聊天。
主要思路:
连接建立时,在服务器端,保存channel 对象,当有新的客户端加入时,遍历保存的channel集合,向其他客户端发送加入消息。
当一个客户端发送消息时,在服务器端,遍历channel集合,判断是否为发送者,来修改发送内容,如: XX说: 我说:
同样的:
server中的主程序和第二个例子类似
server中的initializer
1 import io.netty.channel.ChannelInitializer; 2 import io.netty.channel.ChannelPipeline; 3 import io.netty.channel.socket.SocketChannel; 4 import io.netty.handler.codec.DelimiterBasedFrameDecoder; 5 import io.netty.handler.codec.Delimiters; 6 import io.netty.handler.codec.string.StringDecoder; 7 import io.netty.handler.codec.string.StringEncoder; 8 import io.netty.util.CharsetUtil; 9 10 public class MyChatServerInitlalizer extends ChannelInitializer<SocketChannel> { 11 12 @Override 13 protected void initChannel(SocketChannel ch) throws Exception { 14 ChannelPipeline pipeline = ch.pipeline(); 15 //解码器,根据分隔符来分割 16 pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter())); 17 pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));//编码 18 pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));//解码 19 20 pipeline.addLast(new MyChatServerHandler()); 21 22 } 23 }
server中的handler
1 import io.netty.channel.Channel; 2 import io.netty.channel.ChannelHandlerContext; 3 import io.netty.channel.SimpleChannelInboundHandler; 4 import io.netty.channel.group.ChannelGroup; 5 import io.netty.channel.group.DefaultChannelGroup; 6 import io.netty.util.concurrent.GlobalEventExecutor; 7 8 public class MyChatServerHandler extends SimpleChannelInboundHandler<String> { 9 10 //这个对象可以获取到所有的channel 11 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 12 13 14 @Override 15 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 16 cause.printStackTrace(); 17 ctx.close(); 18 } 19 20 @Override 21 protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { 22 Channel channel = ctx.channel(); 23 24 channelGroup.forEach(ch -> { 25 System.out.println("ii"); 26 if(ch != channel) { 27 System.out.println("othor"); 28 ch.writeAndFlush(channel.remoteAddress() + "发送的消息:" + msg + " ");//发送消息出去的时候,这个 ,一定不能丢,不然发不出去 29 } else { 30 ch.writeAndFlush( "自己: " + msg + " "); 31 } 32 }); 33 34 //服务器接收到消息,相所有客户端 发送 消息 35 System.out.println("接收到 " + channel.remoteAddress() +" 客户端的消息:" + msg); 36 // //发送给其他客户端 37 // ctx.writeAndFlush("msg"); 38 } 39 40 @Override 41 public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 42 Channel channel = ctx.channel();//获取到连接 43 //告诉已有的其他连接 44 channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + " - 加入 "); 45 //再加入 46 channelGroup.add(channel);//把channel加到channelGroup 47 } 48 49 @Override 50 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { 51 Channel channel = ctx.channel(); 52 channelGroup.writeAndFlush("[服务器] - " + channel.remoteAddress() + "已离开 "); 53 // channelGroup.remove(channel);//netty会自动将已经失去连接的channel,从channelGroup 中移除 54 } 55 56 @Override 57 public void channelActive(ChannelHandlerContext ctx) throws Exception { 58 Channel channel = ctx.channel(); 59 System.out.println(channel.remoteAddress() + "上线了!"); 60 } 61 62 @Override 63 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 64 Channel channel = ctx.channel(); 65 System.out.println(channel.remoteAddress() + "下线了!"); 66 } 67 }
clien中的主程序
1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.channel.Channel; 3 import io.netty.channel.ChannelFuture; 4 import io.netty.channel.EventLoopGroup; 5 import io.netty.channel.nio.NioEventLoopGroup; 6 import io.netty.channel.socket.nio.NioSocketChannel; 7 8 import java.io.BufferedReader; 9 import java.io.IOException; 10 import java.io.InputStreamReader; 11 12 public class MyChatClient { 13 public static void main(String[] args) throws InterruptedException, IOException { 14 //客户端只需要一个 15 EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); 16 17 try{ 18 19 Bootstrap bootstrap = new Bootstrap(); 20 bootstrap.group(eventLoopGroup) 21 .channel(NioSocketChannel.class) 22 .handler(new MyChatInitlaizer()); 23 ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync(); 24 25 Channel channel = channelFuture.channel(); 26 27 //channelFuture.channel().writeAndFlush("first msg");//发送数据,其实应该写到handler的active方法中 28 29 BufferedReader br = new BufferedReader(new InputStreamReader( System.in)); 30 for (;;) {//死循环来接收客户端的输入 31 channel.writeAndFlush(br.readLine() + " "); 32 } 33 34 // channelFuture.channel().closeFuture().sync(); 35 }finally { 36 eventLoopGroup.shutdownGracefully(); 37 } 38 39 } 40 }
client中的initializer和服务器端的类似
client中的handle,只要简单输出就行。