之前在工作中已经用netty写了tcp服务,感觉还不错,就又简单的写了个Udp服务,防止以后工作中用到,到时就不用再到处翻了,拿来就用O(∩_∩)O~
说明:我用的是netty是3.5.3 ,截止目前3.x最新稳定版,看官网已经有4.0的alpha版了,不过4.0和3.x是不兼容的,改动比较大,等4.0稳定后再尝试尝试
1 /** 2 * @author Jadic 3 * @created 2012-8-10 4 */ 5 package com.jadic; 6 7 import java.net.InetSocketAddress; 8 9 import org.jboss.netty.bootstrap.ConnectionlessBootstrap; 10 import org.jboss.netty.channel.ChannelPipeline; 11 import org.jboss.netty.channel.ChannelPipelineFactory; 12 import org.jboss.netty.channel.Channels; 13 import org.jboss.netty.channel.socket.DatagramChannelFactory; 14 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory; 15 16 /** 17 * @author Jadic 18 * 19 */ 20 public class UdpServer { 21 22 private ConnectionlessBootstrap udpBootstrap; 23 24 public UdpServer(int port) { 25 DatagramChannelFactory channelFactory = new NioDatagramChannelFactory(); 26 udpBootstrap = new ConnectionlessBootstrap(channelFactory); 27 udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() { 28 @Override 29 public ChannelPipeline getPipeline() throws Exception { 30 return Channels.pipeline(new UdpEventHandler()); 31 } 32 }); 33 udpBootstrap.bind(new InetSocketAddress("192.168.6.19", port)); 34 System.out.println("udp server started, listening on port:" + port); 35 } 36 37 public static void main(String[] args) { 38 new UdpServer(6803); 39 } 40 41 }
Udp数据handler,其中主要重写下messageReceived和exceptionCaught方法就可以了
1 /** 2 * @author Jadic 3 * @created 2012-8-10 4 */ 5 package com.jadic; 6 7 import org.jboss.netty.buffer.ChannelBuffer; 8 import org.jboss.netty.channel.ChannelHandlerContext; 9 import org.jboss.netty.channel.ExceptionEvent; 10 import org.jboss.netty.channel.MessageEvent; 11 import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 12 13 /** 14 * @author Jadic 15 * 16 */ 17 public class UdpEventHandler extends SimpleChannelUpstreamHandler { 18 19 private void log(Object msg) { 20 System.out.println(msg); 21 } 22 23 @Override 24 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 25 throws Exception { 26 log("messageReceived"); 27 ChannelBuffer buffer = (ChannelBuffer)e.getMessage(); 28 log("recvd " + buffer.readableBytes() + " bytes [" + KKTool.channelBufferToHexStr(buffer) + "]"); 29 } 30 31 @Override 32 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) 33 throws Exception { 34 log("exceptionCaught"); 35 } 36 37 }
有个小细节,在打印udp数据日志时,有用到ChannelBuffer.array()获取字节内容,在tcp服务中,通过channelBuffer.array()获取的就是真实收到的数据,而在udp服务中,则不是,跟了下源码,tcp服务在接收数据时,是根据收到的字节数来新建一个ChannelBuffer的,而udp服务则是有根据默认接收缓存区大小新建一个ByteBuffer
如下分别是Netty Udp和Tcp服务中底层读取数据方法的源码,两个类分别是NioDatagramWorker和NioWorker
1 @Override 2 protected boolean read(final SelectionKey key) { 3 final NioDatagramChannel channel = (NioDatagramChannel) key.attachment(); 4 ReceiveBufferSizePredictor predictor = 5 channel.getConfig().getReceiveBufferSizePredictor(); 6 final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory(); 7 final DatagramChannel nioChannel = (DatagramChannel) key.channel(); 8 9 // Allocating a non-direct buffer with a max udp packge size. 10 // Would using a direct buffer be more efficient or would this negatively 11 // effect performance, as direct buffer allocation has a higher upfront cost 12 // where as a ByteBuffer is heap allocated.//此处分配ByteBuffer大小时用的是Udp接收一包最大的数据大小,默认是768 13 final ByteBuffer byteBuffer = ByteBuffer.allocate( 14 predictor.nextReceiveBufferSize()).order(bufferFactory.getDefaultOrder()); 15 16 boolean failure = true; 17 SocketAddress remoteAddress = null; 18 try { 19 // Receive from the channel in a non blocking mode. We have already been notified that 20 // the channel is ready to receive. 21 remoteAddress = nioChannel.receive(byteBuffer); 22 failure = false; 23 } catch (ClosedChannelException e) { 24 // Can happen, and does not need a user attention. 25 } catch (Throwable t) { 26 fireExceptionCaught(channel, t); 27 } 28 29 if (remoteAddress != null) { 30 // Flip the buffer so that we can wrap it. 31 byteBuffer.flip(); 32 33 int readBytes = byteBuffer.remaining(); 34 if (readBytes > 0) { 35 // Update the predictor. 36 predictor.previousReceiveBufferSize(readBytes); 37 38 // Notify the interested parties about the newly arrived message. 此处将bytebuffer封装成ChannelBuffer继续向后触发事件 39 fireMessageReceived( 40 channel, bufferFactory.getBuffer(byteBuffer), remoteAddress); 41 } 42 }
Tcp的
1 @Override 2 protected boolean read(SelectionKey k) { 3 final SocketChannel ch = (SocketChannel) k.channel(); 4 final NioSocketChannel channel = (NioSocketChannel) k.attachment(); 5 6 final ReceiveBufferSizePredictor predictor = 7 channel.getConfig().getReceiveBufferSizePredictor(); 8 final int predictedRecvBufSize = predictor.nextReceiveBufferSize(); 9 10 int ret = 0; 11 int readBytes = 0; 12 boolean failure = true; 13 14 ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); 15 try { 16 while ((ret = ch.read(bb)) > 0) { 17 readBytes += ret; 18 if (!bb.hasRemaining()) { 19 break; 20 } 21 } 22 failure = false; 23 } catch (ClosedChannelException e) { 24 // Can happen, and does not need a user attention. 25 } catch (Throwable t) { 26 fireExceptionCaught(channel, t); 27 } 28 29 if (readBytes > 0) { 30 bb.flip(); 31 32 final ChannelBufferFactory bufferFactory = 33 channel.getConfig().getBufferFactory(); 34 final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);//这就是tcp服务中获取到的ChannelBuffer,大小就是读取到的字节数 readBytes 35 buffer.setBytes(0, bb); 36 buffer.writerIndex(readBytes); 37 38 recvBufferPool.release(bb); 39 40 // Update the predictor. 41 predictor.previousReceiveBufferSize(readBytes); 42 43 // Fire the event. 44 fireMessageReceived(channel, buffer); 45 } else { 46 recvBufferPool.release(bb); 47 } 48 49 if (ret < 0 || failure) { 50 k.cancel(); // Some JDK implementations run into an infinite loop without this. 51 close(channel, succeededFuture(channel)); 52 return false; 53 } 54 55 return true; 56 }