参考:https://blog.csdn.net/x3499633/article/details/81195705
为什么要制定协议呢?
我们知道,底层传输的都是二进制数据,服务端和客户端建立连接后进行数据的交互,接受这对方发送来的消息,如何判定发送的请求或者响应的数据结束了呢?总不能一直傻等着,或者随意的就结束消息接收吧。这就需要一个规则!比如QQ聊天工具,当输入完一个消息后,点击发送按钮向对方发送时,此时系统就会在在你的消息后添加一个文本换行符,接收方看到这个文本换行符就认为这是一个完整的消息,解析成字符串显示出来。而这个规则,就称之为协议!
根据协议,把二进制数据转换成Java对象称为解码(也叫做拆包);把Java对象转换为二进制数据称为编码(也叫做打包);
常用的协议制定方法有哪些?
定长消息法:这种方式是使用长度固定的数据发送,一般适用于指令发送。譬如:数据发送端规定发送的数据都是双字节,AA 表示启动、BB 表示关闭等等。
字符定界法:这种方式是使用特殊字符作为数据的结束符,一般适用于简单数据的发送。譬如:在消息的结尾自动加上文本换行符(Windows使用
,Linux使用
),接收方见到文本换行符就认为是一个完整的消息,结束接收数据开始解析。注意:这个标识结束的特殊字符一定要简单,常常使用ASCII码中的特殊字符来标识(会出现粘包、半包情况)。
定长报文头法:使用定长报文头,在报文头的某个域指明报文长度。该方法最灵活,使用最广。譬如:协议为– 协议编号(1字节)+数据长度(4个字节)+真实数据。请求到达后,解析协议编号和数据长度,根据数据长度来判断后面的真实数据是否接收完整。HTTP 协议的消息报头中的Content-Length 也是表示消息正文的长度,这样数据的接收端就知道到底读到多长的字节数就不用再读取数据了。
本文使用的是定长报文头法,也是实际中使用的最多的协议方法。
在定长报文头法中
包头:数据包的版本号,以及整个数据包(包头+包体)的长度
包体:实际数据
来编写一个自定义协议(Modbus)
在本项目中是对上面0x04:读输入寄存器协议进行编解码,mina客户端发出的是 00 01 00 00 00 06 01 04 00 02 00 05;收到的是:00 01 00 00 00 0D 01 04 0A 00 0C 00 00 00 00 00 00 00 00 所以这里定义两个协议包
package com.datacollector.mina.encoder; /** * Created by IntelliJ IDEA * 这是一个神奇的Class * * @author zhz * @date 2019/12/20 17:06 */ public class CusModbusPack { /** * 事务处理标识 2字节 */ private byte transaction = 0x01; /** * 协议标识 2字节 */ private byte protocolId = 0x00; /** * 长度 */ private byte length = 0x06; /** * 主机号 */ private byte hostNumber= 01; /** * 功能码 */ private byte func = 04; /** * 起始楼层 */ private int offset; /** * 读取楼层数 */ private int size; public CusModbusPack() { } public CusModbusPack(int offset, int size) { this.offset = offset; this.size = size; } public byte getProtocolId() { return protocolId; } public void setProtocolId(byte protocolId) { this.protocolId = protocolId; } public byte getLength() { return length; } public void setLength(byte length) { this.length = length; } public byte getHostNumber() { return hostNumber; } public void setHostNumber(byte hostNumber) { this.hostNumber = hostNumber; } public byte getFunc() { return func; } public void setFunc(byte func) { this.func = func; } public int getOffset() { return offset; } public void setOffset(int offset) { this.offset = offset; } public int getSize() { return size; } public void setSize(int size) { this.size = size; } public byte getTransaction() { return transaction; } public void setTransaction(byte transaction) { this.transaction = transaction; } @Override public String toString() { return "CusModbusPack{" + "transaction=" + transaction + ", protocolId=" + protocolId + ", length=" + length + ", hostNumber=" + hostNumber + ", func=" + func + ", offset=" + offset + ", size=" + size + '}'; } }
package com.datacollector.mina.encoder; /** * Created by IntelliJ IDEA * 这是一个神奇的Class * * @author zhz * @date 2019/12/19 20:55 */ public class ContentEntity { /** * 主机号 */ private int hostNumber; /** * 楼层 */ private int floor; /** * 楼层 对应的值 */ private double value; public ContentEntity(int hostNumber, int floor, double value) { this.hostNumber = hostNumber; this.floor = floor; this.value = value; } public ContentEntity(int floor, double value) { this.floor = floor; this.value = value; } public int getHostNumber() { return hostNumber; } public void setHostNumber(int hostNumber) { this.hostNumber = hostNumber; } public int getFloor() { return floor; } public void setFloor(int floor) { this.floor = floor; } public double getValue() { return value; } public void setValue(double value) { this.value = value; } @Override public String toString() { return "ContentEntity{" + "hostNumber=" + hostNumber + ", floor=" + floor + ", value=" + value + '}'; } }
自定义编解码器及工厂类
有了我们自己定义的协议,那么怎么把我们的协添加到Mina的通讯机制中呢?
我们查看ProtocolCodecFilter的构造方法,发现需要注入一个ProtocolCodecFactory编解码工厂:
我们继续查看ProtocolCodecFactory接口,发现需要实现2个方法,该接口的两个方法需要返回ProtocolDecoder和ProtocolEncoder的实现类对象(自定义编解码器):
自定义编码器
package com.datacollector.mina.encoder; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import java.nio.charset.Charset; /** * Created by IntelliJ IDEA * 这是一个神奇的Class * * @author zhz * @date 2019/12/20 15:39 */ public class CusProtocolEncoder implements ProtocolEncoder { private final Charset charset; public CusProtocolEncoder() { this.charset = Charset.defaultCharset(); } // 构造方法注入编码格式 public CusProtocolEncoder(Charset charset) { this.charset = charset; } @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { // 转为自定义协议包 byte[] bytes=(byte[]) message; // 初始化缓冲区 IoBuffer buffer = IoBuffer.allocate(bytes.length).setAutoExpand(true); buffer.put(bytes); buffer.flip(); out.write(buffer); } @Override public void dispose(IoSession session) throws Exception { } }
自定义解码器
package com.datacollector.mina.encoder; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; /** * Created by IntelliJ IDEA * 这是一个神奇的Class * * @author zhz * @date 2019/12/19 20:55 */ public class CusProtocolDecoder extends CumulativeProtocolDecoder { private final Charset charset; public CusProtocolDecoder() { this.charset = Charset.defaultCharset(); } // 构造方法注入编码格式 public CusProtocolDecoder(Charset charset) { this.charset = charset; } @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { // 包头长度9 final int PACK_HEAD_LEN = 6; //功能码 final int SIGN = 4; //如果可读取数据的长度 int remaining = in.remaining(); // 拆包时,如果可读数据的长度小于包头的长度,就不进行读取 if (remaining < PACK_HEAD_LEN) { return false; } if (in.remaining() > 1) { // 标记设为当前 in.mark(); byte[] headBytes = new byte[PACK_HEAD_LEN]; in.get(headBytes, 0, PACK_HEAD_LEN); int length = (byteArrayToInt(headBytes[4], headBytes[5])); //如果可读取数据的长度 小于 内容的长度 ,则结束拆包,等待下一次 if (in.remaining() < length) { in.reset(); return false; } else { // 重置,并读取一条完整记录 in.reset(); // 标记设为当前 in.position(PACK_HEAD_LEN).mark(); if (length != in.remaining()) { in.reset(); return false; } byte[] secondBytes = new byte[length]; // 将此缓冲区的字节传输到给定的目标数组中 in.get(secondBytes, 0, length); //主机号 Integer hostNumber = byteArrayToInt(secondBytes[0]); //功能码 Integer func = byteArrayToInt(secondBytes[1]); if (func != SIGN) { out.write("[ Read Input Registers,Illegal data address ]"); // in.reset(); return false; }else { //寄存器的数据量=楼层个数 Integer pmCount = (byteArrayToInt(secondBytes[2])) / 2; List<ContentEntity> contentEntityList = new ArrayList<>(); int bufIndex = 3; for (int i = 0; i < pmCount; i++) { Integer hexContent = byteArrayToInt(secondBytes[bufIndex], secondBytes[bufIndex + 1]); bufIndex = bufIndex + 2; contentEntityList.add(new ContentEntity(hostNumber, i, hexContent)); } out.write(contentEntityList); // 如果读取一条记录后,还存在数据(粘包),则再次进行调用 return true; } } } return false; } /** * byte数组转int,高位在前 * * @param bytes * @return */ public static Integer byteArrayToInt(byte... bytes) { if (bytes.length == 0 || bytes.length > 4) { return -1; } int value = 0; for (int i = 0; i < bytes.length; i++) { value = value | ((bytes[i] & 0xff) << ((bytes.length - i - 1) * 8)); } return value; } }
自定义编解码工厂
package com.datacollector.mina.encoder; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; import java.nio.charset.Charset; /** * Created by IntelliJ IDEA * 这是一个神奇的Class * 自定义编解码工厂类 * * @author zhz * @date 2019/12/19 10:23 */ public class CustomProtocolCodecFactory implements ProtocolCodecFactory { private final ProtocolEncoder encoder; private final ProtocolDecoder decoder; public CustomProtocolCodecFactory() { this(Charset.forName("UTF-8")); } // 构造方法注入编解码器 public CustomProtocolCodecFactory(Charset charset) { this.encoder = new CusProtocolEncoder(charset); this.decoder=new CusProtocolDecoder(charset); } @Override public ProtocolDecoder getDecoder(IoSession session) throws Exception { return decoder; } @Override public ProtocolEncoder getEncoder(IoSession session) throws Exception { return encoder; } }