Netty是一个高性能的NIO通信框架,提供异步的、事件驱动的网络编程模型。使用Netty可以方便用户开发各种常用协议的网络程序。例如:TCP、UDP、HTTP等等。
Netty的最新版本是3.2.7,官网地址是:http://www.jboss.org/netty
本文的主要目的是基于Netty实现一个通用二进制协议的高效数据传输。协议是通用的二进制协议,高效并且扩展性很好。
一个好的协议有两个标准:
(1)生成的传输数据要少,即数据压缩比要高。这样可以减少网络开销。
(2)传输数据和业务对象之间的转换速度要快。
(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen)
一、协议的定义
无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后。
(1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:
编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、数据包长(4byte)
(2)数据:由数据包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>
数据格式定义:
字段1键名长度 字段1键名 字段1值长度 字段1值
字段2键名长度 字段2键名 字段2值长度 字段2值
字段3键名长度 字段3键名 字段3值长度 字段3值
… … … …
长度为整型,占4个字节
代码中用两个Vo对象来表示:XLRequest和XLResponse。
package org.jboss.netty.example.xlsvr.vo;2

3
import java.util.HashMap;4
import java.util.Map;5

6
/**7
* @author hankchen10
* 2012-2-3 下午02:46:5211
*/12

13

14
/**15
* 响应数据16
*/17

18
/**19
* 通用协议介绍20
* 21
* 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后22
* (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:23
* 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)24
* (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>25
* 数据格式定义:26
* 字段1键名长度 字段1键名 字段1值长度 字段1值27
* 字段2键名长度 字段2键名 字段2值长度 字段2值28
* 字段3键名长度 字段3键名 字段3值长度 字段3值29
* … … … …30
* 长度为整型,占4个字节31
*/32
public class XLResponse {33
private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-134
private byte encrypt;// 加密类型。0表示不加密35
private byte extend1;// 用于扩展协议。暂未定义任何值36
private byte extend2;// 用于扩展协议。暂未定义任何值37
private int sessionid;// 会话ID38
private int result;// 结果码39
private int length;// 数据包长40
41
private Map<String,String> values=new HashMap<String, String>();42
43
private String ip;44
45
public void setValue(String key,String value){46
values.put(key, value);47
}48
49
public String getValue(String key){50
if (key==null) {51
return null;52
}53
return values.get(key);54
}55

56
public byte getEncode() {57
return encode;58
}59

60
public void setEncode(byte encode) {61
this.encode = encode;62
}63

64
public byte getEncrypt() {65
return encrypt;66
}67

68
public void setEncrypt(byte encrypt) {69
this.encrypt = encrypt;70
}71

72
public byte getExtend1() {73
return extend1;74
}75

76
public void setExtend1(byte extend1) {77
this.extend1 = extend1;78
}79

80
public byte getExtend2() {81
return extend2;82
}83

84
public void setExtend2(byte extend2) {85
this.extend2 = extend2;86
}87

88
public int getSessionid() {89
return sessionid;90
}91

92
public void setSessionid(int sessionid) {93
this.sessionid = sessionid;94
}95

96
public int getResult() {97
return result;98
}99

100
public void setResult(int result) {101
this.result = result;102
}103

104
public int getLength() {105
return length;106
}107

108
public void setLength(int length) {109
this.length = length;110
}111

112
public Map<String, String> getValues() {113
return values;114
}115

116
public String getIp() {117
return ip;118
}119

120
public void setIp(String ip) {121
this.ip = ip;122
}123

124
public void setValues(Map<String, String> values) {125
this.values = values;126
}127

128
@Override129
public String toString() {130
return "XLResponse [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2131
+ ", sessionid=" + sessionid + ", result=" + result + ", length=" + length + ", values=" + values + ", ip=" + ip + "]";132
}133
}
package org.jboss.netty.example.xlsvr.vo;2

3
import java.util.HashMap;4
import java.util.Map;5

6
/**7
* @author hankchen8
* 2012-2-3 下午02:46:419
*/10

11
/**12
* 请求数据13
*/14

15
/**16
* 通用协议介绍17
* 18
* 通用报文格式:无论是请求还是响应,报文都由一个通用报文头和实际数据组成。报文头在前,数据在后19
* (1)报文头:由数据解析类型,数据解析方法,编码,扩展字节,包长度组成,共16个字节:20
* 编码方式(1byte)、加密(1byte)、扩展1(1byte)、扩展2(1byte)、会话ID(4byte)、命令或者结果码(4byte)、包长(4byte)21
* (2)数据:由包长指定。请求或回复数据。类型对应为JAVA的Map<String,String>22
* 数据格式定义:23
* 字段1键名长度 字段1键名 字段1值长度 字段1值24
* 字段2键名长度 字段2键名 字段2值长度 字段2值25
* 字段3键名长度 字段3键名 字段3值长度 字段3值26
* … … … …27
* 长度为整型,占4个字节28
*/29
public class XLRequest {30
private byte encode;// 数据编码格式。已定义:0:UTF-8,1:GBK,2:GB2312,3:ISO8859-131
private byte encrypt;// 加密类型。0表示不加密32
private byte extend1;// 用于扩展协议。暂未定义任何值33
private byte extend2;// 用于扩展协议。暂未定义任何值34
private int sessionid;// 会话ID35
private int command;// 命令36
private int length;// 数据包长37
38
private Map<String,String> params=new HashMap<String, String>(); //参数39
40
private String ip;41

42
public byte getEncode() {43
return encode;44
}45

46
public void setEncode(byte encode) {47
this.encode = encode;48
}49

50
public byte getEncrypt() {51
return encrypt;52
}53

54
public void setEncrypt(byte encrypt) {55
this.encrypt = encrypt;56
}57

58
public byte getExtend1() {59
return extend1;60
}61

62
public void setExtend1(byte extend1) {63
this.extend1 = extend1;64
}65

66
public byte getExtend2() {67
return extend2;68
}69

70
public void setExtend2(byte extend2) {71
this.extend2 = extend2;72
}73

74
public int getSessionid() {75
return sessionid;76
}77

78
public void setSessionid(int sessionid) {79
this.sessionid = sessionid;80
}81

82
public int getCommand() {83
return command;84
}85

86
public void setCommand(int command) {87
this.command = command;88
}89

90
public int getLength() {91
return length;92
}93

94
public void setLength(int length) {95
this.length = length;96
}97

98
public Map<String, String> getParams() {99
return params;100
}101
102
public void setValue(String key,String value){103
params.put(key, value);104
}105
106
public String getValue(String key){107
if (key==null) {108
return null;109
}110
return params.get(key);111
}112

113
public String getIp() {114
return ip;115
}116

117
public void setIp(String ip) {118
this.ip = ip;119
}120

121
public void setParams(Map<String, String> params) {122
this.params = params;123
}124

125
@Override126
public String toString() {127
return "XLRequest [encode=" + encode + ", encrypt=" + encrypt + ", extend1=" + extend1 + ", extend2=" + extend2128
+ ", sessionid=" + sessionid + ", command=" + command + ", length=" + length + ", params=" + params + ", ip=" + ip + "]";129
}130
}131

二、协议的编码和解码
对于自定义二进制协议,编码解码器往往是Netty开发的重点。这里直接给出相关类的代码。
package org.jboss.netty.example.xlsvr.codec;2

3
import java.nio.ByteBuffer;4

5
import org.jboss.netty.buffer.ChannelBuffer;6
import org.jboss.netty.buffer.ChannelBuffers;7
import org.jboss.netty.channel.ChannelHandlerContext;8
import org.jboss.netty.channel.Channels;9
import org.jboss.netty.channel.MessageEvent;10
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;11
import org.jboss.netty.example.xlsvr.util.ProtocolUtil;12
import org.jboss.netty.example.xlsvr.vo.XLResponse;13
import org.slf4j.Logger;14
import org.slf4j.LoggerFactory;15

16
/**17
* @author hankchen18
* 2012-2-3 上午10:48:1519
*/20

21
/**22
* 服务器端编码器23
*/24
public class XLServerEncoder extends SimpleChannelDownstreamHandler {25
Logger logger=LoggerFactory.getLogger(XLServerEncoder.class);26
27
@Override28
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {29
XLResponse response=(XLResponse)e.getMessage();30
ByteBuffer headBuffer=ByteBuffer.allocate(16);31
/**32
* 先组织报文头33
*/34
headBuffer.put(response.getEncode());35
headBuffer.put(response.getEncrypt());36
headBuffer.put(response.getExtend1());37
headBuffer.put(response.getExtend2());38
headBuffer.putInt(response.getSessionid());39
headBuffer.putInt(response.getResult());40
41
/**42
* 组织报文的数据部分43
*/44
ChannelBuffer dataBuffer=ProtocolUtil.encode(response.getEncode(),response.getValues()); 45
int length=dataBuffer.readableBytes();46
headBuffer.putInt(length);47
/**48
* 非常重要49
* ByteBuffer需要手动flip(),ChannelBuffer不需要50
*/51
headBuffer.flip();52
ChannelBuffer totalBuffer=ChannelBuffers.dynamicBuffer();53
totalBuffer.writeBytes(headBuffer);54
logger.info("totalBuffer size="+totalBuffer.readableBytes());55
totalBuffer.writeBytes(dataBuffer);56
logger.info("totalBuffer size="+totalBuffer.readableBytes());57
Channels.write(ctx, e.getFuture(), totalBuffer);58
}59

60
}61

package org.jboss.netty.example.xlsvr.codec;2

3
import org.jboss.netty.buffer.ChannelBuffer;4
import org.jboss.netty.buffer.ChannelBuffers;5
import org.jboss.netty.channel.Channel;6
import org.jboss.netty.channel.ChannelHandlerContext;7
import org.jboss.netty.example.xlsvr.util.ProtocolUtil;8
import org.jboss.netty.example.xlsvr.vo.XLResponse;9
import org.jboss.netty.handler.codec.frame.FrameDecoder;10

11
/**12
* @author hankchen13
* 2012-2-3 上午10:47:5414
*/15

16
/**17
* 客户端解码器18
*/19
public class XLClientDecoder extends FrameDecoder {20

21
@Override22
protected Object decode(ChannelHandlerContext context, Channel channel, ChannelBuffer buffer) throws Exception {23
if (buffer.readableBytes()<16) {24
return null;25
}26
buffer.markReaderIndex();27
byte encode=buffer.readByte();28
byte encrypt=buffer.readByte();29
byte extend1=buffer.readByte();30
byte extend2=buffer.readByte();31
int sessionid=buffer.readInt();32
int result=buffer.readInt();33
int length=buffer.readInt(); // 数据包长34
if (buffer.readableBytes()<length) {35
buffer.resetReaderIndex();36
return null;37
}38
ChannelBuffer dataBuffer=ChannelBuffers.buffer(length);39
buffer.readBytes(dataBuffer, length);40
41
XLResponse response=new XLResponse();42
response.setEncode(encode);43
response.setEncrypt(encrypt);44
response.setExtend1(extend1);45
response.setExtend2(extend2);46
response.setSessionid(sessionid);47
response.setResult(result);48
response.setLength(length);49
response.setValues(ProtocolUtil.decode(encode, dataBuffer));50
response.setIp(ProtocolUtil.getClientIp(channel));51
return response;52
}53

54
}
package org.jboss.netty.example.xlsvr.util;2

3
import java.net.SocketAddress;4
import java.nio.charset.Charset;5
import java.util.HashMap;6
import java.util.Map;7
import java.util.Map.Entry;8

9
import org.jboss.netty.buffer.ChannelBuffer;10
import org.jboss.netty.buffer.ChannelBuffers;11
import org.jboss.netty.channel.Channel;12

13
/**14
* @author hankchen15
* 2012-2-4 下午01:57:3316
*/17
public class ProtocolUtil {18
19
/**20
* 编码报文的数据部分21
* @param encode22
* @param values23
* @return24
*/25
public static ChannelBuffer encode(int encode,Map<String,String> values){26
ChannelBuffer totalBuffer=null;27
if (values!=null && values.size()>0) {28
totalBuffer=ChannelBuffers.dynamicBuffer();29
int length=0,index=0;30
ChannelBuffer [] channelBuffers=new ChannelBuffer[values.size()];31
Charset charset=XLCharSetFactory.getCharset(encode);32
for(Entry<String,String> entry:values.entrySet()){33
String key=entry.getKey();34
String value=entry.getValue();35
ChannelBuffer buffer=ChannelBuffers.dynamicBuffer();36
buffer.writeInt(key.length());37
buffer.writeBytes(key.getBytes(charset));38
buffer.writeInt(value.length());39
buffer.writeBytes(value.getBytes(charset));40
channelBuffers[index++]=buffer;41
length+=buffer.readableBytes();42
}43
44
for (int i = 0; i < channelBuffers.length; i++) {45
totalBuffer.writeBytes(channelBuffers[i]);46
}47
}48
return totalBuffer;49
}50
51
/**52
* 解码报文的数据部分53
* @param encode54
* @param dataBuffer55
* @return56
*/57
public static Map<String,String> decode(int encode,ChannelBuffer dataBuffer){58
Map<String,String> dataMap=new HashMap<String, String>();59
if (dataBuffer!=null && dataBuffer.readableBytes()>0) {60
int processIndex=0,length=dataBuffer.readableBytes();61
Charset charset=XLCharSetFactory.getCharset(encode);62
while(processIndex<length){63
/**64
* 获取Key65
*/66
int size=dataBuffer.readInt();67
byte [] contents=new byte [size];68
dataBuffer.readBytes(contents);69
String key=new String(contents, charset);70
processIndex=processIndex+size+4;71
/**72
* 获取Value73
*/74
size=dataBuffer.readInt();75
contents=new byte [size];76
dataBuffer.readBytes(contents);77
String value=new String(contents, charset);78
dataMap.put(key, value);79
processIndex=processIndex+size+4;80
}81
}82
return dataMap;83
}84
85
/**86
* 获取客户端IP87
* @param channel88
* @return89
*/90
public static String getClientIp(Channel channel){91
/**92
* 获取客户端IP93
*/94
SocketAddress address = channel.getRemoteAddress();95
String ip = "";96
if (address != null) {97
ip = address.toString().trim();98
int index = ip.lastIndexOf(':');99
if (index < 1) {100
index = ip.length();101
}102
ip = ip.substring(1, index);103
}104
if (ip.length() > 15) {105
ip = ip.substring(Math.max(ip.indexOf("/") + 1, ip.length() - 15));106
}107
return ip;108
}109
}110

三、服务器端实现
服务器端提供的功能是:
1、接收客户端的请求(非关闭命令),返回XLResponse类型的数据。
2、如果客户端的请求是关闭命令:shutdown,则服务器端关闭自身进程。
为了展示多协议的运用,这里客户端的请求采用的是基于问本行( )的协议。
具体代码如下:
package org.jboss.netty.example.xlsvr;2

3
import java.net.InetSocketAddress;4
import java.util.concurrent.Executors;5

6
import org.jboss.netty.bootstrap.ServerBootstrap;7
import org.jboss.netty.channel.Channel;8
import org.jboss.netty.channel.ChannelPipeline;9
import org.jboss.netty.channel.group.ChannelGroup;10
import org.jboss.netty.channel.group.ChannelGroupFuture;11
import org.jboss.netty.channel.group.DefaultChannelGroup;12
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;13
import org.jboss.netty.example.xlsvr.codec.XLServerEncoder;14
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;15
import org.jboss.netty.handler.codec.frame.Delimiters;16
import org.jboss.netty.handler.codec.string.StringDecoder;17
import org.jboss.netty.util.CharsetUtil;18
import org.slf4j.Logger;19
import org.slf4j.LoggerFactory;20

21
/**22
* @author hankchen23
* 2012-1-30 下午03:21:3824
*/25

26
public class XLServer {27
public static final int port =8080;28
public static final Logger logger=LoggerFactory.getLogger(XLServer.class);29
public static final ChannelGroup allChannels=new DefaultChannelGroup("XLServer");30
private static final ServerBootstrap serverBootstrap=new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));31
32
public static void main(String [] args){33
try {34
XLServer.startup();35
} catch (Exception e) {36
e.printStackTrace();37
}38
}39
40
public static boolean startup() throws Exception{41
/**42
* 采用默认ChannelPipeline管道43
* 这意味着同一个XLServerHandler实例将被多个Channel通道共享44
* 这种方式对于XLServerHandler中无有状态的成员变量是可以的,并且可以提高性能!45
*/46
ChannelPipeline pipeline=serverBootstrap.getPipeline(); 47
/**48
* 解码器是基于文本行的协议,
或者
49
*/50
pipeline.addLast("frameDecoder", new DelimiterBasedFrameDecoder(80, Delimiters.lineDelimiter()));51
pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));52
pipeline.addLast("encoder", new XLServerEncoder());53
pipeline.addLast("handler", new XLServerHandler());54
55
serverBootstrap.setOption("child.tcpNoDelay", true); //注意child前缀56
serverBootstrap.setOption("child.keepAlive", true); //注意child前缀57
58
/**59
* ServerBootstrap对象的bind方法返回了一个绑定了本地地址的服务端Channel通道对象60
*/61
Channel channel=serverBootstrap.bind(new InetSocketAddress(port));62
allChannels.add(channel);63
logger.info("server is started on port "+port);64
return false;65
}66
67
public static void shutdown() throws Exception{68
try {69
/**70
* 主动关闭服务器71
*/72
ChannelGroupFuture future=allChannels.close();73
future.awaitUninterruptibly();//阻塞,直到服务器关闭74
//serverBootstrap.releaseExternalResources();75
} catch (Exception e) {76
e.printStackTrace();77
logger.error(e.getMessage(),e);78
}79
finally{80
logger.info("server is shutdown on port "+port);81
System.exit(1);82
}83
}84
}85

