zoukankan      html  css  js  c++  java
  • Mina自定义编解码

    协议编解码器是在使用Mina 的时候你最需要关注的对象,因为在网络传输的数据都是二进制数据(byte),而你在程序中面向的是JAVA 对象,这就需要你实现在发送数据时将JAVA 对象编码二进制数据,而接收数据时将二进制数据解码为JAVA 对象(这个可不是JAVA 对象的序列化、反序列化那么简单的事情)

    Mina 中的协议编解码器通过过滤器ProtocolCodecFilter 构造,这个过滤器的构造方法需要一个ProtocolCodecFactory,
    ProtocolCodecFactory 中有如下两个方法:

    public interface ProtocolCodecFactory {
    
    ProtocolEncoder getEncoder(IoSession session) throws Exception;
    
    ProtocolDecoder getDecoder(IoSession session) throws Exception;
    
    }

    ProtocolEncoder是自定义编码器要实现的接口,ProtocolDecoder是自定义解码器要实现的接口。

    下面是示例代码:(模拟手机信息的编解码,消息格式;有报头,发送人,接收人,内容长度,内容信息)

    MsgObject.java: 消息实体类

     1 public class MsgObject {
     2     //发送者
     3     private String sender;
     4     //接收者
     5     private String receiver;
     6     //信息内容
     7     private String content;
     8 
     9     public String getSender() {
    10         return sender;
    11     }
    12 
    13     public void setSender(String sender) {
    14         this.sender = sender;
    15     }
    16 
    17     public String getReceiver() {
    18         return receiver;
    19     }
    20 
    21     public void setReceiver(String receiver) {
    22         this.receiver = receiver;
    23     }
    24 
    25     public String getContent() {
    26         return content;
    27     }
    28 
    29     public void setContent(String content) {
    30         this.content = content;
    31     }
    32 
    33 }

    MessageEncoder.java: 消息编码器 

     1 //消息编码器
     2 public class MessageEncoder extends ProtocolEncoderAdapter {
     3     private Charset charset;
     4     
     5     public MessageEncoder(Charset charset)
     6     {
     7         this.charset = charset;
     8     }
     9     
    10     @Override
    11     public void encode(IoSession arg0, Object arg1, ProtocolEncoderOutput arg2)
    12             throws Exception {
    13         MsgObject msg = (MsgObject) arg1;
    14         //生成字符编码器
    15         CharsetEncoder charsetEncoder = charset.newEncoder();
    16         //得到要发送对象属性内容,准备进行编码
    17         String status = "M sip:wap.fetion.com.cn SIP-C/2.0";
    18         String sender = msg.getSender();
    19         String receiver = msg.getReceiver();
    20         String content = msg.getContent();
    21         //开辟一个缓存空间,设置为自动调整大小
    22         IoBuffer ioBuffer = IoBuffer.allocate(100);
    23         ioBuffer.setAutoExpand(true);
    24         //将要发送的信息放入缓存空间
    25         //消息头
    26         ioBuffer.putString(status + "\n", charsetEncoder);
    27         //消息发送者
    28         ioBuffer.putString("S: " + sender + "\n", charsetEncoder);
    29         //消息接收者
    30         ioBuffer.putString("R: " + receiver + "\n", charsetEncoder);
    31         //消息内容长度
    32         ioBuffer.putString("L: " + content.getBytes(charset).length + "\n", charsetEncoder);
    33         //消息内容
    34         ioBuffer.putString(content + "\n", charsetEncoder);
    35         //编码后的信息已放入ioBuffer中,进行写回
    36         ioBuffer.flip();
    37         arg2.write(ioBuffer);
    38     }
    39 
    40 } 


    MessageDecoder.java: 消息解码器

     1 //消息解码器
     2 public class MessageDecoder extends CumulativeProtocolDecoder {
     3     private Charset charset;
     4 
     5     public MessageDecoder(Charset charset) {
     6         this.charset = charset;
     7     }
     8 
     9     @Override
    10     protected boolean doDecode(IoSession arg0, IoBuffer arg1,
    11             ProtocolDecoderOutput arg2) throws Exception {
    12         CharsetDecoder charDecoder = charset.newDecoder();
    13         IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
    14         // 接收解码后的信息
    15         String status = "";
    16         String sender = "";
    17         String receiver = "";
    18         String contentLen = "";
    19         String content = "";
    20 
    21         int textLineNumber = 1;
    22         int columnNumber = 0;
    23         // 如果缓存区还有消息
    24         while (arg1.hasRemaining()) {
    25             byte bt = arg1.get();
    26             buffer.put(bt);
    27             //换行
    28             if (bt == 10 && textLineNumber < 5) {
    29                 columnNumber++;
    30                 if (textLineNumber == 1) {
    31                     buffer.flip();
    32                     status = buffer.getString(columnNumber, charDecoder);
    33                     status = status.substring(0, status.length() - 1);
    34                     columnNumber = 0;
    35                     buffer.clear();
    36                 }
    37                 if (textLineNumber == 2) {
    38                     buffer.flip();
    39                     sender = buffer.getString(columnNumber, charDecoder);
    40                     sender = sender.substring(0, sender.length() - 1);
    41                     columnNumber = 0;
    42                     buffer.clear();
    43                 }
    44                 if (textLineNumber == 3) {
    45                     buffer.flip();
    46                     receiver = buffer.getString(columnNumber, charDecoder);
    47                     receiver = receiver.substring(0, receiver.length() - 1);
    48                     columnNumber = 0;
    49                     buffer.clear();
    50                 }
    51                 if (textLineNumber == 4) {
    52                     buffer.flip();
    53                     contentLen = buffer.getString(columnNumber, charDecoder);
    54                     contentLen = contentLen.substring(0,
    55                             contentLen.length() - 1);
    56                     columnNumber = 0;
    57                     buffer.clear();
    58                 }
    59                 textLineNumber++;
    60             } else if (textLineNumber == 5) {
    61                 columnNumber++;
    62                 if (columnNumber == Long.parseLong(contentLen.split(": ")[1])) {
    63                     buffer.flip();
    64                     content = buffer.getString(columnNumber, charDecoder);
    65                     textLineNumber++;
    66                     break;
    67                 }
    68             } else {
    69                 columnNumber++;
    70             }
    71 
    72         }
    73         MsgObject smsObject = new MsgObject();
    74         smsObject.setSender(sender.split(": ")[1]);
    75         smsObject.setReceiver(receiver.split(": ")[1]);
    76         smsObject.setContent(content);
    77         arg2.write(smsObject);
    78         return false;
    79     }
    80 }

    关于IoBuffer的读操作,需要了解一下原理,可参考文章:Mina框架研究(2)

     

    MessageProtocolCodecFactory.java: 生成消息编解码器工厂

     1 //编解码器生成工产
     2 public class MessageProtocolCodecFactory implements ProtocolCodecFactory {
     3     private ProtocolEncoder encoder;
     4     private ProtocolDecoder decoder;
     5     
     6     public MessageProtocolCodecFactory()
     7     {
     8         this(Charset.forName("UTF-8"));
     9     }
    10     
    11     public MessageProtocolCodecFactory(Charset charset)
    12     {
    13         encoder = new MessageEncoder(charset);
    14         decoder = new MessageDecoder(charset);
    15     }
    16     
    17     @Override
    18     public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
    19         return decoder;
    20     }
    21 
    22     @Override
    23     public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
    24         return encoder;
    25     }
    26 
    27 }

    接着就是调用这些编解码器来进行对象的传输了,服务器端和客户端的主程序编写可参考Mina框架HelloWorld入门

     

    温馨提示:
    上面的消息解码器( MessageDecoder.java)中的解码考虑的情况是消息一次性从服务器发送过来,但有时消息可能不是一次性从服务器发送过来,而是分成了几次分批过来,这时就会重复调用解码器的deCode()方法,这时状态变量textLineNumber和columnNumber就会被重置,所以要把状态变量保存起来。可能你会想到将状态变量保存在解码器的成员变量中,但是Mina不保证每次调用deCode()方法的都是同一个线程,所以状态变量不是线程安全的。所以要将状态变量保存到IoSession中,因为IoSession用了一个同步的HashMap保存对象。


    在IoSession中保存状态变量:

    // 保存数据状态对象的key值
        private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context");

    通过IoSession.setAttribute和IoSession.getAttribute的保存和得到保存数据的对象
    如下:

    1 private MsgContext getContext(IoSession session) {
    2         MsgContext context = (MsgContext) session.getAttribute(CONTEXT);
    3         if (null == context) {
    4             context = new MsgContext();
    5             session.setAttribute(CONTEXT, context);
    6         }
    7         return context;
    8     }
    一颗平常心,踏踏实实,平静对待一切
  • 相关阅读:
    IfcFurnishingElementType
    IfcRelAssociatesClassification
    IfcRelAssociatesDocument
    IfcContext
    IfcRelAssociatesMaterial
    我是高敏感的人,你呢?
    介绍一本红色的书
    矫枉必须过正
    大家都在说的民法典,与我有何关系?
    线上Kafka突发rebalance异常,如何快速解决?
  • 原文地址:https://www.cnblogs.com/hanyuan/p/2960663.html
Copyright © 2011-2022 走看看