zoukankan      html  css  js  c++  java
  • Mina、Netty、Twisted一起学习(三):TCP前缀固定大小的消息(Header)

    以前的博文于,有介绍切割消息换行的方法。

    但是有一个小问题,这样的方法,设消息中本身就包括换行符,那将会将这条消息切割成两条。结果就不正确了。

    本文介绍第二种消息切割方式,即上一篇博文中讲的第2条:use a fixed length header that indicates the length of the body。用一个固定字节数的Header前缀来指定Body的字节数,以此来切割消息。



    上面图中Header固定为4字节,Header中保存的是一个4字节(32位)的整数,比如12即为0x0000000C。这个整数用来指定Body的长度(字节数)。当读完这么多字节的Body之后,又是下一条消息的Header。


    以下分别用MINA、Netty、Twisted来实现对这样的消息的切合和解码。


    MINA:

    MINA提供了PrefixedStringCodecFactory来对这样的类型的消息进行编码解码。PrefixedStringCodecFactory默认Header的大小是4字节。当然也能够指定成1或2。

    public class TcpServer {
    
    	public static void main(String[] args) throws IOException {
    		IoAcceptor acceptor = new NioSocketAcceptor();
    		
    		// 4字节的Header指定Body的字节数。对这样的消息的处理
    		acceptor.getFilterChain().addLast("codec", 
    				new ProtocolCodecFilter(new PrefixedStringCodecFactory(Charset.forName("UTF-8"))));
    		
    		acceptor.setHandler(new TcpServerHandle());
    		acceptor.bind(new InetSocketAddress(8080));
    	}
    
    }
    
    class TcpServerHandle extends IoHandlerAdapter {
    
    	@Override
    	public void exceptionCaught(IoSession session, Throwable cause)
    			throws Exception {
    		cause.printStackTrace();
    	}
    
    	// 接收到新的数据
    	@Override
    	public void messageReceived(IoSession session, Object message)
    			throws Exception {
    
    		String msg = (String) message;
    		System.out.println("messageReceived:" + msg);
    		
    	}
    
    	@Override
    	public void sessionCreated(IoSession session) throws Exception {
    		System.out.println("sessionCreated");
    	}
    
    	@Override
    	public void sessionClosed(IoSession session) throws Exception {
    		System.out.println("sessionClosed");
    	}
    }

    Netty:

    Netty使用LengthFieldBasedFrameDecoder来处理这样的消息。

    以下代码中的new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4)中包括5个參数,各自是int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip。maxFrameLength为消息的最大长度。lengthFieldOffset为Header的位置。lengthFieldLength为Header的长度,lengthAdjustment为长度调整(默认Header中的值表示Body的长度。并不包括Header自己),initialBytesToStrip为去掉字节数(默认解码后返回Header+Body的所有内容。这里设为4表示去掉4字节的Header。仅仅留下Body)。


    public class TcpServer {
    
    	public static void main(String[] args) throws InterruptedException {
    		EventLoopGroup bossGroup = new NioEventLoopGroup();
    		EventLoopGroup workerGroup = new NioEventLoopGroup();
    		try {
    			ServerBootstrap b = new ServerBootstrap();
    			b.group(bossGroup, workerGroup)
    					.channel(NioServerSocketChannel.class)
    					.childHandler(new ChannelInitializer<SocketChannel>() {
    						@Override
    						public void initChannel(SocketChannel ch)
    								throws Exception {
    							ChannelPipeline pipeline = ch.pipeline();
    							
    							// LengthFieldBasedFrameDecoder按行切割消息,取出body
    							pipeline.addLast(new LengthFieldBasedFrameDecoder(80, 0, 4, 0, 4));
    							// 再按UTF-8编码转成字符串
    							pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
    							
    							pipeline.addLast(new TcpServerHandler());
    						}
    					});
    			ChannelFuture f = b.bind(8080).sync();
    			f.channel().closeFuture().sync();
    		} finally {
    			workerGroup.shutdownGracefully();
    			bossGroup.shutdownGracefully();
    		}
    	}
    
    }
    
    class TcpServerHandler extends ChannelInboundHandlerAdapter {
    
    	// 接收到新的数据
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) {
    		
    		String message = (String) msg;
    		System.out.println("channelRead:" + message);
    	}
    
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) {
    		System.out.println("channelActive");
    	}
    
    	@Override
    	public void channelInactive(ChannelHandlerContext ctx) {
    		System.out.println("channelInactive");
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    		cause.printStackTrace();
    		ctx.close();
    	}
    }

    Twisted:

    在Twisted中须要继承Int32StringReceiver,不再继承Protocol。

    Int32StringReceiver表示固定32位(4字节)的Header,另外还有Int16StringReceiver、Int8StringReceiver等。而须要实现的接受数据事件的方法不再是dataReceived,也不是lineReceived。而是stringReceived。

    # -*- coding:utf-8 –*-
    
    from twisted.protocols.basic import Int32StringReceiver
    from twisted.internet.protocol import Factory
    from twisted.internet import reactor
    
    class TcpServerHandle(Int32StringReceiver):
    
        # 新的连接建立
        def connectionMade(self):
            print 'connectionMade'
    
        # 连接断开
        def connectionLost(self, reason):
            print 'connectionLost'
    
        # 接收到新的数据
        def stringReceived(self, data):
            print 'stringReceived:' + data
    
    factory = Factory()
    factory.protocol = TcpServerHandle
    reactor.listenTCP(8080, factory)
    reactor.run()

    以下是Java编写的一个client測试程序:

    public class TcpClient {
    
    	public static void main(String[] args) throws IOException {
    
    		Socket socket = null;
    		DataOutputStream out = null;
    
    		try {
    
    			socket = new Socket("localhost", 8080);
    			out = new DataOutputStream(socket.getOutputStream());
    
    			// 请求server
    			String data1 = "牛顿";
    			byte[] outputBytes1 = data1.getBytes("UTF-8");
    			out.writeInt(outputBytes1.length); // write header
    			out.write(outputBytes1); // write body
    			
    			String data2 = "爱因斯坦";
    			byte[] outputBytes2 = data2.getBytes("UTF-8");
    			out.writeInt(outputBytes2.length); // write header
    			out.write(outputBytes2); // write body
    			
    			out.flush();
    
    		} finally {
    			// 关闭连接
    			out.close();
    			socket.close();
    		}
    
    	}
    
    }


    MINAserver输出结果:

    sessionCreated
    messageReceived:牛顿
    messageReceived:爱因斯坦
    sessionClosed

    Nettyserver输出结果:

    channelActive
    channelRead:牛顿
    channelRead:爱因斯坦
    channelInactive

    Twistedserver输出结果:

    connectionMade
    stringReceived:牛顿
    stringReceived:爱因斯坦
    connectionLost



    作者:叉叉哥   转载请注明出处:http://blog.csdn.net/xiao__gui/article/details/38752105



    版权声明:本文博主原创文章,博客,未经同意不得转载。

  • 相关阅读:
    appfuse的一些资源
    实战: SOLR的分布式部署(复制)CollectionDistribute 快照分发 (精简版)
    在SOLR环境变量的配置 过程中,遇到的 A pseudo attribute name is expected 异常
    Solr应用开发——Solr home目录结构简介1
    升级 Solr 1.4 后性能有所提升
    Solr 删除数据的几种方式
    如何使SOLR系统自动AUTO COMMIT
    Solr应用开发——Solr home目录结构简介2
    升级到 solr 1.4 的注意事项
    SOLR环境变量的配置
  • 原文地址:https://www.cnblogs.com/gcczhongduan/p/4888112.html
Copyright © 2011-2022 走看看