package org.jboss.netty.example.xlsvr;2

3
import java.util.Random;4

5
import org.jboss.netty.channel.Channel;6
import org.jboss.netty.channel.ChannelFuture;7
import org.jboss.netty.channel.ChannelHandlerContext;8
import org.jboss.netty.channel.ChannelHandler.Sharable;9
import org.jboss.netty.channel.ChannelStateEvent;10
import org.jboss.netty.channel.ExceptionEvent;11
import org.jboss.netty.channel.MessageEvent;12
import org.jboss.netty.channel.SimpleChannelHandler;13
import org.jboss.netty.example.xlsvr.vo.XLResponse;14
import org.slf4j.Logger;15
import org.slf4j.LoggerFactory;16

17
/**18
* @author hankchen19
* 2012-1-30 下午03:22:2420
*/21

22
@Sharable23
public class XLServerHandler extends SimpleChannelHandler {24
private static final Logger logger=LoggerFactory.getLogger(XLServerHandler.class);25
26
@Override27
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {28
logger.info("messageReceived");29
if (e.getMessage() instanceof String) {30
String content=(String)e.getMessage();31
logger.info("content is "+content);32
if ("shutdown".equalsIgnoreCase(content)) {33
//e.getChannel().close();34
XLServer.shutdown();35
}else {36
sendResponse(ctx);37
}38
}else {39
logger.error("message is not a String.");40
e.getChannel().close();41
}42
}43

44
@Override45
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {46
logger.error(e.getCause().getMessage(),e.getCause());47
e.getCause().printStackTrace();48
e.getChannel().close();49
}50

51
@Override52
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {53
logger.info("channelConnected");54
sendResponse(ctx);55
}56

57
@Override58
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {59
logger.info("channelClosed");60
//删除通道61
XLServer.allChannels.remove(e.getChannel());62
}63

64
@Override65
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {66
logger.info("channelDisconnected");67
super.channelDisconnected(ctx, e);68
}69

70
@Override71
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {72
logger.info("channelOpen");73
//增加通道74
XLServer.allChannels.add(e.getChannel());75
}76

77
/**78
* 发送响应内容79
* @param ctx80
* @param e81
* @return82
*/83
private ChannelFuture sendResponse(ChannelHandlerContext ctx){84
Channel channel=ctx.getChannel();85
Random random=new Random();86
XLResponse response=new XLResponse();87
response.setEncode((byte)0);88
response.setResult(1);89
response.setValue("name","hankchen");90
response.setValue("time", String.valueOf(System.currentTimeMillis()));91
response.setValue("age",String.valueOf(random.nextInt()));92
/**93
* 发送接收信息的时间戳到客户端94
* 注意:Netty中所有的IO操作都是异步的!95
*/96
ChannelFuture future=channel.write(response); //发送内容97
return future;98
}99
}100

