zoukankan      html  css  js  c++  java
  • mina的编码和解码以及断包的处理,发送自定义协议,仿qq聊天,发送xml或json

    近期一段时间以来,mina非常火,和移动开发一样,异常火爆。前面写了几篇移动开发的文章,反响都还不错,你们的鼓舞就是我最大的动力。好了,废话少说,我们来看下tcp通讯吧。
    tcp通讯对于java来说是非常简单的,就是socket,也就是大家常说的套接字。大家不要把它看得非常难。

    说白了,tcp通讯事实上就是数据流的读写:一条输入流,一条输出流,分别负责接收消息和发送消息。


    明确了这些,ok。我们来看看我写的样例吧。

    先看服务器端测试类的源代码:

    package com.minaqq.test;
    
    import com.minaqq.server.ServerMsgProtocol;
    import com.minaqq.worker.ServerSendMsgThread;
    
    /**
     * Entry point of the demo server: starts the MINA acceptor and, when that
     * succeeds, launches the worker thread that sends messages to the clients.
     */
    public class MsgServerTest {
        public static void main(String[] args) {
            // Nothing else to do when the acceptor could not be started.
            if (!ServerMsgProtocol.serverStart()) {
                return;
            }
            System.out.println("服务器启动成功......");
            ServerSendMsgThread sender = new ServerSendMsgThread();
            sender.start();
            System.out.println("工作线程启动成功......");
        }
    }
    

    服务端连接代码:

    package com.minaqq.server;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    
    import org.apache.mina.core.filterchain.DefaultIoFilterChainBuilder;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.keepalive.KeepAliveFilter;
    import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
    import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
    import org.apache.mina.filter.logging.LogLevel;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.SocketAcceptor;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
    
    import com.minaqq.protocol.MsgProtocol;
    
    /**
     * Bootstraps the server side of the demo: owns the shared acceptor, installs
     * the codec and logging filters, and binds the listening port.
     *
     * @author Herman.Xiong
     * @date 2013-12-06 09:23:31
     * @file MinaServer.java
     * @package com.minaqq.server
     * @project MINA_QQ
     * @version 1.0
     * @since jdk1.6,mina 2.0
     */
    public class ServerMsgProtocol {

        // Seconds without read/write activity before a session is reported idle.
        private static final int IDELTIMEOUT = 30;
        // Heartbeat interval in seconds; only referenced by the disabled keep-alive filter below.
        private static final int HEARTBEATRATE = 15;
        // TCP port the acceptor listens on.
        private static final int PORT = 8888;
        // Socket receive buffer size in bytes (2048 * 5000 = 10,240,000, roughly 10 MB).
        private static final int RECEIVE_BUFFER_SIZE = 2048 * 5000;

        private static SocketAcceptor acceptor;

        // Static utility holder; not meant to be instantiated.
        private ServerMsgProtocol() {}

        /**
         * Returns the shared acceptor, creating the NIO acceptor on first use.
         *
         * @return the acceptor instance, never {@code null}
         */
        public static SocketAcceptor getAcceptor(){
            if(null==acceptor){
                // Create the non-blocking server-side socket acceptor.
                acceptor = new NioSocketAcceptor();
            }
            return acceptor;
        }

        /**
         * Configures the filter chain, handler and session settings of the shared
         * acceptor and binds it to {@link #PORT}.
         *
         * @return {@code true} when the port was bound, {@code false} when binding
         *         failed with an {@link IOException} (the stack trace is printed)
         */
        public static boolean serverStart() {
            SocketAcceptor server = getAcceptor();
            DefaultIoFilterChainBuilder filterChain = server.getFilterChain();
            // Codec filter: converts between raw bytes and MsgPack objects.
            filterChain.addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
            LoggingFilter loggingFilter = new LoggingFilter();
            loggingFilter.setMessageReceivedLogLevel(LogLevel.INFO);
            loggingFilter.setMessageSentLogLevel(LogLevel.INFO);
            // Logging filter, placed after the codec.
            filterChain.addLast("loger", loggingFilter);
            // Business handler that receives the decoded messages.
            server.setHandler(new ServerMessageHandler());

            // Optional heartbeat support, currently disabled:
            //KeepAliveMessageFactory heartBeatFactory = new KeepAliveMessageFactoryImpl();
            //KeepAliveRequestTimeoutHandler heartBeatHandler = new KeepAliveRequestTimeoutHandlerImpl();
            //KeepAliveFilter heartBeat = new KeepAliveFilter(heartBeatFactory,IdleStatus.BOTH_IDLE, heartBeatHandler);
            // Whether idle events are forwarded to the handler.
            //heartBeat.setForwardEvent(false);
            // Heartbeat sending interval.
            //heartBeat.setRequestInterval(HEARTBEATRATE);
            //server.getFilterChain().addLast("heartbeat", heartBeat);
            //server.getSessionConfig().setKeepAlive(true);

            server.getSessionConfig().setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
            // A session with no traffic in either direction for IDELTIMEOUT seconds
            // is reported idle. This single call replaces the previous pair of
            // setBothIdleTime(30) + setIdleTime(BOTH_IDLE, IDELTIMEOUT), which set
            // the same value twice.
            server.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, IDELTIMEOUT);
            try {
                server.bind(new InetSocketAddress(PORT));
                return true;
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
        }

    }
    
    server的消息处理:
    package com.minaqq.server;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.apache.mina.core.future.CloseFuture;
    import org.apache.mina.core.future.IoFuture;
    import org.apache.mina.core.future.IoFutureListener;
    import org.apache.mina.core.service.IoHandler;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    
    import com.minaqq.domain.MsgPack;
    /**
     * Server-side handler: logs session lifecycle events and the messages
     * decoded from clients.
     *
     * @author Herman.Xiong
     * @date 2012-6-26 13:12:34
     * @file ServerMessageHandler.java
     * @package com.minaqq.server
     * @project MINA_QQ
     * @version 1.0
     * @since jdk1.6,mina 2.0
     */
    public class ServerMessageHandler implements IoHandler{

        // Function code stamped on the greeting. It mirrors the code
        // ServerSendMsgThread uses for server-originated messages.
        // NOTE(review): the protocol's method codes are not defined in this file -
        // confirm the value the client expects for a greeting.
        private static final int WELCOME_METHOD = 1000;

        /** Logs the exception raised while processing the session. */
        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
            System.out.println("server发生异常:"+ cause.toString());
        }

        /**
         * Logs a message received from a client.
         *
         * @param message the decoded message; cast to {@link MsgPack}
         */
        public void messageReceived(IoSession session, Object message) throws Exception {
            MsgPack mp=(MsgPack)message;
            System.out.println("收到client数据messageReceived----------:"+ mp.toString());
            // Replying to the sender or broadcasting to all managed sessions
            // would go here.
        }

        /** Invoked after a message was written; intentionally does nothing. */
        public void messageSent(IoSession session, Object message) throws Exception {
        }

        /**
         * Logs that the session was closed. The former code closed the session
         * again and marked its close future as closed by hand, which is pointless
         * once the close event has already been delivered.
         */
        public void sessionClosed(IoSession session) throws Exception {
            System.out.println("关闭当前session: "+session.getId()+session.getRemoteAddress());
        }

        /**
         * Logs the new connection and sends a greeting. The greeting is wrapped in
         * a {@link MsgPack} because MsgProtocolEncoder encodes only MsgPack
         * instances; a plain String would be dropped without reaching the client.
         */
        public void sessionCreated(IoSession session) throws Exception {
            System.out.println("创建一个新连接:"+ session.getRemoteAddress()+"  id:  "+session.getId());
            String greeting = "welcome to the chat room !";
            MsgPack welcome = new MsgPack();
            welcome.setMsgMethod(WELCOME_METHOD);
            welcome.setMsgPack(greeting);
            // Length in UTF-8 bytes, the charset MsgProtocol configures for the codec.
            welcome.setMsgLength(greeting.getBytes(java.nio.charset.Charset.forName("UTF-8")).length);
            session.write(welcome);
        }

        /** Logs that the session became idle, together with the idle status. */
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
            System.out.println("当前连接处于空暇状态:"+ session.getRemoteAddress()+ status);
        }

        /** Logs the opened session id and its both-idle counter. */
        public void sessionOpened(IoSession session) throws Exception {
            System.out.println("打开一个session id:"+ session.getId()+"  空暇连接个数IdleCount:  "+ session.getBothIdleCount());
        }
    }
    
    自定义协议类:

    /**
     * Codec factory for the custom message protocol: supplies a decoder and an
     * encoder that both work with UTF-8 text.
     *
     * @author Herman.Xiong
     * @date 2014-06-11 10:30:40
     */
    public class MsgProtocol implements ProtocolCodecFactory{
        // Charset shared by every decoder/encoder this factory hands out.
        private static final Charset UTF8 = Charset.forName("UTF-8");

        /** Creates a new decoder for each call. */
        public ProtocolDecoder getDecoder(IoSession session) throws Exception {
            ProtocolDecoder decoder = new MsgProtocolDecoder(UTF8);
            return decoder;
        }

        /** Creates a new encoder for each call. */
        public ProtocolEncoder getEncoder(IoSession session) throws Exception {
            ProtocolEncoder encoder = new MsgProtocolEncoder(UTF8);
            return encoder;
        }
    }
    协议解码类:
    package com.minaqq.charset;
    
    import java.nio.ByteOrder;
    import java.nio.charset.Charset;
    
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
    import org.apache.mina.filter.codec.ProtocolDecoderOutput;
    
    import com.minaqq.domain.MsgPack;
    /**
     * @see 协议解码
     * @author Herman.Xiong
     * @date 2014年6月11日 16:47:24
     */
    public class MsgProtocolDecoder extends CumulativeProtocolDecoder  {  
        private Charset charset=null;  
      
        public MsgProtocolDecoder() {  
            this(Charset.defaultCharset());  
        }  
        
        public MsgProtocolDecoder(Charset charset) {  
            this.charset = charset;  
        }
        
    	public void decode1(IoSession is, IoBuffer buf, ProtocolDecoderOutput out)
    			throws Exception {
    		buf.order(ByteOrder.LITTLE_ENDIAN);
    		MsgPack mp=new MsgPack();
    		//获取消息的内容长度
    		mp.setMsgLength(buf.getInt());
    		//获取消息的功能函数
    		mp.setMsgMethod(buf.getInt());
    		byte[] msg=new byte[mp.getMsgLength()];
    		buf.get(msg);
    		mp.setMsgPack(new String(msg,charset));
    		buf.flip();
    		out.write(mp);
    	}
    	
    	public void dispose(IoSession arg0) throws Exception {
    		
    	}
    	
    	public void finishDecode(IoSession arg0, ProtocolDecoderOutput arg1)
    			throws Exception {
    		
    	}
    
    	public void decode0(IoSession arg0, IoBuffer arg1, ProtocolDecoderOutput arg2)
    			throws Exception {
    		int limit = arg1.limit();
    		byte[] bytes = new byte[limit];
    		arg1.get(bytes);
    		arg2.write(bytes);
    	}
    
    	protected boolean doDecode(IoSession session, IoBuffer ioBuffer, ProtocolDecoderOutput out) throws Exception {
    		ioBuffer.order(ByteOrder.LITTLE_ENDIAN); 
    		MsgPack mp = (MsgPack) session.getAttribute("nac-msg-pack"); // 从session对象中获取“xhs-upload”属性值 
    		if(null==mp){
    			 if (ioBuffer.remaining() >= 8) {
    				 //取消息体长度
    				 int msgLength = ioBuffer.getInt(); 
    				 int msgMethod = ioBuffer.getInt();
    				 mp=new MsgPack();
    				 mp.setMsgLength(msgLength);
    				 mp.setMsgMethod(msgMethod);
    				 session.setAttribute("nac-msg-pack",mp);
    				 return true;
    			 }
    			 return false;
    		}
    		if(ioBuffer.remaining()>=mp.getMsgLength()){
    			byte [] msgPack=new byte[mp.getMsgLength()];
    			ioBuffer.get(msgPack);
    			mp.setMsgPack(new String(msgPack,charset));
    			session.removeAttribute("nac-msg-pack");
    			out.write(mp);
    			return true;
    		}
    		return false;
    	}   
    
    }
    协议编码类:
    package com.minaqq.charset;
    
    import java.io.NotSerializableException;
    import java.io.Serializable;
    import java.nio.ByteOrder;
    import java.nio.charset.Charset;
    
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolDecoderOutput;
    import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
    import org.apache.mina.filter.codec.ProtocolEncoderOutput;
    
    import com.minaqq.domain.MsgPack;
    
    /**
     * Encodes {@link MsgPack} objects into the custom wire format: a 4-byte
     * little-endian body length, a 4-byte little-endian method code, then the
     * body text encoded with the configured charset.
     */
    public class MsgProtocolEncoder extends ProtocolEncoderAdapter{
        // Header size in bytes: body length (int) + method code (int).
        private static final int HEADER_LENGTH = 8;

        private Charset charset=null;

        /**
         * @param charset charset used to turn the message text into bytes
         */
        public MsgProtocolEncoder(Charset charset) {
            this.charset = charset;
        }

        /**
         * Writes one frame for a {@link MsgPack}. Any other message type is
         * ignored, as before.
         *
         * <p>The length field is taken from the bytes actually written, not from
         * {@code MsgPack.getMsgLength()}. Callers compute that value themselves,
         * and if they use a different charset than this encoder the header and
         * body disagree and the receiver loses frame alignment.
         */
        public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
            if (!(message instanceof MsgPack)) {
                return;
            }
            MsgPack mp = (MsgPack) message;
            byte[] body = (null == mp.getMsgPack()) ? new byte[0] : mp.getMsgPack().getBytes(charset);
            IoBuffer buf = IoBuffer.allocate(HEADER_LENGTH + body.length);
            buf.order(ByteOrder.LITTLE_ENDIAN);
            // Body length in bytes.
            buf.putInt(body.length);
            // Method code.
            buf.putInt(mp.getMsgMethod());
            buf.put(body);
            buf.flip();
            out.write(buf);
            out.flush();
            // The buffer now belongs to the output queue, so it is not freed here.
        }

        /** No resources to release. */
        public void dispose() throws Exception {
        }

        /**
         * Alternative encoder based on Java serialization, limited to objects of
         * at most 1024 serialized bytes. It is not called from anywhere in this
         * class.
         *
         * @throws NotSerializableException when the message is not {@link Serializable}
         * @throws IllegalArgumentException when the serialized form exceeds 1024 bytes
         */
        public void encode0(IoSession arg0, Object arg1, ProtocolEncoderOutput arg2)
                throws Exception {
            if (!(arg1 instanceof Serializable)) {
                throw new NotSerializableException();
            }
            IoBuffer buf = IoBuffer.allocate(64);
            buf.setAutoExpand(true);
            buf.putObject(arg1);

            // putObject writes a 4-byte length prefix before the payload.
            int objectSize = buf.position() - 4;
            if (objectSize > 1024) {
                throw new IllegalArgumentException("The encoded object is too big: " + objectSize + " (> " + 1024
                        + ')');
            }

            buf.flip();
            arg2.write(buf);
        }

    }
    
    协议实体类:

    package com.minaqq.domain;
    
    import java.io.Serializable;
    
    /**
     * One message of the custom protocol: body length, method code and body text.
     *
     * @author Herman.Xiong
     * @date 2014-06-11 11:31:45
     */
    public class MsgPack implements Serializable{
        /** Serialization version. */
        private static final long serialVersionUID = 1L;

        /** Length of the message body. */
        private int msgLength;
        /** Method (function) code of the message. */
        private int msgMethod;
        /** Message body text. */
        private String msgPack;

        /** Creates an empty message; all fields keep their default values. */
        public MsgPack() {}

        /**
         * @param msgLength length of the body
         * @param msgMethod method code
         * @param msgPack   body text
         */
        public MsgPack(int msgLength, int msgMethod, String msgPack) {
            this.msgLength = msgLength;
            this.msgMethod = msgMethod;
            this.msgPack = msgPack;
        }

        public int getMsgLength() {
            return this.msgLength;
        }

        public void setMsgLength(int msgLength) {
            this.msgLength = msgLength;
        }

        public int getMsgMethod() {
            return this.msgMethod;
        }

        public void setMsgMethod(int msgMethod) {
            this.msgMethod = msgMethod;
        }

        public String getMsgPack() {
            return this.msgPack;
        }

        public void setMsgPack(String msgPack) {
            this.msgPack = msgPack;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("MsgPack [msgLength=");
            text.append(msgLength);
            text.append(", msgMethod=").append(msgMethod);
            text.append(", msgPack=").append(msgPack);
            text.append("]");
            return text.toString();
        }

    }
    
    心跳信息工厂类:
    package com.minaqq.server;
    
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.keepalive.KeepAliveMessageFactory;
    /**
     * Supplies and recognises the heartbeat messages used by the keep-alive
     * filter.
     *
     * NOTE(review): isRequest matches HEARTBEATRESPONSE and isResponse matches
     * HEARTBEATREQUEST, which looks inverted relative to the method names -
     * confirm against the client's heartbeat convention before enabling the
     * filter.
     *
     * @author Herman.Xiong
     */
    public class KeepAliveMessageFactoryImpl implements KeepAliveMessageFactory{

        // Heartbeat payloads.
        private static final String HEARTBEATREQUEST = "HEARTBEATREQUEST";
        private static final String HEARTBEATRESPONSE = "HEARTBEATRESPONSE";

        /**
         * @return the heartbeat request payload
         * @author Herman.Xiong
         */
        public Object getRequest(IoSession session) {
            return HEARTBEATREQUEST;
        }

        /**
         * Echoes the received request back as the response.
         *
         * @return the {@code request} argument itself
         * @author Herman.Xiong
         */
        public Object getResponse(IoSession session, Object request) {
            return request;
        }

        /**
         * Reports whether the message equals the HEARTBEATRESPONSE payload,
         * logging it when it does.
         *
         * @author Herman.Xiong
         */
        public boolean isRequest(IoSession session, Object message) {
            boolean heartbeat = message.equals(HEARTBEATRESPONSE);
            if (heartbeat) {
                System.out.println("接收到client心数据包引发心跳事件                 心跳数据包是》》" + message);
            }
            return heartbeat;
        }

        /**
         * Reports whether the message equals the HEARTBEATREQUEST payload,
         * logging it when it does.
         *
         * @author Herman.Xiong
         */
        public boolean isResponse(IoSession session, Object message) {
            boolean heartbeat = message.equals(HEARTBEATREQUEST);
            if (heartbeat) {
                System.out.println("server发送数据包中引发心跳事件: " + message);
            }
            return heartbeat;
        }
    }
    
    心跳业务处理类:
    package com.minaqq.server;
    
    import org.apache.mina.core.future.CloseFuture;
    import org.apache.mina.core.future.IoFuture;
    import org.apache.mina.core.future.IoFutureListener;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.keepalive.KeepAliveFilter;
    import org.apache.mina.filter.keepalive.KeepAliveRequestTimeoutHandler;
    /**
     * Handles a heartbeat timeout by closing the session.
     *
     * @author Herman.Xiong
     */
    public class KeepAliveRequestTimeoutHandlerImpl  implements KeepAliveRequestTimeoutHandler {
        /**
         * Logs the timeout, closes the session immediately and logs again once
         * the close future completes.
         *
         * @author Herman.Xiong
         */
        public void keepAliveRequestTimedOut(KeepAliveFilter filter,
                IoSession session) throws Exception {
            System.out.println("server端心跳包发送超时处理(即长时间没有发送(接受)心跳包)---关闭当前长连接");
            // Callback run when the close operation finishes.
            IoFutureListener<IoFuture> onClosed = new IoFutureListener<IoFuture>() {
                public void operationComplete(IoFuture done) {
                    if (!(done instanceof CloseFuture)) {
                        return;
                    }
                    ((CloseFuture) done).setClosed();
                    System.out.println("sessionClosed CloseFuture setClosed-->"+ done.getSession().getId());
                }
            };
            session.close(true).addListener(onClosed);
        }
    }
    
    server发送数据包的线程类:
    package com.minaqq.worker;
    
    import java.util.Map;
    
    import org.apache.mina.core.session.IoSession;
    
    import com.minaqq.domain.MsgPack;
    import com.minaqq.server.ServerMsgProtocol;
    import com.minaqq.utils.XmlUtils;
    /**
     * Worker thread that sends a large XML payload to every connected client
     * every few seconds.
     *
     * @author Herman.Xiong
     * @date 2014-06-09 10:38:59
     */
    public class ServerSendMsgThread extends Thread{

        // Number of XML documents concatenated into a single payload.
        private static final int XML_REPEAT = 100;
        // Method code stamped on the messages sent by this thread.
        private static final int PUSH_METHOD = 1000;
        // Pause between two send rounds, in milliseconds.
        private static final long SEND_INTERVAL_MILLIS = 3000;

        /**
         * Sends the payload to all managed sessions in a loop until the thread is
         * interrupted.
         */
        public void run() {
            // The payload never changes, so build and measure it once rather than
            // once per session per round.
            String payload = buildPayload();
            // Length in UTF-8 bytes, the charset MsgProtocol configures for the
            // codec. The platform default charset could give a different count.
            int payloadLength = payload.getBytes(java.nio.charset.Charset.forName("UTF-8")).length;
            while(true){
                // getAcceptor() creates the acceptor on demand and never returns
                // null, so the former null branch was unreachable.
                System.out.println("MinaServer.getAcceptor().getManagedSessionCount() is "+ServerMsgProtocol.getAcceptor().getManagedSessionCount());

                Map<Long, IoSession> sessions=ServerMsgProtocol.getAcceptor().getManagedSessions();
                // Iterate the values directly: looking each key up again could
                // return null if the session disappeared in between.
                for (IoSession is : sessions.values()) {
                    MsgPack mp=new MsgPack();
                    mp.setMsgMethod(PUSH_METHOD);
                    mp.setMsgPack(payload);
                    mp.setMsgLength(payloadLength);
                    try {
                        is.write(mp);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                try {
                    Thread.sleep(SEND_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Keep the interrupt flag and stop instead of looping on.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        // Concatenates XML_REPEAT copies of the sample XML document.
        private static String buildPayload() {
            StringBuilder xml = new StringBuilder();
            for (int i = 0; i < XML_REPEAT; i++) {
                xml.append(XmlUtils.getXml());
            }
            return xml.toString();
        }
    }
    
    client测试类:
    package com.minaqq.test;
    
    import com.minaqq.client.ClientMsgProtocol;
    import com.minaqq.domain.MsgPack;
    import com.minaqq.worker.ClientSendMsgThread;
    
    /**
     * Entry point of the demo client: connects to the server and, when a session
     * was established, starts the thread that sends messages.
     */
    public class MsgClientTest {
        public static void main(String[] args) {
            ClientMsgProtocol.clientStart();
            // clientStart() leaves the session null when the connection attempt
            // failed. Reporting success and starting the worker would then only
            // produce an exception every few seconds.
            if (null == ClientMsgProtocol.getIoSession()) {
                System.out.println("client启动失败......");
                // Release the connector so its threads do not keep the JVM alive.
                ClientMsgProtocol.getConnector().dispose();
                return;
            }
            System.out.println("client启动成功......");
            ClientSendMsgThread csmt=new ClientSendMsgThread();
            csmt.start();
            System.out.println("client工作线程启动成功......");
        }
    }
    
    client创建连接类:
    package com.minaqq.client;
    
    import java.net.InetSocketAddress;
    
    import org.apache.mina.core.future.ConnectFuture;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFilter;
    import org.apache.mina.filter.logging.LogLevel;
    import org.apache.mina.filter.logging.LoggingFilter;
    import org.apache.mina.transport.socket.nio.NioSocketConnector;
    
    import com.minaqq.protocol.MsgProtocol;
    
    /**
     * Client bootstrap: owns the shared connector and the session it opened to
     * the server.
     *
     * @author Herman.Xiong
     * @date 2013-11-26 11:27:50
     * @version 1.0
     * @serial jdk 1.6
     */
    public class ClientMsgProtocol {

        private static NioSocketConnector connector ;

        // Session obtained by clientStart(); stays null if obtaining it failed.
        private static IoSession is;

        /**
         * Returns the shared connector, creating it on first use.
         */
        public static NioSocketConnector getConnector(){
            if (connector != null) {
                return connector;
            }
            // Create the non-blocking client-side socket connector.
            connector = new NioSocketConnector();
            return connector;
        }

        /**
         * @return the session stored by {@link #clientStart()}, or {@code null}
         *         when none was stored
         */
        public static IoSession getIoSession(){
            return is;
        }

        /**
         * Configures the connector (codec, logging, buffer size, timeout, handler),
         * connects to the server and stores the resulting session. A failure to
         * obtain the session is printed and otherwise ignored.
         */
        public static void clientStart(){
            NioSocketConnector client = getConnector();
            client.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MsgProtocol()));
            LoggingFilter trafficLog = new LoggingFilter();
            trafficLog.setMessageReceivedLogLevel(LogLevel.INFO);
            trafficLog.setMessageSentLogLevel(LogLevel.INFO);
            client.getFilterChain().addLast("logger", trafficLog);
            // Receive buffer of 2048*5000 bytes.
            client.getSessionConfig().setReceiveBufferSize(2048*5000);
            // Connect timeout in milliseconds.
            client.setConnectTimeoutMillis(30000);
            // Handler for events and incoming messages.
            client.setHandler(new TimeClientHandler());
            InetSocketAddress serverAddress = new InetSocketAddress("10.10.2.136",8888);
            ConnectFuture pending = client.connect(serverAddress);
            // Block until the connection attempt has finished.
            pending.awaitUninterruptibly();
            try {
                is = pending.getSession();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }
    
    client消息处理事件类:
    package com.minaqq.client;
    
    import org.apache.mina.core.service.IoHandler;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    
    import com.minaqq.domain.MsgPack;
    
    
    /**
     * Client-side handler: prints the messages received from the server and
     * reports exceptions.
     *
     * @author Herman.Xiong
     * @date 2013-11-26 11:23:32
     * @version 1.0
     * @since jdk1.6
     */
    public class TimeClientHandler implements IoHandler{

        /**
         * Prints a message received from the server.
         *
         * @param message the decoded message; cast to {@link MsgPack}
         */
        public void messageReceived(IoSession session, Object message) throws Exception {
            MsgPack mp=(MsgPack)message;
            System.out.println("收到服务端发来的消息:"+mp.toString());
        }

        /**
         * Reports the exception instead of swallowing it, in the same way
         * ServerMessageHandler does on the server side.
         */
        public void exceptionCaught(IoSession session, Throwable cause)
                throws Exception {
            System.out.println("client发生异常:"+ cause.toString());
        }

        /** No action on sent messages. */
        public void messageSent(IoSession session, Object message) throws Exception {

        }

        /** No action when the session is closed. */
        public void sessionClosed(IoSession session) throws Exception {

        }

        /** No action when the session is created. */
        public void sessionCreated(IoSession session) throws Exception {

        }

        /** No action when the session becomes idle. */
        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {

        }

        /** No action when the session is opened. */
        public void sessionOpened(IoSession session) throws Exception {

        }
    }
    
    client发送消息线程类:
    package com.minaqq.worker;
    
    import com.minaqq.client.ClientMsgProtocol;
    import com.minaqq.domain.MsgPack;
    /**
     * Worker thread that sends a short text message to the server every few
     * seconds.
     *
     * @author Herman.Xiong
     * @date 2014-06-09 10:38:59
     */
    public class ClientSendMsgThread extends Thread{

        // Method code stamped on the messages sent by this thread.
        private static final int CLIENT_METHOD = 2000;
        // Pause between two messages, in milliseconds.
        private static final long SEND_INTERVAL_MILLIS = 3000;

        /**
         * Sends the message in a loop until the thread is interrupted.
         */
        public void run() {
            while(true){
                // Check the session, not the connector: getConnector() creates the
                // connector on demand and is never null, whereas the session is
                // null when the connection attempt failed.
                if(null!=ClientMsgProtocol.getIoSession()){
                    try {
                        MsgPack mp=new MsgPack();
                        mp.setMsgMethod(CLIENT_METHOD);
                        mp.setMsgPack("我是client");
                        // Length in UTF-8 bytes, the charset MsgProtocol configures
                        // for the codec. The platform default charset could give a
                        // different count.
                        mp.setMsgLength(mp.getMsgPack().getBytes(java.nio.charset.Charset.forName("UTF-8")).length);
                        ClientMsgProtocol.getIoSession().write(mp);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }else {
                    System.out.println("ClientMsgProtocol.getIoSession is null ");
                }
                try {
                    Thread.sleep(SEND_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Keep the interrupt flag and stop instead of looping on.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
    
    发送xml消息的工具类:
    package com.minaqq.utils;
    
    import com.minaqq.domain.Address;
    import com.minaqq.domain.House;
    import com.minaqq.domain.Person;
    import com.minaqq.domain.PhoneNumber;
    import com.thoughtworks.xstream.XStream;
    import com.thoughtworks.xstream.io.xml.DomDriver;
    
    public class XmlUtils {
    	
    	public static void testXStream(){
    		XStream xstream=new XStream(new DomDriver());
    		xstream.alias("PERSON", Person.class);
    		xstream.alias("ADDRESS",Address.class);
    		xstream.alias("PHONENUMBER", PhoneNumber.class);
    		xstream.alias("HOUSE", House.class);
    		Person person=(Person)xstream.fromXML(XmlUtils.getXml());
    		System.out.println(person.toString());
    	}
    	
    	public static String getXml(){
    		StringBuffer sb=new StringBuffer("<?xml version="1.0" encoding="UTF-8"?>");//
    		sb.append("<PERSON firstName="Herman">");
    		sb.append("<lastName>Xiong</lastName>  ");
    		sb.append("<phonex>");
    		sb.append("<code>0</code>");
    		sb.append("<number>1234567</number>");
    		sb.append("</phonex>");
    		sb.append("<fax>");
    		sb.append("<code>0</code>");
    		sb.append("<number>7654321</number>");
    		sb.append("</fax>");
    		sb.append("<addList>");
    		sb.append("<ADDRESS>");
    		sb.append("<add>上海市</add>");
    		sb.append("<zipCode>123456</zipCode>");
    		sb.append("</ADDRESS>");
    		sb.append("</addList>");
    		sb.append("<house>");
    		sb.append("<HOUSE>");
    		sb.append("<size>300万</size>");
    		sb.append("<price>120平方米</price>");
    		sb.append("</HOUSE>");
    		sb.append("<HOUSE>");
    		sb.append("<size>500万</size>");
    		sb.append("<price>130平方米</price>");
    		sb.append("</HOUSE>");
    		sb.append("<HOUSE>");
    		sb.append("<size>160万</size>");
    		sb.append("<price>61.5平方米</price>");
    		sb.append("</HOUSE>");
    		sb.append("</house>");
    		sb.append("</PERSON>
    ");
    		return sb.toString();
    	}
    }
    
    xml有关的实体类:
    package com.minaqq.domain;
    
    import java.util.Arrays;
    import java.util.List;
    
    import com.thoughtworks.xstream.annotations.XStreamAsAttribute;
    
    /**
     * Person entity mapped from the sample XML: name, phone numbers, addresses
     * and houses.
     */
    public class Person {
        @XStreamAsAttribute
        private String firstName;
        private String lastName;
        private PhoneNumber phonex;
        private PhoneNumber fax;
        private List<Address> addList;
        private House[] house;

        /** Creates an empty person. */
        public Person() {
        }

        public Person(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }

        public Person(String firstName, String lastName, PhoneNumber phonex,
                PhoneNumber fax) {
            this(firstName, lastName);
            this.phonex = phonex;
            this.fax = fax;
        }

        public Person(String firstName, String lastName, PhoneNumber phonex,
                PhoneNumber fax, List<Address> addList) {
            this(firstName, lastName, phonex, fax);
            this.addList = addList;
        }

        public Person(String firstName, String lastName, PhoneNumber phonex,
                PhoneNumber fax, List<Address> addList, House[] house) {
            this(firstName, lastName, phonex, fax, addList);
            this.house = house;
        }

        public String getFirstName() {
            return this.firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }

        public String getLastName() {
            return this.lastName;
        }

        public void setLastName(String lastName) {
            this.lastName = lastName;
        }

        public PhoneNumber getPhonex() {
            return this.phonex;
        }

        public void setPhonex(PhoneNumber phonex) {
            this.phonex = phonex;
        }

        public PhoneNumber getFax() {
            return this.fax;
        }

        public void setFax(PhoneNumber fax) {
            this.fax = fax;
        }

        public List<Address> getAddList() {
            return this.addList;
        }

        public void setAddList(List<Address> addList) {
            this.addList = addList;
        }

        public House[] getHouse() {
            return this.house;
        }

        public void setHouse(House[] house) {
            this.house = house;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Person [addList=");
            text.append(addList);
            text.append(", fax=").append(fax);
            text.append(", firstName=").append(firstName);
            text.append(", house=").append(Arrays.toString(house));
            text.append(", lastName=").append(lastName);
            text.append(", phonex=").append(phonex);
            text.append("]");
            return text.toString();
        }
    }
    
    xml有关的实体类:
    package com.minaqq.domain;
    
    public class Address {
    	private String add;
    	private String zipCode;
    	public String getAdd() {
    		return add;
    	}
    	public void setAdd(String add) {
    		this.add = add;
    	}
    	public String getZipCode() {
    		return zipCode;
    	}
    	public void setZipCode(String zipCode) {
    		this.zipCode = zipCode;
    	}
    	public Address() {
    	}
    	public Address(String add, String zipCode) {
    		this.add = add;
    		this.zipCode = zipCode;
    	}
    	@Override
    	public String toString() {
    		return "Address [add=" + add + ", zipCode=" + zipCode + "]";
    	}
    }
    
    xml有关的实体类:
    package com.minaqq.domain;
    
    /**
     * Phone number entity mapped from the sample XML: a code and a number.
     */
    public class PhoneNumber {
        private int code;
        private int number;

        /** Creates a phone number with both fields at zero. */
        public PhoneNumber(){}

        public PhoneNumber(int code,int number){
            this.code = code;
            this.number = number;
        }

        public int getCode() {
            return this.code;
        }

        public void setCode(int code) {
            this.code = code;
        }

        public int getNumber() {
            return this.number;
        }

        public void setNumber(int number) {
            this.number = number;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("PhoneNumber [code=");
            text.append(code).append(", number=").append(number).append("]");
            return text.toString();
        }
    }
    
    xml有关的实体类:
    package com.minaqq.domain;
    
    /**
     * House entity mapped from the sample XML: size and price, both kept as text.
     */
    public class House {
        private String size;
        private String price;

        /** Creates an empty house. */
        public House() {
        }

        public House(String size, String price) {
            this.size = size;
            this.price = price;
        }

        public String getSize() {
            return this.size;
        }

        public void setSize(String size) {
            this.size = size;
        }

        public String getPrice() {
            return this.price;
        }

        public void setPrice(String price) {
            this.price = price;
        }

        @Override
        public String toString() {
            // Price is printed before size, as in the original format.
            StringBuilder text = new StringBuilder("House [price=");
            text.append(price).append(", size=").append(size).append("]");
            return text.toString();
        }
    }
    

    执行效果图,服务端图片:


    client图片:

    OK,到此结束了,欢迎大家关注我的个人博客。

    学习资料以及源代码下载请点击:http://download.csdn.net/download/xmt1139057136/7487611

    如有不懂,请大家加入qq群:135430763共同学习。

  • 相关阅读:
    一步一步学Silverlight 2系列(23):Silverlight与HTML混合之无窗口模式
    一步一步学Silverlight 2系列(25):综合实例之Live Search
    一步一步学Silverlight 2系列(19):如何在Silverlight中与HTML DOM交互(上)
    一步一步学Silverlight 2系列(13):数据与通信之WebRequest
    一步一步学Silverlight 2系列(17):数据与通信之ADO.NET Data Services
    一步一步学Silverlight 2系列(24):与浏览器交互相关辅助方法
    一步一步学Silverlight 2系列(22):在Silverlight中如何用JavaScript调用.NET代码
    一步一步学Silverlight 2系列(27):使用Brush进行填充
    一步一步学Silverlight 2系列(16):数据与通信之JSON
    一步一步学Silverlight 2系列(14):数据与通信之WCF
  • 原文地址:https://www.cnblogs.com/zfyouxi/p/5122003.html
Copyright © 2011-2022 走看看