zoukankan      html  css  js  c++  java
  • 使用Netty实现通用二进制协议的高效数据传输

    Netty是一个高性能的NIO通信框架,提供异步的、事件驱动的网络编程模型。使用Netty可以方便用户开发各种常用协议的网络程序。例如:TCP、UDP、HTTP等等。

    Netty的最新版本是3.2.7,官网地址是:http://www.jboss.org/netty

    本文的主要目的是基于Netty实现一个通用二进制协议的高效数据传输。协议是通用的二进制协议,高效并且扩展性很好。

    一个好的协议有两个标准:

    (1)生成的传输数据要少,即数据压缩比要高。这样可以减少网络开销。

    (2)传输数据和业务对象之间的转换速度要快。

    (友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen

    一、协议的定义

    无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后。

    (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
           编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、数据包长(4byte)
    (2)数据:由数据包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
           数据格式定义:
           字段1键名长度    字段1键名 字段1值长度    字段1值
           字段2键名长度    字段2键名 字段2值长度    字段2值
           字段3键名长度    字段3键名 字段3值长度    字段3值
           …    …    …    …
           长度为整型,占4个字节

      代码中用两个Vo对象来表示:XLRequest和XLResponse。

      1package org.jboss.netty.example.xlsvr.vo;
      2
      3import java.util.HashMap;
      4import java.util.Map;
      5
      6/**
      7 *  @author hankchen
    10 *  2012-2-3 下午02:46:52
    11 */
    12
    13
    14/**
    15 * 响应数据
    16 */
    17
    18/**
    19 * 通用协议介绍
    20 * 
    21 * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
    22 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
    23 * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
    24 * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
    25 * 数据格式定义:
    26 * 字段1键名长度    字段1键名 字段1值长度    字段1值
    27 * 字段2键名长度    字段2键名 字段2值长度    字段2值
    28 * 字段3键名长度    字段3键名 字段3值长度    字段3值
    29 * …    …    …    …
    30 * 长度为整型,占4个字节
    31 */
    32public class XLResponse {
    33    private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
    34    private byte encrypt;// 加密类型。0表示不加密
    35    private byte extend1;// 用于扩展协议。暂未定义任何值
    36    private byte extend2;// 用于扩展协议。暂未定义任何值
    37    private int sessionid;// 会话ID
    38    private int result;// 结果码
    39    private int length;// 数据包长
    40    
    41    private Map<String,String> values=new HashMap<String, String>();
    42    
    43    private String ip;
    44    
    45    public void setValue(String key,String value){
    46        values.put(key, value);
    47    }
    48    
    49    public String getValue(String key){
    50        if (key==null) {
    51            return null;
    52        }
    53        return values.get(key);
    54    }
    55
    56    public byte getEncode() {
    57        return encode;
    58    }
    59
    60    public void setEncode(byte encode) {
    61        this.encode = encode;
    62    }
    63
    64    public byte getEncrypt() {
    65        return encrypt;
    66    }
    67
    68    public void setEncrypt(byte encrypt) {
    69        this.encrypt = encrypt;
    70    }
    71
    72    public byte getExtend1() {
    73        return extend1;
    74    }
    75
    76    public void setExtend1(byte extend1) {
    77        this.extend1 = extend1;
    78    }
    79
    80    public byte getExtend2() {
    81        return extend2;
    82    }
    83
    84    public void setExtend2(byte extend2) {
    85        this.extend2 = extend2;
    86    }
    87
    88    public int getSessionid() {
    89        return sessionid;
    90    }
    91
    92    public void setSessionid(int sessionid) {
    93        this.sessionid = sessionid;
    94    }
    95
    96    public int getResult() {
    97        return result;
    98    }
    99
    100    public void setResult(int result) {
    101        this.result = result;
    102    }
    103
    104    public int getLength() {
    105        return length;
    106    }
    107
    108    public void setLength(int length) {
    109        this.length = length;
    110    }
    111
    112    public Map<String, String> getValues() {
    113        return values;
    114    }
    115
    116    public String getIp() {
    117        return ip;
    118    }
    119
    120    public void setIp(String ip) {
    121        this.ip = ip;
    122    }
    123
    124    public void setValues(Map<String, String> values) {
    125        this.values = values;
    126    }
    127
    128    @Override
    129    public String toString() {
    130        return "XLResponse [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2
    131                + ", sessionid=" + sessionid + ", result=" + result + ", length=" + length + ", values=" + values + ", ip=" + ip + "]";
    132    }
    133}
      1package org.jboss.netty.example.xlsvr.vo;
      2
      3import java.util.HashMap;
      4import java.util.Map;
      5
      6/**
      7 *  @author hankchen
      8 *  2012-2-3 下午02:46:41
      9 */
    10
    11/**
    12 * 请求数据
    13 */
    14
    15/**
    16 * 通用协议介绍
    17 * 
    18 * 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后
    19 * (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
    20 * 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)
    21 * (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
    22 * 数据格式定义:
    23 * 字段1键名长度    字段1键名 字段1值长度    字段1值
    24 * 字段2键名长度    字段2键名 字段2值长度    字段2值
    25 * 字段3键名长度    字段3键名 字段3值长度    字段3值
    26 * …    …    …    …
    27 * 长度为整型,占4个字节
    28 */
    29public class XLRequest {
    30    private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-1
    31    private byte encrypt;// 加密类型。0表示不加密
    32    private byte extend1;// 用于扩展协议。暂未定义任何值
    33    private byte extend2;// 用于扩展协议。暂未定义任何值
    34    private int sessionid;// 会话ID
    35    private int command;// 命令
    36    private int length;// 数据包长
    37    
    38    private Map<String,String> params=new HashMap<String, String>(); //参数
    39    
    40    private String ip;
    41
    42    public byte getEncode() {
    43        return encode;
    44    }
    45
    46    public void setEncode(byte encode) {
    47        this.encode = encode;
    48    }
    49
    50    public byte getEncrypt() {
    51        return encrypt;
    52    }
    53
    54    public void setEncrypt(byte encrypt) {
    55        this.encrypt = encrypt;
    56    }
    57
    58    public byte getExtend1() {
    59        return extend1;
    60    }
    61
    62    public void setExtend1(byte extend1) {
    63        this.extend1 = extend1;
    64    }
    65
    66    public byte getExtend2() {
    67        return extend2;
    68    }
    69
    70    public void setExtend2(byte extend2) {
    71        this.extend2 = extend2;
    72    }
    73
    74    public int getSessionid() {
    75        return sessionid;
    76    }
    77
    78    public void setSessionid(int sessionid) {
    79        this.sessionid = sessionid;
    80    }
    81
    82    public int getCommand() {
    83        return command;
    84    }
    85
    86    public void setCommand(int command) {
    87        this.command = command;
    88    }
    89
    90    public int getLength() {
    91        return length;
    92    }
    93
    94    public void setLength(int length) {
    95        this.length = length;
    96    }
    97
    98    public Map<String, String> getParams() {
    99        return params;
    100    }
    101    
    102    public void setValue(String key,String value){
    103        params.put(key, value);
    104    }
    105    
    106    public String getValue(String key){
    107        if (key==null) {
    108            return null;
    109        }
    110        return params.get(key);
    111    }
    112
    113    public String getIp() {
    114        return ip;
    115    }
    116
    117    public void setIp(String ip) {
    118        this.ip = ip;
    119    }
    120
    121    public void setParams(Map<String, String> params) {
    122        this.params = params;
    123    }
    124
    125    @Override
    126    public String toString() {
    127        return "XLRequest [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2
    128                + ", sessionid=" + sessionid + ", command=" + command + ", length=" + length + ", params=" + params + ", ip=" + ip + "]";
    129    }
    130}
    131

    二、协议的编码和解码

    对于自定义二进制协议,编码解码器往往是Netty开发的重点。这里直接给出相关类的代码。

    1package org.jboss.netty.example.xlsvr.codec;
    2
    3import java.nio.ByteBuffer;
    4
    5import org.jboss.netty.buffer.ChannelBuffer;
    6import org.jboss.netty.buffer.ChannelBuffers;
    7import org.jboss.netty.channel.ChannelHandlerContext;
    8import org.jboss.netty.channel.Channels;
    9import org.jboss.netty.channel.MessageEvent;
    10import org.jboss.netty.channel.SimpleChannelDownstreamHandler;
    11import org.jboss.netty.example.xlsvr.util.ProtocolUtil;
    12import org.jboss.netty.example.xlsvr.vo.XLResponse;
    13import org.slf4j.Logger;
    14import org.slf4j.LoggerFactory;
    15
    16/**
    17 *  @author hankchen
    18 *  2012-2-3 上午10:48:15
    19 */
    20
    21/**
    22 * 服务器端编码器
    23 */
    24public class XLServerEncoder extends SimpleChannelDownstreamHandler {
    25    Logger logger=LoggerFactory.getLogger(XLServerEncoder.class);
    26    
    27    @Override
    28    public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    29        XLResponse response=(XLResponse)e.getMessage();
    30        ByteBuffer headBuffer=ByteBuffer.allocate(16);
    31        /**
    32         * 先组织报文头
    33         */
    34        headBuffer.put(response.getEncode());
    35        headBuffer.put(response.getEncrypt());
    36        headBuffer.put(response.getExtend1());
    37        headBuffer.put(response.getExtend2());
    38        headBuffer.putInt(response.getSessionid());
    39        headBuffer.putInt(response.getResult());
    40        
    41        /**
    42         * 组织报文的数据部分
    43         */
    44        ChannelBuffer dataBuffer=ProtocolUtil.encode(response.getEncode(),response.getValues()); 
    45        int length=dataBuffer.readableBytes();
    46        headBuffer.putInt(length);
    47        /**
    48         * 非常重要
    49         * ByteBuffer需要手动flip(),ChannelBuffer不需要
    50         */
    51        headBuffer.flip();
    52        ChannelBuffer totalBuffer=ChannelBuffers.dynamicBuffer();
    53        totalBuffer.writeBytes(headBuffer);
    54        logger.info("totalBuffer size="+totalBuffer.readableBytes());
    55        totalBuffer.writeBytes(dataBuffer);
    56        logger.info("totalBuffer size="+totalBuffer.readableBytes());
    57        Channels.write(ctx, e.getFuture(), totalBuffer);
    58    }
    59
    60}
    61
    1package org.jboss.netty.example.xlsvr.codec;
    2
    3import org.jboss.netty.buffer.ChannelBuffer;
    4import org.jboss.netty.buffer.ChannelBuffers;
    5import org.jboss.netty.channel.Channel;
    6import org.jboss.netty.channel.ChannelHandlerContext;
    7import org.jboss.netty.example.xlsvr.util.ProtocolUtil;
    8import org.jboss.netty.example.xlsvr.vo.XLResponse;
    9import org.jboss.netty.handler.codec.frame.FrameDecoder;
    10
    11/**
    12 *  @author hankchen
    13 *  2012-2-3 上午10:47:54
    14 */
    15
    16/**
    17 * 客户端解码器
    18 */
    19public class XLClientDecoder extends FrameDecoder {
    20
    21    @Override
    22    protected Object decode(ChannelHandlerContext context, Channel channel, ChannelBuffer buffer) throws Exception {
    23        if (buffer.readableBytes()<16) {
    24            return null;
    25        }
    26        buffer.markReaderIndex();
    27        byte encode=buffer.readByte();
    28        byte encrypt=buffer.readByte();
    29        byte extend1=buffer.readByte();
    30        byte extend2=buffer.readByte();
    31        int sessionid=buffer.readInt();
    32        int result=buffer.readInt();
    33        int length=buffer.readInt(); // 数据包长
    34        if (buffer.readableBytes()<length) {
    35            buffer.resetReaderIndex();
    36            return null;
    37        }
    38        ChannelBuffer dataBuffer=ChannelBuffers.buffer(length);
    39        buffer.readBytes(dataBuffer, length);
    40        
    41        XLResponse response=new XLResponse();
    42        response.setEncode(encode);
    43        response.setEncrypt(encrypt);
    44        response.setExtend1(extend1);
    45        response.setExtend2(extend2);
    46        response.setSessionid(sessionid);
    47        response.setResult(result);
    48        response.setLength(length);
    49        response.setValues(ProtocolUtil.decode(encode, dataBuffer));
    50        response.setIp(ProtocolUtil.getClientIp(channel));
    51        return response;
    52    }
    53
    54}
      1package org.jboss.netty.example.xlsvr.util;
      2
      3import java.net.SocketAddress;
      4import java.nio.charset.Charset;
      5import java.util.HashMap;
      6import java.util.Map;
      7import java.util.Map.Entry;
      8
      9import org.jboss.netty.buffer.ChannelBuffer;
    10import org.jboss.netty.buffer.ChannelBuffers;
    11import org.jboss.netty.channel.Channel;
    12
    13/**
    14 *  @author hankchen
    15 *  2012-2-4 下午01:57:33
    16 */
    17public class ProtocolUtil {
    18    
    19    /**
    20     * 编码报文的数据部分
    21     * @param encode
    22     * @param values
    23     * @return
    24     */
    25    public static ChannelBuffer encode(int encode,Map<String,String> values){
    26        ChannelBuffer totalBuffer=null;
    27        if (values!=null && values.size()>0) {
    28            totalBuffer=ChannelBuffers.dynamicBuffer();
    29            int length=0,index=0;
    30            ChannelBuffer [] channelBuffers=new ChannelBuffer[values.size()];
    31            Charset charset=XLCharSetFactory.getCharset(encode);
    32            for(Entry<String,String> entry:values.entrySet()){
    33                String key=entry.getKey();
    34                String value=entry.getValue();
    35                ChannelBuffer buffer=ChannelBuffers.dynamicBuffer();
    36                buffer.writeInt(key.length());
    37                buffer.writeBytes(key.getBytes(charset));
    38                buffer.writeInt(value.length());
    39                buffer.writeBytes(value.getBytes(charset));
    40                channelBuffers[index++]=buffer;
    41                length+=buffer.readableBytes();
    42            }
    43            
    44            for (int i = 0; i < channelBuffers.length; i++) {
    45                totalBuffer.writeBytes(channelBuffers[i]);
    46            }
    47        }
    48        return totalBuffer;
    49    }
    50    
    51    /**
    52     * 解码报文的数据部分
    53     * @param encode
    54     * @param dataBuffer
    55     * @return
    56     */
    57    public static Map<String,String> decode(int encode,ChannelBuffer dataBuffer){
    58        Map<String,String> dataMap=new HashMap<String, String>();
    59        if (dataBuffer!=null && dataBuffer.readableBytes()>0) {
    60            int processIndex=0,length=dataBuffer.readableBytes();
    61            Charset charset=XLCharSetFactory.getCharset(encode);
    62            while(processIndex<length){
    63                /**
    64                 * 获取Key
    65                 */
    66                int size=dataBuffer.readInt();
    67                byte [] contents=new byte [size];
    68                dataBuffer.readBytes(contents);
    69                String key=new String(contents, charset);
    70                processIndex=processIndex+size+4;
    71                /**
    72                 * 获取Value
    73                 */
    74                size=dataBuffer.readInt();
    75                contents=new byte [size];
    76                dataBuffer.readBytes(contents);
    77                String value=new String(contents, charset);
    78                dataMap.put(key, value);
    79                processIndex=processIndex+size+4;
    80            }
    81        }
    82        return dataMap;
    83    }
    84    
    85    /**
    86     * 获取客户端IP
    87     * @param channel
    88     * @return
    89     */
    90    public static String getClientIp(Channel channel){
    91        /**
    92         * 获取客户端IP
    93         */
    94        SocketAddress address = channel.getRemoteAddress();
    95        String ip = "";
    96        if (address != null) {
    97            ip = address.toString().trim();
    98            int index = ip.lastIndexOf(':');
    99            if (index < 1) {
    100                index = ip.length();
    101            }
    102            ip = ip.substring(1, index);
    103        }
    104        if (ip.length() > 15) {
    105            ip = ip.substring(Math.max(ip.indexOf("/") + 1, ip.length() - 15));
    106        }
    107        return ip;
    108    }
    109}
    110

    三、服务器端实现

    服务器端提供的功能是:

    1、接收客户端的请求(非关闭命令),返回XLResponse类型的数据。

    2、如果客户端的请求是关闭命令:shutdown,则服务器端关闭自身进程。

    为了展示多协议的运用,这里客户端的请求采用的是基于问本行( )的协议。

    具体代码如下:

    1package org.jboss.netty.example.xlsvr;
    2
    3import java.net.InetSocketAddress;
    4import java.util.concurrent.Executors;
    5
    6import org.jboss.netty.bootstrap.ServerBootstrap;
    7import org.jboss.netty.channel.Channel;
    8import org.jboss.netty.channel.ChannelPipeline;
    9import org.jboss.netty.channel.group.ChannelGroup;
    10import org.jboss.netty.channel.group.ChannelGroupFuture;
    11import org.jboss.netty.channel.group.DefaultChannelGroup;
    12import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
    13import org.jboss.netty.example.xlsvr.codec.XLServerEncoder;
    14import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
    15import org.jboss.netty.handler.codec.frame.Delimiters;
    16import org.jboss.netty.handler.codec.string.StringDecoder;
    17import org.jboss.netty.util.CharsetUtil;
    18import org.slf4j.Logger;
    19import org.slf4j.LoggerFactory;
    20
    21/**
    22 *  @author hankchen
    23 *  2012-1-30 下午03:21:38
    24 */
    25
    26public class XLServer {
    27    public static final int port =8080;
    28    public static final Logger logger=LoggerFactory.getLogger(XLServer.class);
    29    public static final ChannelGroup allChannels=new DefaultChannelGroup("XLServer");
    30    private static final ServerBootstrap serverBootstrap=new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
    31    
    32    public static void main(String [] args){
    33        try {
    34            XLServer.startup();
    35        } catch (Exception e) {
    36            e.printStackTrace();
    37        }
    38    }
    39    
    40    public static boolean startup() throws Exception{
    41        /**
    42         * 采用默认ChannelPipeline管道
    43         * 这意味着同一个XLServerHandler实例将被多个Channel通道共享
    44         * 这种方式对于XLServerHandler中无有状态的成员变量是可以的,并且可以提高性能!
    45         */
    46        ChannelPipeline pipeline=serverBootstrap.getPipeline(); 
    47        /**
    48         * 解码器是基于文本行的协议, 或者
    49         */
    50        pipeline.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter()));
    51        pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
    52        pipeline.addLast("encoder", new XLServerEncoder());
    53        pipeline.addLast("handler", new XLServerHandler());
    54        
    55        serverBootstrap.setOption("child.tcpNoDelay", true); //注意child前缀
    56        serverBootstrap.setOption("child.keepAlive", true); //注意child前缀
    57        
    58        /**
    59         * ServerBootstrap对象的bind方法返回了一个绑定了本地地址的服务端Channel通道对象
    60         */
    61        Channel channel=serverBootstrap.bind(new InetSocketAddress(port));
    62        allChannels.add(channel);
    63        logger.info("server is started on port "+port);
    64        return false;
    65    }
    66    
    67    public static void shutdown() throws Exception{
    68        try {
    69            /**
    70             * 主动关闭服务器
    71             */
    72            ChannelGroupFuture future=allChannels.close();
    73            future.awaitUninterruptibly();//阻塞,直到服务器关闭
    74            //serverBootstrap.releaseExternalResources();
    75        } catch (Exception e) {
    76            e.printStackTrace();
    77            logger.error(e.getMessage(),e);
    78        }
    79        finally{
    80            logger.info("server is shutdown on port "+port);
    81            System.exit(1);
    82        }
    83    }
    84}
    85
      1package org.jboss.netty.example.xlsvr;
      2
      3import java.util.Random;
      4
      5import org.jboss.netty.channel.Channel;
      6import org.jboss.netty.channel.ChannelFuture;
      7import org.jboss.netty.channel.ChannelHandlerContext;
      8import org.jboss.netty.channel.ChannelHandler.Sharable;
      9import org.jboss.netty.channel.ChannelStateEvent;
    10import org.jboss.netty.channel.ExceptionEvent;
    11import org.jboss.netty.channel.MessageEvent;
    12import org.jboss.netty.channel.SimpleChannelHandler;
    13import org.jboss.netty.example.xlsvr.vo.XLResponse;
    14import org.slf4j.Logger;
    15import org.slf4j.LoggerFactory;
    16
    17/**
    18 *  @author hankchen
    19 *  2012-1-30 下午03:22:24
    20 */
    21
    22@Sharable
    23public class XLServerHandler extends SimpleChannelHandler {
    24    private static final Logger logger=LoggerFactory.getLogger(XLServerHandler.class);
    25    
    26    @Override
    27    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    28        logger.info("messageReceived");
    29        if (e.getMessage() instanceof String) {
    30            String content=(String)e.getMessage();
    31            logger.info("content is "+content);
    32            if ("shutdown".equalsIgnoreCase(content)) {
    33                //e.getChannel().close();
    34                XLServer.shutdown();
    35            }else {
    36                sendResponse(ctx);
    37            }
    38        }else {
    39            logger.error("message is not a String.");
    40            e.getChannel().close();
    41        }
    42    }
    43
    44    @Override
    45    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
    46        logger.error(e.getCause().getMessage(),e.getCause());
    47        e.getCause().printStackTrace();
    48        e.getChannel().close();
    49    }
    50
    51    @Override
    52    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    53        logger.info("channelConnected");
    54        sendResponse(ctx);
    55    }
    56
    57    @Override
    58    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    59        logger.info("channelClosed");
    60        //删除通道
    61        XLServer.allChannels.remove(e.getChannel());
    62    }
    63
    64    @Override
    65    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    66        logger.info("channelDisconnected");
    67        super.channelDisconnected(ctx, e);
    68    }
    69
    70    @Override
    71    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    72        logger.info("channelOpen");
    73        //增加通道
    74        XLServer.allChannels.add(e.getChannel());
    75    }
    76
    77    /**
    78     * 发送响应内容
    79     * @param ctx
    80     * @param e
    81     * @return
    82     */
    83    private ChannelFuture sendResponse(ChannelHandlerContext ctx){
    84        Channel channel=ctx.getChannel();
    85        Random random=new Random();
    86        XLResponse response=new XLResponse();
    87        response.setEncode((byte)0);
    88        response.setResult(1);
    89        response.setValue("name","hankchen");
    90        response.setValue("time", String.valueOf(System.currentTimeMillis()));
    91        response.setValue("age",String.valueOf(random.nextInt()));
    92        /**
    93         * 发送接收信息的时间戳到客户端
    94         * 注意:Netty中所有的IO操作都是异步的!
    95         */
    96        ChannelFuture future=channel.write(response); //发送内容
    97        return future;
    98    }
    99}
    100

    四、客户端实现

    客户端的功能是连接服务器,发送10次请求,然后发送关闭服务器的命令,最后主动关闭客户端。

    关键代码如下:

    1/**
    2 *  Copyright (C): 2012
    3 *  @author hankchen
    4 *  2012-1-30 下午03:21:26
    5 */
    6
    7/**
    8 * 服务器特征:
    9 * 1、使用专用解码器解析服务器发过来的数据
    10 * 2、客户端主动关闭连接
    11 */
    12public class XLClient {
    13    public static final int port =XLServer.port;
    14    public static final String host ="localhost";
    15    private static final Logger logger=LoggerFactory.getLogger(XLClient.class);
    16    private static final NioClientSocketChannelFactory clientSocketChannelFactory=new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());
    17    private static final ClientBootstrap clientBootstrap=new ClientBootstrap(clientSocketChannelFactory);
    18    
    19    /**
    20     * @param args
    21     * @throws Exception 
    22     */
    23    public static void main(String[] args) throws Exception {
    24        ChannelFuture future=XLClient.startup();
    25        logger.info("future state is "+future.isSuccess());
    26    }
    27    
    28    /**
    29     * 启动客户端
    30     * @return
    31     * @throws Exception
    32     */
    33    public static ChannelFuture startup() throws Exception {
    34        /**
    35         * 注意:由于XLClientHandler中有状态的成员变量,因此不能采用默认共享ChannelPipeline的方式
    36         * 例如,下面的代码形式是错误的:
    37         * ChannelPipeline pipeline=clientBootstrap.getPipeline();
    38         * pipeline.addLast("handler", new XLClientHandler());
    39         */
    40        clientBootstrap.setPipelineFactory(new XLClientPipelineFactory()); //只能这样设置
    41        /**
    42         * 请注意,这里不存在使用“child.”前缀的配置项,客户端的SocketChannel实例不存在父级Channel对象
    43         */
    44        clientBootstrap.setOption("tcpNoDelay", true);
    45        clientBootstrap.setOption("keepAlive", true);
    46        
    47        ChannelFuture future=clientBootstrap.connect(new InetSocketAddress(host, port));
    48        /**
    49         * 阻塞式的等待,直到ChannelFuture对象返回这个连接操作的成功或失败状态
    50         */
    51        future.awaitUninterruptibly();
    52        /**
    53         * 如果连接失败,我们将打印连接失败的原因。
    54         * 如果连接操作没有成功或者被取消,ChannelFuture对象的getCause()方法将返回连接失败的原因。
    55         */
    56        if (!future.isSuccess()) {
    57            future.getCause().printStackTrace();
    58        }else {
    59            logger.info("client is connected to server "+host+":"+port);
    60        }
    61        return future;
    62    }
    63    
    64    /**
    65     * 关闭客户端
    66     * @param future
    67     * @throws Exception
    68     */
    69    public static void shutdown(ChannelFuture future) throws Exception{
    70        try {
    71            /**
    72             * 主动关闭客户端连接,会阻塞等待直到通道关闭
    73             */
    74            future.getChannel().close().awaitUninterruptibly();
    75            //future.getChannel().getCloseFuture().awaitUninterruptibly();
    76            /**
    77             * 释放ChannelFactory通道工厂使用的资源。
    78             * 这一步仅需要调用 releaseExternalResources()方法即可。
    79             * 包括NIO Secector和线程池在内的所有资源将被自动的关闭和终止。
    80             */
    81            clientBootstrap.releaseExternalResources();
    82        } catch (Exception e) {
    83            e.printStackTrace();
    84            logger.error(e.getMessage(),e);
    85        }
    86        finally{
    87            System.exit(1);
    88            logger.info("client is shutdown to server "+host+":"+port);
    89        }
    90    }
    91}
    1public class XLClientPipelineFactory implements ChannelPipelineFactory{
    2
    3    @Override
    4    public ChannelPipeline getPipeline() throws Exception {
    5        ChannelPipeline pipeline=Channels.pipeline();
    6        /**
    7         * 使用专用的解码器,解决数据分段的问题
    8         * 从业务逻辑代码中分离协议处理部分总是一个很不错的想法。
    9         */
    10        pipeline.addLast("decoder", new XLClientDecoder());
    11        /**
    12         * 有专门的编码解码器,这时处理器就不需要管数据分段和数据格式问题,只需要关注业务逻辑了!
    13         */
    14        pipeline.addLast("handler", new XLClientHandler());
    15        return pipeline;
    16    }
    17
    18}
    1/**
    2 *  Copyright (C): 2012
    3 *  @author hankchen
    4 *  2012-1-30 下午03:21:52
    5 */
    6
    7/**
    8 * 服务器特征:
    9 * 1、使用专用的编码解码器,解决数据分段的问题
    10 * 2、使用POJO替代ChannelBuffer传输
    11 */
    12public class XLClientHandler extends SimpleChannelHandler {
    13    private static final Logger logger=LoggerFactory.getLogger(XLClientHandler.class);
    14    private final AtomicInteger count=new AtomicInteger(0); //计数器
    15    
    16    @Override
    17    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    18        processMethod1(ctx, e); //处理方式一
    19    }
    20    
    21    /**
    22     * @param ctx
    23     * @param e
    24     * @throws Exception
    25     */
    26    public void processMethod1(ChannelHandlerContext ctx, MessageEvent e) throws Exception{
    27        logger.info("processMethod1……,count="+count.addAndGet(1));
    28        XLResponse serverTime=(XLResponse)e.getMessage();
    29        logger.info("messageReceived,content:"+serverTime.toString());
    30        Thread.sleep(1000);
    31        
    32        if (count.get()<10) {
    33            //从新发送请求获取最新的服务器时间
    34            ctx.getChannel().write(ChannelBuffers.wrappedBuffer("again ".getBytes()));
    35        }else{
    36            //从新发送请求关闭服务器
    37            ctx.getChannel().write(ChannelBuffers.wrappedBuffer("shutdown ".getBytes()));
    38        }
    39    }
    40    
    41    @Override
    42    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
    43        logger.info("exceptionCaught");
    44        e.getCause().printStackTrace();
    45        ctx.getChannel().close();
    46    }
    47
    48    @Override
    49    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    50        logger.info("channelClosed");
    51        super.channelClosed(ctx, e);
    52    }
    53    
    54    
    55}

    全文代码较多,写了很多注释,希望对读者有用,谢谢!

    (友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen

    http://www.blogjava.net/hankchen/archive/2012/02/04/369378.html

  • 相关阅读:
    统计一个字符串中字母、空格和数字的个数
    java 将一个数组中的值按逆序重新存放,例如,原来顺序为:9,5,7,4,8,要求改为:8,4,7, 5,9。
    java判断一个数是否为素数[转]
    Set集合
    List&ArrayList&LinkedList
    java_异常
    内部类&匿名内部类
    多态&抽象类&接口
    数组排序和字符串
    Java笔记_数据类型和运算符
  • 原文地址:https://www.cnblogs.com/findumars/p/6357305.html
Copyright © 2011-2022 走看看