zoukankan      html  css  js  c++  java
  • Netty实现简单私有协议

    本文参考《Netty权威指南》

    私有协议实现的功能:

    1、基于Netty的NIO通信框架,提供高性能异步通信能力

    2、提供消息的编码解码框架,实现POJO的序列化和反序列化

    3、提供基于IP地址的白名单接入认证机制

    4、链路有效性校验机制

    5、断路重连机制

    协议模型:

    消息定义:

    NettyMessage

    名称 类型 长度 描述
    header Header 变长 消息头定义
    body Object 变长

    请求消息:参数

    响应消息:返回值

    Header

    名称 类型 长度 描述
    crcCode int 32

    Netty消息校验码(三部分)

    1、0xABEF:固定值,表明消息是Netty协议消息,2字节

    2、主版本号:1~255,1字节

    3、次版本号:1~255,1字节

    crcCode=0xABEF+主版本号+次版本号

    length int 32 消息长度,包括消息头,消息体
    sessionID long 64 集群节点全局唯一,由会话生成器生成
    type Byte 8

    0:业务请求消息

    1:业务响应消息

    2:业务ONE-WAY消息(既是请求又是响应)

    3:握手请求消息

    4:握手应答消息

    5:心跳请求消息

    6:心跳应答消息

    priority Byte 8 消息优先级:0~255
    attachment Mep<String,Object>  变长  可选,用于扩展消息头

    参考:http://blog.csdn.net/iter_zc/article/details/39317311

    依赖:

    ├── jboss-marshalling-1.3.0.CR9.jar
    ├── jboss-marshalling-serial-1.3.0.CR9.jar
    └── netty-all-5.0.0.Alpha2.jar
    

    源码:

    ├── Header.java
    ├── HeartBeatReqHandler.java
    ├── HeartBeatRespHandler.java
    ├── LoginAuthReqHandler.java
    ├── LoginAuthRespHandler.java
    ├── MarshallingCodeCFactory.java
    ├── MessageType.java
    ├── NettyClient.java
    ├── NettyConstants.java
    ├── NettyMarshallingDecoder.java
    ├── NettyMarshallingEncoder.java
    ├── NettyMessageDecoder.java
    ├── NettyMessageEncoder.java
    ├── NettyMessage.java
    └── NettyServer.java

    package com.xh.netty.test14;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 消息头
     * Created by root on 1/11/18.
     */
    public final class Header {
        private int crcCode = 0xabef0101;
        private int length;
        private long sessionID;
        private byte type;//消息类型
        private byte priority;//消息优先级
        private Map<String, Object> attachment = new HashMap<String, Object>();//附件
    
    
    ...//set get
    
    
        @Override
        public String toString() {
    ...
        }
    }
    
    package com.xh.netty.test14;
    
    /**
     * 消息
     * Created by root on 1/11/18.
     */
    public final class NettyMessage {
    
        private Header header;
        private Object body;
    
    ...//set get
    
    
        @Override
        public String toString() {
    ...
        }
    }
    
    package com.xh.netty.test14;
    
    /**
     * 消息类型
     * Created by root on 1/12/18.
     */
    public enum MessageType {
    
        //心跳请求,应答
        HEARTBEAT_REQ((byte) 5),
        HEARTBEAT_RESP((byte) 6),
    
        //握手请求,应答
        LOGIN_REQ((byte) 3),
        LOGIN_RESP((byte) 4);
    
        byte value;
    
        MessageType(byte value) {
            this.value = value;
        }
    }
    
    package com.xh.netty.test14;
    
    /**
     * 常量
     * Created by root on 1/12/18.
     */
    public class NettyConstants {
        public static int LOCAL_PORT = 8080;
        public static String LOCAL_IP = "127.0.0.1";
    
        public static int REMOTE_PORT = 80;
        public static String REMOTE_IP = "127.0.0.1";
    }
    
    package com.xh.netty.test14;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageEncoder;
    
    import java.util.List;
    import java.util.Map;
    
    /**
     * 消息编码器
     * Created by root on 1/11/18.
     */
    public final class NettyMessageEncoder extends MessageToMessageEncoder<NettyMessage> {
    
        NettyMarshallingEncoder marshallingEncoder;
    
        /**
         * 这里和书中不一样,可能有问题
         */
        public NettyMessageEncoder() {
            //this.marshallingEncoder = new MarshallingEncoder(new DefaultMarshallerProvider(new SerialMarshallerFactory(), new MarshallingConfiguration()));
            this.marshallingEncoder = MarshallingCodeCFactory.buildMarshallingEncoder();
    
        }
    
    
        protected void encode(ChannelHandlerContext ctx, NettyMessage nettyMessage, List<Object> list) throws Exception {
            if (nettyMessage == null || nettyMessage.getHeader() == null) {
                throw new Exception("the encode message is null");
            }
    
            ByteBuf sendBuf = Unpooled.buffer();
            sendBuf.writeInt(nettyMessage.getHeader().getCrcCode());
            sendBuf.writeInt(nettyMessage.getHeader().getLength());
            sendBuf.writeLong(nettyMessage.getHeader().getSessionID());
            sendBuf.writeByte(nettyMessage.getHeader().getType());
            sendBuf.writeByte(nettyMessage.getHeader().getPriority());
            sendBuf.writeInt(nettyMessage.getHeader().getAttachment().size());
    
            String key = null;
            byte[] keyArray = null;
            Object value = null;
            for (Map.Entry<String, Object> param : nettyMessage.getHeader().getAttachment().entrySet()) {
                key = param.getKey();
                keyArray = key.getBytes("UTF-8");
                sendBuf.writeInt(keyArray.length);
                sendBuf.writeBytes(keyArray);
                value = param.getValue();
                marshallingEncoder.encode(ctx, value, sendBuf);
    
            }
    
            key = null;
            keyArray = null;
            value = null;
            if (nettyMessage.getBody() != null) {
                marshallingEncoder.encode(ctx, nettyMessage.getBody(), sendBuf);
            } else {
                sendBuf.writeInt(0);
            }
            // 在第4个字节出写入Buffer的长度
            int readableBytes = sendBuf.readableBytes();
            sendBuf.setInt(4, readableBytes);
            // 把Message添加到List传递到下一个Handler
            list.add(sendBuf);
    
        }
    
    }
    
    package com.xh.netty.test14;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.marshalling.MarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    
    /**
     * Created by root on 1/11/18.
     */
    public class NettyMarshallingEncoder extends MarshallingEncoder {
        public NettyMarshallingEncoder(MarshallerProvider provider) {
            super(provider);
        }
    
        public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
            super.encode(ctx, msg, out);
        }
    }
    
    package com.xh.netty.test14;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 消息解码器
     * Created by root on 1/11/18.
     */
    public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {
        private NettyMarshallingDecoder marshallingDecoder;
    
    
        public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset,
                                   int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip) {
            super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
            marshallingDecoder = MarshallingCodeCFactory.buildMarshallingDecoder();
        }
        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            ByteBuf frame = (ByteBuf) super.decode(ctx, in);
            if (frame == null) {
                System.out.println("NettyMessageDecoder NULL");
                return null;
            }
    
            //???
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setCrcCode(frame.readInt());
            header.setLength(frame.readInt());
            header.setSessionID(frame.readLong());
            header.setType(frame.readByte());
            header.setPriority(frame.readByte());
            int size = frame.readInt();
            if (size > 0) {
                Map attch = new HashMap<String, Object>(size);
                int keySize = 0;
                byte[] keyArray = null;
                String key = null;
                for (int i = 0; i < size; i++) {
                    keySize = frame.readInt();
                    keyArray = new byte[keySize];
                    in.readBytes(keyArray);
                    key = new String(keyArray, "UTF-8");
                    attch.put(key, marshallingDecoder.decode(ctx, frame));
                }
                key = null;
                keyArray = null;
                header.setAttachment(attch);
            }
    
            if (frame.readableBytes() > 0) {
                message.setBody(marshallingDecoder.decode(ctx, frame));
            }
    
            message.setHeader(header);
            return message;
        }
    }
    
    package com.xh.netty.test14;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.UnmarshallerProvider;
    
    /**
     * Created by root on 1/11/18.
     */
    public class NettyMarshallingDecoder extends MarshallingDecoder {
    
        public NettyMarshallingDecoder(UnmarshallerProvider provider) {
            super(provider);
        }
    
        public NettyMarshallingDecoder(UnmarshallerProvider provider, int maxObjectSize){
            super(provider, maxObjectSize);
        }
    
        public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            return super.decode(ctx, in);
        }
    
    }
    
    package com.xh.netty.test14;
    
    import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
    import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallerProvider;
    import io.netty.handler.codec.marshalling.UnmarshallerProvider;
    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;
    
    /**
     * Created by root on 1/11/18.
     */
    public class MarshallingCodeCFactory {
    
        public static NettyMarshallingDecoder buildMarshallingDecoder() {
            MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
            NettyMarshallingDecoder decoder = new NettyMarshallingDecoder(provider, 1024);
            return decoder;
        }
    
        public static NettyMarshallingEncoder buildMarshallingEncoder() {
            MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
            MarshallingConfiguration configuration = new MarshallingConfiguration();
            configuration.setVersion(5);
            MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
            NettyMarshallingEncoder encoder = new NettyMarshallingEncoder(provider);
            return encoder;
        }
    }
    
    package com.xh.netty.test14;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * 握手认证(client)
     * Created by root on 1/12/18.
     */
    public class LoginAuthReqHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //握手请求
            ctx.writeAndFlush(buildLoginReq());
            System.out.println("client: send LoginAuthReq ---> " + buildLoginReq());
        }
    
        private NettyMessage buildLoginReq() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType((byte) 3);
            message.setHeader(header);
            message.setBody((byte) 0);
            return message;
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //握手应答消息
            NettyMessage message = (NettyMessage) msg;
            if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value) {
                byte loginResult = (Byte) message.getBody();
                if (loginResult != (byte) 0) {
                    //如果应答消息体不为0则认证失败
                    ctx.close();
                } else {
                    System.out.println("client:Login is OK --->" + message);
                    ctx.fireChannelRead(message);
                }
    
            } else {
                //通知下一个Handler
                ctx.fireChannelRead(message);
            }
    
    
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    
    package com.xh.netty.test14;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.net.InetSocketAddress;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 握手认证(server)
     * Created by root on 1/12/18.
     */
    public class LoginAuthRespHandler extends ChannelHandlerAdapter {
        //已登录列表
        Map node = new HashMap<String, Boolean>();
        //白名单
        private String[] whiteList = {"127.0.0.1"};
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage message = (NettyMessage) msg;
    
            //握手请求消息
            if (message != null && message.getHeader().getType() == MessageType.LOGIN_REQ.value) {
                String nodeInde = ctx.channel().remoteAddress().toString();
                System.out.println("server:receive  LoginAuthReq <--- " + message);
                NettyMessage loginResp = null;
                //拒绝重复登录
                if (node.containsKey(nodeInde)) {
                    loginResp = buildResp((byte) -1);
                } else {
                    InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
                    Boolean isOK = false;
                    String IP = address.getAddress().getHostAddress();
                    for (String WIP : whiteList) {
                        if (WIP.equalsIgnoreCase(IP)) {
                            isOK = true;
                            break;
                        }
                    }
                    loginResp = isOK ? buildResp((byte) 0) : buildResp((byte) -1);
                    if (isOK) {
                        node.put(nodeInde, true);
                    }
    
                }
                System.out.println("server:send LoginAuthResp ---> " + loginResp);
                ctx.writeAndFlush(loginResp);
            } else {
                ctx.fireChannelRead(message);
            }
        }
    
        private NettyMessage buildResp(byte b) {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.LOGIN_RESP.value);
            message.setHeader(header);
            message.setBody(b);
            return message;
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //删除缓存
            node.remove(ctx.channel().remoteAddress().toString());
            ctx.close();
            ctx.fireExceptionCaught(cause);
        }
    }
    
    package com.xh.netty.test14;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 心跳检测(client)
     * Created by root on 1/12/18.
     */
    public class HeartBeatReqHandler extends ChannelHandlerAdapter {
    
        private volatile ScheduledFuture<?> heartBeat;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage message = (NettyMessage) msg;
            //握手成功,主动发送心跳
            if (message.getHeader() != null && message.getHeader().getType() == MessageType.LOGIN_RESP.value) {
                heartBeat = ctx.executor().scheduleAtFixedRate(new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000, TimeUnit.MILLISECONDS);
            } else if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_RESP.value) {
                System.out.println("client: receive HEARTBEAT_RESP ---> " + message);
            } else {
                ctx.fireChannelRead(message);
            }
        }
    
        private class HeartBeatTask implements Runnable {
            private ChannelHandlerContext ctx;
    
            public HeartBeatTask(final ChannelHandlerContext ctx) {
                this.ctx = ctx;
            }
    
            public void run() {
                NettyMessage message = buildHeartBeat();
                System.out.println("client: send HeartBeat to server --->" + message);
                ctx.writeAndFlush(message);
            }
    
            private NettyMessage buildHeartBeat() {
                NettyMessage message = new NettyMessage();
                Header header = new Header();
                header.setType(MessageType.HEARTBEAT_REQ.value);
                message.setHeader(header);
                message.setBody((byte) 0);
                return message;
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (heartBeat != null) {
                heartBeat.cancel(true);
                heartBeat = null;
            }
            ctx.fireExceptionCaught(cause);
        }
    }
    
    package com.xh.netty.test14;
    
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    /**
     * 心跳检测(server)
     * Created by root on 1/12/18.
     */
    public class HeartBeatRespHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage message = (NettyMessage) msg;
            if (message.getHeader() != null && message.getHeader().getType() == MessageType.HEARTBEAT_REQ.value) {
                System.out.println("server:receive client HeartBeat ---> " + message);
                NettyMessage heartBeat = buildHeartBeat();
                ctx.writeAndFlush(heartBeat);
                System.out.println("server:send  HeartBeat to client ---> " + heartBeat);
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        private NettyMessage buildHeartBeat() {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            header.setType(MessageType.HEARTBEAT_RESP.value);
            message.setHeader(header);
            message.setBody((byte) 0);
            return message;
        }
    }
    
    package com.xh.netty.test14;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    
    import java.net.InetSocketAddress;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 客户端
     * Created by root on 1/11/18.
     */
    public class NettyClient {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        EventLoopGroup group = new NioEventLoopGroup();
    
        public void connect(int port, String host) {
    
            //client NIO thread
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4, -8, 0));
                                ch.pipeline().addLast(new NettyMessageEncoder());
                                ch.pipeline().addLast(new ReadTimeoutHandler(50));
                                ch.pipeline().addLast(new LoginAuthReqHandler());
                                ch.pipeline().addLast(new HeartBeatReqHandler());
    
                            }
                        });
    
                //异步连接
                ChannelFuture future = b.connect(
                        new InetSocketAddress(host, port)
                        //指定本地的端口
                        //new InetSocketAddress(NettyConstants.LOCAL_IP, NettyConstants.LOCAL_PORT)
                ).sync();
                System.out.println("client: connect to server host:" + host + ", port:" + port);
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放资源,重连
                executorService.execute(new Runnable() {
                    public void run() {
                        try {
                            TimeUnit.SECONDS.sleep(5);
                            //重连
                            connect(NettyConstants.REMOTE_PORT, NettyConstants.REMOTE_IP);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
    
        }
    
    
        public static void main(String[] args) {
            new NettyClient().connect(NettyConstants.REMOTE_PORT, NettyConstants.REMOTE_IP);
        }
    }
    
    package com.xh.netty.test14;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    
    /**
     * 服务端
     * Created by root on 1/11/18.
     */
    public class NettyServer {
        public void bind(int port) throws InterruptedException {
    
            EventLoopGroup bossGrop = new NioEventLoopGroup();
            EventLoopGroup workerGrop = new NioEventLoopGroup();
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGrop, workerGrop).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4, -8, 0));
                            ch.pipeline().addLast(new NettyMessageEncoder());
                            ch.pipeline().addLast(new ReadTimeoutHandler(50));
                            ch.pipeline().addLast(new LoginAuthRespHandler());
                            ch.pipeline().addLast(new HeartBeatRespHandler());
                        }
                    });
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("server: start ok ! host:" + NettyConstants.REMOTE_IP + ",port:" + NettyConstants.REMOTE_PORT);
            future.channel().closeFuture().sync();
        }
    
        public static void main(String[] args) throws InterruptedException {
            new NettyServer().bind(NettyConstants.REMOTE_PORT);
        }
    }
    
  • 相关阅读:
    composer 版本号前置~与^符号的区别
    阿里巴巴离线数据同步工具/平台datax安装、使用笔记
    LAMP集成环境搭建
    MySQL在Windows中设置环境变量
    jQuery 事件
    jQuery 属性,元素
    jQuery 选择器
    JavaScript 动画
    JavaScript BOM-三剑客
    JavaScript BOM-对象
  • 原文地址:https://www.cnblogs.com/lanqie/p/8268469.html
Copyright © 2011-2022 走看看