zoukankan      html  css  js  c++  java
  • mina的编码和解码以及断包的处理,发送自己定义协议,仿qq聊天,发送xml或json

    近期一段时间以来,mina非常火,和移动开发一样。异常的火爆。前面写了几篇移动开发的文章,都还不错。你们的鼓舞就是我最大的动力。好了,废话少说。我们来看下tcp通讯吧。
    tcp通讯对于java来说是非常easy的。就是socket,也就是大家常说的套接字。大家不要把它看的非常难。

    说白了,tcp通讯事实上就是数据流的读写:一条输入流,一条输出流,分别负责接收消息和发送消息。


    明确了这些,ok。我们来看看我写的样例吧。

    先看服务器端的测试类的源代码:

    package com.minaqq.test;
    
    import com.minaqq.server.ServerMsgProtocol;
    import com.minaqq.worker.ServerSendMsgThread;
    
    /**
     * Entry point of the demo server: starts the MINA acceptor and, only when
     * that succeeds, launches the worker thread that pushes messages to clients.
     */
    public class MsgServerTest {
        public static void main(String[] args) {
            // Nothing else to do when the acceptor could not be started.
            if (!ServerMsgProtocol.serverStart()) {
                return;
            }
            System.out.println("服务器启动成功......");
            new ServerSendMsgThread().start();
            System.out.println("工作线程启动成功......");
        }
    }
    

    服务端连接代码:

    package com.minaqq.server;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    
    import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.keepalive.KeepAliveFilter;
    import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
    import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
    import org.apache.mina.filter.logging.LogLevel;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.SocketAcceptor;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
    
    import com.minaqq.protocol.MsgProtocol;
    
    /**
     * Bootstraps the server side of the demo: owns the shared MINA
     * {@link SocketAcceptor}, installs the codec and logging filters and binds
     * the listening port.
     *
     * @author Herman.Xiong
     * @version 1.0
     * @since jdk1.6, mina 2.0
     */
    public class ServerMsgProtocol {

        /** Seconds without traffic in either direction before a session is reported idle. */
        private static final int IDELTIMEOUT = 30;
        /** Heartbeat interval in seconds; only used by the disabled keep-alive wiring below. */
        private static final int HEARTBEATRATE = 15;
        /** TCP port the acceptor listens on. */
        private static final int PORT = 8888;
        /** Requested socket receive buffer in bytes (2048 * 5000, roughly 10 MB, not 1 MB). */
        private static final int RECEIVE_BUFFER_SIZE = 2048 * 5000;

        private static SocketAcceptor acceptor;

        private ServerMsgProtocol() {}

        /**
         * Returns the shared acceptor, creating it on first use.
         *
         * @return the non-blocking socket acceptor; never {@code null}
         */
        public static SocketAcceptor getAcceptor() {
            if (null == acceptor) {
                acceptor = new NioSocketAcceptor();
            }
            return acceptor;
        }

        /**
         * Configures the filter chain, handler and session settings, then binds
         * the acceptor to the listening port.
         *
         * @return {@code true} when the port was bound, {@code false} when binding failed
         */
        public static boolean serverStart() {
            SocketAcceptor server = getAcceptor();
            DefaultIoFilterChainBuilder filterChain = server.getFilterChain();
            // Codec filter: converts between raw bytes and MsgPack objects.
            filterChain.addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
            LoggingFilter loggingFilter = new LoggingFilter();
            loggingFilter.setMessageReceivedLogLevel(LogLevel.INFO);
            loggingFilter.setMessageSentLogLevel(LogLevel.INFO);
            filterChain.addLast("loger", loggingFilter);
            // Business handler that receives the decoded messages.
            server.setHandler(new ServerMessageHandler());
            // Heartbeat wiring, currently disabled:
            //KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();
            //KeepAliveRequestTimeoutHandler heartBeatHandler = new KeepAliveRequestTimeoutHandlerImpl();
            //KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory,IdleStatus.BOTH_IDLE, heartBeatHandler);
            //heartBeat.setForwardEvent(false);
            //heartBeat.setRequestInterval(HEARTBEATRATE);
            //server.getFilterChain().addLast("heartbeat", heartBeat);
            server.getSessionConfig().setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
            // A single call is enough; the original set the same both-idle
            // timeout twice (once with a literal, once with the constant).
            server.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDELTIMEOUT);
            try {
                server.bind(new InetSocketAddress(PORT));
                return true;
            } catch (IOException e) {
                System.err.println("Failed to bind server port " + PORT);
                e.printStackTrace();
            }
            return false;
        }

    }
    
    server的消息处理:
    package com.minaqq.server;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.apache.mina.core.future.CloseFuture;
    import org.apache.mina.core.future.IoFuture;
    import org.apache.mina.core.future.IoFutureListener;
    import org.apache.mina.core.service.IoHandler;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    
    import com.minaqq.domain.MsgPack;
    /**
     * @see 处理server端消息
     * @author Herman.Xiong
     * @date 2012-6-26 下午01:12:34
     * @file ServerMessageHandler.java
     * @package com.minaqq.server
     * @project MINA_QQ
     * @version 1.0
     * @since jdk1.6,mina 2.0
     */
    public class ServerMessageHandler implements IoHandler{
    
        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
            System.out.println("server发生异常:"+ cause.toString());
        }
    
        public void messageReceived(IoSession session, Object message) throws Exception {
        	MsgPack mp=(MsgPack)message;
            System.out.println("收到client数据messageReceived----------:"+ mp.toString());
    		/*//请求协议
    		mp.setMsgMethod(3000);
    		mp.setMsgPack("我是server发的消息");
    		mp.setMsgLength(mp.getMsgPack().getBytes().length);
            session.write(mp);*/
            /*String content = mp.toString();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
            String datetime = sdf.format(new Date());
            System.out.println(datetime+"server接收到数据的内容为messageReceived----------: " + content);*/
            // 拿到全部的clientSession
            /*Collection<IoSession> sessions = session.getService().getManagedSessions().values();
            // 向全部client发送数据
            for (IoSession sess : sessions) {
                sess.write(datetime + "	" + content);
            }*/
            
        }
    
        public void messageSent(IoSession session, Object message) throws Exception {
           /* System.out.println("server发送消息messageSent----------: "+ message);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
            String datetime = sdf.format(new Date());
            System.out.println(datetime+"server发送消息messageSent----------: "+message.toString());*/
        }
     
        public void sessionClosed(IoSession session) throws Exception {
            System.out.println("关闭当前session: "+session.getId()+session.getRemoteAddress());
            CloseFuture closeFuture = session.close(true);
            closeFuture.addListener(new IoFutureListener<IoFuture>() {
                public void operationComplete(IoFuture future) {
                    if (future instanceof CloseFuture) {
                        ((CloseFuture) future).setClosed();
                        System.out.println("sessionClosed CloseFuture setClosed-->"+ future.getSession().getId());
                    }
                }
            });
        }
    
        public void sessionCreated(IoSession session) throws Exception {
            System.out.println("创建一个新连接:"+ session.getRemoteAddress()+"  id:  "+session.getId());
            session.write("welcome to the chat room !");
        }
    
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
            System.out.println("当前连接处于空暇状态:"+ session.getRemoteAddress()+ status);
        }
    
        public void sessionOpened(IoSession session) throws Exception {
            System.out.println("打开一个session id:"+ session.getId()+"  空暇连接个数IdleCount:  "+ session.getBothIdleCount());
        }
    }
    
    自己定义协议类:

    /**
     * Codec factory for the custom length-prefixed protocol. Every call hands
     * out a fresh encoder or decoder configured for UTF-8 text.
     *
     * @author Herman.Xiong
     */
    public class MsgProtocol implements ProtocolCodecFactory {

        /** Character set used for the message body on both directions. */
        private static final Charset UTF8 = Charset.forName("UTF-8");

        /** Creates the decoder that turns incoming bytes into MsgPack objects. */
        public ProtocolDecoder getDecoder(IoSession session) throws Exception {
            ProtocolDecoder decoder = new MsgProtocolDecoder(UTF8);
            return decoder;
        }

        /** Creates the encoder that turns MsgPack objects into outgoing bytes. */
        public ProtocolEncoder getEncoder(IoSession session) throws Exception {
            ProtocolEncoder encoder = new MsgProtocolEncoder(UTF8);
            return encoder;
        }
    }
    协议解码类:
    package com.minaqq.charset;
    
    import java.nio.ByteOrder;
    import java.nio.charset.Charset;
    
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
    import org.apache.mina.filter.codec.ProtocolDecoderOutput;
    
    import com.minaqq.domain.MsgPack;
    /**
     * @see 协议解码
     * @author Herman.Xiong
     * @date 2014年6月11日 16:47:24
     */
    public class MsgProtocolDecoder extends CumulativeProtocolDecoder  {  
        private Charset charset=null;  
      
        public MsgProtocolDecoder() {  
            this(Charset.defaultCharset());  
        }  
        
        public MsgProtocolDecoder(Charset charset) {  
            this.charset = charset;  
        }
        
    	public void decode1(IoSession is, IoBuffer buf, ProtocolDecoderOutput out)
    			throws Exception {
    		buf.order(ByteOrder.LITTLE_ENDIAN);
    		MsgPack mp=new MsgPack();
    		//获取消息的内容长度
    		mp.setMsgLength(buf.getInt());
    		//获取消息的功能函数
    		mp.setMsgMethod(buf.getInt());
    		byte[] msg=new byte[mp.getMsgLength()];
    		buf.get(msg);
    		mp.setMsgPack(new String(msg,charset));
    		buf.flip();
    		out.write(mp);
    	}
    	
    	public void dispose(IoSession arg0) throws Exception {
    		
    	}
    	
    	public void finishDecode(IoSession arg0, ProtocolDecoderOutput arg1)
    			throws Exception {
    		
    	}
    
    	public void decode0(IoSession arg0, IoBuffer arg1, ProtocolDecoderOutput arg2)
    			throws Exception {
    		int limit = arg1.limit();
    		byte[] bytes = new byte[limit];
    		arg1.get(bytes);
    		arg2.write(bytes);
    	}
    
    	protected boolean doDecode(IoSession session, IoBuffer ioBuffer, ProtocolDecoderOutput out) throws Exception {
    		ioBuffer.order(ByteOrder.LITTLE_ENDIAN); 
    		MsgPack mp = (MsgPack) session.getAttribute("nac-msg-pack"); // 从session对象中获取“xhs-upload”属性值 
    		if(null==mp){
    			 if (ioBuffer.remaining() >= 8) {
    				 //取消息体长度
    				 int msgLength = ioBuffer.getInt(); 
    				 int msgMethod = ioBuffer.getInt();
    				 mp=new MsgPack();
    				 mp.setMsgLength(msgLength);
    				 mp.setMsgMethod(msgMethod);
    				 session.setAttribute("nac-msg-pack",mp);
    				 return true;
    			 }
    			 return false;
    		}
    		if(ioBuffer.remaining()>=mp.getMsgLength()){
    			byte [] msgPack=new byte[mp.getMsgLength()];
    			ioBuffer.get(msgPack);
    			mp.setMsgPack(new String(msgPack,charset));
    			session.removeAttribute("nac-msg-pack");
    			out.write(mp);
    			return true;
    		}
    		return false;
    	}   
    
    }
    协议编码类:
    package com.minaqq.charset;
    
    import java.io.NotSerializableException;
    import java.io.Serializable;
    import java.nio.ByteOrder;
    import java.nio.charset.Charset;
    
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolDecoderOutput;
    import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
    import org.apache.mina.filter.codec.ProtocolEncoderOutput;
    
    import com.minaqq.domain.MsgPack;
    
    /**
     * Encoder for the custom protocol. A {@link MsgPack} is written as an
     * 8-byte little-endian header (int body length, int method code) followed
     * by the body encoded with the configured charset.
     */
    public class MsgProtocolEncoder extends ProtocolEncoderAdapter {

        /** Header size in bytes: body length (int) + method code (int). */
        private static final int HEADER_LENGTH = 8;

        private Charset charset = null;

        /**
         * @param charset charset used to encode the message body
         */
        public MsgProtocolEncoder(Charset charset) {
            this.charset = charset;
        }

        /**
         * Encodes a {@link MsgPack}; any other message type is ignored.
         * <p>
         * The length field is taken from the bytes actually written, not from
         * {@code MsgPack.getMsgLength()}: callers compute that value with the
         * platform charset, which can differ from this encoder's charset and
         * would corrupt the framing on the receiving side. A {@code null} body
         * is sent as an empty body.
         */
        public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
            if (!(message instanceof MsgPack)) {
                return;
            }
            MsgPack mp = (MsgPack) message;
            byte[] body = (null != mp.getMsgPack()) ? mp.getMsgPack().getBytes(charset) : new byte[0];
            // Exact size: header plus body, so no auto-expansion is required.
            IoBuffer buf = IoBuffer.allocate(HEADER_LENGTH + body.length);
            buf.order(ByteOrder.LITTLE_ENDIAN);
            // Body length in bytes.
            buf.putInt(body.length);
            // Method (function) code.
            buf.putInt(mp.getMsgMethod());
            buf.put(body);
            buf.flip();
            out.write(buf);
            out.flush();
            // Do not free the buffer here: it has been queued for an
            // asynchronous write and is still owned by the I/O layer.
        }

        /** No-argument overload kept for existing callers; holds no resources. */
        public void dispose() throws Exception {
        }

        /**
         * Alternative encoding based on Java serialization; rejects objects
         * that are not {@link Serializable} or whose encoded form exceeds 1024 bytes.
         */
        public void encode0(IoSession arg0, Object arg1, ProtocolEncoderOutput arg2)
                throws Exception {
            if (!(arg1 instanceof Serializable)) {
                throw new NotSerializableException();
            }
            IoBuffer buf = IoBuffer.allocate(64);
            buf.setAutoExpand(true);
            buf.putObject(arg1);

            // putObject prefixes the data with a 4-byte length field.
            int objectSize = buf.position() - 4;
            if (objectSize > 1024) {
                throw new IllegalArgumentException("The encoded object is too big: " + objectSize + " (> " + 1024
                        + ')');
            }

            buf.flip();
            arg2.write(buf);
        }

    }
    
    协议实体类:

    package com.minaqq.domain;
    
    import java.io.Serializable;
    
    /**
     * One protocol message: the body length in bytes, a method (function)
     * code and the textual body.
     *
     * @author Herman.Xiong
     */
    public class MsgPack implements Serializable {

        /** Serialization version of this class. */
        private static final long serialVersionUID = 1L;

        /** Body length in bytes. */
        private int msgLength;
        /** Method (function) code. */
        private int msgMethod;
        /** Message body. */
        private String msgPack;

        /** Creates an empty message (length 0, method 0, no body). */
        public MsgPack() {}

        /** Creates a fully populated message. */
        public MsgPack(int msgLength, int msgMethod, String msgPack) {
            setMsgLength(msgLength);
            setMsgMethod(msgMethod);
            setMsgPack(msgPack);
        }

        public int getMsgLength() {
            return this.msgLength;
        }

        public void setMsgLength(int msgLength) {
            this.msgLength = msgLength;
        }

        public int getMsgMethod() {
            return this.msgMethod;
        }

        public void setMsgMethod(int msgMethod) {
            this.msgMethod = msgMethod;
        }

        public String getMsgPack() {
            return this.msgPack;
        }

        public void setMsgPack(String msgPack) {
            this.msgPack = msgPack;
        }

        /** Renders all three fields for logging. */
        public String toString() {
            StringBuilder text = new StringBuilder("MsgPack [msgLength=");
            text.append(msgLength);
            text.append(", msgMethod=").append(msgMethod);
            text.append(", msgPack=").append(msgPack);
            return text.append("]").toString();
        }

    }
    
    心跳信息工厂类:
    package com.minaqq.server;
    
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
    /**
     * Supplies and recognises the heartbeat messages used by MINA's
     * KeepAliveFilter.
     * <p>
     * The original implementation had the two predicates swapped
     * ({@code isRequest} matched the response text and vice versa) and
     * {@code getResponse} echoed the request back. With that wiring a peer's
     * response is never recognised as a response, so the filter cannot reset
     * its request timeout. The methods below follow the
     * {@link KeepAliveMessageFactory} contract.
     *
     * @author Herman.Xiong
     */
    public class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory {

        /** Text of a heartbeat request. */
        private static final String HEARTBEATREQUEST = "HEARTBEATREQUEST";
        /** Text of a heartbeat response. */
        private static final String HEARTBEATRESPONSE = "HEARTBEATRESPONSE";

        /**
         * @return the heartbeat request to send to the peer
         */
        public Object getRequest(IoSession session) {
            return HEARTBEATREQUEST;
        }

        /**
         * @param request the heartbeat request received from the peer
         * @return the heartbeat response to send back
         */
        public Object getResponse(IoSession session, Object request) {
            return HEARTBEATRESPONSE;
        }

        /**
         * @return {@code true} only when {@code message} is a heartbeat request
         */
        public boolean isRequest(IoSession session, Object message) {
            // Constant-first equals also copes with null and non-String messages.
            if (HEARTBEATREQUEST.equals(message)) {
                System.out.println("接收到client心数据包引发心跳事件                 心跳数据包是》》" + message);
                return true;
            }
            return false;
        }

        /**
         * @return {@code true} only when {@code message} is a heartbeat response
         */
        public boolean isResponse(IoSession session, Object message) {
            if (HEARTBEATRESPONSE.equals(message)) {
                System.out.println("server发送数据包中引发心跳事件: " + message);
                return true;
            }
            return false;
        }
    }
    
    心跳业务处理类:
    package com.minaqq.server;
    
    import org.apache.mina.core.future.CloseFuture;
    import org.apache.mina.core.future.IoFuture;
    import org.apache.mina.core.future.IoFutureListener;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.keepalive.KeepAliveFilter;
    import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
    /**
     * Heartbeat timeout policy: when a keep-alive request goes unanswered,
     * report it and close the session immediately.
     *
     * @author Herman.Xiong
     */
    public class KeepAliveRequestTimeoutHandlerImpl implements KeepAliveRequestTimeoutHandler {

        /**
         * Invoked by the KeepAliveFilter when no response arrived in time;
         * closes the session and logs once the close has completed.
         */
        public void keepAliveRequestTimedOut(KeepAliveFilter filter, IoSession session) throws Exception {
            System.out.println("server端心跳包发送超时处理(即长时间没有发送(接受)心跳包)---关闭当前长连接");
            // Runs when the close future completes.
            IoFutureListener<IoFuture> onClosed = new IoFutureListener<IoFuture>() {
                public void operationComplete(IoFuture done) {
                    if (!(done instanceof CloseFuture)) {
                        return;
                    }
                    CloseFuture closed = (CloseFuture) done;
                    closed.setClosed();
                    System.out.println("sessionClosed CloseFuture setClosed-->"+ done.getSession().getId());
                }
            };
            session.close(true).addListener(onClosed);
        }
    }
    
    server发送数据包的线程类:
    package com.minaqq.worker;
    
    import java.util.Map;
    
    import org.apache.mina.core.session.IoSession;
    
    import com.minaqq.domain.MsgPack;
    import com.minaqq.server.ServerMsgProtocol;
    import com.minaqq.utils.XmlUtils;
    /**
     * Worker thread that, every three seconds, pushes a large XML payload to
     * every session currently managed by the server acceptor.
     *
     * @author Herman.Xiong
     */
    public class ServerSendMsgThread extends Thread {

        /** Method code of a server push. */
        private static final int PUSH_METHOD = 1000;
        /** How many copies of the sample XML make up one payload. */
        private static final int XML_REPEAT = 100;
        /** Pause between two broadcast rounds, in milliseconds. */
        private static final long SEND_INTERVAL_MILLIS = 3000;

        /**
         * Broadcast loop. Ends when the thread is interrupted; the original
         * swallowed the interrupt and kept looping.
         */
        public void run() {
            while (true) {
                // getAcceptor() creates the acceptor on demand and never returns
                // null, so the original null check and its else branch were dead code.
                System.out.println("MinaServer.getAcceptor().getManagedSessionCount() is "+ServerMsgProtocol.getAcceptor().getManagedSessionCount());

                Map<Long, IoSession> map = ServerMsgProtocol.getAcceptor().getManagedSessions();
                if (!map.isEmpty()) {
                    // Build the payload once per round instead of once per session,
                    // and without quadratic String concatenation.
                    StringBuilder payload = new StringBuilder();
                    for (int i = 0; i < XML_REPEAT; i++) {
                        payload.append(XmlUtils.getXml());
                    }
                    String str = payload.toString();
                    // Length in UTF-8 bytes, the charset the protocol codec uses;
                    // the platform default may differ.
                    int length = str.getBytes(java.nio.charset.Charset.forName("UTF-8")).length;
                    // Iterating the values avoids a null lookup for a session that
                    // closes between keySet() and get().
                    for (IoSession is : map.values()) {
                        MsgPack mp = new MsgPack();
                        mp.setMsgMethod(PUSH_METHOD);
                        mp.setMsgPack(str);
                        mp.setMsgLength(length);
                        try {
                            is.write(mp);
                        } catch (Exception e) {
                            // One failing session must not stop the broadcast.
                            e.printStackTrace();
                        }
                    }
                }
                try {
                    Thread.sleep(SEND_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag and stop: interruption is the shutdown signal.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
    
    client測试类:
    package com.minaqq.test;
    
    import com.minaqq.client.ClientMsgProtocol;
    import com.minaqq.domain.MsgPack;
    import com.minaqq.worker.ClientSendMsgThread;
    
    /**
     * Entry point of the demo client: connects to the server and starts the
     * thread that periodically sends a message.
     */
    public class MsgClientTest {
        public static void main(String[] args) {
            ClientMsgProtocol.clientStart();
            System.out.println("client启动成功......");
            // Periodic sender; runs until the JVM exits.
            Thread sender = new ClientSendMsgThread();
            sender.start();
            System.out.println("client工作线程启动成功......");
        }
    }
    
    client创建连接类:
    package com.minaqq.client;
    
    import java.net.InetSocketAddress;
    
    import org.apache.mina.core.future.ConnectFuture;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.logging.LogLevel;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.nio.NioSocketConnector;
    
    import com.minaqq.protocol.MsgProtocol;
    
    /**
     * Client side of the demo: owns the shared MINA connector, connects to the
     * server and exposes the resulting session to the sender thread.
     *
     * @author Herman.Xiong
     * @version 1.0
     */
    public class ClientMsgProtocol {

        /** Address of the demo server. */
        private static final String SERVER_HOST = "10.10.2.136";
        /** Port of the demo server. */
        private static final int SERVER_PORT = 8888;
        /** Connect timeout in milliseconds. */
        private static final long CONNECT_TIMEOUT_MILLIS = 30000;
        /** Requested socket receive buffer in bytes (2048 * 5000, roughly 10 MB, not 1 MB). */
        private static final int RECEIVE_BUFFER_SIZE = 2048 * 5000;

        private static NioSocketConnector connector ;

        private static IoSession is;

        /**
         * Returns the shared client connector, creating it on first use.
         *
         * @return the non-blocking socket connector; never {@code null}
         */
        public static NioSocketConnector getConnector(){
            if (null == connector) {
                connector = new NioSocketConnector();
            }
            return connector;
        }

        /**
         * @return the session to the server, or {@code null} when
         *         {@link #clientStart()} has not run or the connection failed
         */
        public static IoSession getIoSession(){
            return is;
        }

        /**
         * Installs the codec and logging filters, connects to the server and
         * stores the session. A failed connection is reported explicitly
         * instead of being discovered through the exception that
         * {@code ConnectFuture.getSession()} throws.
         */
        public static void clientStart(){
            NioSocketConnector client = getConnector();
            client.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
            LoggingFilter loggingFilter = new LoggingFilter();
            loggingFilter.setMessageReceivedLogLevel(LogLevel.INFO);
            loggingFilter.setMessageSentLogLevel(LogLevel.INFO);
            client.getFilterChain().addLast("logger", loggingFilter);
            client.getSessionConfig().setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
            client.setConnectTimeoutMillis(CONNECT_TIMEOUT_MILLIS);
            client.setHandler(new TimeClientHandler());
            ConnectFuture cf = client.connect(new InetSocketAddress(SERVER_HOST, SERVER_PORT));
            // Block until the attempt has either succeeded or failed.
            cf.awaitUninterruptibly();
            if (cf.isConnected()) {
                is = cf.getSession();
            } else {
                System.err.println("Failed to connect to " + SERVER_HOST + ":" + SERVER_PORT);
                Throwable cause = cf.getException();
                if (null != cause) {
                    cause.printStackTrace();
                }
            }
        }

    }
    
    client消息处理事件类:
    package com.minaqq.client;
    
    import org.apache.mina.core.service.IoHandler;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    
    import com.minaqq.domain.MsgPack;
    
    
    /**
     * Client-side {@link IoHandler}: prints every message pushed by the server
     * and reports errors; all other session events are ignored.
     *
     * @author Herman.Xiong
     * @version 1.0
     * @since jdk1.6
     */
    public class TimeClientHandler implements IoHandler {

        /**
         * Prints a message received from the server.
         */
        public void messageReceived(IoSession session, Object message) throws Exception {
            MsgPack mp = (MsgPack) message;
            System.out.println("收到服务端发来的消息:"+mp.toString());
        }

        /**
         * Reports an error from the filter chain or this handler. The original
         * body was empty, which silently hid codec and I/O failures.
         */
        public void exceptionCaught(IoSession session, Throwable cause)
                throws Exception {
            System.err.println("client exception: " + cause);
            cause.printStackTrace();
        }

        /** No action needed after a message has been written. */
        public void messageSent(IoSession session, Object message) throws Exception {

        }

        /** No action needed when the session closes. */
        public void sessionClosed(IoSession session) throws Exception {

        }

        /** No action needed when the session is created. */
        public void sessionCreated(IoSession session) throws Exception {

        }

        /** No action needed when the session becomes idle. */
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {

        }

        /** No action needed when the session is opened. */
        public void sessionOpened(IoSession session) throws Exception {

        }
    }
    
    client发送消息线程类:
    package com.minaqq.worker;
    
    import com.minaqq.client.ClientMsgProtocol;
    import com.minaqq.domain.MsgPack;
    /**
     * Worker thread that sends a short message to the server every three
     * seconds for as long as a session is available.
     *
     * @author Herman.Xiong
     */
    public class ClientSendMsgThread extends Thread {

        /** Method code of a client request. */
        private static final int REQUEST_METHOD = 2000;
        /** Pause between two messages, in milliseconds. */
        private static final long SEND_INTERVAL_MILLIS = 3000;

        /**
         * Send loop. Ends when the thread is interrupted; the original
         * swallowed the interrupt and kept looping.
         */
        public void run() {
            while (true) {
                // Check the session, not the connector: getConnector() is never
                // null, while the session is null until the connection succeeds.
                if (null != ClientMsgProtocol.getIoSession()) {
                    try {
                        MsgPack mp = new MsgPack();
                        mp.setMsgMethod(REQUEST_METHOD);
                        mp.setMsgPack("我是client");
                        // Length in UTF-8 bytes, the charset the protocol codec uses;
                        // the platform default may differ.
                        mp.setMsgLength(mp.getMsgPack().getBytes(java.nio.charset.Charset.forName("UTF-8")).length);
                        ClientMsgProtocol.getIoSession().write(mp);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    // The original message was copied from the server thread
                    // and named the wrong component.
                    System.out.println("ClientMsgProtocol.getIoSession is null ");
                }
                try {
                    Thread.sleep(SEND_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag and stop: interruption is the shutdown signal.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
    
    发送xml消息的工具类:
    package com.minaqq.utils;
    
    import com.minaqq.domain.Address;
    import com.minaqq.domain.House;
    import com.minaqq.domain.Person;
    import com.minaqq.domain.PhoneNumber;
    import com.thoughtworks.xstream.XStream;
    import com.thoughtworks.xstream.io.xml.DomDriver;
    
    public class XmlUtils {
    	
    	public static void testXStream(){
    		XStream xstream=new XStream(new DomDriver());
    		xstream.alias("PERSON", Person.class);
    		xstream.alias("ADDRESS",Address.class);
    		xstream.alias("PHONENUMBER", PhoneNumber.class);
    		xstream.alias("HOUSE", House.class);
    		Person person=(Person)xstream.fromXML(XmlUtils.getXml());
    		System.out.println(person.toString());
    	}
    	
    	public static String getXml(){
    		StringBuffer sb=new StringBuffer("<?xml version="1.0" encoding="UTF-8"?>");//
    		sb.append("<PERSON firstName="Herman">");
    		sb.append("<lastName>Xiong</lastName>  ");
    		sb.append("<phonex>");
    		sb.append("<code>0</code>");
    		sb.append("<number>1234567</number>");
    		sb.append("</phonex>");
    		sb.append("<fax>");
    		sb.append("<code>0</code>");
    		sb.append("<number>7654321</number>");
    		sb.append("</fax>");
    		sb.append("<addList>");
    		sb.append("<ADDRESS>");
    		sb.append("<add>上海市</add>");
    		sb.append("<zipCode>123456</zipCode>");
    		sb.append("</ADDRESS>");
    		sb.append("</addList>");
    		sb.append("<house>");
    		sb.append("<HOUSE>");
    		sb.append("<size>300万</size>");
    		sb.append("<price>120平方米</price>");
    		sb.append("</HOUSE>");
    		sb.append("<HOUSE>");
    		sb.append("<size>500万</size>");
    		sb.append("<price>130平方米</price>");
    		sb.append("</HOUSE>");
    		sb.append("<HOUSE>");
    		sb.append("<size>160万</size>");
    		sb.append("<price>61.5平方米</price>");
    		sb.append("</HOUSE>");
    		sb.append("</house>");
    		sb.append("</PERSON>
    ");
    		return sb.toString();
    	}
    }
    
    xml有关的实体类:
    package com.minaqq.domain;
    
    import java.util.Arrays;
    import java.util.List;
    
    import com.thoughtworks.xstream.annotations.XStreamAsAttribute;
    
    /**
     * Person entity of the sample XML document: names, two phone numbers, a
     * list of addresses and an array of houses.
     */
    public class Person {
        @XStreamAsAttribute
        private String firstName;
        private String lastName;
        private PhoneNumber phonex;
        private PhoneNumber fax;
        private List<Address> addList;
        private House[] house;

        /** Creates a person with every field unset. */
        public Person() {
        }

        /** Creates a person with names only. */
        public Person(String firstName, String lastName) {
            this(firstName, lastName, null, null);
        }

        /** Creates a person with names and phone numbers. */
        public Person(String firstName, String lastName, PhoneNumber phonex,
                PhoneNumber fax) {
            this(firstName, lastName, phonex, fax, null);
        }

        /** Creates a person with names, phone numbers and addresses. */
        public Person(String firstName, String lastName, PhoneNumber phonex,
                PhoneNumber fax, List<Address> addList) {
            this(firstName, lastName, phonex, fax, addList, null);
        }

        /** Creates a fully populated person. */
        public Person(String firstName, String lastName, PhoneNumber phonex,
                PhoneNumber fax, List<Address> addList, House[] house) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.phonex = phonex;
            this.fax = fax;
            this.addList = addList;
            this.house = house;
        }

        public String getFirstName() {
            return this.firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }

        public String getLastName() {
            return this.lastName;
        }

        public void setLastName(String lastName) {
            this.lastName = lastName;
        }

        public PhoneNumber getPhonex() {
            return this.phonex;
        }

        public void setPhonex(PhoneNumber phonex) {
            this.phonex = phonex;
        }

        public PhoneNumber getFax() {
            return this.fax;
        }

        public void setFax(PhoneNumber fax) {
            this.fax = fax;
        }

        public List<Address> getAddList() {
            return this.addList;
        }

        public void setAddList(List<Address> addList) {
            this.addList = addList;
        }

        public House[] getHouse() {
            return this.house;
        }

        public void setHouse(House[] house) {
            this.house = house;
        }

        /** Renders every field for logging; the house array is expanded. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Person [addList=");
            text.append(addList);
            text.append(", fax=").append(fax);
            text.append(", firstName=").append(firstName);
            text.append(", house=").append(Arrays.toString(house));
            text.append(", lastName=").append(lastName);
            text.append(", phonex=").append(phonex);
            return text.append("]").toString();
        }
    }
    
    xml有关的实体类:
    package com.minaqq.domain;
    
    public class Address {
    	private String add;
    	private String zipCode;
    	public String getAdd() {
    		return add;
    	}
    	public void setAdd(String add) {
    		this.add = add;
    	}
    	public String getZipCode() {
    		return zipCode;
    	}
    	public void setZipCode(String zipCode) {
    		this.zipCode = zipCode;
    	}
    	public Address() {
    	}
    	public Address(String add, String zipCode) {
    		this.add = add;
    		this.zipCode = zipCode;
    	}
    	@Override
    	public String toString() {
    		return "Address [add=" + add + ", zipCode=" + zipCode + "]";
    	}
    }
    
    xml有关的实体类:
    package com.minaqq.domain;
    
    /**
     * Phone number entity of the sample XML document: an area code and a number.
     */
    public class PhoneNumber {
        private int code;
        private int number;

        /** Creates a phone number with both fields set to zero. */
        public PhoneNumber() {
        }

        /** Creates a fully populated phone number. */
        public PhoneNumber(int code, int number) {
            setCode(code);
            setNumber(number);
        }

        public int getCode() {
            return this.code;
        }

        public void setCode(int code) {
            this.code = code;
        }

        public int getNumber() {
            return this.number;
        }

        public void setNumber(int number) {
            this.number = number;
        }

        /** Renders both fields for logging. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("PhoneNumber [code=");
            text.append(code).append(", number=").append(number);
            return text.append("]").toString();
        }
    }
    
    xml有关的实体类:
    package com.minaqq.domain;
    
    /**
     * House entity of the sample XML document: textual size and price.
     */
    public class House {
        private String size;
        private String price;

        /** Creates a house with both fields unset. */
        public House() {
        }

        /** Creates a fully populated house. */
        public House(String size, String price) {
            setSize(size);
            setPrice(price);
        }

        public String getSize() {
            return this.size;
        }

        public void setSize(String size) {
            this.size = size;
        }

        public String getPrice() {
            return this.price;
        }

        public void setPrice(String price) {
            this.price = price;
        }

        /** Renders both fields for logging, price first. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("House [price=");
            text.append(price).append(", size=").append(size);
            return text.append("]").toString();
        }
    }
    

    执行效果图,服务图片:


    client图片:

    OK,到此结束了,欢迎大家关注我的个人博客。

    学习资料以及源代码下载请点击:http://download.csdn.net/download/xmt1139057136/7487611

    如有不懂,请大家加入qq群:135430763,共同学习。

  • 相关阅读:
    107. Binary Tree Level Order Traversal II
    103. Binary Tree Zigzag Level Order Traversal
    102. Binary Tree Level Order Traversal
    690. Employee Importance
    1723. Find Minimum Time to Finish All Jobs
    LeetCode 329 矩阵中最长增长路径
    7.2 物理内存管理
    LeetCode 面试题 特定深度节点链表
    LeetCode 100 相同的树
    npm安装包命令详解,dependencies与devDependencies实际区别
  • 原文地址:https://www.cnblogs.com/zfyouxi/p/5122003.html
Copyright © 2011-2022 走看看