zoukankan      html  css  js  c++  java
  • netty udp server demo

    之前在工作中已经用netty写了tcp服务,感觉还不错,就又简单的写了个Udp服务,防止以后工作中用到,到时就不用再到处翻了,拿来就用O(∩_∩)O~

    说明:我用的netty版本是3.5.3,是截至目前3.x的最新稳定版。官网已经有4.0的alpha版了,不过4.0和3.x不兼容,改动比较大,等4.0稳定后再尝试。

     1 /**
     2  * @author Jadic
     3  * @created 2012-8-10 
     4  */
     5 package com.jadic;
     6 
     7 import java.net.InetSocketAddress;
     8 
     9 import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
    10 import org.jboss.netty.channel.ChannelPipeline;
    11 import org.jboss.netty.channel.ChannelPipelineFactory;
    12 import org.jboss.netty.channel.Channels;
    13 import org.jboss.netty.channel.socket.DatagramChannelFactory;
    14 import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
    15 
    16 /**
    17  * @author Jadic
    18  *
    19  */
    20 public class UdpServer {
    21 
    22     private ConnectionlessBootstrap udpBootstrap;
    23     
    24     public UdpServer(int port) {
    25         DatagramChannelFactory channelFactory = new NioDatagramChannelFactory();
    26         udpBootstrap = new ConnectionlessBootstrap(channelFactory);
    27         udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    28             @Override
    29             public ChannelPipeline getPipeline() throws Exception {
    30                 return Channels.pipeline(new UdpEventHandler());
    31             }
    32         });
    33         udpBootstrap.bind(new InetSocketAddress("192.168.6.19", port));
    34         System.out.println("udp server started, listening on port:" + port);
    35     }
    36 
    37     public static void main(String[] args) {
    38         new UdpServer(6803);
    39     }
    40     
    41 }

    Udp数据handler,其中主要重写下messageReceived和exceptionCaught方法就可以了

     1 /**
     2  * @author Jadic
     3  * @created 2012-8-10 
     4  */
     5 package com.jadic;
     6 
     7 import org.jboss.netty.buffer.ChannelBuffer;
     8 import org.jboss.netty.channel.ChannelHandlerContext;
     9 import org.jboss.netty.channel.ExceptionEvent;
    10 import org.jboss.netty.channel.MessageEvent;
    11 import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    12 
    13 /**
    14  * @author Jadic
    15  *
    16  */
    17 public class UdpEventHandler extends SimpleChannelUpstreamHandler {
    18     
    19     private void log(Object msg) {
    20         System.out.println(msg);
    21     }
    22 
    23     @Override
    24     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
    25             throws Exception {
    26         log("messageReceived");
    27         ChannelBuffer buffer = (ChannelBuffer)e.getMessage();
    28         log("recvd " + buffer.readableBytes() + " bytes [" + KKTool.channelBufferToHexStr(buffer) + "]");
    29     }
    30 
    31     @Override
    32     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
    33             throws Exception {
    34         log("exceptionCaught");
    35     }
    36 
    37 }

    有个小细节:打印udp数据日志时用到了ChannelBuffer.array()获取字节内容。在tcp服务中,通过channelBuffer.array()获取的就是真实收到的数据,而在udp服务中则不是。跟了下源码:tcp服务在接收数据时,是根据实际收到的字节数新建一个ChannelBuffer的;而udp服务则是根据默认接收缓冲区大小(默认768字节)新建一个ByteBuffer,再包装成ChannelBuffer。所以udp服务中array()返回的是整个缓冲区,其中只有前readableBytes()个字节是真正收到的数据。

    如下分别是Netty Udp和Tcp服务中底层读取数据方法的源码,两个类分别是NioDatagramWorker和NioWorker

     // Excerpt of the UDP read path quoted from Netty 3.x NioDatagramWorker; the listing stops before the end of the method.
     1    @Override
     2     protected boolean read(final SelectionKey key) {
     3         final NioDatagramChannel channel = (NioDatagramChannel) key.attachment();
     4         ReceiveBufferSizePredictor predictor =
     5             channel.getConfig().getReceiveBufferSizePredictor();
     6         final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();
     7         final DatagramChannel nioChannel = (DatagramChannel) key.channel();
     8 
     9         // Allocating a non-direct buffer with a max udp package size.
    10         // Would using a direct buffer be more efficient or would this negatively
    11         // effect performance, as direct buffer allocation has a higher upfront cost
    12         // where as a ByteBuffer is heap allocated.// (blog author's note) the ByteBuffer here is sized to the maximum size of one received UDP packet, 768 by default
    13         final ByteBuffer byteBuffer = ByteBuffer.allocate(
    14                 predictor.nextReceiveBufferSize()).order(bufferFactory.getDefaultOrder());
    15 
    16         boolean failure = true;
    17         SocketAddress remoteAddress = null;
    18         try {
    19             // Receive from the channel in a non blocking mode. We have already been notified that
    20             // the channel is ready to receive.
    21             remoteAddress = nioChannel.receive(byteBuffer);
    22             failure = false;
    23         } catch (ClosedChannelException e) {
    24             // Can happen, and does not need a user attention.
    25         } catch (Throwable t) {
    26             fireExceptionCaught(channel, t);
    27         }
    28 
    29         if (remoteAddress != null) {
    30             // Flip the buffer so that we can wrap it.
    31             byteBuffer.flip();
    32 
    33             int readBytes = byteBuffer.remaining();
    34             if (readBytes > 0) {
    35                 // Update the predictor.
    36                 predictor.previousReceiveBufferSize(readBytes);
    37 
    38                 // Notify the interested parties about the newly arrived message.  (blog author's note) the ByteBuffer is wrapped as a ChannelBuffer here and the event is fired onward
    39                 fireMessageReceived(
    40                         channel, bufferFactory.getBuffer(byteBuffer), remoteAddress);
    41             }
    42         }

    Tcp的(NioWorker.read):

     // TCP read path quoted from Netty 3.x NioWorker, shown for comparison with the UDP path.
     1     @Override
     2     protected boolean read(SelectionKey k) {
     3         final SocketChannel ch = (SocketChannel) k.channel();
     4         final NioSocketChannel channel = (NioSocketChannel) k.attachment();
     5 
     6         final ReceiveBufferSizePredictor predictor =
     7             channel.getConfig().getReceiveBufferSizePredictor();
     8         final int predictedRecvBufSize = predictor.nextReceiveBufferSize();
     9 
    10         int ret = 0;
    11         int readBytes = 0;
    12         boolean failure = true;
    13 
    14         ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
    15         try {
    16             while ((ret = ch.read(bb)) > 0) {
    17                 readBytes += ret;
    18                 if (!bb.hasRemaining()) {
    19                     break;
    20                 }
    21             }
    22             failure = false;
    23         } catch (ClosedChannelException e) {
    24             // Can happen, and does not need a user attention.
    25         } catch (Throwable t) {
    26             fireExceptionCaught(channel, t);
    27         }
    28 
    29         if (readBytes > 0) {
    30             bb.flip();
    31 
    32             final ChannelBufferFactory bufferFactory =
    33                 channel.getConfig().getBufferFactory();
    34             final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);// (blog author's note) this is the ChannelBuffer the TCP service receives; its size is exactly the number of bytes read, readBytes
    35             buffer.setBytes(0, bb);
    36             buffer.writerIndex(readBytes);
    37 
    38             recvBufferPool.release(bb);
    39 
    40             // Update the predictor.
    41             predictor.previousReceiveBufferSize(readBytes);
    42 
    43             // Fire the event.
    44             fireMessageReceived(channel, buffer);
    45         } else {
    46             recvBufferPool.release(bb);
    47         }
    48 
    49         if (ret < 0 || failure) {
    50             k.cancel(); // Some JDK implementations run into an infinite loop without this.
    51             close(channel, succeededFuture(channel));
    52             return false;
    53         }
    54 
    55         return true;
    56     }
  • 相关阅读:
    Ubuntu_14.04安装docker
    CentOS配置java运行环境
    github上传自己的开源代码
    eclipse使用maven插件创建web项目
    jar包解压后,修改完配置文件,再还原成jar包
    Python学习的几本建议书籍
    流批
    函数
    程序
    习 题
  • 原文地址:https://www.cnblogs.com/jadic/p/2633647.html
Copyright © 2011-2022 走看看