四、客户端实现
客户端的功能是连接服务器,发送10次请求,然后发送关闭服务器的命令,最后主动关闭客户端。
关键代码如下:
/**2
* Copyright (C): 20123
* @author hankchen4
* 2012-1-30 下午03:21:265
*/6

7
/**8
* 服务器特征:9
* 1、使用专用解码器解析服务器发过来的数据10
* 2、客户端主动关闭连接11
*/12
public class XLClient {13
public static final int port =XLServer.port;14
public static final String host ="localhost";15
private static final Logger logger=LoggerFactory.getLogger(XLClient.class);16
private static final NioClientSocketChannelFactory clientSocketChannelFactory=new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool());17
private static final ClientBootstrap clientBootstrap=new ClientBootstrap(clientSocketChannelFactory);18
19
/**20
* @param args21
* @throws Exception 22
*/23
public static void main(String[] args) throws Exception {24
ChannelFuture future=XLClient.startup();25
logger.info("future state is "+future.isSuccess());26
}27
28
/**29
* 启动客户端30
* @return31
* @throws Exception32
*/33
public static ChannelFuture startup() throws Exception {34
/**35
* 注意:由于XLClientHandler中有状态的成员变量,因此不能采用默认共享ChannelPipeline的方式36
* 例如,下面的代码形式是错误的:37
* ChannelPipeline pipeline=clientBootstrap.getPipeline();38
* pipeline.addLast("handler", new XLClientHandler());39
*/40
clientBootstrap.setPipelineFactory(new XLClientPipelineFactory()); //只能这样设置41
/**42
* 请注意,这里不存在使用“child.”前缀的配置项,客户端的SocketChannel实例不存在父级Channel对象43
*/44
clientBootstrap.setOption("tcpNoDelay", true);45
clientBootstrap.setOption("keepAlive", true);46
47
ChannelFuture future=clientBootstrap.connect(new InetSocketAddress(host, port));48
/**49
* 阻塞式的等待,直到ChannelFuture对象返回这个连接操作的成功或失败状态50
*/51
future.awaitUninterruptibly();52
/**53
* 如果连接失败,我们将打印连接失败的原因。54
* 如果连接操作没有成功或者被取消,ChannelFuture对象的getCause()方法将返回连接失败的原因。55
*/56
if (!future.isSuccess()) {57
future.getCause().printStackTrace();58
}else {59
logger.info("client is connected to server "+host+":"+port);60
}61
return future;62
}63
64
/**65
* 关闭客户端66
* @param future67
* @throws Exception68
*/69
public static void shutdown(ChannelFuture future) throws Exception{70
try {71
/**72
* 主动关闭客户端连接,会阻塞等待直到通道关闭73
*/74
future.getChannel().close().awaitUninterruptibly();75
//future.getChannel().getCloseFuture().awaitUninterruptibly();76
/**77
* 释放ChannelFactory通道工厂使用的资源。78
* 这一步仅需要调用 releaseExternalResources()方法即可。79
* 包括NIO Secector和线程池在内的所有资源将被自动的关闭和终止。80
*/81
clientBootstrap.releaseExternalResources();82
} catch (Exception e) {83
e.printStackTrace();84
logger.error(e.getMessage(),e);85
}86
finally{87
System.exit(1);88
logger.info("client is shutdown to server "+host+":"+port);89
}90
}91
}
public class XLClientPipelineFactory implements ChannelPipelineFactory{2

3
@Override4
public ChannelPipeline getPipeline() throws Exception {5
ChannelPipeline pipeline=Channels.pipeline();6
/**7
* 使用专用的解码器,解决数据分段的问题8
* 从业务逻辑代码中分离协议处理部分总是一个很不错的想法。9
*/10
pipeline.addLast("decoder", new XLClientDecoder());11
/**12
* 有专门的编码解码器,这时处理器就不需要管数据分段和数据格式问题,只需要关注业务逻辑了!13
*/14
pipeline.addLast("handler", new XLClientHandler());15
return pipeline;16
}17

18
}
/**2
* Copyright (C): 20123
* @author hankchen4
* 2012-1-30 下午03:21:525
*/6

