  • Netty: Long-Lived Server/Client Connections with Heartbeat Detection

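    This post uses Netty to implement long-lived connections between a server and its clients, together with heartbeat detection.

           Basic idea: the Netty server keeps the SocketChannel of every connected client in a Map, keyed by the client's ID. Whenever the server needs to send a message to a particular client, it looks up the SocketChannel by clientId and writes the message to it. For heartbeat detection, the client reacts to IdleStateEvent: when the connection has been write-idle for a while, it sends a Ping message to the server, which reveals whether the SocketChannel has been broken.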
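    To make the heartbeat idea concrete without pulling in Netty, here is a minimal plain-Java sketch of one way a server could decide that a client has gone silent. It is an illustration under assumptions, not code from this post: the class HeartbeatTracker, its methods, and the 30-second timeout are hypothetical names and values chosen for the example, and timestamps are passed in explicitly so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of the original post): remembers when each
// client was last heard from and reports the ones that stayed silent too long.
class HeartbeatTracker {
    private final Map<String, Long> lastSeenMillis = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    HeartbeatTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Call this whenever a Ping (or any other message) arrives from a client.
    void onMessage(String clientId, long nowMillis) {
        lastSeenMillis.put(clientId, nowMillis);
    }

    // Removes and returns (sorted) the IDs whose last message is older than the timeout.
    List<String> evictSilentClients(long nowMillis) {
        List<String> evicted = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeenMillis.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                // remove(key, value) only succeeds if no newer heartbeat arrived meanwhile
                if (lastSeenMillis.remove(entry.getKey(), entry.getValue())) {
                    evicted.add(entry.getKey());
                }
            }
        }
        Collections.sort(evicted);
        return evicted;
    }

    boolean isTracked(String clientId) {
        return lastSeenMillis.containsKey(clientId);
    }
}

class HeartbeatTrackerDemo {
    public static void main(String[] args) {
        HeartbeatTracker tracker = new HeartbeatTracker(30_000);
        tracker.onMessage("001", 0);
        tracker.onMessage("002", 0);
        tracker.onMessage("001", 25_000); // "001" pings again, "002" stays silent
        System.out.println(tracker.evictSilentClients(40_000)); // prints [002]
    }
}
```

    In a Netty application, onMessage would plausibly be called from the server handler and evictSilentClients from a periodic task. Netty's own IdleStateHandler with a reader-idle timeout on the server side is a common way to get a similar effect without hand-written bookkeeping.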

            Environment: JDK 1.8 and Netty 5.

            The implementation and a walkthrough follow:

    1. Shared module (mainly the definitions of the message protocol types)

         Message types:

    public enum  MsgType {
        PING,ASK,REPLY,LOGIN
    }

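    Message base class:

    import java.io.Serializable;

    //Must implement Serializable. Declare serialVersionUID explicitly so that both ends agree on the class version; if they disagree, Netty's object deserialization fails and the message never reaches the handler.
    public abstract class BaseMsg  implements Serializable {
        private static final long serialVersionUID = 1L;
        private MsgType type;
        //Must be unique per client; otherwise the server mixes up the channels
        private String clientId;
     
        //Initialize the client ID from the process-wide constant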
        public BaseMsg() {
            this.clientId = Constants.getClientId();
        }
     
        public String getClientId() {
            return clientId;
        }
     
        public void setClientId(String clientId) {
            this.clientId = clientId;
        }
     
        public MsgType getType() {
            return type;
        }
     
        public void setType(MsgType type) {
            this.type = type;
        }
    }
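    Netty's ObjectEncoder and ObjectDecoder are built on Java serialization, so the requirement above can be checked without a network. The following is a minimal sketch, assuming a stand-in class DemoMsg (hypothetical, not one of this post's classes) and the standard ObjectOutputStream/ObjectInputStream rather than Netty's codecs. It round-trips a message through bytes, which is what every message has to survive on its way between client and server.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a message class such as LoginMsg.
class DemoMsg implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String clientId;
    private final String userName;

    DemoMsg(String clientId, String userName) {
        this.clientId = clientId;
        this.userName = userName;
    }

    String getClientId() { return clientId; }
    String getUserName() { return userName; }
}

class SerializationRoundTrip {
    // Serializes the object into a byte array using plain Java serialization.
    static byte[] toBytes(Serializable obj) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
        return buffer.toByteArray();
    }

    // Rebuilds an object from the bytes produced by toBytes.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }

    public static void main(String[] args) {
        DemoMsg sent = new DemoMsg("001", "robin");
        DemoMsg received = (DemoMsg) fromBytes(toBytes(sent));
        System.out.println(received.getClientId() + " " + received.getUserName());
    }
}
```

    If any field's type does not implement Serializable, writeObject throws NotSerializableException, which is why nested types such as AskParams and ReplyBody implement the interface as well.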

    Constants:

    public class Constants {
        private static String clientId;
        public static String getClientId() {
            return clientId;
        }
        public static void setClientId(String clientId) {
            Constants.clientId = clientId;
        }
    }
    Login message:
    public class LoginMsg extends BaseMsg {
        private String userName;
        private String password;
        public LoginMsg() {
            super();
            setType(MsgType.LOGIN);
        }
     
        public String getUserName() {
            return userName;
        }
     
        public void setUserName(String userName) {
            this.userName = userName;
        }
     
        public String getPassword() {
            return password;
        }
     
        public void setPassword(String password) {
            this.password = password;
        }
    }

    Heartbeat Ping message:

    public class PingMsg extends BaseMsg {
        public PingMsg() {
            super();
            setType(MsgType.PING);
        }
    }

    Request (Ask) message:

    public class AskMsg extends BaseMsg {
        public AskMsg() {
            super();
            setType(MsgType.ASK);
        }
        private AskParams params;
     
        public AskParams getParams() {
            return params;
        }
     
        public void setParams(AskParams params) {
            this.params = params;
        }
    }
    //Request parameters
    //Must implement Serializable too, because it is serialized as part of AskMsg
    public class AskParams implements Serializable {
        private static final long serialVersionUID = 1L;
        private String auth;
     
        public String getAuth() {
            return auth;
        }
     
        public void setAuth(String auth) {
            this.auth = auth;
        }
    }

    Response (Reply) message:

    public class ReplyMsg extends BaseMsg {
        public ReplyMsg() {
            super();
            setType(MsgType.REPLY);
        }
        private ReplyBody body;
     
        public ReplyBody getBody() {
            return body;
        }
     
        public void setBody(ReplyBody body) {
            this.body = body;
        }
    }
    //Response body types
    public class ReplyBody implements Serializable {
        private static final long serialVersionUID = 1L;
    }
    public class ReplyClientBody extends ReplyBody {
        private String clientInfo;
     
        public ReplyClientBody(String clientInfo) {
            this.clientInfo = clientInfo;
        }
     
        public String getClientInfo() {
            return clientInfo;
        }
     
        public void setClientInfo(String clientInfo) {
            this.clientInfo = clientInfo;
        }
    }
    public class ReplyServerBody extends ReplyBody {
        private String serverInfo;
        public ReplyServerBody(String serverInfo) {
            this.serverInfo = serverInfo;
        }
        public String getServerInfo() {
            return serverInfo;
        }
        public void setServerInfo(String serverInfo) {
            this.serverInfo = serverInfo;
        }
    }

    2. Server side: consists of the Map that holds the SocketChannel references, the ChannelHandler implementation, and the Bootstrap.

    Map:

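    import io.netty.channel.Channel;
    import io.netty.channel.socket.SocketChannel;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    //Registry of logged-in clients: clientId -> SocketChannel
    public class NettyChannelMap {
        private static final Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>();
        public static void add(String clientId,SocketChannel socketChannel){
            map.put(clientId,socketChannel);
        }
        public static Channel get(String clientId){
           return map.get(clientId);
        }
        //Remove every entry that refers to this channel (identity comparison)
        public static void remove(SocketChannel socketChannel){
            for (Map.Entry<String,SocketChannel> entry:map.entrySet()){
                if (entry.getValue()==socketChannel){
                    map.remove(entry.getKey());
                }
            }
        }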
     
    }
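    The same registry logic can be exercised without Netty. The sketch below is a variant under stated assumptions, not the post's class: ChannelRegistry is a hypothetical generic version in which the type parameter C stands in for SocketChannel, and removal uses Collection.removeIf on the map's value view (available since Java 8) instead of an explicit loop.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, Netty-free version of the clientId -> channel registry.
class ChannelRegistry<C> {
    private final Map<String, C> channels = new ConcurrentHashMap<>();

    void add(String clientId, C channel) {
        channels.put(clientId, channel);
    }

    C get(String clientId) {
        return channels.get(clientId);
    }

    // Drops every ID that maps to this exact channel instance.
    // Returns true if at least one entry was removed.
    boolean remove(C channel) {
        return channels.values().removeIf(c -> c == channel);
    }

    int size() {
        return channels.size();
    }
}

class ChannelRegistryDemo {
    public static void main(String[] args) {
        ChannelRegistry<Object> registry = new ChannelRegistry<>();
        Object channelA = new Object(); // plain objects stand in for channels
        Object channelB = new Object();
        registry.add("001", channelA);
        registry.add("002", channelB);
        registry.remove(channelA);      // what channelInactive would trigger
        System.out.println(registry.get("001") == null); // prints true
        System.out.println(registry.size());             // prints 1
    }
}
```

    Both versions scan the whole map on removal. With many clients, storing the client ID on the channel itself (or keeping a reverse map) would avoid the scan, at the cost of keeping two structures in sync.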

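    Handler:

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.util.ReferenceCountUtil;

    public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            //The channel is no longer active, so remove it from the Map
            NettyChannelMap.remove((SocketChannel)ctx.channel());
        }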
        //Netty 5 names this callback messageReceived; in Netty 4.x the equivalent is channelRead0
        @Override
        protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
     
            if(MsgType.LOGIN.equals(baseMsg.getType())){
                LoginMsg loginMsg=(LoginMsg)baseMsg;
                if("robin".equals(loginMsg.getUserName())&&"yao".equals(loginMsg.getPassword())){
                    //Login succeeded: store the channel in the server-side map
                    NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());
                    System.out.println("client"+loginMsg.getClientId()+" logged in");
                }
            }else{
                if(NettyChannelMap.get(baseMsg.getClientId())==null){
                        //Not logged in, or the connection was dropped: ask the client to log in again
                        LoginMsg loginMsg=new LoginMsg();
                        channelHandlerContext.channel().writeAndFlush(loginMsg);
                        //Stop here: the switch below looks the channel up in the map and would hit a null
                        return;
                }
            }
            switch (baseMsg.getType()){
                case PING:{
                    PingMsg pingMsg=(PingMsg)baseMsg;
                    PingMsg replyPing=new PingMsg();
                    NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);
                }break;
                case ASK:{
                    //Request received from the client
                    AskMsg askMsg=(AskMsg)baseMsg;
                    if("authToken".equals(askMsg.getParams().getAuth())){
                        ReplyServerBody replyBody=new ReplyServerBody("server info $$$$ !!!");
                        ReplyMsg replyMsg=new ReplyMsg();
                        replyMsg.setBody(replyBody);
                        NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);
                    }
                }break;
                case REPLY:{
                    //Reply received from the client
                    ReplyMsg replyMsg=(ReplyMsg)baseMsg;
                    ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();
                    System.out.println("receive client msg: "+clientBody.getClientInfo());
                }break;
                default:break;
            }
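            //Redundant for plain objects such as BaseMsg: SimpleChannelInboundHandler releases the message by default after this method returns
            ReferenceCountUtil.release(baseMsg);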
        }
    }

    ServerBootstrap:

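    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    import java.util.concurrent.TimeUnit;

    public class NettyServerBootstrap {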
        private int port;
        private SocketChannel socketChannel;
        public NettyServerBootstrap(int port) throws InterruptedException {
            this.port = port;
            bind();
        }
     
        private void bind() throws InterruptedException {
            EventLoopGroup boss=new NioEventLoopGroup();
            EventLoopGroup worker=new NioEventLoopGroup();
            ServerBootstrap bootstrap=new ServerBootstrap();
            bootstrap.group(boss,worker);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
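            //TCP_NODELAY disables Nagle's algorithm so that small messages go out immediately instead of waiting to be batched; it is a per-connection option, so it is set with childOption
            bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
            //Enable TCP keepalive on accepted connections to help keep long-lived connections open
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);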
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline p = socketChannel.pipeline();
                    p.addLast(new ObjectEncoder());
                    p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                    p.addLast(new NettyServerHandler());
                }
            });
            ChannelFuture f= bootstrap.bind(port).sync();
            if(f.isSuccess()){
                System.out.println("server start---------------");
            }
        }
        public static void main(String []args) throws InterruptedException {
            NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999);
            while (true){
                SocketChannel channel=(SocketChannel)NettyChannelMap.get("001");
                if(channel!=null){
                    AskMsg askMsg=new AskMsg();
                    channel.writeAndFlush(askMsg);
                }
                TimeUnit.SECONDS.sleep(5);
            }
        }
    }

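    3. Client side: initiates the login, sends heartbeats, and handles each message type.

    Handler:

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.util.ReferenceCountUtil;

    public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> {
        //Send a heartbeat Ping whenever the connection has been write-idle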
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent e = (IdleStateEvent) evt;
                switch (e.state()) {
                    case WRITER_IDLE:
                        PingMsg pingMsg=new PingMsg();
                        ctx.writeAndFlush(pingMsg);
                        System.out.println("send ping to server----------");
                        break;
                    default:
                        break;
                }
            }
        }
        @Override
        protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
            MsgType msgType=baseMsg.getType();
            switch (msgType){
                case LOGIN:{
                    //The server asked for a login: send the credentials
                    LoginMsg loginMsg=new LoginMsg();
                    loginMsg.setPassword("yao");
                    loginMsg.setUserName("robin");
                    channelHandlerContext.writeAndFlush(loginMsg);
                }break;
                case PING:{
                    System.out.println("receive ping from server----------");
                }break;
                case ASK:{
                    ReplyClientBody replyClientBody=new ReplyClientBody("client info **** !!!");
                    ReplyMsg replyMsg=new ReplyMsg();
                    replyMsg.setBody(replyClientBody);
                    channelHandlerContext.writeAndFlush(replyMsg);
                }break;
                case REPLY:{
                    ReplyMsg replyMsg=(ReplyMsg)baseMsg;
                    ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();
                    System.out.println("receive server msg: "+replyServerBody.getServerInfo());
                }break;
                default:break;
            }
            //Release the message itself, not its MsgType (an enum is not reference-counted)
            ReferenceCountUtil.release(baseMsg);
        }
    }

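    Bootstrap:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    import io.netty.handler.timeout.IdleStateHandler;
    import io.netty.util.concurrent.DefaultEventExecutorGroup;
    import io.netty.util.concurrent.EventExecutorGroup;
    import java.util.concurrent.TimeUnit;

    public class NettyClientBootstrap {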
        private int port;
        private String host;
        private SocketChannel socketChannel;
        private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);
        public NettyClientBootstrap(int port, String host) throws InterruptedException {
            this.port = port;
            this.host = host;
            start();
        }
        private void start() throws InterruptedException {
            EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
            Bootstrap bootstrap=new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
            bootstrap.group(eventLoopGroup);
            bootstrap.remoteAddress(host,port);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
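                    //Arguments are reader-idle, writer-idle and all-idle seconds (0 disables): WRITER_IDLE fires after 10s without a write
                    socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));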
                    socketChannel.pipeline().addLast(new ObjectEncoder());
                    socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                    socketChannel.pipeline().addLast(new NettyClientHandler());
                }
            });
            ChannelFuture future =bootstrap.connect(host,port).sync();
            if (future.isSuccess()) {
                socketChannel = (SocketChannel)future.channel();
                System.out.println("connected to server---------");
            }
        }
        public static void main(String[]args) throws InterruptedException {
            Constants.setClientId("001");
            NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,"localhost");
     
            LoginMsg loginMsg=new LoginMsg();
            loginMsg.setPassword("yao");
            loginMsg.setUserName("robin");
            bootstrap.socketChannel.writeAndFlush(loginMsg);
            while (true){
                TimeUnit.SECONDS.sleep(3);
                AskMsg askMsg=new AskMsg();
                AskParams askParams=new AskParams();
                askParams.setAuth("authToken");
                askMsg.setParams(askParams);
                bootstrap.socketChannel.writeAndFlush(askMsg);
            }
        }
    }

    The complete example and the corresponding pom.xml are available at https://github.com/WangErXiao/ServerClient

    Source: http://my.oschina.net/robinyao/blog/399060

  • Original post: https://www.cnblogs.com/Tuzki/p/4423612.html