zoukankan      html  css  js  c++  java
  • [置顶] spring集成mina 实现消息推送以及转发

    spring集成mina:

    在学习mina这块时,在网上找了很多资料,只有一些demo,只能实现客户端向服务端发送消息、建立长连接之类。但是实际上在项目中,并不简单实现这些,还有业务逻辑之类的处理以及消息的推送之类的。于是就单独建立了一个工程项目,能够实现客户端和服务端相互之间发送消息、建立长连接、实现心跳检测等功能。
    例如:可以实现客户端A向服务端发送消息,服务端将消息转发给客户端B。

    效果实现图:
    服务端启动成功后, 客户端A绑定服务端。

    这里写图片描述

    客户端B向服务端发送信息,请求服务端向客户端A推送消息
    这里写图片描述

    客户端A受到服务端转发的客户端B的消息
    这里写图片描述

    服务端心跳检测的实现
    这里写图片描述

    代码的目录结构:
    这里写图片描述

    那么开始实现代码的编写。(可以直接跳到底部,通过链接下载工程代码)
    首先在官网上下载mina以及spring相关架包,这里相关架包已准备好:http://download.csdn.net/detail/qazwsxpcm/9870787

    服务端:

    1. 首先实现数据传输对象、消息常量的代码编写。

    我使用的两个传输对象,接受和发送,代码如下。(传输对象可以自行定义)。

    package com.pcm.mina.service.model;
    
    import java.io.Serializable;
    import java.util.HashMap;
    
    /**
     * @author ZERO
     * @Description 服务端接收消息对象
     */
    public class SentBody implements Serializable {
    
        private static final long serialVersionUID = 1L;
    
        private String key;
    
        private HashMap<String, String> data;
    
        private long timestamp;
    
        public SentBody() {
            data = new HashMap<String, String>();
            timestamp = System.currentTimeMillis();
        }
    
        public String getKey() {
            return key;
        }
    
        public String get(String k) {
            return data.get(k);
        }
    
        public void put(String k, String v) {
            data.put(k, v);
        }
    
        public long getTimestamp() {
            return timestamp;
        }
    
        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }
    
        public void setKey(String key) {
            this.key = key;
        }
    
        public void remove(String k) {
            data.remove(k);
        }
    
        public HashMap<String, String> getData() {
            return data;
        }
    
        @Override
        public String toString() {
            StringBuffer buffer = new StringBuffer();
            buffer.append("<?xml version="1.0" encoding="UTF-8"?>");
            buffer.append("<sent>");
            buffer.append("<key>").append(key).append("</key>");
            buffer.append("<timestamp>").append(timestamp).append("</timestamp>");
            buffer.append("<data>");
            for (String key : data.keySet()) {
                buffer.append("<" + key + ">").append(data.get(key)).append(
                        "</" + key + ">");
            }
            buffer.append("</data>");
            buffer.append("</sent>");
            return buffer.toString();
        }
    
        public String toXmlString() {
            return toString();
        }
    }
    package com.pcm.mina.service.model;
    
    import java.io.Serializable;
    import java.util.HashMap;
    /**
     * @author ZERO
     * @Description 服务端发送消息对象
     */
    public class ReplyBody implements Serializable {
    
        private static final long serialVersionUID = 1L;
    
        /**
         * 请求key
         */
        private String key;
    
        /**
         * 返回码
         */
        private String code;
    
        /**
         * 返回说明
         */
        private String message;
    
        /**
         * 返回数据集合
         */
        private HashMap<String, String> data;
    
    
        private long timestamp;
    
        public ReplyBody()
        {
            data = new HashMap<String, String>();
            timestamp = System.currentTimeMillis();
        }
        public long getTimestamp() {
            return timestamp;
        }
    
        public void setTimestamp(long timestamp) {
            this.timestamp = timestamp;
        }
    
    
    
        public String getKey() {
            return key;
        }
    
        public void setKey(String key) {
            this.key = key;
        }
    
        public void put(String k, String v) {
            data.put(k, v);
        }
    
        public String get(String k) {
            return data.get(k);
        }
    
        public void remove(String k) {
            data.remove(k);
        }
    
        public String getMessage() {
            return message;
        }
    
        public void setMessage(String message) {
            this.message = message;
        }
    
        public HashMap<String, String> getData() {
            return data;
        }
    
        public void setData(HashMap<String, String> data) {
            this.data = data;
        }
    
        public String getCode() {
            return code;
        }
    
        public void setCode(String code) {
            this.code = code;
        }
    
    
        public String toString()
        {
    
            StringBuffer buffer = new StringBuffer();
            buffer.append("<?xml version="1.0" encoding="UTF-8"?>");
            buffer.append("<reply>");
            buffer.append("<key>").append(this.getKey()).append("</key>");
            buffer.append("<timestamp>").append(timestamp).append("</timestamp>");
            buffer.append("<code>").append(code).append("</code>");
            buffer.append("<message>").append(message).append("</message>");
            buffer.append("<data>");
            for(String key:this.getData().keySet())
            {
                buffer.append("<"+key+">").append(this.get(key)).append("</"+key+">");
            }
            buffer.append("</data>");
            buffer.append("</reply>");
            return buffer.toString();
        }
    
    
        public String toXmlString()
        {
    
            return toString();
        }
    }
    
    package com.pcm.mina.service.model;
    
    /**
     * @author ZERO
     * @Description 消息常量
     */
    public class Message {
        public static class ReturnCode {
    
            public static String CODE_404 = "404"; 
    
            public static String CODE_403 = "403";  //该账号未绑定
    
            public static String CODE_405 = "405"; //事物未定义
    
            public static String CODE_200 = "200"; //成功
    
            public static String CODE_500 = "500"; //未知错误
    
        }
    
    
        public static final String SESSION_KEY = "account";
    
    
        /**
         * 服务端心跳请求命令
         */
        public static final String CMD_HEARTBEAT_REQUEST = "hb_request";
        /**
         * 客户端心跳响应命令
         */
        public static final String CMD_HEARTBEAT_RESPONSE = "hb_response";
    
    
        public static class MessageType {
            // 用户会 踢出下线消息类型
            public static String TYPE_999 = "999";
    
        }
    
    }
    

    2,实现心跳检测功能。

    服务端发送的是hb_request,那么客户端就应该返回hb_response,以此来实现心跳检测。

    /**
     * @author ZERO
     * @Description  心跳协议的实现类
     */ 
    public class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory{
        private final Logger LOG=Logger.getLogger(KeepAliveMessageFactoryImpl.class);
        /**
         * 客户端心跳响应命令
         */
        private static  String HEARRESPONSE=Message.CMD_HEARTBEAT_RESPONSE; 
        /**
         * 服务端心跳请求命令
         */
        private static  String HEARREQUEST=Message.CMD_HEARTBEAT_REQUEST;
    
        public Object getRequest(IoSession session) {
            LOG.warn("请求预设信息:"+HEARREQUEST);
            return HEARREQUEST;
        }
    
        public Object getResponse(IoSession session, Object message) {
             LOG.warn("响应预设信息: " + message);  
                /** 返回预设语句 */  
              return HEARRESPONSE;  
        }
    
        public boolean isRequest(IoSession session, Object message) {
            LOG.warn("请求心跳包信息: " + message);  
            return message.equals(HEARREQUEST); 
        }
    
        public boolean isResponse(IoSession session, Object message) {
             LOG.warn("响应心跳包信息: " + message);  
             return message.equals(HEARRESPONSE);
           }
    }

    3, 实现服务端代码编写

    服务端代码这块,因为注释写的已经够详细了,所以这里就不细说了。

    package com.pcm.mina.service;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.util.Map;
    import org.apache.log4j.Logger;
    import org.apache.mina.core.service.IoAcceptor;
    import org.apache.mina.core.service.IoHandler;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
    import org.apache.mina.filter.executor.ExecutorFilter;
    import org.apache.mina.filter.keepalive.KeepAliveFilter;
    import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
    import com.pcm.mina.service.filter.KeepAliveMessageFactoryImpl;
    
    
    /**
     * @author ZERO
     * @Description  mina服务端
     */
    public class SerNioSociketAcceptor {
        IoAcceptor acceptor;
        IoHandler ioHandler;
        int port;
        //记录日志
        public static Logger logger=Logger.getLogger(SerNioSociketAcceptor.class);
       //创建bind()方法接收连接
        public void bind() throws IOException
        {   
           //创建 协议编码解码过滤器ProtocolCodecFilter
           //设置序列化Object  可以自行设置自定义解码器
            ProtocolCodecFilter pf=new ProtocolCodecFilter(new ObjectSerializationCodecFactory());
           //getFilterChain() 获取 I/O 过滤器链,可以对 I/O 过滤器进行管理,包括添加和删除 I/O 过滤器。      
            acceptor = new NioSocketAcceptor();  
            //设置缓存大小
            acceptor.getSessionConfig().setReadBufferSize(1024);  
             // 设置过滤器
            acceptor.getFilterChain().addLast("executor",new ExecutorFilter()); 
            acceptor.getFilterChain().addLast("logger",new LoggingFilter());  
            acceptor.getFilterChain().addLast("codec",pf);
    
            KeepAliveMessageFactory kamf=new KeepAliveMessageFactoryImpl();
            KeepAliveFilter kaf = new KeepAliveFilter(kamf, IdleStatus.BOTH_IDLE);
            kaf.setForwardEvent(true);
            kaf.setRequestInterval(30);  //本服务器为被定型心跳  即需要每30秒接受一个心跳请求  否则该连接进入空闲状态 并且发出idled方法回调
            acceptor.getFilterChain().addLast("heart", kaf); 
            //读写通道60秒内无操作进入空闲状态
            acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);
            //绑定逻辑处理器
            acceptor.setHandler(ioHandler);  
            //绑定端口
            acceptor.bind(new InetSocketAddress(port));
            logger.info("Mina服务端启动成功...端口号为:"+port); //测试使用
        }
        //创建unbind()方法停止监听
        public void unbind()
        {
            acceptor.unbind();
            logger.info("服务端停止成功");
        }
        public void setAcceptor(IoAcceptor acceptor) {
            this.acceptor = acceptor;
        }
        //  设置 I/O 处理器。该 I/O 处理器会负责处理该 I/O 服务所管理的所有 I/O 会话产生的 I/O 事件。
        public void setIoHandler(IoHandler ioHandler) {
            this.ioHandler = ioHandler;
        }
        //设置端口
        public void setPort(int port) {
            this.port = port;
        }
    //  获取该 I/O 服务所管理的 I/O 会话。
        public  Map<Long, IoSession> getManagedSessions()
        {
            return acceptor.getManagedSessions();
        }
    }
    

    4,实现session容器

    如果需要保证线程安全,可以使用 ConcurrentHashMap,作为session容器。

    package com.pcm.mina.service.session;
    
    import java.io.Serializable;
    import java.net.InetAddress;
    import java.net.SocketAddress;
    import java.net.UnknownHostException;
    import org.apache.mina.core.session.IoSession;
    
    
    /**
     * @author ZERO
     * @Description  IoSession包装类  
     */
    public class PcmSession implements Serializable{
    
        private static final long serialVersionUID = 1L;
    
        private transient IoSession session;
    
        private String gid;             //session全局ID
        private Long nid;               //session在本台服务器上的ID
        private String host;            //session绑定的服务器IP
        private String account;         //session绑定的账号
        private String message;         //session绑定账号的消息
        private Long bindTime;          //登录时间
        private Long heartbeat;         //心跳时间
    
    
        public PcmSession(){}
    
        public PcmSession(IoSession session) {
            this.session = session;
            this.nid = session.getId();
        }
    
        public String getGid() {
            return gid;
        }
    
        public void setGid(String gid) {
            this.gid = gid;
        }
    
        public Long getBindTime() {
            return bindTime;
        }
    
        public void setBindTime(Long bindTime) {
            this.bindTime = bindTime;
        }
    
        public Long getHeartbeat() {
            return heartbeat;
        }
    
        public void setHeartbeat(Long heartbeat) {
            this.heartbeat = heartbeat;
        }
    
    
        public String getMessage() {
            return message;
        }
    
        public void setMessage(String message) {
            this.message = message;
        }
    
        public String getAccount() {
            return account;
        }
    
        public void setAccount(String account) {
            this.account = account;
        }
    
        public String getHost() {
            return host;
        }
    
        public void setHost(String host) {
            this.host = host;
        }
    
        public void setIoSession(IoSession session) {
            this.session = session;
        }
    
        public IoSession getIoSession() {
            return session;
        }
    
    
    //  将键为 key,值为 value的用户自定义的属性存储到 I/O 会话中。
        public void setAttribute(String key, Object value) {
            if(session!=null){
                session.setAttribute(key, value);
            }
    
        }
    
    
        public boolean containsAttribute(String key) {
            if(session!=null){
    
                return session.containsAttribute(key);
            }
            return false;
        }
        //  从 I/O 会话中获取键为 key的用户自定义的属性。
        public Object getAttribute(String key) {
            if(session!=null){
    
                return session.getAttribute(key);
            }
    
            return null;
        }
        //从 I/O 会话中删除键为 key的用户自定义的属性。
        public void removeAttribute(String key) {
            if(session!=null){
                session.removeAttribute(key);
            }
        }
    
    
        public SocketAddress getRemoteAddress() {
            if(session!=null){
                return session.getRemoteAddress();
            }       
            return null;
        }
    
    /*   将消息对象 message发送到当前连接的对等体。该方法是异步的,当消息被真正发送到对等体的时候,
        IoHandler.messageSent(IoSession,Object)会被调用。如果需要的话,
        也可以等消息真正发送出去之后再继续执行后续操作。*/
        public void write(Object msg) {
            if(session!=null)
            {
                session.write(msg).isWritten(); 
            }
        }
    
        public boolean isConnected() {
            if(session!=null){
                return session.isConnected();
            }
            return false;
        }
    
        public boolean  isLocalhost()
        {
            try {
                String ip = InetAddress.getLocalHost().getHostAddress();
                return ip.equals(host);
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
            return false;
    
        }
    
    /* 关闭当前连接。如果参数 immediately为 true的话,
     * 连接会等到队列中所有的数据发送请求都完成之后才关闭;否则的话就立即关闭。 
     */ 
        public void close(boolean immediately) {
            if(session!=null){
                session.close(immediately);
            }
        }
    
        public boolean equals(Object message) {
    
            if (message instanceof PcmSession) {
    
                PcmSession t = (PcmSession) message;
                if( t.nid!=null && nid!=null)
                {
                    return  t.nid.longValue()==nid.longValue() && t.host.equals(host);
                } 
            }  
            return false;
        }
    
    
        public String  toString()
        {
            StringBuffer buffer = new   StringBuffer();
            buffer.append("{");
        buffer.append(""").append("gid").append("":").append(""").append(gid).append(""").append(",");
            buffer.append(""").append("nid").append("":").append(nid).append(",");
            buffer.append(""").append("host").append("":").append(""").append(host).append(""").append(",");
            buffer.append(""").append("account").append("":").append(""").append(account).append(""").append(",");
            buffer.append(""").append("bindTime").append("":").append(bindTime).append(",");
            buffer.append(""").append("heartbeat").append("":").append(heartbeat);
            buffer.append("}");
            return buffer.toString();
    
        }
    }
    
    package com.pcm.mina.service.session;
    
    /**
     * @author ZERO
     * @Description  客户端的session管理接口
     */
    public interface SessionManager {
        /**
         * 添加新的session
         */
        public void addSession(String account,PcmSession session);
    
        /**
         * 
         * @param account 客户端session的 key 一般可用 用户账号来对应session
         * @return
         */
        PcmSession getSession(String account);
    
        /**
         * 删除session
         * @param session
         */
        public void  removeSession(PcmSession session);
    
        /**
         * 删除session
         * @param account
         */
        public void  removeSession(String account);
    
    }
    
    package com.pcm.mina.service.session;
    
    import java.util.HashMap;
    import java.util.concurrent.atomic.AtomicInteger;
    import com.pcm.mina.service.model.Message;
    
    /**
     * @author ZERO
     * @Description  自带默认 session管理实现
     */
    public class DefaultSessionManager implements SessionManager{
    
         private static HashMap<String,PcmSession> sessions =new  HashMap<String,PcmSession>();
         private static final AtomicInteger connectionsCounter = new AtomicInteger(0);
    
        public void addSession(String account, PcmSession session) {
            if(session !=null){
                sessions.put(account, session);
                connectionsCounter.incrementAndGet();
            }
        }
    
        public PcmSession getSession(String account) {
            return sessions.get(account);
        }
    
        public void removeSession(PcmSession session) {
            sessions.remove(session.getAttribute(Message.SESSION_KEY));
        }
    
        public void removeSession(String account) {
            sessions.remove(account);
        }
    
    }
    

    5, 实现业务逻辑处理器。

    因为注释写的已经够详细了,所以这里就不细说了。
    做了简单业务逻辑处理,如有需要可以自行更改。

    package com.pcm.mina.service.handler;
    
    import java.net.InetSocketAddress;
    import java.util.HashMap;
    import org.apache.log4j.Logger;
    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.springframework.stereotype.Component;
    import com.pcm.mina.service.RequestHandler;
    import com.pcm.mina.service.model.Message;
    import com.pcm.mina.service.model.ReplyBody;
    import com.pcm.mina.service.model.SentBody;
    import com.pcm.mina.service.session.PcmSession;
    
    /**
     * @author ZERO
     * @Description  I/O 处理器 客户端请求的入口,所有请求都首先经过它分发处理 业务逻辑实现
     */ 
    
    @Component("sercixeMainHandler")
    public class ServiceMainHandler extends IoHandlerAdapter{
        protected final Logger logger = Logger.getLogger(ServiceMainHandler.class);
    
        //本地handler请求
        private HashMap<String, RequestHandler> handlers = new HashMap<String, RequestHandler>();
    
        //出错时
        @Override
        public void exceptionCaught(IoSession session, Throwable cause){
            logger.error("exceptionCaught()... from "+session.getRemoteAddress());
            logger.error(cause);
            cause.printStackTrace();
        }
    
        //接收到消息时
        @Override
        public void messageReceived(IoSession iosession,Object message){        
            logger.info("服务端接收到的消息..."+message.toString());
            if(message instanceof SentBody){
                SentBody sent=(SentBody) message;
                ReplyBody rb=new ReplyBody();
                PcmSession session=new PcmSession(iosession);
                String key=sent.getKey();
                if("quit".equals(sent.get("message"))){ //服务器断开的条件
                    try {
                        sessionClosed(iosession);
                    } catch (Exception e) {
                        rb.setCode(Message.ReturnCode.CODE_500);
                        e.printStackTrace();
                    }
                }else{
                //根据key的不同调用不同的handler
                RequestHandler rhandler=handlers.get(key);
                if(rhandler==null){//如果没有这个handler
                    rb.setCode(Message.ReturnCode.CODE_405);
                    rb.setMessage("服务端未定义!");
                }else{//有的话
                    rb=rhandler.process(session, sent);
                }
                }
                if(rb !=null){
                    rb.setKey(key);
                    session.write(rb);
                    logger.info("服务端发送的消息: " + rb.toString());
                }
    
            }
        }
    
        //发送消息
        @Override
        public void messageSent(IoSession session, Object message) throws Exception {
       //   session.close(); //发送成功后主动断开与客户端的连接 实现短连接
           logger.info("服务端发送信息成功...");
            }
    
        //建立连接时
        @Override
        public void sessionCreated(IoSession session) throws Exception {
                InetSocketAddress sa=(InetSocketAddress)session.getRemoteAddress();
                String address=sa.getAddress().getHostAddress(); //访问的ip
                session.setAttribute("address", address);
                //将连接的客户端ip保存到map集合中
                SentBody body=new SentBody();
                body.put("address", address);
                logger.info("访问的ip:"+address);
            }
    
        //关闭连接时   
        @Override
        public void sessionClosed(IoSession iosession) throws Exception {
                PcmSession session=new PcmSession(iosession);
                logger.debug("sessionClosed()... from "+session.getRemoteAddress());
                try {
                    RequestHandler hand=handlers.get("client_closs");
                    if(hand !=null && session.containsAttribute(Message.SESSION_KEY)){
                        hand.process(session, null);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                session.close(true);  
                logger.info("连接关闭");
            }
    
        //空闲时
        @Override
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
            logger.debug("sessionIdle()... from "+session.getRemoteAddress());
            }
    
        //打开连接时   
        @Override
         public void sessionOpened(IoSession session) throws Exception {
                logger.info("开启连接...");
            }
    
        public HashMap<String, RequestHandler> getHandlers() {
            return handlers;
        }
        public void setHandlers(HashMap<String, RequestHandler> handlers) {
            this.handlers = handlers;
        } 
    } 
    

    6, 实现业务逻辑代码。

    目前实现了绑定,推送以及关闭逻辑代码。如有需要,可自行增加。

    package com.pcm.mina.service;
    
    import com.pcm.mina.service.model.ReplyBody;
    import com.pcm.mina.service.model.SentBody;
    import com.pcm.mina.service.session.PcmSession;
    /**
     * @author ZERO
     * @Description  请求处理接口,所有的请求必须实现此接口
     */
    public interface RequestHandler {
        public abstract ReplyBody process(PcmSession session,SentBody message);
    }
    
    package com.pcm.mina.service.handler;
    
    import java.net.InetAddress;
    import java.util.UUID;
    import org.apache.log4j.Logger;
    import com.pcm.mina.service.RequestHandler;
    import com.pcm.mina.service.model.Message;
    import com.pcm.mina.service.model.ReplyBody;
    import com.pcm.mina.service.model.SentBody;
    import com.pcm.mina.service.session.DefaultSessionManager;
    import com.pcm.mina.service.session.PcmSession;
    import com.pcm.util.ContextHolder;
    
    /**
     * @author ZERO
     * @Description  账号绑定实现
     */ 
    public class BindHandler implements RequestHandler {
        protected final Logger logger = Logger.getLogger(BindHandler.class);
        public ReplyBody process(PcmSession newSession, SentBody message) {
            ReplyBody reply = new ReplyBody();
            DefaultSessionManager sessionManager= ((DefaultSessionManager) ContextHolder.getBean("PcmSessionManager"));
            try { 
                String account = message.get(Message.SESSION_KEY);
                newSession.setAccount(account);
                newSession.setMessage(message.get("message"));
                newSession.setGid(UUID.randomUUID().toString());
                newSession.setHost(InetAddress.getLocalHost().getHostAddress());
                //第一次设置心跳时间为登录时间
                newSession.setBindTime(System.currentTimeMillis());
                newSession.setHeartbeat(System.currentTimeMillis());
                /**
                 * 由于客户端断线服务端可能会无法获知的情况,客户端重连时,需要关闭旧的连接
                 */
                PcmSession oldSession  = sessionManager.getSession(account);
                //如果是账号已经在另一台终端登录。则让另一个终端下线
                if(oldSession!=null&&!oldSession.equals(newSession))
                {
                        oldSession.removeAttribute(Message.SESSION_KEY);
                        ReplyBody rb = new ReplyBody();
                        rb.setCode(Message.MessageType.TYPE_999);//强行下线消息类型
                        rb.put(Message.SESSION_KEY, account);
                        if(!oldSession.isLocalhost())
                        {
                            /*
                            判断当前session是否连接于本台服务器,如不是发往目标服务器处理
                            MessageDispatcher.execute(rb, oldSession.getHost());
                            */
                        }else
                        {
                            oldSession.write(rb);
                            oldSession.close(true);
                            oldSession = null;
                        }
                        oldSession = null;
                }
                if(oldSession==null)
                {
                    sessionManager.addSession(account, newSession);
                }
    
                reply.setCode(Message.ReturnCode.CODE_200);
            } catch (Exception e) {
                reply.setCode(Message.ReturnCode.CODE_500);
                e.printStackTrace();
            }
            logger.debug("绑定账号:" +message.get(Message.SESSION_KEY)+"-----------------------------" +reply.getCode());
            return reply;
        }
    
    }
    package com.pcm.mina.service.handler;
    
    import org.apache.log4j.Logger;
    import com.pcm.mina.service.RequestHandler;
    import com.pcm.mina.service.model.Message;
    import com.pcm.mina.service.model.ReplyBody;
    import com.pcm.mina.service.model.SentBody;
    import com.pcm.mina.service.session.DefaultSessionManager;
    import com.pcm.mina.service.session.PcmSession;
    import com.pcm.util.ContextHolder;
    
    /**
     * @author ZERO
     * @Description  推送消息
     */ 
    public class PushMessageHandler implements RequestHandler {
    
        protected final Logger logger = Logger.getLogger(PushMessageHandler.class);
        public ReplyBody process(PcmSession ios, SentBody sent) {
            ReplyBody reply = new ReplyBody();
            String account=(String) sent.getData().get(Message.SESSION_KEY);
            DefaultSessionManager sessionManager=(DefaultSessionManager) ContextHolder.getBean("PcmSessionManager");
            PcmSession session=sessionManager.getSession(account);
            if(session !=null){
                sent.remove(Message.SESSION_KEY);
                reply.setKey(sent.getKey());
                reply.setMessage("推送的消息");
                reply.setData(sent.getData());
                reply.setCode(Message.ReturnCode.CODE_200); 
                session.write(reply); //转发获取的消息
                logger.info("推送的消息是:"+reply.toString());
            }else{
                reply.setCode(Message.ReturnCode.CODE_403);
                reply.setMessage("推送失败");
            }
            return reply;
        }
    }
    package com.pcm.mina.service.handler;
    
    import org.apache.log4j.Logger;
    import com.pcm.mina.service.RequestHandler;
    import com.pcm.mina.service.model.Message;
    import com.pcm.mina.service.model.ReplyBody;
    import com.pcm.mina.service.model.SentBody;
    import com.pcm.mina.service.session.DefaultSessionManager;
    import com.pcm.mina.service.session.PcmSession;
    import com.pcm.util.ContextHolder;
    
    /**
     * @author ZERO
     * @Description  断开连接,清除session
     */
    public class SessionClosedHandler implements RequestHandler {
        protected final Logger logger = Logger.getLogger(SessionClosedHandler.class);
        public ReplyBody process(PcmSession ios, SentBody message) {
    
            DefaultSessionManager sessionManager  =  ((DefaultSessionManager) ContextHolder.getBean("PcmSessionManager"));
    
            if(ios.getAttribute(Message.SESSION_KEY)==null)
            {
                return null;
            }
            String account = ios.getAttribute(Message.SESSION_KEY).toString();
            sessionManager.removeSession(account);
            return null;
        }
    }

    7,spring配置

    可以将过滤器添加到spring这块,包括心跳设置。

    <!-- spring集成mina -->
            <!-- 设置 I/O 接受器,并指定接收到请求后交给 myHandler 进行处理 --> 
            <bean id="customEditorConfigurer" class="org.springframework.beans.factory.config.CustomEditorConfigurer">
               <property name="customEditors" >
                 <map>
                   <entry key="java.net.SocketAddress"  value="org.apache.mina.integration.beans.InetSocketAddressEditor"/>
                 </map>
               </property>
             </bean>
    
             <!-- handlers事件 -->
            <bean id="IoHandler" class="com.pcm.mina.service.handler.ServiceMainHandler">
                <property name="handlers">
                    <map>
                        <entry key="client_bind">  <!-- 创建连接 -->
                            <bean class="com.pcm.mina.service.handler.BindHandler"></bean>
                        </entry>
                        <entry key="client_closs">  <!--断开清除会话  -->
                            <bean class="com.pcm.mina.service.handler.SessionClosedHandler"></bean>
                        </entry>
                        <entry key="client_push">  <!--在线推送消息  -->
                            <bean class="com.pcm.mina.service.handler.PushMessageHandler"></bean>
                        </entry>
                    </map>
                </property>
            </bean>
    
             <!-- IoAccepter,绑定到1255端口 -->
              <!-- 通过 init-method指明了当 I/O 接受器创建成功之后,调用其 bind方法来接受连接;通过 destroy-method声明了当其被销毁的时候,调用其 unbind来停止监听 -->
            <bean id="SerNioSociketAcceptor"  class="com.pcm.mina.service.SerNioSociketAcceptor" 
            init-method="bind" destroy-method="unbind">  
                 <property name="port" value="1255" /> 
                 <property name="ioHandler" ref="IoHandler" /> 
            </bean>
    
             <!--spring动态获取bean实现  -->
            <bean id="ContextHolder" class="com.pcm.util.ContextHolder"></bean>
            <bean id="PcmSessionManager" class="com.pcm.mina.service.session.DefaultSessionManager"/> 

    客户端

    1,编写业务逻辑处理器

    几乎和服务端一样,这里因为测试,所以就从简了。

    package com.pcm.mina.client.MinaDemo;
    
    import org.apache.log4j.Logger;
    import org.apache.mina.core.service.IoHandlerAdapter;
    import org.apache.mina.core.session.IoSession;
    
    /**
     * @author ZERO
     * @Description 客户端handle
     */
    public class MinaClientHandler extends IoHandlerAdapter {
        private static Logger logger = Logger.getLogger(MinaClientHandler.class);
    
        @Override
        public void messageReceived(IoSession session, Object message)
                throws Exception {
             String msg = message.toString();
        //    logger.info("客户端A接收的数据:" + msg);
          System.out.println("客户端A接收的数据:" + msg);
           if(msg.equals("hb_request")){
            logger.warn("客户端A成功收到心跳包:hb_request");
            session.write("hb_response");
            logger.warn("客户端A成功发送心跳包:hb_response");
           }
        }
    
        @Override
        public void exceptionCaught(IoSession session, Throwable cause)
                throws Exception {
            logger.error("发生错误...", cause);
        }
    }

    2,编写客户端程序。

    也几乎和服务端一致,为了简单使用,编写main方法。
    注:客户端和服务端的过滤器要一致。

    package com.pcm.mina.client.MinaDemo;
    
    import java.net.InetSocketAddress;
    import org.apache.log4j.Logger;
    import org.apache.mina.core.future.ConnectFuture;
    import org.apache.mina.core.service.IoConnector;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
    import org.apache.mina.transport.socket.nio.NioSocketConnector;
    import com.pcm.mina.service.model.SentBody;
    
    
    /**
     * @author ZERO
     * @Description mina 客户端
     */
        public class MinaClient {
            private static Logger logger = Logger.getLogger(MinaClient.class);
            private static String HOST = "127.0.0.1";
            private static int PORT = 1255;
            private static  IoConnector connector=new NioSocketConnector();
            private static   IoSession session;
            public static IoConnector getConnector() {
                return connector;
            }
    
            public static void setConnector(IoConnector connector) {
                MinaClient.connector = connector;
            }
    
            /* 
            * 测试服务端与客户端程序!
            a. 启动服务端,然后再启动客户端
            b. 服务端接收消息并处理成功;
            */
            @SuppressWarnings("deprecation")
            public static void main(String[] args) {
                   // 设置链接超时时间
                connector.setConnectTimeout(30000);
                // 添加过滤器  可序列话的对象 
                connector.getFilterChain().addLast(
                        "codec",
                        new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
                // 添加业务逻辑处理器类
                connector.setHandler(new MinaClientHandler());
                ConnectFuture future = connector.connect(new InetSocketAddress(
                        HOST, PORT));// 创建连接
                future.awaitUninterruptibly();// 等待连接创建完成
                session = future.getSession();// 获得session
    
                bindstart();
           //   pushstart();
            }
    
            public static void bindstart(){
                logger.info("客户端A绑定服务端");
                try {
                    SentBody sy=new SentBody();
                    sy.put("message", "这是个测试账号");
                    sy.put("account", "123456");
                    sy.setKey("client_bind");
                    session.write(sy);// 发送消息
                    System.out.println("客户端A与服务端建立连接成功...发送的消息为:"+sy);
              //      logger.info("客户端A与服务端建立连接成功...发送的消息为:"+sy);
                } catch (Exception e) {
                    e.printStackTrace();
                    logger.error("客户A端链接异常...", e);
                }
                session.getCloseFuture().awaitUninterruptibly();// 等待连接断开
                connector.dispose();
            }
    }       

    注意事项

    1,客户端和服务端的过滤器要保持一致,不然容易出现异常 java.nio.charset.MalformedInputException。

    2,使用对象进行传输的时候需要实现接口java.io.Serializable接口。

    3,如果使用对象传输,所在的包。类名也要一致,不然会出现 org.apache.mina.core.buffer.BufferDataException: java.io.InvalidClassException: failed to read class descriptor (Hexdump: 00 00 00 3C AC ED 00 05 73 72 01 00 1C 63 6F 6D 2E 65 78 61 6D 70 6C 65 2E 63 这种错误(被困扰过很久)。

    代码就先告一段落。客户端也可以通过socket和mina进行数据传输,这里就不贴代码了。
    spring整合mina,暂时就到这了。项目我放到了github上,地址:https://github.com/xuwujing/springMina/tree/master
    如果感觉不错,希望可以给个star。

  • 相关阅读:
    年度开源盛会 ApacheCon 来临,Apache Pulsar 专场大咖齐聚
    开源流数据公司 StreamNative 正式加入 CNCF,积极推动云原生策略发展
    php摇杆Tiger摇奖
    php调试局部错误强制输出 display_errors
    php文件写入PHP_EOL与FILE_APPEND
    window自动任务实现数据库定时备份
    php 跨服务器ftp移动文件
    linux 关于session缓存丢失,自己掉坑里面了
    mysql 定时任务
    mysql 查询去重 distinct
  • 原文地址:https://www.cnblogs.com/xuwujing/p/7629968.html
Copyright © 2011-2022 走看看