zoukankan      html  css  js  c++  java
  • netty8---自定义编码解码器

    package com.cn.codc;
    
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.handler.codec.frame.FrameDecoder;
    
    import com.cn.constant.ConstantValue;
    import com.cn.model.Request;
    
    /**
     * Request decoder.
     * <pre>
     * Packet layout
     * +--------+--------+--------+--------+--------+
     * | header | module |  cmd   | length |  data  |
     * +--------+--------+--------+--------+--------+
     * </pre>
     * header: 4 bytes, must equal {@link ConstantValue#FLAG}
     * module: 2 bytes (short)
     * cmd:    2 bytes (short)
     * length: 4 bytes (number of bytes in the data section)
     */
    public class RequestDecoder extends FrameDecoder { // FrameDecoder helps with sticky / split packets

        /**
         * Size in bytes of the fixed part of a packet (header + module + cmd + length).
         */
        public static int BASE_LENTH = 4 + 2 + 2 + 4;

        /**
         * Maximum number of buffered bytes tolerated before the whole buffer is
         * thrown away (protection against byte-stream flooding).
         */
        private static final int MAX_BUFFERED_BYTES = 2048;

        /**
         * Tries to cut one complete {@link Request} out of the accumulated bytes.
         *
         * @param arg0   handler context (unused)
         * @param arg1   channel (unused)
         * @param buffer accumulated inbound bytes; its reader index is advanced past
         *               everything that was consumed or discarded
         * @return the decoded {@link Request}, or {@code null} when no complete packet
         *         is available yet
         */
        @Override
        protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {

            // Not even a full fixed-size prefix yet: wait for more bytes.
            if (buffer.readableBytes() < BASE_LENTH) {
                return null;
            }

            // Flood protection: drop everything and stop. Falling through here would
            // read from an empty buffer and throw IndexOutOfBoundsException.
            if (buffer.readableBytes() > MAX_BUFFERED_BYTES) {
                buffer.skipBytes(buffer.readableBytes());
                return null;
            }

            // Index at which the packet header starts.
            int beginReader;

            // Slide forward one byte at a time until the header flag is found.
            while (true) {
                beginReader = buffer.readerIndex();
                if (buffer.readInt() == ConstantValue.FLAG) {
                    break;
                }

                // Not a header: give back the 4 bytes just read and skip a single byte.
                buffer.readerIndex(beginReader + 1);

                // Too few bytes left for a full prefix.
                if (buffer.readableBytes() < BASE_LENTH) {
                    return null;
                }
            }

            short module = buffer.readShort();
            short cmd = buffer.readShort();
            int length = buffer.readInt();

            // A negative length cannot be allocated, and a length that does not fit
            // under the flood limit can never be completed. Treat both as a false
            // header match and resume the search one byte further on.
            if (length < 0 || length > MAX_BUFFERED_BYTES - BASE_LENTH) {
                buffer.readerIndex(beginReader + 1);
                return null;
            }

            // Data section not fully received: rewind to the header start so the same
            // packet is parsed again once more bytes have arrived.
            if (buffer.readableBytes() < length) {
                buffer.readerIndex(beginReader);
                return null;
            }

            byte[] data = new byte[length];
            buffer.readBytes(data);

            Request request = new Request();
            request.setModule(module);
            request.setCmd(cmd);
            request.setData(data);

            // Returning a non-null object passes it on to the next handler.
            return request;
        }

    }
    package com.cn.codc;
    
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
    
    import com.cn.constant.ConstantValue;
    import com.cn.model.Request;
    
    /**
     * Request encoder.
     * <pre>
     * Packet layout
     * +--------+--------+--------+--------+--------+
     * | header | module |  cmd   | length |  data  |
     * +--------+--------+--------+--------+--------+
     * </pre>
     * header: 4 bytes
     * module: 2 bytes (short)
     * cmd:    2 bytes (short)
     * length: 4 bytes (number of bytes in the data section)
     */
    public class RequestEncoder extends OneToOneEncoder {

        /**
         * Serializes a {@link Request} into a freshly allocated dynamic buffer.
         *
         * @param context handler context (unused)
         * @param channel channel (unused)
         * @param rs      the message to encode; cast to {@link Request}
         * @return the buffer holding the encoded packet
         */
        @Override
        protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception {
            final Request message = (Request) rs;
            final ChannelBuffer out = ChannelBuffers.dynamicBuffer();

            // Fixed-size prefix: header flag, module, command, payload length.
            out.writeInt(ConstantValue.FLAG);
            out.writeShort(message.getModule());
            out.writeShort(message.getCmd());
            out.writeInt(message.getDataLength());

            // Payload is optional.
            final byte[] payload = message.getData();
            if (payload != null) {
                out.writeBytes(payload);
            }
            return out;
        }

    }
    package com.cn.codc;
    
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.handler.codec.frame.FrameDecoder;
    import com.cn.constant.ConstantValue;
    import com.cn.model.Response;
    
    /**
     * Response decoder.
     * <pre>
     * Packet layout
     * +--------+--------+--------+------------+--------+--------+
     * | header | module |  cmd   | state code | length |  data  |
     * +--------+--------+--------+------------+--------+--------+
     * </pre>
     * header:     4 bytes, must equal {@link ConstantValue#FLAG}
     * module:     2 bytes (short)
     * cmd:        2 bytes (short)
     * state code: 4 bytes (int)
     * length:     4 bytes (number of bytes in the data section)
     */
    public class ResponseDecoder extends FrameDecoder {

        /**
         * Size in bytes of the fixed part of a packet
         * (header + module + cmd + state code + length).
         */
        public static int BASE_LENTH = 4 + 2 + 2 + 4 + 4;

        /**
         * Tries to cut one complete {@link Response} out of the accumulated bytes.
         *
         * @param arg0   handler context (unused)
         * @param arg1   channel (unused)
         * @param buffer accumulated inbound bytes; its reader index is advanced past
         *               everything that was consumed or discarded
         * @return the decoded {@link Response}, or {@code null} when no complete packet
         *         is available yet
         */
        @Override
        protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {

            // Not even a full fixed-size prefix yet: wait for more bytes.
            if (buffer.readableBytes() < BASE_LENTH) {
                return null;
            }

            // Index at which the packet header starts.
            int beginReader;

            // Slide forward one byte at a time until the header flag is found, and
            // stop as soon as the remaining bytes cannot hold a full prefix.
            while (true) {
                beginReader = buffer.readerIndex();
                if (buffer.readInt() == ConstantValue.FLAG) {
                    break;
                }

                // Not a header: give back the 4 bytes just read and skip a single byte.
                buffer.readerIndex(beginReader + 1);

                if (buffer.readableBytes() < BASE_LENTH) {
                    return null;
                }
            }

            short module = buffer.readShort();
            short cmd = buffer.readShort();
            int stateCode = buffer.readInt();
            int length = buffer.readInt();

            // A negative length cannot be allocated: treat it as a false header match
            // and resume the search one byte further on.
            if (length < 0) {
                buffer.readerIndex(beginReader + 1);
                return null;
            }

            // Data section not fully received: rewind to the header start so the same
            // packet is parsed again once more bytes have arrived.
            if (buffer.readableBytes() < length) {
                buffer.readerIndex(beginReader);
                return null;
            }

            byte[] data = new byte[length];
            buffer.readBytes(data);

            Response response = new Response();
            response.setModule(module);
            response.setCmd(cmd);
            response.setStateCode(stateCode);
            response.setData(data);

            // Returning a non-null object passes it on to the next handler.
            return response;
        }

    }
    package com.cn.codc;
    
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
    import com.cn.constant.ConstantValue;
    import com.cn.model.Response;
    
    /**
     * Response encoder.
     * <pre>
     * Packet layout
     * +--------+--------+--------+------------+--------+--------+
     * | header | module |  cmd   | state code | length |  data  |
     * +--------+--------+--------+------------+--------+--------+
     * </pre>
     * header:     4 bytes
     * module:     2 bytes (short)
     * cmd:        2 bytes (short)
     * state code: 4 bytes (int)
     * length:     4 bytes (number of bytes in the data section)
     */
    public class ResponseEncoder extends OneToOneEncoder {

        /**
         * Serializes a {@link Response} into a freshly allocated dynamic buffer.
         *
         * @param context handler context (unused)
         * @param channel channel (unused)
         * @param rs      the message to encode; cast to {@link Response}
         * @return the buffer holding the encoded packet
         */
        @Override
        protected Object encode(ChannelHandlerContext context, Channel channel, Object rs) throws Exception {
            final Response message = (Response) rs;
            final ChannelBuffer out = ChannelBuffers.dynamicBuffer();

            // Fixed-size prefix: header flag, module, command, state code, payload length.
            out.writeInt(ConstantValue.FLAG);
            out.writeShort(message.getModule());
            out.writeShort(message.getCmd());
            out.writeInt(message.getStateCode());
            out.writeInt(message.getDataLength());

            // Payload is optional.
            final byte[] payload = message.getData();
            if (payload != null) {
                out.writeBytes(payload);
            }
            return out;
        }

    }
    package com.cn.constant;
    
    public interface ConstantValue {

        /**
         * Packet header flag written at the start of every packet.
         */
        int FLAG = -32523523;

    }
    package com.cn.model;
    /**
     * Message sent from the client to the server.
     */
    public class Request {

        /** Target module number. */
        private short module;

        /** Command number. */
        private short cmd;

        /** Payload bytes; may be {@code null}. */
        private byte[] data;

        /** @return the module number */
        public short getModule() {
            return this.module;
        }

        /** @param module the module number */
        public void setModule(short module) {
            this.module = module;
        }

        /** @return the command number */
        public short getCmd() {
            return this.cmd;
        }

        /** @param cmd the command number */
        public void setCmd(short cmd) {
            this.cmd = cmd;
        }

        /** @return the payload array itself (not a copy), possibly {@code null} */
        public byte[] getData() {
            return this.data;
        }

        /** @param data the payload array; stored as-is, without copying */
        public void setData(byte[] data) {
            this.data = data;
        }

        /**
         * @return the payload length in bytes, or 0 when there is no payload
         */
        public int getDataLength() {
            return (this.data != null) ? this.data.length : 0;
        }
    }
    package com.cn.model;
    /**
     * Message returned from the server to the client.
     */
    public class Response {

        /** Target module number. */
        private short module;

        /** Command number. */
        private short cmd;

        /** Result state code. */
        private int stateCode;

        /** Payload bytes; may be {@code null}. */
        private byte[] data;

        /** @return the module number */
        public short getModule() {
            return this.module;
        }

        /** @param module the module number */
        public void setModule(short module) {
            this.module = module;
        }

        /** @return the command number */
        public short getCmd() {
            return this.cmd;
        }

        /** @param cmd the command number */
        public void setCmd(short cmd) {
            this.cmd = cmd;
        }

        /** @return the state code */
        public int getStateCode() {
            return this.stateCode;
        }

        /** @param stateCode the state code */
        public void setStateCode(int stateCode) {
            this.stateCode = stateCode;
        }

        /** @return the payload array itself (not a copy), possibly {@code null} */
        public byte[] getData() {
            return this.data;
        }

        /** @param data the payload array; stored as-is, without copying */
        public void setData(byte[] data) {
            this.data = data;
        }

        /**
         * @return the payload length in bytes, or 0 when there is no payload
         */
        public int getDataLength() {
            return (this.data != null) ? this.data.length : 0;
        }
    }
    package com.cn.model;
    
    public interface StateCode {

        /** Success. */
        int SUCCESS = 0;

        /** Failure. */
        int FAIL = 1;

    }
    package com.cn.module.fuben.request;
    
    import com.cn.serial.Serializer;
    
    // FightRequest is the module name
    public class FightRequest extends Serializer {

        /** Dungeon ("fuben") id. */
        private int fubenId;

        /** Number of times. */
        private int count;

        /** @return the dungeon id */
        public int getFubenId() {
            return this.fubenId;
        }

        /** @param fubenId the dungeon id */
        public void setFubenId(int fubenId) {
            this.fubenId = fubenId;
        }

        /** @return the count */
        public int getCount() {
            return this.count;
        }

        /** @param count the count */
        public void setCount(int count) {
            this.count = count;
        }

        /** Reads the fields in wire order: dungeon id, then count. */
        @Override
        protected void read() {
            fubenId = readInt();
            count = readInt();
        }

        /** Writes the fields in wire order: dungeon id, then count. */
        @Override
        protected void write() {
            writeInt(fubenId).writeInt(count);
        }

    }
    package com.cn.module.fuben.response;
    
    import com.cn.serial.Serializer;
    
    public class FightResponse extends Serializer {

        /** Gold obtained. */
        private int gold;

        /** @return the gold amount */
        public int getGold() {
            return this.gold;
        }

        /** @param gold the gold amount */
        public void setGold(int gold) {
            this.gold = gold;
        }

        /** Reads the single int field. */
        @Override
        protected void read() {
            gold = readInt();
        }

        /** Writes the single int field. */
        @Override
        protected void write() {
            writeInt(this.gold);
        }
    }
    package com.cn.serial;
    
    
    import java.nio.ByteOrder;
    import org.jboss.netty.buffer.ChannelBuffer;
    import org.jboss.netty.buffer.ChannelBuffers;
    /**
     * Buffer factory.
     */
    public class BufferFactory {

        /** Byte order constant; not referenced by the factory methods of this class. */
        public static ByteOrder BYTE_ORDER = ByteOrder.BIG_ENDIAN;

        /**
         * Creates an empty dynamic buffer.
         *
         * @return a new dynamic buffer
         */
        public static ChannelBuffer getBuffer() {
            return ChannelBuffers.dynamicBuffer();
        }

        /**
         * Creates a buffer holding a copy of the given bytes.
         *
         * @param bytes the bytes to copy into the buffer
         * @return a new buffer containing the copied bytes
         */
        public static ChannelBuffer getBuffer(byte[] bytes) {
            return ChannelBuffers.copiedBuffer(bytes);
        }

    }
    package com.cn.serial;
    
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import org.jboss.netty.buffer.ChannelBuffer;
    /**
     * Base class for the custom binary serialization format.
     * <p>
     * Subclasses implement {@link #read()} and {@link #write()} using the typed
     * helper methods below; both must handle the fields in the same order.
     * Strings, lists and maps are prefixed with a signed 16-bit size, so none of
     * them may exceed {@link Short#MAX_VALUE} bytes / elements.
     */
    public abstract class Serializer {


        public static final Charset CHARSET = Charset.forName("UTF-8");

        protected ChannelBuffer writeBuffer;

        protected ChannelBuffer readBuffer;

        /**
         * Deserialization: populate the fields from {@link #readBuffer}.
         */
        protected abstract void read();

        /**
         * Serialization: write the fields to {@link #writeBuffer}.
         */
        protected abstract void write();

        /**
         * Populates this object from a byte array.
         *
         * @param bytes the serialized form
         * @return this object
         */
        public Serializer readFromBytes(byte[] bytes) {
            readBuffer = BufferFactory.getBuffer(bytes);
            read();
            readBuffer.clear();
            return this;
        }

        /**
         * Populates this object from the given buffer, starting at its current
         * reader index.
         *
         * @param readBuffer the buffer to read from
         */
        public void readFromBuffer(ChannelBuffer readBuffer) {
            this.readBuffer = readBuffer;
            read();
        }

        /**
         * Serializes this object into a new local buffer.
         *
         * @return the buffer that was written to
         */
        public ChannelBuffer writeToLocalBuff() {
            writeBuffer = BufferFactory.getBuffer();
            write();
            return writeBuffer;
        }

        /**
         * Serializes this object into the supplied buffer.
         *
         * @param buffer the buffer to append to
         * @return the same buffer
         */
        public ChannelBuffer writeToTargetBuff(ChannelBuffer buffer) {
            writeBuffer = buffer;
            write();
            return writeBuffer;
        }

        /**
         * Serializes this object and returns the result as a byte array.
         *
         * @return the serialized bytes (empty array when nothing was written)
         */
        public byte[] getBytes() {
            writeToLocalBuff();
            byte[] bytes = null;
            if (writeBuffer.writerIndex() == 0) {
                bytes = new byte[0];
            } else {
                bytes = new byte[writeBuffer.writerIndex()];
                writeBuffer.readBytes(bytes);
            }
            writeBuffer.clear();
            return bytes;
        }


        public byte readByte() {
            return readBuffer.readByte();
        }

        public short readShort() {
            return readBuffer.readShort();
        }

        public int readInt() {
            return readBuffer.readInt();
        }

        public long readLong() {
            return readBuffer.readLong();
        }

        public float readFloat() {
            return readBuffer.readFloat();
        }

        public double readDouble() {
            return readBuffer.readDouble();
        }

        /**
         * Reads a string: a 16-bit byte count followed by that many UTF-8 bytes.
         *
         * @return the decoded string; {@code ""} when the size is zero or negative
         */
        public String readString() {
            int size = readBuffer.readShort();
            if (size <= 0) {
                return "";
            }

            byte[] bytes = new byte[size];
            readBuffer.readBytes(bytes);

            return new String(bytes, CHARSET);
        }

        /**
         * Reads a list: a 16-bit element count followed by the elements.
         *
         * @param clz element type, see {@link #read(Class)} for supported types
         * @return the list read (empty when the count is zero or negative)
         */
        public <T> List<T> readList(Class<T> clz) {
            List<T> list = new ArrayList<>();
            int size = readBuffer.readShort();
            for (int i = 0; i < size; i++) {
                list.add(read(clz));
            }
            return list;
        }

        /**
         * Reads a map: a 16-bit entry count followed by key/value pairs.
         *
         * @param keyClz   key type, see {@link #read(Class)} for supported types
         * @param valueClz value type, see {@link #read(Class)} for supported types
         * @return the map read (empty when the count is zero or negative)
         */
        public <K,V> Map<K,V> readMap(Class<K> keyClz, Class<V> valueClz) {
            Map<K,V> map = new HashMap<>();
            int size = readBuffer.readShort();
            for (int i = 0; i < size; i++) {
                K key = read(keyClz);
                V value = read(valueClz);
                map.put(key, value);
            }
            return map;
        }

        /**
         * Reads one value of the requested type.
         * <p>
         * Supported: int, byte, short, long, float, double (primitive or boxed),
         * {@link String}, and subclasses of {@link Serializer}. A nested
         * {@code Serializer} is preceded by a presence byte; any value other than 1
         * yields {@code null}.
         *
         * @param clz the type to read
         * @return the value read, or {@code null} for an absent nested object
         * @throws RuntimeException if the type is unsupported or a nested
         *                          {@code Serializer} cannot be instantiated
         */
        @SuppressWarnings("unchecked")
        public <I> I read(Class<I> clz) {
            Object t = null;
            if ( clz == int.class || clz == Integer.class) {
                t = this.readInt();
            } else if (clz == byte.class || clz == Byte.class){
                t = this.readByte();
            } else if (clz == short.class || clz == Short.class){
                t = this.readShort();
            } else if (clz == long.class || clz == Long.class){
                t = this.readLong();
            } else if (clz == float.class || clz == Float.class){
                t = readFloat();
            } else if (clz == double.class || clz == Double.class){
                t = readDouble();
            } else if (clz == String.class ){
                t = readString();
            } else if (Serializer.class.isAssignableFrom(clz)){
                byte hasObject = this.readBuffer.readByte();
                if (hasObject == 1) {
                    Serializer temp;
                    try {
                        temp = (Serializer) clz.getDeclaredConstructor().newInstance();
                    } catch (ReflectiveOperationException e) {
                        // Fail loudly: continuing would leave the nested object's bytes
                        // unread and every following field would be misparsed.
                        throw new RuntimeException(String.format("cannot instantiate [%s]", clz), e);
                    }
                    temp.readFromBuffer(this.readBuffer);
                    t = temp;
                }
            } else {
                throw new RuntimeException(String.format("不支持类型:[%s]", clz));
            }
            return (I) t;
        }


        public Serializer writeByte(Byte value) {
            writeBuffer.writeByte(value);
            return this;
        }

        public Serializer writeShort(Short value) {
            writeBuffer.writeShort(value);
            return this;
        }

        public Serializer writeInt(Integer value) {
            writeBuffer.writeInt(value);
            return this;
        }

        public Serializer writeLong(Long value) {
            writeBuffer.writeLong(value);
            return this;
        }

        public Serializer writeFloat(Float value) {
            writeBuffer.writeFloat(value);
            return this;
        }

        public Serializer writeDouble(Double value) {
            writeBuffer.writeDouble(value);
            return this;
        }

        /**
         * Writes a list: a 16-bit element count followed by the elements.
         * A {@code null} or empty list is written as count 0.
         *
         * @param list the list to write
         * @return this object
         * @throws IllegalArgumentException if the list has more than
         *                                  {@link Short#MAX_VALUE} elements
         */
        public <T> Serializer writeList(List<T> list) {
            if (isEmpty(list)) {
                writeBuffer.writeShort((short) 0);
                return this;
            }
            writeBuffer.writeShort(checkedSize(list.size(), "list"));
            for (T item : list) {
                writeObject(item);
            }
            return this;
        }

        /**
         * Writes a map: a 16-bit entry count followed by key/value pairs.
         * A {@code null} or empty map is written as count 0.
         *
         * @param map the map to write
         * @return this object
         * @throws IllegalArgumentException if the map has more than
         *                                  {@link Short#MAX_VALUE} entries
         */
        public <K,V> Serializer writeMap(Map<K, V> map) {
            if (isEmpty(map)) {
                writeBuffer.writeShort((short) 0);
                return this;
            }
            writeBuffer.writeShort(checkedSize(map.size(), "map"));
            for (Entry<K, V> entry : map.entrySet()) {
                writeObject(entry.getKey());
                writeObject(entry.getValue());
            }
            return this;
        }

        /**
         * Writes a string: a 16-bit byte count followed by the UTF-8 bytes.
         * A {@code null} or empty string is written as count 0.
         *
         * @param value the string to write
         * @return this object
         * @throws IllegalArgumentException if the encoded string is longer than
         *                                  {@link Short#MAX_VALUE} bytes
         */
        public Serializer writeString(String value) {
            if (value == null || value.isEmpty()) {
                writeShort((short) 0);
                return this;
            }

            byte data[] = value.getBytes(CHARSET);
            short len = checkedSize(data.length, "string");
            writeBuffer.writeShort(len);
            writeBuffer.writeBytes(data);
            return this;
        }

        /**
         * Writes one value, choosing the encoding from its runtime type.
         * <p>
         * Supported: {@link Integer}, {@link Long}, {@link Short}, {@link Byte},
         * {@link Float}, {@link Double}, {@link String} and {@link Serializer}
         * (preceded by a presence byte of 1). {@code null} is written as a single
         * 0 byte; {@link #read(Class)} only consumes such a byte for
         * {@code Serializer} types, so {@code null} must not be written where
         * another type will be read.
         *
         * @param object the value to write
         * @return this object
         * @throws RuntimeException if the value's type is not supported
         */
        public Serializer writeObject(Object object) {

            if (object == null) {
                writeByte((byte) 0);
                return this;
            }

            if (object instanceof Integer) {
                writeInt((Integer) object);
                return this;
            }

            if (object instanceof Long) {
                writeLong((Long) object);
                return this;
            }

            if (object instanceof Short) {
                writeShort((Short) object);
                return this;
            }

            if (object instanceof Byte) {
                writeByte((Byte) object);
                return this;
            }

            // Float and Double are readable through read(Class), so they must be
            // writable as well.
            if (object instanceof Float) {
                writeFloat((Float) object);
                return this;
            }

            if (object instanceof Double) {
                writeDouble((Double) object);
                return this;
            }

            if (object instanceof String) {
                String value = (String) object;
                writeString(value);
                return this;
            }

            if (object instanceof Serializer) {
                writeByte((byte) 1);
                Serializer value = (Serializer) object;
                value.writeToTargetBuff(writeBuffer);
                return this;
            }

            throw new RuntimeException("不可序列化的类型:" + object.getClass());
        }

        /**
         * Narrows a size to the 16-bit prefix used on the wire, rejecting values
         * that would silently wrap around to a negative number.
         */
        private static short checkedSize(int size, String what) {
            if (size > Short.MAX_VALUE) {
                throw new IllegalArgumentException(
                        String.format("%s too large to serialize: %d > %d", what, size, Short.MAX_VALUE));
            }
            return (short) size;
        }

        private <T> boolean isEmpty(Collection<T> c) {
            return c == null || c.size() == 0;
        }
        public <K,V> boolean isEmpty(Map<K,V> c) {
            return c == null || c.size() == 0;
        }
    }
  • 相关阅读:
    python--threading多线程总结
    云服务器 ECS Linux CentOS 修改内核引导顺序
    日记——心刊
    64位linux安装wine等软件
    service: no such service mysqld 与MySQL的开启,关闭和重启
    python调用chrome ie等浏览器
    Linux系统下强制踢掉登录用户
    python读取数据库数据,读取出的中文乱码问题
    jmeter生成时间的函数
    PHP 递归超过100次会自动停止
  • 原文地址:https://www.cnblogs.com/yaowen/p/9063053.html
Copyright © 2011-2022 走看看