package netty_starter; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @auther guozg */ public class DiscardServer { private int port; public DiscardServer(int port) { this.port = port; } public void run() throws InterruptedException { EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // (3) .childHandler(new ChannelInitializer<SocketChannel>() { // (4) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()) .addLast(new OpenHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8088; } new DiscardServer(port).run(); } }
package netty_starter; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import study01.ChannelList; /** * @auther guozg */ public class DiscardServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // (2) // super.channelRead(ctx, msg); // ((ByteBuf) msg).release(); // (3) ByteBuf in = (ByteBuf) msg; String message = in.toString(CharsetUtil.UTF_8); System.out.println(message); sendMessage(Unpooled.copiedBuffer(message,CharsetUtil.UTF_8).retain(ChannelList.channels.size()-1),ctx.channel()); // try { // while (in.isReadable()) { // char s = in.getChar(0); // System.out.print((char) in.readByte()); // System.out.flush(); // } // } finally { // ReferenceCountUtil.release(msg); // } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // (5) // super.exceptionCaught(ctx, cause); cause.printStackTrace(); ctx.close(); } public void sendMessage(Object msg,Channel channel){ for (Channel c:ChannelList.channels){ if(channel != c) { c.writeAndFlush(msg); } } } }
package netty_starter; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import study01.ChannelList; public class OpenHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ChannelList.channels.add(ctx.channel()); sendMessage(Unpooled.copiedBuffer("New client in! ",CharsetUtil.UTF_8).retain(ChannelList.channels.size()));//retain设置读取次数 } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ChannelList.channels.remove(ctx.channel()); ctx.close(); sendMessage(Unpooled.copiedBuffer("One client out! ",CharsetUtil.UTF_8).retain(ChannelList.channels.size())); } public void sendMessage(Object msg){ for (Channel c:ChannelList.channels){ c.writeAndFlush(msg); } } }
然后启动cmd,输入telnet 127.0.0.1 8088.即可链接服务,并有返回值。可同时启动多个Telnet,实现群发消息。