前言:在实现过程查找过许多资料,各种波折,最后综合多篇文章最终实现并上线使用。为了减少大家踩坑的时间,所以写了本文,希望有用。对于实现过程中有用的参考资料直接放上链接,可能有些内容相对冗余,不过时间允许多看看也并不无益。
入门文章:
http://www.tuicool.com/articles/mEJvYb
netty官网:
(官网的user guide相对一般,javadoc倒是要看的)
需求场景:
实现用户的在线离线状态实时展现(我们的客户端是android)。
技术选型:
在线好办,关键是要监测到什么时候离线,于是我们选择了心跳模型,当心跳失效时即为离线。如果用http发送心跳包虽然简单但是极度不科学,耗电量太大,所以直接否决。我们选择基于TCP实现长连接,而借助一些第三方插件可以更好更快地实现长连接,于是在mina和netty之间我们选择了netty。(理由仅仅是在百度知道里边看到别人说netty使用的更广泛,没有深入对比过)
相关版本:
netty5.0
jdk1.7
tomcat6.0
基础流程图如下:
服务端代码:
HeartBeatServer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
public class HeartBeatServer { // 端口 private int port ; public HeartBeatServer( int port) { this .port = port; } ChannelFuture f ; ServerBootstrap b ; // 检测chanel是否接受过心跳数据时间间隔(单位秒) private static final int READ_WAIT_SECONDS = 10 ; public void startServer() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel. class ); b.childHandler( new HeartBeatServerInitializer()); // 服务器绑定端口监听 f = b.bind(port).sync(); // 监听服务器关闭监听,此方法会阻塞 f.channel().closeFuture().sync(); // 可以简写为 /* b.bind(portNumber).sync().channel().closeFuture().sync(); */ } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } /** * 消息处理器 * @author cullen edward */ private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); /* * 使用ObjectDecoder和ObjectEncoder * 因为双向都有写数据和读数据,所以这里需要两个都设置 * 如果只读,那么只需要ObjectDecoder即可 */ pipeline.addLast( "decoder" , new StringDecoder()); pipeline.addLast( "encoder" , new StringEncoder()); /* * 这里只监听读操作 * 可以根据需求,监听写操作和总得操作 */ pipeline.addLast( "pong" , new IdleStateHandler(READ_WAIT_SECONDS, 0 , 0 ,TimeUnit.SECONDS)); //pipeline.addLast("handler", new Heartbeat()); pipeline.addLast( "handler" , new HeartbeatHandler()); } } public void stopServer(){ if (f!= null ){ f.channel().close(); } } /** * @param args */ public static void main(String[] args) { HeartbeatServer heartbeatServer = new HeartbeatServer( 9597 ); heartbeatServer.startServer(); } } |
HeartbeatHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
public class HeartbeatHandler extends SimpleChannelInboundHandler<String> { // 失败计数器:未收到client端发送的ping请求 private int unRecPingTimes = 0 ; private String userid; // 定义服务端没有收到心跳消息的最大次数 private static final int MAX_UN_REC_PING_TIMES = 3 ; @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println( "----->msg=" + msg); //msg格式约定为"operation,userid" String[] args = msg.split( "," ); String msg_operation = args[ 0 ]; String msg_userid = args[ 1 ]; if ( "LOGIN" .equals(msg_operation)){ if (!Utils.isBlank(msg_userid)){ userid = msg_userid; } setUserOnlineStatus(userid, true ); } else if ( "HEARTBEAT" .equals(msg_operation)){ ctx.channel().writeAndFlush( "success" ); // 失败计数器清零 unRecPingTimes = 0 ; } } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { /*读超时*/ System.out.println( "===服务端===(READER_IDLE 读超时)" ); // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 if (unRecPingTimes >= MAX_UN_REC_PING_TIMES){ System.out.println( "===服务端===(读超时,关闭chanel)" ); // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连 ctx.channel().close(); } else { // 失败计数器加1 unRecPingTimes++; } } else if (event.state() == IdleState.WRITER_IDLE) { /*写超时*/ System.out.println( "===服务端===(WRITER_IDLE 写超时)" ); } else if (event.state() == IdleState.ALL_IDLE) { /*总超时*/ System.out.println( "===服务端===(ALL_IDLE 总超时)" ); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println( "错误原因:" +cause.getMessage()); ctx.channel().close(); setUserOnlineStatus(userid, false ); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println( "Client active " ); super .channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 关闭,等待重连 ctx.close(); System.out.println( "===服务端===(客户端失效)" ); setUserOnlineStatus(userid, false ); } //设置用户在线离线状态 private void setUserOnlineStatus(String userid, boolean isOnline){ if (!Utils.isBlank(userid)){ //更新用户信息为在线状态(此处代码省略) } } } |
简易的测试客户端代码:
SimpleClient.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public class SimpleClient { public static void main(String[] args) throws Exception { new SimpleClient( "127.0.0.1" , 9597 ).run(); } private final String host; private final int port; public SimpleClient(String host, int port) { this .host = host; this .port = port; } public void run() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap().group(group).channel( NioSocketChannel. class ).handler( new SimpleClientInitializer()); Channel channel = bootstrap.connect(host, port).sync().channel(); BufferedReader in = new BufferedReader( new InputStreamReader( System.in)); while ( true ) { channel.writeAndFlush(in.readLine()); } } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } } |
SimpleClientInitializer.java
1
2
3
4
5
6
7
8
9
|
public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast( "framer" , new DelimiterBasedFrameDecoder( 8192 , Delimiters.lineDelimiter())); pipeline.addLast( "decoder" , new StringDecoder()); pipeline.addLast( "encoder" , new StringEncoder()); } } |
备注:代码大部分是从其他网站复制修改调整的,写得相对简易一点,其中还有很多安全性、合理性有待优化。
代码参考文章:
更多相关文章: