zoukankan      html  css  js  c++  java
  • 第十四章:实现自定义的编码解码器


    本文转自互联网博客,在此感谢博主的总结归纳,本文只提供于巩固复习使用

    本章讲述Netty中如何轻松实现定制的编解码器,由于Netty架构的灵活性,这些编解码器易于重用和测试。为了更容易实现,使用Memcached作为协议例子是因为它更方便我们实现。

    Memcached是免费开源、高性能、分布式的内存对象缓存系统,其目的是加速动态Web应用程序的响应,减轻数据库负载;Memcache实际上是一个以key-value存储任意数据的内存小块。可能有人会问“为什么使用Memcached?因为Memcached协议非常简单,便于讲解。

    14.1 编解码器的范围

    ​ 我们将只实现Memcached协议的一个子集,这足够我们进行添加、检索、删除对象;在Memcached中是通过执行SET,GET,DELETE命令来实现的。Memcached支持很多其他的命令,但我们只使用其中三个命令,简单的东西,我们才会理解的更清楚。

    ​ Memcached有一个二进制和纯文本协议,它们都可以用来与Memcached服务器通信,使用什么类型的协议取决于服务器支持哪些协议。本章主要关注实现二进制协议,因为二进制在网络编程中最常用。

    14.2 实现Memcached的编解码器

    ​ 当想要实现一个给定协议的编解码器,我们应该花一些事件来了解它的运作原理。通常情况下,协议本身都有一些详细的记录。在这里你会发现多少细节?幸运的是Memcached的二进制协议可以很好的扩展。

    ​ 在RFC中有相应的规范,并提供了Memcached二进制协议下载地址:http://code.google.com/p/memcached/wiki/BinaryProtocolRevamped。我们不会执行Memcached的所有命令,只会执行三种操作:SET,GET和DELETE。这样做事为了让事情变得简单。

    ###14.3 了解Memcached二进制协议

    ​ 可以在http://code.google.com/p/memcached/wiki/BinaryProtocolRevamped上详细了解Memcached二进制协议结构。不过这个网站如果不FQ的话好像访问不了。

    14.4 Netty编码器和解码器

    14.4.1 实现Memcached编码器

    先定义memcached操作码(Opcode)和响应状态码(Status):

    package netty.in.action.mem;
     
    /**
     * memcached operation codes
     * @author c.king
     *
     */
    public class Opcode {
    	
    	public static final byte GET = 0x00;
    	public static final byte SET = 0x01;
    	public static final byte DELETE = 0x04;
    }
    
    package netty.in.action.mem;
     
    /**
     * memcached response statuses
     * @author c.king
     *
     */
    public class Status {
    	
    	public static final short NO_ERROR = 0x0000;
    	public static final short KEY_NOT_FOUND = 0x0001;
    	public static final short KEY_EXISTS = 0x0002;
    	public static final short VALUE_TOO_LARGE = 0x0003;
    	public static final short INVALID_ARGUMENTS = 0x0004;
    	public static final short ITEM_NOT_STORED = 0x0005;
    	public static final short INC_DEC_NON_NUM_VAL = 0x0006;
    }
    

    继续编写memcached请求消息体:

    package netty.in.action.mem;
     
    import java.util.Random;
     
    /**
     * memcached request message object
     * @author c.king
     *
     */
    public class MemcachedRequest {
     
    	private static final Random rand = new Random();
    	private int magic = 0x80;// fixed so hard coded
    	private byte opCode; // the operation e.g. set or get
    	private String key; // the key to delete, get or set
    	private int flags = 0xdeadbeef; // random
    	private int expires; // 0 = item never expires
    	private String body; // if opCode is set, the value
    	private int id = rand.nextInt(); // Opaque
    	private long cas; // data version check...not used
    	private boolean hasExtras; // not all ops have extras
     
    	public MemcachedRequest(byte opcode, String key, String value) {
    		this.opCode = opcode;
    		this.key = key;
    		this.body = value == null ? "" : value;
    		// only set command has extras in our example
    		hasExtras = opcode == Opcode.SET;
    	}
     
    	public MemcachedRequest(byte opCode, String key) {
    		this(opCode, key, null);
    	}
     
    	public int getMagic() {
    		return magic;
    	}
     
    	public byte getOpCode() {
    		return opCode;
    	}
     
    	public String getKey() {
    		return key;
    	}
     
    	public int getFlags() {
    		return flags;
    	}
     
    	public int getExpires() {
    		return expires;
    	}
     
    	public String getBody() {
    		return body;
    	}
     
    	public int getId() {
    		return id;
    	}
     
    	public long getCas() {
    		return cas;
    	}
     
    	public boolean isHasExtras() {
    		return hasExtras;
    	}
    }
    

    最后编写memcached请求编码器:

    package netty.in.action.mem;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    import io.netty.util.CharsetUtil;
     
    /**
     * memcached request encoder
     * @author c.king
     *
     */
    public class MemcachedRequestEncoder extends MessageToByteEncoder<MemcachedRequest> {
     
    	@Override
    	protected void encode(ChannelHandlerContext ctx, MemcachedRequest msg, ByteBuf out)
    			throws Exception {
    		// convert key and body to bytes array
    		byte[] key = msg.getKey().getBytes(CharsetUtil.UTF_8);
    		byte[] body = msg.getBody().getBytes(CharsetUtil.UTF_8);
    		// total size of body = key size + body size + extras size
    		int bodySize = key.length + body.length + (msg.isHasExtras() ? 8 : 0);
    		// write magic int
    		out.writeInt(msg.getMagic());
    		// write opcode byte
    		out.writeByte(msg.getOpCode());
    		// write key length (2 byte) i.e a Java short
    		out.writeShort(key.length);
    		// write extras length (1 byte)
    		int extraSize = msg.isHasExtras() ? 0x08 : 0x0;
    		out.writeByte(extraSize);
    		// byte is the data type, not currently implemented in Memcached
    		// but required
    		out.writeByte(0);
    		// next two bytes are reserved, not currently implemented
    		// but are required
    		out.writeShort(0);
    		// write total body length ( 4 bytes - 32 bit int)
    		out.writeInt(bodySize);
    		// write opaque ( 4 bytes) - a 32 bit int that is returned
    		// in the response
    		out.writeInt(msg.getId());
    		// write CAS ( 8 bytes)
    		// 24 byte header finishes with the CAS
    		out.writeLong(msg.getCas());
    		if(msg.isHasExtras()){
    			// write extras
    			// (flags and expiry, 4 bytes each), 8 bytes total
    			out.writeInt(msg.getFlags());
    			out.writeInt(msg.getExpires());
    		}
    		//write key
    		out.writeBytes(key);
    		//write value
    		out.writeBytes(body);
    	}
    }
    

    14.4.2 实现Memcached解码器

    编写memcached响应消息体:

    package netty.in.action.mem;
     
    /**
     * memcached response message object
     * @author c.king
     *
     */
    public class MemcachedResponse {
     
    	private byte magic;
    	private byte opCode;
    	private byte dataType;
    	private short status;
    	private int id;
    	private long cas;
    	private int flags;
    	private int expires;
    	private String key;
    	private String data;
     
    	public MemcachedResponse(byte magic, byte opCode, byte dataType, short status,
    			int id, long cas, int flags, int expires, String key, String data) {
    		this.magic = magic;
    		this.opCode = opCode;
    		this.dataType = dataType;
    		this.status = status;
    		this.id = id;
    		this.cas = cas;
    		this.flags = flags;
    		this.expires = expires;
    		this.key = key;
    		this.data = data;
    	}
     
    	public byte getMagic() {
    		return magic;
    	}
     
    	public byte getOpCode() {
    		return opCode;
    	}
     
    	public byte getDataType() {
    		return dataType;
    	}
     
    	public short getStatus() {
    		return status;
    	}
     
    	public int getId() {
    		return id;
    	}
     
    	public long getCas() {
    		return cas;
    	}
     
    	public int getFlags() {
    		return flags;
    	}
     
    	public int getExpires() {
    		return expires;
    	}
     
    	public String getKey() {
    		return key;
    	}
     
    	public String getData() {
    		return data;
    	}
    }
    

    编写memcached响应解码器:

    package netty.in.action.mem;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    import io.netty.util.CharsetUtil;
     
    import java.util.List;
     
    public class MemcachedResponseDecoder extends ByteToMessageDecoder {
     
    	private enum State {
    		Header, Body
    	}
     
    	private State state = State.Header;
    	private int totalBodySize;
    	private byte magic;
    	private byte opCode;
    	private short keyLength;
    	private byte extraLength;
    	private byte dataType;
    	private short status;
    	private int id;
    	private long cas;
     
    	@Override
    	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
    			throws Exception {
    		switch (state) {
    		case Header:
    			// response header is 24 bytes
    			if (in.readableBytes() < 24) {
    				return;
    			}
    			// read header
    			magic = in.readByte();
    			opCode = in.readByte();
    			keyLength = in.readShort();
    			extraLength = in.readByte();
    			dataType = in.readByte();
    			status = in.readShort();
    			totalBodySize = in.readInt();
    			id = in.readInt();
    			cas = in.readLong();
    			state = State.Body;
    			break;
    		case Body:
    			if (in.readableBytes() < totalBodySize) {
    				return;
    			}
    			int flags = 0;
    			int expires = 0;
    			int actualBodySize = totalBodySize;
    			if (extraLength > 0) {
    				flags = in.readInt();
    				actualBodySize -= 4;
    			}
    			if (extraLength > 4) {
    				expires = in.readInt();
    				actualBodySize -= 4;
    			}
    			String key = "";
    			if (keyLength > 0) {
    				ByteBuf keyBytes = in.readBytes(keyLength);
    				key = keyBytes.toString(CharsetUtil.UTF_8);
    				actualBodySize -= keyLength;
    			}
    			ByteBuf body = in.readBytes(actualBodySize);
    			String data = body.toString(CharsetUtil.UTF_8);
    			out.add(new MemcachedResponse(magic, opCode, dataType, status,
    					id, cas, flags, expires, key, data));
    			state = State.Header;
    			break;
    		default:
    			break;
    		}
    	}
    }
    

    14.5 测试编解码器

    基于netty的编解码器都写完了,下面我们来写一个测试它的类:

    package netty.in.action.mem;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.embedded.EmbeddedChannel;
    import io.netty.util.CharsetUtil;
     
    import org.junit.Assert;
    import org.junit.Test;
     
    /**
     * test memcached encoder
     * @author c.king
     *
     */
    public class MemcachedRequestEncoderTest {
     
    	@Test
    	public void testMemcachedRequestEncoder() {
    		MemcachedRequest request = new MemcachedRequest(Opcode.SET, "k1", "v1");
    		EmbeddedChannel channel = new EmbeddedChannel(
    				new MemcachedRequestEncoder());
    		Assert.assertTrue(channel.writeOutbound(request));
    		ByteBuf encoded = (ByteBuf) channel.readOutbound();
    		Assert.assertNotNull(encoded);
    		Assert.assertEquals(request.getMagic(), encoded.readInt());
    		Assert.assertEquals(request.getOpCode(), encoded.readByte());
    		Assert.assertEquals(2, encoded.readShort());
    		Assert.assertEquals((byte) 0x08, encoded.readByte());
    		Assert.assertEquals((byte) 0, encoded.readByte());
    		Assert.assertEquals(0, encoded.readShort());
    		Assert.assertEquals(2 + 2 + 8, encoded.readInt());
    		Assert.assertEquals(request.getId(), encoded.readInt());
    		Assert.assertEquals(request.getCas(), encoded.readLong());
    		Assert.assertEquals(request.getFlags(), encoded.readInt());
    		Assert.assertEquals(request.getExpires(), encoded.readInt());
    		byte[] data = new byte[encoded.readableBytes()];
    		encoded.readBytes(data);
    		Assert.assertArrayEquals((request.getKey() + request.getBody())
    				.getBytes(CharsetUtil.UTF_8), data);
    		Assert.assertFalse(encoded.isReadable());
    		Assert.assertFalse(channel.finish());
    		Assert.assertNull(channel.readInbound());
    	}
    }
    

    14.6 Summary

    本章主要是使用netty写了个模拟memcached二进制协议的处理。至于memcached二进制协议具体是个啥玩意,可以单独了解,这里也没有详细说明。

  • 相关阅读:
    eclipse如何与git 配合工作。
    git托管代码(二)
    PPC2003 安装 CFNET 3.5成功
    我的Window Mobile WCF 項目 第三篇 WM窗体设计
    我的Window Mobile WCF 項目 第一篇Mobile开发和WinForm开发的区别
    我的Window Mobile WCF 項目 第七天
    我的Window Mobile WCF 項目 第二篇 WindowsMobile访问WCF
    WCF 用vs2010 和 vs2008的简单对比测试
    vs2010beta1 和 搜狗输入法 冲突,按下 Ctrl 键就报错,重装搜狗解决
    我的Window Mobile WCF 項目 第六天 (二)
  • 原文地址:https://www.cnblogs.com/daozhangblog/p/12446381.html
Copyright © 2011-2022 走看看