7
/**8
* 服务器特征:9
* 1、使用专用的编码解码器,解决数据分段的问题10
* 2、使用POJO替代ChannelBuffer传输11
*/12
public class XLClientHandler extends SimpleChannelHandler {13
private static final Logger logger=LoggerFactory.getLogger(XLClientHandler.class);14
private final AtomicInteger count=new AtomicInteger(0); //计数器15
16
@Override17
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {18
processMethod1(ctx, e); //处理方式一19
}20
21
/**22
* @param ctx23
* @param e24
* @throws Exception25
*/26
public void processMethod1(ChannelHandlerContext ctx, MessageEvent e) throws Exception{27
logger.info("processMethod1……,count="+count.addAndGet(1));28
XLResponse serverTime=(XLResponse)e.getMessage();29
logger.info("messageReceived,content:"+serverTime.toString());30
Thread.sleep(1000);31
32
if (count.get()<10) {33
//从新发送请求获取最新的服务器时间34
ctx.getChannel().write(ChannelBuffers.wrappedBuffer("again
".getBytes()));35
}else{36
//从新发送请求关闭服务器37
ctx.getChannel().write(ChannelBuffers.wrappedBuffer("shutdown
".getBytes()));38
}39
}40
41
@Override42
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {43
logger.info("exceptionCaught");44
e.getCause().printStackTrace();45
ctx.getChannel().close();46
}47

48
@Override49
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {50
logger.info("channelClosed");51
super.channelClosed(ctx, e);52
}53
54
55
}全文代码较多,写了很多注释,希望对读者有用,谢谢!
(友情提示:本博文章欢迎转载,但请注明出处:hankchen,http://www.blogjava.net/hankchen)
http://www.blogjava.net/hankchen/archive/2012/02/04/369378.html